mirror of
https://github.com/masterking32/MasterHttpRelayVPN.git
synced 2026-05-17 21:24:37 +03:00
Add stabilization tests for queue cleanup and server batch draining
This commit is contained in:
+2
-1
@@ -1,2 +1,3 @@
|
|||||||
Python Edition
|
Python Edition
|
||||||
config.toml
|
config.toml
|
||||||
|
.gocache
|
||||||
@@ -0,0 +1,101 @@
|
|||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"masterhttprelayvpn/internal/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSOCKSConnectionStoreDeleteClearsTransportState(t *testing.T) {
|
||||||
|
store := NewSOCKSConnectionStore()
|
||||||
|
chunkPolicy := ChunkPolicy{
|
||||||
|
MaxChunkSize: 1024,
|
||||||
|
MaxPacketsPerBatch: 4,
|
||||||
|
MaxBatchBytes: 4096,
|
||||||
|
WorkerCount: 1,
|
||||||
|
MaxQueueBytesPerSOCKS: 4096,
|
||||||
|
}
|
||||||
|
|
||||||
|
localConn, peerConn := net.Pipe()
|
||||||
|
defer peerConn.Close()
|
||||||
|
|
||||||
|
socksConn := store.New("client-session", "127.0.0.1:1000", chunkPolicy)
|
||||||
|
socksConn.LocalConn = localConn
|
||||||
|
socksConn.InitialPayload = []byte("initial-payload")
|
||||||
|
socksConn.BufferedBytes = len(socksConn.InitialPayload)
|
||||||
|
|
||||||
|
if err := socksConn.EnqueuePacket(socksConn.BuildSOCKSDataPacket([]byte("hello"), false)); err != nil {
|
||||||
|
t.Fatalf("enqueue first packet: %v", err)
|
||||||
|
}
|
||||||
|
if err := socksConn.EnqueuePacket(socksConn.BuildSOCKSDataPacket([]byte("world"), false)); err != nil {
|
||||||
|
t.Fatalf("enqueue second packet: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
item := socksConn.DequeuePacket()
|
||||||
|
if item == nil {
|
||||||
|
t.Fatal("expected dequeued item")
|
||||||
|
}
|
||||||
|
socksConn.MarkInFlight([]*SOCKSOutboundQueueItem{item})
|
||||||
|
|
||||||
|
store.Delete(socksConn.ID)
|
||||||
|
|
||||||
|
if got := store.Get(socksConn.ID); got != nil {
|
||||||
|
t.Fatal("expected connection to be removed from store")
|
||||||
|
}
|
||||||
|
if len(socksConn.OutboundQueue) != 0 {
|
||||||
|
t.Fatalf("expected empty outbound queue, got %d items", len(socksConn.OutboundQueue))
|
||||||
|
}
|
||||||
|
if socksConn.QueuedBytes != 0 {
|
||||||
|
t.Fatalf("expected zero queued bytes, got %d", socksConn.QueuedBytes)
|
||||||
|
}
|
||||||
|
if len(socksConn.InFlight) != 0 {
|
||||||
|
t.Fatalf("expected empty inflight map, got %d items", len(socksConn.InFlight))
|
||||||
|
}
|
||||||
|
if socksConn.InitialPayload != nil {
|
||||||
|
t.Fatal("expected initial payload to be cleared")
|
||||||
|
}
|
||||||
|
if socksConn.BufferedBytes != 0 {
|
||||||
|
t.Fatalf("expected buffered bytes to be reset, got %d", socksConn.BufferedBytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-socksConn.closedC:
|
||||||
|
default:
|
||||||
|
t.Fatal("expected local connection close signal")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildNextBatchRotatesAcrossConnections(t *testing.T) {
|
||||||
|
cfg := config.Config{
|
||||||
|
MaxChunkSize: 1024,
|
||||||
|
MaxPacketsPerBatch: 1,
|
||||||
|
MaxBatchBytes: 4096,
|
||||||
|
WorkerCount: 1,
|
||||||
|
MaxQueueBytesPerSOCKS: 4096,
|
||||||
|
}
|
||||||
|
|
||||||
|
client := New(cfg, nil)
|
||||||
|
client.chunkPolicy = newChunkPolicy(cfg)
|
||||||
|
|
||||||
|
conn1 := client.socksConnections.New(client.clientSessionKey, "127.0.0.1:1001", client.chunkPolicy)
|
||||||
|
conn2 := client.socksConnections.New(client.clientSessionKey, "127.0.0.1:1002", client.chunkPolicy)
|
||||||
|
conn3 := client.socksConnections.New(client.clientSessionKey, "127.0.0.1:1003", client.chunkPolicy)
|
||||||
|
|
||||||
|
for _, socksConn := range []*SOCKSConnection{conn1, conn2, conn3} {
|
||||||
|
if err := socksConn.EnqueuePacket(socksConn.BuildSOCKSDataPacket([]byte("x"), false)); err != nil {
|
||||||
|
t.Fatalf("enqueue packet for socks_id=%d: %v", socksConn.ID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := []uint64{conn1.ID, conn2.ID, conn3.ID}
|
||||||
|
for i, want := range expected {
|
||||||
|
batch, selected := client.buildNextBatch()
|
||||||
|
if len(batch.Packets) != 1 || len(selected) != 1 {
|
||||||
|
t.Fatalf("iteration %d: expected one selected packet, got packets=%d selected=%d", i, len(batch.Packets), len(selected))
|
||||||
|
}
|
||||||
|
if got := batch.Packets[0].SOCKSID; got != want {
|
||||||
|
t.Fatalf("iteration %d: expected socks_id=%d, got %d", i, want, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,93 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"masterhttprelayvpn/internal/config"
|
||||||
|
"masterhttprelayvpn/internal/protocol"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDrainSessionOutboundLockedRespectsGlobalLimits(t *testing.T) {
|
||||||
|
srv := &Server{
|
||||||
|
cfg: config.Config{
|
||||||
|
MaxPacketsPerBatch: 2,
|
||||||
|
MaxBatchBytes: 10,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
session := &ClientSession{
|
||||||
|
ClientSessionKey: "client-session",
|
||||||
|
SOCKSConnections: map[uint64]*SOCKSState{
|
||||||
|
1: {ID: 1},
|
||||||
|
2: {ID: 2},
|
||||||
|
3: {ID: 3},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
session.SOCKSConnections[1].OutboundQueue = []protocol.Packet{
|
||||||
|
testDataPacket("client-session", 1, 1, "abcd"),
|
||||||
|
}
|
||||||
|
session.SOCKSConnections[1].QueuedBytes = 4
|
||||||
|
|
||||||
|
session.SOCKSConnections[2].OutboundQueue = []protocol.Packet{
|
||||||
|
testDataPacket("client-session", 2, 1, "efgh"),
|
||||||
|
}
|
||||||
|
session.SOCKSConnections[2].QueuedBytes = 4
|
||||||
|
|
||||||
|
session.SOCKSConnections[3].OutboundQueue = []protocol.Packet{
|
||||||
|
testDataPacket("client-session", 3, 1, "ijkl"),
|
||||||
|
}
|
||||||
|
session.SOCKSConnections[3].QueuedBytes = 4
|
||||||
|
|
||||||
|
drained := srv.drainSessionOutboundLocked(session)
|
||||||
|
if len(drained) != 2 {
|
||||||
|
t.Fatalf("expected 2 drained packets, got %d", len(drained))
|
||||||
|
}
|
||||||
|
|
||||||
|
totalBytes := 0
|
||||||
|
for _, packet := range drained {
|
||||||
|
totalBytes += len(packet.Payload)
|
||||||
|
}
|
||||||
|
if totalBytes > srv.cfg.MaxBatchBytes {
|
||||||
|
t.Fatalf("expected drained bytes <= %d, got %d", srv.cfg.MaxBatchBytes, totalBytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
remainingPackets := 0
|
||||||
|
for _, socksState := range session.SOCKSConnections {
|
||||||
|
remainingPackets += len(socksState.OutboundQueue)
|
||||||
|
}
|
||||||
|
if remainingPackets != 1 {
|
||||||
|
t.Fatalf("expected one packet to remain queued, got %d", remainingPackets)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSOCKSStateReleaseClearsQueueState(t *testing.T) {
|
||||||
|
socksState := &SOCKSState{
|
||||||
|
Target: &protocol.Target{Host: "example.com", Port: 443},
|
||||||
|
OutboundQueue: []protocol.Packet{
|
||||||
|
testDataPacket("client-session", 1, 1, "hello"),
|
||||||
|
testDataPacket("client-session", 1, 2, "world"),
|
||||||
|
},
|
||||||
|
QueuedBytes: 10,
|
||||||
|
}
|
||||||
|
|
||||||
|
socksState.release()
|
||||||
|
|
||||||
|
if socksState.Target != nil {
|
||||||
|
t.Fatal("expected target to be cleared")
|
||||||
|
}
|
||||||
|
if len(socksState.OutboundQueue) != 0 {
|
||||||
|
t.Fatalf("expected empty outbound queue, got %d items", len(socksState.OutboundQueue))
|
||||||
|
}
|
||||||
|
if socksState.QueuedBytes != 0 {
|
||||||
|
t.Fatalf("expected queued bytes to be reset, got %d", socksState.QueuedBytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testDataPacket(clientSessionKey string, socksID uint64, sequence uint64, payload string) protocol.Packet {
|
||||||
|
packet := protocol.NewPacket(clientSessionKey, protocol.PacketTypeSOCKSData)
|
||||||
|
packet.SOCKSID = socksID
|
||||||
|
packet.Sequence = sequence
|
||||||
|
packet.Payload = []byte(payload)
|
||||||
|
return packet
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user