Finalize transport stabilization, cleanup, and debug tracing

This commit is contained in:
Amin.MasterkinG
2026-04-20 20:21:48 +03:30
parent 025923fe89
commit c4776c88e1
4 changed files with 63 additions and 3 deletions
+1
View File
@@ -62,6 +62,7 @@ func (c *Client) Run(ctx context.Context) error {
c.log.Infof("<yellow>shutdown requested, closing listener and active sessions</yellow>")
_ = ln.Close()
c.closeAllConns()
c.socksConnections.CloseAll()
}()
var wg sync.WaitGroup
+40
View File
@@ -226,6 +226,10 @@ func (w *sendWorker) postBatch(ctx context.Context, c *Client, batch protocol.Ba
}
if resp.StatusCode == http.StatusNoContent {
c.log.Debugf(
"<gray>worker=<cyan>%d</cyan> batch=<cyan>%s</cyan> got no-content response</gray>",
w.id, batch.BatchID,
)
return nil
}
@@ -241,6 +245,10 @@ func (w *sendWorker) postBatch(ctx context.Context, c *Client, batch protocol.Ba
if err != nil {
return err
}
c.log.Debugf(
"<gray>worker=<cyan>%d</cyan> received response batch=<cyan>%s</cyan> packets=<cyan>%d</cyan> bytes=<cyan>%d</cyan></gray>",
w.id, responseBatch.BatchID, len(responseBatch.Packets), len(respBody),
)
if err := c.applyResponseBatch(responseBatch); err != nil {
return err
}
@@ -249,6 +257,10 @@ func (w *sendWorker) postBatch(ctx context.Context, c *Client, batch protocol.Ba
func (c *Client) applyResponseBatch(batch protocol.Batch) error {
for _, packet := range batch.Packets {
c.log.Debugf(
"<gray>apply response packet=<cyan>%s</cyan> socks_id=<cyan>%d</cyan> seq=<cyan>%d</cyan> payload_bytes=<cyan>%d</cyan> final=<cyan>%t</cyan></gray>",
packet.Type, packet.SOCKSID, packet.Sequence, len(packet.Payload), packet.Final,
)
if err := c.applyResponsePacket(packet); err != nil {
return err
}
@@ -272,6 +284,10 @@ func (c *Client) applyResponsePacket(packet protocol.Packet) error {
_ = socksConn.AckPacket(packet)
socksConn.ConnectAccepted = true
socksConn.LastActivityAt = time.Now()
c.log.Debugf(
"<gray>connect ack applied socks_id=<cyan>%d</cyan></gray>",
socksConn.ID,
)
socksConn.CompleteConnect(nil)
return nil
@@ -291,6 +307,10 @@ func (c *Client) applyResponsePacket(packet protocol.Packet) error {
}
_ = socksConn.AckPacket(packet)
socksConn.ConnectFailure = message
c.log.Warnf(
"<yellow>connect failure applied socks_id=<cyan>%d</cyan> reason=<cyan>%s</cyan></yellow>",
socksConn.ID, message,
)
socksConn.CompleteConnect(fmt.Errorf("%s", message))
_ = socksConn.CloseLocal()
return nil
@@ -298,15 +318,27 @@ func (c *Client) applyResponsePacket(packet protocol.Packet) error {
case protocol.PacketTypeSOCKSDataAck:
_ = socksConn.AckPacket(packet)
socksConn.LastActivityAt = time.Now()
c.log.Debugf(
"<gray>data ack applied socks_id=<cyan>%d</cyan> seq=<cyan>%d</cyan></gray>",
socksConn.ID, packet.Sequence,
)
return nil
case protocol.PacketTypeSOCKSData:
socksConn.LastActivityAt = time.Now()
c.log.Debugf(
"<gray>writing to local socket socks_id=<cyan>%d</cyan> bytes=<cyan>%d</cyan></gray>",
socksConn.ID, len(packet.Payload),
)
return socksConn.WriteToLocal(packet.Payload)
case protocol.PacketTypeSOCKSCloseRead:
_ = socksConn.AckPacket(packet)
socksConn.LastActivityAt = time.Now()
c.log.Debugf(
"<gray>close_read applied socks_id=<cyan>%d</cyan></gray>",
socksConn.ID,
)
if err := socksConn.CloseLocalWrite(); err != nil {
return err
}
@@ -318,6 +350,10 @@ func (c *Client) applyResponsePacket(packet protocol.Packet) error {
case protocol.PacketTypeSOCKSCloseWrite:
_ = socksConn.AckPacket(packet)
socksConn.LastActivityAt = time.Now()
c.log.Debugf(
"<gray>close_write applied socks_id=<cyan>%d</cyan></gray>",
socksConn.ID,
)
if socksConn.BothLocalSidesClosed() {
return socksConn.CloseLocal()
}
@@ -326,6 +362,10 @@ func (c *Client) applyResponsePacket(packet protocol.Packet) error {
case protocol.PacketTypeSOCKSRST:
_ = socksConn.AckPacket(packet)
socksConn.LastActivityAt = time.Now()
c.log.Warnf(
"<yellow>rst applied socks_id=<cyan>%d</cyan></yellow>",
socksConn.ID,
)
return socksConn.CloseLocal()
default:
+21 -2
View File
@@ -91,12 +91,17 @@ func (s *SOCKSConnectionStore) New(clientSessionKey string, clientAddress string
return socksConn
}
func (s *SOCKSConnection) WaitForConnect(timeout time.Duration) error {
func (s *SOCKSConnection) WaitForConnect(ctx context.Context, timeout time.Duration) error {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case err := <-s.connectResultC:
return err
case <-time.After(timeout):
case <-timer.C:
return ErrSOCKSConnectTimeout
case <-ctx.Done():
return ctx.Err()
}
}
@@ -192,6 +197,20 @@ func (s *SOCKSConnectionStore) Delete(id uint64) {
s.mu.Unlock()
}
func (s *SOCKSConnectionStore) CloseAll() {
s.mu.Lock()
items := make([]*SOCKSConnection, 0, len(s.items))
for _, item := range s.items {
items = append(items, item)
}
s.items = make(map[uint64]*SOCKSConnection)
s.mu.Unlock()
for _, item := range items {
_ = item.CloseLocal()
}
}
func (s *SOCKSConnectionStore) Snapshot() []*SOCKSConnection {
s.mu.RLock()
defer s.mu.RUnlock()
+1 -1
View File
@@ -102,7 +102,7 @@ func (c *Client) handleSOCKS5(ctx context.Context, conn net.Conn, socksConn *SOC
return err
}
if err := socksConn.WaitForConnect(30 * time.Second); err != nil {
if err := socksConn.WaitForConnect(ctx, 30*time.Second); err != nil {
_ = writeSocksReply(conn, socksReplyGeneralFailure)
return err
}