Files
MasterHttpRelayVPN/mitm.py
T
Abolfazl fd22de27ca Add cross-platform CA auto-installer and production hardening
- Add cert_installer.py: cross-platform trusted CA installer
  (Windows certutil/PowerShell, macOS security, Linux update-ca-certificates,
  Firefox NSS via certutil/certutil)
- main.py: add --install-cert and --no-cert-check CLI flags; auto-detect and
  auto-install MITM CA on startup when not yet trusted
- mitm.py: rename CA CN/O from 'DomainFront Tunnel' to 'MasterHttpRelayVPN'
- proxy_server.py: downgrade TLS handshake errors to DEBUG to reduce log noise
  for non-HTTPS traffic (MTProto, plain HTTP on non-443 ports)
- README.md / README_FA.md: document new CLI flags, auto-install behaviour,
  and cert_installer.py in project files table
2026-04-21 04:56:49 +03:30

154 lines
5.4 KiB
Python

"""
MITM certificate manager for HTTPS interception.
Generates a CA certificate (once, stored as files) and per-domain
certificates (on the fly, cached in memory) so the local proxy can
decrypt HTTPS traffic and relay it through Apps Script.
The user must install ca/ca.crt in their browser's trusted CAs once.
Requires: pip install cryptography
"""
import datetime
import ipaddress
import logging
import os
import ssl
import tempfile

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
# Logger used by everything in this module.
log = logging.getLogger("MITM")

