Merge pull request #53 from AMMIROSOH/AMMIROSOH/add-unittest

feat: Add unittest coverage
This commit is contained in:
Abolfazl Ghaemi
2026-05-11 16:06:04 +03:30
committed by GitHub
6 changed files with 262 additions and 0 deletions
+4
View File
@@ -51,3 +51,7 @@ jobs:
run: |
python main.py --help
python main.py --version
- name: Unit tests
run: |
python -m unittest discover -s tests -v
+1
View File
@@ -0,0 +1 @@
"""Test package for MasterHttpRelayVPN."""
+51
View File
@@ -0,0 +1,51 @@
import pathlib
import sys
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
from core.adblock import parse_hosts_text
class ParseHostsTextTests(unittest.TestCase):
def test_parses_hosts_and_bare_domains_and_deduplicates(self):
text = """
# comment
0.0.0.0 ads.example.com
127.0.0.1 tracker.example.com # inline comment
plain.example.org
ads.example.com
EXAMPLE.NET.
"""
self.assertEqual(
parse_hosts_text(text),
[
"ads.example.com",
"tracker.example.com",
"plain.example.org",
"example.net",
],
)
def test_skips_invalid_reserved_and_wildcard_entries(self):
text = """
localhost
localhost.localdomain
192.168.1.1
::1
analytics-*.example.com
invalid
bad_domain.example
host name.example.com
"""
self.assertEqual(parse_hosts_text(text), [])
if __name__ == "__main__":
unittest.main()
+41
View File
@@ -0,0 +1,41 @@
import gzip
import pathlib
import sys
import unittest
import zlib
ROOT = pathlib.Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
from core import codec
class CodecTests(unittest.TestCase):
def test_supported_encodings_always_include_gzip_and_deflate(self):
encodings = codec.supported_encodings()
self.assertIn("gzip", encodings)
self.assertIn("deflate", encodings)
def test_decode_passes_through_unknown_empty_and_invalid_payloads(self):
raw = b"plain-body"
self.assertIs(codec.decode(raw, ""), raw)
self.assertIs(codec.decode(raw, "identity"), raw)
self.assertIs(codec.decode(raw, "unknown"), raw)
self.assertEqual(codec.decode(raw, "gzip"), raw)
def test_decode_gzip_and_deflate(self):
raw = b"hello through compression"
gzip_body = gzip.compress(raw)
zlib_body = zlib.compress(raw)
raw_deflate_body = zlib.compress(raw, wbits=-zlib.MAX_WBITS)
self.assertEqual(codec.decode(gzip_body, "gzip"), raw)
self.assertEqual(codec.decode(zlib_body, "deflate"), raw)
self.assertEqual(codec.decode(raw_deflate_body, "deflate"), raw)
if __name__ == "__main__":
unittest.main()
+66
View File
@@ -0,0 +1,66 @@
import pathlib
import sys
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
from core.constants import FRONT_SNI_POOL_GOOGLE
from relay.fronting_support import (
build_sni_pool,
parse_content_range,
validate_range_response,
)
class BuildSniPoolTests(unittest.TestCase):
def test_uses_explicit_overrides_with_normalization_and_deduplication(self):
result = build_sni_pool(
"www.google.com",
[" Mail.Google.com ", "mail.google.com", "accounts.google.com."],
)
self.assertEqual(result, ["mail.google.com", "accounts.google.com"])
def test_google_front_domain_uses_google_pool(self):
result = build_sni_pool("www.google.com", None)
self.assertEqual(result, list(FRONT_SNI_POOL_GOOGLE))
def test_non_google_front_domain_falls_back_to_single_host(self):
self.assertEqual(build_sni_pool("cdn.example.com", None), ["cdn.example.com"])
self.assertEqual(build_sni_pool("", None), ["www.google.com"])
class RangeParsingTests(unittest.TestCase):
def test_parse_content_range_accepts_valid_values(self):
self.assertEqual(parse_content_range("bytes 10-19/20"), (10, 19, 20))
def test_parse_content_range_rejects_invalid_values(self):
self.assertIsNone(parse_content_range(""))
self.assertIsNone(parse_content_range("bytes 10-19/*"))
self.assertIsNone(parse_content_range("bytes 10-9/20"))
self.assertIsNone(parse_content_range("bytes 10-19/19"))
def test_validate_range_response_success_and_failures(self):
headers = {"content-range": "bytes 10-13/20"}
body = b"data"
self.assertIsNone(validate_range_response(206, headers, body, 10, 13, 20))
self.assertEqual(
validate_range_response(200, headers, body, 10, 13, 20),
"status 200",
)
self.assertEqual(
validate_range_response(206, {"content-range": "bytes 11-13/20"}, body, 10, 13, 20),
"Content-Range mismatch 11-13",
)
self.assertEqual(
validate_range_response(206, headers, b"abc", 10, 13, 20),
"short chunk 3/4 B",
)
if __name__ == "__main__":
unittest.main()
+99
View File
@@ -0,0 +1,99 @@
import pathlib
import sys
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
from core.constants import CACHE_TTL_STATIC_LONG
from proxy.proxy_support import (
ResponseCache,
has_unsupported_transfer_encoding,
host_matches_rules,
is_ip_literal,
load_host_rules,
parse_content_length,
)
class ProxySupportTests(unittest.TestCase):
def test_is_ip_literal_handles_ipv4_ipv6_and_hostnames(self):
self.assertTrue(is_ip_literal("127.0.0.1"))
self.assertTrue(is_ip_literal("[2001:db8::1]"))
self.assertFalse(is_ip_literal("example.com"))
def test_load_host_rules_and_match_suffixes(self):
rules = load_host_rules(["Example.com", ".example.org", "api.example.net."])
self.assertTrue(host_matches_rules("example.com", rules))
self.assertTrue(host_matches_rules("sub.example.org", rules))
self.assertTrue(host_matches_rules("api.example.net", rules))
self.assertFalse(host_matches_rules("example.org", rules))
self.assertFalse(host_matches_rules("other.test", rules))
def test_parse_content_length_uses_exact_header_name(self):
headers = (
b"HTTP/1.1 200 OK\r\n"
b"X-Content-Length: 999\r\n"
b"Content-Length: 42\r\n\r\n"
)
self.assertEqual(parse_content_length(headers), 42)
self.assertEqual(parse_content_length(b"HTTP/1.1 200 OK\r\n\r\n"), 0)
def test_has_unsupported_transfer_encoding(self):
self.assertFalse(
has_unsupported_transfer_encoding(
b"Transfer-Encoding: identity\r\n\r\n"
)
)
self.assertTrue(
has_unsupported_transfer_encoding(
b"Transfer-Encoding: chunked\r\n\r\n"
)
)
self.assertTrue(
has_unsupported_transfer_encoding(
b"Transfer-Encoding: gzip, chunked\r\n\r\n"
)
)
class ResponseCacheParseTtlTests(unittest.TestCase):
def test_static_asset_uses_long_ttl(self):
raw = (
b"HTTP/1.1 200 OK\r\n"
b"Content-Type: image/png\r\n\r\n"
b"body"
)
self.assertEqual(
ResponseCache.parse_ttl(raw, "https://example.com/logo.png"),
CACHE_TTL_STATIC_LONG,
)
def test_private_no_store_and_set_cookie_disable_caching(self):
private_resp = (
b"HTTP/1.1 200 OK\r\n"
b"Cache-Control: private, max-age=600\r\n\r\n"
b"body"
)
no_store_resp = (
b"HTTP/1.1 200 OK\r\n"
b"Cache-Control: no-store\r\n\r\n"
b"body"
)
cookie_resp = (
b"HTTP/1.1 200 OK\r\n"
b"Set-Cookie: sid=1\r\n"
b"Content-Type: image/png\r\n\r\n"
b"body"
)
self.assertEqual(ResponseCache.parse_ttl(private_resp, "https://example.com/app.js"), 0)
self.assertEqual(ResponseCache.parse_ttl(no_store_resp, "https://example.com/app.js"), 0)
self.assertEqual(ResponseCache.parse_ttl(cookie_resp, "https://example.com/logo.png"), 0)
if __name__ == "__main__":
unittest.main()