From c957f893bd576886872c52941dec7a77e34f1441 Mon Sep 17 00:00:00 2001 From: Kristoffer Dalby Date: Mon, 11 Sep 2023 06:18:31 -0500 Subject: [PATCH] Return simple responses immediatly This commit rearranges the poll handler to immediatly accept updates and notify its peers and return, not travel down the function for a bit. This reduces the DB calls and other holdups that isnt necessary to send a "lite response", a map response without peers, or accepting an endpoint update. Signed-off-by: Kristoffer Dalby --- hscontrol/app.go | 2 - hscontrol/db/machine.go | 14 +- hscontrol/poll.go | 342 +++++++++++++++++++++++++--------------- 3 files changed, 221 insertions(+), 137 deletions(-) diff --git a/hscontrol/app.go b/hscontrol/app.go index b279b3c9..630339c1 100644 --- a/hscontrol/app.go +++ b/hscontrol/app.go @@ -94,8 +94,6 @@ type Headscale struct { shutdownChan chan struct{} pollNetMapStreamWG sync.WaitGroup - - pollStreamOpenMu sync.Mutex } func NewHeadscale(cfg *types.Config) (*Headscale, error) { diff --git a/hscontrol/db/machine.go b/hscontrol/db/machine.go index 3cf0465f..c079f677 100644 --- a/hscontrol/db/machine.go +++ b/hscontrol/db/machine.go @@ -362,13 +362,15 @@ func (hsdb *HSDatabase) deleteMachine(machine *types.Machine) error { return nil } -func (hsdb *HSDatabase) TouchMachine(machine *types.Machine) error { - hsdb.mu.Lock() - defer hsdb.mu.Unlock() - +// UpdateLastSeen sets a machine's last seen field indicating that we +// have recently communicating with this machine. +// This is mostly used to indicate if a machine is online and is not +// extremely important to make sure is fully correct and to avoid +// holding up the hot path, does not contain any locks and isnt +// concurrency safe. But that should be ok. +func (hsdb *HSDatabase) UpdateLastSeen(machine *types.Machine) error { return hsdb.db.Model(machine).Updates(types.Machine{ - LastSeen: machine.LastSeen, - LastSuccessfulUpdate: machine.LastSuccessfulUpdate, + LastSeen: machine.LastSeen, }).Error } diff --git a/hscontrol/poll.go b/hscontrol/poll.go index dc9763b1..4df3865e 100644 --- a/hscontrol/poll.go +++ b/hscontrol/poll.go @@ -64,24 +64,80 @@ func (h *Headscale) handlePoll( mapRequest tailcfg.MapRequest, isNoise bool, ) { - // Immediate open the channel and register it if the client wants - // a stream of MapResponses to prevent initial map response and - // following updates missing - var updateChan chan types.StateUpdate - if mapRequest.Stream { - h.pollStreamOpenMu.Lock() - h.pollNetMapStreamWG.Add(1) - defer h.pollNetMapStreamWG.Done() + logInfo, logErr := logPollFunc(mapRequest, machine, isNoise) - updateChan = make(chan types.StateUpdate) - defer closeChanWithLog(updateChan, machine.Hostname, "updateChan") + // If OmitPeers is true, Stream is false, and ReadOnly is false, + // then te server will let clients update their endpoints without + // breaking existing long-polling (Stream == true) connections. + // In this case, the server can omit the entire response; the client + // only checks the HTTP response status code. + if mapRequest.OmitPeers && !mapRequest.Stream && !mapRequest.ReadOnly { + log.Info(). + Caller(). + Bool("noise", isNoise). + Bool("readOnly", mapRequest.ReadOnly). + Bool("omitPeers", mapRequest.OmitPeers). + Bool("stream", mapRequest.Stream). + Str("node_key", machine.NodeKey). + Str("machine", machine.Hostname). + Strs("endpoints", machine.Endpoints). + Msg("Received endpoint update") - // Register the node's update channel - h.nodeNotifier.AddNode(machine.MachineKey, updateChan) - defer h.nodeNotifier.RemoveNode(machine.MachineKey) + now := time.Now().UTC() + machine.Endpoints = mapRequest.Endpoints + machine.LastSeen = &now + + if err := h.db.MachineSave(machine); err != nil { + logErr(err, "Failed to persist/update machine in the database") + http.Error(writer, "", http.StatusInternalServerError) + + return + } + + h.nodeNotifier.NotifyWithIgnore( + types.StateUpdate{ + Type: types.StatePeerChanged, + Changed: types.Machines{machine}, + }, + machine.MachineKey) + + writer.WriteHeader(http.StatusOK) + if f, ok := writer.(http.Flusher); ok { + f.Flush() + } + + return + + // ReadOnly is whether the client just wants to fetch the + // MapResponse, without updating their Endpoints. The + // Endpoints field will be ignored and LastSeen will not be + // updated and peers will not be notified of changes. + // + // The intended use is for clients to discover the DERP map at + // start-up before their first real endpoint update. + } else if mapRequest.OmitPeers && !mapRequest.Stream && mapRequest.ReadOnly { + h.handleLiteRequest(writer, machine, mapRequest, isNoise) + + return + } else if mapRequest.OmitPeers && mapRequest.Stream { + logErr(nil, "Ignoring request, don't know how to handle it") + + return } - logInfo, logErr := logPollFunc(mapRequest, machine, isNoise) + // Handle requests not related to continouos updates immediately. + // TODO(kradalby): I am not sure if this has any function based on + // incoming requests from clients. + if mapRequest.ReadOnly && !mapRequest.Stream { + h.handleReadOnly(writer, machine, mapRequest, isNoise) + + return + } + + machine.Hostname = mapRequest.Hostinfo.Hostname + machine.HostInfo = types.HostInfo(*mapRequest.Hostinfo) + machine.DiscoKey = util.DiscoPublicKeyStripPrefix(mapRequest.DiscoKey) + machine.Endpoints = mapRequest.Endpoints // When a node connects to control, list the peers it has at // that given point, further updates are kept in memory in @@ -107,11 +163,6 @@ func (h *Headscale) handlePoll( h.cfg.RandomizeClientPort, ) - machine.Hostname = mapRequest.Hostinfo.Hostname - machine.HostInfo = types.HostInfo(*mapRequest.Hostinfo) - machine.DiscoKey = util.DiscoPublicKeyStripPrefix(mapRequest.DiscoKey) - now := time.Now().UTC() - err = h.db.SaveMachineRoutes(machine) if err != nil { logErr(err, "Error processing machine routes") @@ -126,19 +177,6 @@ func (h *Headscale) handlePoll( } } - // From Tailscale client: - // - // ReadOnly is whether the client just wants to fetch the MapResponse, - // without updating their Endpoints. The Endpoints field will be ignored and - // LastSeen will not be updated and peers will not be notified of changes. - // - // The intended use is for clients to discover the DERP map at start-up - // before their first real endpoint update. - if !mapRequest.ReadOnly { - machine.Endpoints = mapRequest.Endpoints - machine.LastSeen = &now - } - // TODO(kradalby): Save specific stuff, not whole object. if err := h.db.MachineSave(machine); err != nil { logErr(err, "Failed to persist/update machine in the database") @@ -147,82 +185,6 @@ func (h *Headscale) handlePoll( return } - if !mapRequest.ReadOnly { - // It sounds like we should update the nodes when we have received a endpoint update - // even tho the comments in the tailscale code dont explicitly say so. - updateRequestsFromNode.WithLabelValues(machine.User.Name, machine.Hostname, "endpoint-update"). - Inc() - - // Tell all the other nodes about the new endpoint, but dont update ourselves. - h.nodeNotifier.NotifyWithIgnore( - types.StateUpdate{ - Type: types.StatePeerChanged, - Changed: types.Machines{machine}, - }, - machine.MachineKey) - } - - // We update our peers if the client is not sending ReadOnly in the MapRequest - // so we don't distribute its initial request (it comes with - // empty endpoints to peers) - - // Details on the protocol can be found in https://github.com/tailscale/tailscale/blob/main/tailcfg/tailcfg.go#L696 - logInfo("Client map request processed") - - if mapRequest.ReadOnly { - logInfo("Client is starting up. Probably interested in a DERP map") - - mapResp, err := mapp.FullMapResponse(mapRequest, machine, h.ACLPolicy) - if err != nil { - logErr(err, "Failed to create MapResponse") - http.Error(writer, "", http.StatusInternalServerError) - - return - } - - writer.Header().Set("Content-Type", "application/json; charset=utf-8") - writer.WriteHeader(http.StatusOK) - _, err = writer.Write(mapResp) - if err != nil { - logErr(err, "Failed to write response") - } - - if f, ok := writer.(http.Flusher); ok { - f.Flush() - } - - return - } - - if mapRequest.OmitPeers && !mapRequest.Stream { - logInfo("Client sent endpoint update and is ok with a response without peer list") - - mapResp, err := mapp.LiteMapResponse(mapRequest, machine, h.ACLPolicy) - if err != nil { - logErr(err, "Failed to create MapResponse") - http.Error(writer, "", http.StatusInternalServerError) - - return - } - - writer.Header().Set("Content-Type", "application/json; charset=utf-8") - writer.WriteHeader(http.StatusOK) - _, err = writer.Write(mapResp) - if err != nil { - logErr(err, "Failed to write response") - } - - return - } else if mapRequest.OmitPeers && mapRequest.Stream { - log.Warn(). - Str("handler", "PollNetMap"). - Bool("noise", isNoise). - Str("machine", machine.Hostname). - Msg("Ignoring request, don't know how to handle it") - - return - } - logInfo("Sending initial map") mapResp, err := mapp.FullMapResponse(mapRequest, machine, h.ACLPolicy) @@ -247,6 +209,24 @@ func (h *Headscale) handlePoll( return } + h.nodeNotifier.NotifyWithIgnore( + types.StateUpdate{ + Type: types.StatePeerChanged, + Changed: types.Machines{machine}, + }, + machine.MachineKey) + + // Set up the client stream + h.pollNetMapStreamWG.Add(1) + defer h.pollNetMapStreamWG.Done() + + updateChan := make(chan types.StateUpdate) + defer closeChanWithLog(updateChan, machine.Hostname, "updateChan") + + // Register the node's update channel + h.nodeNotifier.AddNode(machine.MachineKey, updateChan) + defer h.nodeNotifier.RemoveNode(machine.MachineKey) + keepAliveTicker := time.NewTicker(keepAliveInterval) ctx = context.WithValue(ctx, machineNameContextKey, machine.Hostname) @@ -254,8 +234,6 @@ func (h *Headscale) handlePoll( ctx, cancel := context.WithCancel(ctx) defer cancel() - h.pollStreamOpenMu.Unlock() - for { logInfo("Waiting for update on stream channel") select { @@ -280,15 +258,23 @@ func (h *Headscale) handlePoll( return } - err = h.db.TouchMachine(machine) - if err != nil { - logErr(err, "Cannot update machine LastSeen") + // This goroutine is not ideal, but we have a potential issue here + // where it blocks too long and that holds up updates. + // One alternative is to split these different channels into + // goroutines, but then you might have a problem without a lock + // if a keepalive is written at the same time as an update. + go func() { + err = h.db.UpdateLastSeen(machine) + if err != nil { + logErr(err, "Cannot update machine LastSeen") - return - } + return + } + }() case update := <-updateChan: logInfo("Received update") + now := time.Now() var data []byte var err error @@ -332,25 +318,37 @@ func (h *Headscale) handlePoll( return } - // Keep track of the last successful update, - // we sometimes end in a state were the update - // is not picked up by a client and we use this - // to determine if we should "force" an update. - err = h.db.TouchMachine(machine) - if err != nil { - logErr(err, "Cannot update machine LastSuccessfulUpdate") + // See comment in keepAliveTicker + go func() { + err = h.db.UpdateLastSeen(machine) + if err != nil { + logErr(err, "Cannot update machine LastSeen") - return - } + return + } + }() - logInfo("Update sent") + log.Info(). + Caller(). + Bool("noise", isNoise). + Bool("readOnly", mapRequest.ReadOnly). + Bool("omitPeers", mapRequest.OmitPeers). + Bool("stream", mapRequest.Stream). + Str("node_key", machine.NodeKey). + Str("machine", machine.Hostname). + TimeDiff("timeSpent", time.Now(), now). + Msg("update sent") case <-ctx.Done(): logInfo("The client has closed the connection") - err := h.db.TouchMachine(machine) - if err != nil { - logErr(err, "Cannot update machine LastSeen") - } + go func() { + err = h.db.UpdateLastSeen(machine) + if err != nil { + logErr(err, "Cannot update machine LastSeen") + + return + } + }() // The connection has been closed, so we can stop polling. return @@ -372,3 +370,89 @@ func closeChanWithLog[C chan []byte | chan struct{} | chan types.StateUpdate](ch close(channel) } + +// TODO(kradalby): This might not actually be used, +// observing incoming client requests indicates it +// is not. +func (h *Headscale) handleReadOnly( + writer http.ResponseWriter, + machine *types.Machine, + mapRequest tailcfg.MapRequest, + isNoise bool, +) { + logInfo, logErr := logPollFunc(mapRequest, machine, isNoise) + + mapp := mapper.NewMapper( + machine, + // TODO(kradalby): It might not be acceptable to send + // an empty peer list here. + types.Machines{}, + h.privateKey2019, + isNoise, + h.DERPMap, + h.cfg.BaseDomain, + h.cfg.DNSConfig, + h.cfg.LogTail.Enabled, + h.cfg.RandomizeClientPort, + ) + logInfo("Client is starting up. Probably interested in a DERP map") + + mapResp, err := mapp.FullMapResponse(mapRequest, machine, h.ACLPolicy) + if err != nil { + logErr(err, "Failed to create MapResponse") + http.Error(writer, "", http.StatusInternalServerError) + + return + } + + writer.Header().Set("Content-Type", "application/json; charset=utf-8") + writer.WriteHeader(http.StatusOK) + _, err = writer.Write(mapResp) + if err != nil { + logErr(err, "Failed to write response") + } + + if f, ok := writer.(http.Flusher); ok { + f.Flush() + } +} + +func (h *Headscale) handleLiteRequest( + writer http.ResponseWriter, + machine *types.Machine, + mapRequest tailcfg.MapRequest, + isNoise bool, +) { + logInfo, logErr := logPollFunc(mapRequest, machine, isNoise) + + mapp := mapper.NewMapper( + machine, + // TODO(kradalby): It might not be acceptable to send + // an empty peer list here. + types.Machines{}, + h.privateKey2019, + isNoise, + h.DERPMap, + h.cfg.BaseDomain, + h.cfg.DNSConfig, + h.cfg.LogTail.Enabled, + h.cfg.RandomizeClientPort, + ) + + logInfo("Client asked for a lite update, responding without peers") + + mapResp, err := mapp.LiteMapResponse(mapRequest, machine, h.ACLPolicy) + if err != nil { + logErr(err, "Failed to create MapResponse") + http.Error(writer, "", http.StatusInternalServerError) + + return + } + + writer.Header().Set("Content-Type", "application/json; charset=utf-8") + writer.WriteHeader(http.StatusOK) + _, err = writer.Write(mapResp) + if err != nil { + logErr(err, "Failed to write response") + } +}