feat(tunnel): pipelined polls with adaptive depth, wseq ordering, STUN blocking (#1115)

feat(tunnel): pipelined full-tunnel polls, ordered writes, and STUN blocking

Merged trusted PR #1115 by @yyoyoian-pixel after local verification and a small maintainer fix on the PR branch.

---
Answered via LLM, Supervised @therealaleph
This commit is contained in:
yyoyoian-pixel
2026-05-16 16:46:08 +02:00
committed by GitHub
parent d822d67a26
commit 919b13b166
14 changed files with 1422 additions and 209 deletions
+1
View File
@@ -17,6 +17,7 @@
prompt.
-->
<uses-permission android:name="android.permission.QUERY_ALL_PACKAGES" />
<uses-permission android:name="android.permission.SYSTEM_ALERT_WINDOW" />
<!--
App-launcher visibility filter. Complements QUERY_ALL_PACKAGES:
@@ -108,6 +108,8 @@ data class MhrvConfig(
val coalesceMaxMs: Int = 1000,
/** Block QUIC (UDP/443). QUIC over TCP tunnel causes meltdown. */
val blockQuic: Boolean = true,
/** Block STUN/TURN ports (3478/5349/19302). Forces WebRTC TCP fallback. */
val blockStun: Boolean = true,
val upstreamSocks5: String = "",
/**
@@ -231,6 +233,7 @@ data class MhrvConfig(
if (coalesceStepMs != 10) put("coalesce_step_ms", coalesceStepMs)
if (coalesceMaxMs != 1000) put("coalesce_max_ms", coalesceMaxMs)
put("block_quic", blockQuic)
put("block_stun", blockStun)
if (upstreamSocks5.isNotBlank()) {
put("upstream_socks5", upstreamSocks5.trim())
}
@@ -344,6 +347,7 @@ object ConfigStore {
if (cfg.coalesceStepMs != defaults.coalesceStepMs) obj.put("coalesce_step_ms", cfg.coalesceStepMs)
if (cfg.coalesceMaxMs != defaults.coalesceMaxMs) obj.put("coalesce_max_ms", cfg.coalesceMaxMs)
if (cfg.blockQuic != defaults.blockQuic) obj.put("block_quic", cfg.blockQuic)
if (cfg.blockStun != defaults.blockStun) obj.put("block_stun", cfg.blockStun)
if (cfg.upstreamSocks5.isNotBlank()) obj.put("upstream_socks5", cfg.upstreamSocks5)
if (cfg.passthroughHosts.isNotEmpty()) obj.put("passthrough_hosts", JSONArray().apply { cfg.passthroughHosts.forEach { put(it) } })
if (cfg.tunnelDoh != defaults.tunnelDoh) obj.put("tunnel_doh", cfg.tunnelDoh)
@@ -449,6 +453,7 @@ object ConfigStore {
coalesceStepMs = obj.optInt("coalesce_step_ms", 10),
coalesceMaxMs = obj.optInt("coalesce_max_ms", 1000),
blockQuic = obj.optBoolean("block_quic", true),
blockStun = obj.optBoolean("block_stun", true),
upstreamSocks5 = obj.optString("upstream_socks5", ""),
passthroughHosts = obj.optJSONArray("passthrough_hosts")?.let { arr ->
buildList { for (i in 0 until arr.length()) add(arr.optString(i)) }
@@ -35,6 +35,7 @@ class MhrvVpnService : VpnService() {
private var proxyHandle: Long = 0L
private var tun2proxyThread: Thread? = null
private val tun2proxyRunning = AtomicBoolean(false)
private var debugOverlay: PipelineDebugOverlay? = null
// Idempotency guard. teardown() is reachable from three paths:
// 1. ACTION_STOP onStartCommand branch (background thread)
@@ -149,6 +150,7 @@ class MhrvVpnService : VpnService() {
Log.i(TAG, "PROXY_ONLY mode: listeners up, skipping VpnService/TUN")
VpnState.setProxyHandle(proxyHandle)
VpnState.setRunning(true)
showDebugOverlay()
return
}
@@ -314,6 +316,16 @@ class MhrvVpnService : VpnService() {
// a failed-to-establish run.
VpnState.setProxyHandle(proxyHandle)
VpnState.setRunning(true)
showDebugOverlay()
}
private fun showDebugOverlay() {
if (debugOverlay != null) return
if (!android.provider.Settings.canDrawOverlays(this)) {
Log.w(TAG, "overlay permission not granted — skipping debug overlay")
return
}
debugOverlay = PipelineDebugOverlay(this).also { it.show() }
}
/**
@@ -434,6 +446,10 @@ class MhrvVpnService : VpnService() {
Log.w(TAG, "tun2proxy thread still alive after join timeout — proceeding anyway")
}
// Hide debug overlay before flipping UI state.
debugOverlay?.hide()
debugOverlay = null
// Flip UI state last — the button reverts to Connect only after
// the native-side cleanup actually happened, not optimistically.
VpnState.setProxyHandle(0L)
@@ -110,6 +110,13 @@ object Native {
*/
external fun statsJson(handle: Long): String
/**
* Pipeline debug overlay snapshot. Returns a JSON blob with elevated
* session count, batch semaphore usage, and recent ramp/drop events.
* Temporary — for debugging pipeline behavior on-device.
*/
external fun pipelineDebugJson(): String
/**
* Start tun2proxy via its CLI args C API (`tun2proxy_run_with_cli_args`).
* Resolved at runtime via dlsym from libtun2proxy.so — no fork needed.
@@ -0,0 +1,174 @@
package com.therealaleph.mhrv
import android.content.Context
import android.graphics.Color
import android.graphics.PixelFormat
import android.os.Handler
import android.os.Looper
import android.util.TypedValue
import android.view.Gravity
import android.view.MotionEvent
import android.view.View
import android.view.WindowManager
import android.widget.LinearLayout
import android.widget.TextView
import org.json.JSONObject
/**
* Transparent system overlay showing pipeline debug stats.
* Draggable, semi-transparent, shown on top of all apps.
* Temporary — remove when pipelining is validated.
*/
class PipelineDebugOverlay(private val context: Context) {
private val wm = context.getSystemService(Context.WINDOW_SERVICE) as WindowManager
private val handler = Handler(Looper.getMainLooper())
private var root: View? = null
private lateinit var tvElevated: TextView
private lateinit var tvBatches: TextView
private lateinit var tvEvents: TextView
private val pollInterval = 500L
fun show() {
if (root != null) return
val dp = { px: Int ->
TypedValue.applyDimension(TypedValue.COMPLEX_UNIT_DIP, px.toFloat(), context.resources.displayMetrics).toInt()
}
val layout = LinearLayout(context).apply {
orientation = LinearLayout.VERTICAL
setBackgroundColor(Color.argb(160, 0, 0, 0))
setPadding(dp(8), dp(6), dp(8), dp(6))
}
val titleTv = TextView(context).apply {
text = "Pipeline Debug"
setTextColor(Color.argb(220, 100, 255, 100))
textSize = 11f
}
layout.addView(titleTv)
tvElevated = TextView(context).apply {
setTextColor(Color.WHITE)
textSize = 10f
}
layout.addView(tvElevated)
tvBatches = TextView(context).apply {
setTextColor(Color.WHITE)
textSize = 10f
}
layout.addView(tvBatches)
tvEvents = TextView(context).apply {
setTextColor(Color.argb(200, 200, 200, 200))
textSize = 9f
maxLines = 8
}
layout.addView(tvEvents)
val params = WindowManager.LayoutParams(
WindowManager.LayoutParams.WRAP_CONTENT,
WindowManager.LayoutParams.WRAP_CONTENT,
WindowManager.LayoutParams.TYPE_APPLICATION_OVERLAY,
WindowManager.LayoutParams.FLAG_NOT_FOCUSABLE or
WindowManager.LayoutParams.FLAG_NOT_TOUCH_MODAL,
PixelFormat.TRANSLUCENT,
).apply {
gravity = Gravity.TOP or Gravity.START
x = dp(8)
y = dp(80)
}
// Draggable
var startX = 0
var startY = 0
var startTouchX = 0f
var startTouchY = 0f
layout.setOnTouchListener { _, event ->
when (event.action) {
MotionEvent.ACTION_DOWN -> {
startX = params.x
startY = params.y
startTouchX = event.rawX
startTouchY = event.rawY
true
}
MotionEvent.ACTION_MOVE -> {
params.x = startX + (event.rawX - startTouchX).toInt()
params.y = startY + (event.rawY - startTouchY).toInt()
wm.updateViewLayout(layout, params)
true
}
else -> false
}
}
root = layout
wm.addView(layout, params)
schedulePoll()
}
fun hide() {
handler.removeCallbacksAndMessages(null)
root?.let {
try { wm.removeView(it) } catch (_: Throwable) {}
}
root = null
}
private fun schedulePoll() {
handler.postDelayed(::poll, pollInterval)
}
private fun poll() {
if (root == null) return
Thread {
try {
val json = Native.pipelineDebugJson()
handler.post { applyJson(json) }
} catch (_: Throwable) {}
schedulePoll()
}.start()
}
private fun applyJson(json: String) {
if (root == null) return
try {
if (json.isNotBlank()) {
val obj = JSONObject(json)
val elevated = obj.optInt("elevated", 0)
val maxElev = obj.optInt("max_elevated", 0)
val batches = obj.optInt("active_batches", 0)
val maxBatch = obj.optInt("max_batch_slots", 0)
val sessions = obj.optInt("active_sessions", 0)
tvElevated.text = "Sessions: $sessions Elevated: $elevated / $maxElev"
tvBatches.text = "Batches: $batches / $maxBatch"
val sessArr = obj.optJSONArray("sessions")
val sessLines = if (sessArr != null && sessArr.length() > 0) {
(0 until sessArr.length()).joinToString("\n") { i ->
val s = sessArr.getJSONObject(i)
val sid = s.optString("sid", "?")
val d = s.optInt("depth", 0)
val inf = s.optInt("inflight", 0)
val e = if (s.optBoolean("elevated", false)) " E" else ""
"$sid d=$d f=$inf$e"
}
} else ""
val arr = obj.optJSONArray("events")
val evtLines = if (arr != null && arr.length() > 0) {
val start = maxOf(0, arr.length() - 5)
(start until arr.length()).joinToString("\n") { arr.getString(it) }
} else ""
tvEvents.text = listOf(sessLines, evtLines).filter { it.isNotEmpty() }.joinToString("\n---\n")
}
} catch (_: Throwable) {}
}
}
@@ -491,6 +491,7 @@ fun HomeScreen(
// client-side estimate only sees what this device relayed,
// not what other devices on the same deployment consumed.
UsageTodayCard()
PipelineDebugCard()
CollapsibleSection(title = stringResource(R.string.sec_live_logs), initiallyExpanded = false) {
LiveLogPane()
@@ -1287,6 +1288,28 @@ private fun AdvancedSettings(
)
}
// Block STUN/TURN toggle
Row(
verticalAlignment = Alignment.CenterVertically,
modifier = Modifier.fillMaxWidth(),
) {
Column(modifier = Modifier.weight(1f)) {
Text(
"Block STUN/TURN",
style = MaterialTheme.typography.bodyMedium,
)
Text(
"Reject STUN/TURN ports (3478/5349/19302). Forces WebRTC apps (Meet, WhatsApp) to TCP fallback — instant connect.",
style = MaterialTheme.typography.bodySmall,
color = MaterialTheme.colorScheme.onSurfaceVariant,
)
}
Switch(
checked = cfg.blockStun,
onCheckedChange = { onChange(cfg.copy(blockStun = it)) },
)
}
// Block DoH toggle
Row(
verticalAlignment = Alignment.CenterVertically,
@@ -1645,6 +1668,104 @@ private fun UsageRow(label: String, value: String) {
}
}
@Composable
private fun PipelineDebugCard() {
val isRunning by VpnState.isRunning.collectAsState()
if (!isRunning) return
var json by remember { mutableStateOf("") }
LaunchedEffect(isRunning) {
if (!isRunning) return@LaunchedEffect
while (true) {
val result = withContext(Dispatchers.IO) {
runCatching { Native.pipelineDebugJson() }
}
json = result.getOrDefault("")
if (result.isFailure) {
android.util.Log.e("PipeDbg", "pipelineDebugJson failed", result.exceptionOrNull())
}
delay(500)
}
}
val obj = remember(json) {
if (json.isBlank()) null
else runCatching { JSONObject(json) }.getOrNull()
}
if (obj == null) return
val elevated = obj.optInt("elevated", 0)
val maxElevated = obj.optInt("max_elevated", 0)
val batches = obj.optInt("active_batches", 0)
val maxBatches = obj.optInt("max_batch_slots", 0)
val events = remember(json) {
val arr = obj.optJSONArray("events") ?: return@remember emptyList<String>()
(0 until arr.length()).map { arr.getString(it) }
}
Spacer(Modifier.height(8.dp))
ElevatedCard(modifier = Modifier.fillMaxWidth()) {
Column(
modifier = Modifier.padding(12.dp),
verticalArrangement = Arrangement.spacedBy(4.dp),
) {
Text(
"Pipeline Debug",
style = MaterialTheme.typography.titleSmall,
)
Row(
modifier = Modifier.fillMaxWidth(),
horizontalArrangement = Arrangement.SpaceBetween,
) {
Text("Elevated", style = MaterialTheme.typography.bodySmall)
Text(
"$elevated / $maxElevated",
style = MaterialTheme.typography.bodySmall,
fontFamily = FontFamily.Monospace,
)
}
Row(
modifier = Modifier.fillMaxWidth(),
horizontalArrangement = Arrangement.SpaceBetween,
) {
Text("Batches in-flight", style = MaterialTheme.typography.bodySmall)
Text(
"$batches / $maxBatches",
style = MaterialTheme.typography.bodySmall,
fontFamily = FontFamily.Monospace,
)
}
if (events.isNotEmpty()) {
Spacer(Modifier.height(4.dp))
Text("Events", style = MaterialTheme.typography.labelSmall)
Box(
modifier = Modifier
.fillMaxWidth()
.heightIn(max = 150.dp)
.clip(RoundedCornerShape(4.dp))
.background(MaterialTheme.colorScheme.surfaceVariant)
.padding(6.dp)
) {
val listState = rememberLazyListState()
LaunchedEffect(events.size) {
if (events.isNotEmpty()) listState.animateScrollToItem(events.size - 1)
}
LazyColumn(state = listState) {
items(events) { ev ->
Text(
ev,
style = MaterialTheme.typography.bodySmall,
fontFamily = FontFamily.Monospace,
fontSize = 10.sp,
)
}
}
}
}
}
}
}
private fun fmtBytes(b: Long): String {
val k = 1024L
val m = k * k
+140
View File
@@ -0,0 +1,140 @@
#!/usr/bin/env bash
#
# bench-pipeline.sh — compare throughput: serial (depth=1) vs pipelined (depth=10)
#
# Builds mhrv-rs twice (patching the INFLIGHT_ACTIVE constant), runs each
# as a local SOCKS5 proxy, downloads through the full tunnel, reports.
#
# Usage:
# ./scripts/bench-pipeline.sh [CONFIG_FILE]
#
# Default: config.json
set -euo pipefail
CONFIG="${1:-config.json}"
RUNS=3
SOCKS_PORT=18088
HTTP_PORT=18087
TEST_URL="https://speed.cloudflare.com/__down?bytes=5000000"
SRC="src/tunnel_client.rs"
TMPDIR_BENCH=$(mktemp -d)
cleanup() {
rm -rf "$TMPDIR_BENCH"
kill $PROXY_PID 2>/dev/null || true
# Restore original constant
sed -i '' "s/^const INFLIGHT_ACTIVE: usize = [0-9]*/const INFLIGHT_ACTIVE: usize = 10/" "$SRC" 2>/dev/null || true
}
trap cleanup EXIT
if [ ! -f "$CONFIG" ]; then
echo "ERROR: Config not found: $CONFIG"
exit 1
fi
echo "╔══════════════════════════════════════════════╗"
echo "║ Pipeline Throughput Benchmark ║"
echo "╠══════════════════════════════════════════════╣"
echo "║ Config: $CONFIG"
echo "║ Test URL: $TEST_URL"
echo "║ Runs: $RUNS per mode"
echo "╚══════════════════════════════════════════════╝"
echo ""
# Write a temp config with our ports
TEMP_CONFIG="$TMPDIR_BENCH/config.json"
python3 -c "
import json
with open('$CONFIG') as f:
c = json.load(f)
c['listen_port'] = $HTTP_PORT
c['socks5_port'] = $SOCKS_PORT
c['log_level'] = 'warn'
with open('$TEMP_CONFIG', 'w') as f:
json.dump(c, f)
"
run_test() {
local label="$1"
local binary="$2"
echo "━━━ $label ━━━"
$binary -c "$TEMP_CONFIG" &
PROXY_PID=$!
sleep 3
if ! kill -0 $PROXY_PID 2>/dev/null; then
echo " ERROR: Proxy failed to start"
return
fi
# Wait for proxy
for attempt in $(seq 1 15); do
if curl -s --socks5-hostname localhost:$SOCKS_PORT --connect-timeout 5 -o /dev/null https://www.google.com 2>/dev/null; then
break
fi
sleep 1
done
local total_bytes=0
local total_time=0
for i in $(seq 1 $RUNS); do
local result
result=$(curl -s --socks5-hostname localhost:$SOCKS_PORT \
-o /dev/null \
-w '%{size_download} %{time_total} %{speed_download}' \
--connect-timeout 30 \
--max-time 90 \
"$TEST_URL" 2>/dev/null || echo "0 999 0")
local bytes time_s speed
bytes=$(echo "$result" | awk '{print $1}')
time_s=$(echo "$result" | awk '{print $2}')
speed=$(echo "$result" | awk '{printf "%.0f", $3/1024}')
total_bytes=$((total_bytes + ${bytes%.*}))
total_time=$(echo "$total_time + $time_s" | bc)
printf " Run %d: %.1fs %s KB/s\n" "$i" "$time_s" "$speed"
done
local avg_speed avg_time
avg_speed=$(echo "scale=1; $total_bytes / $total_time / 1024" | bc 2>/dev/null || echo "0")
avg_time=$(echo "scale=1; $total_time / $RUNS" | bc 2>/dev/null || echo "0")
printf " ➜ Average: %s KB/s (%.1fs per download)\n\n" "$avg_speed" "$avg_time"
kill $PROXY_PID 2>/dev/null || true
wait $PROXY_PID 2>/dev/null || true
sleep 1
echo "$label|$avg_speed|$avg_time" >> "$TMPDIR_BENCH/results.txt"
}
# Build serial (depth=1)
echo "Building serial mode (INFLIGHT_ACTIVE=1)..."
sed -i '' "s/^const INFLIGHT_ACTIVE: usize = [0-9]*/const INFLIGHT_ACTIVE: usize = 1/" "$SRC"
cargo build --release 2>&1 | tail -1
cp target/release/mhrv-rs "$TMPDIR_BENCH/mhrv-serial"
# Build pipelined (depth=10)
echo "Building pipelined mode (INFLIGHT_ACTIVE=10)..."
sed -i '' "s/^const INFLIGHT_ACTIVE: usize = [0-9]*/const INFLIGHT_ACTIVE: usize = 10/" "$SRC"
cargo build --release 2>&1 | tail -1
cp target/release/mhrv-rs "$TMPDIR_BENCH/mhrv-pipelined"
echo ""
# Run tests
run_test "Serial (depth=1)" "$TMPDIR_BENCH/mhrv-serial"
run_test "Pipelined (depth=10)" "$TMPDIR_BENCH/mhrv-pipelined"
# Summary
echo "╔══════════════════════════════════════════════╗"
echo "║ RESULTS ║"
echo "╠══════════════════════════════════════════════╣"
while IFS='|' read -r label speed time; do
printf "║ %-25s %6s KB/s %5ss\n" "$label" "$speed" "$time"
done < "$TMPDIR_BENCH/results.txt"
echo "╚══════════════════════════════════════════════╝"
+15 -1
View File
@@ -199,7 +199,7 @@ pub extern "system" fn Java_com_therealaleph_mhrv_Native_startProxy(
// Try to build the runtime first — if allocation fails we want to
// know before spinning up anything stateful.
let rt = match tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.worker_threads(4)
.enable_all()
.thread_name("mhrv-worker")
.build()
@@ -483,6 +483,20 @@ pub extern "system" fn Java_com_therealaleph_mhrv_Native_statsJson<'a>(
env.new_string(out).map(|s| s.into_raw()).unwrap_or(std::ptr::null_mut())
}
/// `Native.pipelineDebugJson()` -> String. Snapshot of pipeline debug state:
/// elevated session count, batch semaphore usage, recent ramp/drop events.
/// Temporary — for the debug overlay.
#[no_mangle]
pub extern "system" fn Java_com_therealaleph_mhrv_Native_pipelineDebugJson<'a>(
env: JNIEnv<'a>,
_class: JClass,
) -> jstring {
let out = safe(String::new(), AssertUnwindSafe(|| {
crate::tunnel_client::pipeline_debug::to_json()
}));
env.new_string(out).map(|s| s.into_raw()).unwrap_or(std::ptr::null_mut())
}
// ---------------------------------------------------------------------------
// tun2proxy CLI API wrapper (dlsym — no fork or patch needed)
// ---------------------------------------------------------------------------
+20
View File
@@ -256,6 +256,10 @@ struct FormState {
/// drop the user's setting. Not currently exposed as a UI control;
/// users edit `block_quic` directly in `config.json` (Issue #213).
block_quic: bool,
/// Round-tripped from config.json and exposed beside QUIC blocking.
/// Default true to push WebRTC apps toward TCP TURN instead of slow
/// UDP ICE retries.
block_stun: bool,
/// Round-tripped from config.json. Not exposed as a UI control —
/// users edit `disable_padding` directly when needed (Issue #391).
/// Default false (padding active).
@@ -387,6 +391,7 @@ fn load_form() -> (FormState, Option<String>) {
youtube_via_relay: c.youtube_via_relay,
passthrough_hosts: c.passthrough_hosts.clone(),
block_quic: c.block_quic,
block_stun: c.block_stun,
disable_padding: c.disable_padding,
force_http1: c.force_http1,
tunnel_doh: c.tunnel_doh,
@@ -426,6 +431,7 @@ fn load_form() -> (FormState, Option<String>) {
youtube_via_relay: false,
passthrough_hosts: Vec::new(),
block_quic: true,
block_stun: true,
disable_padding: false,
force_http1: false,
tunnel_doh: true,
@@ -587,6 +593,7 @@ impl FormState {
// control yet). Round-trip through the file so save
// doesn't drop a user-set true.
block_quic: self.block_quic,
block_stun: self.block_stun,
// Issue #391: disable_padding is config-only for now.
// Round-trip preserves the user's choice.
disable_padding: self.disable_padding,
@@ -688,6 +695,9 @@ struct ConfigWire<'a> {
/// emit only when the user has explicitly disabled the block.
#[serde(skip_serializing_if = "is_true")]
block_doh: bool,
/// Default true. Emit only when the user disables STUN/TURN blocking.
#[serde(skip_serializing_if = "is_true")]
block_stun: bool,
#[serde(skip_serializing_if = "Vec::is_empty")]
fronting_groups: &'a Vec<FrontingGroup>,
/// Auto-blacklist tuning + batch timeout (#391, #444, #430). Skip
@@ -781,6 +791,7 @@ impl<'a> From<&'a Config> for ConfigWire<'a> {
tunnel_doh: c.tunnel_doh,
bypass_doh_hosts: &c.bypass_doh_hosts,
block_doh: c.block_doh,
block_stun: c.block_stun,
fronting_groups: &c.fronting_groups,
auto_blacklist_strikes: c.auto_blacklist_strikes,
auto_blacklist_window_secs: c.auto_blacklist_window_secs,
@@ -1274,6 +1285,15 @@ impl eframe::App for App {
Issue #213, #793.",
);
});
ui.horizontal(|ui| {
ui.add_space(120.0 + 8.0);
ui.checkbox(&mut self.form.block_stun, "Block STUN/TURN UDP")
.on_hover_text(
"Drop WebRTC STUN/TURN UDP ports 3478, 5349, and 19302 so apps \
such as Meet, Discord, and WhatsApp move to TCP TURN instead of \
waiting on UDP ICE retries.",
);
});
});
});
+8
View File
@@ -202,6 +202,13 @@ pub struct Config {
/// flag lets users who care about consistency over peak speed
/// opt out of QUIC at the source rather than discovering its
/// failure modes later. Issue #213.
/// Block STUN/TURN UDP ports (3478, 5349, 19302) at the SOCKS5 listener.
/// Forces WebRTC apps (Google Meet, Discord, WhatsApp) to fall back to
/// TCP TURN on port 443, skipping the 10-30s UDP ICE timeout. Default
/// true — TCP fallback works for all tested apps and connects instantly.
#[serde(default = "default_block_stun")]
pub block_stun: bool,
#[serde(default = "default_block_quic")]
pub block_quic: bool,
/// When true, suppress the random `_pad` field that v1.8.0+ adds
@@ -497,6 +504,7 @@ fn default_tunnel_doh() -> bool { true }
/// Default for `block_quic`: `true`. QUIC over the TCP-based tunnel
/// causes TCP-over-TCP meltdown (<1 Mbps). Browsers fall back to
/// HTTPS/TCP within seconds of the silent UDP drop. Issue #793.
fn default_block_stun() -> bool { true }
fn default_block_quic() -> bool { true }
/// Default for `block_doh`: `true` (browser DoH is rejected so the
+6
View File
@@ -514,6 +514,8 @@ pub struct TunnelResponse {
/// `e` only when this is `None` and compatibility is needed.
#[serde(default)]
pub code: Option<String>,
#[serde(default)]
pub seq: Option<u64>,
}
/// A single op in a batch tunnel request.
@@ -528,6 +530,10 @@ pub struct BatchOp {
pub port: Option<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
pub d: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub seq: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub wseq: Option<u64>,
}
/// Batch tunnel response from Apps Script / tunnel node.
+13
View File
@@ -241,6 +241,7 @@ pub struct RewriteCtx {
/// callers fall back to TCP/HTTPS. See config.rs `block_quic` for
/// the trade-off. Issue #213.
pub block_quic: bool,
pub block_stun: bool,
/// If true, route DoH CONNECTs around the Apps Script tunnel via
/// plain TCP. Default false via `Config::tunnel_doh = true` (flipped
/// in v1.9.0, issue #468). See `DEFAULT_DOH_HOSTS` and
@@ -507,6 +508,7 @@ impl ProxyServer {
youtube_via_relay: config.youtube_via_relay,
passthrough_hosts: config.passthrough_hosts.clone(),
block_quic: config.block_quic,
block_stun: config.block_stun,
bypass_doh: !config.tunnel_doh,
block_doh: config.block_doh,
bypass_doh_hosts: config.bypass_doh_hosts.clone(),
@@ -940,6 +942,17 @@ async fn handle_socks5_client(
}
}
// Reject STUN/TURN UDP ports immediately so WebRTC (Meet,
// Telegram calls) skips UDP ICE candidates and falls back to
// TCP TURN on :443 without waiting for a timeout.
if rewrite_ctx.block_stun && matches!(port, 3478 | 5349 | 19302) {
tracing::info!("SOCKS5 CONNECT -> {}:{} (STUN/TURN blocked, forcing TCP fallback)", host, port);
sock.write_all(&[0x05, 0x05, 0x00, 0x01, 0, 0, 0, 0, 0, 0])
.await?;
sock.flush().await?;
return Ok(());
}
tracing::info!("SOCKS5 CONNECT -> {}:{}", host, port);
// Success reply with zeroed BND.
+782 -176
View File
File diff suppressed because it is too large Load Diff
+114 -32
View File
@@ -71,7 +71,7 @@ const STRAGGLER_SETTLE_MAX: Duration = Duration::from_millis(1000);
/// `BATCH_TIMEOUT` (30 s) and Apps Script's UrlFetch ceiling (~60 s).
/// Tested on censored networks in Iran where users reported smoother
/// Telegram video playback and fewer session resets at this value.
const LONGPOLL_DEADLINE: Duration = Duration::from_secs(15);
const LONGPOLL_DEADLINE: Duration = Duration::from_secs(4);
/// Bound on each UDP session's inbound queue. Beyond this we drop oldest
/// to keep recent voice/media packets moving — a stale RTP frame is
@@ -153,6 +153,11 @@ struct SessionInner {
/// to wake the drain phase as soon as any session has something to
/// ship, replacing the old fixed-sleep heuristic.
notify: Notify,
/// Sequence-ordered write buffer: pipelined data ops may arrive
/// out of order (different batches completing at different times).
/// We buffer out-of-order writes and flush in seq order.
next_write_seq: Mutex<Option<u64>>,
pending_writes: Mutex<std::collections::BTreeMap<u64, Vec<u8>>>,
}
struct ManagedSession {
@@ -212,6 +217,8 @@ async fn create_session(host: &str, port: u16) -> std::io::Result<ManagedSession
eof: AtomicBool::new(false),
last_active: Mutex::new(Instant::now()),
notify: Notify::new(),
next_write_seq: Mutex::new(None),
pending_writes: Mutex::new(std::collections::BTreeMap::new()),
});
let inner_ref = inner.clone();
@@ -231,6 +238,8 @@ fn create_udpgw_session() -> ManagedSession {
eof: AtomicBool::new(false),
last_active: Mutex::new(Instant::now()),
notify: Notify::new(),
next_write_seq: Mutex::new(None),
pending_writes: Mutex::new(std::collections::BTreeMap::new()),
});
let inner_ref = inner.clone();
@@ -241,7 +250,7 @@ fn create_udpgw_session() -> ManagedSession {
}
async fn reader_task(mut reader: impl AsyncRead + Unpin, session: Arc<SessionInner>) {
let mut buf = vec![0u8; 65536];
let mut buf = vec![0u8; 2 * 1024 * 1024];
loop {
match reader.read(&mut buf).await {
Ok(0) => {
@@ -643,17 +652,19 @@ struct TunnelResponse {
#[serde(skip_serializing_if = "Option::is_none")] eof: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")] e: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] code: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] seq: Option<u64>,
}
impl TunnelResponse {
fn error(msg: impl Into<String>) -> Self {
Self { sid: None, d: None, pkts: None, eof: None, e: Some(msg.into()), code: None }
Self { sid: None, d: None, pkts: None, eof: None, e: Some(msg.into()), code: None, seq: None }
}
fn unsupported_op(op: &str) -> Self {
Self {
sid: None, d: None, pkts: None, eof: None,
e: Some(format!("unknown op: {}", op)),
code: Some(CODE_UNSUPPORTED_OP.into()),
seq: None,
}
}
}
@@ -675,6 +686,8 @@ struct BatchOp {
#[serde(default)] host: Option<String>,
#[serde(default)] port: Option<u16>,
#[serde(default)] d: Option<String>, // base64 data
#[serde(default)] seq: Option<u64>,
#[serde(default)] wseq: Option<u64>,
}
#[derive(Serialize)]
@@ -797,8 +810,8 @@ async fn handle_batch(
// map lock isn't held across the per-session read_buf / packets
// mutex acquisition — without this, every other batch (and every
// connect/close op) head-of-line-blocks behind the drain.
let mut tcp_drains: Vec<(usize, String, Arc<SessionInner>)> = Vec::new();
let mut udp_drains: Vec<(usize, String, Arc<UdpSessionInner>)> = Vec::new();
let mut tcp_drains: Vec<(usize, String, Arc<SessionInner>, Option<u64>)> = Vec::new();
let mut udp_drains: Vec<(usize, String, Arc<UdpSessionInner>, Option<u64>)> = Vec::new();
// True iff the batch contained any op that performed a real action
// upstream — a new connection or a non-empty data write. A batch of
// only empty "data" / "udp_data" polls (and possibly closes) leaves
@@ -893,15 +906,60 @@ async fn handle_batch(
};
if !bytes.is_empty() {
had_writes_or_connects = true;
let mut w = inner.writer.lock().await;
let _ = w.write_all(&bytes).await;
let _ = w.flush().await;
tracing::info!(
"session {} upload {}B wseq={:?}",
&sid[..sid.len().min(8)], bytes.len(), op.wseq,
);
match op.wseq {
None => {
// Old client (no wseq): write immediately.
let mut w = inner.writer.lock().await;
let _ = w.write_all(&bytes).await;
let _ = w.flush().await;
}
Some(wseq) => {
let mut nws = inner.next_write_seq.lock().await;
let expected = nws.get_or_insert(wseq);
if wseq < *expected {
// Stale / duplicate — skip.
tracing::debug!(
"session {} wseq {} < expected {} — skipping",
&sid[..sid.len().min(8)], wseq, *expected,
);
} else if wseq == *expected {
// In order — write immediately.
let mut w = inner.writer.lock().await;
let _ = w.write_all(&bytes).await;
*expected += 1;
// Flush any buffered writes that
// are now in sequence.
let mut pw = inner.pending_writes.lock().await;
while let Some(entry) = pw.first_entry() {
if *entry.key() != *expected { break; }
let (_, buffered) = entry.remove_entry();
let _ = w.write_all(&buffered).await;
*expected += 1;
}
let _ = w.flush().await;
} else {
// Out of order — buffer for later.
tracing::debug!(
"session {} wseq {} > expected {} — buffering",
&sid[..sid.len().min(8)], wseq, *expected,
);
let mut pw = inner.pending_writes.lock().await;
pw.insert(wseq, bytes);
}
}
}
}
}
}
tcp_drains.push((i, sid, inner));
tcp_drains.push((i, sid, inner, op.seq));
} else {
results.push((i, eof_response(sid)));
results.push((i, eof_response(sid, op.seq)));
}
}
"udp_data" => {
@@ -942,9 +1000,9 @@ async fn handle_batch(
if had_uplink {
*inner.last_active.lock().await = Instant::now();
}
udp_drains.push((i, sid, inner));
udp_drains.push((i, sid, inner, op.seq));
} else {
results.push((i, eof_response(sid)));
results.push((i, eof_response(sid, op.seq)));
}
}
"close" => {
@@ -964,11 +1022,11 @@ async fn handle_batch(
match join {
Ok((i, NewConn::Connect(r))) => results.push((i, r)),
Ok((i, NewConn::ConnectData(Ok((sid, inner))))) => {
tcp_drains.push((i, sid, inner));
tcp_drains.push((i, sid, inner, None));
}
Ok((i, NewConn::ConnectData(Err(r)))) => results.push((i, r)),
Ok((i, NewConn::UdpOpen(Ok((sid, inner))))) => {
udp_drains.push((i, sid, inner));
udp_drains.push((i, sid, inner, None));
}
Ok((i, NewConn::UdpOpen(Err(r)))) => results.push((i, r)),
Err(e) => {
@@ -999,9 +1057,9 @@ async fn handle_batch(
// don't need to re-acquire the sessions map lock here. Cloning
// the Arc is just a refcount bump.
let tcp_inners: Vec<Arc<SessionInner>> =
tcp_drains.iter().map(|(_, _, inner)| inner.clone()).collect();
tcp_drains.iter().map(|(_, _, inner, _)| inner.clone()).collect();
let udp_inners: Vec<Arc<UdpSessionInner>> =
udp_drains.iter().map(|(_, _, inner)| inner.clone()).collect();
udp_drains.iter().map(|(_, _, inner, _)| inner.clone()).collect();
// Wake on whichever side has work first. The previous
// `tokio::join!` was conjunctive — a TCP burst still paid the
@@ -1086,17 +1144,33 @@ async fn handle_batch(
// Apps Script's 50 MiB response ceiling. This cap stops one session
// short of the cliff; deferred sessions drain on the next poll.
let mut remaining_budget: usize = BATCH_RESPONSE_BUDGET;
for (i, sid, inner) in &tcp_drains {
let (data, eof) = drain_now(inner, remaining_budget).await;
let drained = data.len();
if eof {
for (i, sid, inner, seq) in &tcp_drains {
// Drain in a loop: keep reading until the buffer is empty
// so we catch data that arrives during the drain itself.
let mut all_data = Vec::new();
let mut final_eof = false;
let drain_deadline = Instant::now() + Duration::from_secs(1);
loop {
let (data, eof) = drain_now(inner, remaining_budget.saturating_sub(all_data.len())).await;
if eof { final_eof = true; }
if data.is_empty() { break; }
let hit_session_cap = data.len() >= TCP_DRAIN_MAX_BYTES;
all_data.extend_from_slice(&data);
if final_eof || hit_session_cap || all_data.len() >= remaining_budget { break; }
if Instant::now() >= drain_deadline { break; }
// Brief yield to let reader_task finish its current read
tokio::task::yield_now().await;
}
let drained = all_data.len();
if drained > 0 {
tracing::info!("session {} drained {}KB", &sid[..sid.len().min(8)], drained / 1024);
}
if final_eof {
tcp_eof_sids.push(sid.clone());
}
results.push((*i, tcp_drain_response(sid.clone(), data, eof)));
results.push((*i, tcp_drain_response(sid.clone(), all_data, final_eof, *seq)));
remaining_budget = remaining_budget.saturating_sub(drained);
if remaining_budget == 0 {
// Budget exhausted; remaining sessions in `tcp_drains` keep
// their buffered data and pick up next batch.
break;
}
}
@@ -1119,12 +1193,12 @@ async fn handle_batch(
// trap that motivated the TCP-side fix reappears, and tracking
// eof from the drain return rather than the atomic catches it.
let mut udp_eof_sids: Vec<String> = Vec::new();
for (i, sid, inner) in &udp_drains {
for (i, sid, inner, seq) in &udp_drains {
let (packets, eof) = drain_udp_now(inner).await;
if eof {
udp_eof_sids.push(sid.clone());
}
results.push((*i, udp_drain_response(sid.clone(), packets, eof)));
results.push((*i, udp_drain_response(sid.clone(), packets, eof, *seq)));
}
if !udp_eof_sids.is_empty() {
let mut sessions = state.udp_sessions.lock().await;
@@ -1147,7 +1221,7 @@ async fn handle_batch(
(StatusCode::OK, [(header::CONTENT_TYPE, "application/json")], json)
}
fn tcp_drain_response(sid: String, data: Vec<u8>, eof: bool) -> TunnelResponse {
fn tcp_drain_response(sid: String, data: Vec<u8>, eof: bool, seq: Option<u64>) -> TunnelResponse {
TunnelResponse {
sid: Some(sid),
d: if data.is_empty() { None } else { Some(B64.encode(&data)) },
@@ -1155,10 +1229,11 @@ fn tcp_drain_response(sid: String, data: Vec<u8>, eof: bool) -> TunnelResponse {
eof: Some(eof),
e: None,
code: None,
seq,
}
}
fn udp_drain_response(sid: String, packets: Vec<Vec<u8>>, eof: bool) -> TunnelResponse {
fn udp_drain_response(sid: String, packets: Vec<Vec<u8>>, eof: bool, seq: Option<u64>) -> TunnelResponse {
let pkts = if packets.is_empty() {
None
} else {
@@ -1171,10 +1246,11 @@ fn udp_drain_response(sid: String, packets: Vec<Vec<u8>>, eof: bool) -> TunnelRe
eof: Some(eof),
e: None,
code: None,
seq,
}
}
fn eof_response(sid: String) -> TunnelResponse {
fn eof_response(sid: String, seq: Option<u64>) -> TunnelResponse {
TunnelResponse {
sid: Some(sid),
d: None,
@@ -1182,6 +1258,7 @@ fn eof_response(sid: String) -> TunnelResponse {
eof: Some(true),
e: None,
code: None,
seq,
}
}
@@ -1228,7 +1305,7 @@ async fn handle_connect(state: &AppState, host: Option<String>, port: Option<u16
let sid = uuid::Uuid::new_v4().to_string();
tracing::info!("session {} -> {}:{}", sid, host, port);
state.sessions.lock().await.insert(sid.clone(), session);
TunnelResponse { sid: Some(sid), d: None, pkts: None, eof: Some(false), e: None, code: None }
TunnelResponse { sid: Some(sid), d: None, pkts: None, eof: Some(false), e: None, code: None, seq: None }
}
/// Open a session and write the client's first bytes in one round trip.
@@ -1350,6 +1427,7 @@ async fn handle_connect_data_single(
eof: Some(eof),
e: None,
code: None,
seq: None,
}
}
@@ -1398,7 +1476,7 @@ async fn handle_data_single(state: &AppState, sid: Option<String>, data: Option<
sid: Some(sid),
d: if data.is_empty() { None } else { Some(B64.encode(&data)) },
pkts: None,
eof: Some(eof), e: None, code: None,
eof: Some(eof), e: None, code: None, seq: None,
}
}
@@ -1415,7 +1493,7 @@ async fn handle_close(state: &AppState, sid: Option<String>) -> TunnelResponse {
s.reader_handle.abort();
tracing::info!("udp session {} closed by client", sid);
}
TunnelResponse { sid: Some(sid), d: None, pkts: None, eof: Some(true), e: None, code: None }
TunnelResponse { sid: Some(sid), d: None, pkts: None, eof: Some(true), e: None, code: None, seq: None }
}
// ---------------------------------------------------------------------------
@@ -1736,6 +1814,8 @@ mod tests {
eof: AtomicBool::new(false),
last_active: Mutex::new(Instant::now()),
notify: Notify::new(),
next_write_seq: Mutex::new(None),
pending_writes: Mutex::new(std::collections::BTreeMap::new()),
})
}
@@ -1987,6 +2067,8 @@ mod tests {
eof: AtomicBool::new(false),
last_active: Mutex::new(Instant::now()),
notify: Notify::new(),
next_write_seq: Mutex::new(None),
pending_writes: Mutex::new(std::collections::BTreeMap::new()),
});
let _reader_handle = tokio::spawn(reader_task(reader, inner.clone()));
@@ -2343,7 +2425,7 @@ mod tests {
);
// The `udp_drain_response` helper threads eof into `eof: Some(true)`.
let resp = udp_drain_response("zombie".into(), pkts, eof);
let resp = udp_drain_response("zombie".into(), pkts, eof, None);
assert_eq!(resp.eof, Some(true));
assert!(resp.pkts.is_none());
}