1
0
mirror of https://github.com/juanfont/headscale.git synced 2024-10-17 20:05:55 +02:00
juanfont.headscale/poll.go
Kristoffer Dalby cfd53bc4aa Factor wgkey to types/key
This commit converts all the uses of wgkey to the new key interfaces.

It now has specific  machine, node and discovery keys and we now should
use them correctly.

Please note the new logic which strips a key prefix (in utils.go) that
is now standard inside tailscale.

In theory we could put it in the database, but to preserve backwards
compatibility and not spend a lot of resources on accounting for both,
we just strip them.
2021-11-26 23:30:42 +00:00

535 lines
15 KiB
Go

package headscale
import (
"encoding/json"
"errors"
"io"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"
"go4.org/mem"
"gorm.io/datatypes"
"gorm.io/gorm"
"tailscale.com/tailcfg"
"tailscale.com/types/key"
)
const (
keepAliveInterval = 60 * time.Second
updateCheckInterval = 10 * time.Second
)
// PollNetMapHandler takes care of /machine/:id/map
//
// This is the busiest endpoint, as it keeps the HTTP long poll that updates
// the clients when something in the network changes.
//
// The clients POST stuff like HostInfo and their Endpoints here, but
// only after their first request (marked with the ReadOnly field).
//
// At this moment the updates are sent in a quite horrendous way, but they kinda work.
func (h *Headscale) PollNetMapHandler(ctx *gin.Context) {
log.Trace().
Str("handler", "PollNetMap").
Str("id", ctx.Param("id")).
Msg("PollNetMapHandler called")
body, _ := io.ReadAll(ctx.Request.Body)
mKeyStr := ctx.Param("id")
mKey, err := key.ParseMachinePublicUntyped(mem.S(mKeyStr))
if err != nil {
log.Error().
Str("handler", "PollNetMap").
Err(err).
Msg("Cannot parse client key")
ctx.String(http.StatusBadRequest, "")
return
}
req := tailcfg.MapRequest{}
err = decode(body, &req, &mKey, h.privateKey)
if err != nil {
log.Error().
Str("handler", "PollNetMap").
Err(err).
Msg("Cannot decode message")
ctx.String(http.StatusBadRequest, "")
return
}
machine, err := h.GetMachineByMachineKey(mKey)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
log.Warn().
Str("handler", "PollNetMap").
Msgf("Ignoring request, cannot find machine with key %s", mKey.String())
ctx.String(http.StatusUnauthorized, "")
return
}
log.Error().
Str("handler", "PollNetMap").
Msgf("Failed to fetch machine from the database with Machine key: %s", mKey.String())
ctx.String(http.StatusInternalServerError, "")
}
log.Trace().
Str("handler", "PollNetMap").
Str("id", ctx.Param("id")).
Str("machine", machine.Name).
Msg("Found machine in database")
hostinfo, _ := json.Marshal(req.Hostinfo)
machine.Name = req.Hostinfo.Hostname
machine.HostInfo = datatypes.JSON(hostinfo)
machine.DiscoKey = DiscoPublicKeyStripPrefix(req.DiscoKey)
now := time.Now().UTC()
// From Tailscale client:
//
// ReadOnly is whether the client just wants to fetch the MapResponse,
// without updating their Endpoints. The Endpoints field will be ignored and
// LastSeen will not be updated and peers will not be notified of changes.
//
// The intended use is for clients to discover the DERP map at start-up
// before their first real endpoint update.
if !req.ReadOnly {
endpoints, _ := json.Marshal(req.Endpoints)
machine.Endpoints = datatypes.JSON(endpoints)
machine.LastSeen = &now
}
h.db.Save(&machine)
data, err := h.getMapResponse(mKey, req, machine)
if err != nil {
log.Error().
Str("handler", "PollNetMap").
Str("id", ctx.Param("id")).
Str("machine", machine.Name).
Err(err).
Msg("Failed to get Map response")
ctx.String(http.StatusInternalServerError, ":(")
return
}
// We update our peers if the client is not sending ReadOnly in the MapRequest
// so we don't distribute its initial request (it comes with
// empty endpoints to peers)
// Details on the protocol can be found in https://github.com/tailscale/tailscale/blob/main/tailcfg/tailcfg.go#L696
log.Debug().
Str("handler", "PollNetMap").
Str("id", ctx.Param("id")).
Str("machine", machine.Name).
Bool("readOnly", req.ReadOnly).
Bool("omitPeers", req.OmitPeers).
Bool("stream", req.Stream).
Msg("Client map request processed")
if req.ReadOnly {
log.Info().
Str("handler", "PollNetMap").
Str("machine", machine.Name).
Msg("Client is starting up. Probably interested in a DERP map")
ctx.Data(http.StatusOK, "application/json; charset=utf-8", data)
return
}
// There has been an update to _any_ of the nodes that the other nodes would
// need to know about
h.setLastStateChangeToNow(machine.Namespace.Name)
// The request is not ReadOnly, so we need to set up channels for updating
// peers via longpoll
// Only create update channel if it has not been created
log.Trace().
Str("handler", "PollNetMap").
Str("id", ctx.Param("id")).
Str("machine", machine.Name).
Msg("Loading or creating update channel")
updateChan := make(chan struct{})
pollDataChan := make(chan []byte)
keepAliveChan := make(chan []byte)
cancelKeepAlive := make(chan struct{})
defer close(cancelKeepAlive)
if req.OmitPeers && !req.Stream {
log.Info().
Str("handler", "PollNetMap").
Str("machine", machine.Name).
Msg("Client sent endpoint update and is ok with a response without peer list")
ctx.Data(http.StatusOK, "application/json; charset=utf-8", data)
// It sounds like we should update the nodes when we have received a endpoint update
// even tho the comments in the tailscale code dont explicitly say so.
updateRequestsFromNode.WithLabelValues(machine.Name, machine.Namespace.Name, "endpoint-update").
Inc()
go func() { updateChan <- struct{}{} }()
return
} else if req.OmitPeers && req.Stream {
log.Warn().
Str("handler", "PollNetMap").
Str("machine", machine.Name).
Msg("Ignoring request, don't know how to handle it")
ctx.String(http.StatusBadRequest, "")
return
}
log.Info().
Str("handler", "PollNetMap").
Str("machine", machine.Name).
Msg("Client is ready to access the tailnet")
log.Info().
Str("handler", "PollNetMap").
Str("machine", machine.Name).
Msg("Sending initial map")
go func() { pollDataChan <- data }()
log.Info().
Str("handler", "PollNetMap").
Str("machine", machine.Name).
Msg("Notifying peers")
updateRequestsFromNode.WithLabelValues(machine.Name, machine.Namespace.Name, "full-update").
Inc()
go func() { updateChan <- struct{}{} }()
h.PollNetMapStream(
ctx,
machine,
req,
mKey,
pollDataChan,
keepAliveChan,
updateChan,
cancelKeepAlive,
)
log.Trace().
Str("handler", "PollNetMap").
Str("id", ctx.Param("id")).
Str("machine", machine.Name).
Msg("Finished stream, closing PollNetMap session")
}
// PollNetMapStream takes care of /machine/:id/map
// stream logic, ensuring we communicate updates and data
// to the connected clients.
func (h *Headscale) PollNetMapStream(
ctx *gin.Context,
machine *Machine,
mapRequest tailcfg.MapRequest,
machineKey key.MachinePublic,
pollDataChan chan []byte,
keepAliveChan chan []byte,
updateChan chan struct{},
cancelKeepAlive chan struct{},
) {
go h.scheduledPollWorker(
cancelKeepAlive,
updateChan,
keepAliveChan,
machineKey,
mapRequest,
machine,
)
ctx.Stream(func(writer io.Writer) bool {
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Msg("Waiting for data to stream...")
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Msgf("pollData is %#v, keepAliveChan is %#v, updateChan is %#v", pollDataChan, keepAliveChan, updateChan)
select {
case data := <-pollDataChan:
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "pollData").
Int("bytes", len(data)).
Msg("Sending data received via pollData channel")
_, err := writer.Write(data)
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "pollData").
Err(err).
Msg("Cannot write data")
return false
}
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "pollData").
Int("bytes", len(data)).
Msg("Data from pollData channel written successfully")
// TODO(kradalby): Abstract away all the database calls, this can cause race conditions
// when an outdated machine object is kept alive, e.g. db is update from
// command line, but then overwritten.
err = h.UpdateMachine(machine)
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "pollData").
Err(err).
Msg("Cannot update machine from database")
}
now := time.Now().UTC()
machine.LastSeen = &now
lastStateUpdate.WithLabelValues(machine.Namespace.Name, machine.Name).
Set(float64(now.Unix()))
machine.LastSuccessfulUpdate = &now
h.db.Save(&machine)
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "pollData").
Int("bytes", len(data)).
Msg("Machine entry in database updated successfully after sending pollData")
return true
case data := <-keepAliveChan:
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "keepAlive").
Int("bytes", len(data)).
Msg("Sending keep alive message")
_, err := writer.Write(data)
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "keepAlive").
Err(err).
Msg("Cannot write keep alive message")
return false
}
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "keepAlive").
Int("bytes", len(data)).
Msg("Keep alive sent successfully")
// TODO(kradalby): Abstract away all the database calls, this can cause race conditions
// when an outdated machine object is kept alive, e.g. db is update from
// command line, but then overwritten.
err = h.UpdateMachine(machine)
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "keepAlive").
Err(err).
Msg("Cannot update machine from database")
}
now := time.Now().UTC()
machine.LastSeen = &now
h.db.Save(&machine)
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "keepAlive").
Int("bytes", len(data)).
Msg("Machine updated successfully after sending keep alive")
return true
case <-updateChan:
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "update").
Msg("Received a request for update")
updateRequestsReceivedOnChannel.WithLabelValues(machine.Name, machine.Namespace.Name).
Inc()
if h.isOutdated(machine) {
log.Debug().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Time("last_successful_update", *machine.LastSuccessfulUpdate).
Time("last_state_change", h.getLastStateChange(machine.Namespace.Name)).
Msgf("There has been updates since the last successful update to %s", machine.Name)
data, err := h.getMapResponse(machineKey, mapRequest, machine)
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "update").
Err(err).
Msg("Could not get the map update")
}
_, err = writer.Write(data)
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "update").
Err(err).
Msg("Could not write the map response")
updateRequestsSentToNode.WithLabelValues(machine.Name, machine.Namespace.Name, "failed").
Inc()
return false
}
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "update").
Msg("Updated Map has been sent")
updateRequestsSentToNode.WithLabelValues(machine.Name, machine.Namespace.Name, "success").
Inc()
// Keep track of the last successful update,
// we sometimes end in a state were the update
// is not picked up by a client and we use this
// to determine if we should "force" an update.
// TODO(kradalby): Abstract away all the database calls, this can cause race conditions
// when an outdated machine object is kept alive, e.g. db is update from
// command line, but then overwritten.
err = h.UpdateMachine(machine)
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "update").
Err(err).
Msg("Cannot update machine from database")
}
now := time.Now().UTC()
lastStateUpdate.WithLabelValues(machine.Namespace.Name, machine.Name).
Set(float64(now.Unix()))
machine.LastSuccessfulUpdate = &now
h.db.Save(&machine)
} else {
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Time("last_successful_update", *machine.LastSuccessfulUpdate).
Time("last_state_change", h.getLastStateChange(machine.Namespace.Name)).
Msgf("%s is up to date", machine.Name)
}
return true
case <-ctx.Request.Context().Done():
log.Info().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Msg("The client has closed the connection")
// TODO: Abstract away all the database calls, this can cause race conditions
// when an outdated machine object is kept alive, e.g. db is update from
// command line, but then overwritten.
err := h.UpdateMachine(machine)
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "Done").
Err(err).
Msg("Cannot update machine from database")
}
now := time.Now().UTC()
machine.LastSeen = &now
h.db.Save(&machine)
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "Done").
Msg("Cancelling keepAlive channel")
cancelKeepAlive <- struct{}{}
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "Done").
Msg("Closing update channel")
// h.closeUpdateChannel(m)
close(updateChan)
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "Done").
Msg("Closing pollData channel")
close(pollDataChan)
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("channel", "Done").
Msg("Closing keepAliveChan channel")
close(keepAliveChan)
return false
}
})
}
func (h *Headscale) scheduledPollWorker(
cancelChan <-chan struct{},
updateChan chan<- struct{},
keepAliveChan chan<- []byte,
machineKey key.MachinePublic,
mapRequest tailcfg.MapRequest,
machine *Machine,
) {
keepAliveTicker := time.NewTicker(keepAliveInterval)
updateCheckerTicker := time.NewTicker(updateCheckInterval)
for {
select {
case <-cancelChan:
return
case <-keepAliveTicker.C:
data, err := h.getMapKeepAliveResponse(machineKey, mapRequest)
if err != nil {
log.Error().
Str("func", "keepAlive").
Err(err).
Msg("Error generating the keep alive msg")
return
}
log.Debug().
Str("func", "keepAlive").
Str("machine", machine.Name).
Msg("Sending keepalive")
keepAliveChan <- data
case <-updateCheckerTicker.C:
log.Debug().
Str("func", "scheduledPollWorker").
Str("machine", machine.Name).
Msg("Sending update request")
updateRequestsFromNode.WithLabelValues(machine.Name, machine.Namespace.Name, "scheduled-update").
Inc()
updateChan <- struct{}{}
}
}
}