Deduplicate packet reorder logic and consolidate client SOCKS session handling

This commit is contained in:
Amin.MasterkinG
2026-04-21 15:08:10 +03:30
parent 8a50614510
commit bf5c0ef06e
9 changed files with 497 additions and 573 deletions
-148
View File
@@ -1,148 +0,0 @@
// ==============================================================================
// MasterHttpRelayVPN
// Author: MasterkinG32
// Github: https://github.com/masterking32
// Year: 2026
// ==============================================================================
package client
import (
"time"
"masterhttprelayvpn/internal/protocol"
)
func (s *SOCKSConnection) queueInboundPacket(packet protocol.Packet, maxBuffered int) ([]protocol.Packet, bool, bool) {
s.reorderMu.Lock()
defer s.reorderMu.Unlock()
expected := s.expectedInboundSequenceLocked()
if packet.Sequence < expected {
return nil, true, false
}
pendingForSequence := s.PendingInbound[packet.Sequence]
if containsPendingInboundPacket(pendingForSequence, packet) {
return nil, true, false
}
if bufferedInboundPacketCount(s.PendingInbound) >= maxBuffered {
return nil, false, true
}
s.PendingInbound[packet.Sequence] = append(s.PendingInbound[packet.Sequence], PendingInboundPacket{
Packet: packet,
QueuedAt: time.Now(),
})
if !s.ConnectAccepted {
return nil, false, false
}
return s.drainReadyInboundLocked(), false, false
}
func (s *SOCKSConnection) activateInboundDrain() []protocol.Packet {
s.reorderMu.Lock()
defer s.reorderMu.Unlock()
return s.drainReadyInboundLocked()
}
func (s *SOCKSConnection) expectedInboundSequenceLocked() uint64 {
if s.NextInboundSequence == 0 {
return 1
}
return s.NextInboundSequence
}
func (s *SOCKSConnection) drainReadyInboundLocked() []protocol.Packet {
expected := s.expectedInboundSequenceLocked()
ready := make([]protocol.Packet, 0)
for {
pendingPackets, ok := s.PendingInbound[expected]
if !ok || len(pendingPackets) == 0 {
break
}
sortPendingInboundPackets(pendingPackets)
for _, pending := range pendingPackets {
ready = append(ready, pending.Packet)
}
delete(s.PendingInbound, expected)
expected++
}
s.NextInboundSequence = expected
return ready
}
func (s *SOCKSConnection) hasExpiredInboundGap(timeout time.Duration) bool {
if timeout <= 0 {
return false
}
s.reorderMu.Lock()
defer s.reorderMu.Unlock()
now := time.Now()
for _, pendingPackets := range s.PendingInbound {
for _, pending := range pendingPackets {
if now.Sub(pending.QueuedAt) >= timeout {
clear(s.PendingInbound)
return true
}
}
}
return false
}
func containsPendingInboundPacket(pendingPackets []PendingInboundPacket, packet protocol.Packet) bool {
for _, pending := range pendingPackets {
if pending.Packet.Type == packet.Type &&
pending.Packet.FragmentID == packet.FragmentID &&
pending.Packet.TotalFragments == packet.TotalFragments {
return true
}
}
return false
}
func bufferedInboundPacketCount(pending map[uint64][]PendingInboundPacket) int {
total := 0
for _, pendingPackets := range pending {
total += len(pendingPackets)
}
return total
}
func sortPendingInboundPackets(pendingPackets []PendingInboundPacket) {
for i := 1; i < len(pendingPackets); i++ {
current := pendingPackets[i]
j := i - 1
for ; j >= 0 && inboundPacketSortOrder(current.Packet.Type) < inboundPacketSortOrder(pendingPackets[j].Packet.Type); j-- {
pendingPackets[j+1] = pendingPackets[j]
}
pendingPackets[j+1] = current
}
}
func inboundPacketSortOrder(packetType protocol.PacketType) int {
switch packetType {
case protocol.PacketTypeSOCKSData:
return 0
case protocol.PacketTypeSOCKSCloseRead:
return 1
case protocol.PacketTypeSOCKSCloseWrite:
return 2
case protocol.PacketTypeSOCKSRST:
return 3
default:
return 4
}
}
func isReorderSequencedPacket(packetType protocol.PacketType) bool {
switch packetType {
case protocol.PacketTypeSOCKSData,
protocol.PacketTypeSOCKSCloseRead,
protocol.PacketTypeSOCKSCloseWrite,
protocol.PacketTypeSOCKSRST:
return true
default:
return false
}
}
+1 -1
View File
@@ -539,7 +539,7 @@ func (c *Client) applyResponsePacket(packet protocol.Packet) error {
return nil return nil
} }
if isReorderSequencedPacket(packet.Type) { if protocol.IsReorderSequencedPacket(packet.Type) {
readyPackets, duplicate, overflow := socksConn.queueInboundPacket(packet, c.cfg.MaxReorderBufferPackets) readyPackets, duplicate, overflow := socksConn.queueInboundPacket(packet, c.cfg.MaxReorderBufferPackets)
if duplicate { if duplicate {
c.log.Debugf( c.log.Debugf(
+4 -4
View File
@@ -88,7 +88,7 @@ func TestSOCKSConnectionStoreDeleteClearsTransportState(t *testing.T) {
func TestSOCKSConnectionInboundReorderQueuesAndDrainsInOrder(t *testing.T) { func TestSOCKSConnectionInboundReorderQueuesAndDrainsInOrder(t *testing.T) {
socksConn := &SOCKSConnection{ socksConn := &SOCKSConnection{
ConnectAccepted: true, ConnectAccepted: true,
PendingInbound: make(map[uint64][]PendingInboundPacket), PendingInbound: make(map[uint64][]protocol.PendingPacket),
} }
packet2 := protocol.NewPacket("client-session", protocol.PacketTypeSOCKSData) packet2 := protocol.NewPacket("client-session", protocol.PacketTypeSOCKSData)
@@ -123,9 +123,9 @@ func TestSOCKSConnectionInboundReorderQueuesAndDrainsInOrder(t *testing.T) {
func TestSOCKSConnectionInboundGapTimeout(t *testing.T) { func TestSOCKSConnectionInboundGapTimeout(t *testing.T) {
socksConn := &SOCKSConnection{ socksConn := &SOCKSConnection{
PendingInbound: make(map[uint64][]PendingInboundPacket), PendingInbound: make(map[uint64][]protocol.PendingPacket),
} }
socksConn.PendingInbound[5] = []PendingInboundPacket{{ socksConn.PendingInbound[5] = []protocol.PendingPacket{{
Packet: protocol.Packet{Sequence: 5}, Packet: protocol.Packet{Sequence: 5},
QueuedAt: time.Now().Add(-2 * time.Second), QueuedAt: time.Now().Add(-2 * time.Second),
}} }}
@@ -140,7 +140,7 @@ func TestSOCKSConnectionInboundGapTimeout(t *testing.T) {
func TestSOCKSConnectionInboundDataWaitsForConnectAck(t *testing.T) { func TestSOCKSConnectionInboundDataWaitsForConnectAck(t *testing.T) {
socksConn := &SOCKSConnection{ socksConn := &SOCKSConnection{
PendingInbound: make(map[uint64][]PendingInboundPacket), PendingInbound: make(map[uint64][]protocol.PendingPacket),
} }
packet1 := protocol.NewPacket("client-session", protocol.PacketTypeSOCKSData) packet1 := protocol.NewPacket("client-session", protocol.PacketTypeSOCKSData)
+407 -7
View File
@@ -8,14 +8,46 @@ package client
import ( import (
"context" "context"
"errors"
"net" "net"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"masterhttprelayvpn/internal/config"
"masterhttprelayvpn/internal/protocol" "masterhttprelayvpn/internal/protocol"
) )
var ErrSOCKSQueueFull = errors.New("socks outbound queue is full")
var ErrSOCKSConnectTimeout = errors.New("socks connect timeout")
type ChunkPolicy struct {
MaxChunkSize int
MaxPacketsPerBatch int
MaxBatchBytes int
WorkerCount int
MaxQueueBytesPerSOCKS int
}
func newChunkPolicy(cfg config.Config) ChunkPolicy {
return ChunkPolicy{
MaxChunkSize: cfg.MaxChunkSize,
MaxPacketsPerBatch: cfg.MaxPacketsPerBatch,
MaxBatchBytes: cfg.MaxBatchBytes,
WorkerCount: cfg.WorkerCount,
MaxQueueBytesPerSOCKS: cfg.MaxQueueBytesPerSOCKS,
}
}
type SOCKSOutboundQueueItem struct {
IdentityKey string
Packet protocol.Packet
QueuedAt time.Time
SentAt time.Time
PayloadSize int
RetryCount int
}
type SOCKSConnection struct { type SOCKSConnection struct {
ID uint64 ID uint64
ClientSessionKey string ClientSessionKey string
@@ -51,12 +83,7 @@ type SOCKSConnection struct {
QueuedBytes int QueuedBytes int
InFlight map[string]*SOCKSOutboundQueueItem InFlight map[string]*SOCKSOutboundQueueItem
NextInboundSequence uint64 NextInboundSequence uint64
PendingInbound map[uint64][]PendingInboundPacket PendingInbound map[uint64][]protocol.PendingPacket
}
type PendingInboundPacket struct {
Packet protocol.Packet
QueuedAt time.Time
} }
type SOCKSConnectionStore struct { type SOCKSConnectionStore struct {
@@ -84,7 +111,7 @@ func (s *SOCKSConnectionStore) New(clientSessionKey string, clientAddress string
closedC: make(chan struct{}), closedC: make(chan struct{}),
connectResultC: make(chan error, 1), connectResultC: make(chan error, 1),
InFlight: make(map[string]*SOCKSOutboundQueueItem), InFlight: make(map[string]*SOCKSOutboundQueueItem),
PendingInbound: make(map[uint64][]PendingInboundPacket), PendingInbound: make(map[uint64][]protocol.PendingPacket),
} }
s.mu.Lock() s.mu.Lock()
@@ -114,6 +141,63 @@ func (s *SOCKSConnection) CompleteConnect(err error) {
} }
} }
func (s *SOCKSConnection) nextSequence() uint64 {
s.NextSequence++
return s.NextSequence
}
func (s *SOCKSConnection) BuildSOCKSConnectPacket() protocol.Packet {
packet := protocol.NewPacket(s.ClientSessionKey, protocol.PacketTypeSOCKSConnect)
packet.SOCKSID = s.ID
packet.Target = &protocol.Target{
Host: s.TargetHost,
Port: s.TargetPort,
AddressType: s.TargetAddressType,
}
return packet
}
func (s *SOCKSConnection) BuildSOCKSDataPacket(payload []byte, final bool) protocol.Packet {
packet := protocol.NewPacket(s.ClientSessionKey, protocol.PacketTypeSOCKSData)
packet.SOCKSID = s.ID
packet.Sequence = s.nextSequence()
packet.Final = final
if len(payload) > 0 {
packet.Payload = append([]byte(nil), payload...)
}
return packet
}
func (s *SOCKSConnection) BuildSOCKSCloseReadPacket() protocol.Packet {
s.CloseReadSent = true
packet := protocol.NewPacket(s.ClientSessionKey, protocol.PacketTypeSOCKSCloseRead)
packet.SOCKSID = s.ID
packet.Sequence = s.nextSequence()
packet.Final = true
return packet
}
func (s *SOCKSConnection) BuildSOCKSCloseWritePacket() protocol.Packet {
s.CloseWriteSent = true
packet := protocol.NewPacket(s.ClientSessionKey, protocol.PacketTypeSOCKSCloseWrite)
packet.SOCKSID = s.ID
packet.Sequence = s.nextSequence()
packet.Final = true
return packet
}
func (s *SOCKSConnection) BuildSOCKSRSTPacket() protocol.Packet {
s.ResetSent = true
packet := protocol.NewPacket(s.ClientSessionKey, protocol.PacketTypeSOCKSRST)
packet.SOCKSID = s.ID
packet.Sequence = s.nextSequence()
packet.Final = true
return packet
}
func (s *SOCKSConnection) WriteToLocal(payload []byte) error { func (s *SOCKSConnection) WriteToLocal(payload []byte) error {
s.localWriteMu.Lock() s.localWriteMu.Lock()
defer s.localWriteMu.Unlock() defer s.localWriteMu.Unlock()
@@ -204,6 +288,322 @@ func (s *SOCKSConnection) ResetTransportState() {
s.reorderMu.Unlock() s.reorderMu.Unlock()
} }
func (s *SOCKSConnection) queueInboundPacket(packet protocol.Packet, maxBuffered int) ([]protocol.Packet, bool, bool) {
s.reorderMu.Lock()
defer s.reorderMu.Unlock()
expected := s.expectedInboundSequenceLocked()
if packet.Sequence < expected {
return nil, true, false
}
pendingForSequence := s.PendingInbound[packet.Sequence]
if protocol.ContainsPendingPacket(pendingForSequence, packet) {
return nil, true, false
}
if protocol.BufferedPendingPacketCount(s.PendingInbound) >= maxBuffered {
return nil, false, true
}
s.PendingInbound[packet.Sequence] = append(s.PendingInbound[packet.Sequence], protocol.PendingPacket{
Packet: packet,
QueuedAt: time.Now(),
})
if !s.ConnectAccepted {
return nil, false, false
}
return s.drainReadyInboundLocked(), false, false
}
func (s *SOCKSConnection) activateInboundDrain() []protocol.Packet {
s.reorderMu.Lock()
defer s.reorderMu.Unlock()
return s.drainReadyInboundLocked()
}
func (s *SOCKSConnection) expectedInboundSequenceLocked() uint64 {
if s.NextInboundSequence == 0 {
return 1
}
return s.NextInboundSequence
}
func (s *SOCKSConnection) drainReadyInboundLocked() []protocol.Packet {
expected := s.expectedInboundSequenceLocked()
ready := make([]protocol.Packet, 0)
for {
pendingPackets, ok := s.PendingInbound[expected]
if !ok || len(pendingPackets) == 0 {
break
}
protocol.SortPendingPackets(pendingPackets)
for _, pending := range pendingPackets {
ready = append(ready, pending.Packet)
}
delete(s.PendingInbound, expected)
expected++
}
s.NextInboundSequence = expected
return ready
}
func (s *SOCKSConnection) hasExpiredInboundGap(timeout time.Duration) bool {
if timeout <= 0 {
return false
}
s.reorderMu.Lock()
defer s.reorderMu.Unlock()
now := time.Now()
for _, pendingPackets := range s.PendingInbound {
for _, pending := range pendingPackets {
if now.Sub(pending.QueuedAt) >= timeout {
clear(s.PendingInbound)
return true
}
}
}
return false
}
func (s *SOCKSConnection) EnqueuePacket(packet protocol.Packet) error {
if err := packet.Validate(); err != nil {
return err
}
item := &SOCKSOutboundQueueItem{
IdentityKey: protocol.PacketIdentityKey(
packet.ClientSessionKey,
packet.SOCKSID,
packet.Type,
packet.Sequence,
packet.FragmentID,
),
Packet: packet,
QueuedAt: time.Now(),
PayloadSize: len(packet.Payload),
}
s.queueMu.Lock()
defer s.queueMu.Unlock()
nextBytes := s.QueuedBytes + item.PayloadSize
if nextBytes > s.ChunkPolicy.MaxQueueBytesPerSOCKS {
return ErrSOCKSQueueFull
}
s.OutboundQueue = append(s.OutboundQueue, item)
s.QueuedBytes = nextBytes
return nil
}
func (s *SOCKSConnection) EnqueuePayloadChunks(payload []byte, final bool) (int, error) {
chunks := splitPayloadChunks(payload, s.ChunkPolicy.MaxChunkSize)
if len(chunks) == 0 && !final {
return 0, nil
}
enqueued := 0
for i, chunk := range chunks {
packetFinal := final && i == len(chunks)-1
packet := s.BuildSOCKSDataPacket(chunk, packetFinal)
if err := s.EnqueuePacket(packet); err != nil {
return enqueued, err
}
enqueued++
}
return enqueued, nil
}
func (s *SOCKSConnection) QueueSnapshot() (items int, bytes int) {
s.queueMu.Lock()
defer s.queueMu.Unlock()
return len(s.OutboundQueue), s.QueuedBytes
}
func (s *SOCKSConnection) InFlightCount() int {
s.queueMu.Lock()
defer s.queueMu.Unlock()
return len(s.InFlight)
}
func (s *SOCKSConnection) DequeuePacket() *SOCKSOutboundQueueItem {
s.queueMu.Lock()
defer s.queueMu.Unlock()
if len(s.OutboundQueue) == 0 {
return nil
}
item := s.OutboundQueue[0]
s.OutboundQueue[0] = nil
s.OutboundQueue = s.OutboundQueue[1:]
s.QueuedBytes -= item.PayloadSize
if s.QueuedBytes < 0 {
s.QueuedBytes = 0
}
return item
}
func (s *SOCKSConnection) RequeueFront(items []*SOCKSOutboundQueueItem) {
if len(items) == 0 {
return
}
s.queueMu.Lock()
defer s.queueMu.Unlock()
front := make([]*SOCKSOutboundQueueItem, 0, len(items)+len(s.OutboundQueue))
for _, item := range items {
if item == nil {
continue
}
front = append(front, item)
s.QueuedBytes += item.PayloadSize
}
front = append(front, s.OutboundQueue...)
s.OutboundQueue = front
}
func (s *SOCKSConnection) MarkInFlight(items []*SOCKSOutboundQueueItem) {
if len(items) == 0 {
return
}
s.queueMu.Lock()
defer s.queueMu.Unlock()
for _, item := range items {
if item == nil {
continue
}
item.SentAt = time.Now()
s.InFlight[item.IdentityKey] = item
}
}
func (s *SOCKSConnection) AckPacket(packet protocol.Packet) bool {
identityKey := protocol.PacketIdentityKey(
packet.ClientSessionKey,
packet.SOCKSID,
ackTargetPacketType(packet.Type),
packet.Sequence,
packet.FragmentID,
)
s.queueMu.Lock()
defer s.queueMu.Unlock()
if _, ok := s.InFlight[identityKey]; ok {
delete(s.InFlight, identityKey)
return true
}
return false
}
func (s *SOCKSConnection) RequeueInFlightByIdentity(identityKeys []string) {
if len(identityKeys) == 0 {
return
}
s.queueMu.Lock()
defer s.queueMu.Unlock()
front := make([]*SOCKSOutboundQueueItem, 0, len(identityKeys)+len(s.OutboundQueue))
for _, identityKey := range identityKeys {
item, ok := s.InFlight[identityKey]
if !ok || item == nil {
continue
}
delete(s.InFlight, identityKey)
item.SentAt = time.Time{}
front = append(front, item)
s.QueuedBytes += item.PayloadSize
}
front = append(front, s.OutboundQueue...)
s.OutboundQueue = front
}
func (s *SOCKSConnection) ReclaimExpiredInFlight(ackTimeout time.Duration, maxRetryCount int) (requeued int, dropped int) {
now := time.Now()
s.queueMu.Lock()
defer s.queueMu.Unlock()
if len(s.InFlight) == 0 {
return 0, 0
}
front := make([]*SOCKSOutboundQueueItem, 0, len(s.InFlight)+len(s.OutboundQueue))
for identityKey, item := range s.InFlight {
if item == nil || item.SentAt.IsZero() || now.Sub(item.SentAt) < ackTimeout {
continue
}
delete(s.InFlight, identityKey)
if item.RetryCount >= maxRetryCount {
dropped++
continue
}
item.RetryCount++
item.SentAt = time.Time{}
front = append(front, item)
s.QueuedBytes += item.PayloadSize
requeued++
}
if len(front) > 0 {
front = append(front, s.OutboundQueue...)
s.OutboundQueue = front
}
return requeued, dropped
}
func ackTargetPacketType(packetType protocol.PacketType) protocol.PacketType {
switch packetType {
case protocol.PacketTypeSOCKSConnectAck,
protocol.PacketTypeSOCKSConnectFail,
protocol.PacketTypeSOCKSRuleSetDenied,
protocol.PacketTypeSOCKSNetworkUnreachable,
protocol.PacketTypeSOCKSHostUnreachable,
protocol.PacketTypeSOCKSConnectionRefused,
protocol.PacketTypeSOCKSTTLExpired,
protocol.PacketTypeSOCKSCommandUnsupported,
protocol.PacketTypeSOCKSAddressTypeUnsupported,
protocol.PacketTypeSOCKSAuthFailed,
protocol.PacketTypeSOCKSUpstreamUnavailable:
return protocol.PacketTypeSOCKSConnect
case protocol.PacketTypeSOCKSDataAck:
return protocol.PacketTypeSOCKSData
case protocol.PacketTypeSOCKSCloseRead:
return protocol.PacketTypeSOCKSCloseRead
case protocol.PacketTypeSOCKSCloseWrite:
return protocol.PacketTypeSOCKSCloseWrite
case protocol.PacketTypeSOCKSRST:
return protocol.PacketTypeSOCKSRST
default:
return packetType
}
}
func splitPayloadChunks(payload []byte, maxChunkSize int) [][]byte {
if len(payload) == 0 || maxChunkSize <= 0 {
return nil
}
chunks := make([][]byte, 0, (len(payload)+maxChunkSize-1)/maxChunkSize)
for start := 0; start < len(payload); start += maxChunkSize {
end := start + maxChunkSize
if end > len(payload) {
end = len(payload)
}
chunk := append([]byte(nil), payload[start:end]...)
chunks = append(chunks, chunk)
}
return chunks
}
func (s *SOCKSConnectionStore) Get(id uint64) *SOCKSConnection { func (s *SOCKSConnectionStore) Get(id uint64) *SOCKSConnection {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
-66
View File
@@ -1,66 +0,0 @@
// ==============================================================================
// MasterHttpRelayVPN
// Author: MasterkinG32
// Github: https://github.com/masterking32
// Year: 2026
// ==============================================================================
package client
import "masterhttprelayvpn/internal/protocol"
func (s *SOCKSConnection) nextSequence() uint64 {
s.NextSequence++
return s.NextSequence
}
func (s *SOCKSConnection) BuildSOCKSConnectPacket() protocol.Packet {
packet := protocol.NewPacket(s.ClientSessionKey, protocol.PacketTypeSOCKSConnect)
packet.SOCKSID = s.ID
packet.Target = &protocol.Target{
Host: s.TargetHost,
Port: s.TargetPort,
AddressType: s.TargetAddressType,
}
return packet
}
func (s *SOCKSConnection) BuildSOCKSDataPacket(payload []byte, final bool) protocol.Packet {
packet := protocol.NewPacket(s.ClientSessionKey, protocol.PacketTypeSOCKSData)
packet.SOCKSID = s.ID
packet.Sequence = s.nextSequence()
packet.Final = final
if len(payload) > 0 {
packet.Payload = append([]byte(nil), payload...)
}
return packet
}
func (s *SOCKSConnection) BuildSOCKSCloseReadPacket() protocol.Packet {
s.CloseReadSent = true
packet := protocol.NewPacket(s.ClientSessionKey, protocol.PacketTypeSOCKSCloseRead)
packet.SOCKSID = s.ID
packet.Sequence = s.nextSequence()
packet.Final = true
return packet
}
func (s *SOCKSConnection) BuildSOCKSCloseWritePacket() protocol.Packet {
s.CloseWriteSent = true
packet := protocol.NewPacket(s.ClientSessionKey, protocol.PacketTypeSOCKSCloseWrite)
packet.SOCKSID = s.ID
packet.Sequence = s.nextSequence()
packet.Final = true
return packet
}
func (s *SOCKSConnection) BuildSOCKSRSTPacket() protocol.Packet {
s.ResetSent = true
packet := protocol.NewPacket(s.ClientSessionKey, protocol.PacketTypeSOCKSRST)
packet.SOCKSID = s.ID
packet.Sequence = s.nextSequence()
packet.Final = true
return packet
}
-283
View File
@@ -1,283 +0,0 @@
// ==============================================================================
// MasterHttpRelayVPN
// Author: MasterkinG32
// Github: https://github.com/masterking32
// Year: 2026
// ==============================================================================
package client
import (
"errors"
"time"
"masterhttprelayvpn/internal/config"
"masterhttprelayvpn/internal/protocol"
)
var ErrSOCKSQueueFull = errors.New("socks outbound queue is full")
var ErrSOCKSConnectTimeout = errors.New("socks connect timeout")
type ChunkPolicy struct {
MaxChunkSize int
MaxPacketsPerBatch int
MaxBatchBytes int
WorkerCount int
MaxQueueBytesPerSOCKS int
}
func newChunkPolicy(cfg config.Config) ChunkPolicy {
return ChunkPolicy{
MaxChunkSize: cfg.MaxChunkSize,
MaxPacketsPerBatch: cfg.MaxPacketsPerBatch,
MaxBatchBytes: cfg.MaxBatchBytes,
WorkerCount: cfg.WorkerCount,
MaxQueueBytesPerSOCKS: cfg.MaxQueueBytesPerSOCKS,
}
}
type SOCKSOutboundQueueItem struct {
IdentityKey string
Packet protocol.Packet
QueuedAt time.Time
SentAt time.Time
PayloadSize int
RetryCount int
}
func (s *SOCKSConnection) EnqueuePacket(packet protocol.Packet) error {
if err := packet.Validate(); err != nil {
return err
}
item := &SOCKSOutboundQueueItem{
IdentityKey: protocol.PacketIdentityKey(
packet.ClientSessionKey,
packet.SOCKSID,
packet.Type,
packet.Sequence,
packet.FragmentID,
),
Packet: packet,
QueuedAt: time.Now(),
PayloadSize: len(packet.Payload),
}
s.queueMu.Lock()
defer s.queueMu.Unlock()
nextBytes := s.QueuedBytes + item.PayloadSize
if nextBytes > s.ChunkPolicy.MaxQueueBytesPerSOCKS {
return ErrSOCKSQueueFull
}
s.OutboundQueue = append(s.OutboundQueue, item)
s.QueuedBytes = nextBytes
return nil
}
func (s *SOCKSConnection) EnqueuePayloadChunks(payload []byte, final bool) (int, error) {
chunks := splitPayloadChunks(payload, s.ChunkPolicy.MaxChunkSize)
if len(chunks) == 0 && !final {
return 0, nil
}
enqueued := 0
for i, chunk := range chunks {
packetFinal := final && i == len(chunks)-1
packet := s.BuildSOCKSDataPacket(chunk, packetFinal)
if err := s.EnqueuePacket(packet); err != nil {
return enqueued, err
}
enqueued++
}
return enqueued, nil
}
func (s *SOCKSConnection) QueueSnapshot() (items int, bytes int) {
s.queueMu.Lock()
defer s.queueMu.Unlock()
return len(s.OutboundQueue), s.QueuedBytes
}
func (s *SOCKSConnection) InFlightCount() int {
s.queueMu.Lock()
defer s.queueMu.Unlock()
return len(s.InFlight)
}
func (s *SOCKSConnection) DequeuePacket() *SOCKSOutboundQueueItem {
s.queueMu.Lock()
defer s.queueMu.Unlock()
if len(s.OutboundQueue) == 0 {
return nil
}
item := s.OutboundQueue[0]
s.OutboundQueue[0] = nil
s.OutboundQueue = s.OutboundQueue[1:]
s.QueuedBytes -= item.PayloadSize
if s.QueuedBytes < 0 {
s.QueuedBytes = 0
}
return item
}
func (s *SOCKSConnection) RequeueFront(items []*SOCKSOutboundQueueItem) {
if len(items) == 0 {
return
}
s.queueMu.Lock()
defer s.queueMu.Unlock()
front := make([]*SOCKSOutboundQueueItem, 0, len(items)+len(s.OutboundQueue))
for _, item := range items {
if item == nil {
continue
}
front = append(front, item)
s.QueuedBytes += item.PayloadSize
}
front = append(front, s.OutboundQueue...)
s.OutboundQueue = front
}
func (s *SOCKSConnection) MarkInFlight(items []*SOCKSOutboundQueueItem) {
if len(items) == 0 {
return
}
s.queueMu.Lock()
defer s.queueMu.Unlock()
for _, item := range items {
if item == nil {
continue
}
item.SentAt = time.Now()
s.InFlight[item.IdentityKey] = item
}
}
func (s *SOCKSConnection) AckPacket(packet protocol.Packet) bool {
identityKey := protocol.PacketIdentityKey(
packet.ClientSessionKey,
packet.SOCKSID,
ackTargetPacketType(packet.Type),
packet.Sequence,
packet.FragmentID,
)
s.queueMu.Lock()
defer s.queueMu.Unlock()
if _, ok := s.InFlight[identityKey]; ok {
delete(s.InFlight, identityKey)
return true
}
return false
}
func (s *SOCKSConnection) RequeueInFlightByIdentity(identityKeys []string) {
if len(identityKeys) == 0 {
return
}
s.queueMu.Lock()
defer s.queueMu.Unlock()
front := make([]*SOCKSOutboundQueueItem, 0, len(identityKeys)+len(s.OutboundQueue))
for _, identityKey := range identityKeys {
item, ok := s.InFlight[identityKey]
if !ok || item == nil {
continue
}
delete(s.InFlight, identityKey)
item.SentAt = time.Time{}
front = append(front, item)
s.QueuedBytes += item.PayloadSize
}
front = append(front, s.OutboundQueue...)
s.OutboundQueue = front
}
func (s *SOCKSConnection) ReclaimExpiredInFlight(ackTimeout time.Duration, maxRetryCount int) (requeued int, dropped int) {
now := time.Now()
s.queueMu.Lock()
defer s.queueMu.Unlock()
if len(s.InFlight) == 0 {
return 0, 0
}
front := make([]*SOCKSOutboundQueueItem, 0, len(s.InFlight)+len(s.OutboundQueue))
for identityKey, item := range s.InFlight {
if item == nil || item.SentAt.IsZero() || now.Sub(item.SentAt) < ackTimeout {
continue
}
delete(s.InFlight, identityKey)
if item.RetryCount >= maxRetryCount {
dropped++
continue
}
item.RetryCount++
item.SentAt = time.Time{}
front = append(front, item)
s.QueuedBytes += item.PayloadSize
requeued++
}
if len(front) > 0 {
front = append(front, s.OutboundQueue...)
s.OutboundQueue = front
}
return requeued, dropped
}
func ackTargetPacketType(packetType protocol.PacketType) protocol.PacketType {
switch packetType {
case protocol.PacketTypeSOCKSConnectAck,
protocol.PacketTypeSOCKSConnectFail,
protocol.PacketTypeSOCKSRuleSetDenied,
protocol.PacketTypeSOCKSNetworkUnreachable,
protocol.PacketTypeSOCKSHostUnreachable,
protocol.PacketTypeSOCKSConnectionRefused,
protocol.PacketTypeSOCKSTTLExpired,
protocol.PacketTypeSOCKSCommandUnsupported,
protocol.PacketTypeSOCKSAddressTypeUnsupported,
protocol.PacketTypeSOCKSAuthFailed,
protocol.PacketTypeSOCKSUpstreamUnavailable:
return protocol.PacketTypeSOCKSConnect
case protocol.PacketTypeSOCKSDataAck:
return protocol.PacketTypeSOCKSData
case protocol.PacketTypeSOCKSCloseRead:
return protocol.PacketTypeSOCKSCloseRead
case protocol.PacketTypeSOCKSCloseWrite:
return protocol.PacketTypeSOCKSCloseWrite
case protocol.PacketTypeSOCKSRST:
return protocol.PacketTypeSOCKSRST
default:
return packetType
}
}
func splitPayloadChunks(payload []byte, maxChunkSize int) [][]byte {
if len(payload) == 0 || maxChunkSize <= 0 {
return nil
}
chunks := make([][]byte, 0, (len(payload)+maxChunkSize-1)/maxChunkSize)
for start := 0; start < len(payload); start += maxChunkSize {
end := start + maxChunkSize
if end > len(payload) {
end = len(payload)
}
chunk := append([]byte(nil), payload[start:end]...)
chunks = append(chunks, chunk)
}
return chunks
}
+71
View File
@@ -0,0 +1,71 @@
// ==============================================================================
// MasterHttpRelayVPN
// Author: MasterkinG32
// Github: https://github.com/masterking32
// Year: 2026
// ==============================================================================
package protocol
import "time"
type PendingPacket struct {
Packet Packet
QueuedAt time.Time
}
func ContainsPendingPacket(pendingPackets []PendingPacket, packet Packet) bool {
for _, pending := range pendingPackets {
if pending.Packet.Type == packet.Type &&
pending.Packet.FragmentID == packet.FragmentID &&
pending.Packet.TotalFragments == packet.TotalFragments {
return true
}
}
return false
}
func BufferedPendingPacketCount(pending map[uint64][]PendingPacket) int {
total := 0
for _, pendingPackets := range pending {
total += len(pendingPackets)
}
return total
}
func SortPendingPackets(pendingPackets []PendingPacket) {
for i := 1; i < len(pendingPackets); i++ {
current := pendingPackets[i]
j := i - 1
for ; j >= 0 && PacketSortOrder(current.Packet.Type) < PacketSortOrder(pendingPackets[j].Packet.Type); j-- {
pendingPackets[j+1] = pendingPackets[j]
}
pendingPackets[j+1] = current
}
}
func PacketSortOrder(packetType PacketType) int {
switch packetType {
case PacketTypeSOCKSData:
return 0
case PacketTypeSOCKSCloseRead:
return 1
case PacketTypeSOCKSCloseWrite:
return 2
case PacketTypeSOCKSRST:
return 3
default:
return 4
}
}
func IsReorderSequencedPacket(packetType PacketType) bool {
switch packetType {
case PacketTypeSOCKSData,
PacketTypeSOCKSCloseRead,
PacketTypeSOCKSCloseWrite,
PacketTypeSOCKSRST:
return true
default:
return false
}
}
+9 -59
View File
@@ -55,7 +55,7 @@ type SOCKSState struct {
LastSequenceSeen uint64 LastSequenceSeen uint64
NextInboundSequence uint64 NextInboundSequence uint64
OutboundSequence uint64 OutboundSequence uint64
PendingInbound map[uint64][]PendingInboundPacket PendingInbound map[uint64][]protocol.PendingPacket
UpstreamConn net.Conn UpstreamConn net.Conn
activityMu sync.RWMutex activityMu sync.RWMutex
upstreamStateMu sync.RWMutex upstreamStateMu sync.RWMutex
@@ -68,11 +68,6 @@ type SOCKSState struct {
MaxQueueBytes int MaxQueueBytes int
} }
type PendingInboundPacket struct {
Packet protocol.Packet
QueuedAt time.Time
}
func New(cfg config.Config, lg *logger.Logger) *Server { func New(cfg config.Config, lg *logger.Logger) *Server {
return &Server{ return &Server{
cfg: cfg, cfg: cfg,
@@ -235,7 +230,7 @@ func (s *Server) processPacketLocked(session *ClientSession, packet protocol.Pac
ConnectSeen: true, ConnectSeen: true,
LastSequenceSeen: packet.Sequence, LastSequenceSeen: packet.Sequence,
MaxQueueBytes: s.cfg.MaxServerQueueBytes, MaxQueueBytes: s.cfg.MaxServerQueueBytes,
PendingInbound: make(map[uint64][]PendingInboundPacket), PendingInbound: make(map[uint64][]protocol.PendingPacket),
} }
session.SOCKSConnections[packet.SOCKSID] = socksState session.SOCKSConnections[packet.SOCKSID] = socksState
} else { } else {
@@ -243,7 +238,7 @@ func (s *Server) processPacketLocked(session *ClientSession, packet protocol.Pac
socksState.Target = packet.Target socksState.Target = packet.Target
socksState.ConnectSeen = true socksState.ConnectSeen = true
if socksState.PendingInbound == nil { if socksState.PendingInbound == nil {
socksState.PendingInbound = make(map[uint64][]PendingInboundPacket) socksState.PendingInbound = make(map[uint64][]protocol.PendingPacket)
} }
if packet.Sequence > socksState.LastSequenceSeen { if packet.Sequence > socksState.LastSequenceSeen {
socksState.LastSequenceSeen = packet.Sequence socksState.LastSequenceSeen = packet.Sequence
@@ -387,7 +382,7 @@ func (s *Server) getOrCreateSOCKSStateLocked(session *ClientSession, packet prot
socksState := session.SOCKSConnections[packet.SOCKSID] socksState := session.SOCKSConnections[packet.SOCKSID]
if socksState != nil { if socksState != nil {
if socksState.PendingInbound == nil { if socksState.PendingInbound == nil {
socksState.PendingInbound = make(map[uint64][]PendingInboundPacket) socksState.PendingInbound = make(map[uint64][]protocol.PendingPacket)
} }
return socksState return socksState
} }
@@ -399,7 +394,7 @@ func (s *Server) getOrCreateSOCKSStateLocked(session *ClientSession, packet prot
Target: packet.Target, Target: packet.Target,
LastSequenceSeen: packet.Sequence, LastSequenceSeen: packet.Sequence,
MaxQueueBytes: s.cfg.MaxServerQueueBytes, MaxQueueBytes: s.cfg.MaxServerQueueBytes,
PendingInbound: make(map[uint64][]PendingInboundPacket), PendingInbound: make(map[uint64][]protocol.PendingPacket),
} }
session.SOCKSConnections[packet.SOCKSID] = socksState session.SOCKSConnections[packet.SOCKSID] = socksState
s.log.Debugf( s.log.Debugf(
@@ -633,13 +628,13 @@ func (s *SOCKSState) queueInboundPacketLocked(packet protocol.Packet, now time.T
return nil, true, false return nil, true, false
} }
pendingForSequence := s.PendingInbound[packet.Sequence] pendingForSequence := s.PendingInbound[packet.Sequence]
if containsPendingInboundPacketLocked(pendingForSequence, packet) { if protocol.ContainsPendingPacket(pendingForSequence, packet) {
return nil, true, false return nil, true, false
} }
if bufferedInboundPacketCountLocked(s.PendingInbound) >= maxBuffered { if protocol.BufferedPendingPacketCount(s.PendingInbound) >= maxBuffered {
return nil, false, true return nil, false, true
} }
s.PendingInbound[packet.Sequence] = append(s.PendingInbound[packet.Sequence], PendingInboundPacket{ s.PendingInbound[packet.Sequence] = append(s.PendingInbound[packet.Sequence], protocol.PendingPacket{
Packet: packet, Packet: packet,
QueuedAt: now, QueuedAt: now,
}) })
@@ -657,7 +652,7 @@ func (s *SOCKSState) drainReadyInboundLocked() []protocol.Packet {
if !ok || len(pendingPackets) == 0 { if !ok || len(pendingPackets) == 0 {
break break
} }
sortPendingInboundPacketsLocked(pendingPackets) protocol.SortPendingPackets(pendingPackets)
for _, pending := range pendingPackets { for _, pending := range pendingPackets {
ready = append(ready, pending.Packet) ready = append(ready, pending.Packet)
} }
@@ -680,51 +675,6 @@ func (s *SOCKSState) hasExpiredInboundGapLocked(now time.Time, timeout time.Dura
return false return false
} }
func containsPendingInboundPacketLocked(pendingPackets []PendingInboundPacket, packet protocol.Packet) bool {
for _, pending := range pendingPackets {
if pending.Packet.Type == packet.Type &&
pending.Packet.FragmentID == packet.FragmentID &&
pending.Packet.TotalFragments == packet.TotalFragments {
return true
}
}
return false
}
func bufferedInboundPacketCountLocked(pending map[uint64][]PendingInboundPacket) int {
total := 0
for _, pendingPackets := range pending {
total += len(pendingPackets)
}
return total
}
func sortPendingInboundPacketsLocked(pendingPackets []PendingInboundPacket) {
for i := 1; i < len(pendingPackets); i++ {
current := pendingPackets[i]
j := i - 1
for ; j >= 0 && inboundPacketSortOrderLocked(current.Packet.Type) < inboundPacketSortOrderLocked(pendingPackets[j].Packet.Type); j-- {
pendingPackets[j+1] = pendingPackets[j]
}
pendingPackets[j+1] = current
}
}
func inboundPacketSortOrderLocked(packetType protocol.PacketType) int {
switch packetType {
case protocol.PacketTypeSOCKSData:
return 0
case protocol.PacketTypeSOCKSCloseRead:
return 1
case protocol.PacketTypeSOCKSCloseWrite:
return 2
case protocol.PacketTypeSOCKSRST:
return 3
default:
return 4
}
}
func (s *SOCKSState) enqueueOutboundData(clientSessionKey string, payload []byte, final bool) bool { func (s *SOCKSState) enqueueOutboundData(clientSessionKey string, payload []byte, final bool) bool {
packet := protocol.NewPacket(clientSessionKey, protocol.PacketTypeSOCKSData) packet := protocol.NewPacket(clientSessionKey, protocol.PacketTypeSOCKSData)
packet.SOCKSID = s.ID packet.SOCKSID = s.ID
+5 -5
View File
@@ -69,7 +69,7 @@ func TestDrainSessionOutboundLockedRespectsGlobalLimits(t *testing.T) {
func TestSOCKSStateInboundReorderQueuesUntilGapFilled(t *testing.T) { func TestSOCKSStateInboundReorderQueuesUntilGapFilled(t *testing.T) {
socksState := &SOCKSState{ socksState := &SOCKSState{
ConnectAcked: true, ConnectAcked: true,
PendingInbound: make(map[uint64][]PendingInboundPacket), PendingInbound: make(map[uint64][]protocol.PendingPacket),
MaxQueueBytes: 1024, MaxQueueBytes: 1024,
} }
@@ -97,9 +97,9 @@ func TestSOCKSStateInboundReorderQueuesUntilGapFilled(t *testing.T) {
func TestSOCKSStateInboundGapTimeout(t *testing.T) { func TestSOCKSStateInboundGapTimeout(t *testing.T) {
socksState := &SOCKSState{ socksState := &SOCKSState{
PendingInbound: make(map[uint64][]PendingInboundPacket), PendingInbound: make(map[uint64][]protocol.PendingPacket),
} }
socksState.PendingInbound[3] = []PendingInboundPacket{{ socksState.PendingInbound[3] = []protocol.PendingPacket{{
Packet: testDataPacket("client-session", 1, 3, "late"), Packet: testDataPacket("client-session", 1, 3, "late"),
QueuedAt: time.Now().Add(-2 * time.Second), QueuedAt: time.Now().Add(-2 * time.Second),
}} }}
@@ -114,7 +114,7 @@ func TestSOCKSStateInboundGapTimeout(t *testing.T) {
func TestSOCKSStateInboundDataWaitsForConnect(t *testing.T) { func TestSOCKSStateInboundDataWaitsForConnect(t *testing.T) {
socksState := &SOCKSState{ socksState := &SOCKSState{
PendingInbound: make(map[uint64][]PendingInboundPacket), PendingInbound: make(map[uint64][]protocol.PendingPacket),
} }
packet1 := testDataPacket("client-session", 1, 1, "one") packet1 := testDataPacket("client-session", 1, 1, "one")
@@ -139,7 +139,7 @@ func TestSOCKSStateInboundDataWaitsForConnect(t *testing.T) {
func TestSOCKSStateInboundReorderAllowsMultiplePacketTypesPerSequence(t *testing.T) { func TestSOCKSStateInboundReorderAllowsMultiplePacketTypesPerSequence(t *testing.T) {
socksState := &SOCKSState{ socksState := &SOCKSState{
ConnectAcked: true, ConnectAcked: true,
PendingInbound: make(map[uint64][]PendingInboundPacket), PendingInbound: make(map[uint64][]protocol.PendingPacket),
} }
closeWrite := protocol.NewPacket("client-session", protocol.PacketTypeSOCKSCloseWrite) closeWrite := protocol.NewPacket("client-session", protocol.PacketTypeSOCKSCloseWrite)