1
0
mirror of https://github.com/juanfont/headscale.git synced 2025-08-19 13:48:20 +02:00

mapper: produce map before poll

Before this patch, we would send a message to each "node stream"
that there is an update that needs to be turned into a mapresponse
and sent to a node.

Producing the mapresponse is a "costly" affair, which means that while
a node was producing one, it might start blocking and creating full
queues from the poller and all the way up to where updates were sent.

This could cause updates to time out and be dropped, as a bad node
going away or spending too much time processing would cause all the
other nodes to not get any updates.

In addition, it contributed to "uncontrolled parallel processing" by
potentially doing too many expensive operations at the same time:

Each node stream is essentially a channel, meaning that if you have 30
nodes, we will try to process 30 map requests at the same time. If you
have 8 cpu cores, that will saturate all the cores immediately and cause
a lot of wasted switching between the processing.

Now, all the maps are processed by workers in the mapper, and the number
of workers is controllable. It is now recommended to set this to a bit
less than the number of CPU cores, allowing us to process the maps as
fast as we can and then send them to the poll.

When the poll receives the map, it is only responsible for taking it and
sending it to the node.

This might not directly improve the performance of Headscale, but it will
likely make the performance a lot more consistent. And I would argue the
design is a lot easier to reason about.

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
This commit is contained in:
Kristoffer Dalby 2025-07-18 15:26:14 +02:00
parent 07a25d6255
commit e6a03cddd1
No known key found for this signature in database

View File

