diff --git a/hscontrol/mapper/batcher_test.go b/hscontrol/mapper/batcher_test.go
index b2a632d4..0a8b544a 100644
--- a/hscontrol/mapper/batcher_test.go
+++ b/hscontrol/mapper/batcher_test.go
@@ -98,7 +98,11 @@ type node struct {
 // node data for testing full map responses and comprehensive update scenarios.
 //
 // Returns TestData struct containing all created entities and a cleanup function.
-func setupBatcherWithTestData(t *testing.T, bf batcherFunc, userCount, nodesPerUser, bufferSize int) (*TestData, func()) {
+func setupBatcherWithTestData(
+	t *testing.T,
+	bf batcherFunc,
+	userCount, nodesPerUser, bufferSize int,
+) (*TestData, func()) {
 	t.Helper()
 
 	// Create database and populate with test data first
@@ -470,7 +474,9 @@ func TestEnhancedTrackingWithBatcher(t *testing.T) {
 			stats.TotalUpdates, stats.FullUpdates, stats.PatchUpdates, stats.MaxPeersSeen)
 
 		if stats.TotalUpdates == 0 {
-			t.Error("Enhanced tracking with batcher received 0 updates - batcher may not be working")
+			t.Error(
+				"Enhanced tracking with batcher received 0 updates - batcher may not be working",
+			)
 		}
 	})
 }
