1
0
mirror of https://github.com/juanfont/headscale.git synced 2024-12-30 00:09:42 +01:00
juanfont.headscale/hscontrol/poll.go
Kristoffer Dalby 68a8ecee7a
Prepare notify channel before sending first update (#1730)
* create channel before sending first update

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>

* do not notify on register, wait for connect

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>

---------

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
2024-02-12 09:11:17 +01:00

665 lines
19 KiB
Go

package hscontrol
import (
"context"
"fmt"
"net/http"
"strings"
"time"
"github.com/juanfont/headscale/hscontrol/db"
"github.com/juanfont/headscale/hscontrol/mapper"
"github.com/juanfont/headscale/hscontrol/types"
"github.com/rs/zerolog/log"
xslices "golang.org/x/exp/slices"
"gorm.io/gorm"
"tailscale.com/envknob"
"tailscale.com/tailcfg"
)
const (
keepAliveInterval = 60 * time.Second
)
type contextKey string
const nodeNameContextKey = contextKey("nodeName")
type UpdateNode func()
func logPollFunc(
mapRequest tailcfg.MapRequest,
node *types.Node,
) (func(string), func(error, string)) {
return func(msg string) {
log.Info().
Caller().
Bool("readOnly", mapRequest.ReadOnly).
Bool("omitPeers", mapRequest.OmitPeers).
Bool("stream", mapRequest.Stream).
Str("node_key", node.NodeKey.ShortString()).
Str("node", node.Hostname).
Msg(msg)
},
func(err error, msg string) {
log.Error().
Caller().
Bool("readOnly", mapRequest.ReadOnly).
Bool("omitPeers", mapRequest.OmitPeers).
Bool("stream", mapRequest.Stream).
Str("node_key", node.NodeKey.ShortString()).
Str("node", node.Hostname).
Err(err).
Msg(msg)
}
}
// handlePoll ensures the node gets the appropriate updates from either
// polling or immediate responses.
//
//nolint:gocyclo
func (h *Headscale) handlePoll(
writer http.ResponseWriter,
ctx context.Context,
node *types.Node,
mapRequest tailcfg.MapRequest,
) {
logInfo, logErr := logPollFunc(mapRequest, node)
// This is the mechanism where the node gives us information about its
// current configuration.
//
// If OmitPeers is true, Stream is false, and ReadOnly is false,
// then te server will let clients update their endpoints without
// breaking existing long-polling (Stream == true) connections.
// In this case, the server can omit the entire response; the client
// only checks the HTTP response status code.
// TODO(kradalby): remove ReadOnly when we only support capVer 68+
if mapRequest.OmitPeers && !mapRequest.Stream && !mapRequest.ReadOnly {
log.Info().
Caller().
Bool("readOnly", mapRequest.ReadOnly).
Bool("omitPeers", mapRequest.OmitPeers).
Bool("stream", mapRequest.Stream).
Str("node_key", node.NodeKey.ShortString()).
Str("node", node.Hostname).
Int("cap_ver", int(mapRequest.Version)).
Msg("Received update")
change := node.PeerChangeFromMapRequest(mapRequest)
online := h.nodeNotifier.IsConnected(node.MachineKey)
change.Online = &online
node.ApplyPeerChange(&change)
hostInfoChange := node.Hostinfo.Equal(mapRequest.Hostinfo)
logTracePeerChange(node.Hostname, hostInfoChange, &change)
// Check if the Hostinfo of the node has changed.
// If it has changed, check if there has been a change tod
// the routable IPs of the host and update update them in
// the database. Then send a Changed update
// (containing the whole node object) to peers to inform about
// the route change.
// If the hostinfo has changed, but not the routes, just update
// hostinfo and let the function continue.
if !hostInfoChange {
oldRoutes := node.Hostinfo.RoutableIPs
newRoutes := mapRequest.Hostinfo.RoutableIPs
oldServicesCount := len(node.Hostinfo.Services)
newServicesCount := len(mapRequest.Hostinfo.Services)
node.Hostinfo = mapRequest.Hostinfo
sendUpdate := false
// Route changes come as part of Hostinfo, which means that
// when an update comes, the Node Route logic need to run.
// This will require a "change" in comparison to a "patch",
// which is more costly.
if !xslices.Equal(oldRoutes, newRoutes) {
var err error
sendUpdate, err = h.db.SaveNodeRoutes(node)
if err != nil {
logErr(err, "Error processing node routes")
http.Error(writer, "", http.StatusInternalServerError)
return
}
if h.ACLPolicy != nil {
// update routes with peer information
update, err := h.db.EnableAutoApprovedRoutes(h.ACLPolicy, node)
if err != nil {
logErr(err, "Error running auto approved routes")
}
if update != nil {
sendUpdate = true
}
}
}
// Services is mostly useful for discovery and not critical,
// except for peerapi, which is how nodes talk to eachother.
// If peerapi was not part of the initial mapresponse, we
// need to make sure its sent out later as it is needed for
// Taildrop.
// TODO(kradalby): Length comparison is a bit naive, replace.
if oldServicesCount != newServicesCount {
sendUpdate = true
}
if sendUpdate {
if err := h.db.DB.Save(node).Error; err != nil {
logErr(err, "Failed to persist/update node in the database")
http.Error(writer, "", http.StatusInternalServerError)
return
}
// Send an update to all peers to propagate the new routes
// available.
stateUpdate := types.StateUpdate{
Type: types.StatePeerChanged,
ChangeNodes: types.Nodes{node},
Message: "called from handlePoll -> update -> new hostinfo",
}
if stateUpdate.Valid() {
ctx := types.NotifyCtx(context.Background(), "poll-nodeupdate-peers-hostinfochange", node.Hostname)
h.nodeNotifier.NotifyWithIgnore(
ctx,
stateUpdate,
node.MachineKey.String())
}
// Send an update to the node itself with to ensure it
// has an updated packetfilter allowing the new route
// if it is defined in the ACL.
selfUpdate := types.StateUpdate{
Type: types.StateSelfUpdate,
ChangeNodes: types.Nodes{node},
}
if selfUpdate.Valid() {
ctx := types.NotifyCtx(context.Background(), "poll-nodeupdate-self-hostinfochange", node.Hostname)
h.nodeNotifier.NotifyByMachineKey(
ctx,
selfUpdate,
node.MachineKey)
}
return
}
}
if err := h.db.DB.Save(node).Error; err != nil {
logErr(err, "Failed to persist/update node in the database")
http.Error(writer, "", http.StatusInternalServerError)
return
}
// TODO(kradalby): Figure out why patch changes does
// not show up in output from `tailscale debug netmap`.
// stateUpdate := types.StateUpdate{
// Type: types.StatePeerChangedPatch,
// ChangePatches: []*tailcfg.PeerChange{&change},
// }
stateUpdate := types.StateUpdate{
Type: types.StatePeerChanged,
ChangeNodes: types.Nodes{node},
}
if stateUpdate.Valid() {
ctx := types.NotifyCtx(context.Background(), "poll-nodeupdate-peers-patch", node.Hostname)
h.nodeNotifier.NotifyWithIgnore(
ctx,
stateUpdate,
node.MachineKey.String())
}
writer.WriteHeader(http.StatusOK)
if f, ok := writer.(http.Flusher); ok {
f.Flush()
}
return
} else if mapRequest.OmitPeers && !mapRequest.Stream && mapRequest.ReadOnly {
// ReadOnly is whether the client just wants to fetch the
// MapResponse, without updating their Endpoints. The
// Endpoints field will be ignored and LastSeen will not be
// updated and peers will not be notified of changes.
//
// The intended use is for clients to discover the DERP map at
// start-up before their first real endpoint update.
} else if mapRequest.OmitPeers && !mapRequest.Stream && mapRequest.ReadOnly {
h.handleLiteRequest(writer, node, mapRequest)
return
} else if mapRequest.OmitPeers && mapRequest.Stream {
logErr(nil, "Ignoring request, don't know how to handle it")
return
}
change := node.PeerChangeFromMapRequest(mapRequest)
// A stream is being set up, the node is Online
online := true
change.Online = &online
node.ApplyPeerChange(&change)
// Only save HostInfo if changed, update routes if changed
// TODO(kradalby): Remove when capver is over 68
if !node.Hostinfo.Equal(mapRequest.Hostinfo) {
oldRoutes := node.Hostinfo.RoutableIPs
newRoutes := mapRequest.Hostinfo.RoutableIPs
node.Hostinfo = mapRequest.Hostinfo
if !xslices.Equal(oldRoutes, newRoutes) {
_, err := h.db.SaveNodeRoutes(node)
if err != nil {
logErr(err, "Error processing node routes")
http.Error(writer, "", http.StatusInternalServerError)
return
}
}
}
if err := h.db.DB.Save(node).Error; err != nil {
logErr(err, "Failed to persist/update node in the database")
http.Error(writer, "", http.StatusInternalServerError)
return
}
// Set up the client stream
h.pollNetMapStreamWG.Add(1)
defer h.pollNetMapStreamWG.Done()
// Use a buffered channel in case a node is not fully ready
// to receive a message to make sure we dont block the entire
// notifier.
// 12 is arbitrarily chosen.
chanSize := 3
if size, ok := envknob.LookupInt("HEADSCALE_TUNING_POLL_QUEUE_SIZE"); ok {
chanSize = size
}
updateChan := make(chan types.StateUpdate, chanSize)
defer closeChanWithLog(updateChan, node.Hostname, "updateChan")
// Register the node's update channel
h.nodeNotifier.AddNode(node.MachineKey, updateChan)
defer h.nodeNotifier.RemoveNode(node.MachineKey)
// When a node connects to control, list the peers it has at
// that given point, further updates are kept in memory in
// the Mapper, which lives for the duration of the polling
// session.
peers, err := h.db.ListPeers(node)
if err != nil {
logErr(err, "Failed to list peers when opening poller")
http.Error(writer, "", http.StatusInternalServerError)
return
}
isConnected := h.nodeNotifier.ConnectedMap()
for _, peer := range peers {
online := isConnected[peer.MachineKey]
peer.IsOnline = &online
}
mapp := mapper.NewMapper(
node,
peers,
h.DERPMap,
h.cfg.BaseDomain,
h.cfg.DNSConfig,
h.cfg.LogTail.Enabled,
h.cfg.RandomizeClientPort,
)
// update ACLRules with peer informations (to update server tags if necessary)
if h.ACLPolicy != nil {
// update routes with peer information
// This state update is ignored as it will be sent
// as part of the whole node
// TODO(kradalby): figure out if that is actually correct
_, err = h.db.EnableAutoApprovedRoutes(h.ACLPolicy, node)
if err != nil {
logErr(err, "Error running auto approved routes")
}
}
logInfo("Sending initial map")
mapResp, err := mapp.FullMapResponse(mapRequest, node, h.ACLPolicy)
if err != nil {
logErr(err, "Failed to create MapResponse")
http.Error(writer, "", http.StatusInternalServerError)
return
}
// Send the client an update to make sure we send an initial mapresponse
_, err = writer.Write(mapResp)
if err != nil {
logErr(err, "Could not write the map response")
return
}
if flusher, ok := writer.(http.Flusher); ok {
flusher.Flush()
} else {
return
}
stateUpdate := types.StateUpdate{
Type: types.StatePeerChanged,
ChangeNodes: types.Nodes{node},
Message: "called from handlePoll -> new node added",
}
if stateUpdate.Valid() {
ctx := types.NotifyCtx(context.Background(), "poll-newnode-peers", node.Hostname)
h.nodeNotifier.NotifyWithIgnore(
ctx,
stateUpdate,
node.MachineKey.String())
}
if len(node.Routes) > 0 {
go h.pollFailoverRoutes(logErr, "new node", node)
}
keepAliveTicker := time.NewTicker(keepAliveInterval)
ctx, cancel := context.WithCancel(context.WithValue(ctx, nodeNameContextKey, node.Hostname))
defer cancel()
for {
logInfo("Waiting for update on stream channel")
select {
case <-keepAliveTicker.C:
data, err := mapp.KeepAliveResponse(mapRequest, node)
if err != nil {
logErr(err, "Error generating the keep alive msg")
return
}
_, err = writer.Write(data)
if err != nil {
logErr(err, "Cannot write keep alive message")
return
}
if flusher, ok := writer.(http.Flusher); ok {
flusher.Flush()
} else {
log.Error().Msg("Failed to create http flusher")
return
}
// This goroutine is not ideal, but we have a potential issue here
// where it blocks too long and that holds up updates.
// One alternative is to split these different channels into
// goroutines, but then you might have a problem without a lock
// if a keepalive is written at the same time as an update.
go h.updateNodeOnlineStatus(true, node)
case update := <-updateChan:
logInfo("Received update")
now := time.Now()
var data []byte
var err error
// Ensure the node object is updated, for example, there
// might have been a hostinfo update in a sidechannel
// which contains data needed to generate a map response.
node, err = h.db.GetNodeByMachineKey(node.MachineKey)
if err != nil {
logErr(err, "Could not get machine from db")
return
}
startMapResp := time.Now()
switch update.Type {
case types.StateFullUpdate:
logInfo("Sending Full MapResponse")
data, err = mapp.FullMapResponse(mapRequest, node, h.ACLPolicy)
case types.StatePeerChanged:
logInfo(fmt.Sprintf("Sending Changed MapResponse: %s", update.Message))
isConnectedMap := h.nodeNotifier.ConnectedMap()
for _, node := range update.ChangeNodes {
// If a node is not reported to be online, it might be
// because the value is outdated, check with the notifier.
// However, if it is set to Online, and not in the notifier,
// this might be because it has announced itself, but not
// reached the stage to actually create the notifier channel.
if node.IsOnline != nil && !*node.IsOnline {
isOnline := isConnectedMap[node.MachineKey]
node.IsOnline = &isOnline
}
}
data, err = mapp.PeerChangedResponse(mapRequest, node, update.ChangeNodes, h.ACLPolicy, update.Message)
case types.StatePeerChangedPatch:
logInfo("Sending PeerChangedPatch MapResponse")
data, err = mapp.PeerChangedPatchResponse(mapRequest, node, update.ChangePatches, h.ACLPolicy)
case types.StatePeerRemoved:
logInfo("Sending PeerRemoved MapResponse")
data, err = mapp.PeerRemovedResponse(mapRequest, node, update.Removed)
case types.StateSelfUpdate:
if len(update.ChangeNodes) == 1 {
logInfo("Sending SelfUpdate MapResponse")
node = update.ChangeNodes[0]
data, err = mapp.LiteMapResponse(mapRequest, node, h.ACLPolicy, types.SelfUpdateIdentifier)
} else {
logInfo("SelfUpdate contained too many nodes, this is likely a bug in the code, please report.")
}
case types.StateDERPUpdated:
logInfo("Sending DERPUpdate MapResponse")
data, err = mapp.DERPMapResponse(mapRequest, node, update.DERPMap)
}
if err != nil {
logErr(err, "Could not get the create map update")
return
}
log.Trace().Str("node", node.Hostname).TimeDiff("timeSpent", time.Now(), startMapResp).Str("mkey", node.MachineKey.String()).Int("type", int(update.Type)).Msg("finished making map response")
// Only send update if there is change
if data != nil {
startWrite := time.Now()
_, err = writer.Write(data)
if err != nil {
logErr(err, "Could not write the map response")
updateRequestsSentToNode.WithLabelValues(node.User.Name, node.Hostname, "failed").
Inc()
return
}
if flusher, ok := writer.(http.Flusher); ok {
flusher.Flush()
} else {
log.Error().Msg("Failed to create http flusher")
return
}
log.Trace().Str("node", node.Hostname).TimeDiff("timeSpent", time.Now(), startWrite).Str("mkey", node.MachineKey.String()).Int("type", int(update.Type)).Msg("finished writing mapresp to node")
log.Info().
Caller().
Bool("readOnly", mapRequest.ReadOnly).
Bool("omitPeers", mapRequest.OmitPeers).
Bool("stream", mapRequest.Stream).
Str("node_key", node.NodeKey.ShortString()).
Str("machine_key", node.MachineKey.ShortString()).
Str("node", node.Hostname).
TimeDiff("timeSpent", time.Now(), now).
Msg("update sent")
}
case <-ctx.Done():
logInfo("The client has closed the connection")
go h.updateNodeOnlineStatus(false, node)
// Failover the node's routes if any.
go h.pollFailoverRoutes(logErr, "node closing connection", node)
// The connection has been closed, so we can stop polling.
return
case <-h.shutdownChan:
logInfo("The long-poll handler is shutting down")
return
}
}
}
func (h *Headscale) pollFailoverRoutes(logErr func(error, string), where string, node *types.Node) {
update, err := db.Write(h.db.DB, func(tx *gorm.DB) (*types.StateUpdate, error) {
return db.EnsureFailoverRouteIsAvailable(tx, h.nodeNotifier.ConnectedMap(), node)
})
if err != nil {
logErr(err, fmt.Sprintf("failed to ensure failover routes, %s", where))
return
}
if update != nil && !update.Empty() && update.Valid() {
ctx := types.NotifyCtx(context.Background(), fmt.Sprintf("poll-%s-routes-ensurefailover", strings.ReplaceAll(where, " ", "-")), node.Hostname)
h.nodeNotifier.NotifyWithIgnore(ctx, *update, node.MachineKey.String())
}
}
// updateNodeOnlineStatus records the last seen status of a node and notifies peers
// about change in their online/offline status.
// It takes a StateUpdateType of either StatePeerOnlineChanged or StatePeerOfflineChanged.
func (h *Headscale) updateNodeOnlineStatus(online bool, node *types.Node) {
now := time.Now()
node.LastSeen = &now
statusUpdate := types.StateUpdate{
Type: types.StatePeerChangedPatch,
ChangePatches: []*tailcfg.PeerChange{
{
NodeID: tailcfg.NodeID(node.ID),
Online: &online,
LastSeen: &now,
},
},
}
if statusUpdate.Valid() {
ctx := types.NotifyCtx(context.Background(), "poll-nodeupdate-onlinestatus", node.Hostname)
h.nodeNotifier.NotifyWithIgnore(ctx, statusUpdate, node.MachineKey.String())
}
err := h.db.DB.Transaction(func(tx *gorm.DB) error {
return db.UpdateLastSeen(tx, node.ID, *node.LastSeen)
})
if err != nil {
log.Error().Err(err).Msg("Cannot update node LastSeen")
return
}
}
func closeChanWithLog[C chan []byte | chan struct{} | chan types.StateUpdate](channel C, node, name string) {
log.Trace().
Str("handler", "PollNetMap").
Str("node", node).
Str("channel", "Done").
Msg(fmt.Sprintf("Closing %s channel", name))
close(channel)
}
func (h *Headscale) handleLiteRequest(
writer http.ResponseWriter,
node *types.Node,
mapRequest tailcfg.MapRequest,
) {
logInfo, logErr := logPollFunc(mapRequest, node)
mapp := mapper.NewMapper(
node,
types.Nodes{},
h.DERPMap,
h.cfg.BaseDomain,
h.cfg.DNSConfig,
h.cfg.LogTail.Enabled,
h.cfg.RandomizeClientPort,
)
logInfo("Client asked for a lite update, responding without peers")
mapResp, err := mapp.LiteMapResponse(mapRequest, node, h.ACLPolicy)
if err != nil {
logErr(err, "Failed to create MapResponse")
http.Error(writer, "", http.StatusInternalServerError)
return
}
writer.Header().Set("Content-Type", "application/json; charset=utf-8")
writer.WriteHeader(http.StatusOK)
_, err = writer.Write(mapResp)
if err != nil {
logErr(err, "Failed to write response")
}
}
func logTracePeerChange(hostname string, hostinfoChange bool, change *tailcfg.PeerChange) {
trace := log.Trace().Str("node_id", change.NodeID.String()).Str("hostname", hostname)
if change.Key != nil {
trace = trace.Str("node_key", change.Key.ShortString())
}
if change.DiscoKey != nil {
trace = trace.Str("disco_key", change.DiscoKey.ShortString())
}
if change.Online != nil {
trace = trace.Bool("online", *change.Online)
}
if change.Endpoints != nil {
eps := make([]string, len(change.Endpoints))
for idx, ep := range change.Endpoints {
eps[idx] = ep.String()
}
trace = trace.Strs("endpoints", eps)
}
if hostinfoChange {
trace = trace.Bool("hostinfo_changed", hostinfoChange)
}
if change.DERPRegion != 0 {
trace = trace.Int("derp_region", change.DERPRegion)
}
trace.Time("last_seen", *change.LastSeen).Msg("PeerChange received")
}