feat(tunnel): event-driven drain with adaptive long-poll

This commit is contained in:
dazzling-no-more
2026-04-25 12:14:33 +04:00
parent fdc0405465
commit 1d45dba2c2
2 changed files with 718 additions and 62 deletions
+91 -6
View File
@@ -59,6 +59,15 @@ const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50);
/// op (version mismatch). Must match `tunnel-node/src/main.rs`.
const CODE_UNSUPPORTED_OP: &str = "UNSUPPORTED_OP";
/// Empty poll round-trip latency below which we conclude the tunnel-node
/// is *not* long-polling (legacy fixed-sleep drain instead). On a
/// long-poll-capable server an empty poll with no upstream push either
/// returns near `LONGPOLL_DEADLINE` (~5 s) or comes back early *with*
/// pushed bytes — neither matches a fast empty reply. Threshold sits
/// well above the legacy `~350 ms` drain and well below the long-poll
/// floor, so network jitter on either side won't false-trigger.
const LEGACY_DETECT_THRESHOLD: Duration = Duration::from_millis(1500);
/// Ports where the *server* speaks first (SMTP banner, SSH identification,
/// POP3/IMAP greeting, FTP banner). On these, waiting for client bytes
/// gains nothing and just adds handshake latency — skip the pre-read.
@@ -102,6 +111,16 @@ pub struct TunnelMux {
/// `connect_data` as unsupported. Subsequent sessions skip the
/// optimistic path entirely and go straight to plain connect + data.
connect_data_unsupported: Arc<AtomicBool>,
/// Set to `true` after we observe an empty poll round-trip that
/// returned in less than `LEGACY_DETECT_THRESHOLD` with no data.
/// On a long-poll-capable tunnel-node, an empty poll either returns
/// quickly *with data* (push arrived) or holds open until the
/// server's `LONGPOLL_DEADLINE`. A fast empty reply means the server
/// is doing the legacy fixed-sleep drain — in that mode, hammering
/// idle sessions at the new 500 ms cadence wastes Apps Script quota
/// for no benefit, so the loop reverts to the pre-long-poll
/// "skip empty polls when idle" behavior.
server_no_longpoll: Arc<AtomicBool>,
/// Pre-read observability. Lets an operator see whether the 50 ms
/// wait-for-first-bytes is pulling its weight:
/// * `preread_win` — client sent bytes in time, bundled with connect
@@ -134,6 +153,7 @@ impl TunnelMux {
Arc::new(Self {
tx,
connect_data_unsupported: Arc::new(AtomicBool::new(false)),
server_no_longpoll: Arc::new(AtomicBool::new(false)),
preread_win: AtomicU64::new(0),
preread_loss: AtomicU64::new(0),
preread_skip_port: AtomicU64::new(0),
@@ -159,6 +179,19 @@ impl TunnelMux {
}
}
fn server_no_longpoll(&self) -> bool {
self.server_no_longpoll.load(Ordering::Relaxed)
}
fn mark_server_no_longpoll(&self) {
if !self.server_no_longpoll.swap(true, Ordering::Relaxed) {
tracing::warn!(
"tunnel-node returned an empty poll faster than {:?}; assuming legacy (no long-poll) drain — falling back to skip-empty-when-idle to avoid quota waste",
LEGACY_DETECT_THRESHOLD,
);
}
}
fn record_preread_win(&self, port: u16, elapsed: Duration) {
self.preread_win.fetch_add(1, Ordering::Relaxed);
self.preread_win_total_us
@@ -649,14 +682,27 @@ async fn tunnel_loop(
let mut consecutive_empty = 0u32;
loop {
// Cadence depends on whether the tunnel-node is doing long-poll
// drains. With long-poll, the server holds empty polls open up
// to its `LONGPOLL_DEADLINE` (~5 s currently), so the client
// can keep this read timeout short — the wait is on the wire,
// not here. Against a *legacy* tunnel-node (no long-poll, fast
// empty replies), the same short cadence + always-poll behavior
// would generate continuous round-trips on idle sessions and
// burn Apps Script quota. The `server_no_longpoll` flag detects
// the legacy case from reply latency below and reverts to the
// pre-long-poll cadence: long sleep on local read, skip empty
// polls when sustained-idle.
let legacy_mode = mux.server_no_longpoll();
let client_data = if let Some(data) = pending_client_data.take() {
Some(data)
} else {
let read_timeout = match consecutive_empty {
0 => Duration::from_millis(20),
1 => Duration::from_millis(80),
2 => Duration::from_millis(200),
_ => Duration::from_secs(30),
let read_timeout = match (legacy_mode, consecutive_empty) {
(_, 0) => Duration::from_millis(20),
(_, 1) => Duration::from_millis(80),
(_, 2) => Duration::from_millis(200),
(false, _) => Duration::from_millis(500),
(true, _) => Duration::from_secs(30),
};
match tokio::time::timeout(read_timeout, reader.read(&mut buf)).await {
@@ -670,13 +716,21 @@ async fn tunnel_loop(
}
};
if client_data.is_none() && consecutive_empty > 3 {
// Legacy-server skip: against a non-long-polling tunnel-node,
// an empty poll is wasted work — fast-empty reply, no push
// delivery benefit. Preserve the pre-long-poll behavior of
// going quiet after a few empties. Long-poll-capable servers
// skip this branch and always send the empty op so the server
// can hold it open.
if legacy_mode && client_data.is_none() && consecutive_empty > 3 {
continue;
}
let data = client_data.unwrap_or_default();
let was_empty_poll = data.is_empty();
let (reply_tx, reply_rx) = oneshot::channel();
let send_at = Instant::now();
mux.send(MuxMsg::Data {
sid: sid.to_string(),
data,
@@ -701,6 +755,21 @@ async fn tunnel_loop(
}
};
// Legacy-server detection: an empty-in/empty-out round trip
// that finishes well under `LEGACY_DETECT_THRESHOLD` is
// structurally impossible on a long-poll-capable tunnel-node
// (the server holds the response either until data arrives or
// until its long-poll deadline). One observation flips the
// sticky flag for the rest of this process. Skip the check
// once already in legacy mode — the comparison is cheap, but
// calling `mark_server_no_longpoll` repeatedly muddies logs.
if !legacy_mode && was_empty_poll {
let reply_was_empty = resp.d.as_deref().map(str::is_empty).unwrap_or(true);
if reply_was_empty && send_at.elapsed() < LEGACY_DETECT_THRESHOLD {
mux.mark_server_no_longpoll();
}
}
if let Some(ref e) = resp.e {
tracing::debug!("tunnel error: {}", e);
break;
@@ -844,6 +913,7 @@ mod tests {
let mux = Arc::new(TunnelMux {
tx,
connect_data_unsupported: Arc::new(AtomicBool::new(false)),
server_no_longpoll: Arc::new(AtomicBool::new(false)),
preread_win: AtomicU64::new(0),
preread_loss: AtomicU64::new(0),
preread_skip_port: AtomicU64::new(0),
@@ -932,6 +1002,21 @@ mod tests {
assert!(mux.connect_data_unsupported());
}
/// `server_no_longpoll` must be sticky too: once we see a legacy
/// fast-empty reply, every subsequent session uses the legacy idle
/// cadence (long read timeout + skip-empty) for the rest of the
/// process. Flipping it back per-session would either thrash the
/// cadence or double the detection cost.
#[test]
fn no_longpoll_cache_is_sticky() {
let (mux, _rx) = mux_for_test();
assert!(!mux.server_no_longpoll());
mux.mark_server_no_longpoll();
assert!(mux.server_no_longpoll());
mux.mark_server_no_longpoll(); // idempotent
assert!(mux.server_no_longpoll());
}
#[test]
fn preread_counters_track_each_outcome() {
let (mux, _rx) = mux_for_test();
+627 -56
View File
@@ -25,7 +25,7 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, Mutex, Notify};
use tokio::task::JoinSet;
/// Structured error code returned when the tunnel-node receives an op it
@@ -33,6 +33,49 @@ use tokio::task::JoinSet;
/// detect a version mismatch and gracefully fall back.
const CODE_UNSUPPORTED_OP: &str = "UNSUPPORTED_OP";
/// Drain-phase deadline when the batch contained writes or new
/// connections. We expect upstream servers to respond fast (TLS
/// ServerHello, HTTP response) so this is a ceiling for slow targets;
/// `wait_for_any_drainable` returns much sooner — usually within
/// milliseconds — once any session in the batch fires its notify.
const ACTIVE_DRAIN_DEADLINE: Duration = Duration::from_millis(350);
/// After the first session in an active batch wakes the wait, we sleep
/// briefly so neighboring sessions whose responses land just after the
/// first one don't get reported empty and pay an extra round-trip. Only
/// applies to active batches — for long-poll batches the wake event IS
/// the data we want, so we deliver it immediately.
///
/// 30 ms is much shorter than the legacy two-pass retry (150 + 200 ms)
/// but covers the typical case of co-located upstreams whose RTTs
/// cluster within a few tens of ms of each other.
const STRAGGLER_SETTLE: Duration = Duration::from_millis(30);
/// Drain-phase deadline when the batch is a pure poll (no writes, no new
/// connections — clients just asking "any push data?"). Holding the
/// response open delivers server-initiated bytes (push notifications,
/// chat messages, server-sent events) within roughly one RTT instead of
/// waiting for the client's next tick.
///
/// **This is a knob, not a constant of nature.** It trades push latency
/// against the worst-case "client wants to send while mid-poll" delay:
/// the tunnel-client's `tunnel_loop` is strictly serial (one in-flight
/// op per session), so any local bytes that arrive while the poll is
/// being held are stuck in the kernel until the poll returns.
///
/// * Lower (e.g. 2 s) — interactive shells / typing-burst flows feel
/// snappier, but push-only sessions pay more empty round-trips.
/// * Higher (e.g. 20 s) — push delivery is near-RTT and round-trip
/// count is minimal, but a thinking pause between keystrokes can
/// tax the next keystroke by up to the chosen value.
///
/// 5 s is a middle ground: a typing user pausing mid-thought pays at
/// most a 5 s nudge before their next keystroke flows, while idle
/// sessions still get the bulk of the long-poll benefit. Must also
/// stay safely below the client's `BATCH_TIMEOUT` (30 s) and Apps
/// Script's UrlFetch ceiling (~60 s).
const LONGPOLL_DEADLINE: Duration = Duration::from_secs(5);
// ---------------------------------------------------------------------------
// Session
// ---------------------------------------------------------------------------
@@ -42,6 +85,11 @@ struct SessionInner {
read_buf: Mutex<Vec<u8>>,
eof: AtomicBool,
last_active: Mutex<Instant>,
/// Fired by `reader_task` whenever new bytes land in `read_buf` or the
/// upstream socket closes. `wait_for_any_drainable` listens on this
/// to wake the drain phase as soon as any session has something to
/// ship, replacing the old fixed-sleep heuristic.
notify: Notify,
}
struct ManagedSession {
@@ -62,6 +110,7 @@ async fn create_session(host: &str, port: u16) -> std::io::Result<ManagedSession
read_buf: Mutex::new(Vec::with_capacity(32768)),
eof: AtomicBool::new(false),
last_active: Mutex::new(Instant::now()),
notify: Notify::new(),
});
let inner_ref = inner.clone();
@@ -74,9 +123,29 @@ async fn reader_task(mut reader: OwnedReadHalf, session: Arc<SessionInner>) {
let mut buf = vec![0u8; 65536];
loop {
match reader.read(&mut buf).await {
Ok(0) => { session.eof.store(true, Ordering::Release); break; }
Ok(n) => { session.read_buf.lock().await.extend_from_slice(&buf[..n]); }
Err(_) => { session.eof.store(true, Ordering::Release); break; }
Ok(0) => {
session.eof.store(true, Ordering::Release);
session.notify.notify_one();
break;
}
Ok(n) => {
// Extend the buffer before notifying. The MutexGuard is
// dropped at the end of the statement, *before* the
// notify_one call below, so any waiter that wakes on the
// notify and then locks read_buf can immediately observe
// the new bytes — no torn read where the wake fires but
// the buffer still looks empty. Notify::notify_one also
// stores a permit if no waiter is currently registered,
// so we never lose an edge across the spawn race in
// wait_for_any_drainable.
session.read_buf.lock().await.extend_from_slice(&buf[..n]);
session.notify.notify_one();
}
Err(_) => {
session.eof.store(true, Ordering::Release);
session.notify.notify_one();
break;
}
}
}
}
@@ -90,6 +159,99 @@ async fn drain_now(session: &SessionInner) -> (Vec<u8>, bool) {
(data, eof)
}
/// Block until *any* of `inners` has buffered data, hits EOF, or the
/// deadline elapses — whichever comes first. Returns immediately if any
/// session is already drainable when called.
///
/// This replaces the legacy `sleep(150ms)` + `sleep(200ms)` retry pattern
/// in batch drain. With `reader_task` firing `notify_one` on each
/// appended chunk, a typical TLS ServerHello (~30-50 ms) wakes the wait
/// in milliseconds instead of paying the 150 ms ceiling. For pure-poll
/// batches the same primitive holds the response open until upstream
/// pushes data or `LONGPOLL_DEADLINE` elapses, turning idle sessions
/// into a true long-poll.
///
/// Race-safety:
/// * `Notify::notify_one` stores a one-shot permit if no waiter is
/// registered, so a notify that fires between the buffer check and
/// the watcher's `.notified().await` is consumed on the next poll
/// rather than lost.
/// * Watchers self-filter against observable session state. A prior
/// batch that returned via the spawn-race shortcut may leave a
/// stale permit on the `Notify`; this batch's watcher will consume
/// it but, finding the buffer empty and EOF unset, loop back to
/// wait for a real notify. Without this filter, an idle long-poll
/// batch could return in <1 ms on a stale permit and degrade push
/// delivery to the client's idle re-poll cadence.
async fn wait_for_any_drainable(inners: &[Arc<SessionInner>], deadline: Duration) {
if inners.is_empty() {
return;
}
// One watcher per session. Each loops until it observes real state
// (eof set or buffer non-empty) before signaling — see the
// race-safety note on `wait_for_any_drainable` for why. We abort the
// watchers on return; the only state they hold is a notify
// subscription, so abort is clean.
let (tx, mut rx) = mpsc::channel::<()>(1);
let mut watchers = Vec::with_capacity(inners.len());
for inner in inners {
let inner = inner.clone();
let tx = tx.clone();
watchers.push(tokio::spawn(async move {
loop {
inner.notify.notified().await;
if inner.eof.load(Ordering::Acquire) {
break;
}
if !inner.read_buf.lock().await.is_empty() {
break;
}
// Stale permit (notify fired but state didn't change in
// an observable way — e.g., bytes were already drained
// by a prior batch). Loop back and wait for a real
// notify, don't wake the caller.
}
let _ = tx.try_send(());
}));
}
drop(tx);
// Spawn-race shortcut: if state was already drainable when we got
// here (bytes arrived between phase 1 and this point), return
// without entering the select. The watcher self-filtering above
// means the unconsumed permit we leave behind here is harmless to
// future batches.
let already_ready = is_any_drainable(inners).await;
if !already_ready {
tokio::select! {
_ = rx.recv() => {}
_ = tokio::time::sleep(deadline) => {}
}
}
for w in &watchers {
w.abort();
}
}
/// True iff any session is currently drainable: its read buffer has
/// bytes, or it's been marked EOF. Pulled out of `wait_for_any_drainable`
/// so the same predicate can drive both the spawn-race shortcut and the
/// post-wake straggler poll.
async fn is_any_drainable(inners: &[Arc<SessionInner>]) -> bool {
for inner in inners {
if inner.eof.load(Ordering::Acquire) {
return true;
}
if !inner.read_buf.lock().await.is_empty() {
return true;
}
}
false
}
/// Wait for response data with drain window. Used by single-op mode.
async fn wait_and_drain(session: &SessionInner, max_wait: Duration) -> (Vec<u8>, bool) {
let deadline = Instant::now() + max_wait;
@@ -251,23 +413,28 @@ async fn handle_batch(
return (StatusCode::OK, [(header::CONTENT_TYPE, "application/json")], resp);
}
// Process all ops. For "data" ops, first write all outbound data,
// then do a short sleep to let servers respond, then drain all.
// This batches the network round trips on the server side too.
// Phase 1: process connects and writes.
// Process all ops in two phases.
//
// `connect` and `connect_data` ops each establish a brand-new upstream
// TCP connection which can take up to 10 s (create_session timeout).
// Running them inline head-of-line-blocks every other op in the batch,
// so we dispatch both into a JoinSet and await them concurrently below.
// Phase 1: dispatch new connections concurrently and write outbound
// bytes for "data" ops. We track whether any op did real work
// (`had_writes_or_connects`) — this drives the deadline picked in
// phase 2.
//
// `connect_data` is expected to dominate in practice (new client) but
// we still hit `connect` from older clients or from server-speaks-first
// ports that skip the pre-read — if a slow `connect` landed in the same
// batch as data-bearing ops it could stall everyone.
// `connect` and `connect_data` each establish a brand-new upstream TCP
// connection (up to 10 s timeout in `create_session`). Running them
// inline would head-of-line-block every other op in the batch, so we
// dispatch both into a JoinSet and await them concurrently below.
//
// `connect_data` dominates in practice (new clients), but `connect`
// still fires from server-speaks-first ports and from the preread
// timeout fallback path.
let mut results: Vec<(usize, TunnelResponse)> = Vec::with_capacity(req.ops.len());
let mut data_ops: Vec<(usize, String)> = Vec::new(); // (index, sid) for data ops needing drain
// True iff the batch contained any op that performed a real action
// upstream — a new connection or a non-empty data write. A batch of
// only empty "data" polls (and possibly closes) leaves this false and
// qualifies for long-poll behavior in phase 2.
let mut had_writes_or_connects = false;
enum NewConn {
Connect(TunnelResponse),
@@ -278,6 +445,7 @@ async fn handle_batch(
for (i, op) in req.ops.iter().enumerate() {
match op.op.as_str() {
"connect" => {
had_writes_or_connects = true;
let state = state.clone();
let host = op.host.clone();
let port = op.port;
@@ -286,15 +454,16 @@ async fn handle_batch(
});
}
"connect_data" => {
had_writes_or_connects = true;
let state = state.clone();
let host = op.host.clone();
let port = op.port;
let d = op.d.clone();
new_conn_jobs.spawn(async move {
// Drop the returned Arc<SessionInner>: phase 2 below
// holds the sessions-map lock once for the whole batch
// and re-looks up each sid, which is cheap. The Arc
// return is a convenience for the single-op path only.
// re-looks up each sid under one sessions-map lock,
// which is cheap. The Arc return is a convenience for
// the single-op path only.
let r = handle_connect_data_phase1(&state, host, port, d)
.await
.map(|(sid, _inner)| sid);
@@ -313,6 +482,7 @@ async fn handle_batch(
*session.inner.last_active.lock().await = Instant::now();
if let Some(ref data_b64) = op.d {
if !data_b64.is_empty() {
had_writes_or_connects = true;
if let Ok(bytes) = B64.decode(data_b64) {
if !bytes.is_empty() {
let mut w = session.inner.writer.lock().await;
@@ -355,54 +525,68 @@ async fn handle_batch(
}
}
// Phase 2: short wait for servers to respond, then drain all data sessions
// Phase 2: signal-driven wait for any session to have data (or hit
// EOF), then drain everyone in a single pass. The deadline is
// adaptive:
// * `ACTIVE_DRAIN_DEADLINE` (~350 ms) when the batch had real work
// — typical responses arrive in ms and `wait_for_any_drainable`
// returns on the first notify. After the first wake we settle
// for `STRAGGLER_SETTLE` so neighboring sessions whose replies
// land just behind the first one don't get reported empty.
// * `LONGPOLL_DEADLINE` when the batch is a pure poll — no writes,
// no new connections. The response is held open until upstream
// pushes data, turning idle sessions into a true long-poll
// without paying per-poll latency. No straggler settle here:
// the wake event IS the data the client wants, so deliver it
// immediately.
if !data_ops.is_empty() {
// Give servers a moment to respond to the data we just wrote
tokio::time::sleep(Duration::from_millis(150)).await;
let deadline = if had_writes_or_connects {
ACTIVE_DRAIN_DEADLINE
} else {
LONGPOLL_DEADLINE
};
// First drain pass
// Snapshot SessionInner Arcs under a single lock so we don't
// hold the sessions-map lock across the await.
let inners: Vec<Arc<SessionInner>> = {
let sessions = state.sessions.lock().await;
data_ops
.iter()
.filter_map(|(_, sid)| sessions.get(sid).map(|s| s.inner.clone()))
.collect()
};
let wait_start = Instant::now();
wait_for_any_drainable(&inners, deadline).await;
// Straggler settle: only for active batches, only if we woke
// early (didn't hit the deadline). Capped by the remaining
// deadline budget — `saturating_sub` so a future refactor that
// ever lets `elapsed > deadline` slip through can't underflow.
if had_writes_or_connects {
let remaining = deadline.saturating_sub(wait_start.elapsed());
if !remaining.is_zero() {
tokio::time::sleep(STRAGGLER_SETTLE.min(remaining)).await;
}
}
// Single drain pass for all sessions in this batch.
{
let sessions = state.sessions.lock().await;
let mut need_retry = Vec::new();
for (i, sid) in &data_ops {
if let Some(session) = sessions.get(sid) {
let (data, eof) = drain_now(&session.inner).await;
if data.is_empty() && !eof {
need_retry.push((*i, sid.clone()));
} else {
results.push((*i, TunnelResponse {
sid: Some(sid.clone()),
d: if data.is_empty() { None } else { Some(B64.encode(&data)) },
eof: Some(eof), e: None, code: None,
}));
}
results.push((*i, TunnelResponse {
sid: Some(sid.clone()),
d: if data.is_empty() { None } else { Some(B64.encode(&data)) },
eof: Some(eof), e: None, code: None,
}));
} else {
results.push((*i, TunnelResponse {
sid: Some(sid.clone()), d: None, eof: Some(true), e: None, code: None,
}));
}
}
drop(sessions);
// Retry sessions that had no data yet
if !need_retry.is_empty() {
tokio::time::sleep(Duration::from_millis(200)).await;
let sessions = state.sessions.lock().await;
for (i, sid) in &need_retry {
if let Some(s) = sessions.get(sid) {
let (data, eof) = drain_now(&s.inner).await;
results.push((*i, TunnelResponse {
sid: Some(sid.clone()),
d: if data.is_empty() { None } else { Some(B64.encode(&data)) },
eof: Some(eof), e: None, code: None,
}));
} else {
results.push((*i, TunnelResponse {
sid: Some(sid.clone()), d: None, eof: Some(true), e: None, code: None,
}));
}
}
}
}
// Clean up eof sessions
@@ -799,5 +983,392 @@ mod tests {
// Session should NOT be in the map since phase1 rejected it.
assert!(state.sessions.lock().await.is_empty());
}
}
// ---------------------------------------------------------------------
// wait_for_any_drainable + notify wiring
//
// These guard the new event-driven drain. Regressions here mean the
// batch handler either falls back to fixed sleeps (latency win lost)
// or wedges on a missed signal (correctness lost) — both silent
// without explicit tests.
// ---------------------------------------------------------------------
/// Build a SessionInner with no reader_task, suitable for tests that
/// drive the read_buf / eof / notify state by hand. The writer half
/// is wired to a live loopback peer so the Mutex<OwnedWriteHalf> has
/// a real value, but tests never touch it.
async fn fake_inner() -> Arc<SessionInner> {
let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap();
let addr = listener.local_addr().unwrap();
let accept = tokio::spawn(async move { listener.accept().await.unwrap().0 });
let client = TcpStream::connect(addr).await.unwrap();
let _server_side = accept.await.unwrap();
let (_reader, writer) = client.into_split();
Arc::new(SessionInner {
writer: Mutex::new(writer),
read_buf: Mutex::new(Vec::new()),
eof: AtomicBool::new(false),
last_active: Mutex::new(Instant::now()),
notify: Notify::new(),
})
}
#[tokio::test]
async fn wait_for_any_drainable_returns_immediately_when_buffer_has_data() {
let inner = fake_inner().await;
inner.read_buf.lock().await.extend_from_slice(b"already here");
let t0 = Instant::now();
wait_for_any_drainable(&[inner], Duration::from_secs(5)).await;
assert!(
t0.elapsed() < Duration::from_millis(100),
"should short-circuit on pre-buffered data, took {:?}",
t0.elapsed()
);
}
#[tokio::test]
async fn wait_for_any_drainable_returns_immediately_when_eof_set() {
let inner = fake_inner().await;
inner.eof.store(true, Ordering::Release);
let t0 = Instant::now();
wait_for_any_drainable(&[inner], Duration::from_secs(5)).await;
assert!(
t0.elapsed() < Duration::from_millis(100),
"should short-circuit on pre-set eof, took {:?}",
t0.elapsed()
);
}
#[tokio::test]
async fn wait_for_any_drainable_returns_immediately_for_empty_list() {
let t0 = Instant::now();
wait_for_any_drainable(&[], Duration::from_secs(5)).await;
assert!(
t0.elapsed() < Duration::from_millis(50),
"empty input should be a no-op, took {:?}",
t0.elapsed()
);
}
#[tokio::test]
async fn wait_for_any_drainable_wakes_on_notify() {
let inner = fake_inner().await;
let signal = inner.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(80)).await;
signal.read_buf.lock().await.extend_from_slice(b"pushed");
signal.notify.notify_one();
});
let t0 = Instant::now();
wait_for_any_drainable(&[inner], Duration::from_secs(5)).await;
let elapsed = t0.elapsed();
// We only assert the upper bound — wake latency under load can be
// tens of ms but should never approach the 5 s deadline.
assert!(
elapsed < Duration::from_millis(800),
"did not wake on notify within reasonable time: {:?}",
elapsed
);
}
/// Any-of-N: when one session in a multi-session batch fires its
/// notify, the wait returns. Regression here would mean idle
/// neighbors block the drain for a session that has data ready.
#[tokio::test]
async fn wait_for_any_drainable_wakes_on_any_session_notify() {
let a = fake_inner().await;
let b = fake_inner().await;
let c = fake_inner().await;
let signal = b.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(80)).await;
signal.read_buf.lock().await.push(b'x');
signal.notify.notify_one();
});
let t0 = Instant::now();
wait_for_any_drainable(&[a, b, c], Duration::from_secs(5)).await;
assert!(
t0.elapsed() < Duration::from_millis(800),
"any-of-N wake too slow: {:?}",
t0.elapsed()
);
}
/// Stale-permit guard: if a previous batch consumed the buffer and
/// returned via the spawn-race shortcut without consuming the notify
/// permit, the next batch's watcher consumes that stale permit but
/// MUST NOT wake the caller — the buffer is empty. This regressed
/// silently in the first version; the self-filtering watcher closes
/// it. Without this test, an empty long-poll batch could return in
/// <1 ms and degrade push delivery to the client's idle re-poll
/// cadence (~500 ms).
#[tokio::test]
async fn wait_for_any_drainable_ignores_stale_permit() {
let inner = fake_inner().await;
// Plant a permit (no waiter yet, so it's stored as a one-shot).
inner.notify.notify_one();
// Buffer is empty and EOF is unset, so the only thing that
// could wake the wait is the permit. With self-filtering the
// watcher consumes it, sees no observable state, loops back —
// the wait should run for the full deadline and then return.
let deadline = Duration::from_millis(200);
let t0 = Instant::now();
wait_for_any_drainable(&[inner], deadline).await;
let elapsed = t0.elapsed();
assert!(
elapsed >= deadline,
"stale permit incorrectly woke the wait: {:?} < {:?}",
elapsed,
deadline
);
}
#[tokio::test]
async fn wait_for_any_drainable_hits_deadline_when_no_events() {
let inner = fake_inner().await;
let deadline = Duration::from_millis(150);
let t0 = Instant::now();
wait_for_any_drainable(&[inner], deadline).await;
let elapsed = t0.elapsed();
assert!(
elapsed >= deadline,
"returned before deadline: {:?} < {:?}",
elapsed,
deadline
);
assert!(
elapsed < deadline + Duration::from_millis(300),
"overshot deadline by too much: {:?}",
elapsed
);
}
/// Real reader_task → notify path. If reader_task ever stops calling
/// notify_one after an extend, the long-poll silently degrades to
/// "wait the full deadline every time" — this catches that.
#[tokio::test]
async fn reader_task_notifies_on_incoming_bytes() {
let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap();
let addr = listener.local_addr().unwrap();
let server = tokio::spawn(async move {
let (mut sock, _) = listener.accept().await.unwrap();
tokio::time::sleep(Duration::from_millis(80)).await;
sock.write_all(b"hello").await.unwrap();
sock.flush().await.unwrap();
// Hold the connection so reader_task doesn't immediately EOF
// and confuse the assertion.
tokio::time::sleep(Duration::from_secs(2)).await;
});
let stream = TcpStream::connect(addr).await.unwrap();
let (reader, writer) = stream.into_split();
let inner = Arc::new(SessionInner {
writer: Mutex::new(writer),
read_buf: Mutex::new(Vec::new()),
eof: AtomicBool::new(false),
last_active: Mutex::new(Instant::now()),
notify: Notify::new(),
});
let _reader_handle = tokio::spawn(reader_task(reader, inner.clone()));
let t0 = Instant::now();
wait_for_any_drainable(&[inner.clone()], Duration::from_secs(2)).await;
let elapsed = t0.elapsed();
assert!(
elapsed < Duration::from_millis(800),
"wait did not wake on reader_task notify: {:?}",
elapsed
);
assert_eq!(&inner.read_buf.lock().await[..], b"hello");
// The spawned server's only job is to deliver one chunk and hold
// the connection open long enough for the assertion. abort() is
// intentional cleanup, not a failure path.
server.abort();
}
// ---------------------------------------------------------------------
// handle_batch deadline selection (end-to-end through the actual
// batch handler — not just wait_for_any_drainable in isolation)
//
// These tests guard the adaptive deadline logic: an empty-poll batch
// must engage LONGPOLL_DEADLINE, an active batch must cap at
// ACTIVE_DRAIN_DEADLINE + STRAGGLER_SETTLE, and `Some("")` must NOT
// count as a write. Each was a separate review concern and would
// regress silently without explicit coverage.
// ---------------------------------------------------------------------
/// TCP server that pushes `data` exactly `delay` after accept,
/// without reading from the client first. Simulates server-initiated
/// push (notifications, SSE) on a real socket.
async fn start_push_server(delay: Duration, data: Vec<u8>) -> u16 {
let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
if let Ok((mut sock, _)) = listener.accept().await {
tokio::time::sleep(delay).await;
let _ = sock.write_all(&data).await;
let _ = sock.flush().await;
// Hold the socket open well beyond any test's deadline
// so reader_task doesn't EOF mid-assertion.
tokio::time::sleep(Duration::from_secs(10)).await;
}
});
port
}
/// TCP server that accepts and does NOTHING — never writes, never
/// closes. Used to test deadline behavior when there's no upstream
/// response.
async fn start_silent_server() -> u16 {
let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
if let Ok((sock, _)) = listener.accept().await {
// Hold the socket alive past any reasonable test deadline.
tokio::time::sleep(Duration::from_secs(60)).await;
drop(sock);
}
});
port
}
/// Drive `handle_batch` end-to-end and parse its JSON response into a
/// `serde_json::Value` for assertion (TunnelResponse/BatchResponse
/// don't derive Deserialize, and we don't want to add it just for
/// tests).
async fn invoke_handle_batch(state: &AppState, body: Vec<u8>) -> serde_json::Value {
let resp = handle_batch(State(state.clone()), Bytes::from(body))
.await
.into_response();
let body_bytes = axum::body::to_bytes(resp.into_body(), usize::MAX).await.unwrap();
serde_json::from_slice(&body_bytes).unwrap()
}
/// Pure-poll batch (one `data` op with no `d`) holds open and wakes
/// when upstream pushes data. Push arrives at ~150 ms — well past
/// any active-batch ceiling. If long-poll didn't engage we'd return
/// at ACTIVE_DRAIN_DEADLINE (350 ms) with no data.
#[tokio::test]
async fn batch_pure_poll_wakes_on_push() {
let push_port = start_push_server(
Duration::from_millis(150),
b"PUSHED".to_vec(),
).await;
let state = fresh_state();
let connect_resp = handle_connect(&state, Some("127.0.0.1".into()), Some(push_port)).await;
let sid = connect_resp.sid.expect("connect should succeed");
let body = serde_json::to_vec(&serde_json::json!({
"k": "test-key",
"ops": [{"op": "data", "sid": sid}],
})).unwrap();
let t0 = Instant::now();
let resp = invoke_handle_batch(&state, body).await;
let elapsed = t0.elapsed();
assert!(
elapsed >= Duration::from_millis(120),
"returned before push could realistically arrive: {:?}",
elapsed
);
assert!(
elapsed < Duration::from_millis(700),
"long-poll did not return promptly on push: {:?}",
elapsed
);
let r = resp["r"].as_array().expect("response must be an array");
let d_b64 = r[0]["d"].as_str().expect("response should carry pushed bytes");
let data = B64.decode(d_b64).unwrap();
assert_eq!(&data[..], b"PUSHED");
}
/// Active batch (write op) bounds the wait at roughly
/// ACTIVE_DRAIN_DEADLINE + a little overhead, even when upstream
/// doesn't respond. Upper bound proves long-poll did NOT engage.
#[tokio::test]
async fn batch_active_caps_at_active_deadline() {
let silent_port = start_silent_server().await;
let state = fresh_state();
let connect_resp = handle_connect(&state, Some("127.0.0.1".into()), Some(silent_port)).await;
let sid = connect_resp.sid.expect("connect should succeed");
let body = serde_json::to_vec(&serde_json::json!({
"k": "test-key",
"ops": [{"op": "data", "sid": sid, "d": B64.encode(b"PING")}],
})).unwrap();
let t0 = Instant::now();
let _resp = invoke_handle_batch(&state, body).await;
let elapsed = t0.elapsed();
// No upstream response → wait full ACTIVE_DRAIN_DEADLINE (~350ms),
// no straggler settle (we never woke). Upper bound is tight
// enough that a regression bumping the active deadline above
// ~600ms would fail this test instead of slipping through.
assert!(
elapsed >= Duration::from_millis(300),
"active batch returned before active deadline: {:?}",
elapsed
);
assert!(
elapsed < Duration::from_millis(600),
"active batch held longer than ACTIVE_DRAIN_DEADLINE + margin: {:?}",
elapsed
);
}
/// `Some("")` must NOT flip `had_writes_or_connects`. If it did, the
/// batch would return at the active deadline (350 ms) without the
/// pushed bytes — push arrives at 600 ms here, deliberately past
/// the active ceiling, so the only way the test gets data is if
/// long-poll actually engaged.
#[tokio::test]
async fn batch_empty_string_payload_engages_long_poll() {
let push_port = start_push_server(
Duration::from_millis(600),
b"DELAYED".to_vec(),
).await;
let state = fresh_state();
let connect_resp = handle_connect(&state, Some("127.0.0.1".into()), Some(push_port)).await;
let sid = connect_resp.sid.expect("connect should succeed");
let body = serde_json::to_vec(&serde_json::json!({
"k": "test-key",
"ops": [{"op": "data", "sid": sid, "d": ""}],
})).unwrap();
let t0 = Instant::now();
let resp = invoke_handle_batch(&state, body).await;
let elapsed = t0.elapsed();
assert!(
elapsed >= Duration::from_millis(550),
"returned before push arrived (deadline likely set to active, not long-poll): {:?}",
elapsed
);
assert!(
elapsed < Duration::from_millis(1100),
"long-poll didn't wake promptly on push: {:?}",
elapsed
);
let r = resp["r"].as_array().unwrap();
let d_b64 = r[0]["d"].as_str()
.expect("Some(\"\") payload should have engaged long-poll and delivered DELAYED");
let data = B64.decode(d_b64).unwrap();
assert_eq!(&data[..], b"DELAYED");
}
}