feat: add execution monitoring for Apps Script usage in DomainFronter

Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
Abolfazl
2026-05-02 15:57:32 +03:30
parent 39b3e4efa2
commit 7e052ad1b2
2 changed files with 72 additions and 10 deletions
+24 -6
View File
@@ -61,6 +61,9 @@ LEVEL_LABEL = {
"CRITICAL": "CRIT ", "CRITICAL": "CRIT ",
} }
# Special spotlight line for execution usage updates.
EXEC_USAGE_PREFIX = "Apps Script executions used so far:"
# Stable per-component color (keeps log scanning easy). # Stable per-component color (keeps log scanning easy).
COMPONENT_COLORS = { COMPONENT_COLORS = {
"Main": FG_CYAN, "Main": FG_CYAN,
@@ -148,14 +151,29 @@ class PrettyFormatter(logging.Formatter):
except Exception: except Exception:
message = record.msg message = record.msg
time_part = self._fmt_time(record) highlight_exec_usage = (
level_part = self._fmt_level(record.levelname) record.name == "Fronter"
comp_part = self._fmt_component(record.name) and isinstance(message, str)
and message.startswith(EXEC_USAGE_PREFIX)
)
if self.use_color: if highlight_exec_usage:
time_part = f"{DIM}{FG_GRAY}{time_part}{RESET}" # Force a single vivid color for the entire line so this metric pops.
plain_time = self._fmt_time(record)
plain_level = f"{LEVEL_GLYPH.get(record.levelname, '·')} {LEVEL_LABEL.get(record.levelname, record.levelname[:5].ljust(5))}"
plain_comp = f"[{record.name[: self.COMPONENT_WIDTH].ljust(self.COMPONENT_WIDTH)}]"
line = f"{plain_time} {plain_level} {plain_comp} {message}"
if self.use_color:
line = f"{BOLD}{FG_CYAN}{line}{RESET}"
else:
time_part = self._fmt_time(record)
level_part = self._fmt_level(record.levelname)
comp_part = self._fmt_component(record.name)
line = f"{time_part} {level_part} {comp_part} {message}" if self.use_color:
time_part = f"{DIM}{FG_GRAY}{time_part}{RESET}"
line = f"{time_part} {level_part} {comp_part} {message}"
# Exception tracebacks: render dimmed below the main line. # Exception tracebacks: render dimmed below the main line.
if record.exc_info: if record.exc_info:
+48 -4
View File
@@ -106,6 +106,11 @@ class DomainFronter:
self.script_id = self._script_ids[0] # backward compat / logging self.script_id = self._script_ids[0] # backward compat / logging
self._dev_available = False # True if /dev endpoint works (no redirect, ~400ms faster) self._dev_available = False # True if /dev endpoint works (no redirect, ~400ms faster)
# Simple execution monitor: log total consumed Apps Script executions.
self._execution_report_interval = 5.0
self._exec_total = 0
self._execution_task: asyncio.Task | None = None
# Fan-out parallel relay: fire N Apps Script instances concurrently, # Fan-out parallel relay: fire N Apps Script instances concurrently,
# keep the first successful response, cancel the rest. Script IDs # keep the first successful response, cancel the rest. Script IDs
# that fail or time out get blacklisted for SCRIPT_BLACKLIST_TTL so # that fail or time out get blacklisted for SCRIPT_BLACKLIST_TTL so
@@ -192,6 +197,10 @@ class DomainFronter:
if self._parallel_relay > 1: if self._parallel_relay > 1:
log.info("Fan-out relay: %d parallel Apps Script instances per request", log.info("Fan-out relay: %d parallel Apps Script instances per request",
self._parallel_relay) self._parallel_relay)
log.info(
"Execution monitor enabled: reporting total every %.0fs",
self._execution_report_interval,
)
# Exit node — optional second-hop relay with a non-Google exit IP. # Exit node — optional second-hop relay with a non-Google exit IP.
# Useful for sites that block GCP/Apps Script IPs (e.g. ChatGPT). # Useful for sites that block GCP/Apps Script IPs (e.g. ChatGPT).
@@ -248,6 +257,24 @@ class DomainFronter:
value = default value = default
return max(minimum, value) return max(minimum, value)
def _record_execution(self, sid: str, count: int = 1) -> None:
"""Record consumed Apps Script executions."""
if not sid or count <= 0:
return
self._exec_total += count
async def _execution_logger(self):
"""Log execution usage every N seconds."""
interval = self._execution_report_interval
while True:
try:
await asyncio.sleep(interval)
log.info("Apps Script executions used so far: %d", self._exec_total)
except asyncio.CancelledError:
break
except Exception as exc:
log.debug("Execution logger error: %s", exc)
def _ssl_ctx(self) -> ssl.SSLContext: def _ssl_ctx(self) -> ssl.SSLContext:
ctx = ssl.create_default_context() ctx = ssl.create_default_context()
if certifi is not None: if certifi is not None:
@@ -841,6 +868,8 @@ class DomainFronter:
# Periodic per-host stats logger (opt-in via log level) # Periodic per-host stats logger (opt-in via log level)
if self._stats_task is None: if self._stats_task is None:
self._stats_task = self._spawn(self._stats_logger()) self._stats_task = self._spawn(self._stats_logger())
if self._execution_task is None:
self._execution_task = self._spawn(self._execution_logger())
# Start H2 connection (runs alongside H1 pool) # Start H2 connection (runs alongside H1 pool)
if self._h2: if self._h2:
self._spawn(self._h2_connect_and_warm()) self._spawn(self._h2_connect_and_warm())
@@ -868,6 +897,7 @@ class DomainFronter:
self._warm_task = None self._warm_task = None
self._maintenance_task = None self._maintenance_task = None
self._stats_task = None self._stats_task = None
self._execution_task = None
self._keepalive_task = None self._keepalive_task = None
await self._flush_pool() await self._flush_pool()
@@ -914,6 +944,7 @@ class DomainFronter:
try: try:
dev_path = f"/macros/s/{sid}/dev" dev_path = f"/macros/s/{sid}/dev"
t0 = time.perf_counter() t0 = time.perf_counter()
self._record_execution(sid)
status, _, body = await asyncio.wait_for( status, _, body = await asyncio.wait_for(
self._h2.request( self._h2.request(
method="POST", path=dev_path, host=self.http_host, method="POST", path=dev_path, host=self.http_host,
@@ -934,6 +965,7 @@ class DomainFronter:
try: try:
exec_path = f"/macros/s/{sid}/exec" exec_path = f"/macros/s/{sid}/exec"
t0 = time.perf_counter() t0 = time.perf_counter()
self._record_execution(sid)
await asyncio.wait_for( await asyncio.wait_for(
self._h2.request( self._h2.request(
method="POST", path=exec_path, host=self.http_host, method="POST", path=exec_path, host=self.http_host,
@@ -965,8 +997,10 @@ class DomainFronter:
# Apps Script keepalive — warm the container # Apps Script keepalive — warm the container
payload = {"m": "GET", "u": "http://example.com/", "k": self.auth_key} payload = {"m": "GET", "u": "http://example.com/", "k": self.auth_key}
path = self._exec_path("example.com") sid = self._script_id_for_key(self._host_key("example.com"))
path = self._exec_path_for_sid(sid)
t0 = time.perf_counter() t0 = time.perf_counter()
self._record_execution(sid)
await asyncio.wait_for( await asyncio.wait_for(
self._h2.request( self._h2.request(
method="POST", path=path, host=self.http_host, method="POST", path=path, host=self.http_host,
@@ -2012,7 +2046,9 @@ class DomainFronter:
full_payload["k"] = self.auth_key full_payload["k"] = self.auth_key
json_body = json.dumps(full_payload).encode() json_body = json.dumps(full_payload).encode()
path = self._exec_path(payload.get("u")) sid = self._script_id_for_key(self._host_key(payload.get("u")))
path = self._exec_path_for_sid(sid)
self._record_execution(sid)
status, headers, body = await self._h2.request( status, headers, body = await self._h2.request(
method="POST", path=path, host=self.http_host, method="POST", path=path, host=self.http_host,
@@ -2034,6 +2070,7 @@ class DomainFronter:
json_body = json.dumps(full_payload).encode() json_body = json.dumps(full_payload).encode()
path = self._exec_path_for_sid(sid) path = self._exec_path_for_sid(sid)
self._record_execution(sid)
status, headers, body = await self._h2.request( status, headers, body = await self._h2.request(
method="POST", path=path, host=self.http_host, method="POST", path=path, host=self.http_host,
@@ -2050,7 +2087,8 @@ class DomainFronter:
full_payload["k"] = self.auth_key full_payload["k"] = self.auth_key
json_body = json.dumps(full_payload).encode() json_body = json.dumps(full_payload).encode()
path = self._exec_path(payload.get("u")) sid = self._script_id_for_key(self._host_key(payload.get("u")))
path = self._exec_path_for_sid(sid)
reader, writer, created = await self._acquire() reader, writer, created = await self._acquire()
try: try:
@@ -2065,6 +2103,7 @@ class DomainFronter:
) )
writer.write(request.encode() + json_body) writer.write(request.encode() + json_body)
await writer.drain() await writer.drain()
self._record_execution(sid)
status, resp_headers, resp_body = await read_http_response(reader, max_bytes=self._max_response_body_bytes) status, resp_headers, resp_body = await read_http_response(reader, max_bytes=self._max_response_body_bytes)
@@ -2114,11 +2153,15 @@ class DomainFronter:
"q": payloads, "q": payloads,
} }
json_body = json.dumps(batch_payload).encode() json_body = json.dumps(batch_payload).encode()
path = self._exec_path(payloads[0].get("u") if payloads else None) sid = self._script_id_for_key(
self._host_key(payloads[0].get("u") if payloads else None)
)
path = self._exec_path_for_sid(sid)
# Try HTTP/2 first # Try HTTP/2 first
if self._h2_available(): if self._h2_available():
try: try:
self._record_execution(sid)
status, headers, body = await asyncio.wait_for( status, headers, body = await asyncio.wait_for(
self._h2.request( self._h2.request(
method="POST", path=path, host=self.http_host, method="POST", path=path, host=self.http_host,
@@ -2148,6 +2191,7 @@ class DomainFronter:
) )
writer.write(request.encode() + json_body) writer.write(request.encode() + json_body)
await writer.drain() await writer.drain()
self._record_execution(sid)
status, resp_headers, resp_body = await read_http_response(reader, max_bytes=self._max_response_body_bytes) status, resp_headers, resp_body = await read_http_response(reader, max_bytes=self._max_response_body_bytes)