mirror of
https://github.com/masterking32/MasterHttpRelayVPN.git
synced 2026-05-17 21:24:37 +03:00
feat: Add unittest coverage
This commit is contained in:
@@ -51,3 +51,7 @@ jobs:
|
|||||||
run: |
|
run: |
|
||||||
python main.py --help
|
python main.py --help
|
||||||
python main.py --version
|
python main.py --version
|
||||||
|
|
||||||
|
- name: Unit tests
|
||||||
|
run: |
|
||||||
|
python -m unittest discover -s tests -v
|
||||||
|
|||||||
@@ -0,0 +1 @@
|
|||||||
|
"""Test package for MasterHttpRelayVPN."""
|
||||||
@@ -0,0 +1,51 @@
|
|||||||
|
import pathlib
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
|
||||||
|
ROOT = pathlib.Path(__file__).resolve().parents[1]
|
||||||
|
SRC = ROOT / "src"
|
||||||
|
if str(SRC) not in sys.path:
|
||||||
|
sys.path.insert(0, str(SRC))
|
||||||
|
|
||||||
|
from core.adblock import parse_hosts_text
|
||||||
|
|
||||||
|
|
||||||
|
class ParseHostsTextTests(unittest.TestCase):
|
||||||
|
def test_parses_hosts_and_bare_domains_and_deduplicates(self):
|
||||||
|
text = """
|
||||||
|
# comment
|
||||||
|
0.0.0.0 ads.example.com
|
||||||
|
127.0.0.1 tracker.example.com # inline comment
|
||||||
|
plain.example.org
|
||||||
|
ads.example.com
|
||||||
|
EXAMPLE.NET.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
parse_hosts_text(text),
|
||||||
|
[
|
||||||
|
"ads.example.com",
|
||||||
|
"tracker.example.com",
|
||||||
|
"plain.example.org",
|
||||||
|
"example.net",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_skips_invalid_reserved_and_wildcard_entries(self):
|
||||||
|
text = """
|
||||||
|
localhost
|
||||||
|
localhost.localdomain
|
||||||
|
192.168.1.1
|
||||||
|
::1
|
||||||
|
analytics-*.example.com
|
||||||
|
invalid
|
||||||
|
bad_domain.example
|
||||||
|
host name.example.com
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.assertEqual(parse_hosts_text(text), [])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
@@ -0,0 +1,41 @@
|
|||||||
|
import gzip
|
||||||
|
import pathlib
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
import zlib
|
||||||
|
|
||||||
|
|
||||||
|
ROOT = pathlib.Path(__file__).resolve().parents[1]
|
||||||
|
SRC = ROOT / "src"
|
||||||
|
if str(SRC) not in sys.path:
|
||||||
|
sys.path.insert(0, str(SRC))
|
||||||
|
|
||||||
|
from core import codec
|
||||||
|
|
||||||
|
|
||||||
|
class CodecTests(unittest.TestCase):
|
||||||
|
def test_supported_encodings_always_include_gzip_and_deflate(self):
|
||||||
|
encodings = codec.supported_encodings()
|
||||||
|
self.assertIn("gzip", encodings)
|
||||||
|
self.assertIn("deflate", encodings)
|
||||||
|
|
||||||
|
def test_decode_passes_through_unknown_empty_and_invalid_payloads(self):
|
||||||
|
raw = b"plain-body"
|
||||||
|
self.assertIs(codec.decode(raw, ""), raw)
|
||||||
|
self.assertIs(codec.decode(raw, "identity"), raw)
|
||||||
|
self.assertIs(codec.decode(raw, "unknown"), raw)
|
||||||
|
self.assertEqual(codec.decode(raw, "gzip"), raw)
|
||||||
|
|
||||||
|
def test_decode_gzip_and_deflate(self):
|
||||||
|
raw = b"hello through compression"
|
||||||
|
gzip_body = gzip.compress(raw)
|
||||||
|
zlib_body = zlib.compress(raw)
|
||||||
|
raw_deflate_body = zlib.compress(raw, wbits=-zlib.MAX_WBITS)
|
||||||
|
|
||||||
|
self.assertEqual(codec.decode(gzip_body, "gzip"), raw)
|
||||||
|
self.assertEqual(codec.decode(zlib_body, "deflate"), raw)
|
||||||
|
self.assertEqual(codec.decode(raw_deflate_body, "deflate"), raw)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
@@ -0,0 +1,66 @@
|
|||||||
|
import pathlib
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
|
||||||
|
ROOT = pathlib.Path(__file__).resolve().parents[1]
|
||||||
|
SRC = ROOT / "src"
|
||||||
|
if str(SRC) not in sys.path:
|
||||||
|
sys.path.insert(0, str(SRC))
|
||||||
|
|
||||||
|
from core.constants import FRONT_SNI_POOL_GOOGLE
|
||||||
|
from relay.fronting_support import (
|
||||||
|
build_sni_pool,
|
||||||
|
parse_content_range,
|
||||||
|
validate_range_response,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class BuildSniPoolTests(unittest.TestCase):
|
||||||
|
def test_uses_explicit_overrides_with_normalization_and_deduplication(self):
|
||||||
|
result = build_sni_pool(
|
||||||
|
"www.google.com",
|
||||||
|
[" Mail.Google.com ", "mail.google.com", "accounts.google.com."],
|
||||||
|
)
|
||||||
|
self.assertEqual(result, ["mail.google.com", "accounts.google.com"])
|
||||||
|
|
||||||
|
def test_google_front_domain_uses_google_pool(self):
|
||||||
|
result = build_sni_pool("www.google.com", None)
|
||||||
|
self.assertEqual(result, list(FRONT_SNI_POOL_GOOGLE))
|
||||||
|
|
||||||
|
def test_non_google_front_domain_falls_back_to_single_host(self):
|
||||||
|
self.assertEqual(build_sni_pool("cdn.example.com", None), ["cdn.example.com"])
|
||||||
|
self.assertEqual(build_sni_pool("", None), ["www.google.com"])
|
||||||
|
|
||||||
|
|
||||||
|
class RangeParsingTests(unittest.TestCase):
|
||||||
|
def test_parse_content_range_accepts_valid_values(self):
|
||||||
|
self.assertEqual(parse_content_range("bytes 10-19/20"), (10, 19, 20))
|
||||||
|
|
||||||
|
def test_parse_content_range_rejects_invalid_values(self):
|
||||||
|
self.assertIsNone(parse_content_range(""))
|
||||||
|
self.assertIsNone(parse_content_range("bytes 10-19/*"))
|
||||||
|
self.assertIsNone(parse_content_range("bytes 10-9/20"))
|
||||||
|
self.assertIsNone(parse_content_range("bytes 10-19/19"))
|
||||||
|
|
||||||
|
def test_validate_range_response_success_and_failures(self):
|
||||||
|
headers = {"content-range": "bytes 10-13/20"}
|
||||||
|
body = b"data"
|
||||||
|
|
||||||
|
self.assertIsNone(validate_range_response(206, headers, body, 10, 13, 20))
|
||||||
|
self.assertEqual(
|
||||||
|
validate_range_response(200, headers, body, 10, 13, 20),
|
||||||
|
"status 200",
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
validate_range_response(206, {"content-range": "bytes 11-13/20"}, body, 10, 13, 20),
|
||||||
|
"Content-Range mismatch 11-13",
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
validate_range_response(206, headers, b"abc", 10, 13, 20),
|
||||||
|
"short chunk 3/4 B",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
@@ -0,0 +1,99 @@
|
|||||||
|
import pathlib
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
|
||||||
|
ROOT = pathlib.Path(__file__).resolve().parents[1]
|
||||||
|
SRC = ROOT / "src"
|
||||||
|
if str(SRC) not in sys.path:
|
||||||
|
sys.path.insert(0, str(SRC))
|
||||||
|
|
||||||
|
from core.constants import CACHE_TTL_STATIC_LONG
|
||||||
|
from proxy.proxy_support import (
|
||||||
|
ResponseCache,
|
||||||
|
has_unsupported_transfer_encoding,
|
||||||
|
host_matches_rules,
|
||||||
|
is_ip_literal,
|
||||||
|
load_host_rules,
|
||||||
|
parse_content_length,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ProxySupportTests(unittest.TestCase):
|
||||||
|
def test_is_ip_literal_handles_ipv4_ipv6_and_hostnames(self):
|
||||||
|
self.assertTrue(is_ip_literal("127.0.0.1"))
|
||||||
|
self.assertTrue(is_ip_literal("[2001:db8::1]"))
|
||||||
|
self.assertFalse(is_ip_literal("example.com"))
|
||||||
|
|
||||||
|
def test_load_host_rules_and_match_suffixes(self):
|
||||||
|
rules = load_host_rules(["Example.com", ".example.org", "api.example.net."])
|
||||||
|
self.assertTrue(host_matches_rules("example.com", rules))
|
||||||
|
self.assertTrue(host_matches_rules("sub.example.org", rules))
|
||||||
|
self.assertTrue(host_matches_rules("api.example.net", rules))
|
||||||
|
self.assertFalse(host_matches_rules("example.org", rules))
|
||||||
|
self.assertFalse(host_matches_rules("other.test", rules))
|
||||||
|
|
||||||
|
def test_parse_content_length_uses_exact_header_name(self):
|
||||||
|
headers = (
|
||||||
|
b"HTTP/1.1 200 OK\r\n"
|
||||||
|
b"X-Content-Length: 999\r\n"
|
||||||
|
b"Content-Length: 42\r\n\r\n"
|
||||||
|
)
|
||||||
|
self.assertEqual(parse_content_length(headers), 42)
|
||||||
|
self.assertEqual(parse_content_length(b"HTTP/1.1 200 OK\r\n\r\n"), 0)
|
||||||
|
|
||||||
|
def test_has_unsupported_transfer_encoding(self):
|
||||||
|
self.assertFalse(
|
||||||
|
has_unsupported_transfer_encoding(
|
||||||
|
b"Transfer-Encoding: identity\r\n\r\n"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.assertTrue(
|
||||||
|
has_unsupported_transfer_encoding(
|
||||||
|
b"Transfer-Encoding: chunked\r\n\r\n"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.assertTrue(
|
||||||
|
has_unsupported_transfer_encoding(
|
||||||
|
b"Transfer-Encoding: gzip, chunked\r\n\r\n"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ResponseCacheParseTtlTests(unittest.TestCase):
|
||||||
|
def test_static_asset_uses_long_ttl(self):
|
||||||
|
raw = (
|
||||||
|
b"HTTP/1.1 200 OK\r\n"
|
||||||
|
b"Content-Type: image/png\r\n\r\n"
|
||||||
|
b"body"
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
ResponseCache.parse_ttl(raw, "https://example.com/logo.png"),
|
||||||
|
CACHE_TTL_STATIC_LONG,
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_private_no_store_and_set_cookie_disable_caching(self):
|
||||||
|
private_resp = (
|
||||||
|
b"HTTP/1.1 200 OK\r\n"
|
||||||
|
b"Cache-Control: private, max-age=600\r\n\r\n"
|
||||||
|
b"body"
|
||||||
|
)
|
||||||
|
no_store_resp = (
|
||||||
|
b"HTTP/1.1 200 OK\r\n"
|
||||||
|
b"Cache-Control: no-store\r\n\r\n"
|
||||||
|
b"body"
|
||||||
|
)
|
||||||
|
cookie_resp = (
|
||||||
|
b"HTTP/1.1 200 OK\r\n"
|
||||||
|
b"Set-Cookie: sid=1\r\n"
|
||||||
|
b"Content-Type: image/png\r\n\r\n"
|
||||||
|
b"body"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(ResponseCache.parse_ttl(private_resp, "https://example.com/app.js"), 0)
|
||||||
|
self.assertEqual(ResponseCache.parse_ttl(no_store_resp, "https://example.com/app.js"), 0)
|
||||||
|
self.assertEqual(ResponseCache.parse_ttl(cookie_resp, "https://example.com/logo.png"), 0)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user