Refactor session-level ping into explicit busy/aggressive/backoff state machine

This commit is contained in:
Amin.MasterkinG
2026-04-21 11:20:31 +03:30
parent 26e51fc031
commit a144703f5d
2 changed files with 71 additions and 4 deletions
+25 -4
View File
@@ -20,6 +20,12 @@ import (
"masterhttprelayvpn/internal/logger"
)
const (
pingStateBusy int32 = iota
pingStateAggressiveIdle
pingStateBackoffIdle
)
type Client struct {
cfg config.Config
log *logger.Logger
@@ -38,6 +44,7 @@ type Client struct {
activeBatches atomic.Int64
pingInFlight atomic.Int64
idlePongStreak atomic.Int64
pingState atomic.Int32
batchCursor atomic.Uint64
}
@@ -138,7 +145,8 @@ func (c *Client) signalSendWork() {
func (c *Client) noteMeaningfulActivity(now time.Time) {
c.lastMeaningfulActivityUnixMS.Store(now.UnixMilli())
c.idlePongStreak.Store(0)
c.nextPingDueUnixMS.Store(now.Add(time.Duration(c.cfg.IdlePollIntervalMS) * time.Millisecond).UnixMilli())
c.setPingState(pingStateBusy)
c.scheduleAggressivePing(now)
}
func (c *Client) tryBeginPing(now time.Time) bool {
@@ -160,22 +168,27 @@ func (c *Client) completePingWithPong() {
warmThreshold := time.Duration(c.cfg.PingWarmThresholdMS) * time.Millisecond
if idleFor < warmThreshold {
c.idlePongStreak.Store(0)
c.nextPingDueUnixMS.Store(now.Add(time.Duration(c.cfg.IdlePollIntervalMS) * time.Millisecond).UnixMilli())
c.setPingState(pingStateAggressiveIdle)
c.scheduleAggressivePing(now)
return
}
streak := c.idlePongStreak.Add(1)
c.setPingState(pingStateBackoffIdle)
c.nextPingDueUnixMS.Store(now.Add(c.idleIntervalForStreak(streak)).UnixMilli())
return
}
c.idlePongStreak.Store(0)
c.nextPingDueUnixMS.Store(now.Add(time.Duration(c.cfg.IdlePollIntervalMS) * time.Millisecond).UnixMilli())
c.setPingState(pingStateBusy)
c.scheduleAggressivePing(now)
}
func (c *Client) failPing() {
c.pingInFlight.Store(0)
c.nextPingDueUnixMS.Store(time.Now().Add(time.Duration(c.cfg.IdlePollIntervalMS) * time.Millisecond).UnixMilli())
c.idlePongStreak.Store(0)
c.setPingState(pingStateAggressiveIdle)
c.scheduleAggressivePing(time.Now())
}
func (c *Client) idleIntervalForStreak(streak int64) time.Duration {
@@ -186,6 +199,14 @@ func (c *Client) idleIntervalForStreak(streak int64) time.Duration {
return time.Duration(interval) * time.Millisecond
}
func (c *Client) scheduleAggressivePing(now time.Time) {
c.nextPingDueUnixMS.Store(now.Add(time.Duration(c.cfg.IdlePollIntervalMS) * time.Millisecond).UnixMilli())
}
func (c *Client) setPingState(state int32) {
c.pingState.Store(state)
}
func generateClientSessionKey() string {
now := time.Now().UTC().Format("20060102T150405.000000000Z")
random := make([]byte, 16)
+46
View File
@@ -391,6 +391,21 @@ func TestIdleIntervalForStreakBacksOffWithIdlePongs(t *testing.T) {
}
}
func TestNoteMeaningfulActivitySetsBusyState(t *testing.T) {
cfg := testClientConfig()
client := New(cfg, nil)
now := time.Now()
client.noteMeaningfulActivity(now)
if got := client.pingState.Load(); got != pingStateBusy {
t.Fatalf("expected busy ping state, got %d", got)
}
if client.nextPingDueUnixMS.Load() <= now.UnixMilli() {
t.Fatal("expected next ping due to be scheduled after meaningful activity")
}
}
func TestCompletePingWithPongIncrementsStreakOnlyWithoutRealTraffic(t *testing.T) {
cfg := testClientConfig()
client := New(cfg, nil)
@@ -404,6 +419,9 @@ func TestCompletePingWithPongIncrementsStreakOnlyWithoutRealTraffic(t *testing.T
if got := client.idlePongStreak.Load(); got != 1 {
t.Fatalf("expected pong streak to increment to 1, got %d", got)
}
if got := client.pingState.Load(); got != pingStateBackoffIdle {
t.Fatalf("expected backoff idle ping state, got %d", got)
}
nextDue := client.nextPingDueUnixMS.Load()
if nextDue <= now.UnixMilli() {
t.Fatal("expected next ping due to be scheduled in the future after idle pong")
@@ -418,6 +436,9 @@ func TestCompletePingWithPongIncrementsStreakOnlyWithoutRealTraffic(t *testing.T
if got := client.idlePongStreak.Load(); got != 0 {
t.Fatalf("expected pong streak reset after real traffic, got %d", got)
}
if got := client.pingState.Load(); got != pingStateBusy {
t.Fatalf("expected busy ping state after meaningful traffic, got %d", got)
}
if client.nextPingDueUnixMS.Load() <= now.UnixMilli() {
t.Fatal("expected next ping due to be rescheduled after meaningful traffic")
}
@@ -438,6 +459,9 @@ func TestCompletePingWithPongStaysAggressiveBeforeWarmThreshold(t *testing.T) {
if got := client.idlePongStreak.Load(); got != 0 {
t.Fatalf("expected pong streak to stay at 0 before warm threshold, got %d", got)
}
if got := client.pingState.Load(); got != pingStateAggressiveIdle {
t.Fatalf("expected aggressive idle ping state before warm threshold, got %d", got)
}
nextDue := client.nextPingDueUnixMS.Load()
expectedMin := now.Add(900 * time.Millisecond).UnixMilli()
@@ -447,6 +471,28 @@ func TestCompletePingWithPongStaysAggressiveBeforeWarmThreshold(t *testing.T) {
}
}
func TestFailPingReturnsToAggressiveIdle(t *testing.T) {
cfg := testClientConfig()
client := New(cfg, nil)
now := time.Now()
client.noteMeaningfulActivity(now.Add(-10 * time.Second))
client.idlePongStreak.Store(3)
client.setPingState(pingStateBackoffIdle)
client.failPing()
if got := client.idlePongStreak.Load(); got != 0 {
t.Fatalf("expected pong streak reset after ping failure, got %d", got)
}
if got := client.pingState.Load(); got != pingStateAggressiveIdle {
t.Fatalf("expected aggressive idle ping state after failure, got %d", got)
}
if client.nextPingDueUnixMS.Load() <= time.Now().UnixMilli() {
t.Fatal("expected next ping due to be rescheduled after ping failure")
}
}
func TestInboundReorderAllowsCloseReadAndCloseWriteOnSameSequence(t *testing.T) {
cfg := testClientConfig()
client := New(cfg, nil)