mirror of
https://github.com/therealaleph/MasterHttpRelayVPN-RUST.git
synced 2026-05-18 23:54:48 +03:00
fix(tunnel): per-deployment legacy fallback with auto-recovery (#290)
This commit is contained in:
@@ -99,6 +99,10 @@ tun2proxy = { version = "0.7", default-features = false, features = ["udpgw"] }
|
||||
[dev-dependencies]
|
||||
# Used in mitm tests to sanity-check the cert extensions we emit.
|
||||
x509-parser = "0.16"
|
||||
# `test-util` enables `tokio::test(start_paused = true)` so timing-
|
||||
# sensitive tests in `tunnel_client` (the empty-poll cadence) can
|
||||
# auto-advance virtual time instead of burning real wall-clock seconds.
|
||||
tokio = { version = "1", features = ["test-util"] }
|
||||
|
||||
[profile.release]
|
||||
panic = "abort"
|
||||
|
||||
+397
-74
@@ -78,6 +78,15 @@ const CODE_UNSUPPORTED_OP: &str = "UNSUPPORTED_OP";
|
||||
/// floor, so network jitter on either side won't false-trigger.
|
||||
const LEGACY_DETECT_THRESHOLD: Duration = Duration::from_millis(1500);
|
||||
|
||||
/// How long a deployment stays in "legacy / no long-poll" mode after the
|
||||
/// last detection. Must be much longer than `LEGACY_DETECT_THRESHOLD` so a
|
||||
/// freshly-marked deployment doesn't immediately self-recover, but short
|
||||
/// enough that a redeployed / recovered tunnel-node gets re-probed without
|
||||
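/// requiring a process restart. A mark does not change that deployment's
|
||||
/// own poll cadence — it only counts toward the aggregate `all_legacy`
|
||||
/// gate — so 60 s bounds how long a stale mark keeps counting toward that
|
||||
/// gate, and the entry self-resets so an upgraded tunnel-node returns to
|
||||
/// the long-poll fast path on its own.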
|
||||
const LEGACY_RECOVER_AFTER: Duration = Duration::from_secs(60);
|
||||
|
||||
/// How long to remember a `Network is unreachable` / `No route to host`
|
||||
/// failure for a given `(host, port)`. While cached, the proxy short-circuits
|
||||
/// repeat CONNECTs with an immediate "host unreachable" reply instead of
|
||||
@@ -130,6 +139,14 @@ fn normalize_cache_host(host: &str) -> String {
|
||||
// Multiplexer
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Reply payload for ops that go through `fire_batch`. The `String` is the
|
||||
/// `script_id` of the deployment that processed the batch — needed by
|
||||
/// `tunnel_loop`'s legacy-detection and per-deployment skip-when-idle
|
||||
/// decisions, which can't reach `fire_batch`'s local `script_id` any
|
||||
/// other way. Plain `Connect` doesn't go through `fire_batch` and keeps
|
||||
/// the simpler reply type.
|
||||
type BatchedReply = oneshot::Sender<Result<(TunnelResponse, String), String>>;
|
||||
|
||||
enum MuxMsg {
|
||||
Connect {
|
||||
host: String,
|
||||
@@ -142,23 +159,23 @@ enum MuxMsg {
|
||||
// Arc so the caller can hand the buffer to the mux AND keep a ref
|
||||
// for the fallback path without an extra 64 KB copy per session.
|
||||
data: Arc<Vec<u8>>,
|
||||
reply: oneshot::Sender<Result<TunnelResponse, String>>,
|
||||
reply: BatchedReply,
|
||||
},
|
||||
Data {
|
||||
sid: String,
|
||||
data: Vec<u8>,
|
||||
reply: oneshot::Sender<Result<TunnelResponse, String>>,
|
||||
reply: BatchedReply,
|
||||
},
|
||||
UdpOpen {
|
||||
host: String,
|
||||
port: u16,
|
||||
data: Vec<u8>,
|
||||
reply: oneshot::Sender<Result<TunnelResponse, String>>,
|
||||
reply: BatchedReply,
|
||||
},
|
||||
UdpData {
|
||||
sid: String,
|
||||
data: Vec<u8>,
|
||||
reply: oneshot::Sender<Result<TunnelResponse, String>>,
|
||||
reply: BatchedReply,
|
||||
},
|
||||
Close {
|
||||
sid: String,
|
||||
@@ -171,16 +188,48 @@ pub struct TunnelMux {
|
||||
/// `connect_data` as unsupported. Subsequent sessions skip the
|
||||
/// optimistic path entirely and go straight to plain connect + data.
|
||||
connect_data_unsupported: Arc<AtomicBool>,
|
||||
/// Set to `true` after we observe an empty poll round-trip that
|
||||
/// returned in less than `LEGACY_DETECT_THRESHOLD` with no data.
|
||||
/// On a long-poll-capable tunnel-node, an empty poll either returns
|
||||
/// quickly *with data* (push arrived) or holds open until the
|
||||
/// server's `LONGPOLL_DEADLINE`. A fast empty reply means the server
|
||||
/// is doing the legacy fixed-sleep drain — in that mode, hammering
|
||||
/// idle sessions at the new 500 ms cadence wastes Apps Script quota
|
||||
/// for no benefit, so the loop reverts to the pre-long-poll
|
||||
/// "skip empty polls when idle" behavior.
|
||||
server_no_longpoll: Arc<AtomicBool>,
|
||||
/// Per-deployment legacy state: `script_id` → time it was last
|
||||
/// observed serving an empty poll faster than `LEGACY_DETECT_THRESHOLD`.
|
||||
/// Absence means "long-poll capable, or untested." Entries expire after
|
||||
/// `LEGACY_RECOVER_AFTER` so a redeployed / recovered tunnel-node
|
||||
/// rejoins the long-poll fast path without requiring a process restart.
|
||||
///
|
||||
/// Note: the per-deployment marks here do *not* drive a per-deployment
|
||||
/// poll cadence — the `tunnel_loop` cadence (read-timeout backoff and
|
||||
/// skip-empty-when-idle) is gated on the aggregate `all_legacy`,
|
||||
/// because the next op's deployment is chosen later by
|
||||
/// `next_script_id()` round-robin and the loop can't pre-select. What
|
||||
/// the per-deployment design *does* fix vs the old single AtomicBool:
|
||||
/// * one slow / legacy deployment can no longer flip the aggregate
|
||||
/// true on its own — every deployment has to be marked first;
|
||||
/// * deployments recover individually on the TTL, so an upgraded
|
||||
/// tunnel-node lifts the aggregate without needing the others to
|
||||
/// also recover or the process to restart;
|
||||
/// * the warn log fires once per (deployment, recovery cycle), so
|
||||
/// re-detection after recovery is a real signal in the logs.
|
||||
/// The cost: legacy deployments still receive fast empty polls in
|
||||
/// mixed mode (round-robin doesn't know to avoid them). Worth it to
|
||||
/// keep pushed bytes flowing through the long-poll-capable peers.
|
||||
legacy_deployments: Mutex<HashMap<String, Instant>>,
|
||||
/// Lock-free hot-path snapshot of "every known deployment is currently
|
||||
/// in legacy mode." Recomputed under `legacy_deployments`'s mutex on
|
||||
/// every mark/expire and read with a relaxed load from `tunnel_loop`.
|
||||
/// True only when this process has fast-empty observations for *all*
|
||||
/// `num_scripts` deployments simultaneously — that's when the per-
|
||||
/// session 30 s read-timeout backoff (the only setting where there is
|
||||
/// no per-deployment alternative) is still appropriate. Invariant: the
|
||||
/// atomic is always written *after* the map insert, under the same
|
||||
/// lock, so any reader that sees `true` was preceded by a complete
|
||||
/// map update.
|
||||
all_legacy: Arc<AtomicBool>,
|
||||
/// Count of *unique* configured deployment IDs at start time.
|
||||
/// Snapshotted from `fronter.script_id_list()` deduped, since the
|
||||
/// aggregate gate compares this against `legacy_deployments.len()`
|
||||
/// (a HashMap, so unique-keyed) — using the raw configured count
|
||||
/// would make the gate unreachable whenever a user lists the same
|
||||
/// script_id twice. Blacklisted-but-configured deployments still
|
||||
/// count here; see `all_servers_legacy` for why.
|
||||
num_scripts: usize,
|
||||
/// Pre-read observability. Lets an operator see whether the 50 ms
|
||||
/// wait-for-first-bytes is pulling its weight:
|
||||
/// * `preread_win` — client sent bytes in time, bundled with connect
|
||||
@@ -207,10 +256,28 @@ pub struct TunnelMux {
|
||||
|
||||
impl TunnelMux {
|
||||
pub fn start(fronter: Arc<DomainFronter>) -> Arc<Self> {
|
||||
let n = fronter.num_scripts();
|
||||
// Dedupe before snapshotting: the aggregate `all_legacy` gate
|
||||
// compares `legacy_deployments.len()` (a HashMap, so unique
|
||||
// keys) against this count, so using the raw `num_scripts()`
|
||||
// would make the gate unreachable whenever a user lists the
|
||||
// same script_id twice in config.
|
||||
let unique: std::collections::HashSet<&str> = fronter
|
||||
.script_id_list()
|
||||
.iter()
|
||||
.map(String::as_str)
|
||||
.collect();
|
||||
let unique_n = unique.len();
|
||||
let raw_n = fronter.num_scripts();
|
||||
if unique_n != raw_n {
|
||||
tracing::warn!(
|
||||
"tunnel mux: {} deployments configured but only {} unique script_id(s) — duplicate entries ignored for legacy detection",
|
||||
raw_n,
|
||||
unique_n,
|
||||
);
|
||||
}
|
||||
tracing::info!(
|
||||
"tunnel mux: {} deployment(s), {} concurrent per deployment",
|
||||
n,
|
||||
unique_n,
|
||||
CONCURRENCY_PER_DEPLOYMENT
|
||||
);
|
||||
let (tx, rx) = mpsc::channel(512);
|
||||
@@ -218,7 +285,9 @@ impl TunnelMux {
|
||||
Arc::new(Self {
|
||||
tx,
|
||||
connect_data_unsupported: Arc::new(AtomicBool::new(false)),
|
||||
server_no_longpoll: Arc::new(AtomicBool::new(false)),
|
||||
legacy_deployments: Mutex::new(HashMap::new()),
|
||||
all_legacy: Arc::new(AtomicBool::new(false)),
|
||||
num_scripts: unique_n,
|
||||
preread_win: AtomicU64::new(0),
|
||||
preread_loss: AtomicU64::new(0),
|
||||
preread_skip_port: AtomicU64::new(0),
|
||||
@@ -248,7 +317,8 @@ impl TunnelMux {
|
||||
})
|
||||
.await;
|
||||
match reply_rx.await {
|
||||
Ok(r) => r,
|
||||
Ok(Ok((resp, _script_id))) => Ok(resp),
|
||||
Ok(Err(e)) => Err(e),
|
||||
Err(_) => Err("mux channel closed".into()),
|
||||
}
|
||||
}
|
||||
@@ -262,7 +332,8 @@ impl TunnelMux {
|
||||
})
|
||||
.await;
|
||||
match reply_rx.await {
|
||||
Ok(r) => r,
|
||||
Ok(Ok((resp, _script_id))) => Ok(resp),
|
||||
Ok(Err(e)) => Err(e),
|
||||
Err(_) => Err("mux channel closed".into()),
|
||||
}
|
||||
}
|
||||
@@ -286,15 +357,78 @@ impl TunnelMux {
|
||||
}
|
||||
}
|
||||
|
||||
fn server_no_longpoll(&self) -> bool {
|
||||
self.server_no_longpoll.load(Ordering::Relaxed)
|
||||
/// True only when *every* known deployment is currently in legacy
|
||||
/// mode. Both per-session decisions in `tunnel_loop` (the 30 s
|
||||
/// read-timeout backoff and the skip-empty-when-idle short-circuit)
|
||||
/// gate on this aggregate — they can't pick a per-deployment answer
|
||||
/// ahead of time because the next op's deployment is chosen by
|
||||
/// `next_script_id()` only when the batch fires. With one
|
||||
/// long-poll-capable peer still around, the loop must keep emitting
|
||||
/// empty polls so round-robin lands some on that peer (where the
|
||||
/// server can hold them open and deliver pushed bytes).
|
||||
///
|
||||
/// Known limitation: the comparison is against *all configured*
|
||||
/// deployments (`num_scripts`), not currently-selectable ones. A
|
||||
/// fleet where most deployments are blacklisted in `DomainFronter`
|
||||
/// (10 min cooldown) and the only selectable deployment(s) are
|
||||
/// legacy will keep the fast cadence for up to that cooldown, even
|
||||
/// though every reachable peer is legacy. Accepted because
|
||||
/// integrating the blacklist would require a hot-path query on the
|
||||
/// fronter's mutex once per `tunnel_loop` iteration; a heavily-
|
||||
/// blacklisted fleet has bigger problems than quota optimization,
|
||||
/// and the worst-case quota cost is bounded by the cooldown.
|
||||
///
|
||||
/// Hot path: lock-free relaxed load. If the cached value is `true`,
|
||||
/// double-check under the mutex with a sweep for expired entries —
|
||||
/// otherwise stale legacy marks would keep us in the slow path forever
|
||||
/// after every deployment recovers (the `mark_server_no_longpoll` sweep
|
||||
/// only fires on the next mark, which may never come).
|
||||
fn all_servers_legacy(&self) -> bool {
|
||||
if !self.all_legacy.load(Ordering::Relaxed) {
|
||||
return false;
|
||||
}
|
||||
let now = Instant::now();
|
||||
let mut deps = match self.legacy_deployments.lock() {
|
||||
Ok(g) => g,
|
||||
Err(p) => p.into_inner(),
|
||||
};
|
||||
deps.retain(|_, marked_at| now.duration_since(*marked_at) < LEGACY_RECOVER_AFTER);
|
||||
let still_all = deps.len() == self.num_scripts;
|
||||
if !still_all {
|
||||
self.all_legacy.store(false, Ordering::Relaxed);
|
||||
}
|
||||
still_all
|
||||
}
|
||||
|
||||
fn mark_server_no_longpoll(&self) {
|
||||
if !self.server_no_longpoll.swap(true, Ordering::Relaxed) {
|
||||
fn mark_server_no_longpoll(&self, script_id: &str) {
|
||||
let now = Instant::now();
|
||||
let mut deps = match self.legacy_deployments.lock() {
|
||||
Ok(g) => g,
|
||||
Err(p) => p.into_inner(),
|
||||
};
|
||||
// Inline expiry sweep: if any entry has aged past
|
||||
// LEGACY_RECOVER_AFTER, drop it before recomputing `all_legacy`.
|
||||
// Without this, an entry that should have recovered would still
|
||||
// count toward the aggregate.
|
||||
deps.retain(|_, marked_at| now.duration_since(*marked_at) < LEGACY_RECOVER_AFTER);
|
||||
let was_present = deps.contains_key(script_id);
|
||||
deps.insert(script_id.to_string(), now);
|
||||
let all = deps.len() == self.num_scripts;
|
||||
// Atomic written under the lock and *after* the map insert. Any
|
||||
// reader that observes `all_legacy = true` has seen a complete
|
||||
// map state where every deployment is marked.
|
||||
self.all_legacy.store(all, Ordering::Relaxed);
|
||||
drop(deps);
|
||||
// Only log on first-mark-for-this-cycle: after `LEGACY_RECOVER_AFTER`
|
||||
// expiry + re-detection we re-log, which is intentional — that's
|
||||
// a real signal that the deployment regressed back to legacy mode.
|
||||
if !was_present {
|
||||
let short = &script_id[..script_id.len().min(8)];
|
||||
tracing::warn!(
|
||||
"tunnel-node returned an empty poll faster than {:?}; assuming legacy (no long-poll) drain — falling back to skip-empty-when-idle to avoid quota waste",
|
||||
"tunnel-node deployment {}... returned an empty poll faster than {:?}; assuming legacy (no long-poll) drain — this deployment will skip empty polls when idle for the next {:?}",
|
||||
short,
|
||||
LEGACY_DETECT_THRESHOLD,
|
||||
LEGACY_RECOVER_AFTER,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -466,8 +600,7 @@ async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>) {
|
||||
|
||||
// Split: plain connects go parallel, data-bearing ops get batched.
|
||||
let mut data_ops: Vec<BatchOp> = Vec::new();
|
||||
let mut data_replies: Vec<(usize, oneshot::Sender<Result<TunnelResponse, String>>)> =
|
||||
Vec::new();
|
||||
let mut data_replies: Vec<(usize, BatchedReply)> = Vec::new();
|
||||
let mut close_sids: Vec<String> = Vec::new();
|
||||
let mut batch_payload_bytes: usize = 0;
|
||||
|
||||
@@ -663,7 +796,7 @@ async fn fire_batch(
|
||||
sems: &Arc<HashMap<String, Arc<Semaphore>>>,
|
||||
fronter: &Arc<DomainFronter>,
|
||||
data_ops: Vec<BatchOp>,
|
||||
data_replies: Vec<(usize, oneshot::Sender<Result<TunnelResponse, String>>)>,
|
||||
data_replies: Vec<(usize, BatchedReply)>,
|
||||
) {
|
||||
let script_id = fronter.next_script_id();
|
||||
let sem = sems
|
||||
@@ -696,7 +829,7 @@ async fn fire_batch(
|
||||
Ok(Ok(batch_resp)) => {
|
||||
for (idx, reply) in data_replies {
|
||||
if let Some(resp) = batch_resp.r.get(idx) {
|
||||
let _ = reply.send(Ok(resp.clone()));
|
||||
let _ = reply.send(Ok((resp.clone(), script_id.clone())));
|
||||
} else {
|
||||
let _ = reply.send(Err("missing response in batch".into()));
|
||||
}
|
||||
@@ -877,7 +1010,7 @@ async fn connect_with_initial_data(
|
||||
.await;
|
||||
|
||||
let resp = match reply_rx.await {
|
||||
Ok(Ok(resp)) => resp,
|
||||
Ok(Ok((resp, _script_id))) => resp,
|
||||
Ok(Err(e)) => {
|
||||
if is_connect_data_unsupported_error_str(&e) {
|
||||
tracing::debug!("connect_data unsupported for {}:{}: {}", host, port, e);
|
||||
@@ -980,18 +1113,30 @@ async fn tunnel_loop(
|
||||
// drains. With long-poll, the server holds empty polls open up
|
||||
// to its `LONGPOLL_DEADLINE` (~5 s currently), so the client
|
||||
// can keep this read timeout short — the wait is on the wire,
|
||||
// not here. Against a *legacy* tunnel-node (no long-poll, fast
|
||||
// not here. Against *legacy* tunnel-nodes (no long-poll, fast
|
||||
// empty replies), the same short cadence + always-poll behavior
|
||||
// would generate continuous round-trips on idle sessions and
|
||||
// burn Apps Script quota. The `server_no_longpoll` flag detects
|
||||
// the legacy case from reply latency below and reverts to the
|
||||
// pre-long-poll cadence: long sleep on local read, skip empty
|
||||
// polls when sustained-idle.
|
||||
let legacy_mode = mux.server_no_longpoll();
|
||||
// burn Apps Script quota.
|
||||
//
|
||||
// Both the read timeout and the skip-empty-when-idle decision
|
||||
// are gated on `all_legacy` — i.e. *every known deployment is
|
||||
// currently legacy*. Per-deployment "skip when this script is
|
||||
// legacy" sounds appealing but is unsafe: the next op's
|
||||
// deployment is chosen by `next_script_id()` only when the
|
||||
// batch fires, so the loop can't predict where the empty poll
|
||||
// will land. Suppressing polls based on the *previous* reply's
|
||||
// script would stall remote→client data on mixed setups —
|
||||
// round-robin would never reach the long-poll-capable peer for
|
||||
// this session if every iteration short-circuits before
|
||||
// sending. Cost of the conservative gate: legacy peers see
|
||||
// some wasted empty polls when at least one peer is healthy,
|
||||
// bounded by round-robin fan-out. Worth it to keep pushed
|
||||
// bytes flowing.
|
||||
let all_legacy = mux.all_servers_legacy();
|
||||
let client_data = if let Some(data) = pending_client_data.take() {
|
||||
Some(data)
|
||||
} else {
|
||||
let read_timeout = match (legacy_mode, consecutive_empty) {
|
||||
let read_timeout = match (all_legacy, consecutive_empty) {
|
||||
(_, 0) => Duration::from_millis(20),
|
||||
(_, 1) => Duration::from_millis(80),
|
||||
(_, 2) => Duration::from_millis(200),
|
||||
@@ -1010,13 +1155,13 @@ async fn tunnel_loop(
|
||||
}
|
||||
};
|
||||
|
||||
// Legacy-server skip: against a non-long-polling tunnel-node,
|
||||
// an empty poll is wasted work — fast-empty reply, no push
|
||||
// delivery benefit. Preserve the pre-long-poll behavior of
|
||||
// going quiet after a few empties. Long-poll-capable servers
|
||||
// skip this branch and always send the empty op so the server
|
||||
// can hold it open.
|
||||
if legacy_mode && client_data.is_none() && consecutive_empty > 3 {
|
||||
// Skip empty polls only when *every* deployment is legacy. With
|
||||
// even one long-poll-capable peer, round-robin will land some
|
||||
// empty polls there where the server holds them open and can
|
||||
// deliver pushed bytes — that's the whole point of long-poll,
|
||||
// so we must keep emitting. See the `all_legacy` comment above
|
||||
// for why a per-deployment gate here would stall mixed setups.
|
||||
if all_legacy && client_data.is_none() && consecutive_empty > 3 {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -1035,8 +1180,8 @@ async fn tunnel_loop(
|
||||
// Bounded-wait on reply: if the batch this op landed in is slow
|
||||
// (dead target on the tunnel-node side), don't block this session
|
||||
// forever — timeout and let it retry on the next tick.
|
||||
let resp = match tokio::time::timeout(REPLY_TIMEOUT, reply_rx).await {
|
||||
Ok(Ok(Ok(r))) => r,
|
||||
let (resp, script_id) = match tokio::time::timeout(REPLY_TIMEOUT, reply_rx).await {
|
||||
Ok(Ok(Ok((r, sid_used)))) => (r, sid_used),
|
||||
Ok(Ok(Err(e))) => {
|
||||
tracing::debug!("tunnel data error: {}", e);
|
||||
break;
|
||||
@@ -1049,18 +1194,18 @@ async fn tunnel_loop(
|
||||
}
|
||||
};
|
||||
|
||||
// Legacy-server detection: an empty-in/empty-out round trip
|
||||
// that finishes well under `LEGACY_DETECT_THRESHOLD` is
|
||||
// Per-deployment legacy detection: an empty-in/empty-out round
|
||||
// trip that finishes well under `LEGACY_DETECT_THRESHOLD` is
|
||||
// structurally impossible on a long-poll-capable tunnel-node
|
||||
// (the server holds the response either until data arrives or
|
||||
// until its long-poll deadline). One observation flips the
|
||||
// sticky flag for the rest of this process. Skip the check
|
||||
// once already in legacy mode — the comparison is cheap, but
|
||||
// calling `mark_server_no_longpoll` repeatedly muddies logs.
|
||||
if !legacy_mode && was_empty_poll {
|
||||
// until its long-poll deadline). One observation marks *this
|
||||
// specific* deployment as legacy for `LEGACY_RECOVER_AFTER`;
|
||||
// peers stay on the fast path. The aggregate `all_legacy` gate
|
||||
// only flips once *every* deployment has been so marked.
|
||||
if was_empty_poll {
|
||||
let reply_was_empty = resp.d.as_deref().map(str::is_empty).unwrap_or(true);
|
||||
if reply_was_empty && send_at.elapsed() < LEGACY_DETECT_THRESHOLD {
|
||||
mux.mark_server_no_longpoll();
|
||||
mux.mark_server_no_longpoll(&script_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1364,11 +1509,21 @@ mod tests {
|
||||
/// than wired to a real DomainFronter. Lets tests assert what messages
|
||||
/// the client would emit without needing network or apps_script.
|
||||
fn mux_for_test() -> (Arc<TunnelMux>, mpsc::Receiver<MuxMsg>) {
|
||||
mux_for_test_with(2)
|
||||
}
|
||||
|
||||
/// Build a TunnelMux for tests with a specific deployment count. The
|
||||
/// per-deployment legacy state's aggregate gate (`all_servers_legacy`)
|
||||
/// requires `legacy_deployments.len() == num_scripts`, so tests that
|
||||
/// exercise that gate need to control how many "deployments" exist.
|
||||
fn mux_for_test_with(num_scripts: usize) -> (Arc<TunnelMux>, mpsc::Receiver<MuxMsg>) {
|
||||
let (tx, rx) = mpsc::channel(16);
|
||||
let mux = Arc::new(TunnelMux {
|
||||
tx,
|
||||
connect_data_unsupported: Arc::new(AtomicBool::new(false)),
|
||||
server_no_longpoll: Arc::new(AtomicBool::new(false)),
|
||||
legacy_deployments: Mutex::new(HashMap::new()),
|
||||
all_legacy: Arc::new(AtomicBool::new(false)),
|
||||
num_scripts,
|
||||
preread_win: AtomicU64::new(0),
|
||||
preread_loss: AtomicU64::new(0),
|
||||
preread_skip_port: AtomicU64::new(0),
|
||||
@@ -1418,14 +1573,17 @@ mod tests {
|
||||
assert_eq!(sid, "sid-under-test");
|
||||
assert_eq!(&data[..], b"CLIENTHELLO");
|
||||
// Reply with eof so tunnel_loop unwinds cleanly.
|
||||
let _ = reply.send(Ok(TunnelResponse {
|
||||
sid: Some("sid-under-test".into()),
|
||||
d: None,
|
||||
pkts: None,
|
||||
eof: Some(true),
|
||||
e: None,
|
||||
code: None,
|
||||
}));
|
||||
let _ = reply.send(Ok((
|
||||
TunnelResponse {
|
||||
sid: Some("sid-under-test".into()),
|
||||
d: None,
|
||||
pkts: None,
|
||||
eof: Some(true),
|
||||
e: None,
|
||||
code: None,
|
||||
},
|
||||
"test-script".to_string(),
|
||||
)));
|
||||
}
|
||||
other => panic!(
|
||||
"first mux message was not Data (expected replay); got {:?}",
|
||||
@@ -1445,6 +1603,81 @@ mod tests {
|
||||
.expect("tunnel_loop did not exit after eof");
|
||||
}
|
||||
|
||||
/// Regression for the mixed-mode stall: A is legacy, B is long-poll
|
||||
/// capable, the session's last reply came from A. A naive per-
|
||||
/// deployment skip (gated on the *previous* reply's `script_id`)
|
||||
/// would short-circuit every empty poll on this session — so B
|
||||
/// never gets a chance to long-poll for us, and remote→client data
|
||||
/// stalls until either the local client sends bytes or A's TTL
|
||||
/// expires. The fix gates skip-when-idle on the aggregate
|
||||
/// `all_servers_legacy()` instead, so the loop keeps emitting empty
|
||||
/// polls whenever at least one peer can still hold the request open.
|
||||
/// Replies are paced via `start_paused` time auto-advance — without
|
||||
/// it the test would take ~2 s of real wall-clock time per session.
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn tunnel_loop_keeps_polling_when_only_some_deployments_legacy() {
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let accept = tokio::spawn(async move { listener.accept().await.unwrap().0 });
|
||||
let _client = TcpStream::connect(addr).await.unwrap();
|
||||
let mut server_side = accept.await.unwrap();
|
||||
|
||||
// 2 deployments, only A marked legacy → all_servers_legacy = false.
|
||||
let (mux, mut rx) = mux_for_test_with(2);
|
||||
mux.mark_server_no_longpoll("script-A");
|
||||
assert!(!mux.all_servers_legacy());
|
||||
|
||||
let loop_handle = tokio::spawn({
|
||||
let mux = mux.clone();
|
||||
async move { tunnel_loop(&mut server_side, "sid-mixed", &mux, None).await }
|
||||
});
|
||||
|
||||
// Reply to 6 empty polls, all from A. With the regression
|
||||
// (per-deployment skip on `last_script_id == A`), the loop would
|
||||
// stop emitting at iteration 4 — `consecutive_empty > 3` plus
|
||||
// `last_was_legacy` would short-circuit the send. With the fix,
|
||||
// the aggregate gate stays false and the loop keeps polling.
|
||||
// The 60 s timeout below is paused-time, so it only "elapses"
|
||||
// if rx.recv() truly never resolves (i.e. the loop has stalled).
|
||||
for i in 0..6u32 {
|
||||
let msg = tokio::time::timeout(Duration::from_secs(60), rx.recv())
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!(
|
||||
"loop stopped emitting at iteration {} — regression: per-deployment skip-when-idle stalled session even though long-poll-capable peer was available",
|
||||
i
|
||||
))
|
||||
.expect("mux channel closed unexpectedly");
|
||||
match msg {
|
||||
MuxMsg::Data { sid, data, reply } => {
|
||||
assert_eq!(sid, "sid-mixed");
|
||||
assert!(data.is_empty(), "expected empty poll, got {} bytes", data.len());
|
||||
let last = i == 5;
|
||||
let _ = reply.send(Ok((
|
||||
TunnelResponse {
|
||||
sid: Some("sid-mixed".into()),
|
||||
d: None,
|
||||
pkts: None,
|
||||
eof: if last { Some(true) } else { None },
|
||||
e: None,
|
||||
code: None,
|
||||
},
|
||||
"script-A".to_string(),
|
||||
)));
|
||||
}
|
||||
_ => panic!(
|
||||
"iteration {}: expected Data poll, got a different MuxMsg variant",
|
||||
i
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
let _ = tokio::time::timeout(Duration::from_secs(2), loop_handle)
|
||||
.await
|
||||
.expect("tunnel_loop did not exit after eof");
|
||||
}
|
||||
|
||||
/// Once `mark_connect_data_unsupported` is called, future sessions
|
||||
/// must see the flag — no per-session repeat of the detect-and-fallback
|
||||
/// cost. If this regresses, every new flow pays an extra round trip
|
||||
@@ -1459,19 +1692,109 @@ mod tests {
|
||||
assert!(mux.connect_data_unsupported());
|
||||
}
|
||||
|
||||
/// `server_no_longpoll` must be sticky too: once we see a legacy
|
||||
/// fast-empty reply, every subsequent session uses the legacy idle
|
||||
/// cadence (long read timeout + skip-empty) for the rest of the
|
||||
/// process. Flipping it back per-session would either thrash the
|
||||
/// cadence or double the detection cost.
|
||||
/// Marking deployment A as legacy must NOT make B look legacy. This
|
||||
/// is the central guarantee of the per-deployment design: with the
|
||||
/// old global AtomicBool, one slow / legacy deployment dragged every
|
||||
/// session onto the 30 s legacy cadence even when the other 7 were
|
||||
/// long-polling fine.
|
||||
#[test]
|
||||
fn no_longpoll_cache_is_sticky() {
|
||||
let (mux, _rx) = mux_for_test();
|
||||
assert!(!mux.server_no_longpoll());
|
||||
mux.mark_server_no_longpoll();
|
||||
assert!(mux.server_no_longpoll());
|
||||
mux.mark_server_no_longpoll(); // idempotent
|
||||
assert!(mux.server_no_longpoll());
|
||||
fn legacy_state_is_per_deployment() {
|
||||
let (mux, _rx) = mux_for_test_with(2);
|
||||
mux.mark_server_no_longpoll("script-A");
|
||||
|
||||
let deps = mux.legacy_deployments.lock().unwrap();
|
||||
assert!(deps.contains_key("script-A"));
|
||||
assert!(
|
||||
!deps.contains_key("script-B"),
|
||||
"marking A must not insert an entry for B"
|
||||
);
|
||||
}
|
||||
|
||||
/// `all_servers_legacy` (the per-session 30 s read-timeout gate) flips
|
||||
/// to true *only* when every known deployment has been marked. With
|
||||
/// 2 deployments, marking one keeps the gate false; marking both
|
||||
/// flips it true.
|
||||
#[test]
fn all_servers_legacy_requires_every_deployment() {
    let (tunnel, _inbox) = mux_for_test_with(2);

    // Nothing marked yet.
    let before_any_mark = tunnel.all_servers_legacy();
    assert!(!before_any_mark);

    // One of the two deployments marked.
    tunnel.mark_server_no_longpoll("script-A");
    let after_first_mark = tunnel.all_servers_legacy();
    assert!(!after_first_mark, "1 of 2 marked: aggregate must stay false");

    // Both deployments marked.
    tunnel.mark_server_no_longpoll("script-B");
    let after_second_mark = tunnel.all_servers_legacy();
    assert!(after_second_mark, "all deployments marked: aggregate flips true");

    // Marking an already-marked deployment again leaves the aggregate set.
    tunnel.mark_server_no_longpoll("script-A");
    let after_remark = tunnel.all_servers_legacy();
    assert!(after_remark);
}
|
||||
|
||||
/// After `LEGACY_RECOVER_AFTER`, an entry is treated as expired and
|
||||
/// the deployment rejoins the long-poll fast path. The next mark
|
||||
/// (against any deployment) sweeps stale entries before recomputing
|
||||
/// the aggregate gate, so a recovered peer doesn't keep counting
|
||||
/// toward `all_legacy`. Backdating the mark time avoids a real 60 s
|
||||
/// sleep in the test — same effect as the wall-clock moving forward.
|
||||
#[test]
fn legacy_state_recovers_after_ttl() {
    let (tunnel, _inbox) = mux_for_test_with(2);
    tunnel.mark_server_no_longpoll("script-A");

    // Overwrite A's mark with a timestamp just past the recovery TTL. The
    // temporary guard is released at the end of the statement, before the
    // next mark takes the lock.
    let expired_at = Instant::now()
        .checked_sub(LEGACY_RECOVER_AFTER + Duration::from_secs(1))
        .expect("test environment should have a non-trivial monotonic clock");
    tunnel
        .legacy_deployments
        .lock()
        .unwrap()
        .insert("script-A".to_string(), expired_at);

    // Marking B runs the inline sweep, which must evict the stale A entry.
    tunnel.mark_server_no_longpoll("script-B");

    let marked = tunnel.legacy_deployments.lock().unwrap();
    assert!(
        !marked.contains_key("script-A"),
        "expired entry must be swept on the next mark — otherwise stale legacy state never clears"
    );
    assert!(marked.contains_key("script-B"));
}
|
||||
|
||||
/// If every deployment is legacy and then time passes past
|
||||
/// `LEGACY_RECOVER_AFTER` *without any new mark*, the aggregate gate
|
||||
/// must self-correct on the next `all_servers_legacy()` call.
|
||||
/// Without the in-place sweep on read, stale legacy marks would keep
|
||||
/// the 30 s read-timeout active forever after every deployment
|
||||
/// recovers.
|
||||
#[test]
fn all_servers_legacy_self_corrects_when_entries_expire() {
    let (tunnel, _inbox) = mux_for_test_with(2);
    for id in ["script-A", "script-B"] {
        tunnel.mark_server_no_longpoll(id);
    }
    assert!(tunnel.all_servers_legacy());

    // Age every mark past the TTL without issuing a new mark. The temporary
    // guard is released at the end of the statement.
    let expired_at = Instant::now()
        .checked_sub(LEGACY_RECOVER_AFTER + Duration::from_secs(1))
        .expect("monotonic clock should be far enough along");
    tunnel
        .legacy_deployments
        .lock()
        .unwrap()
        .values_mut()
        .for_each(|marked_at| *marked_at = expired_at);

    // The read path alone must notice the expiry and clear the aggregate.
    assert!(
        !tunnel.all_servers_legacy(),
        "aggregate must self-correct when all entries expire — otherwise the 30 s read timeout sticks forever"
    );
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user