From 40c2b6c509a5144a8957b51d9c54b23792775d8e Mon Sep 17 00:00:00 2001 From: dazzling-no-more <278675588+dazzling-no-more@users.noreply.github.com> Date: Sat, 25 Apr 2026 13:38:01 +0400 Subject: [PATCH] feat(udp): SOCKS5 UDP ASSOCIATE relay through full tunnel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds end-to-end UDP support: SOCKS5 client UDP ASSOCIATE → tunnel-mux udp_open/udp_data ops → tunnel-node UDP sessions → real UDP to upstream. QUIC/HTTP3, DNS, and STUN now traverse full mode without falling back to TCP or leaking outside the tunnel. Apps Script proxies the new ops opaquely through the existing batch endpoint; CodeFull.gs only gets a doc-comment update. Highlights: - proxy_server.rs: SOCKS5 UDP ASSOCIATE handler with per-session task, bounded uplink mpsc channel, adaptive empty-poll backoff (500 ms → 30 s), source-IP validation against the control TCP peer, port-locking on first valid datagram, and self-removal from the dispatch map on eof. - tunnel_client.rs: UdpOpen / UdpData / close_session mux variants alongside the existing TCP plumbing; pkts decoder helper. - tunnel-node: UdpSessionInner with bounded VecDeque queue, drop-oldest on overflow with queue_drops counter and warn-then-throttled logs, last_active refreshed only on real activity (uplink send or upstream recv — empty polls do not refresh), independent TCP/UDP drain in handle_batch Phase 2, separate active-drain (150 ms) and retry (250 ms) windows for UDP, idle long-poll (5 s). - Tests: SOCKS5 UDP packet parser (IPv4/IPv6/DOMAIN round-trips, truncation rejects, fragmented rejects), UDP queue overflow drop + counter, regression test that batch with both UDP and TCP-data ops still runs the TCP retry pass. Docs: README + android.{md,fa.md} updated to reflect UDP availability in full mode; tunnel-node/README documents the new ops. --- README.md | 2 +- assets/apps_script/CodeFull.gs | 2 + docs/android.fa.md | 4 +- docs/android.md | 4 +- src/domain_fronter.rs | 3 + src/proxy_server.rs | 557 +++++++++++++++++++++++++--- src/tunnel_client.rs | 231 ++++++++++-- tunnel-node/README.md | 6 +- tunnel-node/src/main.rs | 656 +++++++++++++++++++++++++++++---- 9 files changed, 1314 insertions(+), 151 deletions(-) diff --git a/README.md b/README.md index 5872a45..938d10c 100644 --- a/README.md +++ b/README.md @@ -267,7 +267,7 @@ HTTP/HTTPS continues to route through the Apps Script relay (no change), and the ## Full tunnel mode -Full tunnel mode (`"mode": "full"`) routes **all** traffic end-to-end through Apps Script and a remote [tunnel-node](tunnel-node/) — no MITM certificate needed. The trade-off is higher latency per request (every byte goes Apps Script → tunnel-node → destination), but it works for every protocol and every app without CA installation. +Full tunnel mode (`"mode": "full"`) routes **all** traffic end-to-end through Apps Script and a remote [tunnel-node](tunnel-node/) — no MITM certificate needed. TCP is carried as persistent tunnel sessions, and UDP from Android/TUN clients is carried via SOCKS5 `UDP ASSOCIATE` to the tunnel-node, which then emits real UDP from the server side. The trade-off is higher latency per request (every byte/datagram goes Apps Script → tunnel-node → destination), but it works for protocols and apps that cannot use the MITM relay path. ### How deployment IDs affect performance diff --git a/assets/apps_script/CodeFull.gs b/assets/apps_script/CodeFull.gs index 42cecc4..77b2a5e 100644 --- a/assets/apps_script/CodeFull.gs +++ b/assets/apps_script/CodeFull.gs @@ -6,6 +6,8 @@ * 2. Batch relay: POST { k, q: [{m,u,h,b,ct,r}, ...] } → { q: [{s,h,b}, ...] } * 3. Tunnel: POST { k, t, h, p, sid, d } → { sid, d, eof } * 4. Tunnel batch: POST { k, t:"batch", ops:[...] } → { r: [...] } + * Batch ops include TCP (`connect`, `data`) and UDP (`udp_open`, + * `udp_data`) tunnel-node operations. * * CHANGE THESE TO YOUR OWN VALUES! */ diff --git a/docs/android.fa.md b/docs/android.fa.md index 46fa4bf..e5e3a6f 100644 --- a/docs/android.fa.md +++ b/docs/android.fa.md @@ -234,9 +234,9 @@ VpnService TUN ──► tun2proxy (داخل فرایند) **سایت‌هایی که فقط بارگذاری اول را `gate` می‌کنند** (اکثر مشتریان `Bot Fight Mode` کلادفلر) بعد از یک حل بی‌مشکل کار می‌کنند. سایت‌هایی که هر درخواست `challenge` می‌زنند (صرافی‌های رمزارز، بزرگسال، بعضی فوروم‌ها) ذاتاً با این معماری نمی‌شوند — برایشان از تونل دیگری استفاده کنید. -### UDP / QUIC (HTTP/3) رد نمی‌شود +### UDP / QUIC (HTTP/3) -`SOCKS5 listener` فقط `CONNECT` را می‌فهمد، نه `UDP ASSOCIATE`. `Chrome` اول `HTTP/3` را امتحان می‌کند و به `HTTP/2 over TCP` برمی‌گردد — که از پروکسی رد می‌شود. اثر: اولین اتصال کمی کندتر، بقیه چیزها عادی. +در حالت `full`، `SOCKS5 listener` دستور `UDP ASSOCIATE` را هم می‌فهمد و دیتاگرام‌های UDP را داخل مسیر Apps Script تا `tunnel-node` می‌برد؛ بعد `tunnel-node` از سمت سرور UDP واقعی به مقصد می‌فرستد. ISP شما همچنان فقط HTTPS به Google می‌بیند. در حالت `apps_script`، UDP هنوز مسیر قدیمی را دارد: `Chrome` اول `HTTP/3` را امتحان می‌کند و بعد به `HTTP/2 over TCP` برمی‌گردد. ### نشت IPv6 diff --git a/docs/android.md b/docs/android.md index 2245dcf..c632ab6 100644 --- a/docs/android.md +++ b/docs/android.md @@ -221,9 +221,9 @@ Cloudflare's `cf_clearance` cookie is bound to the `(IP, UA, JA3)` tuple the cha **Sites that only gate the first page load** (most of CF's Bot Fight Mode customers) work fine after one solve. Sites that challenge every request (crypto exchanges, adult, some forums) fundamentally can't hold a session through this architecture — use a different tunnel for those. -### UDP / QUIC (HTTP/3) doesn't go through +### UDP / QUIC (HTTP/3) -The SOCKS5 listener only handles `CONNECT`, not `UDP ASSOCIATE`. Chrome tries HTTP/3 first and falls back to HTTP/2 over TCP, which works fine. Effect: slightly slower first connect, everything else normal. +In `full` mode, the SOCKS5 listener handles `UDP ASSOCIATE` and tunnels UDP datagrams through Apps Script to `tunnel-node`, which then sends real UDP to the destination. Your ISP still only sees HTTPS to Google. In `apps_script` mode, UDP still falls back the old way: Chrome tries HTTP/3 first and then uses HTTP/2 over TCP. ### IPv6 leaks diff --git a/src/domain_fronter.rs b/src/domain_fronter.rs index 5409e7a..a18dd21 100644 --- a/src/domain_fronter.rs +++ b/src/domain_fronter.rs @@ -181,6 +181,9 @@ pub struct TunnelResponse { pub sid: Option, #[serde(default)] pub d: Option, + /// UDP datagrams returned by tunnel-node, base64-encoded individually. + #[serde(default)] + pub pkts: Option>, #[serde(default)] pub eof: Option, #[serde(default)] diff --git a/src/proxy_server.rs b/src/proxy_server.rs index 45cbbc9..171495a 100644 --- a/src/proxy_server.rs +++ b/src/proxy_server.rs @@ -1,20 +1,23 @@ +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; +use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::Mutex; +use tokio::net::{TcpListener, TcpStream, UdpSocket}; +use tokio::sync::{mpsc, Mutex}; use tokio_rustls::rustls::client::danger::{ HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier, }; use tokio_rustls::rustls::pki_types::{CertificateDer, ServerName, UnixTime}; -use tokio_rustls::rustls::{ClientConfig, DigitallySignedStruct, SignatureScheme}; use tokio_rustls::rustls::server::Acceptor; +use tokio_rustls::rustls::{ClientConfig, DigitallySignedStruct, SignatureScheme}; use tokio_rustls::{LazyConfigAcceptor, TlsAcceptor, TlsConnector}; use crate::config::{Config, Mode}; use crate::domain_fronter::DomainFronter; use crate::mitm::MitmCertManager; -use crate::tunnel_client::TunnelMux; +use crate::tunnel_client::{decode_udp_packets, TunnelMux}; // Domains that are served from Google's core frontend IP pool and therefore // respond correctly when we connect to `google_ip` with SNI=`front_domain` @@ -182,9 +185,8 @@ impl ProxyServer { // `script_id`, which is exactly the state a bootstrapping user is in. let fronter = match mode { Mode::AppsScript | Mode::Full => { - let f = DomainFronter::new(config).map_err(|e| { - std::io::Error::new(std::io::ErrorKind::Other, format!("{e}")) - })?; + let f = DomainFronter::new(config) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e}")))?; Some(Arc::new(f)) } Mode::GoogleOnly => None, @@ -320,7 +322,8 @@ impl ProxyServer { let rewrite_ctx = http_ctx.clone(); let mux = http_mux.clone(); children.spawn(async move { - if let Err(e) = handle_http_client(sock, fronter, mitm, rewrite_ctx, mux).await { + if let Err(e) = handle_http_client(sock, fronter, mitm, rewrite_ctx, mux).await + { tracing::debug!("http client {} closed: {}", peer, e); } }); @@ -356,7 +359,9 @@ impl ProxyServer { let rewrite_ctx = socks_ctx.clone(); let mux = socks_mux.clone(); children.spawn(async move { - if let Err(e) = handle_socks5_client(sock, fronter, mitm, rewrite_ctx, mux).await { + if let Err(e) = + handle_socks5_client(sock, fronter, mitm, rewrite_ctx, mux).await + { tracing::debug!("socks client {} closed: {}", peer, e); } }); @@ -510,8 +515,8 @@ async fn handle_socks5_client( return Ok(()); } let cmd = req[1]; - if cmd != 0x01 { - // CONNECT only. + if cmd != 0x01 && cmd != 0x03 { + // CONNECT and UDP ASSOCIATE only. sock.write_all(&[0x05, 0x07, 0x00, 0x01, 0, 0, 0, 0, 0, 0]) .await?; return Ok(()); @@ -546,6 +551,11 @@ async fn handle_socks5_client( sock.read_exact(&mut port_buf).await?; let port = u16::from_be_bytes(port_buf); + if cmd == 0x03 { + tracing::info!("SOCKS5 UDP ASSOCIATE requested for {}:{}", host, port); + return handle_socks5_udp_associate(sock, rewrite_ctx, tunnel_mux).await; + } + tracing::info!("SOCKS5 CONNECT -> {}:{}", host, port); // Success reply with zeroed BND. @@ -556,6 +566,372 @@ async fn handle_socks5_client( dispatch_tunnel(sock, host, port, fronter, mitm, rewrite_ctx, tunnel_mux).await } +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +struct SocksUdpTarget { + host: String, + port: u16, + atyp: u8, + addr: Vec, +} + +/// Per-target relay session state shared between the dispatch loop and +/// the per-session task. The dispatch loop pushes uplink datagrams via +/// `uplink`; the task drains the upstream and serializes both directions +/// onto a single tunnel-mux call at a time. +struct UdpRelaySession { + uplink: mpsc::Sender>, +} + +/// SOCKS5 UDP request frame: 4-byte header + atyp-specific address + 2-byte +/// port + payload. DOMAIN atyp uses a 1-byte length prefix + up to 255 +/// bytes, so the largest header is `4 + 1 + 255 + 2 = 262`. Round to 300 +/// for safety; payload itself can be a full 64 KB datagram. +const SOCKS5_UDP_RECV_BUF_BYTES: usize = 65535 + 300; + +/// Bound on per-session uplink queue depth. UDP is lossy by design — if +/// the per-session task can't keep up, drop the newest datagram (caller +/// uses `try_send`) instead of stalling the whole UDP relay loop. +const UDP_UPLINK_QUEUE: usize = 64; + +/// Initial poll spacing when a session is idle. Tunnel-node already +/// long-polls each empty `udp_data` for up to 5 s, so this is a +/// client-side floor — bursts of upstream packets reset back to this. +const UDP_INITIAL_POLL_DELAY: Duration = Duration::from_millis(500); + +/// Cap on the exponential backoff for an idle session. After this many +/// seconds of zero traffic in either direction, polls happen at most +/// once per `UDP_MAX_POLL_DELAY` plus the tunnel-node long-poll window — +/// so an idle UDP destination costs roughly one batch slot every 35 s. +const UDP_MAX_POLL_DELAY: Duration = Duration::from_secs(30); + +async fn handle_socks5_udp_associate( + mut control: TcpStream, + rewrite_ctx: Arc, + tunnel_mux: Option>, +) -> std::io::Result<()> { + if rewrite_ctx.mode != Mode::Full { + tracing::debug!("UDP ASSOCIATE rejected: only full mode supports UDP tunneling"); + write_socks5_reply(&mut control, 0x07, None).await?; + return Ok(()); + } + let Some(mux) = tunnel_mux else { + tracing::debug!("UDP ASSOCIATE rejected: full mode has no tunnel mux"); + write_socks5_reply(&mut control, 0x01, None).await?; + return Ok(()); + }; + + // Per RFC 1928 §6 the UDP relay only accepts datagrams from the + // SOCKS5 client. We pin the source IP to the control TCP peer up + // front so a third party on the bind interface can't hijack the + // session by sending the first datagram. + let client_peer_ip = control.peer_addr()?.ip(); + + // The local TUN bridge talks to us over loopback. Binding the UDP relay + // there avoids exposing an unauthenticated UDP socket on LAN interfaces. + let bind_ip = match control.local_addr()?.ip() { + IpAddr::V4(ip) if ip.is_unspecified() => IpAddr::V4(Ipv4Addr::LOCALHOST), + ip => ip, + }; + let udp = Arc::new(UdpSocket::bind(SocketAddr::new(bind_ip, 0)).await?); + write_socks5_reply(&mut control, 0x00, Some(udp.local_addr()?)).await?; + tracing::info!( + "SOCKS5 UDP relay bound on {} for client {}", + udp.local_addr()?, + client_peer_ip + ); + + let mut buf = vec![0u8; SOCKS5_UDP_RECV_BUF_BYTES]; + let mut control_buf = [0u8; 1]; + let mut client_addr: Option = None; + let sessions: Arc>> = + Arc::new(Mutex::new(HashMap::new())); + + loop { + tokio::select! { + recv = udp.recv_from(&mut buf) => { + let (n, peer) = match recv { + Ok(v) => v, + Err(e) => { + tracing::debug!("udp associate recv failed: {}", e); + break; + } + }; + // Source-IP check: anything not from the SOCKS5 client's + // host is dropped silently. After the first valid packet, + // also lock to its source port (RFC 1928 §6). + if peer.ip() != client_peer_ip { + continue; + } + if let Some(existing) = client_addr { + if existing != peer { + continue; + } + } else { + tracing::info!("UDP relay locked to client {}", peer); + client_addr = Some(peer); + } + + let Some((target, payload)) = parse_socks5_udp_packet(&buf[..n]) else { + continue; + }; + let payload = payload.to_vec(); + + // Fast path: existing session — push payload onto its + // bounded uplink queue, drop on overflow (UDP semantics). + { + let sess = sessions.lock().await; + if let Some(session) = sess.get(&target) { + let _ = session.uplink.try_send(payload); + continue; + } + } + + // New target: open via tunnel-node and spawn the per-session + // task. The first datagram rides the udp_open op so we + // save one round trip on session establishment. + let resp = match mux.udp_open(&target.host, target.port, payload).await { + Ok(r) => r, + Err(e) => { + tracing::debug!( + "udp open {}:{} failed: {}", + target.host, target.port, e + ); + continue; + } + }; + if let Some(ref e) = resp.e { + tracing::debug!("udp open {}:{} failed: {}", target.host, target.port, e); + continue; + } + let Some(sid) = resp.sid.clone() else { + tracing::debug!( + "udp open {}:{} returned no sid", + target.host, target.port + ); + continue; + }; + send_udp_response_packets(&udp, peer, &target, &resp).await; + + let (uplink_tx, uplink_rx) = mpsc::channel::>(UDP_UPLINK_QUEUE); + let task_mux = mux.clone(); + let task_udp = udp.clone(); + let task_target = target.clone(); + let task_sessions = sessions.clone(); + let task_sid = sid.clone(); + tokio::spawn(async move { + udp_session_task( + task_mux, + task_udp, + task_sid, + task_target.clone(), + peer, + uplink_rx, + ) + .await; + // On exit (eof / mux error / channel close) remove + // ourselves from the dispatch map so a future packet + // to the same target opens a fresh tunnel-node session. + task_sessions.lock().await.remove(&task_target); + }); + + sessions + .lock() + .await + .insert(target, UdpRelaySession { uplink: uplink_tx }); + } + read = control.read(&mut control_buf) => { + match read { + Ok(0) | Err(_) => break, + Ok(_) => {} + } + } + } + } + + // Drop every uplink Sender. Each per-session task observes its + // receiver close, breaks out of select!, and issues close_session + // on the tunnel-node before exiting. + sessions.lock().await.clear(); + Ok(()) +} + +/// Per-target relay task. Owns one tunnel-node UDP session and shuttles +/// datagrams in both directions through a single in-flight tunnel call +/// at a time. Two cancellation points: +/// * `uplink_rx.recv()` returns `None` when the dispatch loop drops +/// the matching `Sender` (SOCKS5 client gone, or session evicted). +/// * `mux.udp_data` returns eof / error when the tunnel-node session +/// is reaped or the target is unreachable. +async fn udp_session_task( + mux: Arc, + udp: Arc, + sid: String, + target: SocksUdpTarget, + client_addr: SocketAddr, + mut uplink_rx: mpsc::Receiver>, +) { + let mut backoff = UDP_INITIAL_POLL_DELAY; + loop { + // `biased;` prefers uplink so an active client doesn't get + // shadowed by a long sleep. Both branches are cancel-safe. + let resp = tokio::select! { + biased; + uplink = uplink_rx.recv() => { + let Some(payload) = uplink else { break; }; + // Active uplink — reset the empty-poll backoff so the + // next inbound poll happens promptly. + backoff = UDP_INITIAL_POLL_DELAY; + match mux.udp_data(&sid, payload).await { + Ok(r) => r, + Err(e) => { + tracing::debug!("udp data {} failed: {}", sid, e); + break; + } + } + } + _ = tokio::time::sleep(backoff) => { + match mux.udp_data(&sid, Vec::new()).await { + Ok(r) => r, + Err(e) => { + tracing::debug!("udp poll {} failed: {}", sid, e); + break; + } + } + } + }; + if resp.e.is_some() || resp.eof.unwrap_or(false) { + break; + } + let got_pkts = resp.pkts.as_ref().map(|p| !p.is_empty()).unwrap_or(false); + if got_pkts { + send_udp_response_packets(&udp, client_addr, &target, &resp).await; + backoff = UDP_INITIAL_POLL_DELAY; + } else { + // Empty poll — back off so an idle destination doesn't + // monopolize batch slots. + backoff = (backoff * 2).min(UDP_MAX_POLL_DELAY); + } + } + // Be polite even if the session is already gone server-side; the + // tunnel-node tolerates close on an unknown sid. + mux.close_session(&sid).await; +} + +async fn send_udp_response_packets( + udp: &UdpSocket, + client_addr: SocketAddr, + target: &SocksUdpTarget, + resp: &crate::domain_fronter::TunnelResponse, +) { + let packets = match decode_udp_packets(resp) { + Ok(packets) => packets, + Err(e) => { + tracing::debug!("{}", e); + return; + } + }; + for packet in packets { + let framed = build_socks5_udp_packet(target, &packet); + let _ = udp.send_to(&framed, client_addr).await; + } +} + +async fn write_socks5_reply( + sock: &mut TcpStream, + rep: u8, + addr: Option, +) -> std::io::Result<()> { + let mut out = vec![0x05, rep, 0x00]; + match addr { + Some(SocketAddr::V4(v4)) => { + out.push(0x01); + out.extend_from_slice(&v4.ip().octets()); + out.extend_from_slice(&v4.port().to_be_bytes()); + } + Some(SocketAddr::V6(v6)) => { + out.push(0x04); + out.extend_from_slice(&v6.ip().octets()); + out.extend_from_slice(&v6.port().to_be_bytes()); + } + None => { + out.push(0x01); + out.extend_from_slice(&[0, 0, 0, 0]); + out.extend_from_slice(&0u16.to_be_bytes()); + } + } + sock.write_all(&out).await?; + sock.flush().await +} + +fn parse_socks5_udp_packet(buf: &[u8]) -> Option<(SocksUdpTarget, &[u8])> { + if buf.len() < 4 || buf[0] != 0 || buf[1] != 0 || buf[2] != 0 { + return None; + } + let atyp = buf[3]; + let mut pos = 4usize; + let (host, addr) = match atyp { + 0x01 => { + if buf.len() < pos + 4 + 2 { + return None; + } + let addr = buf[pos..pos + 4].to_vec(); + pos += 4; + let ip = std::net::Ipv4Addr::new(addr[0], addr[1], addr[2], addr[3]); + (ip.to_string(), addr) + } + 0x03 => { + if buf.len() < pos + 1 { + return None; + } + let len = buf[pos] as usize; + pos += 1; + if len == 0 || buf.len() < pos + len + 2 { + return None; + } + let addr = buf[pos..pos + len].to_vec(); + pos += len; + (String::from_utf8_lossy(&addr).into_owned(), addr) + } + 0x04 => { + if buf.len() < pos + 16 + 2 { + return None; + } + let addr = buf[pos..pos + 16].to_vec(); + pos += 16; + let mut octets = [0u8; 16]; + octets.copy_from_slice(&addr); + (std::net::Ipv6Addr::from(octets).to_string(), addr) + } + _ => return None, + }; + let port = u16::from_be_bytes([buf[pos], buf[pos + 1]]); + pos += 2; + Some(( + SocksUdpTarget { + host, + port, + atyp, + addr, + }, + &buf[pos..], + )) +} + +fn build_socks5_udp_packet(target: &SocksUdpTarget, payload: &[u8]) -> Vec { + let mut out = Vec::with_capacity(4 + target.addr.len() + 2 + payload.len() + 1); + out.extend_from_slice(&[0, 0, 0, target.atyp]); + match target.atyp { + 0x03 => { + out.push(target.addr.len() as u8); + out.extend_from_slice(&target.addr); + } + _ => out.extend_from_slice(&target.addr), + } + out.extend_from_slice(&target.port.to_be_bytes()); + out.extend_from_slice(payload); + out +} + // ---------- Smart dispatch (used by both HTTP CONNECT and SOCKS5) ---------- fn should_use_sni_rewrite( @@ -613,15 +989,13 @@ async fn dispatch_tunnel( None => { tracing::error!( "dispatch {}:{} -> full mode but no tunnel mux (should not happen)", - host, port + host, + port ); return Ok(()); } }; - tracing::info!( - "dispatch {}:{} -> full tunnel (via batch mux)", - host, port - ); + tracing::info!("dispatch {}:{} -> full tunnel (via batch mux)", host, port); crate::tunnel_client::tunnel_connection(sock, &host, port, &mux).await?; return Ok(()); } @@ -634,7 +1008,11 @@ async fn dispatch_tunnel( port, rewrite_ctx.youtube_via_relay, ) { - tracing::info!("dispatch {}:{} -> sni-rewrite tunnel (Google edge direct)", host, port); + tracing::info!( + "dispatch {}:{} -> sni-rewrite tunnel (Google edge direct)", + host, + port + ); return do_sni_rewrite_tunnel_from_tcp(sock, &host, port, mitm, rewrite_ctx).await; } @@ -775,11 +1153,8 @@ async fn plain_tcp_passthrough( port, e ); - match tokio::time::timeout( - connect_timeout, - TcpStream::connect((target_host, port)), - ) - .await + match tokio::time::timeout(connect_timeout, TcpStream::connect((target_host, port))) + .await { Ok(Ok(s)) => s, _ => return, @@ -787,12 +1162,7 @@ async fn plain_tcp_passthrough( } } } else { - match tokio::time::timeout( - connect_timeout, - TcpStream::connect((target_host, port)), - ) - .await - { + match tokio::time::timeout(connect_timeout, TcpStream::connect((target_host, port))).await { Ok(Ok(s)) => { tracing::info!("plain-tcp passthrough -> {}:{}", host, port); s @@ -804,7 +1174,8 @@ async fn plain_tcp_passthrough( Err(_) => { tracing::debug!( "plain-tcp connect {}:{} timeout (likely blocked; client should rotate)", - host, port + host, + port ); return; } @@ -1283,13 +1654,14 @@ where // subdomain of x.com here. let host_lower = host.to_ascii_lowercase(); let is_x_com = host_lower == "x.com" || host_lower.ends_with(".x.com"); - let path = if is_x_com - && path.starts_with("/i/api/graphql/") - && path.contains("?variables=") - { + let path = if is_x_com && path.starts_with("/i/api/graphql/") && path.contains("?variables=") { match path.split_once('&') { Some((short, _)) => { - tracing::debug!("x.com graphql URL truncated: {} chars -> {}", path.len(), short.len()); + tracing::debug!( + "x.com graphql URL truncated: {} chars -> {}", + path.len(), + short.len() + ); short.to_string() } None => path, @@ -1358,7 +1730,9 @@ where // relay path — range semantics on mutating requests are undefined // and would break form submissions. let response = if method.eq_ignore_ascii_case("GET") && body.is_empty() { - fronter.relay_parallel_range(&method, &url, &headers, &body).await + fronter + .relay_parallel_range(&method, &url, &headers, &body) + .await } else { fronter.relay(&method, &url, &headers, &body).await }; @@ -1609,7 +1983,9 @@ async fn do_plain_http( // mirrors, video poster streams, etc.) need the same acceleration // or the relay stalls per-chunk. let response = if method.eq_ignore_ascii_case("GET") && body.is_empty() { - fronter.relay_parallel_range(&method, &url, &headers, &body).await + fronter + .relay_parallel_range(&method, &url, &headers, &body) + .await } else { fronter.relay(&method, &url, &headers, &body).await }; @@ -1630,6 +2006,73 @@ mod tests { .collect() } + #[test] + fn socks5_udp_domain_packet_round_trips() { + let mut raw = vec![0, 0, 0, 0x03, 11]; + raw.extend_from_slice(b"example.com"); + raw.extend_from_slice(&3478u16.to_be_bytes()); + raw.extend_from_slice(b"hello"); + + let (target, payload) = parse_socks5_udp_packet(&raw).unwrap(); + assert_eq!(target.host, "example.com"); + assert_eq!(target.port, 3478); + assert_eq!(payload, b"hello"); + assert_eq!(build_socks5_udp_packet(&target, payload), raw); + } + + #[test] + fn socks5_udp_rejects_fragmented_packets() { + let raw = [0, 0, 1, 0x01, 127, 0, 0, 1, 0x13, 0x8a, b'x']; + assert!(parse_socks5_udp_packet(&raw).is_none()); + } + + #[test] + fn socks5_udp_rejects_truncated_inputs() { + // Header alone is not enough. + assert!(parse_socks5_udp_packet(&[0, 0, 0, 0x01]).is_none()); + // IPv4 with truncated address bytes (need 4 octets). + assert!(parse_socks5_udp_packet(&[0, 0, 0, 0x01, 127, 0, 0]).is_none()); + // IPv4 with no port. + assert!(parse_socks5_udp_packet(&[0, 0, 0, 0x01, 127, 0, 0, 1]).is_none()); + // DOMAIN with zero-length. + assert!(parse_socks5_udp_packet(&[0, 0, 0, 0x03, 0, 0, 80]).is_none()); + // DOMAIN with length exceeding remaining buffer. + assert!(parse_socks5_udp_packet(&[0, 0, 0, 0x03, 5, b'a', b'b']).is_none()); + // Unknown atyp. + assert!(parse_socks5_udp_packet(&[0, 0, 0, 0x09, 1, 2, 3, 4]).is_none()); + // IPv6 with truncated address. + let raw = [0, 0, 0, 0x04, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; // 11 bytes < 16 + assert!(parse_socks5_udp_packet(&raw).is_none()); + } + + #[test] + fn socks5_udp_ipv4_round_trips() { + let mut raw = vec![0, 0, 0, 0x01, 1, 2, 3, 4]; + raw.extend_from_slice(&53u16.to_be_bytes()); + raw.extend_from_slice(b"\x00\x01"); + + let (target, payload) = parse_socks5_udp_packet(&raw).unwrap(); + assert_eq!(target.host, "1.2.3.4"); + assert_eq!(target.port, 53); + assert_eq!(payload, b"\x00\x01"); + assert_eq!(build_socks5_udp_packet(&target, payload), raw); + } + + #[test] + fn socks5_udp_ipv6_round_trips() { + let mut raw = vec![0, 0, 0, 0x04]; + raw.extend_from_slice(&[ + 0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x01, + ]); + raw.extend_from_slice(&443u16.to_be_bytes()); + raw.extend_from_slice(b"q"); + let (target, payload) = parse_socks5_udp_packet(&raw).unwrap(); + assert_eq!(target.host, "2001:db8::1"); + assert_eq!(target.port, 443); + assert_eq!(payload, b"q"); + assert_eq!(build_socks5_udp_packet(&target, payload), raw); + } + #[tokio::test(flavor = "current_thread")] async fn read_body_decodes_chunked_request() { let (mut client, mut server) = duplex(1024); @@ -1689,8 +2132,18 @@ mod tests { assert!(should_use_sni_rewrite(&hosts, "google.com", 443, false)); assert!(!should_use_sni_rewrite(&hosts, "google.com", 80, false)); - assert!(should_use_sni_rewrite(&hosts, "www.example.com", 443, false)); - assert!(!should_use_sni_rewrite(&hosts, "www.example.com", 80, false)); + assert!(should_use_sni_rewrite( + &hosts, + "www.example.com", + 443, + false + )); + assert!(!should_use_sni_rewrite( + &hosts, + "www.example.com", + 80, + false + )); } #[test] @@ -1702,17 +2155,32 @@ mod tests { let hosts = std::collections::HashMap::new(); // Default behaviour: everything in the pool rewrites. - assert!(should_use_sni_rewrite(&hosts, "www.youtube.com", 443, false)); + assert!(should_use_sni_rewrite( + &hosts, + "www.youtube.com", + 443, + false + )); assert!(should_use_sni_rewrite(&hosts, "i.ytimg.com", 443, false)); assert!(should_use_sni_rewrite(&hosts, "youtu.be", 443, false)); assert!(should_use_sni_rewrite(&hosts, "www.google.com", 443, false)); // With the toggle on: YouTube opts out, Google stays. - assert!(!should_use_sni_rewrite(&hosts, "www.youtube.com", 443, true)); + assert!(!should_use_sni_rewrite( + &hosts, + "www.youtube.com", + 443, + true + )); assert!(!should_use_sni_rewrite(&hosts, "i.ytimg.com", 443, true)); assert!(!should_use_sni_rewrite(&hosts, "youtu.be", 443, true)); assert!(should_use_sni_rewrite(&hosts, "www.google.com", 443, true)); - assert!(should_use_sni_rewrite(&hosts, "fonts.gstatic.com", 443, true)); + assert!(should_use_sni_rewrite( + &hosts, + "fonts.gstatic.com", + 443, + true + )); } #[test] @@ -1723,7 +2191,12 @@ mod tests { let mut hosts = std::collections::HashMap::new(); hosts.insert("rr4.googlevideo.com".to_string(), "1.2.3.4".to_string()); - assert!(should_use_sni_rewrite(&hosts, "rr4.googlevideo.com", 443, true)); + assert!(should_use_sni_rewrite( + &hosts, + "rr4.googlevideo.com", + 443, + true + )); } #[test] diff --git a/src/tunnel_client.rs b/src/tunnel_client.rs index 260596f..72444e6 100644 --- a/src/tunnel_client.rs +++ b/src/tunnel_client.rs @@ -100,6 +100,17 @@ enum MuxMsg { data: Vec, reply: oneshot::Sender>, }, + UdpOpen { + host: String, + port: u16, + data: Vec, + reply: oneshot::Sender>, + }, + UdpData { + sid: String, + data: Vec, + reply: oneshot::Sender>, + }, Close { sid: String, }, @@ -167,6 +178,47 @@ impl TunnelMux { let _ = self.tx.send(msg).await; } + pub async fn udp_open( + &self, + host: &str, + port: u16, + data: Vec, + ) -> Result { + let (reply_tx, reply_rx) = oneshot::channel(); + self.send(MuxMsg::UdpOpen { + host: host.to_string(), + port, + data, + reply: reply_tx, + }) + .await; + match reply_rx.await { + Ok(r) => r, + Err(_) => Err("mux channel closed".into()), + } + } + + pub async fn udp_data(&self, sid: &str, data: Vec) -> Result { + let (reply_tx, reply_rx) = oneshot::channel(); + self.send(MuxMsg::UdpData { + sid: sid.to_string(), + data, + reply: reply_tx, + }) + .await; + match reply_rx.await { + Ok(r) => r, + Err(_) => Err("mux channel closed".into()), + } + } + + pub async fn close_session(&self, sid: &str) { + self.send(MuxMsg::Close { + sid: sid.to_string(), + }) + .await; + } + fn connect_data_unsupported(&self) -> bool { self.connect_data_unsupported.load(Ordering::Relaxed) } @@ -202,7 +254,11 @@ impl TunnelMux { fn record_preread_loss(&self, port: u16) { self.preread_loss.fetch_add(1, Ordering::Relaxed); - tracing::debug!("preread loss: port={} (empty within {:?})", port, CLIENT_FIRST_DATA_WAIT); + tracing::debug!( + "preread loss: port={} (empty within {:?})", + port, + CLIENT_FIRST_DATA_WAIT + ); self.maybe_log_preread_summary(); } @@ -213,7 +269,8 @@ impl TunnelMux { } fn record_preread_skip_unsupported(&self, port: u16) { - self.preread_skip_unsupported.fetch_add(1, Ordering::Relaxed); + self.preread_skip_unsupported + .fetch_add(1, Ordering::Relaxed); tracing::debug!("preread skip: port={} (connect_data unsupported)", port); self.maybe_log_preread_summary(); } @@ -251,7 +308,12 @@ async fn mux_loop(mut rx: mpsc::Receiver, fronter: Arc) { fronter .script_id_list() .iter() - .map(|id| (id.clone(), Arc::new(Semaphore::new(CONCURRENCY_PER_DEPLOYMENT)))) + .map(|id| { + ( + id.clone(), + Arc::new(Semaphore::new(CONCURRENCY_PER_DEPLOYMENT)), + ) + }) .collect(), ); @@ -278,16 +340,25 @@ async fn mux_loop(mut rx: mpsc::Receiver, fronter: Arc) { MuxMsg::Connect { host, port, reply } => { let f = fronter.clone(); tokio::spawn(async move { - let result = - f.tunnel_request("connect", Some(&host), Some(port), None, None) - .await; + let result = f + .tunnel_request("connect", Some(&host), Some(port), None, None) + .await; match result { - Ok(resp) => { let _ = reply.send(Ok(resp)); } - Err(e) => { let _ = reply.send(Err(format!("{}", e))); } + Ok(resp) => { + let _ = reply.send(Ok(resp)); + } + Err(e) => { + let _ = reply.send(Err(format!("{}", e))); + } } }); } - MuxMsg::ConnectData { host, port, data, reply } => { + MuxMsg::ConnectData { + host, + port, + data, + reply, + } => { let encoded = Some(B64.encode(data.as_slice())); let op_bytes = encoded.as_ref().map(|s| s.len()).unwrap_or(0); @@ -351,6 +422,77 @@ async fn mux_loop(mut rx: mpsc::Receiver, fronter: Arc) { data_replies.push((idx, reply)); batch_payload_bytes += op_bytes; } + MuxMsg::UdpOpen { + host, + port, + data, + reply, + } => { + let encoded = if data.is_empty() { + None + } else { + Some(B64.encode(&data)) + }; + let op_bytes = encoded.as_ref().map(|s| s.len()).unwrap_or(0); + + if !data_ops.is_empty() + && (data_ops.len() >= MAX_BATCH_OPS + || batch_payload_bytes + op_bytes > MAX_BATCH_PAYLOAD_BYTES) + { + fire_batch( + &sems, + &fronter, + std::mem::take(&mut data_ops), + std::mem::take(&mut data_replies), + ) + .await; + batch_payload_bytes = 0; + } + + let idx = data_ops.len(); + data_ops.push(BatchOp { + op: "udp_open".into(), + sid: None, + host: Some(host), + port: Some(port), + d: encoded, + }); + data_replies.push((idx, reply)); + batch_payload_bytes += op_bytes; + } + MuxMsg::UdpData { sid, data, reply } => { + let encoded = if data.is_empty() { + None + } else { + Some(B64.encode(&data)) + }; + let op_bytes = encoded.as_ref().map(|s| s.len()).unwrap_or(0); + + if !data_ops.is_empty() + && (data_ops.len() >= MAX_BATCH_OPS + || batch_payload_bytes + op_bytes > MAX_BATCH_PAYLOAD_BYTES) + { + fire_batch( + &sems, + &fronter, + std::mem::take(&mut data_ops), + std::mem::take(&mut data_replies), + ) + .await; + batch_payload_bytes = 0; + } + + let idx = data_ops.len(); + data_ops.push(BatchOp { + op: "udp_data".into(), + sid: Some(sid), + host: None, + port: None, + d: encoded, + }); + data_replies.push((idx, reply)); + batch_payload_bytes += op_bytes; + } MuxMsg::Close { sid } => { close_sids.push(sid); } @@ -514,7 +656,10 @@ pub async fn tunnel_connection( match write_tunnel_response(&mut sock, &resp).await? { WriteOutcome::Wrote | WriteOutcome::NoData => {} WriteOutcome::BadBase64 => { - tracing::error!("tunnel session {}: bad base64 in connect_data response", sid); + tracing::error!( + "tunnel session {}: bad base64 in connect_data response", + sid + ); return Ok(()); } } @@ -636,7 +781,10 @@ async fn connect_with_initial_data( )); }; - Ok(ConnectDataOutcome::Opened { sid, response: resp }) + Ok(ConnectDataOutcome::Opened { + sid, + response: resp, + }) } /// Decide whether a response indicates the tunnel-node (or apps_script @@ -834,6 +982,18 @@ where } } +pub fn decode_udp_packets(resp: &TunnelResponse) -> Result>, String> { + let Some(pkts) = resp.pkts.as_ref() else { + return Ok(Vec::new()); + }; + pkts.iter() + .map(|pkt| { + B64.decode(pkt) + .map_err(|e| format!("bad UDP packet base64: {}", e)) + }) + .collect() +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- @@ -846,6 +1006,7 @@ mod tests { TunnelResponse { sid: None, d: None, + pkts: None, eof: None, e: e.map(str::to_string), code: code.map(str::to_string), @@ -854,7 +1015,10 @@ mod tests { #[test] fn unsupported_detection_via_structured_code() { - assert!(is_connect_data_unsupported_response(&resp_with(Some("UNSUPPORTED_OP"), None))); + assert!(is_connect_data_unsupported_response(&resp_with( + Some("UNSUPPORTED_OP"), + None + ))); assert!(is_connect_data_unsupported_response(&resp_with( Some("UNSUPPORTED_OP"), Some("unknown op: connect_data"), @@ -865,10 +1029,12 @@ mod tests { fn unsupported_detection_via_legacy_tunnel_node_string() { // Pre-change tunnel-node: no code field, bare "unknown op: ...". assert!(is_connect_data_unsupported_response(&resp_with( - None, Some("unknown op: connect_data"), + None, + Some("unknown op: connect_data"), ))); assert!(is_connect_data_unsupported_response(&resp_with( - None, Some("Unknown Op: CONNECT_DATA"), + None, + Some("Unknown Op: CONNECT_DATA"), ))); } @@ -878,30 +1044,46 @@ mod tests { // This is the realistic skew case — user upgrades tunnel-node + client // binary but hasn't redeployed the Apps Script yet. assert!(is_connect_data_unsupported_response(&resp_with( - None, Some("unknown tunnel op: connect_data"), + None, + Some("unknown tunnel op: connect_data"), ))); } #[test] fn unsupported_detection_rejects_unrelated_errors() { assert!(!is_connect_data_unsupported_response(&resp_with( - None, Some("connect failed: refused"), + None, + Some("connect failed: refused"), + ))); + assert!(!is_connect_data_unsupported_response(&resp_with( + None, + Some("bad base64") + ))); + assert!(!is_connect_data_unsupported_response(&resp_with( + None, None ))); - assert!(!is_connect_data_unsupported_response(&resp_with(None, Some("bad base64")))); - assert!(!is_connect_data_unsupported_response(&resp_with(None, None))); // "connect_data" alone (without "unknown op") shouldn't trigger. assert!(!is_connect_data_unsupported_response(&resp_with( - None, Some("connect_data: bad port"), + None, + Some("connect_data: bad port"), ))); } #[test] fn server_speaks_first_covers_common_protocols() { for p in [21u16, 22, 25, 80, 110, 143, 587] { - assert!(is_server_speaks_first(p), "port {} should be server-first", p); + assert!( + is_server_speaks_first(p), + "port {} should be server-first", + p + ); } for p in [443u16, 8443, 853, 993, 1234] { - assert!(!is_server_speaks_first(p), "port {} should NOT be server-first", p); + assert!( + !is_server_speaks_first(p), + "port {} should NOT be server-first", + p + ); } } @@ -947,9 +1129,7 @@ mod tests { let loop_handle = tokio::spawn({ let mux = mux.clone(); - async move { - tunnel_loop(&mut server_side, "sid-under-test", &mux, pending).await - } + async move { tunnel_loop(&mut server_side, "sid-under-test", &mux, pending).await } }); // The first message tunnel_loop emits must be Data carrying the @@ -967,6 +1147,7 @@ mod tests { let _ = reply.send(Ok(TunnelResponse { sid: Some("sid-under-test".into()), d: None, + pkts: None, eof: Some(true), e: None, code: None, @@ -978,6 +1159,8 @@ mod tests { MuxMsg::Connect { .. } => "Connect", MuxMsg::ConnectData { .. } => "ConnectData", MuxMsg::Data { .. } => unreachable!(), + MuxMsg::UdpOpen { .. } => "UdpOpen", + MuxMsg::UdpData { .. } => "UdpData", MuxMsg::Close { .. } => "Close", } ), diff --git a/tunnel-node/README.md b/tunnel-node/README.md index 7dd0af3..88d884b 100644 --- a/tunnel-node/README.md +++ b/tunnel-node/README.md @@ -8,10 +8,12 @@ HTTP tunnel bridge server for MasterHttpRelayVPN "full" mode. Bridges HTTP tunne Phone → mhrv-rs → [domain-fronted TLS] → Apps Script → [HTTP] → Tunnel Node → [real TCP] → Internet ``` -The tunnel node manages persistent TCP sessions. Each session is a real TCP connection to a destination server. Data flows through a JSON protocol: +The tunnel node manages persistent TCP and UDP sessions. TCP sessions are real TCP connections to a destination server; UDP sessions are connected UDP sockets to one destination host:port. Data flows through a JSON protocol: - **connect** — open TCP to host:port, return session ID - **data** — write client data, return server response +- **udp_open** — open UDP to host:port, optionally send the first datagram +- **udp_data** — send one UDP datagram, or poll for returned datagrams when `d` is omitted - **close** — tear down session - **batch** — process multiple ops in one HTTP request (reduces round trips) @@ -108,7 +110,7 @@ TUNNEL_AUTH_KEY=your-secret PORT=8080 ./target/release/tunnel-node "k": "auth", "ops": [ {"op":"data","sid":"uuid1","d":"base64"}, - {"op":"data","sid":"uuid2","d":"base64"}, + {"op":"udp_data","sid":"uuid2","d":"base64"}, {"op":"close","sid":"uuid3"} ] } diff --git a/tunnel-node/src/main.rs b/tunnel-node/src/main.rs index 49659de..c66ba54 100644 --- a/tunnel-node/src/main.rs +++ b/tunnel-node/src/main.rs @@ -9,8 +9,8 @@ //! TUNNEL_AUTH_KEY — shared secret (required) //! PORT — listen port (default 8080, Cloud Run sets this) -use std::collections::HashMap; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::collections::{HashMap, VecDeque}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -24,7 +24,7 @@ use base64::Engine; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; -use tokio::net::TcpStream; +use tokio::net::{lookup_host, TcpStream, UdpSocket}; use tokio::sync::{mpsc, Mutex, Notify}; use tokio::task::JoinSet; @@ -76,6 +76,21 @@ const STRAGGLER_SETTLE: Duration = Duration::from_millis(30); /// Script's UrlFetch ceiling (~60 s). const LONGPOLL_DEADLINE: Duration = Duration::from_secs(5); +/// Bound on each UDP session's inbound queue. Beyond this we drop oldest +/// to keep recent voice/media packets moving — a stale RTP frame is +/// worse than a missing one. Sized so a 256-deep queue at typical 1500B +/// payloads is ~384 KB before backpressure kicks in. +const UDP_QUEUE_LIMIT: usize = 256; + +/// Receive buffer for the UDP reader task. Must be ≥ 65535 to handle +/// a maximum-size IPv4 datagram without truncation. +const UDP_RECV_BUF_BYTES: usize = 65536; + +/// First queue-drop on a session always logs at warn level; subsequent +/// drops log at debug only every Nth occurrence so a single congested +/// session can't flood the operator's log. +const UDP_QUEUE_DROP_LOG_STRIDE: u64 = 100; + // --------------------------------------------------------------------------- // Session // --------------------------------------------------------------------------- @@ -97,6 +112,28 @@ struct ManagedSession { reader_handle: tokio::task::JoinHandle<()>, } +/// UDP equivalent of `SessionInner`. Holds a *connected* `UdpSocket` +/// pinned to one `(host, port)` upstream so we don't have to re-resolve +/// or re-parse the destination on every datagram. `notify` is fired by +/// the reader task on each inbound datagram (or on socket error) so the +/// batch drain phase can wake without polling — same primitive as the +/// TCP path. +struct UdpSessionInner { + socket: Arc, + packets: Mutex>>, + last_active: Mutex, + notify: Notify, + /// Total datagrams dropped because the queue hit `UDP_QUEUE_LIMIT`. + /// Surfaced via tracing so operators can correlate "choppy call" + /// reports with relay backpressure. + queue_drops: AtomicU64, +} + +struct ManagedUdpSession { + inner: Arc, + reader_handle: tokio::task::JoinHandle<()>, +} + async fn create_session(host: &str, port: u16) -> std::io::Result { let addr = format!("{}:{}", host, port); let stream = tokio::time::timeout(Duration::from_secs(10), TcpStream::connect(&addr)) @@ -150,6 +187,78 @@ async fn reader_task(mut reader: OwnedReadHalf, session: Arc) { } } +async fn create_udp_session(host: &str, port: u16) -> std::io::Result { + let mut addrs = lookup_host((host, port)).await?; + let remote = addrs.next().ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::AddrNotAvailable, + "no UDP address resolved", + ) + })?; + let bind_addr = if remote.is_ipv4() { + "0.0.0.0:0" + } else { + "[::]:0" + }; + let socket = UdpSocket::bind(bind_addr).await?; + socket.connect(remote).await?; + let socket = Arc::new(socket); + + let inner = Arc::new(UdpSessionInner { + socket: socket.clone(), + packets: Mutex::new(VecDeque::with_capacity(UDP_QUEUE_LIMIT)), + last_active: Mutex::new(Instant::now()), + notify: Notify::new(), + queue_drops: AtomicU64::new(0), + }); + + let inner_ref = inner.clone(); + let reader_handle = tokio::spawn(udp_reader_task(socket, inner_ref)); + Ok(ManagedUdpSession { + inner, + reader_handle, + }) +} + +/// UDP analogue of `reader_task`. Reads from the connected UDP socket +/// and queues each datagram on the session. Drops oldest on overflow, +/// updates `last_active` so server-push (download-only) UDP keeps the +/// session out of the idle reaper, and fires `notify` so the batch +/// drain phase can wake without polling. +async fn udp_reader_task(socket: Arc, session: Arc) { + let mut buf = vec![0u8; UDP_RECV_BUF_BYTES]; + loop { + match socket.recv(&mut buf).await { + // Empty datagram is valid UDP; nothing to forward, ignore. + Ok(0) => {} + Ok(n) => { + let mut packets = session.packets.lock().await; + if packets.len() >= UDP_QUEUE_LIMIT { + packets.pop_front(); + let dropped = session.queue_drops.fetch_add(1, Ordering::Relaxed) + 1; + if dropped == 1 { + tracing::warn!( + "udp queue full ({}); dropping oldest. Apps Script polling cannot keep up with upstream rate.", + UDP_QUEUE_LIMIT + ); + } else if dropped % UDP_QUEUE_DROP_LOG_STRIDE == 0 { + tracing::debug!("udp queue drops: {} on session", dropped); + } + } + packets.push_back(buf[..n].to_vec()); + drop(packets); + // Inbound packet counts as activity — keeps server-push + // UDP (e.g. SIP/RTP, server-sent telemetry) out of the + // idle reaper. Empty `udp_data` polls deliberately do + // NOT bump this (see batch handler). + *session.last_active.lock().await = Instant::now(); + session.notify.notify_one(); + } + Err(_) => break, + } + } +} + /// Drain whatever is currently buffered — no waiting. /// Used by batch mode where we poll frequently. async fn drain_now(session: &SessionInner) -> (Vec, bool) { @@ -252,6 +361,61 @@ async fn is_any_drainable(inners: &[Arc]) -> bool { false } +/// Drain whatever UDP datagrams are currently queued — no waiting. +async fn drain_udp_now(session: &UdpSessionInner) -> Vec> { + let mut packets = session.packets.lock().await; + packets.drain(..).collect() +} + +/// UDP analogue of `wait_for_any_drainable`. Wakes when any session has +/// at least one queued packet. Same race-safety contract: watchers +/// self-filter against observable state to ignore stale permits. +async fn wait_for_any_udp_drainable(inners: &[Arc], deadline: Duration) { + if inners.is_empty() { + return; + } + + let (tx, mut rx) = mpsc::channel::<()>(1); + let mut watchers = Vec::with_capacity(inners.len()); + for inner in inners { + let inner = inner.clone(); + let tx = tx.clone(); + watchers.push(tokio::spawn(async move { + loop { + inner.notify.notified().await; + if !inner.packets.lock().await.is_empty() { + break; + } + // Stale permit — packets were already drained by a + // prior batch. Loop back, don't wake the caller. + } + let _ = tx.try_send(()); + })); + } + drop(tx); + + let already_ready = is_any_udp_drainable(inners).await; + if !already_ready { + tokio::select! { + _ = rx.recv() => {} + _ = tokio::time::sleep(deadline) => {} + } + } + + for w in &watchers { + w.abort(); + } +} + +async fn is_any_udp_drainable(inners: &[Arc]) -> bool { + for inner in inners { + if !inner.packets.lock().await.is_empty() { + return true; + } + } + false +} + /// Wait for response data with drain window. Used by single-op mode. async fn wait_and_drain(session: &SessionInner, max_wait: Duration) -> (Vec, bool) { let deadline = Instant::now() + max_wait; @@ -288,6 +452,7 @@ async fn wait_and_drain(session: &SessionInner, max_wait: Duration) -> (Vec, #[derive(Clone)] struct AppState { sessions: Arc>>, + udp_sessions: Arc>>, auth_key: String, } @@ -309,6 +474,10 @@ struct TunnelRequest { struct TunnelResponse { #[serde(skip_serializing_if = "Option::is_none")] sid: Option, #[serde(skip_serializing_if = "Option::is_none")] d: Option, + /// UDP datagrams returned to the client, base64-encoded individually. + /// `None` for TCP responses; `Some(vec![])` is never serialized + /// (the field is dropped when empty by the empty-on-None check above). + #[serde(skip_serializing_if = "Option::is_none")] pkts: Option>, #[serde(skip_serializing_if = "Option::is_none")] eof: Option, #[serde(skip_serializing_if = "Option::is_none")] e: Option, #[serde(skip_serializing_if = "Option::is_none")] code: Option, @@ -316,11 +485,11 @@ struct TunnelResponse { impl TunnelResponse { fn error(msg: impl Into) -> Self { - Self { sid: None, d: None, eof: None, e: Some(msg.into()), code: None } + Self { sid: None, d: None, pkts: None, eof: None, e: Some(msg.into()), code: None } } fn unsupported_op(op: &str) -> Self { Self { - sid: None, d: None, eof: None, + sid: None, d: None, pkts: None, eof: None, e: Some(format!("unknown op: {}", op)), code: Some(CODE_UNSUPPORTED_OP.into()), } @@ -429,16 +598,18 @@ async fn handle_batch( // still fires from server-speaks-first ports and from the preread // timeout fallback path. let mut results: Vec<(usize, TunnelResponse)> = Vec::with_capacity(req.ops.len()); - let mut data_ops: Vec<(usize, String)> = Vec::new(); // (index, sid) for data ops needing drain + let mut tcp_drains: Vec<(usize, String)> = Vec::new(); + let mut udp_drains: Vec<(usize, String)> = Vec::new(); // True iff the batch contained any op that performed a real action // upstream — a new connection or a non-empty data write. A batch of - // only empty "data" polls (and possibly closes) leaves this false and - // qualifies for long-poll behavior in phase 2. + // only empty "data" / "udp_data" polls (and possibly closes) leaves + // this false and qualifies for long-poll behavior in phase 2. let mut had_writes_or_connects = false; enum NewConn { Connect(TunnelResponse), ConnectData(Result), + UdpOpen(Result), } let mut new_conn_jobs: JoinSet<(usize, NewConn)> = JoinSet::new(); @@ -470,6 +641,19 @@ async fn handle_batch( (i, NewConn::ConnectData(r)) }); } + "udp_open" => { + had_writes_or_connects = true; + let state = state.clone(); + let host = op.host.clone(); + let port = op.port; + let d = op.d.clone(); + new_conn_jobs.spawn(async move { + let r = handle_udp_open_phase1(&state, host, port, d) + .await + .map(|(sid, _inner)| sid); + (i, NewConn::UdpOpen(r)) + }); + } "data" => { let sid = match &op.sid { Some(s) if !s.is_empty() => s.clone(), @@ -493,12 +677,53 @@ async fn handle_batch( } } drop(sessions); - data_ops.push((i, sid)); + tcp_drains.push((i, sid)); } else { drop(sessions); - results.push((i, TunnelResponse { - sid: Some(sid), d: None, eof: Some(true), e: None, code: None, - })); + results.push((i, eof_response(sid))); + } + } + "udp_data" => { + let sid = match &op.sid { + Some(s) if !s.is_empty() => s.clone(), + _ => { results.push((i, TunnelResponse::error("missing sid"))); continue; } + }; + + let inner = { + let sessions = state.udp_sessions.lock().await; + sessions.get(&sid).map(|s| s.inner.clone()) + }; + if let Some(inner) = inner { + let mut had_uplink = false; + if let Some(ref data_b64) = op.d { + if !data_b64.is_empty() { + let bytes = match B64.decode(data_b64) { + Ok(b) => b, + Err(e) => { + results.push(( + i, + TunnelResponse::error(format!("bad base64: {}", e)), + )); + continue; + } + }; + if !bytes.is_empty() { + had_writes_or_connects = true; + had_uplink = true; + let _ = inner.socket.send(&bytes).await; + } + } + } + // last_active is bumped only on real activity: + // outbound here, or inbound in udp_reader_task. + // Empty long-poll batches must not refresh it, else + // the idle reaper never fires. + if had_uplink { + *inner.last_active.lock().await = Instant::now(); + } + udp_drains.push((i, sid)); + } else { + results.push((i, eof_response(sid))); } } "close" => { @@ -511,58 +736,64 @@ async fn handle_batch( } } - // Await all concurrent connect / connect_data jobs. For connect_data, - // successful ones join the data-drain set in phase 2; plain connects - // go straight to results because they have no initial data to drain. + // Await all concurrent connect / connect_data / udp_open jobs. + // Successful drain-bearing ones join the appropriate drain list; + // plain connects go straight to results. while let Some(join) = new_conn_jobs.join_next().await { match join { Ok((i, NewConn::Connect(r))) => results.push((i, r)), - Ok((i, NewConn::ConnectData(Ok(sid)))) => data_ops.push((i, sid)), + Ok((i, NewConn::ConnectData(Ok(sid)))) => tcp_drains.push((i, sid)), Ok((i, NewConn::ConnectData(Err(r)))) => results.push((i, r)), + Ok((i, NewConn::UdpOpen(Ok(sid)))) => udp_drains.push((i, sid)), + Ok((i, NewConn::UdpOpen(Err(r)))) => results.push((i, r)), Err(e) => { tracing::error!("new-connection task panicked: {}", e); } } } - // Phase 2: signal-driven wait for any session to have data (or hit - // EOF), then drain everyone in a single pass. The deadline is - // adaptive: - // * `ACTIVE_DRAIN_DEADLINE` (~350 ms) when the batch had real work - // — typical responses arrive in ms and `wait_for_any_drainable` - // returns on the first notify. After the first wake we settle - // for `STRAGGLER_SETTLE` so neighboring sessions whose replies - // land just behind the first one don't get reported empty. - // * `LONGPOLL_DEADLINE` when the batch is a pure poll — no writes, - // no new connections. The response is held open until upstream - // pushes data, turning idle sessions into a true long-poll - // without paying per-poll latency. No straggler settle here: - // the wake event IS the data the client wants, so deliver it - // immediately. - if !data_ops.is_empty() { + // Phase 2: signal-driven wait for any session (TCP or UDP) to have + // data, then drain TCP and UDP independently in a single pass each. + // Deadlines: + // * `ACTIVE_DRAIN_DEADLINE` (~350 ms) when the batch had real work. + // Typical responses arrive in ms; the wait helpers return on + // the first notify. For active batches we settle for + // `STRAGGLER_SETTLE` so neighbors whose replies trail by a few + // ms aren't reported empty. + // * `LONGPOLL_DEADLINE` for pure-poll batches — held open until + // upstream pushes data. UDP idle polls benefit from this just + // as much as TCP, so the same window applies. + if !tcp_drains.is_empty() || !udp_drains.is_empty() { let deadline = if had_writes_or_connects { ACTIVE_DRAIN_DEADLINE } else { LONGPOLL_DEADLINE }; - // Snapshot SessionInner Arcs under a single lock so we don't - // hold the sessions-map lock across the await. - let inners: Vec> = { + let tcp_inners: Vec> = { let sessions = state.sessions.lock().await; - data_ops + tcp_drains + .iter() + .filter_map(|(_, sid)| sessions.get(sid).map(|s| s.inner.clone())) + .collect() + }; + let udp_inners: Vec> = { + let sessions = state.udp_sessions.lock().await; + udp_drains .iter() .filter_map(|(_, sid)| sessions.get(sid).map(|s| s.inner.clone())) .collect() }; let wait_start = Instant::now(); - wait_for_any_drainable(&inners, deadline).await; + // Wait for either side to wake. Running both concurrently means + // a TCP-only batch isn't slowed by a stale UDP watch list, and + // vice versa. + tokio::join!( + wait_for_any_drainable(&tcp_inners, deadline), + wait_for_any_udp_drainable(&udp_inners, deadline), + ); - // Straggler settle: only for active batches, only if we woke - // early (didn't hit the deadline). Capped by the remaining - // deadline budget — `saturating_sub` so a future refactor that - // ever lets `elapsed > deadline` slip through can't underflow. if had_writes_or_connects { let remaining = deadline.saturating_sub(wait_start.elapsed()); if !remaining.is_zero() { @@ -570,34 +801,42 @@ async fn handle_batch( } } - // Single drain pass for all sessions in this batch. - { + // ---- TCP drain ---- + if !tcp_drains.is_empty() { let sessions = state.sessions.lock().await; - for (i, sid) in &data_ops { + for (i, sid) in &tcp_drains { if let Some(session) = sessions.get(sid) { let (data, eof) = drain_now(&session.inner).await; - results.push((*i, TunnelResponse { - sid: Some(sid.clone()), - d: if data.is_empty() { None } else { Some(B64.encode(&data)) }, - eof: Some(eof), e: None, code: None, - })); + results.push((*i, tcp_drain_response(sid.clone(), data, eof))); } else { - results.push((*i, TunnelResponse { - sid: Some(sid.clone()), d: None, eof: Some(true), e: None, code: None, - })); + results.push((*i, eof_response(sid.clone()))); + } + } + drop(sessions); + + // Clean up eof TCP sessions. + let mut sessions = state.sessions.lock().await; + for (_, sid) in &tcp_drains { + if let Some(s) = sessions.get(sid) { + if s.inner.eof.load(Ordering::Acquire) { + if let Some(s) = sessions.remove(sid) { + s.reader_handle.abort(); + tracing::info!("session {} closed by remote (batch)", sid); + } + } } } } - // Clean up eof sessions - let mut sessions = state.sessions.lock().await; - for (_, sid) in &data_ops { - if let Some(s) = sessions.get(sid) { - if s.inner.eof.load(Ordering::Acquire) { - if let Some(s) = sessions.remove(sid) { - s.reader_handle.abort(); - tracing::info!("session {} closed by remote (batch)", sid); - } + // ---- UDP drain ---- + if !udp_drains.is_empty() { + let sessions = state.udp_sessions.lock().await; + for (i, sid) in &udp_drains { + if let Some(session) = sessions.get(sid) { + let packets = drain_udp_now(&session.inner).await; + results.push((*i, udp_drain_response(sid.clone(), packets))); + } else { + results.push((*i, eof_response(sid.clone()))); } } } @@ -613,6 +852,44 @@ async fn handle_batch( (StatusCode::OK, [(header::CONTENT_TYPE, "application/json")], json) } +fn tcp_drain_response(sid: String, data: Vec, eof: bool) -> TunnelResponse { + TunnelResponse { + sid: Some(sid), + d: if data.is_empty() { None } else { Some(B64.encode(&data)) }, + pkts: None, + eof: Some(eof), + e: None, + code: None, + } +} + +fn udp_drain_response(sid: String, packets: Vec>) -> TunnelResponse { + let pkts = if packets.is_empty() { + None + } else { + Some(packets.iter().map(|p| B64.encode(p)).collect()) + }; + TunnelResponse { + sid: Some(sid), + d: None, + pkts, + eof: Some(false), + e: None, + code: None, + } +} + +fn eof_response(sid: String) -> TunnelResponse { + TunnelResponse { + sid: Some(sid), + d: None, + pkts: None, + eof: Some(true), + e: None, + code: None, + } +} + fn decompress_gzip(data: &[u8]) -> Result, String> { use std::io::Read; let mut decoder = flate2::read::GzDecoder::new(data); @@ -652,7 +929,7 @@ async fn handle_connect(state: &AppState, host: Option, port: Option {}:{}", sid, host, port); state.sessions.lock().await.insert(sid.clone(), session); - TunnelResponse { sid: Some(sid), d: None, eof: Some(false), e: None, code: None } + TunnelResponse { sid: Some(sid), d: None, pkts: None, eof: Some(false), e: None, code: None } } /// Open a session and write the client's first bytes in one round trip. @@ -704,6 +981,47 @@ async fn handle_connect_data_phase1( Ok((sid, inner)) } +/// UDP analogue of `handle_connect_data_phase1`. Opens a connected UDP +/// socket to `(host, port)` and optionally sends the client's first +/// datagram in the same op so a request-response flow (e.g. DNS, STUN) +/// saves a round trip on session establishment. +async fn handle_udp_open_phase1( + state: &AppState, + host: Option, + port: Option, + data: Option, +) -> Result<(String, Arc), TunnelResponse> { + let (host, port) = validate_host_port(host, port)?; + + let session = create_udp_session(&host, port) + .await + .map_err(|e| TunnelResponse::error(format!("udp connect failed: {}", e)))?; + + if let Some(ref data_b64) = data { + if !data_b64.is_empty() { + let bytes = match B64.decode(data_b64) { + Ok(b) => b, + Err(e) => { + session.reader_handle.abort(); + return Err(TunnelResponse::error(format!("bad base64: {}", e))); + } + }; + if !bytes.is_empty() { + if let Err(e) = session.inner.socket.send(&bytes).await { + session.reader_handle.abort(); + return Err(TunnelResponse::error(format!("udp write failed: {}", e))); + } + } + } + } + + let inner = session.inner.clone(); + let sid = uuid::Uuid::new_v4().to_string(); + tracing::info!("udp session {} -> {}:{}", sid, host, port); + state.udp_sessions.lock().await.insert(sid.clone(), session); + Ok((sid, inner)) +} + async fn handle_connect_data_single( state: &AppState, host: Option, @@ -724,6 +1042,7 @@ async fn handle_connect_data_single( TunnelResponse { sid: Some(sid), d: if data.is_empty() { None } else { Some(B64.encode(&data)) }, + pkts: None, eof: Some(eof), e: None, code: None, @@ -767,6 +1086,7 @@ async fn handle_data_single(state: &AppState, sid: Option, data: Option< TunnelResponse { sid: Some(sid), d: if data.is_empty() { None } else { Some(B64.encode(&data)) }, + pkts: None, eof: Some(eof), e: None, code: None, } } @@ -780,35 +1100,73 @@ async fn handle_close(state: &AppState, sid: Option) -> TunnelResponse { s.reader_handle.abort(); tracing::info!("session {} closed by client", sid); } - TunnelResponse { sid: Some(sid), d: None, eof: Some(true), e: None, code: None } + if let Some(s) = state.udp_sessions.lock().await.remove(&sid) { + s.reader_handle.abort(); + tracing::info!("udp session {} closed by client", sid); + } + TunnelResponse { sid: Some(sid), d: None, pkts: None, eof: Some(true), e: None, code: None } } // --------------------------------------------------------------------------- // Cleanup // --------------------------------------------------------------------------- -async fn cleanup_task(sessions: Arc>>) { +async fn cleanup_task( + sessions: Arc>>, + udp_sessions: Arc>>, +) { let mut interval = tokio::time::interval(Duration::from_secs(30)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { interval.tick().await; - let mut map = sessions.lock().await; let now = Instant::now(); - let mut stale = Vec::new(); - for (k, s) in map.iter() { - let last = *s.inner.last_active.lock().await; - if now.duration_since(last) > Duration::from_secs(300) { - stale.push(k.clone()); + + { + let mut map = sessions.lock().await; + let mut stale = Vec::new(); + for (k, s) in map.iter() { + let last = *s.inner.last_active.lock().await; + if now.duration_since(last) > Duration::from_secs(300) { + stale.push(k.clone()); + } + } + for k in &stale { + if let Some(s) = map.remove(k) { + s.reader_handle.abort(); + tracing::info!("reaped idle session {}", k); + } + } + if !stale.is_empty() { + tracing::info!("cleanup: reaped {}, {} active", stale.len(), map.len()); } } - for k in &stale { - if let Some(s) = map.remove(k) { - s.reader_handle.abort(); - tracing::info!("reaped idle session {}", k); + + { + // UDP sessions get a tighter idle window because UDP flows + // are typically short-lived (DNS, STUN, single-RTT QUIC) or + // make their own keepalives. 120 s avoids leaking sockets + // for one-shot lookups while keeping calls/streams alive. + let mut map = udp_sessions.lock().await; + let mut stale = Vec::new(); + for (k, s) in map.iter() { + let last = *s.inner.last_active.lock().await; + if now.duration_since(last) > Duration::from_secs(120) { + stale.push(k.clone()); + } + } + for k in &stale { + if let Some(s) = map.remove(k) { + s.reader_handle.abort(); + tracing::info!("reaped idle udp session {}", k); + } + } + if !stale.is_empty() { + tracing::info!( + "cleanup: reaped {}, {} active udp", + stale.len(), + map.len() + ); } - } - if !stale.is_empty() { - tracing::info!("cleanup: reaped {}, {} active", stale.len(), map.len()); } } } @@ -837,9 +1195,11 @@ async fn main() { let sessions: Arc>> = Arc::new(Mutex::new(HashMap::new())); - tokio::spawn(cleanup_task(sessions.clone())); + let udp_sessions: Arc>> = + Arc::new(Mutex::new(HashMap::new())); + tokio::spawn(cleanup_task(sessions.clone(), udp_sessions.clone())); - let state = AppState { sessions, auth_key }; + let state = AppState { sessions, udp_sessions, auth_key }; let app = Router::new() .route("/tunnel", post(handle_tunnel)) @@ -872,10 +1232,25 @@ mod tests { fn fresh_state() -> AppState { AppState { sessions: Arc::new(Mutex::new(HashMap::new())), + udp_sessions: Arc::new(Mutex::new(HashMap::new())), auth_key: "test-key".into(), } } + async fn start_udp_echo_server() -> u16 { + let socket = UdpSocket::bind(("127.0.0.1", 0)).await.unwrap(); + let port = socket.local_addr().unwrap().port(); + tokio::spawn(async move { + let mut buf = [0u8; 2048]; + if let Ok((n, peer)) = socket.recv_from(&mut buf).await { + let mut out = b"ECHO: ".to_vec(); + out.extend_from_slice(&buf[..n]); + let _ = socket.send_to(&out, peer).await; + } + }); + port + } + /// Spin up a one-shot TCP server that echoes everything it reads back /// with a `"ECHO: "` prefix, then returns the bound port. async fn start_echo_server() -> u16 { @@ -1371,4 +1746,129 @@ mod tests { let data = B64.decode(d_b64).unwrap(); assert_eq!(&data[..], b"DELAYED"); } + + // --------------------------------------------------------------------- + // UDP path + // --------------------------------------------------------------------- + + #[tokio::test] + async fn udp_open_writes_initial_datagram_and_buffers_reply() { + let port = start_udp_echo_server().await; + let state = fresh_state(); + + let (sid, inner) = handle_udp_open_phase1( + &state, + Some("127.0.0.1".into()), + Some(port), + Some(B64.encode(b"ping")), + ) + .await + .expect("udp open should succeed"); + + assert!(state.udp_sessions.lock().await.contains_key(&sid)); + wait_for_any_udp_drainable(std::slice::from_ref(&inner), Duration::from_secs(2)).await; + let packets = drain_udp_now(&inner).await; + assert_eq!(packets, vec![b"ECHO: ping".to_vec()]); + } + + /// When the upstream sends faster than the relay drains, the queue + /// must drop oldest packets (so recent voice/video stays current) + /// AND increment the counter so operators can correlate user + /// reports of choppiness with relay backpressure. + #[tokio::test] + async fn udp_queue_overflow_drops_oldest_and_counts() { + let state = fresh_state(); + let sink = UdpSocket::bind(("127.0.0.1", 0)).await.unwrap(); + let sink_port = sink.local_addr().unwrap().port(); + + let (_sid, inner) = + handle_udp_open_phase1(&state, Some("127.0.0.1".into()), Some(sink_port), None) + .await + .expect("udp open"); + + // Flood the session socket from sink — its connected remote is + // exactly sink_port, so packets pass the kernel's source check. + let session_addr = inner.socket.local_addr().unwrap(); + let burst = UDP_QUEUE_LIMIT + 16; + for i in 0..burst { + let payload = format!("p{}", i).into_bytes(); + sink.send_to(&payload, session_addr).await.unwrap(); + } + // Give the reader_task a chance to drain the OS buffer. + for _ in 0..50 { + if inner.queue_drops.load(Ordering::Relaxed) > 0 { + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + let drops = inner.queue_drops.load(Ordering::Relaxed); + let queued = inner.packets.lock().await.len(); + assert!(drops >= 1, "expected ≥1 drop, got {} (queued={})", drops, queued); + assert!(queued <= UDP_QUEUE_LIMIT, "queue exceeded limit: {}", queued); + } + + /// Regression for the bug the review caught: a batch mixing UDP and + /// TCP-data ops must let the TCP side benefit from the same + /// event-driven drain. With the new architecture both sides share + /// one wait_start / deadline window — ensure a delayed TCP response + /// still makes it into the batch even when UDP is along for the ride. + #[tokio::test] + async fn tcp_drain_runs_when_batch_also_contains_udp() { + use axum::body::Bytes; + use axum::extract::State; + + // TCP server that delays its response past the typical wake but + // well within ACTIVE_DRAIN_DEADLINE (350ms). + let tcp_listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap(); + let tcp_port = tcp_listener.local_addr().unwrap().port(); + tokio::spawn(async move { + if let Ok((mut sock, _)) = tcp_listener.accept().await { + let mut buf = [0u8; 64]; + let _ = sock.read(&mut buf).await; + tokio::time::sleep(Duration::from_millis(120)).await; + let _ = sock.write_all(b"DELAYED").await; + let _ = sock.flush().await; + } + }); + + // Idle UDP target — never replies. Just sets up the dual-drain + // path through Phase 2. + let udp_target = UdpSocket::bind(("127.0.0.1", 0)).await.unwrap(); + let udp_port = udp_target.local_addr().unwrap().port(); + + let state = fresh_state(); + let tcp_sid = match handle_connect(&state, Some("127.0.0.1".into()), Some(tcp_port)).await { + TunnelResponse { + sid: Some(s), + e: None, + .. + } => s, + other => panic!("connect failed: {:?}", other), + }; + let (udp_sid, _udp_inner) = + handle_udp_open_phase1(&state, Some("127.0.0.1".into()), Some(udp_port), None) + .await + .expect("udp open"); + + let body = serde_json::json!({ + "k": "test-key", + "ops": [ + {"op": "data", "sid": tcp_sid, "d": B64.encode(b"hello")}, + {"op": "udp_data", "sid": udp_sid}, + ] + }) + .to_string(); + let resp = handle_batch(State(state.clone()), Bytes::from(body)) + .await + .into_response(); + let (parts, body) = resp.into_parts(); + assert_eq!(parts.status, axum::http::StatusCode::OK); + let body_bytes = axum::body::to_bytes(body, 64 * 1024).await.unwrap(); + let parsed: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap(); + let r = parsed["r"].as_array().unwrap(); + assert_eq!(r.len(), 2); + let tcp_d = r[0]["d"].as_str().expect("tcp data missing"); + let decoded = B64.decode(tcp_d).unwrap(); + assert_eq!(&decoded[..], b"DELAYED"); + } }