@@ -504,7 +510,11 @@ func TestBatcherScalabilityAllToAll(t *testing.T) {
 		t.Run(batcherFunc.name, func(t *testing.T) {
 			for _, tc := range testCases {
 				t.Run(tc.name, func(t *testing.T) {
-					t.Logf("ALL-TO-ALL TEST: %d nodes with %s batcher", tc.nodeCount, batcherFunc.name)
+					t.Logf(
+						"ALL-TO-ALL TEST: %d nodes with %s batcher",
+						tc.nodeCount,
+						batcherFunc.name,
+					)
 
 					// Create test environment - all nodes from same user so they can be peers
 					// We need enough users to support the node count (max 1000 nodes per user)
@@ -515,13 +525,24 @@ func TestBatcherScalabilityAllToAll(t *testing.T) {
 					// Buffer needs to handle nodeCount * average_updates_per_node
 					// Estimate: each node receives ~2*nodeCount updates during all-to-all
 					bufferSize := max(1000, tc.nodeCount*2)
-					testData, cleanup := setupBatcherWithTestData(t, batcherFunc.fn, usersNeeded, nodesPerUser, bufferSize)
+					testData, cleanup := setupBatcherWithTestData(
+						t,
+						batcherFunc.fn,
+						usersNeeded,
+						nodesPerUser,
+						bufferSize,
+					)
 					defer cleanup()
 
 					batcher := testData.Batcher
 					allNodes := testData.Nodes[:tc.nodeCount] // Limit to requested count
 
-					t.Logf("Created %d nodes across %d users, buffer size: %d", len(allNodes), usersNeeded, bufferSize)
+					t.Logf(
+						"Created %d nodes across %d users, buffer size: %d",
+						len(allNodes),
+						usersNeeded,
+						bufferSize,
+					)
 
 					// Start enhanced tracking for all nodes
 					for i := range allNodes {
@@ -621,16 +642,25 @@ func TestBatcherScalabilityAllToAll(t *testing.T) {
 						// Collect details for first few nodes or failing nodes
 						if len(nodeDetails) < 10 || stats.MaxPeersSeen < expectedPeers {
 							nodeDetails = append(nodeDetails,
-								fmt.Sprintf("Node %d: %d updates (%d full), max %d peers",
-									node.n.ID, stats.TotalUpdates, stats.FullUpdates, stats.MaxPeersSeen))
+								fmt.Sprintf(
+									"Node %d: %d updates (%d full), max %d peers",
+									node.n.ID,
+									stats.TotalUpdates,
+									stats.FullUpdates,
+									stats.MaxPeersSeen,
+								))
 						}
 					}
 
 					// Final results
 					t.Logf("ALL-TO-ALL RESULTS: %d nodes, %d total updates (%d full)",
 						len(allNodes), totalUpdates, totalFull)
-					t.Logf(" Connectivity: %d/%d nodes successful (%.1f%%)",
-						successfulNodes, len(allNodes), float64(successfulNodes)/float64(len(allNodes))*100)
+					t.Logf(
+						" Connectivity: %d/%d nodes successful (%.1f%%)",
+						successfulNodes,
+						len(allNodes),
+						float64(successfulNodes)/float64(len(allNodes))*100,
+					)
 					t.Logf(" Peers seen: min=%d, max=%d, expected=%d",
 						minPeersSeen, maxPeersGlobal, expectedPeers)
 					t.Logf(" Timing: join=%v, total=%v", joinTime, totalTime)
@@ -649,7 +679,10 @@ func TestBatcherScalabilityAllToAll(t *testing.T) {
 					// Final verification: Since we waited until all nodes achieved connectivity,
 					// this should always pass, but we verify the final state for completeness
 					if successfulNodes == len(allNodes) {
-						t.Logf("✅ PASS: All-to-all connectivity achieved for %d nodes", len(allNodes))
+						t.Logf(
+							"✅ PASS: All-to-all connectivity achieved for %d nodes",
+							len(allNodes),
+						)
 					} else {
 						// This should not happen since we loop until success, but handle it just in case
 						failedNodes := len(allNodes) - successfulNodes
@@ -727,7 +760,11 @@ func TestBatcherBasicOperations(t *testing.T) {
 			case data := <-tn2.ch:
 				// Verify it's a full map response
 				assert.NotNil(t, data)
-				assert.True(t, len(data.Peers) >= 1 || data.Node != nil, "Should receive initial full map")
+				assert.True(
+					t,
+					len(data.Peers) >= 1 || data.Node != nil,
+					"Should receive initial full map",
+				)
 			case <-time.After(200 * time.Millisecond):
 				t.Error("Second node should receive its initial full map")
 			}
@@ -950,7 +987,11 @@ func TestBatcherWorkQueueBatching(t *testing.T) {
 				updateCount, 5, expectedUpdates)
 
 			if updateCount != expectedUpdates {
-				t.Errorf("Expected %d updates but received %d", expectedUpdates, updateCount)
+				t.Errorf(
+					"Expected %d updates but received %d",
+					expectedUpdates,
+					updateCount,
+				)
 			}
 
 			// Validate that all updates have valid content
@@ -1153,8 +1194,12 @@ func TestBatcherWorkerChannelSafety(t *testing.T) {
 			mutex.Lock()
 			defer mutex.Unlock()
 
-			t.Logf("Worker safety test results: %d panics, %d channel errors, %d invalid data packets",
-				panics, channelErrors, invalidData)
+			t.Logf(
+				"Worker safety test results: %d panics, %d channel errors, %d invalid data packets",
+				panics,
+				channelErrors,
+				invalidData,
+			)
 
 			// Test failure conditions
 			if panics > 0 {
@@ -1187,7 +1232,13 @@ func TestBatcherConcurrentClients(t *testing.T) {
 	for _, batcherFunc := range allBatcherFunctions {
 		t.Run(batcherFunc.name, func(t *testing.T) {
 			// Create comprehensive test environment with real data
-			testData, cleanup := setupBatcherWithTestData(t, batcherFunc.fn, TEST_USER_COUNT, TEST_NODES_PER_USER, 8)
+			testData, cleanup := setupBatcherWithTestData(
+				t,
+				batcherFunc.fn,
+				TEST_USER_COUNT,
+				TEST_NODES_PER_USER,
+				8,
+			)
 			defer cleanup()
 
 			batcher := testData.Batcher
@@ -1211,7 +1262,10 @@ func TestBatcherConcurrentClients(t *testing.T) {
 					select {
 					case data := <-channel:
 						if valid, reason := validateUpdateContent(data); valid {
-							tracker.recordUpdate(nodeID, 1) // Use 1 as update size since we have MapResponse
+							tracker.recordUpdate(
+								nodeID,
+								1,
+							) // Use 1 as update size since we have MapResponse
 						} else {
 							t.Errorf("Invalid update received for stable node %d: %s", nodeID, reason)
 						}
@@ -1266,7 +1320,10 @@ func TestBatcherConcurrentClients(t *testing.T) {
 							select {
 							case data := <-ch:
 								if valid, _ := validateUpdateContent(data); valid {
-									tracker.recordUpdate(nodeID, 1) // Use 1 as update size since we have MapResponse
+									tracker.recordUpdate(
+										nodeID,
+										1,
+									) // Use 1 as update size since we have MapResponse
 								}
 							case <-time.After(20 * time.Millisecond):
 								return
@@ -1373,7 +1430,10 @@ func TestBatcherConcurrentClients(t *testing.T) {
 
 			t.Logf("Total updates - Stable clients: %d, Churning clients: %d",
 				stableUpdateCount, churningUpdateCount)
-			t.Logf("Average per stable client: %.1f updates", float64(stableUpdateCount)/float64(len(stableNodes)))
+			t.Logf(
+				"Average per stable client: %.1f updates",
+				float64(stableUpdateCount)/float64(len(stableNodes)),
+			)
 			t.Logf("Panics during test: %d", finalPanicCount)
 
 			// Validate test success criteria
@@ -1457,7 +1517,13 @@ func XTestBatcherScalability(t *testing.T) {
 				// expectBreak = true
 				// }
 
-				name := fmt.Sprintf("%s_%dn_%dc_%db", chaosType, nodeCount, cycleCount, bufferSize)
+				name := fmt.Sprintf(
+					"%s_%dn_%dc_%db",
+					chaosType,
+					nodeCount,
+					cycleCount,
+					bufferSize,
+				)
 				description := fmt.Sprintf("%s chaos: %d nodes, %d cycles, %d buffers",
 					chaosType, nodeCount, cycleCount, bufferSize)
 
@@ -1483,13 +1549,24 @@ func XTestBatcherScalability(t *testing.T) {
 				// Need 1000 nodes for largest test case, all from same user so they can be peers
 				usersNeeded := max(1, tc.nodeCount/1000) // 1 user per 1000 nodes, minimum 1
 				nodesPerUser := tc.nodeCount / usersNeeded
-				testData, cleanup := setupBatcherWithTestData(t, batcherFunc.fn, usersNeeded, nodesPerUser, tc.bufferSize)
+				testData, cleanup := setupBatcherWithTestData(
+					t,
+					batcherFunc.fn,
+					usersNeeded,
+					nodesPerUser,
+					tc.bufferSize,
+				)
 				defer cleanup()
 
 				batcher := testData.Batcher
 				allNodes := testData.Nodes
 				t.Logf("[%d/%d] SCALABILITY TEST: %s", i+1, len(testCases), tc.description)
-				t.Logf(" Cycles: %d, Buffer Size: %d, Chaos Type: %s", tc.cycles, tc.bufferSize, tc.chaosType)
+				t.Logf(
+					" Cycles: %d, Buffer Size: %d, Chaos Type: %s",
+					tc.cycles,
+					tc.bufferSize,
+					tc.chaosType,
+				)
 
 				// Use provided nodes, limit to requested count
 				testNodes := allNodes[:min(len(allNodes), tc.nodeCount)]
@@ -1500,7 +1577,11 @@ func XTestBatcherScalability(t *testing.T) {
 				startTime := time.Now()
 				setupTime := time.Since(startTime)
 
-				t.Logf("Starting scalability test with %d nodes (setup took: %v)", len(testNodes), setupTime)
+				t.Logf(
+					"Starting scalability test with %d nodes (setup took: %v)",
+					len(testNodes),
+					setupTime,
+				)
 
 				// Comprehensive stress test
 				done := make(chan struct{})
@@ -1533,7 +1614,11 @@ func XTestBatcherScalability(t *testing.T) {
 					defer close(done)
 					var wg sync.WaitGroup
 
-					t.Logf("Starting load generation: %d cycles with %d nodes", tc.cycles, len(testNodes))
+					t.Logf(
+						"Starting load generation: %d cycles with %d nodes",
+						tc.cycles,
+						len(testNodes),
+					)
 
 					// Main load generation - varies by chaos type
 					for cycle := range tc.cycles {
@@ -1586,7 +1671,10 @@ func XTestBatcherScalability(t *testing.T) {
 									connectedNodes[nodeID] = false
 									connectedNodesMutex.Unlock()
 								}
-							}(node.n.ID, node.ch)
+							}(
+								node.n.ID,
+								node.ch,
+							)
 
 							// Then reconnection
 							go func(nodeID types.NodeID, channel chan *tailcfg.MapResponse, index int) {
@@ -1599,7 +1687,12 @@ func XTestBatcherScalability(t *testing.T) {
 								// Small delay before reconnecting
 								time.Sleep(time.Duration(index%3) * time.Millisecond)
 
-								batcher.AddNode(nodeID, channel, false, tailcfg.CapabilityVersion(100))
+								batcher.AddNode(
+									nodeID,
+									channel,
+									false,
+									tailcfg.CapabilityVersion(100),
+								)
 								connectedNodesMutex.Lock()
 								connectedNodes[nodeID] = true
 								connectedNodesMutex.Unlock()
@@ -1608,7 +1701,11 @@ func XTestBatcherScalability(t *testing.T) {
 								if index%5 == 0 {
 									batcher.AddWork(change.FullSet)
 								}
-							}(node.n.ID, node.ch, i)
+							}(
+								node.n.ID,
+								node.ch,
+								i,
+							)
 						}
 					}
 
@@ -1636,7 +1733,9 @@ func XTestBatcherScalability(t *testing.T) {
 								// Pick a random node and generate a node change
 								if len(testNodes) > 0 {
 									nodeIdx := index % len(testNodes)
-									batcher.AddWork(change.NodeAdded(testNodes[nodeIdx].n.ID))
+									batcher.AddWork(
+										change.NodeAdded(testNodes[nodeIdx].n.ID),
+									)
 								} else {
 									batcher.AddWork(change.FullSet)
 								}
@@ -1667,12 +1766,20 @@ func XTestBatcherScalability(t *testing.T) {
 				}
 				interimPanics := atomic.LoadInt64(&panicCount)
 				t.Logf("TIMEOUT DIAGNOSIS: Test timed out after %v", TEST_TIMEOUT)
-				t.Logf(" Progress at timeout: %d total updates, %d panics", totalUpdates, interimPanics)
-				t.Logf(" Possible causes: deadlock, excessive load, or performance bottleneck")
+				t.Logf(
+					" Progress at timeout: %d total updates, %d panics",
+					totalUpdates,
+					interimPanics,
+				)
+				t.Logf(
+					" Possible causes: deadlock, excessive load, or performance bottleneck",
+				)
 
 				// Try to detect if workers are still active
 				if totalUpdates > 0 {
-					t.Logf(" System was processing updates - likely performance bottleneck")
+					t.Logf(
+						" System was processing updates - likely performance bottleneck",
+					)
 				} else {
 					t.Logf(" No updates processed - likely deadlock or startup issue")
 				}
@@ -1710,14 +1817,25 @@ func XTestBatcherScalability(t *testing.T) {
 
 					if stats.TotalUpdates > 0 {
 						nodeStatsReport = append(nodeStatsReport,
-							fmt.Sprintf("Node %d: %d total (%d patch, %d full), max %d peers",
-								node.n.ID, stats.TotalUpdates, stats.PatchUpdates, stats.FullUpdates, stats.MaxPeersSeen))
+							fmt.Sprintf(
+								"Node %d: %d total (%d patch, %d full), max %d peers",
+								node.n.ID,
+								stats.TotalUpdates,
+								stats.PatchUpdates,
+								stats.FullUpdates,
+								stats.MaxPeersSeen,
+							))
 					}
 				}
 
 				// Comprehensive final summary
-				t.Logf("FINAL RESULTS: %d total updates (%d patch, %d full), max peers seen: %d",
-					totalUpdates, totalPatches, totalFull, maxPeersGlobal)
+				t.Logf(
+					"FINAL RESULTS: %d total updates (%d patch, %d full), max peers seen: %d",
+					totalUpdates,
+					totalPatches,
+					totalFull,
+					maxPeersGlobal,
+				)
 				if len(nodeStatsReport) <= 10 { // Only log details for smaller tests
 					for _, report := range nodeStatsReport {
 						t.Logf(" %s", report)
@@ -1733,7 +1851,11 @@ func XTestBatcherScalability(t *testing.T) {
 					legacyTotalUpdates += stats.TotalUpdates
 				}
 				if legacyTotalUpdates != int(totalUpdates) {
-					t.Logf("Note: Legacy tracker mismatch - legacy: %d, new: %d", legacyTotalUpdates, totalUpdates)
+					t.Logf(
+						"Note: Legacy tracker mismatch - legacy: %d, new: %d",
+						legacyTotalUpdates,
+						totalUpdates,
+					)
 				}
 
 				finalPanicCount := atomic.LoadInt64(&panicCount)
@@ -1743,12 +1865,18 @@ func XTestBatcherScalability(t *testing.T) {
 				if tc.expectBreak {
 					// For tests expected to break, we're mainly checking that we don't crash
 					if finalPanicCount > 0 {
-						t.Errorf("System crashed with %d panics (even breaking point tests shouldn't crash)", finalPanicCount)
+						t.Errorf(
+							"System crashed with %d panics (even breaking point tests shouldn't crash)",
+							finalPanicCount,
+						)
 						testPassed = false
 					}
 					// Timeout/deadlock is acceptable for breaking point tests
 					if deadlockDetected {
-						t.Logf("Expected breaking point reached: system overloaded at %d nodes", len(testNodes))
+						t.Logf(
+							"Expected breaking point reached: system overloaded at %d nodes",
+							len(testNodes),
+						)
 					}
 				} else {
 					// For tests expected to pass, validate proper operation
@@ -1856,19 +1984,35 @@ func TestBatcherFullPeerUpdates(t *testing.T) {
 				updateType = "DERP"
 			}
 
-			t.Logf(" Update %d: %s - Peers=%d, PeersChangedPatch=%d, DERPMap=%v",
-				updateNum, updateType, len(data.Peers), len(data.PeersChangedPatch), data.DERPMap != nil)
+			t.Logf(
+				" Update %d: %s - Peers=%d, PeersChangedPatch=%d, DERPMap=%v",
+				updateNum,
+				updateType,
+				len(data.Peers),
+				len(data.PeersChangedPatch),
+				data.DERPMap != nil,
+			)
 
 			if len(data.Peers) > 0 {
 				t.Logf(" Full peer list with %d peers", len(data.Peers))
 				for j, peer := range data.Peers[:min(3, len(data.Peers))] {
-					t.Logf(" Peer %d: NodeID=%d, Online=%v", j, peer.ID, peer.Online)
+					t.Logf(
+						" Peer %d: NodeID=%d, Online=%v",
+						j,
+						peer.ID,
+						peer.Online,
+					)
 				}
 			}
 			if len(data.PeersChangedPatch) > 0 {
 				t.Logf(" Patch update with %d changes", len(data.PeersChangedPatch))
 				for j, patch := range data.PeersChangedPatch[:min(3, len(data.PeersChangedPatch))] {
-					t.Logf(" Patch %d: NodeID=%d, Online=%v", j, patch.NodeID, patch.Online)
+					t.Logf(
+						" Patch %d: NodeID=%d, Online=%v",
+						j,
+						patch.NodeID,
+						patch.Online,
+					)
 				}
 			}
 
@@ -1882,7 +2026,9 @@ func TestBatcherFullPeerUpdates(t *testing.T) {
 
 		if !foundFullUpdate {
 			t.Errorf("CRITICAL: No FULL updates received despite sending change.FullSet!")
-			t.Errorf("This confirms the bug - FullSet updates are not generating full peer responses")
+			t.Errorf(
+				"This confirms the bug - FullSet updates are not generating full peer responses",
+			)
 		}
 	})
 }