diff --git a/hscontrol/mapper/batcher.go b/hscontrol/mapper/batcher.go
index 0a1e30d0..1652b213 100644
--- a/hscontrol/mapper/batcher.go
+++ b/hscontrol/mapper/batcher.go
@@ -8,6 +8,7 @@ import (
     "github.com/juanfont/headscale/hscontrol/state"
     "github.com/juanfont/headscale/hscontrol/types"
     "github.com/juanfont/headscale/hscontrol/types/change"
+    "github.com/juanfont/headscale/hscontrol/util/zlog/zf"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promauto"
     "github.com/puzpuzpuz/xsync/v4"
@@ -140,7 +141,7 @@ func handleNodeChange(nc nodeConnection, mapper *mapper, r change.Change) error
     nodeID := nc.nodeID()

-    log.Debug().Caller().Uint64("node.id", nodeID.Uint64()).Str("reason", r.Reason).Msg("Node change processing started because change notification received")
+    log.Debug().Caller().Uint64(zf.NodeID, nodeID.Uint64()).Str(zf.Reason, r.Reason).Msg("node change processing started")

     data, err := generateMapResponse(nc, mapper, r)
     if err != nil {
diff --git a/hscontrol/mapper/batcher_lockfree.go b/hscontrol/mapper/batcher_lockfree.go
index e00512b6..88b5c4f2 100644
--- a/hscontrol/mapper/batcher_lockfree.go
+++ b/hscontrol/mapper/batcher_lockfree.go
@@ -10,7 +10,9 @@ import (
     "github.com/juanfont/headscale/hscontrol/types"
     "github.com/juanfont/headscale/hscontrol/types/change"
+    "github.com/juanfont/headscale/hscontrol/util/zlog/zf"
     "github.com/puzpuzpuz/xsync/v4"
+    "github.com/rs/zerolog"
     "github.com/rs/zerolog/log"
     "tailscale.com/tailcfg"
     "tailscale.com/types/ptr"
@@ -48,6 +50,7 @@ type LockFreeBatcher struct {
 // and notifies other nodes that this node has come online.
 func (b *LockFreeBatcher) AddNode(id types.NodeID, c chan<- *tailcfg.MapResponse, version tailcfg.CapabilityVersion) error {
     addNodeStart := time.Now()
+    nlog := log.With().Uint64(zf.NodeID, id.Uint64()).Logger()

     // Generate connection ID
     connID := generateConnectionID()
@@ -76,7 +79,7 @@ func (b *LockFreeBatcher) AddNode(id types.NodeID, c chan<- *tailcfg.MapResponse
     // Use the worker pool for controlled concurrency instead of direct generation
     initialMap, err := b.MapResponseFromChange(id, change.FullSelf(id))
     if err != nil {
-        log.Error().Uint64("node.id", id.Uint64()).Err(err).Msg("Initial map generation failed")
+        nlog.Error().Err(err).Msg("initial map generation failed")
         nodeConn.removeConnectionByChannel(c)
         return fmt.Errorf("failed to generate initial map for node %d: %w", id, err)
     }
@@ -86,10 +89,10 @@ func (b *LockFreeBatcher) AddNode(id types.NodeID, c chan<- *tailcfg.MapResponse
     select {
     case c <- initialMap:
         // Success
-    case <-time.After(5 * time.Second):
-        log.Error().Uint64("node.id", id.Uint64()).Err(fmt.Errorf("timeout")).Msg("Initial map send timeout")
-        log.Debug().Caller().Uint64("node.id", id.Uint64()).Dur("timeout.duration", 5*time.Second).
-            Msg("Initial map send timed out because channel was blocked or receiver not ready")
+    case <-time.After(5 * time.Second): //nolint:mnd
+        nlog.Error().Err(errors.New("timeout")).Msg("initial map send timeout") //nolint:err113
+        nlog.Debug().Caller().Dur("timeout.duration", 5*time.Second). //nolint:mnd
+            Msg("initial map send timed out because channel was blocked or receiver not ready")
         nodeConn.removeConnectionByChannel(c)
         return fmt.Errorf("failed to send initial map to node %d: timeout", id)
     }
@@ -100,9 +103,9 @@ func (b *LockFreeBatcher) AddNode(id types.NodeID, c chan<- *tailcfg.MapResponse
     // Node will automatically receive updates through the normal flow
     // The initial full map already contains all current state
-    log.Debug().Caller().Uint64("node.id", id.Uint64()).Dur("total.duration", time.Since(addNodeStart)).
+    nlog.Debug().Caller().Dur(zf.TotalDuration, time.Since(addNodeStart)).
         Int("active.connections", nodeConn.getActiveConnectionCount()).
-        Msg("Node connection established in batcher because AddNode completed successfully")
+        Msg("node connection established in batcher")

     return nil
 }
@@ -112,30 +115,32 @@ func (b *LockFreeBatcher) AddNode(id types.NodeID, c chan<- *tailcfg.MapResponse
 // and keeps the node entry alive for rapid reconnections instead of aggressive deletion.
 // Reports if the node still has active connections after removal.
 func (b *LockFreeBatcher) RemoveNode(id types.NodeID, c chan<- *tailcfg.MapResponse) bool {
+    nlog := log.With().Uint64(zf.NodeID, id.Uint64()).Logger()
+
     nodeConn, exists := b.nodes.Load(id)
     if !exists {
-        log.Debug().Caller().Uint64("node.id", id.Uint64()).Msg("RemoveNode called for non-existent node because node not found in batcher")
+        nlog.Debug().Caller().Msg("RemoveNode called for non-existent node")
         return false
     }

     // Remove specific connection
     removed := nodeConn.removeConnectionByChannel(c)
     if !removed {
-        log.Debug().Caller().Uint64("node.id", id.Uint64()).Msg("RemoveNode: channel not found because connection already removed or invalid")
+        nlog.Debug().Caller().Msg("RemoveNode: channel not found, connection already removed or invalid")
         return false
     }

     // Check if node has any remaining active connections
     if nodeConn.hasActiveConnections() {
-        log.Debug().Caller().Uint64("node.id", id.Uint64()).
+        nlog.Debug().Caller().
             Int("active.connections", nodeConn.getActiveConnectionCount()).
-            Msg("Node connection removed but keeping online because other connections remain")
+            Msg("node connection removed but keeping online, other connections remain")
         return true // Node still has active connections
     }

     // No active connections - keep the node entry alive for rapid reconnections
     // The node will get a fresh full map when it reconnects
-    log.Debug().Caller().Uint64("node.id", id.Uint64()).Msg("Node disconnected from batcher because all connections removed, keeping entry for rapid reconnection")
+    nlog.Debug().Caller().Msg("node disconnected from batcher, keeping entry for rapid reconnection")
     b.connected.Store(id, ptr.To(time.Now()))

     return false
@@ -196,11 +201,13 @@ func (b *LockFreeBatcher) doWork() {
 }

 func (b *LockFreeBatcher) worker(workerID int) {
+    wlog := log.With().Int(zf.WorkerID, workerID).Logger()
+
     for {
         select {
         case w, ok := <-b.workCh:
             if !ok {
-                log.Debug().Int("worker.id", workerID).Msgf("worker channel closing, shutting down worker %d", workerID)
+                wlog.Debug().Msg("worker channel closing, shutting down")
                 return
             }
@@ -219,10 +226,9 @@ func (b *LockFreeBatcher) worker(workerID int) {
             result.err = err
             if result.err != nil {
                 b.workErrors.Add(1)
-                log.Error().Err(result.err).
-                    Int("worker.id", workerID).
-                    Uint64("node.id", w.nodeID.Uint64()).
-                    Str("reason", w.c.Reason).
+                wlog.Error().Err(result.err).
+                    Uint64(zf.NodeID, w.nodeID.Uint64()).
+                    Str(zf.Reason, w.c.Reason).
                     Msg("failed to generate map response for synchronous work")
             } else if result.mapResponse != nil {
                 // Update peer tracking for synchronous responses too
@@ -232,9 +238,8 @@ func (b *LockFreeBatcher) worker(workerID int) {
                 result.err = fmt.Errorf("node %d not found", w.nodeID)
                 b.workErrors.Add(1)
-                log.Error().Err(result.err).
-                    Int("worker.id", workerID).
-                    Uint64("node.id", w.nodeID.Uint64()).
+                wlog.Error().Err(result.err).
+                    Uint64(zf.NodeID, w.nodeID.Uint64()).
                     Msg("node not found for synchronous work")
             }
@@ -257,15 +262,14 @@ func (b *LockFreeBatcher) worker(workerID int) {
                 err := nc.change(w.c)
                 if err != nil {
                     b.workErrors.Add(1)
-                    log.Error().Err(err).
-                        Int("worker.id", workerID).
-                        Uint64("node.id", w.nodeID.Uint64()).
-                        Str("reason", w.c.Reason).
+                    wlog.Error().Err(err).
+                        Uint64(zf.NodeID, w.nodeID.Uint64()).
+                        Str(zf.Reason, w.c.Reason).
                         Msg("failed to apply change")
                 }
             }
         case <-b.done:
-            log.Debug().Int("worker.id", workerID).Msg("batcher shutting down, exiting worker")
+            wlog.Debug().Msg("batcher shutting down, exiting worker")
             return
         }
     }
@@ -310,8 +314,8 @@ func (b *LockFreeBatcher) addToBatch(changes ...change.Change) {
             if _, existed := b.nodes.LoadAndDelete(removedID); existed {
                 b.totalNodes.Add(-1)
                 log.Debug().
-                    Uint64("node.id", removedID.Uint64()).
-                    Msg("Removed deleted node from batcher")
+                    Uint64(zf.NodeID, removedID.Uint64()).
+                    Msg("removed deleted node from batcher")
             }

             b.connected.Delete(removedID)
@@ -403,9 +407,9 @@ func (b *LockFreeBatcher) cleanupOfflineNodes() {
     // Clean up the identified nodes
     for _, nodeID := range nodesToCleanup {
-        log.Info().Uint64("node.id", nodeID.Uint64()).
+        log.Info().Uint64(zf.NodeID, nodeID.Uint64()).
             Dur("offline_duration", cleanupThreshold).
-            Msg("Cleaning up node that has been offline for too long")
+            Msg("cleaning up node that has been offline for too long")
         b.nodes.Delete(nodeID)
         b.connected.Delete(nodeID)
@@ -413,8 +417,8 @@ func (b *LockFreeBatcher) cleanupOfflineNodes() {
     }

     if len(nodesToCleanup) > 0 {
-        log.Info().Int("cleaned_nodes", len(nodesToCleanup)).
-            Msg("Completed cleanup of long-offline nodes")
+        log.Info().Int(zf.CleanedNodes, len(nodesToCleanup)).
+            Msg("completed cleanup of long-offline nodes")
     }
 }
@@ -502,6 +506,7 @@ type connectionEntry struct {
 type multiChannelNodeConn struct {
     id     types.NodeID
     mapper *mapper
+    log    zerolog.Logger

     mutex       sync.RWMutex
     connections []*connectionEntry
@@ -528,6 +533,7 @@ func newMultiChannelNodeConn(id types.NodeID, mapper *mapper) *multiChannelNodeC
         id:            id,
         mapper:        mapper,
         lastSentPeers: xsync.NewMap[tailcfg.NodeID, struct{}](),
+        log:           log.With().Uint64(zf.NodeID, id.Uint64()).Logger(),
     }
 }
@@ -546,7 +552,8 @@ func (mc *multiChannelNodeConn) close() {
 // addConnection adds a new connection.
 func (mc *multiChannelNodeConn) addConnection(entry *connectionEntry) {
     mutexWaitStart := time.Now()
-    log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", entry.c)).Str("conn.id", entry.id).
+
+    mc.log.Debug().Caller().Str(zf.Chan, fmt.Sprintf("%p", entry.c)).Str(zf.ConnID, entry.id).
         Msg("addConnection: waiting for mutex - POTENTIAL CONTENTION POINT")

     mc.mutex.Lock()
@@ -554,10 +561,10 @@ func (mc *multiChannelNodeConn) addConnection(entry *connectionEntry) {
     defer mc.mutex.Unlock()

     mc.connections = append(mc.connections, entry)
-    log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", entry.c)).Str("conn.id", entry.id).
+    mc.log.Debug().Caller().Str(zf.Chan, fmt.Sprintf("%p", entry.c)).Str(zf.ConnID, entry.id).
         Int("total_connections", len(mc.connections)).
         Dur("mutex_wait_time", mutexWaitDur).
-        Msg("Successfully added connection after mutex wait")
+        Msg("successfully added connection after mutex wait")
 }

 // removeConnectionByChannel removes a connection by matching channel pointer.
@@ -569,9 +576,9 @@ func (mc *multiChannelNodeConn) removeConnectionByChannel(c chan<- *tailcfg.MapR
         if entry.c == c {
             // Remove this connection
             mc.connections = append(mc.connections[:i], mc.connections[i+1:]...)
-            log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", c)).
+            mc.log.Debug().Caller().Str(zf.Chan, fmt.Sprintf("%p", c)).
                 Int("remaining_connections", len(mc.connections)).
-                Msg("Successfully removed connection")
+                Msg("successfully removed connection")
             return true
         }
     }
@@ -606,12 +613,12 @@ func (mc *multiChannelNodeConn) send(data *tailcfg.MapResponse) error {
     if len(mc.connections) == 0 {
         // During rapid reconnection, nodes may temporarily have no active connections
         // This is not an error - the node will receive a full map when it reconnects
-        log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).
+        mc.log.Debug().Caller().
             Msg("send: skipping send to node with no active connections (likely rapid reconnection)")
         return nil // Return success instead of error
     }

-    log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).
+    mc.log.Debug().Caller().
         Int("total_connections", len(mc.connections)).
         Msg("send: broadcasting to all connections")
@@ -621,21 +628,21 @@ func (mc *multiChannelNodeConn) send(data *tailcfg.MapResponse) error {
     // Send to all connections
     for i, conn := range mc.connections {
-        log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", conn.c)).
-            Str("conn.id", conn.id).Int("connection_index", i).
+        mc.log.Debug().Caller().Str(zf.Chan, fmt.Sprintf("%p", conn.c)).
+            Str(zf.ConnID, conn.id).Int(zf.ConnectionIndex, i).
             Msg("send: attempting to send to connection")

         if err := conn.send(data); err != nil {
             lastErr = err
             failedConnections = append(failedConnections, i)
-            log.Warn().Err(err).
-                Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", conn.c)).
-                Str("conn.id", conn.id).Int("connection_index", i).
+            mc.log.Warn().Err(err).Str(zf.Chan, fmt.Sprintf("%p", conn.c)).
+                Str(zf.ConnID, conn.id).Int(zf.ConnectionIndex, i).
                 Msg("send: connection send failed")
         } else {
             successCount++
-            log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).Str("chan", fmt.Sprintf("%p", conn.c)).
-                Str("conn.id", conn.id).Int("connection_index", i).
+
+            mc.log.Debug().Caller().Str(zf.Chan, fmt.Sprintf("%p", conn.c)).
+                Str(zf.ConnID, conn.id).Int(zf.ConnectionIndex, i).
                 Msg("send: successfully sent to connection")
         }
     }
@@ -643,15 +650,15 @@ func (mc *multiChannelNodeConn) send(data *tailcfg.MapResponse) error {
     // Remove failed connections (in reverse order to maintain indices)
     for i := len(failedConnections) - 1; i >= 0; i-- {
         idx := failedConnections[i]
-        log.Debug().Caller().Uint64("node.id", mc.id.Uint64()).
-            Str("conn.id", mc.connections[idx].id).
+        mc.log.Debug().Caller().
+            Str(zf.ConnID, mc.connections[idx].id).
             Msg("send: removing failed connection")
         mc.connections = append(mc.connections[:idx], mc.connections[idx+1:]...)
     }

     mc.updateCount.Add(1)

-    log.Debug().Uint64("node.id", mc.id.Uint64()).
+    mc.log.Debug().
         Int("successful_sends", successCount).
         Int("failed_connections", len(failedConnections)).
         Int("remaining_connections", len(mc.connections)).