Merge branch 'denuitt1:main' into main

This commit is contained in:
Arya Ahmadi
2026-05-10 23:07:29 +03:30
committed by GitHub
3 changed files with 131 additions and 38 deletions
+5
View File
@@ -264,6 +264,11 @@ Browse `https://httpbin.org/ip` through the proxy — you should see the **VPS's
---
## Contributors
- Special thanks to [onlymaj](https://github.com/onlymaj)
---
## Sources
- This project is based on [MasterHttpRelayVPN](https://github.com/masterking32/MasterHttpRelayVPN)
+1 -1
View File
@@ -13,7 +13,7 @@
[![Watch the video](https://img.youtube.com/vi/L3lJZrAqqUQ/maxresdefault.jpg)](https://youtu.be/L3lJZrAqqUQ)
- لینک یوتیوب: https://youtu.be/L3lJZrAqqUQ
- لینک داخلی دانلود ویدیو: https://nc.thearthur.ir/s/YaCp4zAzepHJKi2
- لینک داخلی دانلود ویدیو: https://cdn.vayrex.ir/vasls/8440130/1777611424961-86c092e3-mhrv-cfw.mp4
---
+125 -37
View File
@@ -60,6 +60,10 @@ class HostStat:
errors: int = 0
class _RelayBadResponse(Exception):
"""Raised when a relay response indicates the chosen script ID is unhealthy."""
def _build_sni_pool(front_domain: str, overrides: list | None) -> list[str]:
"""Build the list of SNIs to rotate through on new outbound TLS handshakes.
@@ -401,6 +405,16 @@ class DomainFronter:
if force or until <= now:
self._sid_blacklist.pop(sid, None)
def _next_alt_sid(self, tried: set[str]) -> str | None:
"""Pick a script ID not already tried and not blacklisted, or None."""
for sid in self._script_ids:
if sid in tried:
continue
if self._is_sid_blacklisted(sid):
continue
return sid
return None
def _pick_fanout_sids(self, key: str | None) -> list[str]:
"""Pick up to `parallel_relay` distinct non-blacklisted script IDs.
@@ -842,8 +856,18 @@ class DomainFronter:
{"m": "HEAD", "u": "http://example.com/", "k": self.auth_key}
).encode()
hdrs = {"content-type": "application/json"}
sid = self._script_ids[0]
for sid in list(self._script_ids):
if self._is_sid_blacklisted(sid):
continue
if await self._prewarm_one_sid(sid, payload, hdrs):
return
self._blacklist_sid(sid, reason="prewarm")
log.debug("Pre-warm exhausted all script IDs")
async def _prewarm_one_sid(self, sid: str, payload: bytes,
hdrs: dict) -> bool:
"""Try /dev fast-path detection then /exec warmup for one sid."""
# Test /dev endpoint — returns data inline (no 302 redirect).
# If it works, saves ~400ms per request by eliminating one round trip.
try:
@@ -857,19 +881,21 @@ class DomainFronter:
timeout=15,
)
dt = (time.perf_counter() - t0) * 1000
data = json.loads(body.decode(errors="replace"))
if "s" in data:
self._dev_available = True
log.info("/dev fast path active (%.0fms, no redirect)", dt)
return
if status == 200:
data = json.loads(body.decode(errors="replace"))
if "s" in data:
self._dev_available = True
log.info("/dev fast path active (%.0fms, no redirect)", dt)
return True
except Exception as e:
log.debug("/dev test failed: %s", e)
log.debug("/dev test failed for sid %s: %s",
sid[-8:] if len(sid) > 8 else sid, e)
# Fallback: warm up with /exec
try:
exec_path = f"/macros/s/{sid}/exec"
t0 = time.perf_counter()
await asyncio.wait_for(
status, _, _ = await asyncio.wait_for(
self._h2.request(
method="POST", path=exec_path, host=self.http_host,
headers=hdrs, body=payload,
@@ -877,9 +903,16 @@ class DomainFronter:
timeout=15,
)
dt = (time.perf_counter() - t0) * 1000
if status != 200:
log.debug("Pre-warm /exec returned %d for sid %s",
status, sid[-8:] if len(sid) > 8 else sid)
return False
log.info("Apps Script pre-warmed in %.0fms", dt)
return True
except Exception as e:
log.debug("Pre-warm failed: %s", e)
log.debug("Pre-warm failed for sid %s: %s",
sid[-8:] if len(sid) > 8 else sid, e)
return False
async def _keepalive_loop(self):
"""Send periodic pings to keep Apps Script warm + H2 connection alive."""
@@ -1665,6 +1698,15 @@ class DomainFronter:
async def _relay_with_retry(self, payload: dict) -> bytes:
"""Single relay with one retry on failure. Uses H2 if available."""
attempts = self._retry_attempts_for_payload(payload)
host_key = self._host_key(payload.get("u"))
tried_sids: set[str] = set()
def pick_sid() -> str:
if not tried_sids:
return self._script_id_for_key(host_key)
alt = self._next_alt_sid(tried_sids)
return alt if alt is not None else self._script_id_for_key(host_key)
# Fan-out: race N Apps Script instances when enabled and H2 is up.
# Cuts tail latency when one container is slow/cold. Only kicks in
# if multiple script IDs are configured and the H2 transport is live.
@@ -1686,12 +1728,23 @@ class DomainFronter:
# Try HTTP/2 first — much faster (multiplexed, no pool checkout)
if self._h2_available():
for attempt in range(attempts):
sid = pick_sid()
tried_sids.add(sid)
try:
result = await asyncio.wait_for(
self._relay_single_h2(payload), timeout=self._relay_timeout
self._relay_single_h2(payload, sid=sid),
timeout=self._relay_timeout,
)
self._record_h2_success()
return result
except _RelayBadResponse as e:
self._blacklist_sid(sid, reason=str(e)[:40])
if (attempt < attempts - 1
and self._next_alt_sid(tried_sids) is not None):
log.debug("H2 sid %s bad (%s), rotating",
sid[-8:] if len(sid) > 8 else sid, e)
continue
raise
except Exception as e:
self._record_h2_failure(e)
if attempt < attempts - 1:
@@ -1716,10 +1769,21 @@ class DomainFronter:
# HTTP/1.1 fallback (pool-based)
async with self._semaphore:
for attempt in range(attempts):
sid = pick_sid()
tried_sids.add(sid)
try:
return await asyncio.wait_for(
self._relay_single(payload), timeout=self._relay_timeout
self._relay_single(payload, sid=sid),
timeout=self._relay_timeout,
)
except _RelayBadResponse as e:
self._blacklist_sid(sid, reason=str(e)[:40])
if (attempt < attempts - 1
and self._next_alt_sid(tried_sids) is not None):
log.debug("H1 sid %s bad (%s), rotating",
sid[-8:] if len(sid) > 8 else sid, e)
continue
raise
except Exception as e:
if attempt < attempts - 1:
log.debug("Relay attempt %d failed (%s: %s), retrying",
@@ -1776,33 +1840,15 @@ class DomainFronter:
if pending:
await asyncio.gather(*pending, return_exceptions=True)
async def _relay_single_h2(self, payload: dict) -> bytes:
async def _relay_single_h2(self, payload: dict,
sid: str | None = None) -> bytes:
"""Execute a relay through HTTP/2 multiplexing.
Uses the shared H2 connection — no pool checkout needed.
Many concurrent calls all share one TLS connection.
"""
full_payload = dict(payload)
full_payload["k"] = self.auth_key
json_body = json.dumps(full_payload).encode()
path = self._exec_path(payload.get("u"))
status, headers, body = await self._h2.request(
method="POST", path=path, host=self.http_host,
headers={"content-type": "application/json"},
body=json_body,
)
return self._parse_relay_response(body)
async def _relay_single_h2_with_sid(self, payload: dict,
sid: str) -> bytes:
"""Execute an H2 relay pinned to a specific Apps Script deployment.
Used by `_relay_fanout` to race multiple script IDs in parallel.
Mirrors `_relay_single_h2` but ignores the stable-hash routing.
"""
if sid is None:
sid = self._script_id_for_key(self._host_key(payload.get("u")))
full_payload = dict(payload)
full_payload["k"] = self.auth_key
json_body = json.dumps(full_payload).encode()
@@ -1815,16 +1861,32 @@ class DomainFronter:
body=json_body,
)
return self._parse_relay_response(body)
if status != 200:
raise _RelayBadResponse(
f"upstream HTTP {status} from script "
f"{sid[-8:] if len(sid) > 8 else sid}",
)
return self._parse_or_raise(body)
async def _relay_single(self, payload: dict) -> bytes:
async def _relay_single_h2_with_sid(self, payload: dict,
sid: str) -> bytes:
"""Execute an H2 relay pinned to a specific Apps Script deployment.
Used by `_relay_fanout` to race multiple script IDs in parallel.
"""
return await self._relay_single_h2(payload, sid=sid)
async def _relay_single(self, payload: dict,
sid: str | None = None) -> bytes:
"""Execute a single relay POST → redirect → parse."""
# Add auth key
if sid is None:
sid = self._script_id_for_key(self._host_key(payload.get("u")))
full_payload = dict(payload)
full_payload["k"] = self.auth_key
json_body = json.dumps(full_payload).encode()
path = self._exec_path(payload.get("u"))
path = self._exec_path_for_sid(sid)
reader, writer, created = await self._acquire()
try:
@@ -1872,7 +1934,12 @@ class DomainFronter:
status, resp_headers, resp_body = await self._read_http_response(reader)
await self._release(reader, writer, created)
return self._parse_relay_response(resp_body)
if status != 200:
raise _RelayBadResponse(
f"upstream HTTP {status} from script "
f"{sid[-8:] if len(sid) > 8 else sid}",
)
return self._parse_or_raise(resp_body)
except Exception:
try:
@@ -2136,6 +2203,27 @@ class DomainFronter:
return self._parse_relay_json(data)
def _parse_or_raise(self, body: bytes) -> bytes:
"""Like `_parse_relay_response` but raises `_RelayBadResponse` on failure."""
text = body.decode(errors="replace").strip()
if not text:
raise _RelayBadResponse("empty response")
try:
data = json.loads(text)
except json.JSONDecodeError:
m = re.search(r'\{.*\}', text, re.DOTALL)
if not m:
raise _RelayBadResponse(f"non-JSON: {text[:120]}")
try:
data = json.loads(m.group())
except json.JSONDecodeError:
raise _RelayBadResponse(f"bad JSON: {text[:120]}")
if "e" in data:
raise _RelayBadResponse(f"relay error: {data['e']}")
return self._parse_relay_json(data)
def _parse_relay_json(self, data: dict) -> bytes:
"""Convert a parsed relay JSON dict to raw HTTP response bytes."""
if "e" in data: