Merge pull request #9 from EmranHejazi/improvement/rate-limit-quota-markers

Add quota/rate‑limit detection, and enhance body‑hint extraction
This commit is contained in:
Abolfazl Ghaemi
2026-04-22 23:50:07 +03:30
committed by GitHub
+50 -11
View File
@@ -264,29 +264,68 @@ class ProxyServer:
def _log_response_summary(self, url: str, response: bytes):
status, headers, body = self.fronter._split_raw_response(response)
host = (urlparse(url).hostname or "").lower()
if status >= 300 or self._should_trace_host(host):
location = headers.get("location", "")
server = headers.get("server", "")
cf_ray = headers.get("cf-ray", "")
content_type = headers.get("content-type", "")
location = headers.get("location", "") or "-"
server = headers.get("server", "") or "-"
cf_ray = headers.get("cf-ray", "") or "-"
content_type = headers.get("content-type", "") or "-"
body_len = len(body)
body_hint = "-"
if "text/html" in content_type.lower() and body:
sample = body[:800].decode(errors="replace").lower()
rate_limited = False
# Handle text-like responses (HTML, plain text, JSON…)
if ("text" in content_type.lower() or "json" in content_type.lower()) and body:
sample = body[:1200].decode(errors="replace").lower()
# --- Structured HTML title extraction ---
if "<title>" in sample and "</title>" in sample:
title = sample.split("<title>", 1)[1].split("</title>", 1)[0]
body_hint = title[:120]
body_hint = title.strip()[:120] or "-"
# --- Known content patterns ---
elif "captcha" in sample:
body_hint = "captcha"
elif "turnstile" in sample:
body_hint = "turnstile"
elif "loading" in sample:
body_hint = "loading"
log.info(
"RESP ← %s status=%s type=%s len=%s server=%s location=%s cf-ray=%s hint=%s",
host or url[:60], status, content_type or "-", body_len,
server or "-", location or "-", cf_ray or "-", body_hint,
# --- Rate-limit / quota markers ---
rate_limit_markers = (
"too many",
"rate limit",
"quota",
"quota exceeded",
"request limit",
"دفعات زیاد",
"بیش از حد",
"سرویس در طول یک روز",
)
if any(m in sample for m in rate_limit_markers):
rate_limited = True
body_hint = "quota_exceeded"
log_msg = (
"RESP ← %s status=%s type=%s len=%s server=%s location=%s cf-ray=%s hint=%s"
)
log_args = (
host or url[:60],
status,
content_type,
body_len,
server,
location,
cf_ray,
body_hint,
)
if rate_limited:
log.warning("RATE LIMIT detected! " + log_msg, *log_args)
else:
log.info(log_msg, *log_args)
async def start(self):
http_srv = await asyncio.start_server(self._on_client, self.host, self.port)