Update main.py

This commit is contained in:
anthroposcene
2026-04-29 02:53:11 -07:00
parent e025324877
commit 8fc40e8c33
+199 -83
View File
@@ -1,10 +1,10 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
DomainFront Tunnel — Bypass DPI censorship via Domain Fronting. DomainFront Tunnel — Bypass DPI censorship via GAS (Google Apps Script) and Cloudflare Workers.
Run a local HTTP proxy that tunnels all traffic through a CDN using Run a local HTTP proxy that tunnels all traffic through a Google Apps
domain fronting: the TLS SNI shows an allowed domain while the encrypted Script relay fronted by www.google.com (TLS SNI shows www.google.com
HTTP Host header routes to your Cloudflare Worker relay. while the encrypted Host header points at script.google.com).
""" """
import argparse import argparse
@@ -14,26 +14,36 @@ import logging
import os import os
import sys import sys
from core.cert_installer import install_ca, is_ca_trusted # Project modules live under ./src — put that folder on sys.path so the
from core.mitm import CA_CERT_FILE # historical flat imports ("from proxy_server import …") keep working.
from core.proxy_server import ProxyServer _SRC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "src")
if _SRC_DIR not in sys.path:
sys.path.insert(0, _SRC_DIR)
__version__ = "1.0.0" from cert_installer import install_ca, uninstall_ca, is_ca_trusted
from constants import __version__
from lan_utils import log_lan_access
from google_ip_scanner import scan_sync
from logging_utils import configure as configure_logging, print_banner
from mitm import CA_CERT_FILE
from proxy_server import ProxyServer
def setup_logging(level_name: str): def setup_logging(level_name: str):
level = getattr(logging, level_name.upper(), logging.INFO) configure_logging(level_name)
logging.basicConfig(
level=level,
format="%(asctime)s [%(name)-12s] %(levelname)-7s %(message)s", _PLACEHOLDER_AUTH_KEYS = {
datefmt="%H:%M:%S", "",
) "CHANGE_ME_TO_A_STRONG_SECRET",
"your-secret-password-here",
}
def parse_args(): def parse_args():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog="domainfront-tunnel", prog="domainfront-tunnel",
description="Local HTTP proxy that tunnels traffic through domain fronting.", description="Local HTTP proxy that relays traffic through Google Apps Script.",
) )
parser.add_argument( parser.add_argument(
"-c", "--config", "-c", "--config",
@@ -51,6 +61,17 @@ def parse_args():
default=None, default=None,
help="Override listen host (env: DFT_HOST)", help="Override listen host (env: DFT_HOST)",
) )
parser.add_argument(
"--socks5-port",
type=int,
default=None,
help="Override SOCKS5 listen port (env: DFT_SOCKS5_PORT)",
)
parser.add_argument(
"--disable-socks5",
action="store_true",
help="Disable the built-in SOCKS5 listener.",
)
parser.add_argument( parser.add_argument(
"--log-level", "--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR"], choices=["DEBUG", "INFO", "WARNING", "ERROR"],
@@ -67,16 +88,48 @@ def parse_args():
action="store_true", action="store_true",
help="Install the MITM CA certificate as a trusted root and exit.", help="Install the MITM CA certificate as a trusted root and exit.",
) )
parser.add_argument(
"--uninstall-cert",
action="store_true",
help="Remove the MITM CA certificate from trusted roots and exit.",
)
parser.add_argument( parser.add_argument(
"--no-cert-check", "--no-cert-check",
action="store_true", action="store_true",
help="Skip the certificate installation check on startup.", help="Skip the certificate installation check on startup.",
) )
parser.add_argument(
"--scan",
action="store_true",
help="Scan Google IPs to find the fastest reachable one and exit.",
)
return parser.parse_args() return parser.parse_args()
def main(): def main():
args = parse_args() args = parse_args()
# Handle cert-only commands before loading config so they can run standalone.
if args.install_cert or args.uninstall_cert:
setup_logging("INFO")
_log = logging.getLogger("Main")
if args.install_cert:
_log.info("Installing CA certificate…")
if not os.path.exists(CA_CERT_FILE):
from mitm import MITMCertManager
MITMCertManager() # side-effect: creates ca/ca.crt + ca/ca.key
ok = install_ca(CA_CERT_FILE)
sys.exit(0 if ok else 1)
_log.info("Removing CA certificate…")
ok = uninstall_ca(CA_CERT_FILE)
if ok:
_log.info("CA certificate removed successfully.")
else:
_log.warning("CA certificate removal may have failed. Check logs above.")
sys.exit(0 if ok else 1)
config_path = args.config config_path = args.config
try: try:
@@ -84,8 +137,31 @@ def main():
config = json.load(f) config = json.load(f)
except FileNotFoundError: except FileNotFoundError:
print(f"Config not found: {config_path}") print(f"Config not found: {config_path}")
print("Copy config.example.json to config.json and fill in your values.") # Offer the interactive wizard if it's available and we're on a TTY.
sys.exit(1) wizard = os.path.join(os.path.dirname(os.path.abspath(__file__)), "setup.py")
if os.path.exists(wizard) and sys.stdin.isatty():
try:
answer = input("Run the interactive setup wizard now? [Y/n]: ").strip().lower()
except EOFError:
answer = "n"
if answer in ("", "y", "yes"):
import subprocess
rc = subprocess.call([sys.executable, wizard])
if rc != 0:
sys.exit(rc)
try:
with open(config_path) as f:
config = json.load(f)
except Exception as e:
print(f"Could not load config after setup: {e}")
sys.exit(1)
else:
print("Copy config.example.json to config.json and fill in your values,")
print("or run: python setup.py")
sys.exit(1)
else:
print("Run: python setup.py (or copy config.example.json to config.json)")
sys.exit(1)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
print(f"Invalid JSON in config: {e}") print(f"Invalid JSON in config: {e}")
sys.exit(1) sys.exit(1)
@@ -107,6 +183,14 @@ def main():
elif os.environ.get("DFT_HOST"): elif os.environ.get("DFT_HOST"):
config["listen_host"] = os.environ["DFT_HOST"] config["listen_host"] = os.environ["DFT_HOST"]
if args.socks5_port is not None:
config["socks5_port"] = args.socks5_port
elif os.environ.get("DFT_SOCKS5_PORT"):
config["socks5_port"] = int(os.environ["DFT_SOCKS5_PORT"])
if args.disable_socks5:
config["socks5_enabled"] = False
if args.log_level is not None: if args.log_level is not None:
config["log_level"] = args.log_level config["log_level"] = args.log_level
elif os.environ.get("DFT_LOG_LEVEL"): elif os.environ.get("DFT_LOG_LEVEL"):
@@ -117,88 +201,120 @@ def main():
print(f"Missing required config key: {key}") print(f"Missing required config key: {key}")
sys.exit(1) sys.exit(1)
mode = config.get("mode", "domain_fronting") if config.get("auth_key", "") in _PLACEHOLDER_AUTH_KEYS:
if mode == "custom_domain" and "custom_domain" not in config: print(
print("Mode 'custom_domain' requires 'custom_domain' in config") "Refusing to start: 'auth_key' is unset or uses a known placeholder.\n"
"Pick a long random secret and set it in both config.json AND "
"the AUTH_KEY constant inside Code.gs (they must match)."
)
sys.exit(1) sys.exit(1)
if mode == "domain_fronting":
for key in ("front_domain", "worker_host"):
if key not in config:
print(f"Mode 'domain_fronting' requires '{key}' in config")
sys.exit(1)
if mode == "google_fronting":
if "worker_host" not in config:
print("Mode 'google_fronting' requires 'worker_host' in config (your Cloud Run URL)")
sys.exit(1)
if mode == "apps_script":
sid = config.get("script_ids") or config.get("script_id")
if not sid or (isinstance(sid, str) and sid == "YOUR_APPS_SCRIPT_DEPLOYMENT_ID"):
print("Mode 'apps_script' requires 'script_id' in config.")
print("Deploy the Apps Script from appsscript/Code.gs and paste the Deployment ID.")
sys.exit(1)
# ── Certificate installation ────────────────────────────────────────── # Always Apps Script mode — force-set for backward-compat configs.
if args.install_cert: config["mode"] = "apps_script"
sid = config.get("script_ids") or config.get("script_id")
if not sid or (isinstance(sid, str) and sid == "YOUR_APPS_SCRIPT_DEPLOYMENT_ID"):
print("Missing 'script_id' in config.")
print("Deploy the Apps Script from Code.gs and paste the Deployment ID.")
sys.exit(1)
# ── Google IP Scanner ──────────────────────────────────────────────────
if args.scan:
setup_logging("INFO") setup_logging("INFO")
front_domain = config.get("front_domain", "www.google.com")
_log = logging.getLogger("Main") _log = logging.getLogger("Main")
_log.info("Installing CA certificate…") _log.info(f"Scanning Google IPs (fronting domain: {front_domain})")
ok = install_ca(CA_CERT_FILE) ok = scan_sync(front_domain)
sys.exit(0 if ok else 1) sys.exit(0 if ok else 1)
setup_logging(config.get("log_level", "INFO")) setup_logging(config.get("log_level", "INFO"))
log = logging.getLogger("Main") log = logging.getLogger("Main")
mode = config.get("mode", "domain_fronting") print_banner(__version__)
log.info("DomainFront Tunnel starting (mode: %s)", mode) log.info("DomainFront Tunnel starting (Apps Script relay)")
if mode == "custom_domain": log.info("Apps Script relay : SNI=%s → script.google.com",
log.info("Custom domain : %s", config["custom_domain"]) config.get("front_domain", "www.google.com"))
elif mode == "google_fronting": script_ids = config.get("script_ids") or config.get("script_id")
log.info("Google fronting : SNI=%s → Host=%s", if isinstance(script_ids, list):
config.get("front_domain", "www.google.com"), config["worker_host"]) log.info("Script IDs : %d scripts (sticky per-host)", len(script_ids))
log.info("Google IP : %s", config.get("google_ip", "216.239.38.120")) for i, sid in enumerate(script_ids):
elif mode == "apps_script": log.info(" [%d] %s", i + 1, sid)
log.info("Apps Script relay : SNI=%s → script.google.com",
config.get("front_domain", "www.google.com"))
script_ids = config.get("script_ids") or config.get("script_id")
if isinstance(script_ids, list):
log.info("Script IDs : %d scripts (round-robin)", len(script_ids))
for i, sid in enumerate(script_ids):
log.info(" [%d] %s", i + 1, sid)
else:
log.info("Script ID : %s", script_ids)
# Ensure CA file exists before checking / installing it.
# MITMCertManager generates ca/ca.crt on first instantiation.
if not os.path.exists(CA_CERT_FILE):
from core.mitm import MITMCertManager
MITMCertManager() # side-effect: creates ca/ca.crt + ca/ca.key
# Auto-install MITM CA if not already trusted
if not args.no_cert_check:
if not is_ca_trusted(CA_CERT_FILE):
log.warning("MITM CA is not trusted — attempting automatic installation…")
ok = install_ca(CA_CERT_FILE)
if ok:
log.info("CA certificate installed. You may need to restart your browser.")
else:
log.error(
"Auto-install failed. Run with --install-cert (may need admin/sudo) "
"or manually install ca/ca.crt as a trusted root CA."
)
else:
log.info("MITM CA is already trusted.")
else: else:
log.info("Front domain (SNI) : %s", config.get("front_domain", "?")) log.info("Script ID : %s", script_ids)
log.info("Worker host (Host) : %s", config.get("worker_host", "?"))
log.info("Proxy address : %s:%d", config.get("listen_host", "127.0.0.1"), config.get("listen_port", 8080)) # Ensure CA file exists before checking / installing it.
# MITMCertManager generates ca/ca.crt on first instantiation.
if not os.path.exists(CA_CERT_FILE):
from mitm import MITMCertManager
MITMCertManager() # side-effect: creates ca/ca.crt + ca/ca.key
# Auto-install MITM CA if not already trusted
if not args.no_cert_check:
if not is_ca_trusted(CA_CERT_FILE):
log.warning("MITM CA is not trusted — attempting automatic installation…")
ok = install_ca(CA_CERT_FILE)
if ok:
log.info("CA certificate installed. You may need to restart your browser.")
else:
log.error(
"Auto-install failed. Run with --install-cert (may need admin/sudo) "
"or manually install ca/ca.crt as a trusted root CA."
)
else:
log.info("MITM CA is already trusted.")
# ── LAN sharing configuration ────────────────────────────────────────
lan_sharing = config.get("lan_sharing", False)
listen_host = config.get("listen_host", "127.0.0.1")
if lan_sharing:
# If LAN sharing is enabled and host is still localhost, change to all interfaces
if listen_host == "127.0.0.1":
config["listen_host"] = "0.0.0.0"
listen_host = "0.0.0.0"
log.info("LAN sharing enabled — listening on all interfaces")
# If either explicit LAN sharing is enabled or we bind to all interfaces,
# print concrete IPv4 addresses users can use on other devices.
lan_mode = lan_sharing or listen_host in ("0.0.0.0", "::")
if lan_mode:
socks_port = config.get("socks5_port", 1080) if config.get("socks5_enabled", True) else None
log_lan_access(config.get("listen_port", 8080), socks_port)
try: try:
asyncio.run(ProxyServer(config).start()) asyncio.run(_run(config))
except KeyboardInterrupt: except KeyboardInterrupt:
log.info("Stopped") log.info("Stopped")
def _make_exception_handler(log):
"""Return an asyncio exception handler that silences Windows WinError 10054
noise from connection cleanup (ConnectionResetError in
_ProactorBasePipeTransport._call_connection_lost), which is harmless but
verbose on Python/Windows when a remote host force-closes a socket."""
def handler(loop, context):
exc = context.get("exception")
cb = context.get("handle") or context.get("source_traceback", "")
if (
isinstance(exc, ConnectionResetError)
and "_call_connection_lost" in str(cb)
):
return # suppress: benign Windows socket cleanup race
log.error("[asyncio] %s", context.get("message", context))
if exc:
loop.default_exception_handler(context)
return handler
async def _run(config):
loop = asyncio.get_running_loop()
_log = logging.getLogger("asyncio")
loop.set_exception_handler(_make_exception_handler(_log))
server = ProxyServer(config)
try:
await server.start()
finally:
await server.stop()
if __name__ == "__main__": if __name__ == "__main__":
main() main()