mirror of
https://github.com/masterking32/MasterHttpRelayVPN.git
synced 2026-05-18 23:54:37 +03:00
Add relay header profiles and randomized request timing/batching
This commit is contained in:
@@ -58,14 +58,14 @@ func (w *sendWorker) run(ctx context.Context, c *Client) {
|
||||
c.reclaimExpiredInFlight()
|
||||
batch, selected := c.buildNextBatch()
|
||||
if len(batch.Packets) == 0 {
|
||||
c.waitForSendWork(ctx, pollInterval)
|
||||
c.waitForSendWork(ctx, c.jitterDuration(pollInterval))
|
||||
continue
|
||||
}
|
||||
|
||||
if err := batch.Validate(); err != nil {
|
||||
c.log.Errorf("<red>worker=<cyan>%d</cyan> invalid batch: <cyan>%v</cyan></red>", w.id, err)
|
||||
c.requeueSelected(selected)
|
||||
c.waitForSendWork(ctx, pollInterval)
|
||||
c.waitForSendWork(ctx, c.jitterDuration(pollInterval))
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -75,14 +75,14 @@ func (w *sendWorker) run(ctx context.Context, c *Client) {
|
||||
if err != nil {
|
||||
c.log.Errorf("<red>worker=<cyan>%d</cyan> encrypt batch failed: <cyan>%v</cyan></red>", w.id, err)
|
||||
c.requeueSelected(selected)
|
||||
c.waitForSendWork(ctx, pollInterval)
|
||||
c.waitForSendWork(ctx, c.jitterDuration(pollInterval))
|
||||
continue
|
||||
}
|
||||
|
||||
if err := w.postBatch(ctx, c, batch, body); err != nil {
|
||||
c.log.Warnf("<yellow>worker=<cyan>%d</cyan> send failed for batch=<cyan>%s</cyan>: <cyan>%v</cyan></yellow>", w.id, batch.BatchID, err)
|
||||
c.requeueSelected(selected)
|
||||
c.waitForSendWork(ctx, pollInterval)
|
||||
c.waitForSendWork(ctx, c.jitterDuration(pollInterval))
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -114,16 +114,17 @@ func (c *Client) buildNextBatch() (protocol.Batch, []dequeuedPacket) {
|
||||
if len(connections) > 1 {
|
||||
start = int(c.batchCursor.Add(1)-1) % len(connections)
|
||||
}
|
||||
maxPackets, maxBatchBytes := c.effectiveBatchLimits()
|
||||
|
||||
selected := make([]dequeuedPacket, 0, c.cfg.MaxPacketsPerBatch)
|
||||
packets := make([]protocol.Packet, 0, c.cfg.MaxPacketsPerBatch)
|
||||
selected := make([]dequeuedPacket, 0, maxPackets)
|
||||
packets := make([]protocol.Packet, 0, maxPackets)
|
||||
totalBytes := 0
|
||||
|
||||
for len(selected) < c.cfg.MaxPacketsPerBatch {
|
||||
for len(selected) < maxPackets {
|
||||
progress := false
|
||||
|
||||
for offset := range connections {
|
||||
if len(selected) >= c.cfg.MaxPacketsPerBatch {
|
||||
if len(selected) >= maxPackets {
|
||||
break
|
||||
}
|
||||
|
||||
@@ -135,7 +136,7 @@ func (c *Client) buildNextBatch() (protocol.Batch, []dequeuedPacket) {
|
||||
}
|
||||
|
||||
packetBytes := len(item.Packet.Payload)
|
||||
if len(selected) > 0 && totalBytes+packetBytes > c.cfg.MaxBatchBytes {
|
||||
if len(selected) > 0 && totalBytes+packetBytes > maxBatchBytes {
|
||||
socksConn.RequeueFront([]*SOCKSOutboundQueueItem{item})
|
||||
continue
|
||||
}
|
||||
@@ -173,7 +174,7 @@ func (c *Client) buildPollBatch(connections []*SOCKSConnection) (protocol.Batch,
|
||||
now := time.Now()
|
||||
nowUnixMS := now.UnixMilli()
|
||||
lastUnixMS := c.lastPollUnixMS.Load()
|
||||
minInterval := time.Duration(c.cfg.IdlePollIntervalMS) * time.Millisecond
|
||||
minInterval := c.jitterDuration(time.Duration(c.cfg.IdlePollIntervalMS) * time.Millisecond)
|
||||
if lastUnixMS > 0 && nowUnixMS-lastUnixMS < minInterval.Milliseconds() {
|
||||
return protocol.Batch{}, false
|
||||
}
|
||||
@@ -188,6 +189,39 @@ func (c *Client) buildPollBatch(connections []*SOCKSConnection) (protocol.Batch,
|
||||
return batch, true
|
||||
}
|
||||
|
||||
func (c *Client) effectiveBatchLimits() (int, int) {
|
||||
maxPackets := c.cfg.MaxPacketsPerBatch
|
||||
maxBatchBytes := c.cfg.MaxBatchBytes
|
||||
if !c.cfg.HTTPBatchRandomize {
|
||||
return maxPackets, maxBatchBytes
|
||||
}
|
||||
|
||||
if c.cfg.HTTPBatchPacketsJitter > 0 && maxPackets > 1 {
|
||||
delta := randomIndex(c.cfg.HTTPBatchPacketsJitter + 1)
|
||||
if adjusted := maxPackets - delta; adjusted >= 1 {
|
||||
maxPackets = adjusted
|
||||
}
|
||||
}
|
||||
|
||||
if c.cfg.HTTPBatchBytesJitter > 0 && maxBatchBytes > c.cfg.MaxChunkSize {
|
||||
delta := randomIndex(c.cfg.HTTPBatchBytesJitter + 1)
|
||||
if adjusted := maxBatchBytes - delta; adjusted >= c.cfg.MaxChunkSize {
|
||||
maxBatchBytes = adjusted
|
||||
}
|
||||
}
|
||||
|
||||
return maxPackets, maxBatchBytes
|
||||
}
|
||||
|
||||
func (c *Client) jitterDuration(base time.Duration) time.Duration {
|
||||
if base <= 0 || c.cfg.HTTPTimingJitterMS <= 0 {
|
||||
return base
|
||||
}
|
||||
|
||||
jitter := time.Duration(randomIndex(c.cfg.HTTPTimingJitterMS+1)) * time.Millisecond
|
||||
return base + jitter
|
||||
}
|
||||
|
||||
func (c *Client) requeueSelected(selected []dequeuedPacket) {
|
||||
grouped := make(map[*SOCKSConnection][]string)
|
||||
for _, entry := range selected {
|
||||
@@ -343,15 +377,18 @@ func (c *Client) applyResponsePacket(packet protocol.Packet) error {
|
||||
protocol.PacketTypeSOCKSAuthFailed,
|
||||
protocol.PacketTypeSOCKSUpstreamUnavailable:
|
||||
message := packet.Type.String()
|
||||
|
||||
if len(packet.Payload) > 0 {
|
||||
message = string(packet.Payload)
|
||||
}
|
||||
|
||||
_ = socksConn.AckPacket(packet)
|
||||
socksConn.ConnectFailure = message
|
||||
c.log.Warnf(
|
||||
"<yellow>connect failure applied socks_id=<cyan>%d</cyan> reason=<cyan>%s</cyan></yellow>",
|
||||
socksConn.ID, message,
|
||||
)
|
||||
|
||||
socksConn.CompleteConnect(fmt.Errorf("%s", message))
|
||||
_ = socksConn.CloseLocal()
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user