@ -98,7 +98,11 @@ type node struct {
// node data for testing full map responses and comprehensive update scenarios. // node data for testing full map responses and comprehensive update scenarios.
// //
// Returns TestData struct containing all created entities and a cleanup function. // Returns TestData struct containing all created entities and a cleanup function.
func setupBatcherWithTestData(t *testing.T, bf batcherFunc, userCount, nodesPerUser, bufferSize int) (*TestData, func()) { func setupBatcherWithTestData(
t *testing.T,
bf batcherFunc,
userCount, nodesPerUser, bufferSize int,
) (*TestData, func()) {
t.Helper() t.Helper()
// Create database and populate with test data first // Create database and populate with test data first
@ -470,7 +474,9 @@ func TestEnhancedTrackingWithBatcher(t *testing.T) {
stats.TotalUpdates, stats.FullUpdates, stats.PatchUpdates, stats.MaxPeersSeen) stats.TotalUpdates, stats.FullUpdates, stats.PatchUpdates, stats.MaxPeersSeen)
if stats.TotalUpdates == 0 { if stats.TotalUpdates == 0 {
t.Error("Enhanced tracking with batcher received 0 updates - batcher may not be working") t.Error(
"Enhanced tracking with batcher received 0 updates - batcher may not be working",
)
} }
}) })
} }
@ -504,7 +510,11 @@ func TestBatcherScalabilityAllToAll(t *testing.T) {
t.Run(batcherFunc.name, func(t *testing.T) { t.Run(batcherFunc.name, func(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
t.Logf("ALL-TO-ALL TEST: %d nodes with %s batcher", tc.nodeCount, batcherFunc.name) t.Logf(
"ALL-TO-ALL TEST: %d nodes with %s batcher",
tc.nodeCount,
batcherFunc.name,
)
// Create test environment - all nodes from same user so they can be peers // Create test environment - all nodes from same user so they can be peers
// We need enough users to support the node count (max 1000 nodes per user) // We need enough users to support the node count (max 1000 nodes per user)
@ -515,13 +525,24 @@ func TestBatcherScalabilityAllToAll(t *testing.T) {
// Buffer needs to handle nodeCount * average_updates_per_node // Buffer needs to handle nodeCount * average_updates_per_node
// Estimate: each node receives ~2*nodeCount updates during all-to-all // Estimate: each node receives ~2*nodeCount updates during all-to-all
bufferSize := max(1000, tc.nodeCount*2) bufferSize := max(1000, tc.nodeCount*2)
testData, cleanup := setupBatcherWithTestData(t, batcherFunc.fn, usersNeeded, nodesPerUser, bufferSize) testData, cleanup := setupBatcherWithTestData(
t,
batcherFunc.fn,
usersNeeded,
nodesPerUser,
bufferSize,
)
defer cleanup() defer cleanup()
batcher := testData.Batcher batcher := testData.Batcher
allNodes := testData.Nodes[:tc.nodeCount] // Limit to requested count allNodes := testData.Nodes[:tc.nodeCount] // Limit to requested count
t.Logf("Created %d nodes across %d users, buffer size: %d", len(allNodes), usersNeeded, bufferSize) t.Logf(
"Created %d nodes across %d users, buffer size: %d",
len(allNodes),
usersNeeded,
bufferSize,
)
// Start enhanced tracking for all nodes // Start enhanced tracking for all nodes
for i := range allNodes { for i := range allNodes {
@ -621,16 +642,25 @@ func TestBatcherScalabilityAllToAll(t *testing.T) {
// Collect details for first few nodes or failing nodes // Collect details for first few nodes or failing nodes
if len(nodeDetails) < 10 || stats.MaxPeersSeen < expectedPeers { if len(nodeDetails) < 10 || stats.MaxPeersSeen < expectedPeers {
nodeDetails = append(nodeDetails, nodeDetails = append(nodeDetails,
fmt.Sprintf("Node %d: %d updates (%d full), max %d peers", fmt.Sprintf(
node.n.ID, stats.TotalUpdates, stats.FullUpdates, stats.MaxPeersSeen)) "Node %d: %d updates (%d full), max %d peers",
node.n.ID,
stats.TotalUpdates,
stats.FullUpdates,
stats.MaxPeersSeen,
))
} }
} }
// Final results // Final results
t.Logf("ALL-TO-ALL RESULTS: %d nodes, %d total updates (%d full)", t.Logf("ALL-TO-ALL RESULTS: %d nodes, %d total updates (%d full)",
len(allNodes), totalUpdates, totalFull) len(allNodes), totalUpdates, totalFull)
t.Logf(" Connectivity: %d/%d nodes successful (%.1f%%)", t.Logf(
successfulNodes, len(allNodes), float64(successfulNodes)/float64(len(allNodes))*100) " Connectivity: %d/%d nodes successful (%.1f%%)",
successfulNodes,
len(allNodes),
float64(successfulNodes)/float64(len(allNodes))*100,
)
t.Logf(" Peers seen: min=%d, max=%d, expected=%d", t.Logf(" Peers seen: min=%d, max=%d, expected=%d",
minPeersSeen, maxPeersGlobal, expectedPeers) minPeersSeen, maxPeersGlobal, expectedPeers)
t.Logf(" Timing: join=%v, total=%v", joinTime, totalTime) t.Logf(" Timing: join=%v, total=%v", joinTime, totalTime)
@ -649,7 +679,10 @@ func TestBatcherScalabilityAllToAll(t *testing.T) {
// Final verification: Since we waited until all nodes achieved connectivity, // Final verification: Since we waited until all nodes achieved connectivity,
// this should always pass, but we verify the final state for completeness // this should always pass, but we verify the final state for completeness
if successfulNodes == len(allNodes) { if successfulNodes == len(allNodes) {
t.Logf("✅ PASS: All-to-all connectivity achieved for %d nodes", len(allNodes)) t.Logf(
"✅ PASS: All-to-all connectivity achieved for %d nodes",
len(allNodes),
)
} else { } else {
// This should not happen since we loop until success, but handle it just in case // This should not happen since we loop until success, but handle it just in case
failedNodes := len(allNodes) - successfulNodes failedNodes := len(allNodes) - successfulNodes
@ -727,7 +760,11 @@ func TestBatcherBasicOperations(t *testing.T) {
case data := <-tn2.ch: case data := <-tn2.ch:
// Verify it's a full map response // Verify it's a full map response
assert.NotNil(t, data) assert.NotNil(t, data)
assert.True(t, len(data.Peers) >= 1 || data.Node != nil, "Should receive initial full map") assert.True(
t,
len(data.Peers) >= 1 || data.Node != nil,
"Should receive initial full map",
)
case <-time.After(200 * time.Millisecond): case <-time.After(200 * time.Millisecond):
t.Error("Second node should receive its initial full map") t.Error("Second node should receive its initial full map")
} }
@ -950,7 +987,11 @@ func TestBatcherWorkQueueBatching(t *testing.T) {
updateCount, 5, expectedUpdates) updateCount, 5, expectedUpdates)
if updateCount != expectedUpdates { if updateCount != expectedUpdates {
t.Errorf("Expected %d updates but received %d", expectedUpdates, updateCount) t.Errorf(
"Expected %d updates but received %d",
expectedUpdates,
updateCount,
)
} }
// Validate that all updates have valid content // Validate that all updates have valid content
@ -1153,8 +1194,12 @@ func TestBatcherWorkerChannelSafety(t *testing.T) {
mutex.Lock() mutex.Lock()
defer mutex.Unlock() defer mutex.Unlock()
t.Logf("Worker safety test results: %d panics, %d channel errors, %d invalid data packets", t.Logf(
panics, channelErrors, invalidData) "Worker safety test results: %d panics, %d channel errors, %d invalid data packets",
panics,
channelErrors,
invalidData,
)
// Test failure conditions // Test failure conditions
if panics > 0 { if panics > 0 {
@ -1187,7 +1232,13 @@ func TestBatcherConcurrentClients(t *testing.T) {
for _, batcherFunc := range allBatcherFunctions { for _, batcherFunc := range allBatcherFunctions {
t.Run(batcherFunc.name, func(t *testing.T) { t.Run(batcherFunc.name, func(t *testing.T) {
// Create comprehensive test environment with real data // Create comprehensive test environment with real data
testData, cleanup := setupBatcherWithTestData(t, batcherFunc.fn, TEST_USER_COUNT, TEST_NODES_PER_USER, 8) testData, cleanup := setupBatcherWithTestData(
t,
batcherFunc.fn,
TEST_USER_COUNT,
TEST_NODES_PER_USER,
8,
)
defer cleanup() defer cleanup()
batcher := testData.Batcher batcher := testData.Batcher
@ -1211,7 +1262,10 @@ func TestBatcherConcurrentClients(t *testing.T) {
select { select {
case data := <-channel: case data := <-channel:
if valid, reason := validateUpdateContent(data); valid { if valid, reason := validateUpdateContent(data); valid {
tracker.recordUpdate(nodeID, 1) // Use 1 as update size since we have MapResponse tracker.recordUpdate(
nodeID,
1,
) // Use 1 as update size since we have MapResponse
} else { } else {
t.Errorf("Invalid update received for stable node %d: %s", nodeID, reason) t.Errorf("Invalid update received for stable node %d: %s", nodeID, reason)
} }
@ -1266,7 +1320,10 @@ func TestBatcherConcurrentClients(t *testing.T) {
select { select {
case data := <-ch: case data := <-ch:
if valid, _ := validateUpdateContent(data); valid { if valid, _ := validateUpdateContent(data); valid {
tracker.recordUpdate(nodeID, 1) // Use 1 as update size since we have MapResponse tracker.recordUpdate(
nodeID,
1,
) // Use 1 as update size since we have MapResponse
} }
case <-time.After(20 * time.Millisecond): case <-time.After(20 * time.Millisecond):
return return
@ -1373,7 +1430,10 @@ func TestBatcherConcurrentClients(t *testing.T) {
t.Logf("Total updates - Stable clients: %d, Churning clients: %d", t.Logf("Total updates - Stable clients: %d, Churning clients: %d",
stableUpdateCount, churningUpdateCount) stableUpdateCount, churningUpdateCount)
t.Logf("Average per stable client: %.1f updates", float64(stableUpdateCount)/float64(len(stableNodes))) t.Logf(
"Average per stable client: %.1f updates",
float64(stableUpdateCount)/float64(len(stableNodes)),
)
t.Logf("Panics during test: %d", finalPanicCount) t.Logf("Panics during test: %d", finalPanicCount)
// Validate test success criteria // Validate test success criteria
@ -1457,7 +1517,13 @@ func XTestBatcherScalability(t *testing.T) {
// expectBreak = true // expectBreak = true
// } // }
name := fmt.Sprintf("%s_%dn_%dc_%db", chaosType, nodeCount, cycleCount, bufferSize) name := fmt.Sprintf(
"%s_%dn_%dc_%db",
chaosType,
nodeCount,
cycleCount,
bufferSize,
)
description := fmt.Sprintf("%s chaos: %d nodes, %d cycles, %d buffers", description := fmt.Sprintf("%s chaos: %d nodes, %d cycles, %d buffers",
chaosType, nodeCount, cycleCount, bufferSize) chaosType, nodeCount, cycleCount, bufferSize)
@ -1483,13 +1549,24 @@ func XTestBatcherScalability(t *testing.T) {
// Need 1000 nodes for largest test case, all from same user so they can be peers // Need 1000 nodes for largest test case, all from same user so they can be peers
usersNeeded := max(1, tc.nodeCount/1000) // 1 user per 1000 nodes, minimum 1 usersNeeded := max(1, tc.nodeCount/1000) // 1 user per 1000 nodes, minimum 1
nodesPerUser := tc.nodeCount / usersNeeded nodesPerUser := tc.nodeCount / usersNeeded
testData, cleanup := setupBatcherWithTestData(t, batcherFunc.fn, usersNeeded, nodesPerUser, tc.bufferSize) testData, cleanup := setupBatcherWithTestData(
t,
batcherFunc.fn,
usersNeeded,
nodesPerUser,
tc.bufferSize,
)
defer cleanup() defer cleanup()
batcher := testData.Batcher batcher := testData.Batcher
allNodes := testData.Nodes allNodes := testData.Nodes
t.Logf("[%d/%d] SCALABILITY TEST: %s", i+1, len(testCases), tc.description) t.Logf("[%d/%d] SCALABILITY TEST: %s", i+1, len(testCases), tc.description)
t.Logf(" Cycles: %d, Buffer Size: %d, Chaos Type: %s", tc.cycles, tc.bufferSize, tc.chaosType) t.Logf(
" Cycles: %d, Buffer Size: %d, Chaos Type: %s",
tc.cycles,
tc.bufferSize,
tc.chaosType,
)
// Use provided nodes, limit to requested count // Use provided nodes, limit to requested count
testNodes := allNodes[:min(len(allNodes), tc.nodeCount)] testNodes := allNodes[:min(len(allNodes), tc.nodeCount)]
@ -1500,7 +1577,11 @@ func XTestBatcherScalability(t *testing.T) {
startTime := time.Now() startTime := time.Now()
setupTime := time.Since(startTime) setupTime := time.Since(startTime)
t.Logf("Starting scalability test with %d nodes (setup took: %v)", len(testNodes), setupTime) t.Logf(
"Starting scalability test with %d nodes (setup took: %v)",
len(testNodes),
setupTime,
)
// Comprehensive stress test // Comprehensive stress test
done := make(chan struct{}) done := make(chan struct{})
@ -1533,7 +1614,11 @@ func XTestBatcherScalability(t *testing.T) {
defer close(done) defer close(done)
var wg sync.WaitGroup var wg sync.WaitGroup
t.Logf("Starting load generation: %d cycles with %d nodes", tc.cycles, len(testNodes)) t.Logf(
"Starting load generation: %d cycles with %d nodes",
tc.cycles,
len(testNodes),
)
// Main load generation - varies by chaos type // Main load generation - varies by chaos type
for cycle := range tc.cycles { for cycle := range tc.cycles {
@ -1586,7 +1671,10 @@ func XTestBatcherScalability(t *testing.T) {
connectedNodes[nodeID] = false connectedNodes[nodeID] = false
connectedNodesMutex.Unlock() connectedNodesMutex.Unlock()
} }
}(node.n.ID, node.ch) }(
node.n.ID,
node.ch,
)
// Then reconnection // Then reconnection
go func(nodeID types.NodeID, channel chan *tailcfg.MapResponse, index int) { go func(nodeID types.NodeID, channel chan *tailcfg.MapResponse, index int) {
@ -1599,7 +1687,12 @@ func XTestBatcherScalability(t *testing.T) {
// Small delay before reconnecting // Small delay before reconnecting
time.Sleep(time.Duration(index%3) * time.Millisecond) time.Sleep(time.Duration(index%3) * time.Millisecond)
batcher.AddNode(nodeID, channel, false, tailcfg.CapabilityVersion(100)) batcher.AddNode(
nodeID,
channel,
false,
tailcfg.CapabilityVersion(100),
)
connectedNodesMutex.Lock() connectedNodesMutex.Lock()
connectedNodes[nodeID] = true connectedNodes[nodeID] = true
connectedNodesMutex.Unlock() connectedNodesMutex.Unlock()
@ -1608,7 +1701,11 @@ func XTestBatcherScalability(t *testing.T) {
if index%5 == 0 { if index%5 == 0 {
batcher.AddWork(change.FullSet) batcher.AddWork(change.FullSet)
} }
}(node.n.ID, node.ch, i) }(
node.n.ID,
node.ch,
i,
)
} }
} }
@ -1636,7 +1733,9 @@ func XTestBatcherScalability(t *testing.T) {
// Pick a random node and generate a node change // Pick a random node and generate a node change
if len(testNodes) > 0 { if len(testNodes) > 0 {
nodeIdx := index % len(testNodes) nodeIdx := index % len(testNodes)
batcher.AddWork(change.NodeAdded(testNodes[nodeIdx].n.ID)) batcher.AddWork(
change.NodeAdded(testNodes[nodeIdx].n.ID),
)
} else { } else {
batcher.AddWork(change.FullSet) batcher.AddWork(change.FullSet)
} }
@ -1667,12 +1766,20 @@ func XTestBatcherScalability(t *testing.T) {
} }
interimPanics := atomic.LoadInt64(&panicCount) interimPanics := atomic.LoadInt64(&panicCount)
t.Logf("TIMEOUT DIAGNOSIS: Test timed out after %v", TEST_TIMEOUT) t.Logf("TIMEOUT DIAGNOSIS: Test timed out after %v", TEST_TIMEOUT)
t.Logf(" Progress at timeout: %d total updates, %d panics", totalUpdates, interimPanics) t.Logf(
t.Logf(" Possible causes: deadlock, excessive load, or performance bottleneck") " Progress at timeout: %d total updates, %d panics",
totalUpdates,
interimPanics,
)
t.Logf(
" Possible causes: deadlock, excessive load, or performance bottleneck",
)
// Try to detect if workers are still active // Try to detect if workers are still active
if totalUpdates > 0 { if totalUpdates > 0 {
t.Logf(" System was processing updates - likely performance bottleneck") t.Logf(
" System was processing updates - likely performance bottleneck",
)
} else { } else {
t.Logf(" No updates processed - likely deadlock or startup issue") t.Logf(" No updates processed - likely deadlock or startup issue")
} }
@ -1710,14 +1817,25 @@ func XTestBatcherScalability(t *testing.T) {
if stats.TotalUpdates > 0 { if stats.TotalUpdates > 0 {
nodeStatsReport = append(nodeStatsReport, nodeStatsReport = append(nodeStatsReport,
fmt.Sprintf("Node %d: %d total (%d patch, %d full), max %d peers", fmt.Sprintf(
node.n.ID, stats.TotalUpdates, stats.PatchUpdates, stats.FullUpdates, stats.MaxPeersSeen)) "Node %d: %d total (%d patch, %d full), max %d peers",
node.n.ID,
stats.TotalUpdates,
stats.PatchUpdates,
stats.FullUpdates,
stats.MaxPeersSeen,
))
} }
} }
// Comprehensive final summary // Comprehensive final summary
t.Logf("FINAL RESULTS: %d total updates (%d patch, %d full), max peers seen: %d", t.Logf(
totalUpdates, totalPatches, totalFull, maxPeersGlobal) "FINAL RESULTS: %d total updates (%d patch, %d full), max peers seen: %d",
totalUpdates,
totalPatches,
totalFull,
maxPeersGlobal,
)
if len(nodeStatsReport) <= 10 { // Only log details for smaller tests if len(nodeStatsReport) <= 10 { // Only log details for smaller tests
for _, report := range nodeStatsReport { for _, report := range nodeStatsReport {
t.Logf(" %s", report) t.Logf(" %s", report)
@ -1733,7 +1851,11 @@ func XTestBatcherScalability(t *testing.T) {
legacyTotalUpdates += stats.TotalUpdates legacyTotalUpdates += stats.TotalUpdates
} }
if legacyTotalUpdates != int(totalUpdates) { if legacyTotalUpdates != int(totalUpdates) {
t.Logf("Note: Legacy tracker mismatch - legacy: %d, new: %d", legacyTotalUpdates, totalUpdates) t.Logf(
"Note: Legacy tracker mismatch - legacy: %d, new: %d",
legacyTotalUpdates,
totalUpdates,
)
} }
finalPanicCount := atomic.LoadInt64(&panicCount) finalPanicCount := atomic.LoadInt64(&panicCount)
@ -1743,12 +1865,18 @@ func XTestBatcherScalability(t *testing.T) {
if tc.expectBreak { if tc.expectBreak {
// For tests expected to break, we're mainly checking that we don't crash // For tests expected to break, we're mainly checking that we don't crash
if finalPanicCount > 0 { if finalPanicCount > 0 {
t.Errorf("System crashed with %d panics (even breaking point tests shouldn't crash)", finalPanicCount) t.Errorf(
"System crashed with %d panics (even breaking point tests shouldn't crash)",
finalPanicCount,
)
testPassed = false testPassed = false
} }
// Timeout/deadlock is acceptable for breaking point tests // Timeout/deadlock is acceptable for breaking point tests
if deadlockDetected { if deadlockDetected {
t.Logf("Expected breaking point reached: system overloaded at %d nodes", len(testNodes)) t.Logf(
"Expected breaking point reached: system overloaded at %d nodes",
len(testNodes),
)
} }
} else { } else {
// For tests expected to pass, validate proper operation // For tests expected to pass, validate proper operation
@ -1856,19 +1984,35 @@ func TestBatcherFullPeerUpdates(t *testing.T) {
updateType = "DERP" updateType = "DERP"
} }
t.Logf(" Update %d: %s - Peers=%d, PeersChangedPatch=%d, DERPMap=%v", t.Logf(
updateNum, updateType, len(data.Peers), len(data.PeersChangedPatch), data.DERPMap != nil) " Update %d: %s - Peers=%d, PeersChangedPatch=%d, DERPMap=%v",
updateNum,
updateType,
len(data.Peers),
len(data.PeersChangedPatch),
data.DERPMap != nil,
)
if len(data.Peers) > 0 { if len(data.Peers) > 0 {
t.Logf(" Full peer list with %d peers", len(data.Peers)) t.Logf(" Full peer list with %d peers", len(data.Peers))
for j, peer := range data.Peers[:min(3, len(data.Peers))] { for j, peer := range data.Peers[:min(3, len(data.Peers))] {
t.Logf(" Peer %d: NodeID=%d, Online=%v", j, peer.ID, peer.Online) t.Logf(
" Peer %d: NodeID=%d, Online=%v",
j,
peer.ID,
peer.Online,
)
} }
} }
if len(data.PeersChangedPatch) > 0 { if len(data.PeersChangedPatch) > 0 {
t.Logf(" Patch update with %d changes", len(data.PeersChangedPatch)) t.Logf(" Patch update with %d changes", len(data.PeersChangedPatch))
for j, patch := range data.PeersChangedPatch[:min(3, len(data.PeersChangedPatch))] { for j, patch := range data.PeersChangedPatch[:min(3, len(data.PeersChangedPatch))] {
t.Logf(" Patch %d: NodeID=%d, Online=%v", j, patch.NodeID, patch.Online) t.Logf(
" Patch %d: NodeID=%d, Online=%v",
j,
patch.NodeID,
patch.Online,
)
} }
} }
@ -1882,7 +2026,9 @@ func TestBatcherFullPeerUpdates(t *testing.T) {
if !foundFullUpdate { if !foundFullUpdate {
t.Errorf("CRITICAL: No FULL updates received despite sending change.FullSet!") t.Errorf("CRITICAL: No FULL updates received despite sending change.FullSet!")
t.Errorf("This confirms the bug - FullSet updates are not generating full peer responses") t.Errorf(
"This confirms the bug - FullSet updates are not generating full peer responses",
)
} }
}) })
} }