
hscontrol/mapper: use sub-loggers and zf constants

Add sub-logger patterns to worker(), AddNode(), RemoveNode() and
multiChannelNodeConn to eliminate repeated field calls. Use zf.*
constants for consistent field naming.

Changes in batcher_lockfree.go:
- Add wlog sub-logger in worker() with worker.id context
- Add log field to multiChannelNodeConn struct
- Initialize mc.log with node.id in newMultiChannelNodeConn()
- Add nlog sub-loggers in AddNode() and RemoveNode()
- Update all connection methods to use mc.log

Changes in batcher.go:
- Use zf.NodeID and zf.Reason in handleNodeChange()
Author: Kristoffer Dalby, 2026-02-05 11:04:54 +00:00
parent 7148a690d0
commit 53cdeff129
2 changed files with 57 additions and 49 deletions
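For context, the core pattern this commit applies: instead of attaching node.id to every log event, bind it once with zerolog's With() and reuse the resulting sub-logger. A minimal, self-contained sketch (the zerolog calls are the library's real API; the field name and id value are illustrative):

	package main

	import (
		"github.com/rs/zerolog"
		"github.com/rs/zerolog/log"
	)

	func main() {
		log.Logger = log.Output(zerolog.NewConsoleWriter()).Level(zerolog.DebugLevel)

		id := uint64(42)

		// Before: each call site repeats the field.
		log.Debug().Uint64("node.id", id).Msg("node change processing started")

		// After: bind the field once; every event on nlog inherits it.
		nlog := log.With().Uint64("node.id", id).Logger()
		nlog.Debug().Msg("node change processing started")
		nlog.Error().Msg("initial map generation failed")
	}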

hscontrol/mapper/batcher.go

@@ -8,6 +8,7 @@ import (
	"github.com/juanfont/headscale/hscontrol/state"
	"github.com/juanfont/headscale/hscontrol/types"
	"github.com/juanfont/headscale/hscontrol/types/change"
+	"github.com/juanfont/headscale/hscontrol/util/zlog/zf"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/puzpuzpuz/xsync/v4"
@@ -140,7 +141,7 @@ func handleNodeChange(nc nodeConnection, mapper *mapper, r change.Change) error
	nodeID := nc.nodeID()
-	log.Debug().Caller().Uint64("node.id", nodeID.Uint64()).Str("reason", r.Reason).Msg("Node change processing started because change notification received")
+	log.Debug().Caller().Uint64(zf.NodeID, nodeID.Uint64()).Str(zf.Reason, r.Reason).Msg("node change processing started")
	data, err := generateMapResponse(nc, mapper, r)
	if err != nil {
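
The zf package itself is not shown in this commit. Judging from the string literals each constant replaces, it is presumably a shared set of field-name constants, roughly like the sketch below (a reconstruction, not the actual file; values inferred from the replaced literals, e.g. "node.id" -> zf.NodeID):

	// Package zf centralizes zerolog field names so call sites stay consistent.
	// Hypothetical sketch: only the constant names appear in this diff; the
	// values are inferred from the string literals the commit replaces.
	package zf

	const (
		NodeID          = "node.id"
		Reason          = "reason"
		WorkerID        = "worker.id"
		TotalDuration   = "total.duration"
		Chan            = "chan"
		ConnID          = "conn.id"
		ConnectionIndex = "connection_index"
		CleanedNodes    = "cleaned_nodes"
	)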

hscontrol/mapper/batcher_lockfree.go

@@ -10,7 +10,9 @@ import (
	"github.com/juanfont/headscale/hscontrol/types"
	"github.com/juanfont/headscale/hscontrol/types/change"
+	"github.com/juanfont/headscale/hscontrol/util/zlog/zf"
	"github.com/puzpuzpuz/xsync/v4"
+	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
	"tailscale.com/tailcfg"
	"tailscale.com/types/ptr"
@@ -48,6 +50,7 @@ type LockFreeBatcher struct {
// and notifies other nodes that this node has come online.
func (b *LockFreeBatcher) AddNode(id types.NodeID, c chan<- *tailcfg.MapResponse, version tailcfg.CapabilityVersion) error {
	addNodeStart := time.Now()
+	nlog := log.With().Uint64(zf.NodeID, id.Uint64()).Logger()
	// Generate connection ID
	connID := generateConnectionID()
@@ -76,7 +79,7 @@ func (b *LockFreeBatcher) AddNode(id types.NodeID, c chan<- *tailcfg.MapResponse
	// Use the worker pool for controlled concurrency instead of direct generation
	initialMap, err := b.MapResponseFromChange(id, change.FullSelf(id))
	if err != nil {
-		log.Error().Uint64("node.id", id.Uint64()).Err(err).Msg("Initial map generation failed")
+		nlog.Error().Err(err).Msg("initial map generation failed")
		nodeConn.removeConnectionByChannel(c)
		return fmt.Errorf("failed to generate initial map for node %d: %w", id, err)
	}
@@ -86,10 +89,10 @@ func (b *LockFreeBatcher) AddNode(id types.NodeID, c chan<- *tailcfg.MapResponse
	select {
	case c <- initialMap:
		// Success
-	case <-time.After(5 * time.Second):
-		log.Error().Uint64("node.id", id.Uint64()).Err(fmt.Errorf("timeout")).Msg("Initial map send timeout")
-		log.Debug().Caller().Uint64("node.id", id.Uint64()).Dur("timeout.duration", 5*time.Second).
-			Msg("Initial map send timed out because channel was blocked or receiver not ready")
+	case <-time.After(5 * time.Second): //nolint:mnd
+		nlog.Error().Err(errors.New("timeout")).Msg("initial map send timeout") //nolint:err113
+		nlog.Debug().Caller().Dur("timeout.duration", 5*time.Second). //nolint:mnd
+			Msg("initial map send timed out because channel was blocked or receiver not ready")
		nodeConn.removeConnectionByChannel(c)
		return fmt.Errorf("failed to send initial map to node %d: timeout", id)
	}
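
The select/time.After guard above bounds how long AddNode blocks on a receiver that never drains its channel. The same pattern in isolation (the helper name and durations are illustrative, not from the source):

	package main

	import (
		"errors"
		"fmt"
		"time"
	)

	// sendWithTimeout pushes v into c, giving up after d instead of
	// blocking forever on a stuck or absent receiver.
	func sendWithTimeout[T any](c chan<- T, v T, d time.Duration) error {
		select {
		case c <- v:
			return nil
		case <-time.After(d):
			return errors.New("send timeout: channel blocked or receiver not ready")
		}
	}

	func main() {
		c := make(chan string) // unbuffered and never read: forces the timeout path
		if err := sendWithTimeout(c, "initial map", 100*time.Millisecond); err != nil {
			fmt.Println(err)
		}
	}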
@@ -100,9 +103,9 @@ func (b *LockFreeBatcher) AddNode(id types.NodeID, c chan<- *tailcfg.MapResponse
	// Node will automatically receive updates through the normal flow
	// The initial full map already contains all current state
-	log.Debug().Caller().Uint64("node.id", id.Uint64()).Dur("total.duration", time.Since(addNodeStart)).
+	nlog.Debug().Caller().Dur(zf.TotalDuration, time.Since(addNodeStart)).
		Int("active.connections", nodeConn.getActiveConnectionCount()).
-		Msg("Node connection established in batcher because AddNode completed successfully")
+		Msg("node connection established in batcher")
	return nil
}
@@ -112,30 +115,32 @@ func (b *LockFreeBatcher) AddNode(id types.NodeID, c chan<- *tailcfg.MapResponse
// and keeps the node entry alive for rapid reconnections instead of aggressive deletion.
// Reports if the node still has active connections after removal.
func (b *LockFreeBatcher) RemoveNode(id types.NodeID, c chan<- *tailcfg.MapResponse) bool {
+	nlog := log.With().Uint64(zf.NodeID, id.Uint64()).Logger()
	nodeConn, exists := b.nodes.Load(id)
	if !exists {
-		log.Debug().Caller().Uint64("node.id", id.Uint64()).Msg("RemoveNode called for non-existent node because node not found in batcher")
+		nlog.Debug().Caller().Msg("RemoveNode called for non-existent node")
		return false
	}
	// Remove specific connection
	removed := nodeConn.removeConnectionByChannel(c)
	if !removed {
-		log.Debug().Caller().Uint64("node.id", id.Uint64()).Msg("RemoveNode: channel not found because connection already removed or invalid")
+		nlog.Debug().Caller().Msg("RemoveNode: channel not found, connection already removed or invalid")
		return false
	}
	// Check if node has any remaining active connections
	if nodeConn.hasActiveConnections() {
-		log.Debug().Caller().Uint64("node.id", id.Uint64()).
+		nlog.Debug().Caller().
			Int("active.connections", nodeConn.getActiveConnectionCount()).
-			Msg("Node connection removed but keeping online because other connections remain")
+			Msg("node connection removed but keeping online, other connections remain")
		return true // Node still has active connections
	}
	// No active connections - keep the node entry alive for rapid reconnections
	// The node will get a fresh full map when it reconnects
-	log.Debug().Caller().Uint64("node.id", id.Uint64()).Msg("Node disconnected from batcher because all connections removed, keeping entry for rapid reconnection")
+	nlog.Debug().Caller().Msg("node disconnected from batcher, keeping entry for rapid reconnection")
	b.connected.Store(id, ptr.To(time.Now()))
	return false
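
Note that the final disconnect above only timestamps the node in b.connected rather than deleting it, so a rapid reconnect finds the entry still warm; the cleanupOfflineNodes pass further down reaps entries whose timestamp has aged past a threshold. A toy sketch of that two-phase lifecycle (all names and the threshold here are assumptions, not the batcher's actual fields):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// node id -> time of last full disconnect
		disconnectedAt := map[uint64]time.Time{}

		// Phase 1: last connection drops; keep the entry, just timestamp it.
		disconnectedAt[7] = time.Now().Add(-20 * time.Minute)

		// Phase 2: a periodic cleanup pass reaps only long-offline nodes.
		threshold := 15 * time.Minute
		for id, since := range disconnectedAt {
			if time.Since(since) > threshold {
				delete(disconnectedAt, id)
				fmt.Println("cleaned up node", id)
			}
		}
	}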
@@ -196,11 +201,13 @@ func (b *LockFreeBatcher) doWork() {
}
func (b *LockFreeBatcher) worker(workerID int) {
+	wlog := log.With().Int(zf.WorkerID, workerID).Logger()
	for {
		select {
		case w, ok := <-b.workCh:
			if !ok {
-				log.Debug().Int("worker.id", workerID).Msgf("worker channel closing, shutting down worker %d", workerID)
+				wlog.Debug().Msg("worker channel closing, shutting down")
				return
			}
@@ -219,10 +226,9 @@ func (b *LockFreeBatcher) worker(workerID int) {
				result.err = err
				if result.err != nil {
					b.workErrors.Add(1)
-					log.Error().Err(result.err).
-						Int("worker.id", workerID).
-						Uint64("node.id", w.nodeID.Uint64()).
-						Str("reason", w.c.Reason).
+					wlog.Error().Err(result.err).
+						Uint64(zf.NodeID, w.nodeID.Uint64()).
+						Str(zf.Reason, w.c.Reason).
						Msg("failed to generate map response for synchronous work")
				} else if result.mapResponse != nil {
					// Update peer tracking for synchronous responses too
@@ -232,9 +238,8 @@ func (b *LockFreeBatcher) worker(workerID int) {
				result.err = fmt.Errorf("node %d not found", w.nodeID)
				b.workErrors.Add(1)
-				log.Error().Err(result.err).
-					Int("worker.id", workerID).
-					Uint64("node.id", w.nodeID.Uint64()).
+				wlog.Error().Err(result.err).
+					Uint64(zf.NodeID, w.nodeID.Uint64()).
					Msg("node not found for synchronous work")
			}
@@ -257,15 +262,14 @@ func (b *LockFreeBatcher) worker(workerID int) {
				err := nc.change(w.c)
				if err != nil {
					b.workErrors.Add(1)
-					log.Error().Err(err).
-						Int("worker.id", workerID).
-						Uint64("node.id", w.nodeID.Uint64()).
-						Str("reason", w.c.Reason).
+					wlog.Error().Err(err).
+						Uint64(zf.NodeID, w.nodeID.Uint64()).
+						Str(zf.Reason, w.c.Reason).
						Msg("failed to apply change")
				}
			}
		case <-b.done:
-			log.Debug().Int("worker.id", workerID).Msg("batcher shutting down, exiting worker")
+			wlog.Debug().Msg("batcher shutting down, exiting worker")
			return
		}
	}
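
Per-worker sub-loggers follow the same recipe: bind worker.id once at goroutine start, then log freely inside the loop. A runnable sketch of the loop's shape (the channel types and work payload are simplified stand-ins for the batcher's internals):

	package main

	import (
		"sync"

		"github.com/rs/zerolog/log"
	)

	func worker(workerID int, workCh <-chan int, done <-chan struct{}, wg *sync.WaitGroup) {
		defer wg.Done()
		// Bound once; every event below carries worker.id automatically.
		wlog := log.With().Int("worker.id", workerID).Logger()
		for {
			select {
			case w, ok := <-workCh:
				if !ok {
					wlog.Debug().Msg("worker channel closing, shutting down")
					return
				}
				wlog.Debug().Int("item", w).Msg("processing work item")
			case <-done:
				wlog.Debug().Msg("batcher shutting down, exiting worker")
				return
			}
		}
	}

	func main() {
		workCh := make(chan int)
		done := make(chan struct{})
		var wg sync.WaitGroup
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go worker(i, workCh, done, &wg)
		}
		workCh <- 1 // one item is picked up by some worker
		close(done) // then every worker shuts down
		wg.Wait()
	}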
@@ -310,8 +314,8 @@ func (b *LockFreeBatcher) addToBatch(changes ...change.Change) {
		if _, existed := b.nodes.LoadAndDelete(removedID); existed {
			b.totalNodes.Add(-1)
			log.Debug().
-				Uint64("node.id", removedID.Uint64()).
-				Msg("Removed deleted node from batcher")
+				Uint64(zf.NodeID, removedID.Uint64()).
+				Msg("removed deleted node from batcher")
		}
		b.connected.Delete(removedID)
@@ -403,9 +407,9 @@ func (b *LockFreeBatcher) cleanupOfflineNodes() {
	// Clean up the identified nodes
	for _, nodeID := range nodesToCleanup {
-		log.Info().Uint64("node.id", nodeID.Uint64()).
+		log.Info().Uint64(zf.NodeID, nodeID.Uint64()).
			Dur("offline_duration", cleanupThreshold).
-			Msg("Cleaning up node that has been offline for too long")
+			Msg("cleaning up node that has been offline for too long")
		b.nodes.Delete(nodeID)
		b.connected.Delete(nodeID)
@@ -413,8 +417,8 @@ func (b *LockFreeBatcher) cleanupOfflineNodes() {
	}
	if len(nodesToCleanup) > 0 {
-		log.Info().Int("cleaned_nodes", len(nodesToCleanup)).
-			Msg("Completed cleanup of long-offline nodes")
+		log.Info().Int(zf.CleanedNodes, len(nodesToCleanup)).
+			Msg("completed cleanup of long-offline nodes")
	}
}
@@ -502,6 +506,7 @@ type connectionEntry struct {
type multiChannelNodeConn struct {
	id types.NodeID
	mapper *mapper
+	log zerolog.Logger
	mutex sync.RWMutex
	connections []*connectionEntry
@@ -528,6 +533,7 @@ func newMultiChannelNodeConn(id types.NodeID, mapper *mapper) *multiChannelNodeC
		id: id,
		mapper: mapper,
		lastSentPeers: xsync.NewMap[tailcfg.NodeID, struct{}](),
+		log: log.With().Uint64(zf.NodeID, id.Uint64()).Logger(),
	}
}
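
Storing the pre-bound logger on the struct at construction time is what lets every multiChannelNodeConn method below drop its explicit Uint64("node.id", ...) field. The same idea in miniature (type and method names illustrative):

	package main

	import (
		"github.com/rs/zerolog"
		"github.com/rs/zerolog/log"
	)

	type nodeConn struct {
		id  uint64
		log zerolog.Logger // pre-bound with this node's id
	}

	func newNodeConn(id uint64) *nodeConn {
		return &nodeConn{
			id:  id,
			log: log.With().Uint64("node.id", id).Logger(),
		}
	}

	func (nc *nodeConn) send() {
		// No need to repeat node.id here; it is already on nc.log.
		nc.log.Debug().Msg("send: broadcasting to all connections")
	}

	func main() {
		newNodeConn(7).send()
	}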
@@ -546,7 +552,8 @@ func (mc *multiChannelNodeConn) close() {
// addConnection adds a new connection.
func (mc *multiChannelNodeConn) addConnection(entry *connectionEntry) {
	mutexWaitStart := time.Now()
-	log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", entry.c)).Str("conn.id", entry.id).
+	mc.log.Debug().Caller().Str(zf.Chan, fmt.Sprintf("%p", entry.c)).Str(zf.ConnID, entry.id).
		Msg("addConnection: waiting for mutex - POTENTIAL CONTENTION POINT")
	mc.mutex.Lock()
@@ -554,10 +561,10 @@ func (mc *multiChannelNodeConn) addConnection(entry *connectionEntry) {
	defer mc.mutex.Unlock()
	mc.connections = append(mc.connections, entry)
-	log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", entry.c)).Str("conn.id", entry.id).
+	mc.log.Debug().Caller().Str(zf.Chan, fmt.Sprintf("%p", entry.c)).Str(zf.ConnID, entry.id).
		Int("total_connections", len(mc.connections)).
		Dur("mutex_wait_time", mutexWaitDur).
-		Msg("Successfully added connection after mutex wait")
+		Msg("successfully added connection after mutex wait")
}
// removeConnectionByChannel removes a connection by matching channel pointer.
@@ -569,9 +576,9 @@ func (mc *multiChannelNodeConn) removeConnectionByChannel(c chan<- *tailcfg.MapR
		if entry.c == c {
			// Remove this connection
			mc.connections = append(mc.connections[:i], mc.connections[i+1:]...)
-			log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", c)).
+			mc.log.Debug().Caller().Str(zf.Chan, fmt.Sprintf("%p", c)).
				Int("remaining_connections", len(mc.connections)).
-				Msg("Successfully removed connection")
+				Msg("successfully removed connection")
			return true
		}
	}
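
The append-splice on mc.connections is Go's standard delete-at-index idiom. When several indices are removed in one pass, as in send()'s failed-connection sweep further down, iterating in reverse keeps the not-yet-processed indices valid. A standalone sketch:

	package main

	import "fmt"

	func main() {
		conns := []string{"a", "b", "c", "d", "e"}
		failed := []int{1, 3} // indices collected in ascending order during a send loop

		// Delete from highest index to lowest: removing index 3 first
		// leaves index 1 still pointing at the intended element.
		for i := len(failed) - 1; i >= 0; i-- {
			idx := failed[i]
			conns = append(conns[:idx], conns[idx+1:]...)
		}
		fmt.Println(conns) // [a c e]
	}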
@@ -606,12 +613,12 @@ func (mc *multiChannelNodeConn) send(data *tailcfg.MapResponse) error {
	if len(mc.connections) == 0 {
		// During rapid reconnection, nodes may temporarily have no active connections
		// This is not an error - the node will receive a full map when it reconnects
-		log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).
+		mc.log.Debug().Caller().
			Msg("send: skipping send to node with no active connections (likely rapid reconnection)")
		return nil // Return success instead of error
	}
-	log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).
+	mc.log.Debug().Caller().
		Int("total_connections", len(mc.connections)).
		Msg("send: broadcasting to all connections")
@@ -621,21 +628,21 @@ func (mc *multiChannelNodeConn) send(data *tailcfg.MapResponse) error {
	// Send to all connections
	for i, conn := range mc.connections {
-		log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", conn.c)).
-			Str("conn.id", conn.id).Int("connection_index", i).
+		mc.log.Debug().Caller().Str(zf.Chan, fmt.Sprintf("%p", conn.c)).
+			Str(zf.ConnID, conn.id).Int(zf.ConnectionIndex, i).
			Msg("send: attempting to send to connection")
		if err := conn.send(data); err != nil {
			lastErr = err
			failedConnections = append(failedConnections, i)
-			log.Warn().Err(err).
-				Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", conn.c)).
-				Str("conn.id", conn.id).Int("connection_index", i).
+			mc.log.Warn().Err(err).Str(zf.Chan, fmt.Sprintf("%p", conn.c)).
+				Str(zf.ConnID, conn.id).Int(zf.ConnectionIndex, i).
				Msg("send: connection send failed")
		} else {
			successCount++
-			log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", conn.c)).
-				Str("conn.id", conn.id).Int("connection_index", i).
+			mc.log.Debug().Caller().Str(zf.Chan, fmt.Sprintf("%p", conn.c)).
+				Str(zf.ConnID, conn.id).Int(zf.ConnectionIndex, i).
				Msg("send: successfully sent to connection")
		}
	}
@@ -643,15 +650,15 @@ func (mc *multiChannelNodeConn) send(data *tailcfg.MapResponse) error {
	// Remove failed connections (in reverse order to maintain indices)
	for i := len(failedConnections) - 1; i >= 0; i-- {
		idx := failedConnections[i]
-		log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).
-			Str("conn.id", mc.connections[idx].id).
+		mc.log.Debug().Caller().
+			Str(zf.ConnID, mc.connections[idx].id).
			Msg("send: removing failed connection")
		mc.connections = append(mc.connections[:idx], mc.connections[idx+1:]...)
	}
	mc.updateCount.Add(1)
-	log.Debug().Uint64("node.id", mc.id.Uint64()).
+	mc.log.Debug().
		Int("successful_sends", successCount).
		Int("failed_connections", len(failedConnections)).
		Int("remaining_connections", len(mc.connections)).