mirror of
https://github.com/therealaleph/MasterHttpRelayVPN-RUST.git
synced 2026-05-17 21:24:48 +03:00
Merge pull request #173 from dazzling-no-more/feature/event-driven-drain
feat(tunnel): event-driven drain with adaptive long-poll
This commit is contained in:
+91
-6
@@ -59,6 +59,15 @@ const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50);
|
||||
/// op (version mismatch). Must match `tunnel-node/src/main.rs`.
|
||||
const CODE_UNSUPPORTED_OP: &str = "UNSUPPORTED_OP";
|
||||
|
||||
/// Empty poll round-trip latency below which we conclude the tunnel-node
|
||||
/// is *not* long-polling (legacy fixed-sleep drain instead). On a
|
||||
/// long-poll-capable server an empty poll with no upstream push either
|
||||
/// returns near `LONGPOLL_DEADLINE` (~5 s) or comes back early *with*
|
||||
/// pushed bytes — neither matches a fast empty reply. Threshold sits
|
||||
/// well above the legacy `~350 ms` drain and well below the long-poll
|
||||
/// floor, so network jitter on either side won't false-trigger.
|
||||
const LEGACY_DETECT_THRESHOLD: Duration = Duration::from_millis(1500);
|
||||
|
||||
/// Ports where the *server* speaks first (SMTP banner, SSH identification,
|
||||
/// POP3/IMAP greeting, FTP banner). On these, waiting for client bytes
|
||||
/// gains nothing and just adds handshake latency — skip the pre-read.
|
||||
@@ -102,6 +111,16 @@ pub struct TunnelMux {
|
||||
/// `connect_data` as unsupported. Subsequent sessions skip the
|
||||
/// optimistic path entirely and go straight to plain connect + data.
|
||||
connect_data_unsupported: Arc<AtomicBool>,
|
||||
/// Set to `true` after we observe an empty poll round-trip that
|
||||
/// returned in less than `LEGACY_DETECT_THRESHOLD` with no data.
|
||||
/// On a long-poll-capable tunnel-node, an empty poll either returns
|
||||
/// quickly *with data* (push arrived) or holds open until the
|
||||
/// server's `LONGPOLL_DEADLINE`. A fast empty reply means the server
|
||||
/// is doing the legacy fixed-sleep drain — in that mode, hammering
|
||||
/// idle sessions at the new 500 ms cadence wastes Apps Script quota
|
||||
/// for no benefit, so the loop reverts to the pre-long-poll
|
||||
/// "skip empty polls when idle" behavior.
|
||||
server_no_longpoll: Arc<AtomicBool>,
|
||||
/// Pre-read observability. Lets an operator see whether the 50 ms
|
||||
/// wait-for-first-bytes is pulling its weight:
|
||||
/// * `preread_win` — client sent bytes in time, bundled with connect
|
||||
@@ -134,6 +153,7 @@ impl TunnelMux {
|
||||
Arc::new(Self {
|
||||
tx,
|
||||
connect_data_unsupported: Arc::new(AtomicBool::new(false)),
|
||||
server_no_longpoll: Arc::new(AtomicBool::new(false)),
|
||||
preread_win: AtomicU64::new(0),
|
||||
preread_loss: AtomicU64::new(0),
|
||||
preread_skip_port: AtomicU64::new(0),
|
||||
@@ -159,6 +179,19 @@ impl TunnelMux {
|
||||
}
|
||||
}
|
||||
|
||||
fn server_no_longpoll(&self) -> bool {
|
||||
self.server_no_longpoll.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn mark_server_no_longpoll(&self) {
|
||||
if !self.server_no_longpoll.swap(true, Ordering::Relaxed) {
|
||||
tracing::warn!(
|
||||
"tunnel-node returned an empty poll faster than {:?}; assuming legacy (no long-poll) drain — falling back to skip-empty-when-idle to avoid quota waste",
|
||||
LEGACY_DETECT_THRESHOLD,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn record_preread_win(&self, port: u16, elapsed: Duration) {
|
||||
self.preread_win.fetch_add(1, Ordering::Relaxed);
|
||||
self.preread_win_total_us
|
||||
@@ -649,14 +682,27 @@ async fn tunnel_loop(
|
||||
let mut consecutive_empty = 0u32;
|
||||
|
||||
loop {
|
||||
// Cadence depends on whether the tunnel-node is doing long-poll
|
||||
// drains. With long-poll, the server holds empty polls open up
|
||||
// to its `LONGPOLL_DEADLINE` (~5 s currently), so the client
|
||||
// can keep this read timeout short — the wait is on the wire,
|
||||
// not here. Against a *legacy* tunnel-node (no long-poll, fast
|
||||
// empty replies), the same short cadence + always-poll behavior
|
||||
// would generate continuous round-trips on idle sessions and
|
||||
// burn Apps Script quota. The `server_no_longpoll` flag detects
|
||||
// the legacy case from reply latency below and reverts to the
|
||||
// pre-long-poll cadence: long sleep on local read, skip empty
|
||||
// polls when sustained-idle.
|
||||
let legacy_mode = mux.server_no_longpoll();
|
||||
let client_data = if let Some(data) = pending_client_data.take() {
|
||||
Some(data)
|
||||
} else {
|
||||
let read_timeout = match consecutive_empty {
|
||||
0 => Duration::from_millis(20),
|
||||
1 => Duration::from_millis(80),
|
||||
2 => Duration::from_millis(200),
|
||||
_ => Duration::from_secs(30),
|
||||
let read_timeout = match (legacy_mode, consecutive_empty) {
|
||||
(_, 0) => Duration::from_millis(20),
|
||||
(_, 1) => Duration::from_millis(80),
|
||||
(_, 2) => Duration::from_millis(200),
|
||||
(false, _) => Duration::from_millis(500),
|
||||
(true, _) => Duration::from_secs(30),
|
||||
};
|
||||
|
||||
match tokio::time::timeout(read_timeout, reader.read(&mut buf)).await {
|
||||
@@ -670,13 +716,21 @@ async fn tunnel_loop(
|
||||
}
|
||||
};
|
||||
|
||||
if client_data.is_none() && consecutive_empty > 3 {
|
||||
// Legacy-server skip: against a non-long-polling tunnel-node,
|
||||
// an empty poll is wasted work — fast-empty reply, no push
|
||||
// delivery benefit. Preserve the pre-long-poll behavior of
|
||||
// going quiet after a few empties. Long-poll-capable servers
|
||||
// skip this branch and always send the empty op so the server
|
||||
// can hold it open.
|
||||
if legacy_mode && client_data.is_none() && consecutive_empty > 3 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let data = client_data.unwrap_or_default();
|
||||
let was_empty_poll = data.is_empty();
|
||||
|
||||
let (reply_tx, reply_rx) = oneshot::channel();
|
||||
let send_at = Instant::now();
|
||||
mux.send(MuxMsg::Data {
|
||||
sid: sid.to_string(),
|
||||
data,
|
||||
@@ -701,6 +755,21 @@ async fn tunnel_loop(
|
||||
}
|
||||
};
|
||||
|
||||
// Legacy-server detection: an empty-in/empty-out round trip
|
||||
// that finishes well under `LEGACY_DETECT_THRESHOLD` is
|
||||
// structurally impossible on a long-poll-capable tunnel-node
|
||||
// (the server holds the response either until data arrives or
|
||||
// until its long-poll deadline). One observation flips the
|
||||
// sticky flag for the rest of this process. Skip the check
|
||||
// once already in legacy mode — the comparison is cheap, but
|
||||
// calling `mark_server_no_longpoll` repeatedly muddies logs.
|
||||
if !legacy_mode && was_empty_poll {
|
||||
let reply_was_empty = resp.d.as_deref().map(str::is_empty).unwrap_or(true);
|
||||
if reply_was_empty && send_at.elapsed() < LEGACY_DETECT_THRESHOLD {
|
||||
mux.mark_server_no_longpoll();
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref e) = resp.e {
|
||||
tracing::debug!("tunnel error: {}", e);
|
||||
break;
|
||||
@@ -844,6 +913,7 @@ mod tests {
|
||||
let mux = Arc::new(TunnelMux {
|
||||
tx,
|
||||
connect_data_unsupported: Arc::new(AtomicBool::new(false)),
|
||||
server_no_longpoll: Arc::new(AtomicBool::new(false)),
|
||||
preread_win: AtomicU64::new(0),
|
||||
preread_loss: AtomicU64::new(0),
|
||||
preread_skip_port: AtomicU64::new(0),
|
||||
@@ -932,6 +1002,21 @@ mod tests {
|
||||
assert!(mux.connect_data_unsupported());
|
||||
}
|
||||
|
||||
/// `server_no_longpoll` must be sticky too: once we see a legacy
|
||||
/// fast-empty reply, every subsequent session uses the legacy idle
|
||||
/// cadence (long read timeout + skip-empty) for the rest of the
|
||||
/// process. Flipping it back per-session would either thrash the
|
||||
/// cadence or double the detection cost.
|
||||
#[test]
|
||||
fn no_longpoll_cache_is_sticky() {
|
||||
let (mux, _rx) = mux_for_test();
|
||||
assert!(!mux.server_no_longpoll());
|
||||
mux.mark_server_no_longpoll();
|
||||
assert!(mux.server_no_longpoll());
|
||||
mux.mark_server_no_longpoll(); // idempotent
|
||||
assert!(mux.server_no_longpoll());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn preread_counters_track_each_outcome() {
|
||||
let (mux, _rx) = mux_for_test();
|
||||
|
||||
+627
-56
@@ -25,7 +25,7 @@ use serde::{Deserialize, Serialize};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::{mpsc, Mutex, Notify};
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
/// Structured error code returned when the tunnel-node receives an op it
|
||||
@@ -33,6 +33,49 @@ use tokio::task::JoinSet;
|
||||
/// detect a version mismatch and gracefully fall back.
|
||||
const CODE_UNSUPPORTED_OP: &str = "UNSUPPORTED_OP";
|
||||
|
||||
/// Drain-phase deadline when the batch contained writes or new
|
||||
/// connections. We expect upstream servers to respond fast (TLS
|
||||
/// ServerHello, HTTP response) so this is a ceiling for slow targets;
|
||||
/// `wait_for_any_drainable` returns much sooner — usually within
|
||||
/// milliseconds — once any session in the batch fires its notify.
|
||||
const ACTIVE_DRAIN_DEADLINE: Duration = Duration::from_millis(350);
|
||||
|
||||
/// After the first session in an active batch wakes the wait, we sleep
|
||||
/// briefly so neighboring sessions whose responses land just after the
|
||||
/// first one don't get reported empty and pay an extra round-trip. Only
|
||||
/// applies to active batches — for long-poll batches the wake event IS
|
||||
/// the data we want, so we deliver it immediately.
|
||||
///
|
||||
/// 30 ms is much shorter than the legacy two-pass retry (150 + 200 ms)
|
||||
/// but covers the typical case of co-located upstreams whose RTTs
|
||||
/// cluster within a few tens of ms of each other.
|
||||
const STRAGGLER_SETTLE: Duration = Duration::from_millis(30);
|
||||
|
||||
/// Drain-phase deadline when the batch is a pure poll (no writes, no new
|
||||
/// connections — clients just asking "any push data?"). Holding the
|
||||
/// response open delivers server-initiated bytes (push notifications,
|
||||
/// chat messages, server-sent events) within roughly one RTT instead of
|
||||
/// waiting for the client's next tick.
|
||||
///
|
||||
/// **This is a knob, not a constant of nature.** It trades push latency
|
||||
/// against the worst-case "client wants to send while mid-poll" delay:
|
||||
/// the tunnel-client's `tunnel_loop` is strictly serial (one in-flight
|
||||
/// op per session), so any local bytes that arrive while the poll is
|
||||
/// being held are stuck in the kernel until the poll returns.
|
||||
///
|
||||
/// * Lower (e.g. 2 s) — interactive shells / typing-burst flows feel
|
||||
/// snappier, but push-only sessions pay more empty round-trips.
|
||||
/// * Higher (e.g. 20 s) — push delivery is near-RTT and round-trip
|
||||
/// count is minimal, but a thinking pause between keystrokes can
|
||||
/// tax the next keystroke by up to the chosen value.
|
||||
///
|
||||
/// 5 s is a middle ground: a typing user pausing mid-thought pays at
|
||||
/// most a 5 s nudge before their next keystroke flows, while idle
|
||||
/// sessions still get the bulk of the long-poll benefit. Must also
|
||||
/// stay safely below the client's `BATCH_TIMEOUT` (30 s) and Apps
|
||||
/// Script's UrlFetch ceiling (~60 s).
|
||||
const LONGPOLL_DEADLINE: Duration = Duration::from_secs(5);
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Session
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -42,6 +85,11 @@ struct SessionInner {
|
||||
read_buf: Mutex<Vec<u8>>,
|
||||
eof: AtomicBool,
|
||||
last_active: Mutex<Instant>,
|
||||
/// Fired by `reader_task` whenever new bytes land in `read_buf` or the
|
||||
/// upstream socket closes. `wait_for_any_drainable` listens on this
|
||||
/// to wake the drain phase as soon as any session has something to
|
||||
/// ship, replacing the old fixed-sleep heuristic.
|
||||
notify: Notify,
|
||||
}
|
||||
|
||||
struct ManagedSession {
|
||||
@@ -62,6 +110,7 @@ async fn create_session(host: &str, port: u16) -> std::io::Result<ManagedSession
|
||||
read_buf: Mutex::new(Vec::with_capacity(32768)),
|
||||
eof: AtomicBool::new(false),
|
||||
last_active: Mutex::new(Instant::now()),
|
||||
notify: Notify::new(),
|
||||
});
|
||||
|
||||
let inner_ref = inner.clone();
|
||||
@@ -74,9 +123,29 @@ async fn reader_task(mut reader: OwnedReadHalf, session: Arc<SessionInner>) {
|
||||
let mut buf = vec![0u8; 65536];
|
||||
loop {
|
||||
match reader.read(&mut buf).await {
|
||||
Ok(0) => { session.eof.store(true, Ordering::Release); break; }
|
||||
Ok(n) => { session.read_buf.lock().await.extend_from_slice(&buf[..n]); }
|
||||
Err(_) => { session.eof.store(true, Ordering::Release); break; }
|
||||
Ok(0) => {
|
||||
session.eof.store(true, Ordering::Release);
|
||||
session.notify.notify_one();
|
||||
break;
|
||||
}
|
||||
Ok(n) => {
|
||||
// Extend the buffer before notifying. The MutexGuard is
|
||||
// dropped at the end of the statement, *before* the
|
||||
// notify_one call below, so any waiter that wakes on the
|
||||
// notify and then locks read_buf can immediately observe
|
||||
// the new bytes — no torn read where the wake fires but
|
||||
// the buffer still looks empty. Notify::notify_one also
|
||||
// stores a permit if no waiter is currently registered,
|
||||
// so we never lose an edge across the spawn race in
|
||||
// wait_for_any_drainable.
|
||||
session.read_buf.lock().await.extend_from_slice(&buf[..n]);
|
||||
session.notify.notify_one();
|
||||
}
|
||||
Err(_) => {
|
||||
session.eof.store(true, Ordering::Release);
|
||||
session.notify.notify_one();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -90,6 +159,99 @@ async fn drain_now(session: &SessionInner) -> (Vec<u8>, bool) {
|
||||
(data, eof)
|
||||
}
|
||||
|
||||
/// Block until *any* of `inners` has buffered data, hits EOF, or the
|
||||
/// deadline elapses — whichever comes first. Returns immediately if any
|
||||
/// session is already drainable when called.
|
||||
///
|
||||
/// This replaces the legacy `sleep(150ms)` + `sleep(200ms)` retry pattern
|
||||
/// in batch drain. With `reader_task` firing `notify_one` on each
|
||||
/// appended chunk, a typical TLS ServerHello (~30-50 ms) wakes the wait
|
||||
/// in milliseconds instead of paying the 150 ms ceiling. For pure-poll
|
||||
/// batches the same primitive holds the response open until upstream
|
||||
/// pushes data or `LONGPOLL_DEADLINE` elapses, turning idle sessions
|
||||
/// into a true long-poll.
|
||||
///
|
||||
/// Race-safety:
|
||||
/// * `Notify::notify_one` stores a one-shot permit if no waiter is
|
||||
/// registered, so a notify that fires between the buffer check and
|
||||
/// the watcher's `.notified().await` is consumed on the next poll
|
||||
/// rather than lost.
|
||||
/// * Watchers self-filter against observable session state. A prior
|
||||
/// batch that returned via the spawn-race shortcut may leave a
|
||||
/// stale permit on the `Notify`; this batch's watcher will consume
|
||||
/// it but, finding the buffer empty and EOF unset, loop back to
|
||||
/// wait for a real notify. Without this filter, an idle long-poll
|
||||
/// batch could return in <1 ms on a stale permit and degrade push
|
||||
/// delivery to the client's idle re-poll cadence.
|
||||
async fn wait_for_any_drainable(inners: &[Arc<SessionInner>], deadline: Duration) {
|
||||
if inners.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
// One watcher per session. Each loops until it observes real state
|
||||
// (eof set or buffer non-empty) before signaling — see the
|
||||
// race-safety note on `wait_for_any_drainable` for why. We abort the
|
||||
// watchers on return; the only state they hold is a notify
|
||||
// subscription, so abort is clean.
|
||||
let (tx, mut rx) = mpsc::channel::<()>(1);
|
||||
let mut watchers = Vec::with_capacity(inners.len());
|
||||
for inner in inners {
|
||||
let inner = inner.clone();
|
||||
let tx = tx.clone();
|
||||
watchers.push(tokio::spawn(async move {
|
||||
loop {
|
||||
inner.notify.notified().await;
|
||||
if inner.eof.load(Ordering::Acquire) {
|
||||
break;
|
||||
}
|
||||
if !inner.read_buf.lock().await.is_empty() {
|
||||
break;
|
||||
}
|
||||
// Stale permit (notify fired but state didn't change in
|
||||
// an observable way — e.g., bytes were already drained
|
||||
// by a prior batch). Loop back and wait for a real
|
||||
// notify, don't wake the caller.
|
||||
}
|
||||
let _ = tx.try_send(());
|
||||
}));
|
||||
}
|
||||
drop(tx);
|
||||
|
||||
// Spawn-race shortcut: if state was already drainable when we got
|
||||
// here (bytes arrived between phase 1 and this point), return
|
||||
// without entering the select. The watcher self-filtering above
|
||||
// means the unconsumed permit we leave behind here is harmless to
|
||||
// future batches.
|
||||
let already_ready = is_any_drainable(inners).await;
|
||||
|
||||
if !already_ready {
|
||||
tokio::select! {
|
||||
_ = rx.recv() => {}
|
||||
_ = tokio::time::sleep(deadline) => {}
|
||||
}
|
||||
}
|
||||
|
||||
for w in &watchers {
|
||||
w.abort();
|
||||
}
|
||||
}
|
||||
|
||||
/// True iff any session is currently drainable: its read buffer has
|
||||
/// bytes, or it's been marked EOF. Pulled out of `wait_for_any_drainable`
|
||||
/// so the same predicate can drive both the spawn-race shortcut and the
|
||||
/// post-wake straggler poll.
|
||||
async fn is_any_drainable(inners: &[Arc<SessionInner>]) -> bool {
|
||||
for inner in inners {
|
||||
if inner.eof.load(Ordering::Acquire) {
|
||||
return true;
|
||||
}
|
||||
if !inner.read_buf.lock().await.is_empty() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Wait for response data with drain window. Used by single-op mode.
|
||||
async fn wait_and_drain(session: &SessionInner, max_wait: Duration) -> (Vec<u8>, bool) {
|
||||
let deadline = Instant::now() + max_wait;
|
||||
@@ -251,23 +413,28 @@ async fn handle_batch(
|
||||
return (StatusCode::OK, [(header::CONTENT_TYPE, "application/json")], resp);
|
||||
}
|
||||
|
||||
// Process all ops. For "data" ops, first write all outbound data,
|
||||
// then do a short sleep to let servers respond, then drain all.
|
||||
// This batches the network round trips on the server side too.
|
||||
|
||||
// Phase 1: process connects and writes.
|
||||
// Process all ops in two phases.
|
||||
//
|
||||
// `connect` and `connect_data` ops each establish a brand-new upstream
|
||||
// TCP connection which can take up to 10 s (create_session timeout).
|
||||
// Running them inline head-of-line-blocks every other op in the batch,
|
||||
// so we dispatch both into a JoinSet and await them concurrently below.
|
||||
// Phase 1: dispatch new connections concurrently and write outbound
|
||||
// bytes for "data" ops. We track whether any op did real work
|
||||
// (`had_writes_or_connects`) — this drives the deadline picked in
|
||||
// phase 2.
|
||||
//
|
||||
// `connect_data` is expected to dominate in practice (new client) but
|
||||
// we still hit `connect` from older clients or from server-speaks-first
|
||||
// ports that skip the pre-read — if a slow `connect` landed in the same
|
||||
// batch as data-bearing ops it could stall everyone.
|
||||
// `connect` and `connect_data` each establish a brand-new upstream TCP
|
||||
// connection (up to 10 s timeout in `create_session`). Running them
|
||||
// inline would head-of-line-block every other op in the batch, so we
|
||||
// dispatch both into a JoinSet and await them concurrently below.
|
||||
//
|
||||
// `connect_data` dominates in practice (new clients), but `connect`
|
||||
// still fires from server-speaks-first ports and from the preread
|
||||
// timeout fallback path.
|
||||
let mut results: Vec<(usize, TunnelResponse)> = Vec::with_capacity(req.ops.len());
|
||||
let mut data_ops: Vec<(usize, String)> = Vec::new(); // (index, sid) for data ops needing drain
|
||||
// True iff the batch contained any op that performed a real action
|
||||
// upstream — a new connection or a non-empty data write. A batch of
|
||||
// only empty "data" polls (and possibly closes) leaves this false and
|
||||
// qualifies for long-poll behavior in phase 2.
|
||||
let mut had_writes_or_connects = false;
|
||||
|
||||
enum NewConn {
|
||||
Connect(TunnelResponse),
|
||||
@@ -278,6 +445,7 @@ async fn handle_batch(
|
||||
for (i, op) in req.ops.iter().enumerate() {
|
||||
match op.op.as_str() {
|
||||
"connect" => {
|
||||
had_writes_or_connects = true;
|
||||
let state = state.clone();
|
||||
let host = op.host.clone();
|
||||
let port = op.port;
|
||||
@@ -286,15 +454,16 @@ async fn handle_batch(
|
||||
});
|
||||
}
|
||||
"connect_data" => {
|
||||
had_writes_or_connects = true;
|
||||
let state = state.clone();
|
||||
let host = op.host.clone();
|
||||
let port = op.port;
|
||||
let d = op.d.clone();
|
||||
new_conn_jobs.spawn(async move {
|
||||
// Drop the returned Arc<SessionInner>: phase 2 below
|
||||
// holds the sessions-map lock once for the whole batch
|
||||
// and re-looks up each sid, which is cheap. The Arc
|
||||
// return is a convenience for the single-op path only.
|
||||
// re-looks up each sid under one sessions-map lock,
|
||||
// which is cheap. The Arc return is a convenience for
|
||||
// the single-op path only.
|
||||
let r = handle_connect_data_phase1(&state, host, port, d)
|
||||
.await
|
||||
.map(|(sid, _inner)| sid);
|
||||
@@ -313,6 +482,7 @@ async fn handle_batch(
|
||||
*session.inner.last_active.lock().await = Instant::now();
|
||||
if let Some(ref data_b64) = op.d {
|
||||
if !data_b64.is_empty() {
|
||||
had_writes_or_connects = true;
|
||||
if let Ok(bytes) = B64.decode(data_b64) {
|
||||
if !bytes.is_empty() {
|
||||
let mut w = session.inner.writer.lock().await;
|
||||
@@ -355,54 +525,68 @@ async fn handle_batch(
|
||||
}
|
||||
}
|
||||
|
||||
// Phase 2: short wait for servers to respond, then drain all data sessions
|
||||
// Phase 2: signal-driven wait for any session to have data (or hit
|
||||
// EOF), then drain everyone in a single pass. The deadline is
|
||||
// adaptive:
|
||||
// * `ACTIVE_DRAIN_DEADLINE` (~350 ms) when the batch had real work
|
||||
// — typical responses arrive in ms and `wait_for_any_drainable`
|
||||
// returns on the first notify. After the first wake we settle
|
||||
// for `STRAGGLER_SETTLE` so neighboring sessions whose replies
|
||||
// land just behind the first one don't get reported empty.
|
||||
// * `LONGPOLL_DEADLINE` when the batch is a pure poll — no writes,
|
||||
// no new connections. The response is held open until upstream
|
||||
// pushes data, turning idle sessions into a true long-poll
|
||||
// without paying per-poll latency. No straggler settle here:
|
||||
// the wake event IS the data the client wants, so deliver it
|
||||
// immediately.
|
||||
if !data_ops.is_empty() {
|
||||
// Give servers a moment to respond to the data we just wrote
|
||||
tokio::time::sleep(Duration::from_millis(150)).await;
|
||||
let deadline = if had_writes_or_connects {
|
||||
ACTIVE_DRAIN_DEADLINE
|
||||
} else {
|
||||
LONGPOLL_DEADLINE
|
||||
};
|
||||
|
||||
// First drain pass
|
||||
// Snapshot SessionInner Arcs under a single lock so we don't
|
||||
// hold the sessions-map lock across the await.
|
||||
let inners: Vec<Arc<SessionInner>> = {
|
||||
let sessions = state.sessions.lock().await;
|
||||
data_ops
|
||||
.iter()
|
||||
.filter_map(|(_, sid)| sessions.get(sid).map(|s| s.inner.clone()))
|
||||
.collect()
|
||||
};
|
||||
|
||||
let wait_start = Instant::now();
|
||||
wait_for_any_drainable(&inners, deadline).await;
|
||||
|
||||
// Straggler settle: only for active batches, only if we woke
|
||||
// early (didn't hit the deadline). Capped by the remaining
|
||||
// deadline budget — `saturating_sub` so a future refactor that
|
||||
// ever lets `elapsed > deadline` slip through can't underflow.
|
||||
if had_writes_or_connects {
|
||||
let remaining = deadline.saturating_sub(wait_start.elapsed());
|
||||
if !remaining.is_zero() {
|
||||
tokio::time::sleep(STRAGGLER_SETTLE.min(remaining)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Single drain pass for all sessions in this batch.
|
||||
{
|
||||
let sessions = state.sessions.lock().await;
|
||||
let mut need_retry = Vec::new();
|
||||
for (i, sid) in &data_ops {
|
||||
if let Some(session) = sessions.get(sid) {
|
||||
let (data, eof) = drain_now(&session.inner).await;
|
||||
if data.is_empty() && !eof {
|
||||
need_retry.push((*i, sid.clone()));
|
||||
} else {
|
||||
results.push((*i, TunnelResponse {
|
||||
sid: Some(sid.clone()),
|
||||
d: if data.is_empty() { None } else { Some(B64.encode(&data)) },
|
||||
eof: Some(eof), e: None, code: None,
|
||||
}));
|
||||
}
|
||||
results.push((*i, TunnelResponse {
|
||||
sid: Some(sid.clone()),
|
||||
d: if data.is_empty() { None } else { Some(B64.encode(&data)) },
|
||||
eof: Some(eof), e: None, code: None,
|
||||
}));
|
||||
} else {
|
||||
results.push((*i, TunnelResponse {
|
||||
sid: Some(sid.clone()), d: None, eof: Some(true), e: None, code: None,
|
||||
}));
|
||||
}
|
||||
}
|
||||
drop(sessions);
|
||||
|
||||
// Retry sessions that had no data yet
|
||||
if !need_retry.is_empty() {
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
let sessions = state.sessions.lock().await;
|
||||
for (i, sid) in &need_retry {
|
||||
if let Some(s) = sessions.get(sid) {
|
||||
let (data, eof) = drain_now(&s.inner).await;
|
||||
results.push((*i, TunnelResponse {
|
||||
sid: Some(sid.clone()),
|
||||
d: if data.is_empty() { None } else { Some(B64.encode(&data)) },
|
||||
eof: Some(eof), e: None, code: None,
|
||||
}));
|
||||
} else {
|
||||
results.push((*i, TunnelResponse {
|
||||
sid: Some(sid.clone()), d: None, eof: Some(true), e: None, code: None,
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up eof sessions
|
||||
@@ -799,5 +983,392 @@ mod tests {
|
||||
// Session should NOT be in the map since phase1 rejected it.
|
||||
assert!(state.sessions.lock().await.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// wait_for_any_drainable + notify wiring
|
||||
//
|
||||
// These guard the new event-driven drain. Regressions here mean the
|
||||
// batch handler either falls back to fixed sleeps (latency win lost)
|
||||
// or wedges on a missed signal (correctness lost) — both silent
|
||||
// without explicit tests.
|
||||
// ---------------------------------------------------------------------
|
||||
|
||||
/// Build a SessionInner with no reader_task, suitable for tests that
|
||||
/// drive the read_buf / eof / notify state by hand. The writer half
|
||||
/// is wired to a live loopback peer so the Mutex<OwnedWriteHalf> has
|
||||
/// a real value, but tests never touch it.
|
||||
async fn fake_inner() -> Arc<SessionInner> {
|
||||
let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let accept = tokio::spawn(async move { listener.accept().await.unwrap().0 });
|
||||
let client = TcpStream::connect(addr).await.unwrap();
|
||||
let _server_side = accept.await.unwrap();
|
||||
let (_reader, writer) = client.into_split();
|
||||
|
||||
Arc::new(SessionInner {
|
||||
writer: Mutex::new(writer),
|
||||
read_buf: Mutex::new(Vec::new()),
|
||||
eof: AtomicBool::new(false),
|
||||
last_active: Mutex::new(Instant::now()),
|
||||
notify: Notify::new(),
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_for_any_drainable_returns_immediately_when_buffer_has_data() {
|
||||
let inner = fake_inner().await;
|
||||
inner.read_buf.lock().await.extend_from_slice(b"already here");
|
||||
|
||||
let t0 = Instant::now();
|
||||
wait_for_any_drainable(&[inner], Duration::from_secs(5)).await;
|
||||
assert!(
|
||||
t0.elapsed() < Duration::from_millis(100),
|
||||
"should short-circuit on pre-buffered data, took {:?}",
|
||||
t0.elapsed()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_for_any_drainable_returns_immediately_when_eof_set() {
|
||||
let inner = fake_inner().await;
|
||||
inner.eof.store(true, Ordering::Release);
|
||||
|
||||
let t0 = Instant::now();
|
||||
wait_for_any_drainable(&[inner], Duration::from_secs(5)).await;
|
||||
assert!(
|
||||
t0.elapsed() < Duration::from_millis(100),
|
||||
"should short-circuit on pre-set eof, took {:?}",
|
||||
t0.elapsed()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_for_any_drainable_returns_immediately_for_empty_list() {
|
||||
let t0 = Instant::now();
|
||||
wait_for_any_drainable(&[], Duration::from_secs(5)).await;
|
||||
assert!(
|
||||
t0.elapsed() < Duration::from_millis(50),
|
||||
"empty input should be a no-op, took {:?}",
|
||||
t0.elapsed()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_for_any_drainable_wakes_on_notify() {
|
||||
let inner = fake_inner().await;
|
||||
let signal = inner.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::from_millis(80)).await;
|
||||
signal.read_buf.lock().await.extend_from_slice(b"pushed");
|
||||
signal.notify.notify_one();
|
||||
});
|
||||
|
||||
let t0 = Instant::now();
|
||||
wait_for_any_drainable(&[inner], Duration::from_secs(5)).await;
|
||||
let elapsed = t0.elapsed();
|
||||
// We only assert the upper bound — wake latency under load can be
|
||||
// tens of ms but should never approach the 5 s deadline.
|
||||
assert!(
|
||||
elapsed < Duration::from_millis(800),
|
||||
"did not wake on notify within reasonable time: {:?}",
|
||||
elapsed
|
||||
);
|
||||
}
|
||||
|
||||
/// Any-of-N: when one session in a multi-session batch fires its
|
||||
/// notify, the wait returns. Regression here would mean idle
|
||||
/// neighbors block the drain for a session that has data ready.
|
||||
#[tokio::test]
|
||||
async fn wait_for_any_drainable_wakes_on_any_session_notify() {
|
||||
let a = fake_inner().await;
|
||||
let b = fake_inner().await;
|
||||
let c = fake_inner().await;
|
||||
let signal = b.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::from_millis(80)).await;
|
||||
signal.read_buf.lock().await.push(b'x');
|
||||
signal.notify.notify_one();
|
||||
});
|
||||
|
||||
let t0 = Instant::now();
|
||||
wait_for_any_drainable(&[a, b, c], Duration::from_secs(5)).await;
|
||||
assert!(
|
||||
t0.elapsed() < Duration::from_millis(800),
|
||||
"any-of-N wake too slow: {:?}",
|
||||
t0.elapsed()
|
||||
);
|
||||
}
|
||||
|
||||
/// Stale-permit guard: if a previous batch consumed the buffer and
|
||||
/// returned via the spawn-race shortcut without consuming the notify
|
||||
/// permit, the next batch's watcher consumes that stale permit but
|
||||
/// MUST NOT wake the caller — the buffer is empty. This regressed
|
||||
/// silently in the first version; the self-filtering watcher closes
|
||||
/// it. Without this test, an empty long-poll batch could return in
|
||||
/// <1 ms and degrade push delivery to the client's idle re-poll
|
||||
/// cadence (~500 ms).
|
||||
#[tokio::test]
|
||||
async fn wait_for_any_drainable_ignores_stale_permit() {
|
||||
let inner = fake_inner().await;
|
||||
|
||||
// Plant a permit (no waiter yet, so it's stored as a one-shot).
|
||||
inner.notify.notify_one();
|
||||
|
||||
// Buffer is empty and EOF is unset, so the only thing that
|
||||
// could wake the wait is the permit. With self-filtering the
|
||||
// watcher consumes it, sees no observable state, loops back —
|
||||
// the wait should run for the full deadline and then return.
|
||||
let deadline = Duration::from_millis(200);
|
||||
let t0 = Instant::now();
|
||||
wait_for_any_drainable(&[inner], deadline).await;
|
||||
let elapsed = t0.elapsed();
|
||||
assert!(
|
||||
elapsed >= deadline,
|
||||
"stale permit incorrectly woke the wait: {:?} < {:?}",
|
||||
elapsed,
|
||||
deadline
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_for_any_drainable_hits_deadline_when_no_events() {
|
||||
let inner = fake_inner().await;
|
||||
let deadline = Duration::from_millis(150);
|
||||
|
||||
let t0 = Instant::now();
|
||||
wait_for_any_drainable(&[inner], deadline).await;
|
||||
let elapsed = t0.elapsed();
|
||||
assert!(
|
||||
elapsed >= deadline,
|
||||
"returned before deadline: {:?} < {:?}",
|
||||
elapsed,
|
||||
deadline
|
||||
);
|
||||
assert!(
|
||||
elapsed < deadline + Duration::from_millis(300),
|
||||
"overshot deadline by too much: {:?}",
|
||||
elapsed
|
||||
);
|
||||
}
|
||||
|
||||
/// Real reader_task → notify path. If reader_task ever stops calling
|
||||
/// notify_one after an extend, the long-poll silently degrades to
|
||||
/// "wait the full deadline every time" — this catches that.
|
||||
#[tokio::test]
|
||||
async fn reader_task_notifies_on_incoming_bytes() {
|
||||
let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let server = tokio::spawn(async move {
|
||||
let (mut sock, _) = listener.accept().await.unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(80)).await;
|
||||
sock.write_all(b"hello").await.unwrap();
|
||||
sock.flush().await.unwrap();
|
||||
// Hold the connection so reader_task doesn't immediately EOF
|
||||
// and confuse the assertion.
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
});
|
||||
|
||||
let stream = TcpStream::connect(addr).await.unwrap();
|
||||
let (reader, writer) = stream.into_split();
|
||||
let inner = Arc::new(SessionInner {
|
||||
writer: Mutex::new(writer),
|
||||
read_buf: Mutex::new(Vec::new()),
|
||||
eof: AtomicBool::new(false),
|
||||
last_active: Mutex::new(Instant::now()),
|
||||
notify: Notify::new(),
|
||||
});
|
||||
let _reader_handle = tokio::spawn(reader_task(reader, inner.clone()));
|
||||
|
||||
let t0 = Instant::now();
|
||||
wait_for_any_drainable(&[inner.clone()], Duration::from_secs(2)).await;
|
||||
let elapsed = t0.elapsed();
|
||||
assert!(
|
||||
elapsed < Duration::from_millis(800),
|
||||
"wait did not wake on reader_task notify: {:?}",
|
||||
elapsed
|
||||
);
|
||||
assert_eq!(&inner.read_buf.lock().await[..], b"hello");
|
||||
|
||||
// The spawned server's only job is to deliver one chunk and hold
|
||||
// the connection open long enough for the assertion. abort() is
|
||||
// intentional cleanup, not a failure path.
|
||||
server.abort();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// handle_batch deadline selection (end-to-end through the actual
|
||||
// batch handler — not just wait_for_any_drainable in isolation)
|
||||
//
|
||||
// These tests guard the adaptive deadline logic: an empty-poll batch
|
||||
// must engage LONGPOLL_DEADLINE, an active batch must cap at
|
||||
// ACTIVE_DRAIN_DEADLINE + STRAGGLER_SETTLE, and `Some("")` must NOT
|
||||
// count as a write. Each was a separate review concern and would
|
||||
// regress silently without explicit coverage.
|
||||
// ---------------------------------------------------------------------
|
||||
|
||||
/// TCP server that pushes `data` exactly `delay` after accept,
|
||||
/// without reading from the client first. Simulates server-initiated
|
||||
/// push (notifications, SSE) on a real socket.
|
||||
async fn start_push_server(delay: Duration, data: Vec<u8>) -> u16 {
|
||||
let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap();
|
||||
let port = listener.local_addr().unwrap().port();
|
||||
tokio::spawn(async move {
|
||||
if let Ok((mut sock, _)) = listener.accept().await {
|
||||
tokio::time::sleep(delay).await;
|
||||
let _ = sock.write_all(&data).await;
|
||||
let _ = sock.flush().await;
|
||||
// Hold the socket open well beyond any test's deadline
|
||||
// so reader_task doesn't EOF mid-assertion.
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
});
|
||||
port
|
||||
}
|
||||
|
||||
/// TCP server that accepts and does NOTHING — never writes, never
|
||||
/// closes. Used to test deadline behavior when there's no upstream
|
||||
/// response.
|
||||
async fn start_silent_server() -> u16 {
|
||||
let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap();
|
||||
let port = listener.local_addr().unwrap().port();
|
||||
tokio::spawn(async move {
|
||||
if let Ok((sock, _)) = listener.accept().await {
|
||||
// Hold the socket alive past any reasonable test deadline.
|
||||
tokio::time::sleep(Duration::from_secs(60)).await;
|
||||
drop(sock);
|
||||
}
|
||||
});
|
||||
port
|
||||
}
|
||||
|
||||
/// Drive `handle_batch` end-to-end and parse its JSON response into a
|
||||
/// `serde_json::Value` for assertion (TunnelResponse/BatchResponse
|
||||
/// don't derive Deserialize, and we don't want to add it just for
|
||||
/// tests).
|
||||
async fn invoke_handle_batch(state: &AppState, body: Vec<u8>) -> serde_json::Value {
|
||||
let resp = handle_batch(State(state.clone()), Bytes::from(body))
|
||||
.await
|
||||
.into_response();
|
||||
let body_bytes = axum::body::to_bytes(resp.into_body(), usize::MAX).await.unwrap();
|
||||
serde_json::from_slice(&body_bytes).unwrap()
|
||||
}
|
||||
|
||||
/// Pure-poll batch (one `data` op with no `d`) holds open and wakes
|
||||
/// when upstream pushes data. Push arrives at ~150 ms — well past
|
||||
/// any active-batch ceiling. If long-poll didn't engage we'd return
|
||||
/// at ACTIVE_DRAIN_DEADLINE (350 ms) with no data.
|
||||
#[tokio::test]
|
||||
async fn batch_pure_poll_wakes_on_push() {
|
||||
let push_port = start_push_server(
|
||||
Duration::from_millis(150),
|
||||
b"PUSHED".to_vec(),
|
||||
).await;
|
||||
let state = fresh_state();
|
||||
let connect_resp = handle_connect(&state, Some("127.0.0.1".into()), Some(push_port)).await;
|
||||
let sid = connect_resp.sid.expect("connect should succeed");
|
||||
|
||||
let body = serde_json::to_vec(&serde_json::json!({
|
||||
"k": "test-key",
|
||||
"ops": [{"op": "data", "sid": sid}],
|
||||
})).unwrap();
|
||||
|
||||
let t0 = Instant::now();
|
||||
let resp = invoke_handle_batch(&state, body).await;
|
||||
let elapsed = t0.elapsed();
|
||||
|
||||
assert!(
|
||||
elapsed >= Duration::from_millis(120),
|
||||
"returned before push could realistically arrive: {:?}",
|
||||
elapsed
|
||||
);
|
||||
assert!(
|
||||
elapsed < Duration::from_millis(700),
|
||||
"long-poll did not return promptly on push: {:?}",
|
||||
elapsed
|
||||
);
|
||||
|
||||
let r = resp["r"].as_array().expect("response must be an array");
|
||||
let d_b64 = r[0]["d"].as_str().expect("response should carry pushed bytes");
|
||||
let data = B64.decode(d_b64).unwrap();
|
||||
assert_eq!(&data[..], b"PUSHED");
|
||||
}
|
||||
|
||||
/// Active batch (write op) bounds the wait at roughly
|
||||
/// ACTIVE_DRAIN_DEADLINE + a little overhead, even when upstream
|
||||
/// doesn't respond. Upper bound proves long-poll did NOT engage.
|
||||
#[tokio::test]
|
||||
async fn batch_active_caps_at_active_deadline() {
|
||||
let silent_port = start_silent_server().await;
|
||||
let state = fresh_state();
|
||||
let connect_resp = handle_connect(&state, Some("127.0.0.1".into()), Some(silent_port)).await;
|
||||
let sid = connect_resp.sid.expect("connect should succeed");
|
||||
|
||||
let body = serde_json::to_vec(&serde_json::json!({
|
||||
"k": "test-key",
|
||||
"ops": [{"op": "data", "sid": sid, "d": B64.encode(b"PING")}],
|
||||
})).unwrap();
|
||||
|
||||
let t0 = Instant::now();
|
||||
let _resp = invoke_handle_batch(&state, body).await;
|
||||
let elapsed = t0.elapsed();
|
||||
|
||||
// No upstream response → wait full ACTIVE_DRAIN_DEADLINE (~350ms),
|
||||
// no straggler settle (we never woke). Upper bound is tight
|
||||
// enough that a regression bumping the active deadline above
|
||||
// ~600ms would fail this test instead of slipping through.
|
||||
assert!(
|
||||
elapsed >= Duration::from_millis(300),
|
||||
"active batch returned before active deadline: {:?}",
|
||||
elapsed
|
||||
);
|
||||
assert!(
|
||||
elapsed < Duration::from_millis(600),
|
||||
"active batch held longer than ACTIVE_DRAIN_DEADLINE + margin: {:?}",
|
||||
elapsed
|
||||
);
|
||||
}
|
||||
|
||||
/// `Some("")` must NOT flip `had_writes_or_connects`. If it did, the
|
||||
/// batch would return at the active deadline (350 ms) without the
|
||||
/// pushed bytes — push arrives at 600 ms here, deliberately past
|
||||
/// the active ceiling, so the only way the test gets data is if
|
||||
/// long-poll actually engaged.
|
||||
#[tokio::test]
|
||||
async fn batch_empty_string_payload_engages_long_poll() {
|
||||
let push_port = start_push_server(
|
||||
Duration::from_millis(600),
|
||||
b"DELAYED".to_vec(),
|
||||
).await;
|
||||
let state = fresh_state();
|
||||
let connect_resp = handle_connect(&state, Some("127.0.0.1".into()), Some(push_port)).await;
|
||||
let sid = connect_resp.sid.expect("connect should succeed");
|
||||
|
||||
let body = serde_json::to_vec(&serde_json::json!({
|
||||
"k": "test-key",
|
||||
"ops": [{"op": "data", "sid": sid, "d": ""}],
|
||||
})).unwrap();
|
||||
|
||||
let t0 = Instant::now();
|
||||
let resp = invoke_handle_batch(&state, body).await;
|
||||
let elapsed = t0.elapsed();
|
||||
|
||||
assert!(
|
||||
elapsed >= Duration::from_millis(550),
|
||||
"returned before push arrived (deadline likely set to active, not long-poll): {:?}",
|
||||
elapsed
|
||||
);
|
||||
assert!(
|
||||
elapsed < Duration::from_millis(1100),
|
||||
"long-poll didn't wake promptly on push: {:?}",
|
||||
elapsed
|
||||
);
|
||||
|
||||
let r = resp["r"].as_array().unwrap();
|
||||
let d_b64 = r[0]["d"].as_str()
|
||||
.expect("Some(\"\") payload should have engaged long-poll and delivered DELAYED");
|
||||
let data = B64.decode(d_b64).unwrap();
|
||||
assert_eq!(&data[..], b"DELAYED");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user