perf: negative-cache unreachable destinations and grow startup pre-warm (#280)

This commit is contained in:
dazzling-no-more
2026-04-27 00:03:46 +04:00
committed by GitHub
parent 08efbc571d
commit fa4e0fcea9
2 changed files with 287 additions and 2 deletions
+38 -1
View File
@@ -308,9 +308,17 @@ impl ProxyServer {
// doesn't pay a fresh TLS handshake to Google edge. Best-effort; // doesn't pay a fresh TLS handshake to Google edge. Best-effort;
// failures are logged and ignored. Skipped in `google_only` — there // failures are logged and ignored. Skipped in `google_only` — there
// is no fronter to warm. // is no fronter to warm.
//
// Sized to roughly match a browser's parallel-connection burst at
// startup. The previous fixed `3` was fine for a single deployment
// but left requests 4-10 of the opening burst paying a cold TLS
// handshake each (~300ms). Scaling with deployment count gives
// multi-account configs a proportionally warmer pool, capped so
// single-deployment users don't hammer Google edge unnecessarily.
if let Some(warm_fronter) = self.fronter.clone() { if let Some(warm_fronter) = self.fronter.clone() {
let n = warm_fronter.num_scripts().clamp(6, 16);
tokio::spawn(async move { tokio::spawn(async move {
warm_fronter.warm(3).await; warm_fronter.warm(n).await;
}); });
} }
@@ -503,6 +511,20 @@ async fn handle_http_client(
if method.eq_ignore_ascii_case("CONNECT") { if method.eq_ignore_ascii_case("CONNECT") {
let (host, port) = parse_host_port(&target); let (host, port) = parse_host_port(&target);
// Mirror the SOCKS5 short-circuit: if the tunnel-node just failed
// this (host, port) with unreachable, return 502 immediately rather
// than acknowledging the CONNECT and blowing tunnel quota on a
// guaranteed retry. See `TunnelMux::is_unreachable` for context.
if let Some(ref mux) = tunnel_mux {
if mux.is_unreachable(&host, port) {
tracing::info!("CONNECT {}:{} (negative-cached, refusing)", host, port);
let _ = sock
.write_all(b"HTTP/1.1 502 Bad Gateway\r\nContent-Length: 0\r\nConnection: close\r\n\r\n")
.await;
let _ = sock.flush().await;
return Ok(());
}
}
sock.write_all(b"HTTP/1.1 200 Connection Established\r\n\r\n") sock.write_all(b"HTTP/1.1 200 Connection Established\r\n\r\n")
.await?; .await?;
sock.flush().await?; sock.flush().await?;
@@ -600,6 +622,21 @@ async fn handle_socks5_client(
return handle_socks5_udp_associate(sock, rewrite_ctx, tunnel_mux).await; return handle_socks5_udp_associate(sock, rewrite_ctx, tunnel_mux).await;
} }
// Negative-cache short-circuit: if the tunnel-node just failed to reach
// this exact (host, port) with `Network is unreachable` / `No route to
// host`, reply 0x04 (Host unreachable) immediately. Saves a 1.52s tunnel
// round-trip on guaranteed-failing targets — the IPv6 probe retry loop
// is the main offender on devices without IPv6.
if let Some(ref mux) = tunnel_mux {
if mux.is_unreachable(&host, port) {
tracing::info!("SOCKS5 CONNECT -> {}:{} (negative-cached, refusing)", host, port);
sock.write_all(&[0x05, 0x04, 0x00, 0x01, 0, 0, 0, 0, 0, 0])
.await?;
sock.flush().await?;
return Ok(());
}
}
tracing::info!("SOCKS5 CONNECT -> {}:{}", host, port); tracing::info!("SOCKS5 CONNECT -> {}:{}", host, port);
// Success reply with zeroed BND. // Success reply with zeroed BND.
+249 -1
View File
@@ -14,7 +14,7 @@ use std::collections::HashMap;
// reason; reuse it here. `AtomicBool` works fine in std on every target. // reason; reuse it here. `AtomicBool` works fine in std on every target.
use portable_atomic::AtomicU64; use portable_atomic::AtomicU64;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use base64::engine::general_purpose::STANDARD as B64; use base64::engine::general_purpose::STANDARD as B64;
@@ -78,6 +78,19 @@ const CODE_UNSUPPORTED_OP: &str = "UNSUPPORTED_OP";
/// floor, so network jitter on either side won't false-trigger. /// floor, so network jitter on either side won't false-trigger.
const LEGACY_DETECT_THRESHOLD: Duration = Duration::from_millis(1500); const LEGACY_DETECT_THRESHOLD: Duration = Duration::from_millis(1500);
/// How long to remember a `Network is unreachable` / `No route to host`
/// failure for a given `(host, port)`. While cached, the proxy short-circuits
/// repeat CONNECTs with an immediate "host unreachable" reply instead of
/// burning a 1.52s tunnel batch round-trip on a target that just failed.
/// Real motivator: IPv6-only probe hostnames (e.g. `ds6.probe.*`) on devices
/// without IPv6 — the OS retries the probe every ~1.5s for 10s+, generating
/// 510 wasted tunnel sessions per probe.
const UNREACHABLE_CACHE_TTL: Duration = Duration::from_secs(30);
/// Hard cap on negative-cache size. Browsing pulls in dozens of distinct
/// hosts; we don't want a runaway map. Pruned opportunistically on insert.
const UNREACHABLE_CACHE_MAX: usize = 256;
/// Ports where the *server* speaks first (SMTP banner, SSH identification, /// Ports where the *server* speaks first (SMTP banner, SSH identification,
/// POP3/IMAP greeting, FTP banner). On these, waiting for client bytes /// POP3/IMAP greeting, FTP banner). On these, waiting for client bytes
/// gains nothing and just adds handshake latency — skip the pre-read. /// gains nothing and just adds handshake latency — skip the pre-read.
@@ -87,6 +100,32 @@ fn is_server_speaks_first(port: u16) -> bool {
matches!(port, 21 | 22 | 25 | 80 | 110 | 143 | 587) matches!(port, 21 | 22 | 25 | 80 | 110 | 143 | 587)
} }
/// Recognize the tunnel-node's connect-error strings that mean
/// "this destination is fundamentally unreachable from the tunnel-node's
/// network right now" — distinct from refused/reset/timeout, which can be
/// transient. These come through as the inner `e` of a `TunnelResponse`
/// after the tunnel-node's std::io::Error is stringified, so we match on
/// substrings rather than `ErrorKind`. Linux: errno 101 (ENETUNREACH),
/// errno 113 (EHOSTUNREACH). Format varies a bit across libc/Tokio
/// versions, so cover both the human text and the os-error tag.
fn is_unreachable_error_str(s: &str) -> bool {
let lc = s.to_ascii_lowercase();
lc.contains("network is unreachable")
|| lc.contains("no route to host")
|| lc.contains("os error 101")
|| lc.contains("os error 113")
}
/// Canonicalize a host string for use as a negative-cache key. DNS names
/// are case-insensitive and may carry a trailing root-label dot, so
/// `Example.COM:443`, `example.com:443`, and `example.com.:443` are all the
/// same destination. IPv4 / IPv6 literals are unaffected — IPv4 has no
/// letters, and `Ipv6Addr::to_string()` already emits lowercase.
fn normalize_cache_host(host: &str) -> String {
let trimmed = host.strip_suffix('.').unwrap_or(host);
trimmed.to_ascii_lowercase()
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Multiplexer // Multiplexer
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -159,6 +198,11 @@ pub struct TunnelMux {
/// Separate monotonic counter used only to trigger the summary log /// Separate monotonic counter used only to trigger the summary log
/// (avoids a race where two threads both see `total % 100 == 0`). /// (avoids a race where two threads both see `total % 100 == 0`).
preread_total_events: AtomicU64, preread_total_events: AtomicU64,
/// Short-lived negative cache for targets the tunnel-node reported as
/// unreachable (`Network is unreachable` / `No route to host`). Keyed by
/// `(host, port)`, value is the expiry instant. Plain Mutex<HashMap> is
/// fine: it's touched once per CONNECT (cheap) and once per failure.
unreachable_cache: Mutex<HashMap<(String, u16), Instant>>,
} }
impl TunnelMux { impl TunnelMux {
@@ -181,6 +225,7 @@ impl TunnelMux {
preread_skip_unsupported: AtomicU64::new(0), preread_skip_unsupported: AtomicU64::new(0),
preread_win_total_us: AtomicU64::new(0), preread_win_total_us: AtomicU64::new(0),
preread_total_events: AtomicU64::new(0), preread_total_events: AtomicU64::new(0),
unreachable_cache: Mutex::new(HashMap::new()),
}) })
} }
@@ -254,6 +299,71 @@ impl TunnelMux {
} }
} }
/// Returns true if `(host, port)` has a non-expired unreachable entry.
/// The proxy front-end uses this to skip the tunnel and reply
/// "host unreachable" immediately on follow-up CONNECTs.
pub fn is_unreachable(&self, host: &str, port: u16) -> bool {
let now = Instant::now();
let mut cache = match self.unreachable_cache.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
let key = (normalize_cache_host(host), port);
match cache.get(&key) {
Some(expiry) if *expiry > now => true,
Some(_) => {
cache.remove(&key);
false
}
None => false,
}
}
/// If `err` looks like a network-unreachable / no-route-to-host error
/// from the tunnel-node, remember the target for `UNREACHABLE_CACHE_TTL`.
/// No-op for any other error (timeouts, refused, EOF, etc.) — those can
/// be transient and we don't want to lock out a host on a flaky moment.
fn record_unreachable_if_match(&self, host: &str, port: u16, err: &str) {
if !is_unreachable_error_str(err) {
return;
}
let mut cache = match self.unreachable_cache.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
// Cap enforcement is two-stage: first drop anything already expired,
// then if we're STILL at/above the cap (i.e. an unbounded burst of
// unique unreachable hosts within the TTL), evict the entry that
// would expire soonest. This bounds the map size at all times — a
// pure `retain` on expiry alone would let the map grow unbounded
// until the first entry's TTL elapses.
if cache.len() >= UNREACHABLE_CACHE_MAX {
let now = Instant::now();
cache.retain(|_, expiry| *expiry > now);
while cache.len() >= UNREACHABLE_CACHE_MAX {
let victim = cache
.iter()
.min_by_key(|(_, expiry)| **expiry)
.map(|(k, _)| k.clone());
match victim {
Some(k) => {
cache.remove(&k);
}
None => break,
}
}
}
let key = (normalize_cache_host(host), port);
cache.insert(key, Instant::now() + UNREACHABLE_CACHE_TTL);
tracing::debug!(
"negative-cached {}:{} for {:?} ({})",
host,
port,
UNREACHABLE_CACHE_TTL,
err
);
}
fn record_preread_win(&self, port: u16, elapsed: Duration) { fn record_preread_win(&self, port: u16, elapsed: Duration) {
self.preread_win.fetch_add(1, Ordering::Relaxed); self.preread_win.fetch_add(1, Ordering::Relaxed);
self.preread_win_total_us self.preread_win_total_us
@@ -723,6 +833,11 @@ async fn connect_plain(host: &str, port: u16, mux: &Arc<TunnelMux>) -> std::io::
Ok(Ok(resp)) => { Ok(Ok(resp)) => {
if let Some(ref e) = resp.e { if let Some(ref e) = resp.e {
tracing::error!("tunnel connect error for {}:{}: {}", host, port, e); tracing::error!("tunnel connect error for {}:{}: {}", host, port, e);
// Only cache here: `resp.e` is the tunnel-node's own connect()
// result against the target. The outer `Ok(Err(_))` arm below
// is a transport-level failure (relay → Apps Script → tunnel-
// node never reached) and tells us nothing about the target.
mux.record_unreachable_if_match(host, port, e);
return Err(std::io::Error::new( return Err(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused, std::io::ErrorKind::ConnectionRefused,
e.clone(), e.clone(),
@@ -769,6 +884,9 @@ async fn connect_with_initial_data(
return Ok(ConnectDataOutcome::Unsupported); return Ok(ConnectDataOutcome::Unsupported);
} }
tracing::error!("tunnel connect_data error for {}:{}: {}", host, port, e); tracing::error!("tunnel connect_data error for {}:{}: {}", host, port, e);
// Outer transport failure (relay/Apps Script never reached the
// tunnel-node). Don't poison the destination cache from here —
// see `connect_plain` for the same reasoning.
return Err(std::io::Error::new( return Err(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused, std::io::ErrorKind::ConnectionRefused,
e, e,
@@ -794,6 +912,8 @@ async fn connect_with_initial_data(
if let Some(ref e) = resp.e { if let Some(ref e) = resp.e {
tracing::error!("tunnel connect_data error for {}:{}: {}", host, port, e); tracing::error!("tunnel connect_data error for {}:{}: {}", host, port, e);
// `resp.e` is the tunnel-node's own connect result — cache it.
mux.record_unreachable_if_match(host, port, e);
return Err(std::io::Error::new( return Err(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused, std::io::ErrorKind::ConnectionRefused,
e.clone(), e.clone(),
@@ -1095,6 +1215,133 @@ mod tests {
))); )));
} }
#[test]
fn unreachable_error_str_matches_expected_variants() {
assert!(is_unreachable_error_str(
"connect failed: Network is unreachable (os error 101)"
));
assert!(is_unreachable_error_str("No route to host"));
assert!(is_unreachable_error_str("os error 113"));
// Case-insensitive.
assert!(is_unreachable_error_str(
"CONNECT FAILED: NETWORK IS UNREACHABLE"
));
}
#[test]
fn unreachable_error_str_rejects_unrelated() {
assert!(!is_unreachable_error_str("connection refused"));
assert!(!is_unreachable_error_str("connect timed out"));
assert!(!is_unreachable_error_str("connection reset by peer"));
assert!(!is_unreachable_error_str(""));
}
#[test]
fn negative_cache_records_and_short_circuits() {
let (mux, _rx) = mux_for_test();
// Initially nothing is cached.
assert!(!mux.is_unreachable("ds6.probe.example", 443));
// Record a matching error.
mux.record_unreachable_if_match(
"ds6.probe.example",
443,
"connect failed: Network is unreachable (os error 101)",
);
assert!(mux.is_unreachable("ds6.probe.example", 443));
// A different port for the same host is its own entry.
assert!(!mux.is_unreachable("ds6.probe.example", 80));
}
#[test]
fn negative_cache_ignores_non_unreachable_errors() {
let (mux, _rx) = mux_for_test();
mux.record_unreachable_if_match(
"example.com",
443,
"connect failed: connection refused",
);
assert!(!mux.is_unreachable("example.com", 443));
}
#[test]
fn negative_cache_normalizes_host_keys() {
let (mux, _rx) = mux_for_test();
// Cache under one casing/format...
mux.record_unreachable_if_match(
"Example.COM.",
443,
"Network is unreachable (os error 101)",
);
// ...and look up under several equivalent forms.
assert!(mux.is_unreachable("example.com", 443));
assert!(mux.is_unreachable("EXAMPLE.com", 443));
assert!(mux.is_unreachable("example.com.", 443));
// Different host should still miss.
assert!(!mux.is_unreachable("other.com", 443));
}
/// Outer `Ok(Err(_))` from the mux channel means "the relay never
/// reached the tunnel-node" (HTTP/TLS to Apps Script failed, batch
/// timed out, etc.) — the destination wasn't even attempted. Even if
/// that error string contains "Network is unreachable" (e.g. the
/// client device's WAN was momentarily down), it must NOT poison the
/// destination cache, or every host the user touched during a
/// connectivity blip stays refused for 30s.
#[tokio::test]
async fn negative_cache_skips_outer_relay_errors() {
let (mux, mut rx) = mux_for_test();
let mux_for_task = mux.clone();
let task = tokio::spawn(async move {
connect_plain("real.target.example", 443, &mux_for_task).await
});
// Receive the Connect msg and reply with an outer Err whose string
// would otherwise match `is_unreachable_error_str`.
let msg = rx.recv().await.expect("connect msg");
let reply = match msg {
MuxMsg::Connect { reply, .. } => reply,
other => panic!("expected Connect, got {:?}", std::mem::discriminant(&other)),
};
let _ = reply.send(Err(
"relay failed: Network is unreachable (os error 101)".into(),
));
let res = task.await.expect("task");
assert!(res.is_err(), "connect_plain should surface the error");
assert!(
!mux.is_unreachable("real.target.example", 443),
"outer relay error must not negative-cache the destination"
);
}
#[test]
fn negative_cache_enforces_hard_cap_under_unique_burst() {
let (mux, _rx) = mux_for_test();
// Insert enough unique still-live entries to exceed the cap. The
// map size must never exceed UNREACHABLE_CACHE_MAX, even though
// every entry is fresh and `retain(expired)` prunes nothing.
let burst = UNREACHABLE_CACHE_MAX + 50;
for i in 0..burst {
let host = format!("h{}.example", i);
mux.record_unreachable_if_match(
&host,
443,
"connect failed: Network is unreachable (os error 101)",
);
}
let len = mux
.unreachable_cache
.lock()
.map(|g| g.len())
.unwrap_or(0);
assert!(
len <= UNREACHABLE_CACHE_MAX,
"cache size {} exceeded cap {}",
len,
UNREACHABLE_CACHE_MAX
);
}
#[test] #[test]
fn server_speaks_first_covers_common_protocols() { fn server_speaks_first_covers_common_protocols() {
for p in [21u16, 22, 25, 80, 110, 143, 587] { for p in [21u16, 22, 25, 80, 110, 143, 587] {
@@ -1128,6 +1375,7 @@ mod tests {
preread_skip_unsupported: AtomicU64::new(0), preread_skip_unsupported: AtomicU64::new(0),
preread_win_total_us: AtomicU64::new(0), preread_win_total_us: AtomicU64::new(0),
preread_total_events: AtomicU64::new(0), preread_total_events: AtomicU64::new(0),
unreachable_cache: Mutex::new(HashMap::new()),
}); });
(mux, rx) (mux, rx)
} }