mirror of
https://github.com/masterking32/MasterHttpRelayVPN.git
synced 2026-05-18 07:34:35 +03:00
fd22de27ca
- Add cert_installer.py: cross-platform trusted CA installer (Windows certutil/PowerShell, macOS security, Linux update-ca-certificates, Firefox NSS via certutil/certutil) - main.py: add --install-cert and --no-cert-check CLI flags; auto-detect and auto-install MITM CA on startup when not yet trusted - mitm.py: rename CA CN/O from 'DomainFront Tunnel' to 'MasterHttpRelayVPN' - proxy_server.py: downgrade TLS handshake errors to DEBUG to reduce log noise for non-HTTPS traffic (MTProto, plain HTTP on non-443 ports) - README.md / README_FA.md: document new CLI flags, auto-install behaviour, and cert_installer.py in project files table
154 lines
5.4 KiB
Python
154 lines
5.4 KiB
Python
"""
|
|
MITM certificate manager for HTTPS interception.
|
|
|
|
Generates a CA certificate (once, stored as files) and per-domain
|
|
certificates (on the fly, cached in memory) so the local proxy can
|
|
decrypt HTTPS traffic and relay it through Apps Script.
|
|
|
|
The user must install ca/ca.crt in their browser's trusted CAs once.
|
|
|
|
Requires: pip install cryptography
|
|
"""
|
|
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import ssl
|
|
import tempfile
|
|
|
|
from cryptography import x509
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
from cryptography.x509.oid import NameOID
|
|
|
|
log = logging.getLogger("MITM")
|
|
|
|
CA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ca")
|
|
CA_KEY_FILE = os.path.join(CA_DIR, "ca.key")
|
|
CA_CERT_FILE = os.path.join(CA_DIR, "ca.crt")
|
|
|
|
|
|
class MITMCertManager:
|
|
def __init__(self):
|
|
self._ca_key = None
|
|
self._ca_cert = None
|
|
self._ctx_cache: dict[str, ssl.SSLContext] = {}
|
|
self._cert_dir = tempfile.mkdtemp(prefix="domainfront_certs_")
|
|
self._ensure_ca()
|
|
|
|
def _ensure_ca(self):
|
|
if os.path.exists(CA_KEY_FILE) and os.path.exists(CA_CERT_FILE):
|
|
with open(CA_KEY_FILE, "rb") as f:
|
|
self._ca_key = serialization.load_pem_private_key(
|
|
f.read(), password=None
|
|
)
|
|
with open(CA_CERT_FILE, "rb") as f:
|
|
self._ca_cert = x509.load_pem_x509_certificate(f.read())
|
|
log.info("Loaded CA from %s", CA_DIR)
|
|
else:
|
|
self._create_ca()
|
|
|
|
def _create_ca(self):
|
|
os.makedirs(CA_DIR, exist_ok=True)
|
|
|
|
self._ca_key = rsa.generate_private_key(
|
|
public_exponent=65537, key_size=2048
|
|
)
|
|
subject = issuer = x509.Name([
|
|
x509.NameAttribute(NameOID.COMMON_NAME, "MasterHttpRelayVPN"),
|
|
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "MasterHttpRelayVPN"),
|
|
])
|
|
now = datetime.datetime.now(datetime.timezone.utc)
|
|
self._ca_cert = (
|
|
x509.CertificateBuilder()
|
|
.subject_name(subject)
|
|
.issuer_name(issuer)
|
|
.public_key(self._ca_key.public_key())
|
|
.serial_number(x509.random_serial_number())
|
|
.not_valid_before(now)
|
|
.not_valid_after(now + datetime.timedelta(days=3650))
|
|
.add_extension(
|
|
x509.BasicConstraints(ca=True, path_length=0), critical=True
|
|
)
|
|
.add_extension(
|
|
x509.KeyUsage(
|
|
digital_signature=True,
|
|
key_cert_sign=True,
|
|
crl_sign=True,
|
|
content_commitment=False,
|
|
key_encipherment=False,
|
|
data_encipherment=False,
|
|
key_agreement=False,
|
|
encipher_only=False,
|
|
decipher_only=False,
|
|
),
|
|
critical=True,
|
|
)
|
|
.sign(self._ca_key, hashes.SHA256())
|
|
)
|
|
|
|
with open(CA_KEY_FILE, "wb") as f:
|
|
f.write(
|
|
self._ca_key.private_bytes(
|
|
serialization.Encoding.PEM,
|
|
serialization.PrivateFormat.TraditionalOpenSSL,
|
|
serialization.NoEncryption(),
|
|
)
|
|
)
|
|
with open(CA_CERT_FILE, "wb") as f:
|
|
f.write(self._ca_cert.public_bytes(serialization.Encoding.PEM))
|
|
|
|
log.warning("Generated new CA certificate: %s", CA_CERT_FILE)
|
|
log.warning(">>> Install this file in your browser's Trusted Root CAs! <<<")
|
|
|
|
def get_server_context(self, domain: str) -> ssl.SSLContext:
|
|
if domain not in self._ctx_cache:
|
|
key_pem, cert_pem = self._generate_domain_cert(domain)
|
|
|
|
cert_file = os.path.join(self._cert_dir, f"{domain}.crt")
|
|
key_file = os.path.join(self._cert_dir, f"{domain}.key")
|
|
|
|
ca_pem = self._ca_cert.public_bytes(serialization.Encoding.PEM)
|
|
with open(cert_file, "wb") as f:
|
|
f.write(cert_pem + ca_pem)
|
|
with open(key_file, "wb") as f:
|
|
f.write(key_pem)
|
|
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
|
ctx.set_alpn_protocols(["http/1.1"])
|
|
ctx.load_cert_chain(cert_file, key_file)
|
|
self._ctx_cache[domain] = ctx
|
|
|
|
return self._ctx_cache[domain]
|
|
|
|
def _generate_domain_cert(self, domain: str):
|
|
key = rsa.generate_private_key(
|
|
public_exponent=65537, key_size=2048
|
|
)
|
|
subject = x509.Name([
|
|
x509.NameAttribute(NameOID.COMMON_NAME, domain),
|
|
])
|
|
now = datetime.datetime.now(datetime.timezone.utc)
|
|
cert = (
|
|
x509.CertificateBuilder()
|
|
.subject_name(subject)
|
|
.issuer_name(self._ca_cert.subject)
|
|
.public_key(key.public_key())
|
|
.serial_number(x509.random_serial_number())
|
|
.not_valid_before(now)
|
|
.not_valid_after(now + datetime.timedelta(days=365))
|
|
.add_extension(
|
|
x509.SubjectAlternativeName([x509.DNSName(domain)]),
|
|
critical=False,
|
|
)
|
|
.sign(self._ca_key, hashes.SHA256())
|
|
)
|
|
|
|
key_pem = key.private_bytes(
|
|
serialization.Encoding.PEM,
|
|
serialization.PrivateFormat.TraditionalOpenSSL,
|
|
serialization.NoEncryption(),
|
|
)
|
|
cert_pem = cert.public_bytes(serialization.Encoding.PEM)
|
|
return key_pem, cert_pem
|