1
0
mirror of https://github.com/juanfont/headscale.git synced 2024-12-20 19:09:07 +01:00

rearrange poll, lock, notify

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
This commit is contained in:
Kristoffer Dalby 2023-07-26 17:54:19 +02:00 committed by Kristoffer Dalby
parent 593b3ad981
commit a8079a2096
2 changed files with 35 additions and 50 deletions

View File

@ -9,7 +9,7 @@ import (
) )
type Notifier struct { type Notifier struct {
l sync.RWMutex l sync.Mutex
nodes map[string]chan<- types.StateUpdate nodes map[string]chan<- types.StateUpdate
} }
@ -54,8 +54,8 @@ func (n *Notifier) NotifyAll(update types.StateUpdate) {
} }
func (n *Notifier) NotifyWithIgnore(update types.StateUpdate, ignore ...string) { func (n *Notifier) NotifyWithIgnore(update types.StateUpdate, ignore ...string) {
n.l.RLock() n.l.Lock()
defer n.l.RUnlock() defer n.l.Unlock()
for key, c := range n.nodes { for key, c := range n.nodes {
if util.IsStringInSlice(ignore, key) { if util.IsStringInSlice(ignore, key) {

View File

@ -62,6 +62,22 @@ func (h *Headscale) handlePoll(
mapRequest tailcfg.MapRequest, mapRequest tailcfg.MapRequest,
isNoise bool, isNoise bool,
) { ) {
// Immediate open the channel and register it if the client wants
// a stream of MapResponses to prevent initial map response and
// following updates missing
var updateChan chan types.StateUpdate
if mapRequest.Stream {
h.pollNetMapStreamWG.Add(1)
defer h.pollNetMapStreamWG.Done()
updateChan = make(chan types.StateUpdate)
defer closeChanWithLog(updateChan, machine.Hostname, "updateChan")
// Register the node's update channel
h.nodeNotifier.AddNode(machine.MachineKey, updateChan)
defer h.nodeNotifier.RemoveNode(machine.MachineKey)
}
logInfo, logErr := logPollFunc(mapRequest, machine, isNoise) logInfo, logErr := logPollFunc(mapRequest, machine, isNoise)
mapp := mapper.NewMapper( mapp := mapper.NewMapper(
@ -116,6 +132,21 @@ func (h *Headscale) handlePoll(
return return
} }
if !mapRequest.ReadOnly {
// It sounds like we should update the nodes when we have received a endpoint update
// even tho the comments in the tailscale code dont explicitly say so.
updateRequestsFromNode.WithLabelValues(machine.User.Name, machine.Hostname, "endpoint-update").
Inc()
// Tell all the other nodes about the new endpoint, but dont update ourselves.
h.nodeNotifier.NotifyWithIgnore(
types.StateUpdate{
Type: types.StatePeerChanged,
Changed: []uint64{machine.ID},
},
machine.MachineKey)
}
// We update our peers if the client is not sending ReadOnly in the MapRequest // We update our peers if the client is not sending ReadOnly in the MapRequest
// so we don't distribute its initial request (it comes with // so we don't distribute its initial request (it comes with
// empty endpoints to peers) // empty endpoints to peers)
@ -165,18 +196,6 @@ func (h *Headscale) handlePoll(
if err != nil { if err != nil {
logErr(err, "Failed to write response") logErr(err, "Failed to write response")
} }
// It sounds like we should update the nodes when we have received a endpoint update
// even tho the comments in the tailscale code dont explicitly say so.
updateRequestsFromNode.WithLabelValues(machine.User.Name, machine.Hostname, "endpoint-update").
Inc()
// Tell all the other nodes about the new endpoint, but dont update ourselves.
h.nodeNotifier.NotifyWithIgnore(
types.StateUpdate{
Type: types.StatePeerChanged,
Changed: []uint64{machine.ID},
},
machine.MachineKey)
return return
} else if mapRequest.OmitPeers && mapRequest.Stream { } else if mapRequest.OmitPeers && mapRequest.Stream {
@ -213,43 +232,9 @@ func (h *Headscale) handlePoll(
return return
} }
h.pollNetMapStream(
writer,
ctx,
machine,
mapp,
mapRequest,
isNoise,
)
logInfo("Finished stream, closing PollNetMap session")
}
// pollNetMapStream stream logic for /machine/map,
// ensuring we communicate updates and data to the connected clients.
func (h *Headscale) pollNetMapStream(
writer http.ResponseWriter,
ctxReq context.Context,
machine *types.Machine,
mapp *mapper.Mapper,
mapRequest tailcfg.MapRequest,
isNoise bool,
) {
logInfo, logErr := logPollFunc(mapRequest, machine, isNoise)
keepAliveTicker := time.NewTicker(keepAliveInterval) keepAliveTicker := time.NewTicker(keepAliveInterval)
h.pollNetMapStreamWG.Add(1) ctx = context.WithValue(ctx, machineNameContextKey, machine.Hostname)
defer h.pollNetMapStreamWG.Done()
updateChan := make(chan types.StateUpdate)
defer closeChanWithLog(updateChan, machine.Hostname, "updateChan")
// Register the node's update channel
h.nodeNotifier.AddNode(machine.MachineKey, updateChan)
defer h.nodeNotifier.RemoveNode(machine.MachineKey)
ctx := context.WithValue(ctxReq, machineNameContextKey, machine.Hostname)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()