chunk policy + queue item + per-SOCKS-connection queue

This commit is contained in:
Amin.MasterkinG
2026-04-20 18:25:20 +03:30
parent e4e29201c2
commit e06c766ca9
5 changed files with 212 additions and 16 deletions
+2
View File
@@ -24,6 +24,7 @@ type Client struct {
log *logger.Logger
clientSessionKey string
socksConnections *SOCKSConnectionStore
chunkPolicy ChunkPolicy
connMu sync.Mutex
conns map[net.Conn]struct{}
@@ -37,6 +38,7 @@ func New(cfg config.Config, lg *logger.Logger) *Client {
log: lg,
clientSessionKey: clientSessionKey,
socksConnections: NewSOCKSConnectionStore(),
chunkPolicy: newChunkPolicy(cfg),
conns: make(map[net.Conn]struct{}),
}
}
+7 -1
View File
@@ -16,6 +16,7 @@ import (
type SOCKSConnection struct {
ID uint64
ClientSessionKey string
ChunkPolicy ChunkPolicy
CreatedAt time.Time
LastActivityAt time.Time
ClientAddress string
@@ -32,6 +33,10 @@ type SOCKSConnection struct {
CloseReadSent bool
CloseWriteSent bool
ResetSent bool
queueMu sync.Mutex
OutboundQueue []*SOCKSOutboundQueueItem
QueuedBytes int
}
func (s *SOCKSConnection) InitialPayloadHex() string {
@@ -53,12 +58,13 @@ func NewSOCKSConnectionStore() *SOCKSConnectionStore {
}
}
func (s *SOCKSConnectionStore) New(clientSessionKey string, clientAddress string) *SOCKSConnection {
func (s *SOCKSConnectionStore) New(clientSessionKey string, clientAddress string, chunkPolicy ChunkPolicy) *SOCKSConnection {
id := s.nextID.Add(1)
now := time.Now()
socksConn := &SOCKSConnection{
ID: id,
ClientSessionKey: clientSessionKey,
ChunkPolicy: chunkPolicy,
CreatedAt: now,
LastActivityAt: now,
ClientAddress: clientAddress,
+18 -5
View File
@@ -46,7 +46,7 @@ func (c *Client) handleConn(ctx context.Context, conn net.Conn) {
defer c.unregisterConn(conn)
defer conn.Close()
socksConn := c.socksConnections.New(c.clientSessionKey, conn.RemoteAddr().String())
socksConn := c.socksConnections.New(c.clientSessionKey, conn.RemoteAddr().String(), c.chunkPolicy)
defer c.socksConnections.Delete(socksConn.ID)
c.log.Infof(
@@ -101,6 +101,10 @@ func (c *Client) handleSOCKS5(ctx context.Context, conn net.Conn, socksConn *SOC
socksConn.ID, socksConn.TargetHost, socksConn.TargetPort, socksConn.SOCKSAuthMethod, socksConn.ClientSessionKey,
)
if err := socksConn.EnqueuePacket(socksConn.BuildSOCKSConnectPacket()); err != nil {
return err
}
return c.captureInitialPayload(ctx, conn, socksConn)
}
@@ -261,10 +265,14 @@ func (c *Client) captureInitialPayload(ctx context.Context, conn net.Conn, socks
socksConn.InitialPayload = append([]byte(nil), buf[:n]...)
socksConn.BufferedBytes += n
socksConn.LastActivityAt = time.Now()
enqueued, enqueueErr := socksConn.EnqueuePayloadChunks(buf[:n], false)
c.log.Debugf(
"<green>socks_id=<cyan>%d</cyan> captured initial payload bytes=<cyan>%d</cyan> target=<cyan>%s</cyan> client_session_key=<cyan>%s</cyan></green>",
socksConn.ID, n, net.JoinHostPort(socksConn.TargetHost, strconv.Itoa(int(socksConn.TargetPort))), socksConn.ClientSessionKey,
"<green>socks_id=<cyan>%d</cyan> captured initial payload bytes=<cyan>%d</cyan> target=<cyan>%s</cyan> queued_packets=<cyan>%d</cyan> client_session_key=<cyan>%s</cyan></green>",
socksConn.ID, n, net.JoinHostPort(socksConn.TargetHost, strconv.Itoa(int(socksConn.TargetPort))), enqueued, socksConn.ClientSessionKey,
)
if enqueueErr != nil {
return enqueueErr
}
} else if ne, ok := err.(net.Error); !ok || !ne.Timeout() {
if errors.Is(err, io.EOF) {
return nil
@@ -289,10 +297,15 @@ func (c *Client) captureInitialPayload(ctx context.Context, conn net.Conn, socks
if n > 0 {
socksConn.BufferedBytes += n
socksConn.LastActivityAt = time.Now()
enqueued, enqueueErr := socksConn.EnqueuePayloadChunks(buf[:n], false)
queueItems, queueBytes := socksConn.QueueSnapshot()
c.log.Debugf(
"<green>socks_id=<cyan>%d</cyan> buffered payload chunk=<cyan>%d</cyan> total=<cyan>%d</cyan> client_session_key=<cyan>%s</cyan></green>",
socksConn.ID, n, socksConn.BufferedBytes, socksConn.ClientSessionKey,
"<green>socks_id=<cyan>%d</cyan> buffered payload chunk=<cyan>%d</cyan> total=<cyan>%d</cyan> queued_packets=<cyan>%d</cyan> queue_depth=<cyan>%d</cyan> queue_bytes=<cyan>%d</cyan> client_session_key=<cyan>%s</cyan></green>",
socksConn.ID, n, socksConn.BufferedBytes, enqueued, queueItems, queueBytes, socksConn.ClientSessionKey,
)
if enqueueErr != nil {
return enqueueErr
}
}
if err != nil {
+115
View File
@@ -0,0 +1,115 @@
// ==============================================================================
// MasterHttpRelayVPN
// Author: MasterkinG32
// Github: https://github.com/masterking32
// Year: 2026
// ==============================================================================
package client
import (
"errors"
"time"
"masterhttprelayvpn/internal/config"
"masterhttprelayvpn/internal/protocol"
)
var ErrSOCKSQueueFull = errors.New("socks outbound queue is full")
type ChunkPolicy struct {
MaxChunkSize int
MaxPacketsPerBatch int
MaxBatchBytes int
WorkerCount int
MaxQueueBytesPerSOCKS int
}
func newChunkPolicy(cfg config.Config) ChunkPolicy {
return ChunkPolicy{
MaxChunkSize: cfg.MaxChunkSize,
MaxPacketsPerBatch: cfg.MaxPacketsPerBatch,
MaxBatchBytes: cfg.MaxBatchBytes,
WorkerCount: cfg.WorkerCount,
MaxQueueBytesPerSOCKS: cfg.MaxQueueBytesPerSOCKS,
}
}
type SOCKSOutboundQueueItem struct {
IdentityKey string
Packet protocol.Packet
QueuedAt time.Time
PayloadSize int
}
func (s *SOCKSConnection) EnqueuePacket(packet protocol.Packet) error {
if err := packet.Validate(); err != nil {
return err
}
item := &SOCKSOutboundQueueItem{
IdentityKey: protocol.PacketIdentityKey(
packet.ClientSessionKey,
packet.SOCKSID,
packet.Type,
packet.Sequence,
packet.FragmentID,
),
Packet: packet,
QueuedAt: time.Now(),
PayloadSize: len(packet.Payload),
}
s.queueMu.Lock()
defer s.queueMu.Unlock()
nextBytes := s.QueuedBytes + item.PayloadSize
if nextBytes > s.ChunkPolicy.MaxQueueBytesPerSOCKS {
return ErrSOCKSQueueFull
}
s.OutboundQueue = append(s.OutboundQueue, item)
s.QueuedBytes = nextBytes
return nil
}
func (s *SOCKSConnection) EnqueuePayloadChunks(payload []byte, final bool) (int, error) {
chunks := splitPayloadChunks(payload, s.ChunkPolicy.MaxChunkSize)
if len(chunks) == 0 && !final {
return 0, nil
}
enqueued := 0
for i, chunk := range chunks {
packetFinal := final && i == len(chunks)-1
packet := s.BuildSOCKSDataPacket(chunk, packetFinal)
if err := s.EnqueuePacket(packet); err != nil {
return enqueued, err
}
enqueued++
}
return enqueued, nil
}
func (s *SOCKSConnection) QueueSnapshot() (items int, bytes int) {
s.queueMu.Lock()
defer s.queueMu.Unlock()
return len(s.OutboundQueue), s.QueuedBytes
}
func splitPayloadChunks(payload []byte, maxChunkSize int) [][]byte {
if len(payload) == 0 || maxChunkSize <= 0 {
return nil
}
chunks := make([][]byte, 0, (len(payload)+maxChunkSize-1)/maxChunkSize)
for start := 0; start < len(payload); start += maxChunkSize {
end := start + maxChunkSize
if end > len(payload) {
end = len(payload)
}
chunk := append([]byte(nil), payload[start:end]...)
chunks = append(chunks, chunk)
}
return chunks
}
+70 -10
View File
@@ -16,20 +16,30 @@ import (
)
type Config struct {
AESEncryptionKey string
SOCKSHost string
SOCKSPort int
SOCKSAuth bool
SOCKSUsername string
SOCKSPassword string
LogLevel string
AESEncryptionKey string
SOCKSHost string
SOCKSPort int
SOCKSAuth bool
SOCKSUsername string
SOCKSPassword string
LogLevel string
MaxChunkSize int
MaxPacketsPerBatch int
MaxBatchBytes int
WorkerCount int
MaxQueueBytesPerSOCKS int
}
func Load(path string) (Config, error) {
cfg := Config{
SOCKSHost: "127.0.0.1",
SOCKSPort: 1080,
LogLevel: "INFO",
SOCKSHost: "127.0.0.1",
SOCKSPort: 1080,
LogLevel: "INFO",
MaxChunkSize: 16 * 1024,
MaxPacketsPerBatch: 32,
MaxBatchBytes: 256 * 1024,
WorkerCount: 4,
MaxQueueBytesPerSOCKS: 1024 * 1024,
}
file, err := os.Open(path)
@@ -76,6 +86,36 @@ func Load(path string) (Config, error) {
cfg.SOCKSPassword = trimString(value)
case "LOG_LEVEL":
cfg.LogLevel = trimString(value)
case "MAX_CHUNK_SIZE":
size, err := strconv.Atoi(value)
if err != nil {
return Config{}, fmt.Errorf("parse MAX_CHUNK_SIZE: %w", err)
}
cfg.MaxChunkSize = size
case "MAX_PACKETS_PER_BATCH":
count, err := strconv.Atoi(value)
if err != nil {
return Config{}, fmt.Errorf("parse MAX_PACKETS_PER_BATCH: %w", err)
}
cfg.MaxPacketsPerBatch = count
case "MAX_BATCH_BYTES":
size, err := strconv.Atoi(value)
if err != nil {
return Config{}, fmt.Errorf("parse MAX_BATCH_BYTES: %w", err)
}
cfg.MaxBatchBytes = size
case "WORKER_COUNT":
count, err := strconv.Atoi(value)
if err != nil {
return Config{}, fmt.Errorf("parse WORKER_COUNT: %w", err)
}
cfg.WorkerCount = count
case "MAX_QUEUE_BYTES_PER_SOCKS":
size, err := strconv.Atoi(value)
if err != nil {
return Config{}, fmt.Errorf("parse MAX_QUEUE_BYTES_PER_SOCKS: %w", err)
}
cfg.MaxQueueBytesPerSOCKS = size
}
}
@@ -91,6 +131,26 @@ func Load(path string) (Config, error) {
return Config{}, fmt.Errorf("invalid SOCKS_PORT: %d", cfg.SOCKSPort)
}
if cfg.MaxChunkSize < 1 {
return Config{}, fmt.Errorf("invalid MAX_CHUNK_SIZE: %d", cfg.MaxChunkSize)
}
if cfg.MaxPacketsPerBatch < 1 {
return Config{}, fmt.Errorf("invalid MAX_PACKETS_PER_BATCH: %d", cfg.MaxPacketsPerBatch)
}
if cfg.MaxBatchBytes < cfg.MaxChunkSize {
return Config{}, fmt.Errorf("MAX_BATCH_BYTES must be >= MAX_CHUNK_SIZE")
}
if cfg.WorkerCount < 1 {
return Config{}, fmt.Errorf("invalid WORKER_COUNT: %d", cfg.WorkerCount)
}
if cfg.MaxQueueBytesPerSOCKS < cfg.MaxChunkSize {
return Config{}, fmt.Errorf("MAX_QUEUE_BYTES_PER_SOCKS must be >= MAX_CHUNK_SIZE")
}
return cfg, nil
}