mirror of
https://github.com/denuitt1/mhr-cfw.git
synced 2026-05-17 21:24:36 +03:00
add src/google_ip_scanner.py
This commit is contained in:
@@ -0,0 +1,194 @@
|
||||
"""
|
||||
Google IP Scanner — finds the fastest reachable Google frontend IP.
|
||||
|
||||
Scans a list of candidate Google IPs via HTTPS (with SNI fronting), measures
|
||||
latency, and reports results in a formatted table. Useful for finding the best
|
||||
IP to configure in config.json when your current IP is blocked.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import ssl
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from constants import CANDIDATE_IPS, GOOGLE_SCANNER_TIMEOUT, GOOGLE_SCANNER_CONCURRENCY
|
||||
|
||||
log = logging.getLogger("Scanner")
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProbeResult:
|
||||
"""Result of a single IP probe."""
|
||||
ip: str
|
||||
latency_ms: Optional[int] = None
|
||||
error: Optional[str] = None
|
||||
|
||||
@property
|
||||
def ok(self) -> bool:
|
||||
return self.latency_ms is not None
|
||||
|
||||
|
||||
async def _probe_ip(
|
||||
ip: str,
|
||||
sni: str,
|
||||
semaphore: asyncio.Semaphore,
|
||||
timeout: float,
|
||||
) -> ProbeResult:
|
||||
"""
|
||||
Probe a single IP via HTTPS with SNI fronting.
|
||||
|
||||
Args:
|
||||
ip: The IP to probe (xxx.xxx.xxx.xxx).
|
||||
sni: The SNI hostname to use in TLS handshake.
|
||||
semaphore: Rate limiter to control concurrency.
|
||||
timeout: Timeout in seconds for the entire probe.
|
||||
|
||||
Returns:
|
||||
ProbeResult with latency_ms (if successful) or error message.
|
||||
"""
|
||||
async with semaphore:
|
||||
start_time = time.time()
|
||||
try:
|
||||
# Create SSL context that skips certificate verification
|
||||
ctx = ssl.create_default_context()
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
|
||||
# Connect to IP:443 with SNI set to the fronting domain
|
||||
reader, writer = await asyncio.wait_for(
|
||||
asyncio.open_connection(
|
||||
ip,
|
||||
443,
|
||||
ssl=ctx,
|
||||
server_hostname=sni,
|
||||
),
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
# Send minimal HTTP HEAD request
|
||||
request = f"HEAD / HTTP/1.1\r\nHost: {sni}\r\nConnection: close\r\n\r\n"
|
||||
writer.write(request.encode())
|
||||
await writer.drain()
|
||||
|
||||
# Read response header (first 256 bytes is plenty for HTTP status)
|
||||
response = await asyncio.wait_for(reader.read(256), timeout=timeout)
|
||||
|
||||
writer.close()
|
||||
try:
|
||||
await writer.wait_closed()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Check if we got an HTTP response
|
||||
if not response:
|
||||
return ProbeResult(ip=ip, error="empty response")
|
||||
|
||||
response_str = response.decode("utf-8", errors="ignore")
|
||||
if not response_str.startswith("HTTP/"):
|
||||
return ProbeResult(ip=ip, error=f"invalid response: {response_str[:30]!r}")
|
||||
|
||||
# Success — return latency in milliseconds
|
||||
elapsed_ms = int((time.time() - start_time) * 1000)
|
||||
return ProbeResult(ip=ip, latency_ms=elapsed_ms)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
return ProbeResult(ip=ip, error="timeout")
|
||||
except ConnectionRefusedError:
|
||||
return ProbeResult(ip=ip, error="connection refused")
|
||||
except ConnectionResetError:
|
||||
return ProbeResult(ip=ip, error="connection reset")
|
||||
except OSError as e:
|
||||
return ProbeResult(ip=ip, error=f"network error: {e.strerror or str(e)}")
|
||||
except Exception as e:
|
||||
return ProbeResult(ip=ip, error=f"probe failed: {type(e).__name__}")
|
||||
|
||||
|
||||
async def run(front_domain: str) -> bool:
|
||||
"""
|
||||
Scan all candidate Google IPs and display results.
|
||||
|
||||
Args:
|
||||
front_domain: The SNI hostname to use (e.g. "www.google.com").
|
||||
|
||||
Returns:
|
||||
True if at least one IP is reachable, False otherwise.
|
||||
"""
|
||||
timeout = GOOGLE_SCANNER_TIMEOUT
|
||||
concurrency = GOOGLE_SCANNER_CONCURRENCY
|
||||
|
||||
print()
|
||||
print(f"Scanning {len(CANDIDATE_IPS)} Google frontend IPs")
|
||||
print(f" SNI: {front_domain}")
|
||||
print(f" Timeout: {timeout}s per IP")
|
||||
print(f" Concurrency: {concurrency} parallel probes")
|
||||
print()
|
||||
|
||||
# Create semaphore to limit concurrency
|
||||
semaphore = asyncio.Semaphore(concurrency)
|
||||
|
||||
# Launch all probes concurrently
|
||||
tasks = [
|
||||
_probe_ip(ip, front_domain, semaphore, timeout)
|
||||
for ip in CANDIDATE_IPS
|
||||
]
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
# Sort by latency (successful first, then by speed)
|
||||
results.sort(key=lambda r: (not r.ok, r.latency_ms or float("inf")))
|
||||
|
||||
# Display results table
|
||||
print(f"{'IP':<20} {'LATENCY':<12} {'STATUS':<25}")
|
||||
print(f"{'-' * 20} {'-' * 12} {'-' * 25}")
|
||||
|
||||
ok_count = 0
|
||||
for result in results:
|
||||
if result.ok:
|
||||
print(f"{result.ip:<20} {result.latency_ms:>8}ms OK")
|
||||
ok_count += 1
|
||||
else:
|
||||
status = result.error or "unknown error"
|
||||
print(f"{result.ip:<20} {'—':<12} {status:<25}")
|
||||
|
||||
print()
|
||||
print(f"Result: {ok_count} / {len(results)} reachable")
|
||||
|
||||
if ok_count == 0:
|
||||
print("No Google IPs reachable from this network.")
|
||||
print()
|
||||
return False
|
||||
|
||||
# Show top 3 fastest
|
||||
fastest = [r for r in results if r.ok][:3]
|
||||
print()
|
||||
print("Top 3 fastest IPs:")
|
||||
for i, result in enumerate(fastest, 1):
|
||||
print(f" {i}. {result.ip} ({result.latency_ms}ms)")
|
||||
|
||||
print()
|
||||
print(f"Recommended: Set \"google_ip\": \"{fastest[0].ip}\" in config.json")
|
||||
print()
|
||||
return True
|
||||
|
||||
|
||||
def scan_sync(front_domain: str) -> bool:
|
||||
"""
|
||||
Wrapper to run async scanner from sync context (e.g. main.py).
|
||||
|
||||
Args:
|
||||
front_domain: The SNI hostname to use.
|
||||
|
||||
Returns:
|
||||
True if at least one IP is reachable, False otherwise.
|
||||
"""
|
||||
try:
|
||||
return asyncio.run(run(front_domain))
|
||||
except KeyboardInterrupt:
|
||||
print("\nScan interrupted by user.")
|
||||
return False
|
||||
except Exception as e:
|
||||
log.error(f"Scan failed: {e}")
|
||||
return False
|
||||
Reference in New Issue
Block a user