feat: implement CA certificate serving for LAN devices during sharing

This commit is contained in:
Abolfazl
2026-05-05 07:53:34 +03:30
parent f42497aa82
commit 01e28f50bb
3 changed files with 68 additions and 3 deletions
+17 -1
View File
@@ -284,8 +284,23 @@ def main():
# print concrete IPv4 addresses users can use on other devices.
lan_mode = lan_sharing or listen_host in ("0.0.0.0", "::")
if lan_mode:
http_port = config.get("http_port", config.get("listen_port", 8080))
socks_port = config.get("socks5_port", 1080)
log_lan_access(config.get("http_port", config.get("listen_port", 8080)), socks_port)
log_lan_access(http_port, socks_port)
if lan_sharing:
# Log CA download URLs so LAN devices know where to get the cert.
from core.lan_utils import get_lan_ips
ca_urls = [f"http://{addr}/ca.crt" for addr in get_lan_ips(http_port)]
if ca_urls:
log.info(
"CA certificate download (install on other devices): %s",
" OR ".join(ca_urls),
)
else:
log.info(
"CA certificate download: http://<your-LAN-IP>:%d/ca.crt", http_port
)
try:
asyncio.run(_run(config))
@@ -293,6 +308,7 @@ def main():
log.info("Stopped")
def _make_exception_handler(log):
"""Return an asyncio exception handler that silences Windows WinError 10054
noise from connection cleanup (ConnectionResetError in
+15 -1
View File
@@ -64,6 +64,9 @@ LEVEL_LABEL = {
# Special spotlight line for execution usage updates.
EXEC_USAGE_PREFIX = "Apps Script executions used so far:"
# Spotlight line for the CA certificate LAN download URL.
CA_DOWNLOAD_PREFIX = "CA certificate download"
# Stable per-component color (keeps log scanning easy).
COMPONENT_COLORS = {
"Main": FG_CYAN,
@@ -156,8 +159,19 @@ class PrettyFormatter(logging.Formatter):
and isinstance(message, str)
and message.startswith(EXEC_USAGE_PREFIX)
)
highlight_ca_download = (
isinstance(message, str)
and message.startswith(CA_DOWNLOAD_PREFIX)
)
if highlight_exec_usage:
if highlight_ca_download:
plain_time = self._fmt_time(record)
plain_level = f"{LEVEL_GLYPH.get(record.levelname, '·')} {LEVEL_LABEL.get(record.levelname, record.levelname[:5].ljust(5))}"
plain_comp = f"[{record.name[: self.COMPONENT_WIDTH].ljust(self.COMPONENT_WIDTH)}]"
line = f"{plain_time} {plain_level} {plain_comp} {message}"
if self.use_color:
line = f"{BOLD}{FG_GREEN}{line}{RESET}"
elif highlight_exec_usage:
# Force a single vivid color for the entire line so this metric pops.
plain_time = self._fmt_time(record)
plain_level = f"{LEVEL_GLYPH.get(record.levelname, '·')} {LEVEL_LABEL.get(record.levelname, record.levelname[:5].ljust(5))}"
+36 -1
View File
@@ -188,13 +188,18 @@ class ProxyServer:
self._SNI_REWRITE_SUFFIXES = SNI_REWRITE_SUFFIXES
try:
from .mitm import MITMCertManager
from .mitm import MITMCertManager, CA_CERT_FILE
self.mitm = MITMCertManager()
self._ca_cert_file = CA_CERT_FILE
except ImportError:
log.error("Apps Script relay requires the 'cryptography' package.")
log.error("Run: pip install cryptography")
raise SystemExit(1)
# When LAN sharing is active, serve the CA cert over HTTP so other
# devices on the network can download and install it easily.
self._lan_sharing: bool = bool(config.get("lan_sharing", False))
# ── Host-policy helpers ───────────────────────────────────────
@staticmethod
@@ -366,6 +371,31 @@ class ProxyServer:
# ── client handler ────────────────────────────────────────────
async def _serve_ca_cert(self, writer: asyncio.StreamWriter) -> None:
"""Serve the MITM CA certificate so LAN devices can install it."""
import os as _os
ca_path = getattr(self, "_ca_cert_file", None)
if not ca_path or not _os.path.exists(ca_path):
writer.write(
b"HTTP/1.1 404 Not Found\r\n"
b"Content-Length: 0\r\n"
b"Connection: close\r\n\r\n"
)
await writer.drain()
return
with open(ca_path, "rb") as f:
cert_data = f.read()
headers = (
b"HTTP/1.1 200 OK\r\n"
b"Content-Type: application/x-x509-ca-cert\r\n"
b"Content-Disposition: attachment; filename=\"ca.crt\"\r\n"
+ b"Content-Length: " + str(len(cert_data)).encode() + b"\r\n"
+ b"Connection: close\r\n\r\n"
)
writer.write(headers + cert_data)
await writer.drain()
log.info("Served CA certificate to LAN device")
async def _on_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
addr = writer.get_extra_info("peername")
task = self._track_current_task()
@@ -401,6 +431,11 @@ class ProxyServer:
return
method = parts[0].upper()
path = parts[1] if len(parts) >= 2 else "/"
if method == "GET" and path == "/ca.crt" and self._lan_sharing:
await self._serve_ca_cert(writer)
return
if method == "CONNECT":
await self._do_connect(parts[1], reader, writer)