From 9260230a354809475968033c6da9cdab6023777a Mon Sep 17 00:00:00 2001 From: "Amin.MasterkinG" Date: Tue, 21 Apr 2026 09:19:20 +0330 Subject: [PATCH] Add stabilization tests for queue cleanup and server batch draining --- .gitignore | 3 +- internal/client/sender_workers_test.go | 101 +++++++++++++++++++++++++ internal/server/server_test.go | 93 +++++++++++++++++++++++ 3 files changed, 196 insertions(+), 1 deletion(-) create mode 100644 internal/client/sender_workers_test.go create mode 100644 internal/server/server_test.go diff --git a/.gitignore b/.gitignore index 4f8ddef..f948847 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ Python Edition -config.toml \ No newline at end of file +config.toml +.gocache \ No newline at end of file diff --git a/internal/client/sender_workers_test.go b/internal/client/sender_workers_test.go new file mode 100644 index 0000000..b994d6a --- /dev/null +++ b/internal/client/sender_workers_test.go @@ -0,0 +1,101 @@ +package client + +import ( + "net" + "testing" + + "masterhttprelayvpn/internal/config" +) + +func TestSOCKSConnectionStoreDeleteClearsTransportState(t *testing.T) { + store := NewSOCKSConnectionStore() + chunkPolicy := ChunkPolicy{ + MaxChunkSize: 1024, + MaxPacketsPerBatch: 4, + MaxBatchBytes: 4096, + WorkerCount: 1, + MaxQueueBytesPerSOCKS: 4096, + } + + localConn, peerConn := net.Pipe() + defer peerConn.Close() + + socksConn := store.New("client-session", "127.0.0.1:1000", chunkPolicy) + socksConn.LocalConn = localConn + socksConn.InitialPayload = []byte("initial-payload") + socksConn.BufferedBytes = len(socksConn.InitialPayload) + + if err := socksConn.EnqueuePacket(socksConn.BuildSOCKSDataPacket([]byte("hello"), false)); err != nil { + t.Fatalf("enqueue first packet: %v", err) + } + if err := socksConn.EnqueuePacket(socksConn.BuildSOCKSDataPacket([]byte("world"), false)); err != nil { + t.Fatalf("enqueue second packet: %v", err) + } + + item := socksConn.DequeuePacket() + if item == nil { + t.Fatal("expected dequeued item") + } + socksConn.MarkInFlight([]*SOCKSOutboundQueueItem{item}) + + store.Delete(socksConn.ID) + + if got := store.Get(socksConn.ID); got != nil { + t.Fatal("expected connection to be removed from store") + } + if len(socksConn.OutboundQueue) != 0 { + t.Fatalf("expected empty outbound queue, got %d items", len(socksConn.OutboundQueue)) + } + if socksConn.QueuedBytes != 0 { + t.Fatalf("expected zero queued bytes, got %d", socksConn.QueuedBytes) + } + if len(socksConn.InFlight) != 0 { + t.Fatalf("expected empty inflight map, got %d items", len(socksConn.InFlight)) + } + if socksConn.InitialPayload != nil { + t.Fatal("expected initial payload to be cleared") + } + if socksConn.BufferedBytes != 0 { + t.Fatalf("expected buffered bytes to be reset, got %d", socksConn.BufferedBytes) + } + + select { + case <-socksConn.closedC: + default: + t.Fatal("expected local connection close signal") + } +} + +func TestBuildNextBatchRotatesAcrossConnections(t *testing.T) { + cfg := config.Config{ + MaxChunkSize: 1024, + MaxPacketsPerBatch: 1, + MaxBatchBytes: 4096, + WorkerCount: 1, + MaxQueueBytesPerSOCKS: 4096, + } + + client := New(cfg, nil) + client.chunkPolicy = newChunkPolicy(cfg) + + conn1 := client.socksConnections.New(client.clientSessionKey, "127.0.0.1:1001", client.chunkPolicy) + conn2 := client.socksConnections.New(client.clientSessionKey, "127.0.0.1:1002", client.chunkPolicy) + conn3 := client.socksConnections.New(client.clientSessionKey, "127.0.0.1:1003", client.chunkPolicy) + + for _, socksConn := range []*SOCKSConnection{conn1, conn2, conn3} { + if err := socksConn.EnqueuePacket(socksConn.BuildSOCKSDataPacket([]byte("x"), false)); err != nil { + t.Fatalf("enqueue packet for socks_id=%d: %v", socksConn.ID, err) + } + } + + expected := []uint64{conn1.ID, conn2.ID, conn3.ID} + for i, want := range expected { + batch, selected := client.buildNextBatch() + if len(batch.Packets) != 1 || len(selected) != 1 { + t.Fatalf("iteration %d: expected one selected packet, got packets=%d selected=%d", i, len(batch.Packets), len(selected)) + } + if got := batch.Packets[0].SOCKSID; got != want { + t.Fatalf("iteration %d: expected socks_id=%d, got %d", i, want, got) + } + } +} diff --git a/internal/server/server_test.go b/internal/server/server_test.go new file mode 100644 index 0000000..3ee2e1e --- /dev/null +++ b/internal/server/server_test.go @@ -0,0 +1,93 @@ +package server + +import ( + "testing" + + "masterhttprelayvpn/internal/config" + "masterhttprelayvpn/internal/protocol" +) + +func TestDrainSessionOutboundLockedRespectsGlobalLimits(t *testing.T) { + srv := &Server{ + cfg: config.Config{ + MaxPacketsPerBatch: 2, + MaxBatchBytes: 10, + }, + } + + session := &ClientSession{ + ClientSessionKey: "client-session", + SOCKSConnections: map[uint64]*SOCKSState{ + 1: {ID: 1}, + 2: {ID: 2}, + 3: {ID: 3}, + }, + } + + session.SOCKSConnections[1].OutboundQueue = []protocol.Packet{ + testDataPacket("client-session", 1, 1, "abcd"), + } + session.SOCKSConnections[1].QueuedBytes = 4 + + session.SOCKSConnections[2].OutboundQueue = []protocol.Packet{ + testDataPacket("client-session", 2, 1, "efgh"), + } + session.SOCKSConnections[2].QueuedBytes = 4 + + session.SOCKSConnections[3].OutboundQueue = []protocol.Packet{ + testDataPacket("client-session", 3, 1, "ijkl"), + } + session.SOCKSConnections[3].QueuedBytes = 4 + + drained := srv.drainSessionOutboundLocked(session) + if len(drained) != 2 { + t.Fatalf("expected 2 drained packets, got %d", len(drained)) + } + + totalBytes := 0 + for _, packet := range drained { + totalBytes += len(packet.Payload) + } + if totalBytes > srv.cfg.MaxBatchBytes { + t.Fatalf("expected drained bytes <= %d, got %d", srv.cfg.MaxBatchBytes, totalBytes) + } + + remainingPackets := 0 + for _, socksState := range session.SOCKSConnections { + remainingPackets += len(socksState.OutboundQueue) + } + if remainingPackets != 1 { + t.Fatalf("expected one packet to remain queued, got %d", remainingPackets) + } +} + +func TestSOCKSStateReleaseClearsQueueState(t *testing.T) { + socksState := &SOCKSState{ + Target: &protocol.Target{Host: "example.com", Port: 443}, + OutboundQueue: []protocol.Packet{ + testDataPacket("client-session", 1, 1, "hello"), + testDataPacket("client-session", 1, 2, "world"), + }, + QueuedBytes: 10, + } + + socksState.release() + + if socksState.Target != nil { + t.Fatal("expected target to be cleared") + } + if len(socksState.OutboundQueue) != 0 { + t.Fatalf("expected empty outbound queue, got %d items", len(socksState.OutboundQueue)) + } + if socksState.QueuedBytes != 0 { + t.Fatalf("expected queued bytes to be reset, got %d", socksState.QueuedBytes) + } +} + +func testDataPacket(clientSessionKey string, socksID uint64, sequence uint64, payload string) protocol.Packet { + packet := protocol.NewPacket(clientSessionKey, protocol.PacketTypeSOCKSData) + packet.SOCKSID = socksID + packet.Sequence = sequence + packet.Payload = []byte(payload) + return packet +}