feat(udp): SOCKS5 UDP ASSOCIATE relay through full tunnel

Adds end-to-end UDP support: SOCKS5 client UDP ASSOCIATE → tunnel-mux
udp_open/udp_data ops → tunnel-node UDP sessions → real UDP to upstream.
QUIC/HTTP3, DNS, and STUN now traverse full mode without falling back to
TCP or leaking outside the tunnel.

Apps Script proxies the new ops opaquely through the existing batch
endpoint; CodeFull.gs only gets a doc-comment update.

Highlights:
- proxy_server.rs: SOCKS5 UDP ASSOCIATE handler with per-session task,
  bounded uplink mpsc channel, adaptive empty-poll backoff (500 ms → 30 s),
  source-IP validation against the control TCP peer, port-locking on
  first valid datagram, and self-removal from the dispatch map on eof.
- tunnel_client.rs: UdpOpen / UdpData / close_session mux variants
  alongside the existing TCP plumbing; pkts decoder helper.
- tunnel-node: UdpSessionInner with bounded VecDeque queue, drop-oldest
  on overflow with queue_drops counter and warn-then-throttled logs,
  last_active refreshed only on real activity (uplink send or upstream
  recv — empty polls do not refresh), independent TCP/UDP drain in
  handle_batch Phase 2, separate active-drain (150 ms) and retry
  (250 ms) windows for UDP, idle long-poll (5 s).
- Tests: SOCKS5 UDP packet parser (IPv4/IPv6/DOMAIN round-trips,
  truncation rejects, fragmented rejects), UDP queue overflow drop +
  counter, regression test that a batch with both UDP and TCP-data ops
  still runs the TCP retry pass.
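
The drop-oldest queue from the tunnel-node bullet can be sketched as follows. This is a minimal illustration, not the code from this commit: the `DropOldestQueue` type and its methods are hypothetical names, and only the `queue_drops` counter name and the use of a bounded `VecDeque` come from the message above.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a per-session downlink queue. When the queue
/// is full, the oldest datagram is evicted to make room for the new one.
struct DropOldestQueue {
    packets: VecDeque<Vec<u8>>,
    capacity: usize,
    /// Number of datagrams discarded because the queue was full.
    queue_drops: u64,
}

impl DropOldestQueue {
    fn new(capacity: usize) -> Self {
        Self {
            packets: VecDeque::with_capacity(capacity),
            capacity,
            queue_drops: 0,
        }
    }

    /// Enqueue a datagram, evicting the oldest entry on overflow.
    fn push(&mut self, pkt: Vec<u8>) {
        if self.capacity == 0 {
            // Nothing can be stored; count the datagram itself as dropped.
            self.queue_drops += 1;
            return;
        }
        if self.packets.len() >= self.capacity {
            self.packets.pop_front();
            self.queue_drops += 1;
        }
        self.packets.push_back(pkt);
    }

    /// Hand all queued datagrams to the caller, oldest first.
    fn drain(&mut self) -> Vec<Vec<u8>> {
        self.packets.drain(..).collect()
    }
}
```

Dropping the oldest entry favours fresh data, which usually suits UDP consumers such as QUIC or DNS, where a stale datagram is likely to have been retransmitted or timed out already.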

Docs: README + android.{md,fa.md} updated to reflect UDP availability
in full mode; tunnel-node/README documents the new ops.
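
The adaptive empty-poll backoff named in the highlights (500 ms → 30 s) follows a simple doubling rule. The sketch below reproduces that rule in isolation, using the constant values from the diff. The helper functions `next_backoff` and `idle_schedule` are hypothetical and exist only for illustration.

```rust
use std::time::Duration;

// Values taken from the constants in proxy_server.rs.
const UDP_INITIAL_POLL_DELAY: Duration = Duration::from_millis(500);
const UDP_MAX_POLL_DELAY: Duration = Duration::from_secs(30);

/// Hypothetical helper: compute the next poll delay after one poll.
/// Any received packets reset the delay; an empty poll doubles it up to the cap.
fn next_backoff(current: Duration, got_packets: bool) -> Duration {
    if got_packets {
        UDP_INITIAL_POLL_DELAY
    } else {
        (current * 2).min(UDP_MAX_POLL_DELAY)
    }
}

/// Hypothetical helper: the sleep durations used before each of `polls`
/// consecutive empty polls on a fully idle session.
fn idle_schedule(polls: usize) -> Vec<Duration> {
    let mut out = Vec::with_capacity(polls);
    let mut delay = UDP_INITIAL_POLL_DELAY;
    for _ in 0..polls {
        out.push(delay);
        delay = next_backoff(delay, false);
    }
    out
}
```

Under these assumptions an idle session sleeps 0.5 s, 1 s, 2 s, 4 s, 8 s, 16 s, and then 30 s before each subsequent poll, so the cap is reached on the seventh poll.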
dazzling-no-more
2026-04-25 13:38:01 +04:00
parent fb552c227d
commit 40c2b6c509
9 changed files with 1314 additions and 151 deletions
+1 -1
@@ -267,7 +267,7 @@ HTTP/HTTPS continues to route through the Apps Script relay (no change), and the
## Full tunnel mode ## Full tunnel mode
Full tunnel mode (`"mode": "full"`) routes **all** traffic end-to-end through Apps Script and a remote [tunnel-node](tunnel-node/) — no MITM certificate needed. The trade-off is higher latency per request (every byte goes Apps Script → tunnel-node → destination), but it works for every protocol and every app without CA installation. Full tunnel mode (`"mode": "full"`) routes **all** traffic end-to-end through Apps Script and a remote [tunnel-node](tunnel-node/) — no MITM certificate needed. TCP is carried as persistent tunnel sessions, and UDP from Android/TUN clients is carried via SOCKS5 `UDP ASSOCIATE` to the tunnel-node, which then emits real UDP from the server side. The trade-off is higher latency per request (every byte/datagram goes Apps Script → tunnel-node → destination), but it works for protocols and apps that cannot use the MITM relay path.
### How deployment IDs affect performance ### How deployment IDs affect performance
+2
@@ -6,6 +6,8 @@
* 2. Batch relay: POST { k, q: [{m,u,h,b,ct,r}, ...] } → { q: [{s,h,b}, ...] } * 2. Batch relay: POST { k, q: [{m,u,h,b,ct,r}, ...] } → { q: [{s,h,b}, ...] }
* 3. Tunnel: POST { k, t, h, p, sid, d } → { sid, d, eof } * 3. Tunnel: POST { k, t, h, p, sid, d } → { sid, d, eof }
* 4. Tunnel batch: POST { k, t:"batch", ops:[...] } → { r: [...] } * 4. Tunnel batch: POST { k, t:"batch", ops:[...] } → { r: [...] }
* Batch ops include TCP (`connect`, `data`) and UDP (`udp_open`,
* `udp_data`) tunnel-node operations.
* *
* CHANGE THESE TO YOUR OWN VALUES! * CHANGE THESE TO YOUR OWN VALUES!
*/ */
+2 -2
@@ -234,9 +234,9 @@ VpnService TUN ──► tun2proxy (in-process)
**Sites that only `gate` the first load** (most Cloudflare `Bot Fight Mode` customers) work fine after one solve. Sites that `challenge` every request (crypto exchanges, adult, some forums) inherently do not work with this architecture; use a different tunnel for those. **Sites that only `gate` the first load** (most Cloudflare `Bot Fight Mode` customers) work fine after one solve. Sites that `challenge` every request (crypto exchanges, adult, some forums) inherently do not work with this architecture; use a different tunnel for those.
### UDP / QUIC (HTTP/3) doesn't go through ### UDP / QUIC (HTTP/3)
The `SOCKS5 listener` only understands `CONNECT`, not `UDP ASSOCIATE`. `Chrome` tries `HTTP/3` first and falls back to `HTTP/2 over TCP`, which goes through the proxy. Effect: a slightly slower first connection, everything else normal. In `full` mode, the `SOCKS5 listener` also understands the `UDP ASSOCIATE` command and carries UDP datagrams along the Apps Script path to `tunnel-node`; `tunnel-node` then sends real UDP to the destination from the server side. Your ISP still sees only HTTPS to Google. In `apps_script` mode, UDP still takes the old path: `Chrome` tries `HTTP/3` first and then falls back to `HTTP/2 over TCP`.
### IPv6 leaks ### IPv6 leaks
+2 -2
@@ -221,9 +221,9 @@ Cloudflare's `cf_clearance` cookie is bound to the `(IP, UA, JA3)` tuple the cha
**Sites that only gate the first page load** (most of CF's Bot Fight Mode customers) work fine after one solve. Sites that challenge every request (crypto exchanges, adult, some forums) fundamentally can't hold a session through this architecture — use a different tunnel for those. **Sites that only gate the first page load** (most of CF's Bot Fight Mode customers) work fine after one solve. Sites that challenge every request (crypto exchanges, adult, some forums) fundamentally can't hold a session through this architecture — use a different tunnel for those.
### UDP / QUIC (HTTP/3) doesn't go through ### UDP / QUIC (HTTP/3)
The SOCKS5 listener only handles `CONNECT`, not `UDP ASSOCIATE`. Chrome tries HTTP/3 first and falls back to HTTP/2 over TCP, which works fine. Effect: slightly slower first connect, everything else normal. In `full` mode, the SOCKS5 listener handles `UDP ASSOCIATE` and tunnels UDP datagrams through Apps Script to `tunnel-node`, which then sends real UDP to the destination. Your ISP still only sees HTTPS to Google. In `apps_script` mode, UDP still falls back the old way: Chrome tries HTTP/3 first and then uses HTTP/2 over TCP.
### IPv6 leaks ### IPv6 leaks
+3
@@ -181,6 +181,9 @@ pub struct TunnelResponse {
pub sid: Option<String>, pub sid: Option<String>,
#[serde(default)] #[serde(default)]
pub d: Option<String>, pub d: Option<String>,
/// UDP datagrams returned by tunnel-node, base64-encoded individually.
#[serde(default)]
pub pkts: Option<Vec<String>>,
#[serde(default)] #[serde(default)]
pub eof: Option<bool>, pub eof: Option<bool>,
#[serde(default)] #[serde(default)]
+514 -41
@@ -1,20 +1,23 @@
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::sync::Mutex; use tokio::sync::{mpsc, Mutex};
use tokio_rustls::rustls::client::danger::{ use tokio_rustls::rustls::client::danger::{
HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier, HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier,
}; };
use tokio_rustls::rustls::pki_types::{CertificateDer, ServerName, UnixTime}; use tokio_rustls::rustls::pki_types::{CertificateDer, ServerName, UnixTime};
use tokio_rustls::rustls::{ClientConfig, DigitallySignedStruct, SignatureScheme};
use tokio_rustls::rustls::server::Acceptor; use tokio_rustls::rustls::server::Acceptor;
use tokio_rustls::rustls::{ClientConfig, DigitallySignedStruct, SignatureScheme};
use tokio_rustls::{LazyConfigAcceptor, TlsAcceptor, TlsConnector}; use tokio_rustls::{LazyConfigAcceptor, TlsAcceptor, TlsConnector};
use crate::config::{Config, Mode}; use crate::config::{Config, Mode};
use crate::domain_fronter::DomainFronter; use crate::domain_fronter::DomainFronter;
use crate::mitm::MitmCertManager; use crate::mitm::MitmCertManager;
use crate::tunnel_client::TunnelMux; use crate::tunnel_client::{decode_udp_packets, TunnelMux};
// Domains that are served from Google's core frontend IP pool and therefore // Domains that are served from Google's core frontend IP pool and therefore
// respond correctly when we connect to `google_ip` with SNI=`front_domain` // respond correctly when we connect to `google_ip` with SNI=`front_domain`
@@ -182,9 +185,8 @@ impl ProxyServer {
// `script_id`, which is exactly the state a bootstrapping user is in. // `script_id`, which is exactly the state a bootstrapping user is in.
let fronter = match mode { let fronter = match mode {
Mode::AppsScript | Mode::Full => { Mode::AppsScript | Mode::Full => {
let f = DomainFronter::new(config).map_err(|e| { let f = DomainFronter::new(config)
std::io::Error::new(std::io::ErrorKind::Other, format!("{e}")) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e}")))?;
})?;
Some(Arc::new(f)) Some(Arc::new(f))
} }
Mode::GoogleOnly => None, Mode::GoogleOnly => None,
@@ -320,7 +322,8 @@ impl ProxyServer {
let rewrite_ctx = http_ctx.clone(); let rewrite_ctx = http_ctx.clone();
let mux = http_mux.clone(); let mux = http_mux.clone();
children.spawn(async move { children.spawn(async move {
if let Err(e) = handle_http_client(sock, fronter, mitm, rewrite_ctx, mux).await { if let Err(e) = handle_http_client(sock, fronter, mitm, rewrite_ctx, mux).await
{
tracing::debug!("http client {} closed: {}", peer, e); tracing::debug!("http client {} closed: {}", peer, e);
} }
}); });
@@ -356,7 +359,9 @@ impl ProxyServer {
let rewrite_ctx = socks_ctx.clone(); let rewrite_ctx = socks_ctx.clone();
let mux = socks_mux.clone(); let mux = socks_mux.clone();
children.spawn(async move { children.spawn(async move {
if let Err(e) = handle_socks5_client(sock, fronter, mitm, rewrite_ctx, mux).await { if let Err(e) =
handle_socks5_client(sock, fronter, mitm, rewrite_ctx, mux).await
{
tracing::debug!("socks client {} closed: {}", peer, e); tracing::debug!("socks client {} closed: {}", peer, e);
} }
}); });
@@ -510,8 +515,8 @@ async fn handle_socks5_client(
return Ok(()); return Ok(());
} }
let cmd = req[1]; let cmd = req[1];
if cmd != 0x01 { if cmd != 0x01 && cmd != 0x03 {
// CONNECT only. // CONNECT and UDP ASSOCIATE only.
sock.write_all(&[0x05, 0x07, 0x00, 0x01, 0, 0, 0, 0, 0, 0]) sock.write_all(&[0x05, 0x07, 0x00, 0x01, 0, 0, 0, 0, 0, 0])
.await?; .await?;
return Ok(()); return Ok(());
@@ -546,6 +551,11 @@ async fn handle_socks5_client(
sock.read_exact(&mut port_buf).await?; sock.read_exact(&mut port_buf).await?;
let port = u16::from_be_bytes(port_buf); let port = u16::from_be_bytes(port_buf);
if cmd == 0x03 {
tracing::info!("SOCKS5 UDP ASSOCIATE requested for {}:{}", host, port);
return handle_socks5_udp_associate(sock, rewrite_ctx, tunnel_mux).await;
}
tracing::info!("SOCKS5 CONNECT -> {}:{}", host, port); tracing::info!("SOCKS5 CONNECT -> {}:{}", host, port);
// Success reply with zeroed BND. // Success reply with zeroed BND.
@@ -556,6 +566,372 @@ async fn handle_socks5_client(
dispatch_tunnel(sock, host, port, fronter, mitm, rewrite_ctx, tunnel_mux).await dispatch_tunnel(sock, host, port, fronter, mitm, rewrite_ctx, tunnel_mux).await
} }
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct SocksUdpTarget {
host: String,
port: u16,
atyp: u8,
addr: Vec<u8>,
}
/// Per-target relay session state shared between the dispatch loop and
/// the per-session task. The dispatch loop pushes uplink datagrams via
/// `uplink`; the task drains the upstream and serializes both directions
/// onto a single tunnel-mux call at a time.
struct UdpRelaySession {
uplink: mpsc::Sender<Vec<u8>>,
}
/// SOCKS5 UDP request frame: 4-byte header + atyp-specific address + 2-byte
/// port + payload. DOMAIN atyp uses a 1-byte length prefix + up to 255
/// bytes, so the largest header is `4 + 1 + 255 + 2 = 262`. Round to 300
/// for safety; payload itself can be a full 64 KB datagram.
const SOCKS5_UDP_RECV_BUF_BYTES: usize = 65535 + 300;
/// Bound on per-session uplink queue depth. UDP is lossy by design — if
/// the per-session task can't keep up, drop the newest datagram (caller
/// uses `try_send`) instead of stalling the whole UDP relay loop.
const UDP_UPLINK_QUEUE: usize = 64;
/// Initial poll spacing when a session is idle. Tunnel-node already
/// long-polls each empty `udp_data` for up to 5 s, so this is a
/// client-side floor — bursts of upstream packets reset back to this.
const UDP_INITIAL_POLL_DELAY: Duration = Duration::from_millis(500);
/// Cap on the exponential backoff for an idle session. Once the backoff
/// has reached this cap, polls happen at most once per
/// `UDP_MAX_POLL_DELAY` plus the tunnel-node long-poll window (5 s),
/// so an idle UDP destination costs roughly one batch slot every 35 s.
const UDP_MAX_POLL_DELAY: Duration = Duration::from_secs(30);
async fn handle_socks5_udp_associate(
mut control: TcpStream,
rewrite_ctx: Arc<RewriteCtx>,
tunnel_mux: Option<Arc<TunnelMux>>,
) -> std::io::Result<()> {
if rewrite_ctx.mode != Mode::Full {
tracing::debug!("UDP ASSOCIATE rejected: only full mode supports UDP tunneling");
write_socks5_reply(&mut control, 0x07, None).await?;
return Ok(());
}
let Some(mux) = tunnel_mux else {
tracing::debug!("UDP ASSOCIATE rejected: full mode has no tunnel mux");
write_socks5_reply(&mut control, 0x01, None).await?;
return Ok(());
};
// Per RFC 1928 §7 the UDP relay only accepts datagrams from the
// SOCKS5 client. We pin the source IP to the control TCP peer up
// front so a third party on the bind interface can't hijack the
// session by sending the first datagram.
let client_peer_ip = control.peer_addr()?.ip();
// The local TUN bridge talks to us over loopback. Binding the UDP relay
// there avoids exposing an unauthenticated UDP socket on LAN interfaces.
let bind_ip = match control.local_addr()?.ip() {
IpAddr::V4(ip) if ip.is_unspecified() => IpAddr::V4(Ipv4Addr::LOCALHOST),
ip => ip,
};
let udp = Arc::new(UdpSocket::bind(SocketAddr::new(bind_ip, 0)).await?);
write_socks5_reply(&mut control, 0x00, Some(udp.local_addr()?)).await?;
tracing::info!(
"SOCKS5 UDP relay bound on {} for client {}",
udp.local_addr()?,
client_peer_ip
);
let mut buf = vec![0u8; SOCKS5_UDP_RECV_BUF_BYTES];
let mut control_buf = [0u8; 1];
let mut client_addr: Option<SocketAddr> = None;
let sessions: Arc<Mutex<HashMap<SocksUdpTarget, UdpRelaySession>>> =
Arc::new(Mutex::new(HashMap::new()));
loop {
tokio::select! {
recv = udp.recv_from(&mut buf) => {
let (n, peer) = match recv {
Ok(v) => v,
Err(e) => {
tracing::debug!("udp associate recv failed: {}", e);
break;
}
};
// Source-IP check: anything not from the SOCKS5 client's
// host is dropped silently. After the first valid packet,
// also lock to its source port (RFC 1928 §6).
if peer.ip() != client_peer_ip {
continue;
}
if let Some(existing) = client_addr {
if existing != peer {
continue;
}
} else {
tracing::info!("UDP relay locked to client {}", peer);
client_addr = Some(peer);
}
let Some((target, payload)) = parse_socks5_udp_packet(&buf[..n]) else {
continue;
};
let payload = payload.to_vec();
// Fast path: existing session — push payload onto its
// bounded uplink queue, drop on overflow (UDP semantics).
{
let sess = sessions.lock().await;
if let Some(session) = sess.get(&target) {
let _ = session.uplink.try_send(payload);
continue;
}
}
// New target: open via tunnel-node and spawn the per-session
// task. The first datagram rides the udp_open op so we
// save one round trip on session establishment.
let resp = match mux.udp_open(&target.host, target.port, payload).await {
Ok(r) => r,
Err(e) => {
tracing::debug!(
"udp open {}:{} failed: {}",
target.host, target.port, e
);
continue;
}
};
if let Some(ref e) = resp.e {
tracing::debug!("udp open {}:{} failed: {}", target.host, target.port, e);
continue;
}
let Some(sid) = resp.sid.clone() else {
tracing::debug!(
"udp open {}:{} returned no sid",
target.host, target.port
);
continue;
};
send_udp_response_packets(&udp, peer, &target, &resp).await;
let (uplink_tx, uplink_rx) = mpsc::channel::<Vec<u8>>(UDP_UPLINK_QUEUE);
let task_mux = mux.clone();
let task_udp = udp.clone();
let task_target = target.clone();
let task_sessions = sessions.clone();
let task_sid = sid.clone();
tokio::spawn(async move {
udp_session_task(
task_mux,
task_udp,
task_sid,
task_target.clone(),
peer,
uplink_rx,
)
.await;
// On exit (eof / mux error / channel close) remove
// ourselves from the dispatch map so a future packet
// to the same target opens a fresh tunnel-node session.
task_sessions.lock().await.remove(&task_target);
});
sessions
.lock()
.await
.insert(target, UdpRelaySession { uplink: uplink_tx });
}
read = control.read(&mut control_buf) => {
match read {
Ok(0) | Err(_) => break,
Ok(_) => {}
}
}
}
}
// Drop every uplink Sender. Each per-session task observes its
// receiver close, breaks out of select!, and issues close_session
// on the tunnel-node before exiting.
sessions.lock().await.clear();
Ok(())
}
/// Per-target relay task. Owns one tunnel-node UDP session and shuttles
/// datagrams in both directions through a single in-flight tunnel call
/// at a time. Two cancellation points:
/// * `uplink_rx.recv()` returns `None` when the dispatch loop drops
/// the matching `Sender` (SOCKS5 client gone, or session evicted).
/// * `mux.udp_data` returns eof / error when the tunnel-node session
/// is reaped or the target is unreachable.
async fn udp_session_task(
mux: Arc<TunnelMux>,
udp: Arc<UdpSocket>,
sid: String,
target: SocksUdpTarget,
client_addr: SocketAddr,
mut uplink_rx: mpsc::Receiver<Vec<u8>>,
) {
let mut backoff = UDP_INITIAL_POLL_DELAY;
loop {
// `biased;` prefers uplink so an active client doesn't get
// shadowed by a long sleep. Both branches are cancel-safe.
let resp = tokio::select! {
biased;
uplink = uplink_rx.recv() => {
let Some(payload) = uplink else { break; };
// Active uplink — reset the empty-poll backoff so the
// next inbound poll happens promptly.
backoff = UDP_INITIAL_POLL_DELAY;
match mux.udp_data(&sid, payload).await {
Ok(r) => r,
Err(e) => {
tracing::debug!("udp data {} failed: {}", sid, e);
break;
}
}
}
_ = tokio::time::sleep(backoff) => {
match mux.udp_data(&sid, Vec::new()).await {
Ok(r) => r,
Err(e) => {
tracing::debug!("udp poll {} failed: {}", sid, e);
break;
}
}
}
};
if resp.e.is_some() || resp.eof.unwrap_or(false) {
break;
}
let got_pkts = resp.pkts.as_ref().map(|p| !p.is_empty()).unwrap_or(false);
if got_pkts {
send_udp_response_packets(&udp, client_addr, &target, &resp).await;
backoff = UDP_INITIAL_POLL_DELAY;
} else {
// Empty poll — back off so an idle destination doesn't
// monopolize batch slots.
backoff = (backoff * 2).min(UDP_MAX_POLL_DELAY);
}
}
// Be polite even if the session is already gone server-side; the
// tunnel-node tolerates close on an unknown sid.
mux.close_session(&sid).await;
}
async fn send_udp_response_packets(
udp: &UdpSocket,
client_addr: SocketAddr,
target: &SocksUdpTarget,
resp: &crate::domain_fronter::TunnelResponse,
) {
let packets = match decode_udp_packets(resp) {
Ok(packets) => packets,
Err(e) => {
tracing::debug!("{}", e);
return;
}
};
for packet in packets {
let framed = build_socks5_udp_packet(target, &packet);
let _ = udp.send_to(&framed, client_addr).await;
}
}
async fn write_socks5_reply(
sock: &mut TcpStream,
rep: u8,
addr: Option<SocketAddr>,
) -> std::io::Result<()> {
let mut out = vec![0x05, rep, 0x00];
match addr {
Some(SocketAddr::V4(v4)) => {
out.push(0x01);
out.extend_from_slice(&v4.ip().octets());
out.extend_from_slice(&v4.port().to_be_bytes());
}
Some(SocketAddr::V6(v6)) => {
out.push(0x04);
out.extend_from_slice(&v6.ip().octets());
out.extend_from_slice(&v6.port().to_be_bytes());
}
None => {
out.push(0x01);
out.extend_from_slice(&[0, 0, 0, 0]);
out.extend_from_slice(&0u16.to_be_bytes());
}
}
sock.write_all(&out).await?;
sock.flush().await
}
fn parse_socks5_udp_packet(buf: &[u8]) -> Option<(SocksUdpTarget, &[u8])> {
if buf.len() < 4 || buf[0] != 0 || buf[1] != 0 || buf[2] != 0 {
return None;
}
let atyp = buf[3];
let mut pos = 4usize;
let (host, addr) = match atyp {
0x01 => {
if buf.len() < pos + 4 + 2 {
return None;
}
let addr = buf[pos..pos + 4].to_vec();
pos += 4;
let ip = std::net::Ipv4Addr::new(addr[0], addr[1], addr[2], addr[3]);
(ip.to_string(), addr)
}
0x03 => {
if buf.len() < pos + 1 {
return None;
}
let len = buf[pos] as usize;
pos += 1;
if len == 0 || buf.len() < pos + len + 2 {
return None;
}
let addr = buf[pos..pos + len].to_vec();
pos += len;
(String::from_utf8_lossy(&addr).into_owned(), addr)
}
0x04 => {
if buf.len() < pos + 16 + 2 {
return None;
}
let addr = buf[pos..pos + 16].to_vec();
pos += 16;
let mut octets = [0u8; 16];
octets.copy_from_slice(&addr);
(std::net::Ipv6Addr::from(octets).to_string(), addr)
}
_ => return None,
};
let port = u16::from_be_bytes([buf[pos], buf[pos + 1]]);
pos += 2;
Some((
SocksUdpTarget {
host,
port,
atyp,
addr,
},
&buf[pos..],
))
}
fn build_socks5_udp_packet(target: &SocksUdpTarget, payload: &[u8]) -> Vec<u8> {
let mut out = Vec::with_capacity(4 + target.addr.len() + 2 + payload.len() + 1);
out.extend_from_slice(&[0, 0, 0, target.atyp]);
match target.atyp {
0x03 => {
out.push(target.addr.len() as u8);
out.extend_from_slice(&target.addr);
}
_ => out.extend_from_slice(&target.addr),
}
out.extend_from_slice(&target.port.to_be_bytes());
out.extend_from_slice(payload);
out
}
// ---------- Smart dispatch (used by both HTTP CONNECT and SOCKS5) ---------- // ---------- Smart dispatch (used by both HTTP CONNECT and SOCKS5) ----------
fn should_use_sni_rewrite( fn should_use_sni_rewrite(
@@ -613,15 +989,13 @@ async fn dispatch_tunnel(
None => { None => {
tracing::error!( tracing::error!(
"dispatch {}:{} -> full mode but no tunnel mux (should not happen)", "dispatch {}:{} -> full mode but no tunnel mux (should not happen)",
host, port host,
port
); );
return Ok(()); return Ok(());
} }
}; };
tracing::info!( tracing::info!("dispatch {}:{} -> full tunnel (via batch mux)", host, port);
"dispatch {}:{} -> full tunnel (via batch mux)",
host, port
);
crate::tunnel_client::tunnel_connection(sock, &host, port, &mux).await?; crate::tunnel_client::tunnel_connection(sock, &host, port, &mux).await?;
return Ok(()); return Ok(());
} }
@@ -634,7 +1008,11 @@ async fn dispatch_tunnel(
port, port,
rewrite_ctx.youtube_via_relay, rewrite_ctx.youtube_via_relay,
) { ) {
tracing::info!("dispatch {}:{} -> sni-rewrite tunnel (Google edge direct)", host, port); tracing::info!(
"dispatch {}:{} -> sni-rewrite tunnel (Google edge direct)",
host,
port
);
return do_sni_rewrite_tunnel_from_tcp(sock, &host, port, mitm, rewrite_ctx).await; return do_sni_rewrite_tunnel_from_tcp(sock, &host, port, mitm, rewrite_ctx).await;
} }
@@ -775,10 +1153,7 @@ async fn plain_tcp_passthrough(
port, port,
e e
); );
match tokio::time::timeout( match tokio::time::timeout(connect_timeout, TcpStream::connect((target_host, port)))
connect_timeout,
TcpStream::connect((target_host, port)),
)
.await .await
{ {
Ok(Ok(s)) => s, Ok(Ok(s)) => s,
@@ -787,12 +1162,7 @@ async fn plain_tcp_passthrough(
} }
} }
} else { } else {
match tokio::time::timeout( match tokio::time::timeout(connect_timeout, TcpStream::connect((target_host, port))).await {
connect_timeout,
TcpStream::connect((target_host, port)),
)
.await
{
Ok(Ok(s)) => { Ok(Ok(s)) => {
tracing::info!("plain-tcp passthrough -> {}:{}", host, port); tracing::info!("plain-tcp passthrough -> {}:{}", host, port);
s s
@@ -804,7 +1174,8 @@ async fn plain_tcp_passthrough(
Err(_) => { Err(_) => {
tracing::debug!( tracing::debug!(
"plain-tcp connect {}:{} timeout (likely blocked; client should rotate)", "plain-tcp connect {}:{} timeout (likely blocked; client should rotate)",
host, port host,
port
); );
return; return;
} }
@@ -1283,13 +1654,14 @@ where
// subdomain of x.com here. // subdomain of x.com here.
let host_lower = host.to_ascii_lowercase(); let host_lower = host.to_ascii_lowercase();
let is_x_com = host_lower == "x.com" || host_lower.ends_with(".x.com"); let is_x_com = host_lower == "x.com" || host_lower.ends_with(".x.com");
let path = if is_x_com let path = if is_x_com && path.starts_with("/i/api/graphql/") && path.contains("?variables=") {
&& path.starts_with("/i/api/graphql/")
&& path.contains("?variables=")
{
match path.split_once('&') { match path.split_once('&') {
Some((short, _)) => { Some((short, _)) => {
tracing::debug!("x.com graphql URL truncated: {} chars -> {}", path.len(), short.len()); tracing::debug!(
"x.com graphql URL truncated: {} chars -> {}",
path.len(),
short.len()
);
short.to_string() short.to_string()
} }
None => path, None => path,
@@ -1358,7 +1730,9 @@ where
// relay path — range semantics on mutating requests are undefined // relay path — range semantics on mutating requests are undefined
// and would break form submissions. // and would break form submissions.
let response = if method.eq_ignore_ascii_case("GET") && body.is_empty() { let response = if method.eq_ignore_ascii_case("GET") && body.is_empty() {
fronter.relay_parallel_range(&method, &url, &headers, &body).await fronter
.relay_parallel_range(&method, &url, &headers, &body)
.await
} else { } else {
fronter.relay(&method, &url, &headers, &body).await fronter.relay(&method, &url, &headers, &body).await
}; };
@@ -1609,7 +1983,9 @@ async fn do_plain_http(
// mirrors, video poster streams, etc.) need the same acceleration // mirrors, video poster streams, etc.) need the same acceleration
// or the relay stalls per-chunk. // or the relay stalls per-chunk.
let response = if method.eq_ignore_ascii_case("GET") && body.is_empty() { let response = if method.eq_ignore_ascii_case("GET") && body.is_empty() {
fronter.relay_parallel_range(&method, &url, &headers, &body).await fronter
.relay_parallel_range(&method, &url, &headers, &body)
.await
} else { } else {
fronter.relay(&method, &url, &headers, &body).await fronter.relay(&method, &url, &headers, &body).await
}; };
@@ -1630,6 +2006,73 @@ mod tests {
.collect() .collect()
} }
#[test]
fn socks5_udp_domain_packet_round_trips() {
let mut raw = vec![0, 0, 0, 0x03, 11];
raw.extend_from_slice(b"example.com");
raw.extend_from_slice(&3478u16.to_be_bytes());
raw.extend_from_slice(b"hello");
let (target, payload) = parse_socks5_udp_packet(&raw).unwrap();
assert_eq!(target.host, "example.com");
assert_eq!(target.port, 3478);
assert_eq!(payload, b"hello");
assert_eq!(build_socks5_udp_packet(&target, payload), raw);
}
#[test]
fn socks5_udp_rejects_fragmented_packets() {
let raw = [0, 0, 1, 0x01, 127, 0, 0, 1, 0x13, 0x8a, b'x'];
assert!(parse_socks5_udp_packet(&raw).is_none());
}
#[test]
fn socks5_udp_rejects_truncated_inputs() {
// Header alone is not enough.
assert!(parse_socks5_udp_packet(&[0, 0, 0, 0x01]).is_none());
// IPv4 with truncated address bytes (need 4 octets).
assert!(parse_socks5_udp_packet(&[0, 0, 0, 0x01, 127, 0, 0]).is_none());
// IPv4 with no port.
assert!(parse_socks5_udp_packet(&[0, 0, 0, 0x01, 127, 0, 0, 1]).is_none());
// DOMAIN with zero-length.
assert!(parse_socks5_udp_packet(&[0, 0, 0, 0x03, 0, 0, 80]).is_none());
// DOMAIN with length exceeding remaining buffer.
assert!(parse_socks5_udp_packet(&[0, 0, 0, 0x03, 5, b'a', b'b']).is_none());
// Unknown atyp.
assert!(parse_socks5_udp_packet(&[0, 0, 0, 0x09, 1, 2, 3, 4]).is_none());
// IPv6 with truncated address.
let raw = [0, 0, 0, 0x04, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; // 11 bytes < 16
assert!(parse_socks5_udp_packet(&raw).is_none());
}
#[test]
fn socks5_udp_ipv4_round_trips() {
let mut raw = vec![0, 0, 0, 0x01, 1, 2, 3, 4];
raw.extend_from_slice(&53u16.to_be_bytes());
raw.extend_from_slice(b"\x00\x01");
let (target, payload) = parse_socks5_udp_packet(&raw).unwrap();
assert_eq!(target.host, "1.2.3.4");
assert_eq!(target.port, 53);
assert_eq!(payload, b"\x00\x01");
assert_eq!(build_socks5_udp_packet(&target, payload), raw);
}
#[test]
fn socks5_udp_ipv6_round_trips() {
let mut raw = vec![0, 0, 0, 0x04];
raw.extend_from_slice(&[
0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x01,
]);
raw.extend_from_slice(&443u16.to_be_bytes());
raw.extend_from_slice(b"q");
let (target, payload) = parse_socks5_udp_packet(&raw).unwrap();
assert_eq!(target.host, "2001:db8::1");
assert_eq!(target.port, 443);
assert_eq!(payload, b"q");
assert_eq!(build_socks5_udp_packet(&target, payload), raw);
}
#[tokio::test(flavor = "current_thread")] #[tokio::test(flavor = "current_thread")]
async fn read_body_decodes_chunked_request() { async fn read_body_decodes_chunked_request() {
let (mut client, mut server) = duplex(1024); let (mut client, mut server) = duplex(1024);
@@ -1689,8 +2132,18 @@ mod tests {
assert!(should_use_sni_rewrite(&hosts, "google.com", 443, false)); assert!(should_use_sni_rewrite(&hosts, "google.com", 443, false));
assert!(!should_use_sni_rewrite(&hosts, "google.com", 80, false)); assert!(!should_use_sni_rewrite(&hosts, "google.com", 80, false));
assert!(should_use_sni_rewrite(&hosts, "www.example.com", 443, false)); assert!(should_use_sni_rewrite(
assert!(!should_use_sni_rewrite(&hosts, "www.example.com", 80, false)); &hosts,
"www.example.com",
443,
false
));
assert!(!should_use_sni_rewrite(
&hosts,
"www.example.com",
80,
false
));
} }
#[test] #[test]
@@ -1702,17 +2155,32 @@ mod tests {
let hosts = std::collections::HashMap::new(); let hosts = std::collections::HashMap::new();
// Default behaviour: everything in the pool rewrites. // Default behaviour: everything in the pool rewrites.
assert!(should_use_sni_rewrite(&hosts, "www.youtube.com", 443, false)); assert!(should_use_sni_rewrite(
&hosts,
"www.youtube.com",
443,
false
));
assert!(should_use_sni_rewrite(&hosts, "i.ytimg.com", 443, false)); assert!(should_use_sni_rewrite(&hosts, "i.ytimg.com", 443, false));
assert!(should_use_sni_rewrite(&hosts, "youtu.be", 443, false)); assert!(should_use_sni_rewrite(&hosts, "youtu.be", 443, false));
assert!(should_use_sni_rewrite(&hosts, "www.google.com", 443, false)); assert!(should_use_sni_rewrite(&hosts, "www.google.com", 443, false));
// With the toggle on: YouTube opts out, Google stays. // With the toggle on: YouTube opts out, Google stays.
assert!(!should_use_sni_rewrite(&hosts, "www.youtube.com", 443, true)); assert!(!should_use_sni_rewrite(
&hosts,
"www.youtube.com",
443,
true
));
assert!(!should_use_sni_rewrite(&hosts, "i.ytimg.com", 443, true)); assert!(!should_use_sni_rewrite(&hosts, "i.ytimg.com", 443, true));
assert!(!should_use_sni_rewrite(&hosts, "youtu.be", 443, true)); assert!(!should_use_sni_rewrite(&hosts, "youtu.be", 443, true));
assert!(should_use_sni_rewrite(&hosts, "www.google.com", 443, true)); assert!(should_use_sni_rewrite(&hosts, "www.google.com", 443, true));
assert!(should_use_sni_rewrite(&hosts, "fonts.gstatic.com", 443, true)); assert!(should_use_sni_rewrite(
&hosts,
"fonts.gstatic.com",
443,
true
));
} }
#[test] #[test]
@@ -1723,7 +2191,12 @@ mod tests {
let mut hosts = std::collections::HashMap::new(); let mut hosts = std::collections::HashMap::new();
hosts.insert("rr4.googlevideo.com".to_string(), "1.2.3.4".to_string()); hosts.insert("rr4.googlevideo.com".to_string(), "1.2.3.4".to_string());
assert!(should_use_sni_rewrite(&hosts, "rr4.googlevideo.com", 443, true)); assert!(should_use_sni_rewrite(
&hosts,
"rr4.googlevideo.com",
443,
true
));
} }
#[test] #[test]
+206 -23
@@ -100,6 +100,17 @@ enum MuxMsg {
data: Vec<u8>, data: Vec<u8>,
reply: oneshot::Sender<Result<TunnelResponse, String>>, reply: oneshot::Sender<Result<TunnelResponse, String>>,
}, },
UdpOpen {
host: String,
port: u16,
data: Vec<u8>,
reply: oneshot::Sender<Result<TunnelResponse, String>>,
},
UdpData {
sid: String,
data: Vec<u8>,
reply: oneshot::Sender<Result<TunnelResponse, String>>,
},
Close { Close {
sid: String, sid: String,
}, },
@@ -167,6 +178,47 @@ impl TunnelMux {
let _ = self.tx.send(msg).await; let _ = self.tx.send(msg).await;
} }
pub async fn udp_open(
&self,
host: &str,
port: u16,
data: Vec<u8>,
) -> Result<TunnelResponse, String> {
let (reply_tx, reply_rx) = oneshot::channel();
self.send(MuxMsg::UdpOpen {
host: host.to_string(),
port,
data,
reply: reply_tx,
})
.await;
match reply_rx.await {
Ok(r) => r,
Err(_) => Err("mux channel closed".into()),
}
}
pub async fn udp_data(&self, sid: &str, data: Vec<u8>) -> Result<TunnelResponse, String> {
let (reply_tx, reply_rx) = oneshot::channel();
self.send(MuxMsg::UdpData {
sid: sid.to_string(),
data,
reply: reply_tx,
})
.await;
match reply_rx.await {
Ok(r) => r,
Err(_) => Err("mux channel closed".into()),
}
}
pub async fn close_session(&self, sid: &str) {
self.send(MuxMsg::Close {
sid: sid.to_string(),
})
.await;
}
fn connect_data_unsupported(&self) -> bool { fn connect_data_unsupported(&self) -> bool {
self.connect_data_unsupported.load(Ordering::Relaxed) self.connect_data_unsupported.load(Ordering::Relaxed)
} }
@@ -202,7 +254,11 @@ impl TunnelMux {
fn record_preread_loss(&self, port: u16) { fn record_preread_loss(&self, port: u16) {
self.preread_loss.fetch_add(1, Ordering::Relaxed); self.preread_loss.fetch_add(1, Ordering::Relaxed);
tracing::debug!("preread loss: port={} (empty within {:?})", port, CLIENT_FIRST_DATA_WAIT); tracing::debug!(
"preread loss: port={} (empty within {:?})",
port,
CLIENT_FIRST_DATA_WAIT
);
self.maybe_log_preread_summary(); self.maybe_log_preread_summary();
} }
@@ -213,7 +269,8 @@ impl TunnelMux {
} }
fn record_preread_skip_unsupported(&self, port: u16) { fn record_preread_skip_unsupported(&self, port: u16) {
self.preread_skip_unsupported.fetch_add(1, Ordering::Relaxed); self.preread_skip_unsupported
.fetch_add(1, Ordering::Relaxed);
tracing::debug!("preread skip: port={} (connect_data unsupported)", port); tracing::debug!("preread skip: port={} (connect_data unsupported)", port);
self.maybe_log_preread_summary(); self.maybe_log_preread_summary();
} }
@@ -251,7 +308,12 @@ async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>) {
fronter fronter
.script_id_list() .script_id_list()
.iter() .iter()
.map(|id| (id.clone(), Arc::new(Semaphore::new(CONCURRENCY_PER_DEPLOYMENT)))) .map(|id| {
(
id.clone(),
Arc::new(Semaphore::new(CONCURRENCY_PER_DEPLOYMENT)),
)
})
.collect(), .collect(),
); );
@@ -278,16 +340,25 @@ async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>) {
MuxMsg::Connect { host, port, reply } => { MuxMsg::Connect { host, port, reply } => {
let f = fronter.clone(); let f = fronter.clone();
tokio::spawn(async move { tokio::spawn(async move {
let result = let result = f
f.tunnel_request("connect", Some(&host), Some(port), None, None) .tunnel_request("connect", Some(&host), Some(port), None, None)
.await; .await;
match result { match result {
Ok(resp) => { let _ = reply.send(Ok(resp)); } Ok(resp) => {
Err(e) => { let _ = reply.send(Err(format!("{}", e))); } let _ = reply.send(Ok(resp));
}
Err(e) => {
let _ = reply.send(Err(format!("{}", e)));
}
} }
}); });
} }
MuxMsg::ConnectData { host, port, data, reply } => { MuxMsg::ConnectData {
host,
port,
data,
reply,
} => {
let encoded = Some(B64.encode(data.as_slice())); let encoded = Some(B64.encode(data.as_slice()));
let op_bytes = encoded.as_ref().map(|s| s.len()).unwrap_or(0); let op_bytes = encoded.as_ref().map(|s| s.len()).unwrap_or(0);
@@ -351,6 +422,77 @@ async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>) {
data_replies.push((idx, reply)); data_replies.push((idx, reply));
batch_payload_bytes += op_bytes; batch_payload_bytes += op_bytes;
} }
MuxMsg::UdpOpen {
host,
port,
data,
reply,
} => {
let encoded = if data.is_empty() {
None
} else {
Some(B64.encode(&data))
};
let op_bytes = encoded.as_ref().map(|s| s.len()).unwrap_or(0);
if !data_ops.is_empty()
&& (data_ops.len() >= MAX_BATCH_OPS
|| batch_payload_bytes + op_bytes > MAX_BATCH_PAYLOAD_BYTES)
{
fire_batch(
&sems,
&fronter,
std::mem::take(&mut data_ops),
std::mem::take(&mut data_replies),
)
.await;
batch_payload_bytes = 0;
}
let idx = data_ops.len();
data_ops.push(BatchOp {
op: "udp_open".into(),
sid: None,
host: Some(host),
port: Some(port),
d: encoded,
});
data_replies.push((idx, reply));
batch_payload_bytes += op_bytes;
}
MuxMsg::UdpData { sid, data, reply } => {
let encoded = if data.is_empty() {
None
} else {
Some(B64.encode(&data))
};
let op_bytes = encoded.as_ref().map(|s| s.len()).unwrap_or(0);
if !data_ops.is_empty()
&& (data_ops.len() >= MAX_BATCH_OPS
|| batch_payload_bytes + op_bytes > MAX_BATCH_PAYLOAD_BYTES)
{
fire_batch(
&sems,
&fronter,
std::mem::take(&mut data_ops),
std::mem::take(&mut data_replies),
)
.await;
batch_payload_bytes = 0;
}
let idx = data_ops.len();
data_ops.push(BatchOp {
op: "udp_data".into(),
sid: Some(sid),
host: None,
port: None,
d: encoded,
});
data_replies.push((idx, reply));
batch_payload_bytes += op_bytes;
}
MuxMsg::Close { sid } => { MuxMsg::Close { sid } => {
close_sids.push(sid); close_sids.push(sid);
} }
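The flush rule shared by the `UdpOpen` and `UdpData` arms above can be sketched in isolation. This is a minimal model, not the project's code: `Batcher` is a hypothetical stand-in for the `data_ops` / `batch_payload_bytes` state in `mux_loop`, and the two limit values are made up because the real constants are not visible in this diff.

```rust
// Hypothetical limits; the real `MAX_BATCH_OPS` and
// `MAX_BATCH_PAYLOAD_BYTES` values are not shown in this diff.
const MAX_BATCH_OPS: usize = 4;
const MAX_BATCH_PAYLOAD_BYTES: usize = 16;

/// Hypothetical stand-in for the batching state kept by `mux_loop`.
#[derive(Default)]
struct Batcher {
    pending: Vec<usize>,      // encoded size of each op in the open batch
    payload_bytes: usize,     // running payload total for the open batch
    flushed: Vec<Vec<usize>>, // batches that would have been fired
}

impl Batcher {
    /// Queue one op. If the open batch is non-empty and adding this op
    /// would exceed either limit, the open batch is flushed first.
    fn push(&mut self, op_bytes: usize) {
        let over_limit = self.pending.len() >= MAX_BATCH_OPS
            || self.payload_bytes + op_bytes > MAX_BATCH_PAYLOAD_BYTES;
        if !self.pending.is_empty() && over_limit {
            self.flushed.push(std::mem::take(&mut self.pending));
            self.payload_bytes = 0;
        }
        self.pending.push(op_bytes);
        self.payload_bytes += op_bytes;
    }
}
```

Because the flush only happens when the open batch is non-empty, an op larger than the payload limit is still sent, alone in its own batch, rather than being rejected.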
@@ -514,7 +656,10 @@ pub async fn tunnel_connection(
match write_tunnel_response(&mut sock, &resp).await? { match write_tunnel_response(&mut sock, &resp).await? {
WriteOutcome::Wrote | WriteOutcome::NoData => {} WriteOutcome::Wrote | WriteOutcome::NoData => {}
WriteOutcome::BadBase64 => { WriteOutcome::BadBase64 => {
tracing::error!("tunnel session {}: bad base64 in connect_data response", sid); tracing::error!(
"tunnel session {}: bad base64 in connect_data response",
sid
);
return Ok(()); return Ok(());
} }
} }
@@ -636,7 +781,10 @@ async fn connect_with_initial_data(
)); ));
}; };
Ok(ConnectDataOutcome::Opened { sid, response: resp }) Ok(ConnectDataOutcome::Opened {
sid,
response: resp,
})
} }
/// Decide whether a response indicates the tunnel-node (or apps_script /// Decide whether a response indicates the tunnel-node (or apps_script
@@ -834,6 +982,18 @@ where
} }
} }
pub fn decode_udp_packets(resp: &TunnelResponse) -> Result<Vec<Vec<u8>>, String> {
let Some(pkts) = resp.pkts.as_ref() else {
return Ok(Vec::new());
};
pkts.iter()
.map(|pkt| {
B64.decode(pkt)
.map_err(|e| format!("bad UDP packet base64: {}", e))
})
.collect()
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Tests // Tests
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -846,6 +1006,7 @@ mod tests {
TunnelResponse { TunnelResponse {
sid: None, sid: None,
d: None, d: None,
pkts: None,
eof: None, eof: None,
e: e.map(str::to_string), e: e.map(str::to_string),
code: code.map(str::to_string), code: code.map(str::to_string),
@@ -854,7 +1015,10 @@ mod tests {
#[test] #[test]
fn unsupported_detection_via_structured_code() { fn unsupported_detection_via_structured_code() {
assert!(is_connect_data_unsupported_response(&resp_with(Some("UNSUPPORTED_OP"), None))); assert!(is_connect_data_unsupported_response(&resp_with(
Some("UNSUPPORTED_OP"),
None
)));
assert!(is_connect_data_unsupported_response(&resp_with( assert!(is_connect_data_unsupported_response(&resp_with(
Some("UNSUPPORTED_OP"), Some("UNSUPPORTED_OP"),
Some("unknown op: connect_data"), Some("unknown op: connect_data"),
@@ -865,10 +1029,12 @@ mod tests {
fn unsupported_detection_via_legacy_tunnel_node_string() { fn unsupported_detection_via_legacy_tunnel_node_string() {
// Pre-change tunnel-node: no code field, bare "unknown op: ...". // Pre-change tunnel-node: no code field, bare "unknown op: ...".
assert!(is_connect_data_unsupported_response(&resp_with( assert!(is_connect_data_unsupported_response(&resp_with(
None, Some("unknown op: connect_data"), None,
Some("unknown op: connect_data"),
))); )));
assert!(is_connect_data_unsupported_response(&resp_with( assert!(is_connect_data_unsupported_response(&resp_with(
None, Some("Unknown Op: CONNECT_DATA"), None,
Some("Unknown Op: CONNECT_DATA"),
))); )));
} }
@@ -878,30 +1044,46 @@ mod tests {
// This is the realistic skew case — user upgrades tunnel-node + client // This is the realistic skew case — user upgrades tunnel-node + client
// binary but hasn't redeployed the Apps Script yet. // binary but hasn't redeployed the Apps Script yet.
assert!(is_connect_data_unsupported_response(&resp_with( assert!(is_connect_data_unsupported_response(&resp_with(
None, Some("unknown tunnel op: connect_data"), None,
Some("unknown tunnel op: connect_data"),
))); )));
} }
#[test] #[test]
fn unsupported_detection_rejects_unrelated_errors() { fn unsupported_detection_rejects_unrelated_errors() {
assert!(!is_connect_data_unsupported_response(&resp_with( assert!(!is_connect_data_unsupported_response(&resp_with(
None, Some("connect failed: refused"), None,
Some("connect failed: refused"),
)));
assert!(!is_connect_data_unsupported_response(&resp_with(
None,
Some("bad base64")
)));
assert!(!is_connect_data_unsupported_response(&resp_with(
None, None
))); )));
assert!(!is_connect_data_unsupported_response(&resp_with(None, Some("bad base64"))));
assert!(!is_connect_data_unsupported_response(&resp_with(None, None)));
// "connect_data" alone (without "unknown op") shouldn't trigger. // "connect_data" alone (without "unknown op") shouldn't trigger.
assert!(!is_connect_data_unsupported_response(&resp_with( assert!(!is_connect_data_unsupported_response(&resp_with(
None, Some("connect_data: bad port"), None,
Some("connect_data: bad port"),
))); )));
} }
#[test] #[test]
fn server_speaks_first_covers_common_protocols() { fn server_speaks_first_covers_common_protocols() {
for p in [21u16, 22, 25, 80, 110, 143, 587] { for p in [21u16, 22, 25, 80, 110, 143, 587] {
assert!(is_server_speaks_first(p), "port {} should be server-first", p); assert!(
is_server_speaks_first(p),
"port {} should be server-first",
p
);
} }
for p in [443u16, 8443, 853, 993, 1234] { for p in [443u16, 8443, 853, 993, 1234] {
assert!(!is_server_speaks_first(p), "port {} should NOT be server-first", p); assert!(
!is_server_speaks_first(p),
"port {} should NOT be server-first",
p
);
} }
} }
@@ -947,9 +1129,7 @@ mod tests {
let loop_handle = tokio::spawn({ let loop_handle = tokio::spawn({
let mux = mux.clone(); let mux = mux.clone();
async move { async move { tunnel_loop(&mut server_side, "sid-under-test", &mux, pending).await }
tunnel_loop(&mut server_side, "sid-under-test", &mux, pending).await
}
}); });
// The first message tunnel_loop emits must be Data carrying the // The first message tunnel_loop emits must be Data carrying the
@@ -967,6 +1147,7 @@ mod tests {
let _ = reply.send(Ok(TunnelResponse { let _ = reply.send(Ok(TunnelResponse {
sid: Some("sid-under-test".into()), sid: Some("sid-under-test".into()),
d: None, d: None,
pkts: None,
eof: Some(true), eof: Some(true),
e: None, e: None,
code: None, code: None,
@@ -978,6 +1159,8 @@ mod tests {
MuxMsg::Connect { .. } => "Connect", MuxMsg::Connect { .. } => "Connect",
MuxMsg::ConnectData { .. } => "ConnectData", MuxMsg::ConnectData { .. } => "ConnectData",
MuxMsg::Data { .. } => unreachable!(), MuxMsg::Data { .. } => unreachable!(),
MuxMsg::UdpOpen { .. } => "UdpOpen",
MuxMsg::UdpData { .. } => "UdpData",
MuxMsg::Close { .. } => "Close", MuxMsg::Close { .. } => "Close",
} }
), ),
+4 -2
@@ -8,10 +8,12 @@ HTTP tunnel bridge server for MasterHttpRelayVPN "full" mode. Bridges HTTP tunne
Phone → mhrv-rs → [domain-fronted TLS] → Apps Script → [HTTP] → Tunnel Node → [real TCP] → Internet Phone → mhrv-rs → [domain-fronted TLS] → Apps Script → [HTTP] → Tunnel Node → [real TCP] → Internet
``` ```
The tunnel node manages persistent TCP sessions. Each session is a real TCP connection to a destination server. Data flows through a JSON protocol: The tunnel node manages persistent TCP and UDP sessions. TCP sessions are real TCP connections to a destination server; UDP sessions are connected UDP sockets to one destination host:port. Data flows through a JSON protocol:
- **connect** — open TCP to host:port, return session ID - **connect** — open TCP to host:port, return session ID
- **data** — write client data, return server response - **data** — write client data, return server response
- **udp_open** — open UDP to host:port, optionally send the first datagram
- **udp_data** — send one UDP datagram, or poll for returned datagrams when `d` is omitted
- **close** — tear down session - **close** — tear down session
- **batch** — process multiple ops in one HTTP request (reduces round trips) - **batch** — process multiple ops in one HTTP request (reduces round trips)
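As a rough illustration of the ops listed above, the following sketch builds a batch body by hand with the standard library only. It assumes the field names `op`, `sid`, `host`, `port` and `d` and the top-level `k` / `ops` keys; `Op`, `op_json` and `batch_json` are hypothetical helpers, the builder does no JSON escaping, and the real request may carry additional fields.

```rust
/// One op in a batch request (hypothetical helper type).
struct Op<'a> {
    op: &'a str,
    sid: Option<&'a str>,
    host: Option<&'a str>,
    port: Option<u16>,
    d: Option<&'a str>, // base64 payload; omitted for a pure poll
}

/// Serialize one op, skipping absent fields. Inputs are assumed to
/// contain no characters that need JSON escaping.
fn op_json(o: &Op<'_>) -> String {
    let mut fields = vec![format!("\"op\":\"{}\"", o.op)];
    if let Some(s) = o.sid {
        fields.push(format!("\"sid\":\"{}\"", s));
    }
    if let Some(h) = o.host {
        fields.push(format!("\"host\":\"{}\"", h));
    }
    if let Some(p) = o.port {
        fields.push(format!("\"port\":{}", p));
    }
    if let Some(d) = o.d {
        fields.push(format!("\"d\":\"{}\"", d));
    }
    format!("{{{}}}", fields.join(","))
}

/// Wrap the ops in a batch body keyed by the shared secret.
fn batch_json(key: &str, ops: &[Op<'_>]) -> String {
    let ops: Vec<String> = ops.iter().map(op_json).collect();
    format!("{{\"k\":\"{}\",\"ops\":[{}]}}", key, ops.join(","))
}
```

A `udp_open` with `d` set sends the first datagram together with the open, while a `udp_data` op without `d` acts as a poll for returned datagrams.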
@@ -108,7 +110,7 @@ TUNNEL_AUTH_KEY=your-secret PORT=8080 ./target/release/tunnel-node
"k": "auth", "k": "auth",
"ops": [ "ops": [
{"op":"data","sid":"uuid1","d":"base64"}, {"op":"data","sid":"uuid1","d":"base64"},
{"op":"data","sid":"uuid2","d":"base64"}, {"op":"udp_data","sid":"uuid2","d":"base64"},
{"op":"close","sid":"uuid3"} {"op":"close","sid":"uuid3"}
] ]
} }
+560 -60
@@ -9,8 +9,8 @@
//! TUNNEL_AUTH_KEY — shared secret (required) //! TUNNEL_AUTH_KEY — shared secret (required)
//! PORT — listen port (default 8080, Cloud Run sets this) //! PORT — listen port (default 8080, Cloud Run sets this)
use std::collections::HashMap; use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@@ -24,7 +24,7 @@ use base64::Engine;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream; use tokio::net::{lookup_host, TcpStream, UdpSocket};
use tokio::sync::{mpsc, Mutex, Notify}; use tokio::sync::{mpsc, Mutex, Notify};
use tokio::task::JoinSet; use tokio::task::JoinSet;
@@ -76,6 +76,21 @@ const STRAGGLER_SETTLE: Duration = Duration::from_millis(30);
/// Script's UrlFetch ceiling (~60 s). /// Script's UrlFetch ceiling (~60 s).
const LONGPOLL_DEADLINE: Duration = Duration::from_secs(5); const LONGPOLL_DEADLINE: Duration = Duration::from_secs(5);
/// Bound on each UDP session's inbound queue. Beyond this we drop the
/// oldest packet to keep recent voice/media packets moving: a stale RTP
/// frame is worse than a missing one. At a typical 1500 B payload, a
/// full 256-deep queue holds ~384 KB before drop-oldest kicks in.
const UDP_QUEUE_LIMIT: usize = 256;
/// Receive buffer for the UDP reader task. Must be ≥ 65535 to handle
/// a maximum-size IPv4 datagram without truncation.
const UDP_RECV_BUF_BYTES: usize = 65536;
/// First queue-drop on a session always logs at warn level; subsequent
/// drops log at debug only every Nth occurrence so a single congested
/// session can't flood the operator's log.
const UDP_QUEUE_DROP_LOG_STRIDE: u64 = 100;
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Session // Session
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
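The drop-oldest policy and throttled logging described by `UDP_QUEUE_LIMIT` and `UDP_QUEUE_DROP_LOG_STRIDE` above can be sketched as a small synchronous model. `BoundedQueue` and `DropLog` are hypothetical names for illustration; the real session uses an async mutex and an atomic counter, which this sketch leaves out.

```rust
use std::collections::VecDeque;

const UDP_QUEUE_LIMIT: usize = 256;
const UDP_QUEUE_DROP_LOG_STRIDE: u64 = 100;

/// Which log line, if any, a drop would produce (hypothetical type).
#[derive(Debug, PartialEq)]
enum DropLog {
    WarnFirst,   // first drop on the session
    DebugStride, // every Nth drop after that
    Silent,      // counted but not logged
}

/// Hypothetical synchronous model of a session's inbound queue.
struct BoundedQueue {
    packets: VecDeque<Vec<u8>>,
    drops: u64,
}

impl BoundedQueue {
    fn new() -> Self {
        Self { packets: VecDeque::with_capacity(UDP_QUEUE_LIMIT), drops: 0 }
    }

    /// Enqueue a datagram. When the queue is full, the oldest entry is
    /// evicted first; the return value says how that drop would be logged.
    fn push(&mut self, pkt: Vec<u8>) -> Option<DropLog> {
        let mut log = None;
        if self.packets.len() >= UDP_QUEUE_LIMIT {
            self.packets.pop_front();
            self.drops += 1;
            log = Some(if self.drops == 1 {
                DropLog::WarnFirst
            } else if self.drops % UDP_QUEUE_DROP_LOG_STRIDE == 0 {
                DropLog::DebugStride
            } else {
                DropLog::Silent
            });
        }
        self.packets.push_back(pkt);
        log
    }
}
```

The queue length never exceeds the limit, and the newest datagram is always kept, which matches the stated preference for fresh media packets over stale ones.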
@@ -97,6 +112,28 @@ struct ManagedSession {
reader_handle: tokio::task::JoinHandle<()>, reader_handle: tokio::task::JoinHandle<()>,
} }
/// UDP equivalent of `SessionInner`. Holds a *connected* `UdpSocket`
/// pinned to one `(host, port)` upstream so we don't have to re-resolve
/// or re-parse the destination on every datagram. `notify` is fired by
/// the reader task on each inbound datagram so the batch drain phase
/// can wake without polling (the same primitive as the TCP path). A
/// socket error ends the reader task without firing `notify`.
struct UdpSessionInner {
socket: Arc<UdpSocket>,
packets: Mutex<VecDeque<Vec<u8>>>,
last_active: Mutex<Instant>,
notify: Notify,
/// Total datagrams dropped because the queue hit `UDP_QUEUE_LIMIT`.
/// Surfaced via tracing so operators can correlate "choppy call"
/// reports with relay backpressure.
queue_drops: AtomicU64,
}
struct ManagedUdpSession {
inner: Arc<UdpSessionInner>,
reader_handle: tokio::task::JoinHandle<()>,
}
async fn create_session(host: &str, port: u16) -> std::io::Result<ManagedSession> { async fn create_session(host: &str, port: u16) -> std::io::Result<ManagedSession> {
let addr = format!("{}:{}", host, port); let addr = format!("{}:{}", host, port);
let stream = tokio::time::timeout(Duration::from_secs(10), TcpStream::connect(&addr)) let stream = tokio::time::timeout(Duration::from_secs(10), TcpStream::connect(&addr))
@@ -150,6 +187,78 @@ async fn reader_task(mut reader: OwnedReadHalf, session: Arc<SessionInner>) {
} }
} }
async fn create_udp_session(host: &str, port: u16) -> std::io::Result<ManagedUdpSession> {
let mut addrs = lookup_host((host, port)).await?;
let remote = addrs.next().ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::AddrNotAvailable,
"no UDP address resolved",
)
})?;
let bind_addr = if remote.is_ipv4() {
"0.0.0.0:0"
} else {
"[::]:0"
};
let socket = UdpSocket::bind(bind_addr).await?;
socket.connect(remote).await?;
let socket = Arc::new(socket);
let inner = Arc::new(UdpSessionInner {
socket: socket.clone(),
packets: Mutex::new(VecDeque::with_capacity(UDP_QUEUE_LIMIT)),
last_active: Mutex::new(Instant::now()),
notify: Notify::new(),
queue_drops: AtomicU64::new(0),
});
let inner_ref = inner.clone();
let reader_handle = tokio::spawn(udp_reader_task(socket, inner_ref));
Ok(ManagedUdpSession {
inner,
reader_handle,
})
}
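The address-family handling in `create_udp_session` can be illustrated with the blocking `std::net::UdpSocket` in place of tokio's async socket. This is a sketch under that substitution; `wildcard_bind_addr` and `connected_udp` are hypothetical helper names.

```rust
use std::net::{SocketAddr, UdpSocket};

/// Pick the wildcard bind address whose family matches the remote.
/// A socket bound to an IPv4 wildcard cannot connect to an IPv6 peer.
fn wildcard_bind_addr(remote: &SocketAddr) -> &'static str {
    if remote.is_ipv4() {
        "0.0.0.0:0"
    } else {
        "[::]:0"
    }
}

/// Bind an ephemeral local port and pin the socket to one remote, so
/// later `send` / `recv` calls need no destination argument and
/// datagrams from other peers are filtered out by the OS.
fn connected_udp(remote: SocketAddr) -> std::io::Result<UdpSocket> {
    let socket = UdpSocket::bind(wildcard_bind_addr(&remote))?;
    socket.connect(remote)?;
    Ok(socket)
}
```

Port `0` asks the OS for an ephemeral source port, so each session gets its own local port toward the upstream.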
/// UDP analogue of `reader_task`. Reads from the connected UDP socket
/// and queues each datagram on the session. Drops oldest on overflow,
/// updates `last_active` so server-push (download-only) UDP keeps the
/// session out of the idle reaper, and fires `notify` so the batch
/// drain phase can wake without polling.
async fn udp_reader_task(socket: Arc<UdpSocket>, session: Arc<UdpSessionInner>) {
let mut buf = vec![0u8; UDP_RECV_BUF_BYTES];
loop {
match socket.recv(&mut buf).await {
// Empty datagram is valid UDP; nothing to forward, ignore.
Ok(0) => {}
Ok(n) => {
let mut packets = session.packets.lock().await;
if packets.len() >= UDP_QUEUE_LIMIT {
packets.pop_front();
let dropped = session.queue_drops.fetch_add(1, Ordering::Relaxed) + 1;
if dropped == 1 {
tracing::warn!(
"udp queue full ({}); dropping oldest. Apps Script polling cannot keep up with upstream rate.",
UDP_QUEUE_LIMIT
);
} else if dropped % UDP_QUEUE_DROP_LOG_STRIDE == 0 {
tracing::debug!("udp queue drops: {} on session", dropped);
}
}
packets.push_back(buf[..n].to_vec());
drop(packets);
// Inbound packet counts as activity — keeps server-push
// UDP (e.g. SIP/RTP, server-sent telemetry) out of the
// idle reaper. Empty `udp_data` polls deliberately do
// NOT bump this (see batch handler).
*session.last_active.lock().await = Instant::now();
session.notify.notify_one();
}
Err(_) => break,
}
}
}
/// Drain whatever is currently buffered — no waiting. /// Drain whatever is currently buffered — no waiting.
/// Used by batch mode where we poll frequently. /// Used by batch mode where we poll frequently.
async fn drain_now(session: &SessionInner) -> (Vec<u8>, bool) { async fn drain_now(session: &SessionInner) -> (Vec<u8>, bool) {
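The `last_active` rule stated in the `udp_reader_task` comments (inbound packets and non-empty uplink sends count as activity, empty polls do not) can be modelled without any async machinery. `UdpSession` here is a hypothetical struct, and the 60 s timeout used in the usage note is an assumed value; the idle reaper itself is not part of this excerpt.

```rust
use std::time::{Duration, Instant};

/// Hypothetical model of the activity timestamp on a UDP session.
struct UdpSession {
    last_active: Instant,
}

impl UdpSession {
    /// A `udp_data` op arrived. Only a non-empty payload is real uplink
    /// activity; an empty poll leaves the timestamp alone.
    fn on_udp_data(&mut self, payload: &[u8], now: Instant) {
        if !payload.is_empty() {
            self.last_active = now;
        }
    }

    /// The upstream sent a datagram, which always counts as activity.
    fn on_upstream_recv(&mut self, now: Instant) {
        self.last_active = now;
    }

    /// True once the session has seen no real activity for `timeout`.
    fn is_idle(&self, now: Instant, timeout: Duration) -> bool {
        now.duration_since(self.last_active) >= timeout
    }
}
```

With an assumed 60 s timeout, a client that only sends empty polls still becomes idle after 60 s, while a single upstream datagram or non-empty send resets the clock. If polls refreshed the timestamp, a long-polling client would keep an abandoned session alive indefinitely.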
@@ -252,6 +361,61 @@ async fn is_any_drainable(inners: &[Arc<SessionInner>]) -> bool {
false false
} }
/// Drain whatever UDP datagrams are currently queued — no waiting.
async fn drain_udp_now(session: &UdpSessionInner) -> Vec<Vec<u8>> {
let mut packets = session.packets.lock().await;
packets.drain(..).collect()
}
/// UDP analogue of `wait_for_any_drainable`. Wakes when any session has
/// at least one queued packet. Same race-safety contract: watchers
/// self-filter against observable state to ignore stale permits.
async fn wait_for_any_udp_drainable(inners: &[Arc<UdpSessionInner>], deadline: Duration) {
if inners.is_empty() {
return;
}
let (tx, mut rx) = mpsc::channel::<()>(1);
let mut watchers = Vec::with_capacity(inners.len());
for inner in inners {
let inner = inner.clone();
let tx = tx.clone();
watchers.push(tokio::spawn(async move {
loop {
inner.notify.notified().await;
if !inner.packets.lock().await.is_empty() {
break;
}
// Stale permit — packets were already drained by a
// prior batch. Loop back, don't wake the caller.
}
let _ = tx.try_send(());
}));
}
drop(tx);
let already_ready = is_any_udp_drainable(inners).await;
if !already_ready {
tokio::select! {
_ = rx.recv() => {}
_ = tokio::time::sleep(deadline) => {}
}
}
for w in &watchers {
w.abort();
}
}
async fn is_any_udp_drainable(inners: &[Arc<UdpSessionInner>]) -> bool {
for inner in inners {
if !inner.packets.lock().await.is_empty() {
return true;
}
}
false
}
/// Wait for response data with drain window. Used by single-op mode. /// Wait for response data with drain window. Used by single-op mode.
async fn wait_and_drain(session: &SessionInner, max_wait: Duration) -> (Vec<u8>, bool) { async fn wait_and_drain(session: &SessionInner, max_wait: Duration) -> (Vec<u8>, bool) {
let deadline = Instant::now() + max_wait; let deadline = Instant::now() + max_wait;
@@ -288,6 +452,7 @@ async fn wait_and_drain(session: &SessionInner, max_wait: Duration) -> (Vec<u8>,
#[derive(Clone)] #[derive(Clone)]
struct AppState { struct AppState {
sessions: Arc<Mutex<HashMap<String, ManagedSession>>>, sessions: Arc<Mutex<HashMap<String, ManagedSession>>>,
udp_sessions: Arc<Mutex<HashMap<String, ManagedUdpSession>>>,
auth_key: String, auth_key: String,
} }
@@ -309,6 +474,10 @@ struct TunnelRequest {
struct TunnelResponse { struct TunnelResponse {
#[serde(skip_serializing_if = "Option::is_none")] sid: Option<String>, #[serde(skip_serializing_if = "Option::is_none")] sid: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] d: Option<String>, #[serde(skip_serializing_if = "Option::is_none")] d: Option<String>,
/// UDP datagrams returned to the client, base64-encoded individually.
/// `None` for TCP responses. `udp_drain_response` maps an empty drain
/// to `None`, so an empty list is never serialized.
#[serde(skip_serializing_if = "Option::is_none")] pkts: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")] eof: Option<bool>, #[serde(skip_serializing_if = "Option::is_none")] eof: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")] e: Option<String>, #[serde(skip_serializing_if = "Option::is_none")] e: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] code: Option<String>, #[serde(skip_serializing_if = "Option::is_none")] code: Option<String>,
@@ -316,11 +485,11 @@ struct TunnelResponse {
impl TunnelResponse { impl TunnelResponse {
fn error(msg: impl Into<String>) -> Self { fn error(msg: impl Into<String>) -> Self {
Self { sid: None, d: None, eof: None, e: Some(msg.into()), code: None } Self { sid: None, d: None, pkts: None, eof: None, e: Some(msg.into()), code: None }
} }
fn unsupported_op(op: &str) -> Self { fn unsupported_op(op: &str) -> Self {
Self { Self {
sid: None, d: None, eof: None, sid: None, d: None, pkts: None, eof: None,
e: Some(format!("unknown op: {}", op)), e: Some(format!("unknown op: {}", op)),
code: Some(CODE_UNSUPPORTED_OP.into()), code: Some(CODE_UNSUPPORTED_OP.into()),
} }
@@ -429,16 +598,18 @@ async fn handle_batch(
// still fires from server-speaks-first ports and from the preread // still fires from server-speaks-first ports and from the preread
// timeout fallback path. // timeout fallback path.
let mut results: Vec<(usize, TunnelResponse)> = Vec::with_capacity(req.ops.len()); let mut results: Vec<(usize, TunnelResponse)> = Vec::with_capacity(req.ops.len());
let mut data_ops: Vec<(usize, String)> = Vec::new(); // (index, sid) for data ops needing drain let mut tcp_drains: Vec<(usize, String)> = Vec::new();
let mut udp_drains: Vec<(usize, String)> = Vec::new();
// True iff the batch contained any op that performed a real action // True iff the batch contained any op that performed a real action
// upstream — a new connection or a non-empty data write. A batch of // upstream — a new connection or a non-empty data write. A batch of
// only empty "data" polls (and possibly closes) leaves this false and // only empty "data" / "udp_data" polls (and possibly closes) leaves
// qualifies for long-poll behavior in phase 2. // this false and qualifies for long-poll behavior in phase 2.
let mut had_writes_or_connects = false; let mut had_writes_or_connects = false;
enum NewConn { enum NewConn {
Connect(TunnelResponse), Connect(TunnelResponse),
ConnectData(Result<String, TunnelResponse>), ConnectData(Result<String, TunnelResponse>),
UdpOpen(Result<String, TunnelResponse>),
} }
let mut new_conn_jobs: JoinSet<(usize, NewConn)> = JoinSet::new(); let mut new_conn_jobs: JoinSet<(usize, NewConn)> = JoinSet::new();
@@ -470,6 +641,19 @@ async fn handle_batch(
(i, NewConn::ConnectData(r)) (i, NewConn::ConnectData(r))
}); });
} }
"udp_open" => {
had_writes_or_connects = true;
let state = state.clone();
let host = op.host.clone();
let port = op.port;
let d = op.d.clone();
new_conn_jobs.spawn(async move {
let r = handle_udp_open_phase1(&state, host, port, d)
.await
.map(|(sid, _inner)| sid);
(i, NewConn::UdpOpen(r))
});
}
"data" => { "data" => {
let sid = match &op.sid { let sid = match &op.sid {
Some(s) if !s.is_empty() => s.clone(), Some(s) if !s.is_empty() => s.clone(),
@@ -493,12 +677,53 @@ async fn handle_batch(
} }
} }
drop(sessions); drop(sessions);
data_ops.push((i, sid)); tcp_drains.push((i, sid));
} else { } else {
drop(sessions); drop(sessions);
results.push((i, TunnelResponse { results.push((i, eof_response(sid)));
sid: Some(sid), d: None, eof: Some(true), e: None, code: None, }
})); }
"udp_data" => {
let sid = match &op.sid {
Some(s) if !s.is_empty() => s.clone(),
_ => { results.push((i, TunnelResponse::error("missing sid"))); continue; }
};
let inner = {
let sessions = state.udp_sessions.lock().await;
sessions.get(&sid).map(|s| s.inner.clone())
};
if let Some(inner) = inner {
let mut had_uplink = false;
if let Some(ref data_b64) = op.d {
if !data_b64.is_empty() {
let bytes = match B64.decode(data_b64) {
Ok(b) => b,
Err(e) => {
results.push((
i,
TunnelResponse::error(format!("bad base64: {}", e)),
));
continue;
}
};
if !bytes.is_empty() {
had_writes_or_connects = true;
had_uplink = true;
let _ = inner.socket.send(&bytes).await;
}
}
}
// last_active is bumped only on real activity:
// outbound here, or inbound in udp_reader_task.
// Empty long-poll batches must not refresh it, else
// the idle reaper never fires.
if had_uplink {
*inner.last_active.lock().await = Instant::now();
}
udp_drains.push((i, sid));
} else {
results.push((i, eof_response(sid)));
} }
} }
"close" => { "close" => {
@@ -511,58 +736,64 @@ async fn handle_batch(
} }
} }
// Await all concurrent connect / connect_data jobs. For connect_data, // Await all concurrent connect / connect_data / udp_open jobs.
// successful ones join the data-drain set in phase 2; plain connects // Successful drain-bearing ones join the appropriate drain list;
// go straight to results because they have no initial data to drain. // plain connects go straight to results.
while let Some(join) = new_conn_jobs.join_next().await { while let Some(join) = new_conn_jobs.join_next().await {
match join { match join {
Ok((i, NewConn::Connect(r))) => results.push((i, r)), Ok((i, NewConn::Connect(r))) => results.push((i, r)),
Ok((i, NewConn::ConnectData(Ok(sid)))) => data_ops.push((i, sid)), Ok((i, NewConn::ConnectData(Ok(sid)))) => tcp_drains.push((i, sid)),
Ok((i, NewConn::ConnectData(Err(r)))) => results.push((i, r)), Ok((i, NewConn::ConnectData(Err(r)))) => results.push((i, r)),
Ok((i, NewConn::UdpOpen(Ok(sid)))) => udp_drains.push((i, sid)),
Ok((i, NewConn::UdpOpen(Err(r)))) => results.push((i, r)),
Err(e) => { Err(e) => {
tracing::error!("new-connection task panicked: {}", e); tracing::error!("new-connection task panicked: {}", e);
} }
} }
} }
// Phase 2: signal-driven wait for any session to have data (or hit // Phase 2: signal-driven wait for any session (TCP or UDP) to have
// EOF), then drain everyone in a single pass. The deadline is // data, then drain TCP and UDP independently in a single pass each.
// adaptive: // Deadlines:
// * `ACTIVE_DRAIN_DEADLINE` (~350 ms) when the batch had real work // * `ACTIVE_DRAIN_DEADLINE` (~350 ms) when the batch had real work.
// — typical responses arrive in ms and `wait_for_any_drainable` // Typical responses arrive in ms; the wait helpers return on
// returns on the first notify. After the first wake we settle // the first notify. For active batches we settle for
// for `STRAGGLER_SETTLE` so neighboring sessions whose replies // `STRAGGLER_SETTLE` so neighbors whose replies trail by a few
// land just behind the first one don't get reported empty. // ms aren't reported empty.
// * `LONGPOLL_DEADLINE` when the batch is a pure poll — no writes, // * `LONGPOLL_DEADLINE` for pure-poll batches — held open until
// no new connections. The response is held open until upstream // upstream pushes data. UDP idle polls benefit from this just
// pushes data, turning idle sessions into a true long-poll // as much as TCP, so the same window applies.
// without paying per-poll latency. No straggler settle here: if !tcp_drains.is_empty() || !udp_drains.is_empty() {
// the wake event IS the data the client wants, so deliver it
// immediately.
if !data_ops.is_empty() {
let deadline = if had_writes_or_connects { let deadline = if had_writes_or_connects {
ACTIVE_DRAIN_DEADLINE ACTIVE_DRAIN_DEADLINE
} else { } else {
LONGPOLL_DEADLINE LONGPOLL_DEADLINE
}; };
// Snapshot SessionInner Arcs under a single lock so we don't let tcp_inners: Vec<Arc<SessionInner>> = {
// hold the sessions-map lock across the await.
let inners: Vec<Arc<SessionInner>> = {
let sessions = state.sessions.lock().await; let sessions = state.sessions.lock().await;
data_ops tcp_drains
.iter()
.filter_map(|(_, sid)| sessions.get(sid).map(|s| s.inner.clone()))
.collect()
};
let udp_inners: Vec<Arc<UdpSessionInner>> = {
let sessions = state.udp_sessions.lock().await;
udp_drains
.iter() .iter()
.filter_map(|(_, sid)| sessions.get(sid).map(|s| s.inner.clone())) .filter_map(|(_, sid)| sessions.get(sid).map(|s| s.inner.clone()))
.collect() .collect()
}; };
let wait_start = Instant::now(); let wait_start = Instant::now();
wait_for_any_drainable(&inners, deadline).await; // Wait on both sides concurrently. `tokio::join!` returns once both
// helpers have returned; a helper with an empty watch list returns
// immediately, so a TCP-only batch isn't slowed by UDP, and vice versa.
tokio::join!(
wait_for_any_drainable(&tcp_inners, deadline),
wait_for_any_udp_drainable(&udp_inners, deadline),
);
// Straggler settle: only for active batches, only if we woke
// early (didn't hit the deadline). Capped by the remaining
// deadline budget — `saturating_sub` so a future refactor that
// ever lets `elapsed > deadline` slip through can't underflow.
if had_writes_or_connects { if had_writes_or_connects {
let remaining = deadline.saturating_sub(wait_start.elapsed()); let remaining = deadline.saturating_sub(wait_start.elapsed());
if !remaining.is_zero() { if !remaining.is_zero() {
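The deadline choice and the straggler-settle budget described in the Phase 2 comments can be sketched as two pure functions. The constant values are taken from the comments in this file (`ACTIVE_DRAIN_DEADLINE` is only stated as roughly 350 ms), and capping the settle at `STRAGGLER_SETTLE` is an assumption about the code that follows the truncated hunk; `pick_deadline` and `settle_time` are hypothetical names.

```rust
use std::time::Duration;

// Values as described in the surrounding comments; the active
// deadline is approximate.
const ACTIVE_DRAIN_DEADLINE: Duration = Duration::from_millis(350);
const LONGPOLL_DEADLINE: Duration = Duration::from_secs(5);
const STRAGGLER_SETTLE: Duration = Duration::from_millis(30);

/// Active batches get the short window; pure polls are held open.
fn pick_deadline(had_writes_or_connects: bool) -> Duration {
    if had_writes_or_connects {
        ACTIVE_DRAIN_DEADLINE
    } else {
        LONGPOLL_DEADLINE
    }
}

/// How long to keep waiting for stragglers after the first wake.
/// Pure polls never settle. Active batches settle for at most
/// `STRAGGLER_SETTLE`, and never past the remaining deadline budget.
fn settle_time(had_writes_or_connects: bool, deadline: Duration, elapsed: Duration) -> Duration {
    if !had_writes_or_connects {
        return Duration::ZERO;
    }
    // `saturating_sub` yields zero instead of panicking if
    // `elapsed` ever exceeds `deadline`.
    deadline.saturating_sub(elapsed).min(STRAGGLER_SETTLE)
}
```

A plain `deadline - elapsed` would panic on underflow, which is why the saturating form is the safer choice here.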
@@ -570,28 +801,22 @@ async fn handle_batch(
} }
} }
// Single drain pass for all sessions in this batch. // ---- TCP drain ----
{ if !tcp_drains.is_empty() {
let sessions = state.sessions.lock().await; let sessions = state.sessions.lock().await;
for (i, sid) in &data_ops { for (i, sid) in &tcp_drains {
if let Some(session) = sessions.get(sid) { if let Some(session) = sessions.get(sid) {
let (data, eof) = drain_now(&session.inner).await; let (data, eof) = drain_now(&session.inner).await;
results.push((*i, TunnelResponse { results.push((*i, tcp_drain_response(sid.clone(), data, eof)));
sid: Some(sid.clone()),
d: if data.is_empty() { None } else { Some(B64.encode(&data)) },
eof: Some(eof), e: None, code: None,
}));
} else { } else {
results.push((*i, TunnelResponse { results.push((*i, eof_response(sid.clone())));
sid: Some(sid.clone()), d: None, eof: Some(true), e: None, code: None,
}));
}
} }
} }
drop(sessions);
// Clean up eof sessions // Clean up eof TCP sessions.
let mut sessions = state.sessions.lock().await; let mut sessions = state.sessions.lock().await;
for (_, sid) in &data_ops { for (_, sid) in &tcp_drains {
if let Some(s) = sessions.get(sid) { if let Some(s) = sessions.get(sid) {
if s.inner.eof.load(Ordering::Acquire) { if s.inner.eof.load(Ordering::Acquire) {
if let Some(s) = sessions.remove(sid) { if let Some(s) = sessions.remove(sid) {
@@ -603,6 +828,20 @@ async fn handle_batch(
} }
} }
// ---- UDP drain ----
if !udp_drains.is_empty() {
let sessions = state.udp_sessions.lock().await;
for (i, sid) in &udp_drains {
if let Some(session) = sessions.get(sid) {
let packets = drain_udp_now(&session.inner).await;
results.push((*i, udp_drain_response(sid.clone(), packets)));
} else {
results.push((*i, eof_response(sid.clone())));
}
}
}
}
// Sort results by original index and build response // Sort results by original index and build response
results.sort_by_key(|(i, _)| *i); results.sort_by_key(|(i, _)| *i);
let batch_resp = BatchResponse { let batch_resp = BatchResponse {
@@ -613,6 +852,44 @@ async fn handle_batch(
(StatusCode::OK, [(header::CONTENT_TYPE, "application/json")], json) (StatusCode::OK, [(header::CONTENT_TYPE, "application/json")], json)
} }
fn tcp_drain_response(sid: String, data: Vec<u8>, eof: bool) -> TunnelResponse {
TunnelResponse {
sid: Some(sid),
d: if data.is_empty() { None } else { Some(B64.encode(&data)) },
pkts: None,
eof: Some(eof),
e: None,
code: None,
}
}
fn udp_drain_response(sid: String, packets: Vec<Vec<u8>>) -> TunnelResponse {
let pkts = if packets.is_empty() {
None
} else {
Some(packets.iter().map(|p| B64.encode(p)).collect())
};
TunnelResponse {
sid: Some(sid),
d: None,
pkts,
eof: Some(false),
e: None,
code: None,
}
}
fn eof_response(sid: String) -> TunnelResponse {
TunnelResponse {
sid: Some(sid),
d: None,
pkts: None,
eof: Some(true),
e: None,
code: None,
}
}
fn decompress_gzip(data: &[u8]) -> Result<Vec<u8>, String> { fn decompress_gzip(data: &[u8]) -> Result<Vec<u8>, String> {
use std::io::Read; use std::io::Read;
let mut decoder = flate2::read::GzDecoder::new(data); let mut decoder = flate2::read::GzDecoder::new(data);
@@ -652,7 +929,7 @@ async fn handle_connect(state: &AppState, host: Option<String>, port: Option<u16
let sid = uuid::Uuid::new_v4().to_string(); let sid = uuid::Uuid::new_v4().to_string();
tracing::info!("session {} -> {}:{}", sid, host, port); tracing::info!("session {} -> {}:{}", sid, host, port);
state.sessions.lock().await.insert(sid.clone(), session); state.sessions.lock().await.insert(sid.clone(), session);
TunnelResponse { sid: Some(sid), d: None, eof: Some(false), e: None, code: None } TunnelResponse { sid: Some(sid), d: None, pkts: None, eof: Some(false), e: None, code: None }
} }
/// Open a session and write the client's first bytes in one round trip. /// Open a session and write the client's first bytes in one round trip.
@@ -704,6 +981,47 @@ async fn handle_connect_data_phase1(
Ok((sid, inner)) Ok((sid, inner))
} }
/// UDP analogue of `handle_connect_data_phase1`. Opens a connected UDP
/// socket to `(host, port)` and optionally sends the client's first
/// datagram in the same op so a request-response flow (e.g. DNS, STUN)
/// saves a round trip on session establishment.
async fn handle_udp_open_phase1(
state: &AppState,
host: Option<String>,
port: Option<u16>,
data: Option<String>,
) -> Result<(String, Arc<UdpSessionInner>), TunnelResponse> {
let (host, port) = validate_host_port(host, port)?;
let session = create_udp_session(&host, port)
.await
.map_err(|e| TunnelResponse::error(format!("udp connect failed: {}", e)))?;
if let Some(ref data_b64) = data {
if !data_b64.is_empty() {
let bytes = match B64.decode(data_b64) {
Ok(b) => b,
Err(e) => {
session.reader_handle.abort();
return Err(TunnelResponse::error(format!("bad base64: {}", e)));
}
};
if !bytes.is_empty() {
if let Err(e) = session.inner.socket.send(&bytes).await {
session.reader_handle.abort();
return Err(TunnelResponse::error(format!("udp write failed: {}", e)));
}
}
}
}
let inner = session.inner.clone();
let sid = uuid::Uuid::new_v4().to_string();
tracing::info!("udp session {} -> {}:{}", sid, host, port);
state.udp_sessions.lock().await.insert(sid.clone(), session);
Ok((sid, inner))
}
async fn handle_connect_data_single( async fn handle_connect_data_single(
state: &AppState, state: &AppState,
host: Option<String>, host: Option<String>,
@@ -724,6 +1042,7 @@ async fn handle_connect_data_single(
TunnelResponse { TunnelResponse {
sid: Some(sid), sid: Some(sid),
d: if data.is_empty() { None } else { Some(B64.encode(&data)) }, d: if data.is_empty() { None } else { Some(B64.encode(&data)) },
pkts: None,
eof: Some(eof), eof: Some(eof),
e: None, e: None,
code: None, code: None,
@@ -767,6 +1086,7 @@ async fn handle_data_single(state: &AppState, sid: Option<String>, data: Option<
TunnelResponse { TunnelResponse {
sid: Some(sid), sid: Some(sid),
d: if data.is_empty() { None } else { Some(B64.encode(&data)) }, d: if data.is_empty() { None } else { Some(B64.encode(&data)) },
pkts: None,
eof: Some(eof), e: None, code: None, eof: Some(eof), e: None, code: None,
} }
} }
@@ -780,20 +1100,29 @@ async fn handle_close(state: &AppState, sid: Option<String>) -> TunnelResponse {
s.reader_handle.abort(); s.reader_handle.abort();
tracing::info!("session {} closed by client", sid); tracing::info!("session {} closed by client", sid);
} }
TunnelResponse { sid: Some(sid), d: None, eof: Some(true), e: None, code: None } if let Some(s) = state.udp_sessions.lock().await.remove(&sid) {
s.reader_handle.abort();
tracing::info!("udp session {} closed by client", sid);
}
TunnelResponse { sid: Some(sid), d: None, pkts: None, eof: Some(true), e: None, code: None }
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Cleanup // Cleanup
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
async fn cleanup_task(sessions: Arc<Mutex<HashMap<String, ManagedSession>>>) { async fn cleanup_task(
sessions: Arc<Mutex<HashMap<String, ManagedSession>>>,
udp_sessions: Arc<Mutex<HashMap<String, ManagedUdpSession>>>,
) {
let mut interval = tokio::time::interval(Duration::from_secs(30)); let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop { loop {
interval.tick().await; interval.tick().await;
let mut map = sessions.lock().await;
let now = Instant::now(); let now = Instant::now();
{
let mut map = sessions.lock().await;
let mut stale = Vec::new(); let mut stale = Vec::new();
for (k, s) in map.iter() { for (k, s) in map.iter() {
let last = *s.inner.last_active.lock().await; let last = *s.inner.last_active.lock().await;
@@ -811,6 +1140,35 @@ async fn cleanup_task(sessions: Arc<Mutex<HashMap<String, ManagedSession>>>) {
tracing::info!("cleanup: reaped {}, {} active", stale.len(), map.len()); tracing::info!("cleanup: reaped {}, {} active", stale.len(), map.len());
} }
} }
{
// UDP sessions get a tighter idle window because UDP flows
// are typically short-lived (DNS, STUN, single-RTT QUIC) or
// send their own keepalives. 120 s avoids leaking sockets
// for one-shot lookups while keeping calls/streams alive.
let mut map = udp_sessions.lock().await;
let mut stale = Vec::new();
for (k, s) in map.iter() {
let last = *s.inner.last_active.lock().await;
if now.duration_since(last) > Duration::from_secs(120) {
stale.push(k.clone());
}
}
for k in &stale {
if let Some(s) = map.remove(k) {
s.reader_handle.abort();
tracing::info!("reaped idle udp session {}", k);
}
}
if !stale.is_empty() {
tracing::info!(
"cleanup: reaped {}, {} active udp",
stale.len(),
map.len()
);
}
}
}
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -837,9 +1195,11 @@ async fn main() {
let sessions: Arc<Mutex<HashMap<String, ManagedSession>>> = let sessions: Arc<Mutex<HashMap<String, ManagedSession>>> =
Arc::new(Mutex::new(HashMap::new())); Arc::new(Mutex::new(HashMap::new()));
tokio::spawn(cleanup_task(sessions.clone())); let udp_sessions: Arc<Mutex<HashMap<String, ManagedUdpSession>>> =
Arc::new(Mutex::new(HashMap::new()));
tokio::spawn(cleanup_task(sessions.clone(), udp_sessions.clone()));
let state = AppState { sessions, auth_key }; let state = AppState { sessions, udp_sessions, auth_key };
let app = Router::new() let app = Router::new()
.route("/tunnel", post(handle_tunnel)) .route("/tunnel", post(handle_tunnel))
@@ -872,10 +1232,25 @@ mod tests {
fn fresh_state() -> AppState { fn fresh_state() -> AppState {
AppState { AppState {
sessions: Arc::new(Mutex::new(HashMap::new())), sessions: Arc::new(Mutex::new(HashMap::new())),
udp_sessions: Arc::new(Mutex::new(HashMap::new())),
auth_key: "test-key".into(), auth_key: "test-key".into(),
} }
} }
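/// Spin up a one-shot UDP server that echoes the first datagram it
/// receives back to the sender with an `"ECHO: "` prefix, then returns
/// the bound port.
async fn start_udp_echo_server() -> u16 {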
let socket = UdpSocket::bind(("127.0.0.1", 0)).await.unwrap();
let port = socket.local_addr().unwrap().port();
tokio::spawn(async move {
let mut buf = [0u8; 2048];
if let Ok((n, peer)) = socket.recv_from(&mut buf).await {
let mut out = b"ECHO: ".to_vec();
out.extend_from_slice(&buf[..n]);
let _ = socket.send_to(&out, peer).await;
}
});
port
}
/// Spin up a one-shot TCP server that echoes everything it reads back /// Spin up a one-shot TCP server that echoes everything it reads back
/// with a `"ECHO: "` prefix, then returns the bound port. /// with a `"ECHO: "` prefix, then returns the bound port.
async fn start_echo_server() -> u16 { async fn start_echo_server() -> u16 {
@@ -1371,4 +1746,129 @@ mod tests {
let data = B64.decode(d_b64).unwrap(); let data = B64.decode(d_b64).unwrap();
assert_eq!(&data[..], b"DELAYED"); assert_eq!(&data[..], b"DELAYED");
} }
// ---------------------------------------------------------------------
// UDP path
// ---------------------------------------------------------------------
#[tokio::test]
async fn udp_open_writes_initial_datagram_and_buffers_reply() {
let port = start_udp_echo_server().await;
let state = fresh_state();
let (sid, inner) = handle_udp_open_phase1(
&state,
Some("127.0.0.1".into()),
Some(port),
Some(B64.encode(b"ping")),
)
.await
.expect("udp open should succeed");
assert!(state.udp_sessions.lock().await.contains_key(&sid));
wait_for_any_udp_drainable(std::slice::from_ref(&inner), Duration::from_secs(2)).await;
let packets = drain_udp_now(&inner).await;
assert_eq!(packets, vec![b"ECHO: ping".to_vec()]);
}
/// When the upstream sends faster than the relay drains, the queue
/// must drop oldest packets (so recent voice/video stays current)
/// AND increment the counter so operators can correlate user
/// reports of choppiness with relay backpressure.
#[tokio::test]
async fn udp_queue_overflow_drops_oldest_and_counts() {
let state = fresh_state();
let sink = UdpSocket::bind(("127.0.0.1", 0)).await.unwrap();
let sink_port = sink.local_addr().unwrap().port();
let (_sid, inner) =
handle_udp_open_phase1(&state, Some("127.0.0.1".into()), Some(sink_port), None)
.await
.expect("udp open");
// Flood the session socket from sink — its connected remote is
// exactly sink_port, so packets pass the kernel's source check.
let session_addr = inner.socket.local_addr().unwrap();
let burst = UDP_QUEUE_LIMIT + 16;
for i in 0..burst {
let payload = format!("p{}", i).into_bytes();
sink.send_to(&payload, session_addr).await.unwrap();
}
// Give the reader_task a chance to drain the OS buffer.
for _ in 0..50 {
if inner.queue_drops.load(Ordering::Relaxed) > 0 {
break;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
let drops = inner.queue_drops.load(Ordering::Relaxed);
let queued = inner.packets.lock().await.len();
assert!(drops >= 1, "expected ≥1 drop, got {} (queued={})", drops, queued);
assert!(queued <= UDP_QUEUE_LIMIT, "queue exceeded limit: {}", queued);
}
/// Regression test for a bug caught in review: in a batch that mixes
/// UDP and TCP-data ops, the TCP side must still get the event-driven
/// drain. Both sides share one wait_start / deadline window, so a
/// delayed TCP response must still make it into the batch even when
/// a UDP op is present.
#[tokio::test]
async fn tcp_drain_runs_when_batch_also_contains_udp() {
use axum::body::Bytes;
use axum::extract::State;
// TCP server that delays its response past the typical wake but
// well within ACTIVE_DRAIN_DEADLINE (350ms).
let tcp_listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap();
let tcp_port = tcp_listener.local_addr().unwrap().port();
tokio::spawn(async move {
if let Ok((mut sock, _)) = tcp_listener.accept().await {
let mut buf = [0u8; 64];
let _ = sock.read(&mut buf).await;
tokio::time::sleep(Duration::from_millis(120)).await;
let _ = sock.write_all(b"DELAYED").await;
let _ = sock.flush().await;
}
});
// Idle UDP target — never replies. Just sets up the dual-drain
// path through Phase 2.
let udp_target = UdpSocket::bind(("127.0.0.1", 0)).await.unwrap();
let udp_port = udp_target.local_addr().unwrap().port();
let state = fresh_state();
let tcp_sid = match handle_connect(&state, Some("127.0.0.1".into()), Some(tcp_port)).await {
TunnelResponse {
sid: Some(s),
e: None,
..
} => s,
other => panic!("connect failed: {:?}", other),
};
let (udp_sid, _udp_inner) =
handle_udp_open_phase1(&state, Some("127.0.0.1".into()), Some(udp_port), None)
.await
.expect("udp open");
let body = serde_json::json!({
"k": "test-key",
"ops": [
{"op": "data", "sid": tcp_sid, "d": B64.encode(b"hello")},
{"op": "udp_data", "sid": udp_sid},
]
})
.to_string();
let resp = handle_batch(State(state.clone()), Bytes::from(body))
.await
.into_response();
let (parts, body) = resp.into_parts();
assert_eq!(parts.status, axum::http::StatusCode::OK);
let body_bytes = axum::body::to_bytes(body, 64 * 1024).await.unwrap();
let parsed: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
let r = parsed["r"].as_array().unwrap();
assert_eq!(r.len(), 2);
let tcp_d = r[0]["d"].as_str().expect("tcp data missing");
let decoded = B64.decode(tcp_d).unwrap();
assert_eq!(&decoded[..], b"DELAYED");
}
} }
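The overflow test above relies on a drop-oldest policy: once the per-session queue is full, the oldest datagram is evicted and a drop counter is incremented. The queue code itself is not part of this excerpt, so the following is only a minimal synchronous sketch of that policy. `BoundedPacketQueue` and its methods are hypothetical names used for illustration; the commit's `UdpSessionInner` keeps its `packets` queue behind an async mutex and exposes `queue_drops` as an atomic counter, and may differ in detail.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded datagram queue with drop-oldest overflow.
/// This is an illustration only, not the commit's `UdpSessionInner`.
pub struct BoundedPacketQueue {
    packets: VecDeque<Vec<u8>>,
    limit: usize,
    drops: u64,
}

impl BoundedPacketQueue {
    /// Create a queue that holds at most `limit` packets.
    pub fn new(limit: usize) -> Self {
        assert!(limit > 0, "limit must be non-zero");
        Self {
            packets: VecDeque::with_capacity(limit),
            limit,
            drops: 0,
        }
    }

    /// Enqueue a packet. If the queue is already full, evict the oldest
    /// packet first and count the drop. Returns `true` when an eviction
    /// happened.
    pub fn push(&mut self, packet: Vec<u8>) -> bool {
        let evicted = if self.packets.len() >= self.limit {
            self.packets.pop_front();
            self.drops += 1;
            true
        } else {
            false
        };
        self.packets.push_back(packet);
        evicted
    }

    /// Remove and return every queued packet, oldest first.
    pub fn drain(&mut self) -> Vec<Vec<u8>> {
        self.packets.drain(..).collect()
    }

    /// Total number of packets evicted so far.
    pub fn drops(&self) -> u64 {
        self.drops
    }

    /// Number of packets currently queued.
    pub fn len(&self) -> usize {
        self.packets.len()
    }

    /// Whether the queue currently holds no packets.
    pub fn is_empty(&self) -> bool {
        self.packets.is_empty()
    }
}
```

With a limit of 2, pushing `a`, `b`, `c` leaves `b` and `c` queued and a drop count of 1. Evicting from the front rather than rejecting the new packet keeps the most recent data, which matches the rationale given in the test's doc comment about keeping voice/video current.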