Session based and stream data.

This commit is contained in:
Amin.MasterkinG
2026-04-20 17:29:16 +03:30
parent 95f9fb7470
commit 4ddbc15737
3 changed files with 91 additions and 66 deletions
+24 -7
View File
@@ -8,6 +8,8 @@ package client
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"net"
"sync"
@@ -18,20 +20,24 @@ import (
)
type Client struct {
cfg config.Config
log *logger.Logger
sessions *SessionStore
cfg config.Config
log *logger.Logger
clientSessionKey string
streams *StreamStore
connMu sync.Mutex
conns map[net.Conn]struct{}
}
func New(cfg config.Config, lg *logger.Logger) *Client {
clientSessionKey := generateClientSessionKey()
return &Client{
cfg: cfg,
log: lg,
sessions: NewSessionStore(),
conns: make(map[net.Conn]struct{}),
cfg: cfg,
log: lg,
clientSessionKey: clientSessionKey,
streams: NewStreamStore(),
conns: make(map[net.Conn]struct{}),
}
}
@@ -44,6 +50,7 @@ func (c *Client) Run(ctx context.Context) error {
defer ln.Close()
c.log.Infof("<green>SOCKS5 listener started on <cyan>%s</cyan></green>", addr)
c.log.Infof("<green>client session key: <cyan>%s</cyan></green>", c.clientSessionKey)
go func() {
<-ctx.Done()
@@ -103,3 +110,13 @@ func (c *Client) closeAllConns() {
_ = conn.Close()
}
}
func generateClientSessionKey() string {
now := time.Now().UTC().Format("20060102T150405.000000000Z")
random := make([]byte, 16)
if _, err := rand.Read(random); err != nil {
return fmt.Sprintf("%s_fallback", now)
}
return fmt.Sprintf("%s_%s", now, hex.EncodeToString(random))
}
+31 -29
View File
@@ -13,58 +13,60 @@ import (
"time"
)
type Session struct {
ID uint64
CreatedAt time.Time
LastActivityAt time.Time
ClientAddr string
TargetHost string
TargetPort uint16
AddressType byte
InitialPayload []byte
BytesCaptured int
AuthMethod byte
UsernameUsed string
HandshakeDone bool
ConnectAccepted bool
type Stream struct {
ID uint64
ClientSessionKey string
CreatedAt time.Time
LastActivityAt time.Time
ClientAddress string
TargetHost string
TargetPort uint16
TargetAddressType byte
InitialPayload []byte
BufferedBytes int
SOCKSAuthMethod byte
SOCKSUsername string
HandshakeDone bool
ConnectAccepted bool
}
func (s *Session) InitialPayloadHex() string {
func (s *Stream) InitialPayloadHex() string {
if len(s.InitialPayload) == 0 {
return ""
}
return hex.EncodeToString(s.InitialPayload)
}
type SessionStore struct {
type StreamStore struct {
nextID atomic.Uint64
mu sync.RWMutex
items map[uint64]*Session
items map[uint64]*Stream
}
func NewSessionStore() *SessionStore {
return &SessionStore{
items: make(map[uint64]*Session),
func NewStreamStore() *StreamStore {
return &StreamStore{
items: make(map[uint64]*Stream),
}
}
func (s *SessionStore) New(clientAddr string) *Session {
func (s *StreamStore) New(clientSessionKey string, clientAddress string) *Stream {
id := s.nextID.Add(1)
now := time.Now()
session := &Session{
ID: id,
CreatedAt: now,
LastActivityAt: now,
ClientAddr: clientAddr,
stream := &Stream{
ID: id,
ClientSessionKey: clientSessionKey,
CreatedAt: now,
LastActivityAt: now,
ClientAddress: clientAddress,
}
s.mu.Lock()
s.items[id] = session
s.items[id] = stream
s.mu.Unlock()
return session
return stream
}
func (s *SessionStore) Delete(id uint64) {
func (s *StreamStore) Delete(id uint64) {
s.mu.Lock()
delete(s.items, id)
s.mu.Unlock()
+36 -30
View File
@@ -46,18 +46,21 @@ func (c *Client) handleConn(ctx context.Context, conn net.Conn) {
defer c.unregisterConn(conn)
defer conn.Close()
session := c.sessions.New(conn.RemoteAddr().String())
defer c.sessions.Delete(session.ID)
stream := c.streams.New(c.clientSessionKey, conn.RemoteAddr().String())
defer c.streams.Delete(stream.ID)
c.log.Infof("<green>accepted client <cyan>%s</cyan> session=<cyan>%d</cyan></green>", conn.RemoteAddr(), session.ID)
c.log.Infof(
"<green>accepted client <cyan>%s</cyan> stream=<cyan>%d</cyan> client_session_key=<cyan>%s</cyan></green>",
conn.RemoteAddr(), stream.ID, stream.ClientSessionKey,
)
if err := c.handleSOCKS5(ctx, conn, session); err != nil {
c.log.Errorf("<red>session=<cyan>%d</cyan> closed: <cyan>%v</cyan></red>", session.ID, err)
if err := c.handleSOCKS5(ctx, conn, stream); err != nil {
c.log.Errorf("<red>stream=<cyan>%d</cyan> closed: <cyan>%v</cyan></red>", stream.ID, err)
return
}
}
func (c *Client) handleSOCKS5(ctx context.Context, conn net.Conn, session *Session) error {
func (c *Client) handleSOCKS5(ctx context.Context, conn net.Conn, stream *Stream) error {
version := make([]byte, 1)
if _, err := io.ReadFull(conn, version); err != nil {
return err
@@ -66,13 +69,13 @@ func (c *Client) handleSOCKS5(ctx context.Context, conn net.Conn, session *Sessi
return fmt.Errorf("<red>unsupported SOCKS version: <cyan>%d</cyan></red>", version[0])
}
method, err := c.negotiateAuth(conn, session)
method, err := c.negotiateAuth(conn, stream)
if err != nil {
return err
}
if method == socksMethodUserPass {
if err := c.handleUserPassAuth(conn, session); err != nil {
if err := c.handleUserPassAuth(conn, stream); err != nil {
return err
}
}
@@ -82,26 +85,26 @@ func (c *Client) handleSOCKS5(ctx context.Context, conn net.Conn, session *Sessi
return err
}
session.TargetHost = targetHost
session.TargetPort = targetPort
session.AddressType = atyp
session.ConnectAccepted = true
session.HandshakeDone = true
session.LastActivityAt = time.Now()
stream.TargetHost = targetHost
stream.TargetPort = targetPort
stream.TargetAddressType = atyp
stream.ConnectAccepted = true
stream.HandshakeDone = true
stream.LastActivityAt = time.Now()
if err := writeSocksReply(conn, socksReplySuccess); err != nil {
return err
}
c.log.Infof(
"<green>session=<cyan>%d</cyan> CONNECT target=<cyan>%s:%d</cyan> auth_method=<cyan>%d</cyan></green>",
session.ID, session.TargetHost, session.TargetPort, session.AuthMethod,
"<green>stream=<cyan>%d</cyan> CONNECT target=<cyan>%s:%d</cyan> auth_method=<cyan>%d</cyan> client_session_key=<cyan>%s</cyan></green>",
stream.ID, stream.TargetHost, stream.TargetPort, stream.SOCKSAuthMethod, stream.ClientSessionKey,
)
return c.captureInitialPayload(ctx, conn, session)
return c.captureInitialPayload(ctx, conn, stream)
}
func (c *Client) negotiateAuth(conn net.Conn, session *Session) (byte, error) {
func (c *Client) negotiateAuth(conn net.Conn, stream *Stream) (byte, error) {
countBuf := make([]byte, 1)
if _, err := io.ReadFull(conn, countBuf); err != nil {
return 0, err
@@ -131,11 +134,11 @@ func (c *Client) negotiateAuth(conn net.Conn, session *Session) (byte, error) {
return 0, errors.New("no acceptable auth method")
}
session.AuthMethod = selected
stream.SOCKSAuthMethod = selected
return selected, nil
}
func (c *Client) handleUserPassAuth(conn net.Conn, session *Session) error {
func (c *Client) handleUserPassAuth(conn net.Conn, stream *Stream) error {
header := make([]byte, 2)
if _, err := io.ReadFull(conn, header); err != nil {
return err
@@ -160,7 +163,7 @@ func (c *Client) handleUserPassAuth(conn net.Conn, session *Session) error {
}
ok := string(username) == c.cfg.SOCKSUsername && string(password) == c.cfg.SOCKSPassword
session.UsernameUsed = string(username)
stream.SOCKSUsername = string(username)
if ok {
_, err := conn.Write([]byte{socksUserPassVersion, socksAuthSuccess})
return err
@@ -244,7 +247,7 @@ func writeSocksReply(conn net.Conn, reply byte) error {
return err
}
func (c *Client) captureInitialPayload(ctx context.Context, conn net.Conn, session *Session) error {
func (c *Client) captureInitialPayload(ctx context.Context, conn net.Conn, stream *Stream) error {
peekTimeout := 2 * time.Second
idleTimeout := 30 * time.Second
buf := make([]byte, 32*1024)
@@ -255,12 +258,12 @@ func (c *Client) captureInitialPayload(ctx context.Context, conn net.Conn, sessi
n, err := conn.Read(buf)
if err == nil && n > 0 {
session.InitialPayload = append([]byte(nil), buf[:n]...)
session.BytesCaptured += n
session.LastActivityAt = time.Now()
stream.InitialPayload = append([]byte(nil), buf[:n]...)
stream.BufferedBytes += n
stream.LastActivityAt = time.Now()
c.log.Debugf(
"<green>session=<cyan>%d</cyan> captured initial payload bytes=<cyan>%d</cyan> target=<cyan>%s</cyan></green>",
session.ID, n, net.JoinHostPort(session.TargetHost, strconv.Itoa(int(session.TargetPort))),
"<green>stream=<cyan>%d</cyan> captured initial payload bytes=<cyan>%d</cyan> target=<cyan>%s</cyan> client_session_key=<cyan>%s</cyan></green>",
stream.ID, n, net.JoinHostPort(stream.TargetHost, strconv.Itoa(int(stream.TargetPort))), stream.ClientSessionKey,
)
} else if ne, ok := err.(net.Error); !ok || !ne.Timeout() {
if errors.Is(err, io.EOF) {
@@ -284,9 +287,12 @@ func (c *Client) captureInitialPayload(ctx context.Context, conn net.Conn, sessi
n, err := conn.Read(buf)
if n > 0 {
session.BytesCaptured += n
session.LastActivityAt = time.Now()
c.log.Debugf("<green>session=<cyan>%d</cyan> buffered payload chunk=<cyan>%d</cyan> total=<cyan>%d</cyan></green>", session.ID, n, session.BytesCaptured)
stream.BufferedBytes += n
stream.LastActivityAt = time.Now()
c.log.Debugf(
"<green>stream=<cyan>%d</cyan> buffered payload chunk=<cyan>%d</cyan> total=<cyan>%d</cyan> client_session_key=<cyan>%s</cyan></green>",
stream.ID, n, stream.BufferedBytes, stream.ClientSessionKey,
)
}
if err != nil {