diff --git a/README.md b/README.md index dcd139b..9365727 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,9 @@ DNS-based feed reader for Telegram channels. Designed for environments where onl - Browser-based web UI with RTL/Farsi support (VazirMatn font) - Configure via the web UI — no CLI flags needed - Sends encrypted DNS TXT queries via available resolvers +- **Resolver scoring**: tracks per-resolver success rate and latency; healthier resolvers are preferred automatically +- **Scatter mode**: fans out the same DNS request to multiple resolvers simultaneously and uses the fastest response (default: 2 concurrent resolvers per request) +- **1-hour localStorage cache**: channel list and messages are cached in the browser — reopening the app shows cached data instantly while a fresh fetch runs in the background - Send messages to channels and private chats (requires server `--allow-manage`) - Channel management (add/remove channels remotely via admin commands) - Message compression (deflate) for efficient transfer @@ -39,8 +42,9 @@ DNS-based feed reader for Telegram channels. Designed for environments where onl - Variable response and query sizes to prevent fingerprinting - Multiple query encoding modes for stealth -- Resolver shuffling and rate limiting -- Background noise traffic +- **Resolver scoring**: per-resolver success-rate + latency scoreboard; high-scoring resolvers are picked more often via weighted-random selection +- **Scatter mode**: same block fetched from N resolvers simultaneously, first response wins — faster fetches and implicit failover +- Rate limiting and background noise traffic to blend in - Message compression to minimize query count ## Protocol @@ -114,7 +118,7 @@ make build-server All data files (session, channels) are stored in the `--data-dir` directory (default: `./data`). -Environment variables: `THEFEED_DOMAIN`, `THEFEED_KEY`, `TELEGRAM_API_ID`, `TELEGRAM_API_HASH`, `TELEGRAM_PHONE`, `TELEGRAM_PASSWORD` +Environment variables: `THEFEED_DOMAIN`, `THEFEED_KEY`, `THEFEED_MSG_LIMIT`, `THEFEED_ALLOW_MANAGE` (set to `0` to force-disable even if the flag is baked into the service), `TELEGRAM_API_ID`, `TELEGRAM_API_HASH`, `TELEGRAM_PHONE`, `TELEGRAM_PASSWORD` #### Server Flags @@ -165,6 +169,8 @@ All configuration, cache, and data files are stored in the data directory. | `--password` | | Password for web UI (empty = no auth) | | `--version` | | Show version and exit | +The **concurrent requests (scatter)** setting and all other profile options (resolvers, rate limit, query mode, timeout) are configured through the web UI profile editor, not CLI flags. + #### Android (Termux) ```bash @@ -227,7 +233,8 @@ The browser-based UI has: - **Next-fetch timer**: countdown to next automatic refresh - **Media detection**: `[IMAGE]`, `[VIDEO]`, `[DOCUMENT]` tag highlighting - **Log panel** (bottom): live DNS query log -- **Settings modal**: configure domain, passphrase, resolvers, query mode, rate limit, timeout, debug mode +- **Settings modal**: configure domain, passphrase, resolvers, query mode, rate limit, concurrent requests (scatter), timeout, debug mode +- **Per-profile cache**: 1-hour browser cache so data is visible instantly on reopen ## Development diff --git a/android/app/src/main/java/com/thefeed/android/MainActivity.kt b/android/app/src/main/java/com/thefeed/android/MainActivity.kt index 6ed82ff..76254ba 100644 --- a/android/app/src/main/java/com/thefeed/android/MainActivity.kt +++ b/android/app/src/main/java/com/thefeed/android/MainActivity.kt @@ -19,6 +19,7 @@ import android.webkit.JsResult import android.webkit.WebChromeClient import android.app.AlertDialog import androidx.activity.ComponentActivity +import androidx.activity.OnBackPressedCallback import androidx.activity.result.contract.ActivityResultContracts import androidx.core.content.ContextCompat import androidx.core.view.ViewCompat @@ -57,10 +58,25 @@ class MainActivity : ComponentActivity() { requestNotificationPermission() configureWebView() + registerBackHandler() startThefeedService() waitForServerThenLoad() } + private fun registerBackHandler() { + onBackPressedDispatcher.addCallback(this, object : OnBackPressedCallback(true) { + override fun handleOnBackPressed() { + if (webView.canGoBack()) { + webView.goBack() + } else { + // No WebView history to go back to — move app to background + // instead of finishing the activity (keeps the service alive). + moveTaskToBack(true) + } + } + }) + } + private fun requestNotificationPermission() { if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) { if (ContextCompat.checkSelfPermission(this, Manifest.permission.POST_NOTIFICATIONS) diff --git a/cmd/client/main.go b/cmd/client/main.go index 906e0f6..ff39956 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -17,6 +17,10 @@ func main() { port := flag.Int("port", 8080, "Web UI port") password := flag.String("password", "", "Admin password for web UI (empty = no auth)") showVersion := flag.Bool("version", false, "Show version and exit") + flag.Usage = func() { + fmt.Fprintf(os.Stderr, "thefeed-client %s\n\nWeb UI for reading thefeed content over DNS.\n\nUsage:\n thefeed-client [flags]\n\nFlags:\n", version.Version) + flag.PrintDefaults() + } flag.Parse() if *showVersion { diff --git a/cmd/server/main.go b/cmd/server/main.go index 61fea5f..252d224 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -35,6 +35,10 @@ func main() { msgLimit := flag.Int("msg-limit", 15, "Maximum messages to fetch per Telegram channel") allowManage := flag.Bool("allow-manage", false, "Allow remote channel management and sending via DNS") showVersion := flag.Bool("version", false, "Show version and exit") + flag.Usage = func() { + fmt.Fprintf(os.Stderr, "thefeed-server %s\n\nServes Telegram channel content over encrypted DNS for censorship-resistant access.\n\nUsage:\n thefeed-server [flags]\n\nFlags:\n", version.Version) + flag.PrintDefaults() + } flag.Parse() if *showVersion { diff --git a/internal/client/fetcher.go b/internal/client/fetcher.go index 7e8b94a..ef0591c 100644 --- a/internal/client/fetcher.go +++ b/internal/client/fetcher.go @@ -6,8 +6,10 @@ import ( "encoding/binary" "fmt" "math/rand" + "sort" "strings" "sync" + "sync/atomic" "time" "github.com/miekg/dns" @@ -26,6 +28,32 @@ var noiseDomains = []string{ "www.wikipedia.org", "www.reddit.com", "www.twitter.com", } +// resolverStat tracks per-resolver health metrics; fields are accessed with sync/atomic. +type resolverStat struct { + success int64 // number of successful queries + failure int64 // number of failed queries + totalMs int64 // sum of latency in milliseconds over successful queries +} + +func (s *resolverStat) score() float64 { + success := atomic.LoadInt64(&s.success) + failure := atomic.LoadInt64(&s.failure) + totalMs := atomic.LoadInt64(&s.totalMs) + total := success + failure + if total == 0 { + return 1.0 // no data yet → neutral weight + } + successRate := float64(success) / float64(total) + var avgMs float64 + if success > 0 { + avgMs = float64(totalMs) / float64(success) + } else { + avgMs = 30000 // 30 s effective penalty for 0% success resolvers + } + // Higher success rate + lower latency → higher score. + return successRate / (avgMs/1000.0 + 0.1) +} + // Fetcher fetches feed blocks over DNS. type Fetcher struct { domain string @@ -46,6 +74,17 @@ type Fetcher struct { debug bool logFunc LogFunc + + // Resolver scoring: per-resolver success/failure counters and latency. + stats sync.Map // string (resolver:port) -> *resolverStat + + // scatter is how many resolvers to query simultaneously per DNS block request. + // 1 = sequential (no scatter), 2+ = fan-out (use fastest response). + scatter int + + // exchangeFn is the function used to send a DNS message to a resolver. + // It defaults to a real dns.Client exchange and can be replaced in tests. + exchangeFn func(ctx context.Context, m *dns.Msg, addr string) (*dns.Msg, time.Duration, error) } // NewFetcher creates a new DNS block fetcher. @@ -58,15 +97,22 @@ func NewFetcher(domain, passphrase string, resolvers []string) (*Fetcher, error) r := make([]string, len(resolvers)) copy(r, resolvers) - return &Fetcher{ - domain: strings.TrimSuffix(domain, "."), - queryKey: qk, - responseKey: rk, - queryMode: protocol.QuerySingleLabel, - allResolvers: r, - activeResolvers: r, - timeout: 15 * time.Second, - }, nil + f := &Fetcher{ + domain: strings.TrimSuffix(domain, "."), + queryKey: qk, + responseKey: rk, + queryMode: protocol.QuerySingleLabel, + allResolvers: r, + // activeResolvers starts empty — the ResolverChecker fills it in after + // the first health-check scan so no fetch is attempted with unvalidated resolvers. + timeout: 25 * time.Second, + scatter: 2, // query 2 resolvers in parallel by default + } + f.exchangeFn = func(ctx context.Context, m *dns.Msg, addr string) (*dns.Msg, time.Duration, error) { + c := &dns.Client{Timeout: f.timeout, Net: "udp"} + return c.ExchangeContext(ctx, m, addr) + } + return f, nil } // SetRateLimit sets the maximum queries per second (0 = unlimited). Must be called before Start. @@ -130,6 +176,169 @@ func (f *Fetcher) Resolvers() []string { return result } +// SetScatter sets the number of resolvers queried simultaneously per DNS block request. +// 1 = sequential (no scatter). Values > 1 fan out to N resolvers and use the fastest response. +// Must be called before Start(). +func (f *Fetcher) SetScatter(n int) { + if n < 1 { + n = 1 + } + f.scatter = n +} + +// RecordSuccess records a successful DNS query for the given resolver. +func (f *Fetcher) RecordSuccess(resolver string, latency time.Duration) { + if !strings.Contains(resolver, ":") { + resolver += ":53" + } + v, _ := f.stats.LoadOrStore(resolver, &resolverStat{}) + s := v.(*resolverStat) + atomic.AddInt64(&s.success, 1) + atomic.AddInt64(&s.totalMs, latency.Milliseconds()) +} + +// RecordFailure records a failed DNS query for the given resolver. +func (f *Fetcher) RecordFailure(resolver string) { + if !strings.Contains(resolver, ":") { + resolver += ":53" + } + v, _ := f.stats.LoadOrStore(resolver, &resolverStat{}) + s := v.(*resolverStat) + atomic.AddInt64(&s.failure, 1) +} + +// resolverScore returns the health score for a resolver (higher = better). +func (f *Fetcher) resolverScore(resolver string) float64 { + key := resolver + if !strings.Contains(key, ":") { + key += ":53" + } + if v, ok := f.stats.Load(key); ok { + return v.(*resolverStat).score() + } + return 1.0 // no data yet → neutral weight +} + +// pickWeightedResolvers picks up to n resolvers from the active pool using +// weighted-random selection (higher score → more likely to be chosen). +func (f *Fetcher) pickWeightedResolvers(n int) []string { + resolvers := f.Resolvers() + if len(resolvers) == 0 { + return nil + } + if n <= 0 { + n = 1 + } + if n >= len(resolvers) { + // Return all resolvers sorted by score descending. + type scored struct { + r string + s float64 + } + ss := make([]scored, len(resolvers)) + for i, r := range resolvers { + ss[i] = scored{r, f.resolverScore(r)} + } + sort.Slice(ss, func(i, j int) bool { return ss[i].s > ss[j].s }) + out := make([]string, len(ss)) + for i, s := range ss { + out[i] = s.r + } + return out + } + // Weighted random sampling without replacement. + weights := make([]float64, len(resolvers)) + total := 0.0 + for i, r := range resolvers { + w := f.resolverScore(r) + if w < 0.001 { + w = 0.001 // every resolver keeps a minimal chance + } + weights[i] = w + total += w + } + picked := make([]string, 0, n) + for len(picked) < n && total > 0 { + r := rand.Float64() * total + cumul := 0.0 + chosen := -1 + for i, w := range weights { + if w == 0 { + continue + } + cumul += w + if r < cumul { + chosen = i + break + } + } + if chosen < 0 { + // Floating-point edge case: pick last non-zero entry. + for i := len(weights) - 1; i >= 0; i-- { + if weights[i] > 0 { + chosen = i + break + } + } + } + if chosen < 0 { + break + } + picked = append(picked, resolvers[chosen]) + total -= weights[chosen] + weights[chosen] = 0 + } + return picked +} + +// scatterQuery sends qname to all given resolvers concurrently and returns +// the first successful response. The winning response cancels the others. +func (f *Fetcher) scatterQuery(ctx context.Context, resolvers []string, qname string) ([]byte, error) { + if len(resolvers) == 1 { + return f.queryResolver(ctx, resolvers[0], qname) + } + type result struct { + data []byte + err error + } + resultCh := make(chan result, len(resolvers)) + subCtx, cancel := context.WithCancel(ctx) + defer cancel() + for i, r := range resolvers { + go func(resolver string, idx int) { + // Stagger launches: first resolver fires immediately, others wait + // a random 50–300 ms to avoid a simultaneous burst. + if idx > 0 { + jitter := time.Duration(50+rand.Intn(250)) * time.Millisecond + select { + case <-time.After(jitter): + case <-subCtx.Done(): + return + } + } + data, err := f.queryResolver(subCtx, resolver, qname) + select { + case resultCh <- result{data, err}: + case <-subCtx.Done(): + } + }(r, i) + } + var lastErr error + for i := 0; i < len(resolvers); i++ { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case r := <-resultCh: + if r.err == nil { + cancel() // cancel remaining in-flight queries + return r.data, nil + } + lastErr = r.err + } + } + return nil, lastErr +} + // Start launches background goroutines (rate limiter and noise generator). // ctx controls their lifetime — cancel it to cleanly stop them. // Call once per fetcher configuration; creating a new fetcher replaces the old one. @@ -244,7 +453,7 @@ func (f *Fetcher) rateWait(ctx context.Context) error { // It enqueues through the rate limiter and respects ctx cancellation. // On transient failure it retries up to 2 additional times with a short back-off. func (f *Fetcher) FetchBlock(ctx context.Context, channel, block uint16) ([]byte, error) { - const maxAttempts = 10 + const maxAttempts = 20 var lastErr error for attempt := 0; attempt < maxAttempts; attempt++ { if attempt > 0 { @@ -268,31 +477,23 @@ func (f *Fetcher) FetchBlock(ctx context.Context, channel, block uint16) ([]byte f.log("[debug] query ch=%d blk=%d attempt=%d qname=%s", channel, block, attempt+1, qname) } - resolvers := f.Resolvers() - if len(resolvers) == 0 { + scatter := f.scatter + if scatter < 1 { + scatter = 1 + } + picked := f.pickWeightedResolvers(scatter) + if len(picked) == 0 { return nil, fmt.Errorf("no active resolvers") } - // Shuffle to spread load across resolvers. - shuffled := make([]string, len(resolvers)) - copy(shuffled, resolvers) - rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] }) - - for _, resolver := range shuffled { - if ctx.Err() != nil { - return nil, ctx.Err() - } - data, err := f.queryResolver(ctx, resolver, qname) - if err != nil { - lastErr = err - continue - } + data, err := f.scatterQuery(ctx, picked, qname) + if err == nil { if f.debug { f.log("[debug] response ch=%d blk=%d len=%d", channel, block, len(data)) } return data, nil } - lastErr = fmt.Errorf("all resolvers failed: %w", lastErr) + lastErr = fmt.Errorf("scatter query failed: %w", err) if attempt+1 < maxAttempts { f.log("block ch=%d blk=%d attempt %d/%d failed, retrying: %v", channel, block, attempt+1, maxAttempts, lastErr) } @@ -418,22 +619,33 @@ func (f *Fetcher) queryResolver(ctx context.Context, resolver, qname string) ([] resolver += ":53" } + start := time.Now() resp, err := f.exchangeResolver(ctx, resolver, qname) + latency := time.Since(start) if err != nil { + f.RecordFailure(resolver) return nil, err } if resp.Rcode != dns.RcodeSuccess { + f.RecordFailure(resolver) return nil, fmt.Errorf("dns error from %s: %s", resolver, dns.RcodeToString[resp.Rcode]) } for _, ans := range resp.Answer { if txt, ok := ans.(*dns.TXT); ok { encoded := strings.Join(txt.Txt, "") - return protocol.DecodeResponse(f.responseKey, encoded) + data, err := protocol.DecodeResponse(f.responseKey, encoded) + if err != nil { + f.RecordFailure(resolver) + return nil, err + } + f.RecordSuccess(resolver, latency) + return data, nil } } + f.RecordFailure(resolver) return nil, fmt.Errorf("no TXT record in response from %s", resolver) } @@ -441,14 +653,12 @@ func (f *Fetcher) exchangeResolver(ctx context.Context, resolver, qname string) resolverCtx, cancel := context.WithTimeout(ctx, f.timeout) defer cancel() - c := &dns.Client{Timeout: f.timeout, Net: "udp"} - m := new(dns.Msg) m.SetQuestion(dns.Fqdn(qname), dns.TypeTXT) m.RecursionDesired = true m.SetEdns0(4096, false) - resp, _, err := c.ExchangeContext(resolverCtx, m, resolver) + resp, _, err := f.exchangeFn(resolverCtx, m, resolver) if err != nil { return nil, fmt.Errorf("dns exchange with %s: %w", resolver, err) } diff --git a/internal/client/fetcher_test.go b/internal/client/fetcher_test.go index d8864ad..32aabe9 100644 --- a/internal/client/fetcher_test.go +++ b/internal/client/fetcher_test.go @@ -1,26 +1,217 @@ package client -import "testing" +import ( + "context" + "fmt" + "testing" + "time" -func TestSetActiveResolversAllowsEmpty(t *testing.T) { - fetcher, err := NewFetcher("t.example.com", "test-passphrase", []string{"1.1.1.1:53", "8.8.8.8:53"}) + "github.com/miekg/dns" + "github.com/sartoopjj/thefeed/internal/protocol" +) + +// mockExchange returns a factory for exchangeFn that records calls and +// returns either a successful TXT response (encoded payload) or an error. +// +// When payload is non-nil the mock builds a valid encrypted TXT record using +// the fetcher's responseKey so that queryResolver can decode it correctly. +// When payload is nil the mock returns errFn(addr). +func mockExchange(f *Fetcher, payload []byte, errFn func(addr string) error) func(context.Context, *dns.Msg, string) (*dns.Msg, time.Duration, error) { + return func(ctx context.Context, m *dns.Msg, addr string) (*dns.Msg, time.Duration, error) { + if err := ctx.Err(); err != nil { + return nil, 0, err + } + if errFn != nil { + if err := errFn(addr); err != nil { + return nil, 0, err + } + } + resp := new(dns.Msg) + resp.SetReply(m) + resp.Rcode = dns.RcodeSuccess + if payload != nil { + encoded, encErr := protocol.EncodeResponse(f.responseKey, payload, 0) + if encErr != nil { + return nil, 0, encErr + } + resp.Answer = []dns.RR{&dns.TXT{ + Hdr: dns.RR_Header{Name: m.Question[0].Name, Rrtype: dns.TypeTXT, Class: dns.ClassINET, Ttl: 0}, + Txt: []string{encoded}, + }} + } + return resp, time.Millisecond, nil + } +} + +func newTestFetcher(t *testing.T, resolvers []string) *Fetcher { + t.Helper() + f, err := NewFetcher("t.example.com", "test-passphrase", resolvers) if err != nil { t.Fatalf("NewFetcher: %v", err) } - fetcher.SetActiveResolvers(nil) - if got := fetcher.Resolvers(); len(got) != 0 { + // Simulate the resolver scanner having validated all provided resolvers. + f.SetActiveResolvers(resolvers) + // Block all real DNS traffic by default. + f.exchangeFn = func(_ context.Context, _ *dns.Msg, addr string) (*dns.Msg, time.Duration, error) { + return nil, 0, fmt.Errorf("real DNS blocked in tests (resolver: %s)", addr) + } + return f +} + +func TestSetActiveResolversAllowsEmpty(t *testing.T) { + f := newTestFetcher(t, []string{"1.1.1.1:53", "8.8.8.8:53"}) + f.SetActiveResolvers(nil) + if got := f.Resolvers(); len(got) != 0 { t.Fatalf("len(Resolvers()) = %d, want 0", len(got)) } } func TestSetActiveResolversReplacesPool(t *testing.T) { - fetcher, err := NewFetcher("t.example.com", "test-passphrase", []string{"1.1.1.1:53", "8.8.8.8:53"}) - if err != nil { - t.Fatalf("NewFetcher: %v", err) - } - fetcher.SetActiveResolvers([]string{"9.9.9.9:53"}) - got := fetcher.Resolvers() + f := newTestFetcher(t, []string{"1.1.1.1:53", "8.8.8.8:53"}) + f.SetActiveResolvers([]string{"9.9.9.9:53"}) + got := f.Resolvers() if len(got) != 1 || got[0] != "9.9.9.9:53" { t.Fatalf("Resolvers() = %v, want [9.9.9.9:53]", got) } } + +// TestResolverScoreNoData checks that a resolver with no recorded stats gets neutral weight 1.0. +func TestResolverScoreNoData(t *testing.T) { + f := newTestFetcher(t, []string{"1.1.1.1:53"}) + if got := f.resolverScore("1.1.1.1:53"); got != 1.0 { + t.Fatalf("resolverScore with no data = %v, want 1.0", got) + } +} + +// TestResolverScoreSuccessBeatsFailure checks that a 100% success resolver +// scores higher than a 100% failure resolver. +func TestResolverScoreSuccessBeatsFailure(t *testing.T) { + f := newTestFetcher(t, []string{"1.1.1.1:53", "8.8.8.8:53"}) + f.RecordSuccess("1.1.1.1:53", 50*time.Millisecond) + f.RecordFailure("8.8.8.8:53") + good := f.resolverScore("1.1.1.1:53") + bad := f.resolverScore("8.8.8.8:53") + if good <= bad { + t.Fatalf("expected good resolver (%v) to score higher than bad (%v)", good, bad) + } +} + +// TestResolverScoreFasterBeatsSlower checks that an equal-success resolver +// with lower latency scores higher. +func TestResolverScoreFasterBeatsSlower(t *testing.T) { + f := newTestFetcher(t, []string{"1.1.1.1:53", "8.8.8.8:53"}) + f.RecordSuccess("1.1.1.1:53", 10*time.Millisecond) + f.RecordSuccess("8.8.8.8:53", 500*time.Millisecond) + fast := f.resolverScore("1.1.1.1:53") + slow := f.resolverScore("8.8.8.8:53") + if fast <= slow { + t.Fatalf("expected fast resolver (%v) to score higher than slow (%v)", fast, slow) + } +} + +// TestPickWeightedResolversReturnsN checks that pickWeightedResolvers returns +// at most n distinct resolvers. +func TestPickWeightedResolversReturnsN(t *testing.T) { + resolvers := []string{"1.1.1.1:53", "8.8.8.8:53", "9.9.9.9:53", "208.67.222.222:53"} + f := newTestFetcher(t, resolvers) + f.SetActiveResolvers(resolvers) + picked := f.pickWeightedResolvers(2) + if len(picked) != 2 { + t.Fatalf("pickWeightedResolvers(2) returned %d items, want 2", len(picked)) + } + seen := map[string]bool{} + for _, r := range picked { + if seen[r] { + t.Fatalf("pickWeightedResolvers returned duplicate resolver %s", r) + } + seen[r] = true + } +} + +// TestPickWeightedResolversMoreThanAvailable returns all when n > pool size. +func TestPickWeightedResolversMoreThanAvailable(t *testing.T) { + f := newTestFetcher(t, []string{"1.1.1.1:53"}) + picked := f.pickWeightedResolvers(5) + if len(picked) != 1 { + t.Fatalf("expected 1 resolver when pool has 1, got %d", len(picked)) + } +} + +// TestScatterQuerySuccess checks that scatterQuery returns data when +// the mock exchange responds successfully. +func TestScatterQuerySuccess(t *testing.T) { + f := newTestFetcher(t, []string{"1.1.1.1:53", "8.8.8.8:53"}) + want := []byte("hello") + f.exchangeFn = mockExchange(f, want, nil) + + ctx := context.Background() + got, err := f.scatterQuery(ctx, []string{"1.1.1.1:53"}, "test.t.example.com.") + if err != nil { + t.Fatalf("scatterQuery: unexpected error: %v", err) + } + if string(got) != string(want) { + t.Fatalf("scatterQuery returned %q, want %q", got, want) + } +} + +// TestScatterQueryUsesFirstResponse checks that when multiple resolvers respond, +// the first successful answer wins and the call returns without error. +func TestScatterQueryUsesFirstResponse(t *testing.T) { + f := newTestFetcher(t, []string{"1.1.1.1:53", "8.8.8.8:53"}) + want := []byte("winner") + f.exchangeFn = mockExchange(f, want, nil) + + ctx := context.Background() + got, err := f.scatterQuery(ctx, []string{"1.1.1.1:53", "8.8.8.8:53"}, "test.t.example.com.") + if err != nil { + t.Fatalf("scatterQuery: unexpected error: %v", err) + } + if string(got) != string(want) { + t.Fatalf("scatterQuery returned %q, want %q", got, want) + } +} + +// TestScatterQueryAllFail checks that scatterQuery returns an error when +// all resolvers fail. +func TestScatterQueryAllFail(t *testing.T) { + f := newTestFetcher(t, []string{"1.1.1.1:53", "8.8.8.8:53"}) + f.exchangeFn = mockExchange(f, nil, func(addr string) error { + return fmt.Errorf("connection refused from %s", addr) + }) + + ctx := context.Background() + _, err := f.scatterQuery(ctx, []string{"1.1.1.1:53", "8.8.8.8:53"}, "test.t.example.com.") + if err == nil { + t.Fatal("expected error when all resolvers fail, got nil") + } +} + +// TestScatterQueryContextCancel checks that scatterQuery respects context cancellation. +func TestScatterQueryContextCancel(t *testing.T) { + f := newTestFetcher(t, []string{"1.1.1.1:53"}) + // Block forever until context is cancelled. + f.exchangeFn = func(ctx context.Context, _ *dns.Msg, _ string) (*dns.Msg, time.Duration, error) { + <-ctx.Done() + return nil, 0, ctx.Err() + } + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + _, err := f.scatterQuery(ctx, []string{"1.1.1.1:53"}, "test.t.example.com.") + if err == nil { + t.Fatal("expected error after context cancel, got nil") + } +} + +// TestSetScatter validates that SetScatter clamps values < 1 to 1. +func TestSetScatter(t *testing.T) { + f := newTestFetcher(t, []string{"1.1.1.1:53"}) + f.SetScatter(0) // should clamp to 1 + if f.scatter != 1 { + t.Fatalf("scatter = %d after SetScatter(0), want 1", f.scatter) + } + f.SetScatter(3) + if f.scatter != 3 { + t.Fatalf("scatter = %d after SetScatter(3), want 3", f.scatter) + } +} diff --git a/internal/client/resolver.go b/internal/client/resolver.go index 6698d84..880b124 100644 --- a/internal/client/resolver.go +++ b/internal/client/resolver.go @@ -24,10 +24,10 @@ type ResolverChecker struct { } // NewResolverChecker creates a health checker for the resolvers in fetcher. -// timeout is the per-probe deadline; 0 uses a 5-second default. +// timeout is the per-probe deadline; 0 uses a 15-second default. func NewResolverChecker(fetcher *Fetcher, timeout time.Duration) *ResolverChecker { if timeout <= 0 { - timeout = 10 * time.Second + timeout = 15 * time.Second } return &ResolverChecker{ fetcher: fetcher, @@ -48,18 +48,38 @@ func (rc *ResolverChecker) Start(ctx context.Context) { } // StartAndNotify is like Start but calls onFirstDone (if non-nil) after the -// initial health-check pass finishes, before the periodic ticker begins. -// This lets callers sequence "DNS scan → metadata fetch" without races. +// first successful health-check pass (i.e. at least one resolver is healthy), +// before the periodic ticker begins. +// If the initial scan finds zero healthy resolvers it retries every minute +// until at least one resolver becomes reachable (or ctx is cancelled). // Safe to call only once per checker instance; subsequent calls are no-ops. func (rc *ResolverChecker) StartAndNotify(ctx context.Context, onFirstDone func()) { if !rc.started.CompareAndSwap(false, true) { return // already started — prevent duplicate scan goroutines } go func() { - rc.CheckNow(ctx) + // Keep scanning every minute until we find at least one healthy resolver. + for { + rc.CheckNow(ctx) + if ctx.Err() != nil { + return + } + if len(rc.fetcher.Resolvers()) > 0 { + break // at least one resolver is up — proceed normally + } + rc.log("No healthy resolvers found — retrying in 1 minute...") + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Minute): + } + } + if onFirstDone != nil && ctx.Err() == nil { onFirstDone() } + + // Periodic re-check every 30 minutes. ticker := time.NewTicker(30 * time.Minute) defer ticker.Stop() for { @@ -68,6 +88,23 @@ func (rc *ResolverChecker) StartAndNotify(ctx context.Context, onFirstDone func( return case <-ticker.C: rc.CheckNow(ctx) + // If the periodic check leaves us with no resolvers, + // fall back into the retry-every-minute loop. + if ctx.Err() == nil && len(rc.fetcher.Resolvers()) == 0 { + rc.log("All resolvers lost — scanning every minute until one recovers...") + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Minute): + } + rc.CheckNow(ctx) + if ctx.Err() != nil || len(rc.fetcher.Resolvers()) > 0 { + break + } + rc.log("Still no healthy resolvers — retrying in 1 minute...") + } + } } } }() @@ -110,7 +147,7 @@ func (rc *ResolverChecker) CheckNow(ctx context.Context) { rc.log("Resolver failed: %s", r) } done++ - rc.log("RESOLVER_SCAN progress %d/%d", done, total) + rc.log("RESOLVER_SCAN progress %d/%d healthy=%d", done, total, len(healthy)) mu.Unlock() }(r) } @@ -131,8 +168,11 @@ func (rc *ResolverChecker) CheckNow(ctx context.Context) { } // checkOne probes a single resolver by sending a metadata channel query -// (channel 0, block 0). A successful DNS response (any rcode that isn't a -// network/timeout error) means the resolver is reachable and understands the domain. +// (channel 0, block 0). A resolver is considered healthy only if it returns +// a DNS response containing at least one TXT record that can be decoded with +// the fetcher's response key — the same bar as a real data fetch. +// This filters out resolvers that are reachable but strip TXT records, or +// that resolve the domain through a path that doesn't reach the thefeed server. func (rc *ResolverChecker) checkOne(ctx context.Context, resolver string) bool { if !strings.Contains(resolver, ":") { resolver += ":53" @@ -157,10 +197,27 @@ func (rc *ResolverChecker) checkOne(ctx context.Context, resolver string) bool { m.RecursionDesired = true m.SetEdns0(4096, false) + start := time.Now() resp, _, err := c.ExchangeContext(probeCtx, m, resolver) - // We consider the resolver healthy if we get any DNS response back - // (even NXDOMAIN means the resolver forwarded the query to our server). - return err == nil && resp != nil + latency := time.Since(start) + if err != nil || resp == nil { + rc.fetcher.RecordFailure(resolver) + return false + } + + // Require a decodable TXT record — same check as a real fetch. + for _, ans := range resp.Answer { + if txt, ok := ans.(*dns.TXT); ok { + encoded := strings.Join(txt.Txt, "") + if _, decErr := protocol.DecodeResponse(rc.fetcher.responseKey, encoded); decErr == nil { + rc.fetcher.RecordSuccess(resolver, latency) + return true + } + } + } + + rc.fetcher.RecordFailure(resolver) + return false } func (rc *ResolverChecker) log(format string, args ...any) { diff --git a/internal/web/static/index.html b/internal/web/static/index.html index f5dfb26..7be20d5 100644 --- a/internal/web/static/index.html +++ b/internal/web/static/index.html @@ -75,7 +75,7 @@ input,textarea,select{font-family:inherit} .messages{flex:1;overflow-y:auto;padding:10px 14px;display:flex;flex-direction:column;gap:4px;direction:ltr} .msg-date-sep{text-align:center;padding:8px 0;font-size:12px;color:var(--text-dim)} .msg-date-sep span{background:rgba(0,0,0,.3);padding:3px 10px;border-radius:10px} -.msg{max-width:min(82%,580px);padding:7px 10px 4px;border-radius:12px;line-height:1.7;word-break:break-word;white-space:pre-wrap;font-size:inherit;background:var(--msg-in);border:1px solid var(--border);align-self:flex-start;border-bottom-left-radius:4px} +.msg{max-width:min(82%,580px);padding:7px 10px 4px;border-radius:12px;line-height:1.7;word-break:break-word;white-space:pre-wrap;font-size:inherit;background:var(--msg-in);border:1px solid rgba(255,255,255,.07);align-self:flex-start;border-bottom-left-radius:4px} .msg-meta{display:flex;justify-content:flex-end;gap:6px;font-size:10px;color:var(--text-dim);margin-top:2px;direction:ltr} .media-tag{display:block;padding:2px 6px;border-radius:4px;background:rgba(51,144,236,.15);color:var(--accent);font-size:11px;margin-bottom:6px} @@ -334,6 +334,7 @@ html[dir=ltr] .active-badge{margin-left:0;margin-right:6px}
+