Files
MasterHttpRelayVPN/scripts/benchmark_transport.py
2026-05-02 12:03:59 +03:30

695 lines
27 KiB
Python

"""
Transport protocol & connection benchmark suite.
Tests run against Google's edge IP with SNI fronting. Four suites:
1. Protocol sequential — H1.1 / H2 / H3, one request at a time (apples-to-apples latency)
2. TLS session resumption — cold connect vs warm reconnect using cached session ticket
3. Concurrency — H2 multiplex (N streams on 1 conn) vs H1.1 parallel (N separate conns)
4. IP scan — probe all candidate Google IPs to find the fastest one on this network
Usage:
python scripts/benchmark_transport.py # reads config.json
python scripts/benchmark_transport.py --ip 216.239.38.120 --sni www.google.com
python scripts/benchmark_transport.py --suite protocol # only run suite 1
python scripts/benchmark_transport.py --suite resumption
python scripts/benchmark_transport.py --suite concurrency
python scripts/benchmark_transport.py --suite ipscan
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import socket
import ssl
import statistics
import sys
import time
from pathlib import Path
# ── Optional imports ──────────────────────────────────────────────────────
try:
import h2.connection
import h2.config
import h2.events
import h2.settings
H2_AVAILABLE = True
except ImportError:
H2_AVAILABLE = False
try:
import certifi
_CAFILE = certifi.where()
except ImportError:
_CAFILE = None
try:
import aioquic.asyncio as quic_asyncio
import aioquic.h3.connection as h3c
import aioquic.h3.events as h3e
import aioquic.quic.configuration as quic_cfg
import aioquic.quic.events as quic_events
H3_AVAILABLE = True
except ImportError:
H3_AVAILABLE = False
# ── TLS context helpers ───────────────────────────────────────────────────
def _make_tls_ctx(alpn: list[str]) -> ssl.SSLContext:
ctx = ssl.create_default_context()
if _CAFILE:
try:
ctx.load_verify_locations(cafile=_CAFILE)
except Exception:
pass
ctx.set_alpn_protocols(alpn)
return ctx
# ── HTTP/1.1 probe ────────────────────────────────────────────────────────
async def _probe_h1(host_ip: str, sni: str, path: str, timeout: float) -> float:
"""Return elapsed seconds for one H1.1 GET. Raises on error."""
ctx = _make_tls_ctx(["http/1.1"])
raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
raw.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
raw.setblocking(False)
t0 = time.perf_counter()
loop = asyncio.get_running_loop()
await asyncio.wait_for(loop.sock_connect(raw, (host_ip, 443)), timeout=timeout)
reader, writer = await asyncio.wait_for(
asyncio.open_connection(ssl=ctx, server_hostname=sni, sock=raw),
timeout=timeout,
)
req = (
f"GET {path} HTTP/1.1\r\n"
f"Host: {sni}\r\n"
"Accept: */*\r\n"
"Connection: close\r\n"
"\r\n"
).encode()
writer.write(req)
await asyncio.wait_for(writer.drain(), timeout=timeout)
resp = b""
while True:
chunk = await asyncio.wait_for(reader.read(4096), timeout=timeout)
if not chunk:
break
resp += chunk
if b"\r\n\r\n" in resp:
break
writer.close()
elapsed = time.perf_counter() - t0
if not resp.startswith(b"HTTP/"):
raise RuntimeError(f"Unexpected response: {resp[:60]!r}")
return elapsed
# ── HTTP/2 probe ──────────────────────────────────────────────────────────
async def _probe_h2_fresh(host_ip: str, sni: str, path: str, timeout: float) -> float:
"""One H2 GET on a NEW connection each time (apples-to-apples vs H1)."""
if not H2_AVAILABLE:
raise RuntimeError("h2 not installed")
ctx = _make_tls_ctx(["h2", "http/1.1"])
raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
raw.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
raw.setblocking(False)
t0 = time.perf_counter()
loop = asyncio.get_running_loop()
await asyncio.wait_for(loop.sock_connect(raw, (host_ip, 443)), timeout=timeout)
reader, writer = await asyncio.wait_for(
asyncio.open_connection(ssl=ctx, server_hostname=sni, sock=raw),
timeout=timeout,
)
ssl_obj = writer.get_extra_info("ssl_object")
negotiated = ssl_obj.selected_alpn_protocol() if ssl_obj else None
if negotiated != "h2":
writer.close()
raise RuntimeError(f"H2 ALPN failed (got {negotiated!r})")
cfg = h2.config.H2Configuration(client_side=True, header_encoding="utf-8")
conn = h2.connection.H2Connection(cfg)
conn.initiate_connection()
writer.write(conn.data_to_send(65535))
await writer.drain()
stream_id = conn.get_next_available_stream_id()
conn.send_headers(stream_id, [
(":method", "GET"),
(":path", path),
(":scheme", "https"),
(":authority", sni),
("accept", "*/*"),
], end_stream=True)
writer.write(conn.data_to_send(65535))
await asyncio.wait_for(writer.drain(), timeout=timeout)
headers_done = False
while not headers_done:
raw_data = await asyncio.wait_for(reader.read(65535), timeout=timeout)
if not raw_data:
break
events = conn.receive_data(raw_data)
writer.write(conn.data_to_send(65535))
await writer.drain()
for ev in events:
if isinstance(ev, (h2.events.ResponseReceived, h2.events.StreamEnded,
h2.events.DataReceived)):
if isinstance(ev, h2.events.ResponseReceived) and ev.stream_id == stream_id:
headers_done = True
writer.close()
return time.perf_counter() - t0
# ── HTTP/3 (QUIC) probe ───────────────────────────────────────────────────
class _H3ProbeProtocol(quic_asyncio.QuicConnectionProtocol):
"""Minimal aioquic protocol that sends one H3 GET and captures the result."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._h3: h3c.H3Connection | None = None
self._done: asyncio.Future[float] = asyncio.get_event_loop().create_future()
self._t0: float = time.perf_counter()
self._stream_id: int | None = None
def quic_event_received(self, event):
if isinstance(event, quic_events.HandshakeCompleted):
self._h3 = h3c.H3Connection(self._quic, enable_webtransport=False)
if self._h3 is None:
return
for h3ev in self._h3.handle_event(event):
if isinstance(h3ev, h3e.HeadersReceived):
if not self._done.done():
self._done.set_result(time.perf_counter() - self._t0)
elif isinstance(h3ev, h3e.DataReceived):
pass # don't need body
def send_request(self, sni: str, path: str):
self._stream_id = self._quic.get_next_available_stream_id()
self._h3.send_headers(
stream_id=self._stream_id,
headers=[
(b":method", b"GET"),
(b":path", path.encode()),
(b":scheme", b"https"),
(b":authority", sni.encode()),
(b"accept", b"*/*"),
],
end_stream=True,
)
self.transmit()
async def _h3_inner(host_ip: str, sni: str, path: str, timeout: float) -> float:
cfg = quic_cfg.QuicConfiguration(
is_client=True,
server_name=sni,
alpn_protocols=h3c.H3_ALPN,
verify_mode=ssl.CERT_REQUIRED,
)
if _CAFILE:
try:
cfg.load_verify_locations(_CAFILE)
except Exception:
pass
t0 = time.perf_counter()
async with quic_asyncio.connect(
host_ip,
443,
configuration=cfg,
create_protocol=_H3ProbeProtocol,
) as proto:
proto._t0 = t0
proto.send_request(sni, path)
return await proto._done
async def _probe_h3(host_ip: str, sni: str, path: str, timeout: float) -> float:
if not H3_AVAILABLE:
raise RuntimeError("aioquic not installed")
# QUIC uses UDP. Wrap the ENTIRE connect+request in wait_for so a
# network that silently drops UDP packets doesn't stall indefinitely.
h3_timeout = min(timeout, 5.0)
try:
return await asyncio.wait_for(_h3_inner(host_ip, sni, path, h3_timeout), timeout=h3_timeout)
except asyncio.TimeoutError:
raise TimeoutError(f"QUIC/UDP timed out after {h3_timeout:.1f}s — UDP likely blocked or no H3 support")
except Exception as exc:
raise RuntimeError(f"{type(exc).__name__}: {exc or 'no detail'}")
# ── Runner ────────────────────────────────────────────────────────────────
async def _run_protocol(
name: str,
probe,
host_ip: str,
sni: str,
path: str,
n: int,
timeout: float,
) -> dict:
times: list[float] = []
errors = 0
for i in range(n):
try:
t = await probe(host_ip, sni, path, timeout)
times.append(t)
except Exception as exc:
errors += 1
desc = str(exc) or type(exc).__name__
print(f" [{name}] request {i+1}/{n} FAILED: {desc}")
# If the first 3 all failed, give up early to avoid wasting time.
if errors >= 3 and not times:
print(f" [{name}] 3 consecutive failures with no success — aborting protocol test")
break
await asyncio.sleep(0.05) # tiny gap between probes
return {"name": name, "times": times, "errors": errors, "n": n}
def _print_result(r: dict):
name = r["name"]
times = r["times"]
errors = r["errors"]
n = r["n"]
ok = len(times)
if not times:
print(f" {name:10s} NO SUCCESSFUL REQUESTS (errors={errors}/{n})")
return
mn = min(times) * 1000
mx = max(times) * 1000
avg = statistics.mean(times) * 1000
med = statistics.median(times) * 1000
p95 = sorted(times)[int(len(times) * 0.95)] * 1000
print(
f" {name:10s} "
f"ok={ok}/{n} "
f"min={mn:6.1f}ms "
f"avg={avg:6.1f}ms "
f"med={med:6.1f}ms "
f"p95={p95:6.1f}ms "
f"max={mx:6.1f}ms "
f"errors={errors}"
)
async def main(host_ip: str, sni: str, path: str, n: int, timeout: float,
suite: str = "all"):
print(f"\nBenchmark target → {host_ip}:443 SNI={sni} path={path}")
print("=" * 80)
run_all = suite == "all"
# ── Suite 1: Protocol sequential ──────────────────────────────────────
if run_all or suite == "protocol":
print("\n── Suite 1: Protocol sequential latency ──────────────────────────────")
print(f" {n} sequential requests per protocol\n")
protocols: list[tuple[str, object]] = [("HTTP/1.1", _probe_h1)]
if H2_AVAILABLE:
protocols.append(("HTTP/2", _probe_h2_fresh))
else:
print(" [HTTP/2] skipped — pip install h2")
if H3_AVAILABLE:
protocols.append(("HTTP/3", _probe_h3))
else:
print(" [HTTP/3] skipped — pip install aioquic")
results = []
for name, probe in protocols:
print(f" Running {name}...")
r = await _run_protocol(name, probe, host_ip, sni, path, n, timeout)
results.append(r)
print()
for r in results:
_print_result(r)
valid = [r for r in results if r["times"]]
if len(valid) > 1:
best = min(valid, key=lambda r: statistics.median(r["times"]))
print(f"\n Best median: {best['name']}")
h1r = next((r for r in valid if r["name"] == "HTTP/1.1"), None)
h2r = next((r for r in valid if r["name"] == "HTTP/2"), None)
h3r = next((r for r in valid if r["name"] == "HTTP/3"), None)
if h2r and h1r:
g = (statistics.median(h1r["times"]) - statistics.median(h2r["times"])) \
/ statistics.median(h1r["times"]) * 100
print(f" H2 vs H1.1: {g:+.1f}%")
if h3r and h2r:
g = (statistics.median(h2r["times"]) - statistics.median(h3r["times"])) \
/ statistics.median(h2r["times"]) * 100
print(f" H3 vs H2: {g:+.1f}%")
# ── Suite 2: TLS session resumption ───────────────────────────────────
if run_all or suite == "resumption":
print("\n── Suite 2: TLS session resumption ───────────────────────────────────")
print(" Measures cost of cold TLS handshake vs warm reconnect with session ticket\n")
await _suite_resumption(host_ip, sni, path, timeout, rounds=8)
# ── Suite 3: Concurrency ──────────────────────────────────────────────
if run_all or suite == "concurrency":
print("\n── Suite 3: Concurrency — H2 multiplex vs H1.1 parallel ─────────────")
print(f" {n} concurrent requests fired simultaneously\n")
await _suite_concurrency(host_ip, sni, path, timeout, n=n)
# ── Suite 4: IP scan ──────────────────────────────────────────────────
if run_all or suite == "ipscan":
print("\n── Suite 4: Google edge IP latency scan ──────────────────────────────")
print(" H1.1 probe to all candidate IPs — find the fastest one on this network\n")
await _suite_ipscan(sni, path, timeout)
print("\n" + "=" * 80)
print("Done.")
# ── Suite 2: TLS session resumption ──────────────────────────────────────
async def _tls_connect_time(host_ip: str, sni: str, timeout: float,
ctx: ssl.SSLContext | None = None) -> tuple[float, ssl.SSLContext]:
"""Connect with TLS and return (elapsed, ctx). ctx is reused for warm tests."""
if ctx is None:
ctx = _make_tls_ctx(["h2", "http/1.1"])
raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
raw.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
raw.setblocking(False)
loop = asyncio.get_running_loop()
t0 = time.perf_counter()
await asyncio.wait_for(loop.sock_connect(raw, (host_ip, 443)), timeout=timeout)
reader, writer = await asyncio.wait_for(
asyncio.open_connection(ssl=ctx, server_hostname=sni, sock=raw),
timeout=timeout,
)
elapsed = time.perf_counter() - t0
# Send minimal request so the server doesn't RST the idle connection
writer.write(f"GET /generate_204 HTTP/1.1\r\nHost: {sni}\r\nConnection: close\r\n\r\n".encode())
await asyncio.wait_for(writer.drain(), timeout=timeout)
try:
await asyncio.wait_for(reader.read(256), timeout=timeout)
except Exception:
pass
writer.close()
return elapsed, ctx
async def _suite_resumption(host_ip: str, sni: str, path: str,
timeout: float, rounds: int):
cold_times: list[float] = []
warm_times: list[float] = []
# cold: fresh SSLContext each time — no session ticket reuse
print(" Cold connects (new TLS context each time)...")
for _ in range(rounds):
try:
t, _ = await _tls_connect_time(host_ip, sni, timeout, ctx=None)
cold_times.append(t * 1000)
except Exception as exc:
print(f" FAILED: {exc}")
await asyncio.sleep(0.1)
# warm: reuse same SSLContext — OpenSSL caches and reuses TLS 1.3 session ticket
print(" Warm reconnects (same TLS context, session ticket reuse)...")
warm_ctx = _make_tls_ctx(["h2", "http/1.1"])
for _ in range(rounds):
try:
t, warm_ctx = await _tls_connect_time(host_ip, sni, timeout, ctx=warm_ctx)
warm_times.append(t * 1000)
except Exception as exc:
print(f" FAILED: {exc}")
await asyncio.sleep(0.1)
def _fmt(times: list[float]) -> str:
if not times:
return "no data"
return (f"min={min(times):.1f}ms avg={statistics.mean(times):.1f}ms "
f"med={statistics.median(times):.1f}ms max={max(times):.1f}ms")
print(f"\n Cold ({len(cold_times)}/{rounds} ok): {_fmt(cold_times)}")
print(f" Warm ({len(warm_times)}/{rounds} ok): {_fmt(warm_times)}")
if cold_times and warm_times:
saving = statistics.median(cold_times) - statistics.median(warm_times)
pct = saving / statistics.median(cold_times) * 100
if saving > 5:
print(f"\n Session ticket saves ~{saving:.1f}ms ({pct:.1f}%) per reconnect")
print(" → The H2 transport already reuses one long-lived connection, so this")
print(" saving only applies when the connection drops and must reconnect.")
else:
print(f"\n Resumption saving: {saving:.1f}ms ({pct:.1f}%) — negligible on this network")
print(" → Google may be issuing short-lived tickets, or RTT already dominates.")
# ── Suite 3: Concurrency ──────────────────────────────────────────────────
async def _h2_concurrent(host_ip: str, sni: str, path: str,
timeout: float, n: int) -> tuple[float, int]:
"""
Fire N H2 streams concurrently on ONE persistent connection.
Returns (wall_time_for_all, successful_count).
"""
if not H2_AVAILABLE:
raise RuntimeError("h2 not installed")
ctx = _make_tls_ctx(["h2", "http/1.1"])
raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
raw.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
raw.setblocking(False)
loop = asyncio.get_running_loop()
await asyncio.wait_for(loop.sock_connect(raw, (host_ip, 443)), timeout=timeout)
reader, writer = await asyncio.wait_for(
asyncio.open_connection(ssl=ctx, server_hostname=sni, sock=raw),
timeout=timeout,
)
ssl_obj = writer.get_extra_info("ssl_object")
if not ssl_obj or ssl_obj.selected_alpn_protocol() != "h2":
writer.close()
raise RuntimeError("H2 ALPN not negotiated")
cfg = h2.config.H2Configuration(client_side=True, header_encoding="utf-8")
conn = h2.connection.H2Connection(cfg)
conn.initiate_connection()
conn.increment_flow_control_window(2 ** 24 - 65535)
conn.update_settings({
h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 8 * 1024 * 1024,
h2.settings.SettingCodes.ENABLE_PUSH: 0,
})
writer.write(conn.data_to_send(65535))
await writer.drain()
# Track per-stream completion
stream_done: dict[int, asyncio.Event] = {}
stream_ids = []
for _ in range(n):
sid = conn.get_next_available_stream_id()
conn.send_headers(sid, [
(":method", "GET"), (":path", path),
(":scheme", "https"), (":authority", sni), ("accept", "*/*"),
], end_stream=True)
stream_ids.append(sid)
stream_done[sid] = asyncio.Event()
writer.write(conn.data_to_send(65535))
await writer.drain()
t0 = time.perf_counter()
done_count = 0
deadline = t0 + timeout
while done_count < n and time.perf_counter() < deadline:
try:
raw_data = await asyncio.wait_for(
reader.read(65535),
timeout=max(0.1, deadline - time.perf_counter()),
)
except asyncio.TimeoutError:
break
if not raw_data:
break
events = conn.receive_data(raw_data)
writer.write(conn.data_to_send(65535))
await writer.drain()
for ev in events:
if isinstance(ev, (h2.events.ResponseReceived, h2.events.StreamEnded)):
sid = ev.stream_id
if sid in stream_done and not stream_done[sid].is_set():
if isinstance(ev, h2.events.ResponseReceived):
stream_done[sid].set()
done_count += 1
elif isinstance(ev, h2.events.DataReceived):
conn.acknowledge_received_data(ev.flow_controlled_length, ev.stream_id)
writer.write(conn.data_to_send(65535))
await writer.drain()
wall = time.perf_counter() - t0
writer.close()
return wall, done_count
async def _h1_parallel(host_ip: str, sni: str, path: str,
timeout: float, n: int) -> tuple[float, int]:
"""Fire N H1.1 requests in parallel, each on its own TCP+TLS connection."""
t0 = time.perf_counter()
tasks = [asyncio.create_task(_probe_h1(host_ip, sni, path, timeout)) for _ in range(n)]
results = await asyncio.gather(*tasks, return_exceptions=True)
wall = time.perf_counter() - t0
ok = sum(1 for r in results if isinstance(r, float))
return wall, ok
async def _suite_concurrency(host_ip: str, sni: str, path: str,
timeout: float, n: int):
concur_levels = sorted({4, 8, min(16, n), min(n, 20)})
print(f" {'Level':>5} {'H2 mux wall':>14} {'H1.1 parallel wall':>18} {'speedup':>8}")
print(f" {'-----':>5} {'----------':>14} {'----------------':>18} {'-------':>8}")
for level in concur_levels:
h2_wall = h2_ok = h1_wall = h1_ok = None
h2_err = h1_err = None
if H2_AVAILABLE:
try:
h2_wall, h2_ok = await _h2_concurrent(host_ip, sni, path, timeout, level)
except Exception as exc:
h2_err = str(exc) or type(exc).__name__
try:
h1_wall, h1_ok = await _h1_parallel(host_ip, sni, path, timeout, level)
except Exception as exc:
h1_err = str(exc) or type(exc).__name__
h2_str = f"{h2_wall*1000:6.0f}ms ({h2_ok}/{level})" if h2_wall is not None else f"FAIL: {h2_err}"
h1_str = f"{h1_wall*1000:6.0f}ms ({h1_ok}/{level})" if h1_wall is not None else f"FAIL: {h1_err}"
if h2_wall and h1_wall and h1_wall > 0:
speedup = f"{h1_wall / h2_wall:+.2f}x"
else:
speedup = "n/a"
print(f" {level:>5} {h2_str:>14} {h1_str:>18} {speedup:>8}")
await asyncio.sleep(0.2)
print()
print(" Interpretation:")
print(" - H2 mux fires all streams on ONE TLS connection — lower overhead at scale")
print(" - H1.1 parallel opens N separate connections — higher per-connection TLS cost")
print(" - Speedup > 1.0x means H2 mux completed all requests in less wall time")
# ── Suite 4: IP scan ──────────────────────────────────────────────────────
_CANDIDATE_IPS = (
"216.239.32.120", "216.239.34.120", "216.239.36.120", "216.239.38.120",
"142.250.80.142", "142.250.80.138", "142.250.179.110", "142.250.185.110",
"142.250.184.206", "142.250.190.238", "142.250.191.78",
"172.217.1.206", "172.217.14.206", "172.217.16.142", "172.217.22.174",
"172.217.164.110","172.217.168.206","172.217.169.206",
"34.107.221.82",
"142.251.32.110", "142.251.33.110", "142.251.46.206", "142.251.46.238",
"142.250.80.170", "142.250.72.206", "142.250.64.206", "142.250.72.110",
)
async def _probe_ip(ip: str, sni: str, path: str, timeout: float) -> tuple[str, float | None, str]:
"""Return (ip, median_ms_or_None, note)."""
times = []
for _ in range(3):
try:
t = await _probe_h1(ip, sni, path, timeout)
times.append(t * 1000)
except Exception:
pass
await asyncio.sleep(0.03)
if not times:
return ip, None, "unreachable"
med = statistics.median(times)
return ip, med, ""
async def _suite_ipscan(sni: str, path: str, timeout: float):
ip_timeout = min(timeout, 5.0)
print(f" Probing {len(_CANDIDATE_IPS)} candidate IPs (3 requests each, {ip_timeout:.0f}s cap)...\n")
# Run all probes concurrently — they're independent H1.1 connects
tasks = [asyncio.create_task(_probe_ip(ip, sni, path, ip_timeout))
for ip in _CANDIDATE_IPS]
raw_results = await asyncio.gather(*tasks)
reachable = [(ip, med, note) for ip, med, note in raw_results if med is not None]
dead = [(ip, med, note) for ip, med, note in raw_results if med is None]
reachable.sort(key=lambda x: x[1])
print(f" {'IP':>18} {'median':>9} note")
print(f" {'--':>18} {'------':>9} ----")
for i, (ip, med, _) in enumerate(reachable):
tag = " ← fastest" if i == 0 else (" ← 2nd" if i == 1 else "")
print(f" {ip:>18} {med:7.1f}ms{tag}")
if dead:
print(f"\n Unreachable ({len(dead)}): {', '.join(ip for ip, *_ in dead)}")
if reachable:
best_ip, best_med, _ = reachable[0]
print(f"\n Fastest IP: {best_ip} (median {best_med:.1f}ms)")
print(f' Set in config.json: "google_ip": "{best_ip}"')
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Transport benchmark suite")
parser.add_argument("--ip", help="Google edge IP (default: from config.json)")
parser.add_argument("--sni", default="www.google.com", help="SNI hostname")
parser.add_argument("--path", default="/generate_204", help="Request path")
parser.add_argument("--n", type=int, default=15, help="Requests per protocol")
parser.add_argument("--timeout", type=float, default=10.0, help="Per-request timeout (s)")
parser.add_argument(
"--suite",
choices=["all", "protocol", "resumption", "concurrency", "ipscan"],
default="all",
help="Which benchmark suite to run (default: all)",
)
args = parser.parse_args()
host_ip = args.ip
if not host_ip:
cfg_path = Path(__file__).parent.parent / "config.json"
if cfg_path.exists():
with open(cfg_path) as f:
data = json.load(f)
host_ip = data.get("google_ip", "216.239.38.120")
print(f"Using google_ip from config.json: {host_ip}")
else:
host_ip = "216.239.38.120"
print(f"config.json not found, using default: {host_ip}")
asyncio.run(main(
host_ip=host_ip,
sni=args.sni,
path=args.path,
n=args.n,
timeout=args.timeout,
suite=args.suite,
))