Merge pull request #8 from EmranHejazi/features/google-ip-scanner

Google IP Scanner
This commit is contained in:
Abolfazl Ghaemi
2026-04-23 22:13:27 +03:30
committed by GitHub
5 changed files with 330 additions and 0 deletions
+44
View File
@@ -335,10 +335,53 @@ python3 main.py --log-level DEBUG # Show detailed logs
python3 main.py -c /path/to/config.json # Use a different config file
python3 main.py --install-cert # Install MITM CA certificate and exit
python3 main.py --no-cert-check # Skip automatic CA install check on startup
python3 main.py --scan # Scan Google IPs and find the fastest one
```
> **Auto-install:** On startup (MITM mode), the proxy automatically checks if the CA certificate is trusted and attempts to install it. Use `--no-cert-check` to skip this. If auto-install fails (e.g. needs elevation), run `python main.py --install-cert` manually or follow Step 6 above.
### Scanning for the Fastest Google IP
If your current `google_ip` in `config.json` is blocked or slow, you can scan to find a faster one:
```bash
python3 main.py --scan
```
This will:
1. Probe 27 candidate Google IPs in parallel
2. Measure latency from your network
3. Display results in a table
4. Recommend the fastest IP
5. Exit with exit code 0 if at least one IP is reachable, 1 otherwise
**Example output:**
```
Scanning 27 Google frontend IPs
SNI: www.google.com
Timeout: 4s per IP
Concurrency: 8 parallel probes
IP LATENCY STATUS
-------------------- ------------ -------------------------
216.239.32.120 42ms OK
216.239.34.120 45ms OK
216.239.36.120 52ms OK
142.250.80.142 timeout timeout
...
Result: 15 / 27 reachable
Top 3 fastest IPs:
1. 216.239.32.120 (42ms)
2. 216.239.34.120 (45ms)
3. 216.239.36.120 (52ms)
Recommended: Set "google_ip": "216.239.32.120" in config.json
```
After scanning, update your `config.json` with the recommended IP and restart the proxy.
---
## Architecture
@@ -375,6 +418,7 @@ MasterHttpRelayVPN/
├── mitm.py # On-the-fly TLS interception
├── cert_installer.py # Cross-platform CA installer (Windows/macOS/Linux + Firefox)
├── codec.py # Content-Encoding decoder (gzip/deflate/br/zstd)
├── google_ip_scanner.py # Scanner to find the fastest reachable Google IP
├── constants.py # Tunable defaults and shared data
└── logging_utils.py # Colored, aligned log formatter
```
+44
View File
@@ -282,10 +282,53 @@ python3 main.py --log-level DEBUG
python3 main.py -c /path/to/config.json
python3 main.py --install-cert # نصب گواهی CA و خروج
python3 main.py --no-cert-check # رد شدن از بررسی خودکار گواهی
python3 main.py --scan # اسکن IP های Google و یافتن سریع‌ترین
```
> **نصب خودکار:** هنگام اجرا در حالت `apps_script`، برنامه به‌طور خودکار بررسی می‌کند که آیا گواهی CA قابل اعتماد است یا نه و در صورت نیاز آن را نصب می‌کند. اگر نصب خودکار ناموفق بود (مثلاً نیاز به دسترسی مدیر دارد)، می‌توانید دستور `python main.py --install-cert` را اجرا کنید یا مراحل مرحله ۶ را دنبال کنید.
### اسکن کردن برای یافتن سریع‌ترین IP گوگل
اگر `google_ip` فعلی در `config.json` بلاک شده یا آهسته است، می‌توانید اسکن کنید تا سریع‌ترین آن را پیدا کنید:
```bash
python3 main.py --scan
```
این دستور:
1. ۲۷ IP برای fronting Google را به‌صورت موازی بررسی می‌کند
2. تأخیر (latency) از شبکه شما را اندازه می‌گیرد
3. نتایج را در جدول نمایش می‌دهد
4. سریع‌ترین IP را پیشنهاد می‌دهد
5. اگر حداقل یک IP در دسترس باشد کد خروج ۰، ورنه ۱ را برمی‌گرداند
**نمونه خروجی:**
```
Scanning 27 Google frontend IPs
SNI: www.google.com
Timeout: 4s per IP
Concurrency: 8 parallel probes
IP LATENCY STATUS
-------------------- ------------ -------------------------
216.239.32.120 42ms OK
216.239.34.120 45ms OK
216.239.36.120 52ms OK
142.250.80.142 timeout timeout
...
Result: 15 / 27 reachable
Top 3 fastest IPs:
1. 216.239.32.120 (42ms)
2. 216.239.34.120 (45ms)
3. 216.239.36.120 (52ms)
Recommended: Set "google_ip": "216.239.32.120" in config.json
```
پس از اسکن، مقدار `google_ip` در `config.json` را با IP پیشنهادی به‌روزرسانی کنید و پراکسی را دوباره راه‌اندازی کنید.
---
## معماری
@@ -318,6 +361,7 @@ MasterHttpRelayVPN/
├── mitm.py # ساخت و مدیریت گواهی‌ها
├── cert_installer.py # نصب خودکار CA در ویندوز/مک/لینوکس + فایرفاکس
├── codec.py # رمزگشای Content-Encoding (gzip/deflate/br/zstd)
├── google_ip_scanner.py # اسکنر IP های Google برای یافتن سریع‌ترین
├── constants.py # مقادیر پیش‌فرض قابل تنظیم
└── logging_utils.py # فرمت‌دهنده‌ی لاگ رنگی و منظم
```
+15
View File
@@ -23,6 +23,7 @@ if _SRC_DIR not in sys.path:
from cert_installer import install_ca, is_ca_trusted
from constants import __version__
from lan_utils import log_lan_access
from google_ip_scanner import scan_sync
from logging_utils import configure as configure_logging, print_banner
from mitm import CA_CERT_FILE
from proxy_server import ProxyServer
@@ -92,6 +93,11 @@ def parse_args():
action="store_true",
help="Skip the certificate installation check on startup.",
)
parser.add_argument(
"--scan",
action="store_true",
help="Scan Google IPs to find the fastest reachable one and exit.",
)
return parser.parse_args()
@@ -192,6 +198,15 @@ def main():
ok = install_ca(CA_CERT_FILE)
sys.exit(0 if ok else 1)
# ── Google IP Scanner ──────────────────────────────────────────────────
if args.scan:
setup_logging("INFO")
front_domain = config.get("front_domain", "www.google.com")
_log = logging.getLogger("Main")
_log.info(f"Scanning Google IPs (fronting domain: {front_domain})")
ok = scan_sync(front_domain)
sys.exit(0 if ok else 1)
setup_logging(config.get("log_level", "INFO"))
log = logging.getLogger("Main")
+33
View File
@@ -23,6 +23,39 @@ RELAY_TIMEOUT = 25
TLS_CONNECT_TIMEOUT = 15
TCP_CONNECT_TIMEOUT = 10
# ── Google IP Scanner settings ──────────────────────────────────────────────
GOOGLE_SCANNER_TIMEOUT = 4 # Timeout per IP probe (seconds)
GOOGLE_SCANNER_CONCURRENCY = 8 # Parallel probes
# Candidate Google frontend IPs for scanning (multiple ASNs and regions)
CANDIDATE_IPS: tuple[str, ...] = (
"216.239.32.120",
"216.239.34.120",
"216.239.36.120",
"216.239.38.120",
"142.250.80.142",
"142.250.80.138",
"142.250.179.110",
"142.250.185.110",
"142.250.184.206",
"142.250.190.238",
"142.250.191.78",
"172.217.1.206",
"172.217.14.206",
"172.217.16.142",
"172.217.22.174",
"172.217.164.110",
"172.217.168.206",
"172.217.169.206",
"34.107.221.82",
"142.251.32.110",
"142.251.33.110",
"142.251.46.206",
"142.251.46.238",
"142.250.80.170",
"142.250.72.206",
"142.250.64.206",
"142.250.72.110",
)
# ── Response cache ────────────────────────────────────────────────────────
CACHE_MAX_MB = 50
+194
View File
@@ -0,0 +1,194 @@
"""
Google IP Scanner — finds the fastest reachable Google frontend IP.
Scans a list of candidate Google IPs via HTTPS (with SNI fronting), measures
latency, and reports results in a formatted table. Useful for finding the best
IP to configure in config.json when your current IP is blocked.
"""
from __future__ import annotations
import asyncio
import logging
import ssl
import time
from dataclasses import dataclass
from typing import Optional
from constants import CANDIDATE_IPS, GOOGLE_SCANNER_TIMEOUT, GOOGLE_SCANNER_CONCURRENCY
log = logging.getLogger("Scanner")
@dataclass
class ProbeResult:
"""Result of a single IP probe."""
ip: str
latency_ms: Optional[int] = None
error: Optional[str] = None
@property
def ok(self) -> bool:
return self.latency_ms is not None
async def _probe_ip(
ip: str,
sni: str,
semaphore: asyncio.Semaphore,
timeout: float,
) -> ProbeResult:
"""
Probe a single IP via HTTPS with SNI fronting.
Args:
ip: The IP to probe (xxx.xxx.xxx.xxx).
sni: The SNI hostname to use in TLS handshake.
semaphore: Rate limiter to control concurrency.
timeout: Timeout in seconds for the entire probe.
Returns:
ProbeResult with latency_ms (if successful) or error message.
"""
async with semaphore:
start_time = time.time()
try:
# Create SSL context that skips certificate verification
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
# Connect to IP:443 with SNI set to the fronting domain
reader, writer = await asyncio.wait_for(
asyncio.open_connection(
ip,
443,
ssl=ctx,
server_hostname=sni,
),
timeout=timeout,
)
# Send minimal HTTP HEAD request
request = f"HEAD / HTTP/1.1\r\nHost: {sni}\r\nConnection: close\r\n\r\n"
writer.write(request.encode())
await writer.drain()
# Read response header (first 256 bytes is plenty for HTTP status)
response = await asyncio.wait_for(reader.read(256), timeout=timeout)
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
# Check if we got an HTTP response
if not response:
return ProbeResult(ip=ip, error="empty response")
response_str = response.decode("utf-8", errors="ignore")
if not response_str.startswith("HTTP/"):
return ProbeResult(ip=ip, error=f"invalid response: {response_str[:30]!r}")
# Success — return latency in milliseconds
elapsed_ms = int((time.time() - start_time) * 1000)
return ProbeResult(ip=ip, latency_ms=elapsed_ms)
except asyncio.TimeoutError:
return ProbeResult(ip=ip, error="timeout")
except ConnectionRefusedError:
return ProbeResult(ip=ip, error="connection refused")
except ConnectionResetError:
return ProbeResult(ip=ip, error="connection reset")
except OSError as e:
return ProbeResult(ip=ip, error=f"network error: {e.strerror or str(e)}")
except Exception as e:
return ProbeResult(ip=ip, error=f"probe failed: {type(e).__name__}")
async def run(front_domain: str) -> bool:
"""
Scan all candidate Google IPs and display results.
Args:
front_domain: The SNI hostname to use (e.g. "www.google.com").
Returns:
True if at least one IP is reachable, False otherwise.
"""
timeout = GOOGLE_SCANNER_TIMEOUT
concurrency = GOOGLE_SCANNER_CONCURRENCY
print()
print(f"Scanning {len(CANDIDATE_IPS)} Google frontend IPs")
print(f" SNI: {front_domain}")
print(f" Timeout: {timeout}s per IP")
print(f" Concurrency: {concurrency} parallel probes")
print()
# Create semaphore to limit concurrency
semaphore = asyncio.Semaphore(concurrency)
# Launch all probes concurrently
tasks = [
_probe_ip(ip, front_domain, semaphore, timeout)
for ip in CANDIDATE_IPS
]
results = await asyncio.gather(*tasks)
# Sort by latency (successful first, then by speed)
results.sort(key=lambda r: (not r.ok, r.latency_ms or float("inf")))
# Display results table
print(f"{'IP':<20} {'LATENCY':<12} {'STATUS':<25}")
print(f"{'-' * 20} {'-' * 12} {'-' * 25}")
ok_count = 0
for result in results:
if result.ok:
print(f"{result.ip:<20} {result.latency_ms:>8}ms OK")
ok_count += 1
else:
status = result.error or "unknown error"
print(f"{result.ip:<20} {'':<12} {status:<25}")
print()
print(f"Result: {ok_count} / {len(results)} reachable")
if ok_count == 0:
print("No Google IPs reachable from this network.")
print()
return False
# Show top 3 fastest
fastest = [r for r in results if r.ok][:3]
print()
print("Top 3 fastest IPs:")
for i, result in enumerate(fastest, 1):
print(f" {i}. {result.ip} ({result.latency_ms}ms)")
print()
print(f"Recommended: Set \"google_ip\": \"{fastest[0].ip}\" in config.json")
print()
return True
def scan_sync(front_domain: str) -> bool:
"""
Wrapper to run async scanner from sync context (e.g. main.py).
Args:
front_domain: The SNI hostname to use.
Returns:
True if at least one IP is reachable, False otherwise.
"""
try:
return asyncio.run(run(front_domain))
except KeyboardInterrupt:
print("\nScan interrupted by user.")
return False
except Exception as e:
log.error(f"Scan failed: {e}")
return False