# The CA key/certificate pair is persisted in a "ca" directory that sits
# next to this source file.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
CA_DIR = os.path.join(_MODULE_DIR, "ca")
CA_KEY_FILE, CA_CERT_FILE = (
    os.path.join(CA_DIR, leaf_name) for leaf_name in ("ca.key", "ca.crt")
)
class MITMCertManager:
    """Create and cache the certificates used to intercept HTTPS traffic.

    A root CA (private key + certificate) is loaded from ``CA_KEY_FILE`` /
    ``CA_CERT_FILE`` or generated there when either file is missing.  Leaf
    certificates are minted per host on demand, signed with that CA, and
    wrapped in an ``ssl.SSLContext`` that is cached in memory per host.
    """

    # X.509 limits commonName to 64 characters; recent ``cryptography``
    # releases raise ``ValueError`` for longer values.  Clients match the
    # host against the subjectAltName, so a truncated CN is harmless.
    _MAX_COMMON_NAME_LENGTH = 64

    def __init__(self):
        """Prepare the scratch directory and load (or create) the CA."""
        self._ca_key = None
        self._ca_cert = None
        self._ctx_cache: dict[str, ssl.SSLContext] = {}
        # Scratch space used only while handing PEM data to the ssl module.
        self._cert_dir = tempfile.mkdtemp(prefix="domainfront_certs_")
        self._ensure_ca()

    @staticmethod
    def _as_ip_address(host):
        """Return *host* parsed as an IP address, or ``None`` for a name.

        A bracketed IPv6 literal such as ``"[::1]"`` is accepted as well.
        """
        if host.startswith("[") and host.endswith("]"):
            host = host[1:-1]
        try:
            return ipaddress.ip_address(host)
        except ValueError:
            return None

    @classmethod
    def _common_name(cls, host):
        """Return *host* shortened to the maximum commonName length."""
        return host[:cls._MAX_COMMON_NAME_LENGTH]

    @staticmethod
    def _write_private_file(path, data):
        """Write *data* to *path* so that only the owner can read it.

        The file is created with mode 0o600 instead of relying on the
        process umask, because it holds private key material.
        """
        # O_BINARY only exists on Windows; without it a raw descriptor would
        # be opened in text mode there and newlines would be translated.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
        fd = os.open(path, flags, 0o600)
        with os.fdopen(fd, "wb") as f:
            # The mode passed to os.open() is ignored when the file already
            # existed, so tighten the permissions explicitly as well.
            os.chmod(path, 0o600)
            f.write(data)

    def _ensure_ca(self):
        """Load the CA from disk, generating a new one if it is incomplete."""
        if os.path.exists(CA_KEY_FILE) and os.path.exists(CA_CERT_FILE):
            with open(CA_KEY_FILE, "rb") as f:
                self._ca_key = serialization.load_pem_private_key(
                    f.read(), password=None
                )
            with open(CA_CERT_FILE, "rb") as f:
                self._ca_cert = x509.load_pem_x509_certificate(f.read())
            log.info("Loaded CA from %s", CA_DIR)
        else:
            self._create_ca()

    def _create_ca(self):
        """Generate a self-signed root CA and persist it under ``CA_DIR``."""
        os.makedirs(CA_DIR, exist_ok=True)
        self._ca_key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048
        )
        # Self-signed: subject and issuer are the same name.
        subject = issuer = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, "MasterHttpRelayVPN"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "MasterHttpRelayVPN"),
        ])
        now = datetime.datetime.now(datetime.timezone.utc)
        self._ca_cert = (
            x509.CertificateBuilder()
            .subject_name(subject)
            .issuer_name(issuer)
            .public_key(self._ca_key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(now + datetime.timedelta(days=3650))
            .add_extension(
                # path_length=0: this CA may sign leaf certificates only.
                x509.BasicConstraints(ca=True, path_length=0), critical=True
            )
            .add_extension(
                x509.KeyUsage(
                    digital_signature=True,
                    key_cert_sign=True,
                    crl_sign=True,
                    content_commitment=False,
                    key_encipherment=False,
                    data_encipherment=False,
                    key_agreement=False,
                    encipher_only=False,
                    decipher_only=False,
                ),
                critical=True,
            )
            .sign(self._ca_key, hashes.SHA256())
        )
        # The CA key can sign a certificate for any site, so keep it
        # readable by the owner only.
        self._write_private_file(
            CA_KEY_FILE,
            self._ca_key.private_bytes(
                serialization.Encoding.PEM,
                serialization.PrivateFormat.TraditionalOpenSSL,
                serialization.NoEncryption(),
            ),
        )
        with open(CA_CERT_FILE, "wb") as f:
            f.write(self._ca_cert.public_bytes(serialization.Encoding.PEM))
        log.warning("Generated new CA certificate: %s", CA_CERT_FILE)
        log.warning(">>> Install this file in your browser's Trusted Root CAs! <<<")

    def get_server_context(self, domain: str) -> ssl.SSLContext:
        """Return a server-side TLS context presenting a cert for *domain*.

        The context is built on first request and cached per *domain*.

        Args:
            domain: Host name or IP literal the client is connecting to.

        Returns:
            An ``ssl.SSLContext`` loaded with a leaf certificate for
            *domain* followed by the CA certificate, advertising
            ``http/1.1`` via ALPN.

        Raises:
            ValueError: If *domain* is empty.
        """
        cached = self._ctx_cache.get(domain)
        if cached is not None:
            return cached

        key_pem, cert_pem = self._generate_domain_cert(domain)
        ca_pem = self._ca_cert.public_bytes(serialization.Encoding.PEM)

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.set_alpn_protocols(["http/1.1"])

        # The scratch directory may have been removed by a tmp cleaner while
        # the proxy was running; recreate it rather than fail.
        os.makedirs(self._cert_dir, mode=0o700, exist_ok=True)
        # load_cert_chain() only accepts file paths.  Fixed file names inside
        # a throwaway directory keep the (untrusted) host string out of the
        # path, and the private key is deleted as soon as it has been loaded.
        with tempfile.TemporaryDirectory(dir=self._cert_dir) as scratch:
            cert_file = os.path.join(scratch, "leaf.crt")
            key_file = os.path.join(scratch, "leaf.key")
            with open(cert_file, "wb") as f:
                f.write(cert_pem + ca_pem)
            self._write_private_file(key_file, key_pem)
            ctx.load_cert_chain(cert_file, key_file)

        self._ctx_cache[domain] = ctx
        return ctx

    def _generate_domain_cert(self, domain: str):
        """Create a CA-signed leaf certificate for *domain*.

        Args:
            domain: Host name or IP literal to put in the certificate.

        Returns:
            A ``(key_pem, cert_pem)`` tuple of PEM-encoded ``bytes``.

        Raises:
            ValueError: If *domain* is empty.
        """
        if not domain:
            raise ValueError("domain must be a non-empty host name or IP address")

        key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048
        )

        # TLS clients compare IP literals against iPAddress SAN entries only,
        # so a dNSName entry would never match a request made to an IP.
        ip = self._as_ip_address(domain)
        if ip is not None:
            san_entry = x509.IPAddress(ip)
            display_name = str(ip)
        else:
            san_entry = x509.DNSName(domain)
            display_name = domain

        subject = x509.Name([
            x509.NameAttribute(
                NameOID.COMMON_NAME, self._common_name(display_name)
            ),
        ])
        now = datetime.datetime.now(datetime.timezone.utc)
        cert = (
            x509.CertificateBuilder()
            .subject_name(subject)
            .issuer_name(self._ca_cert.subject)
            .public_key(key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(now + datetime.timedelta(days=365))
            .add_extension(
                x509.SubjectAlternativeName([san_entry]),
                critical=False,
            )
            .add_extension(
                # Mark the certificate as an end-entity, not a CA.
                x509.BasicConstraints(ca=False, path_length=None),
                critical=True,
            )
            .add_extension(
                # Some platforms (e.g. Apple's) refuse TLS server
                # certificates that lack the serverAuth extended key usage.
                x509.ExtendedKeyUsage(
                    [x509.oid.ExtendedKeyUsageOID.SERVER_AUTH]
                ),
                critical=False,
            )
            .sign(self._ca_key, hashes.SHA256())
        )
        key_pem = key.private_bytes(
            serialization.Encoding.PEM,
            serialization.PrivateFormat.TraditionalOpenSSL,
            serialization.NoEncryption(),
        )
        cert_pem = cert.public_bytes(serialization.Encoding.PEM)
        return key_pem, cert_pem