feat: increase fetch interval to 10 minutes for PublicReader and TelegramReader to optimize resource usage

This commit is contained in:
Sarto
2026-04-04 02:26:27 +03:30
parent e7a514e82c
commit f3194a3191
3 changed files with 33 additions and 15 deletions
+25 -7
View File
@@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/miekg/dns"
@@ -19,6 +20,7 @@ type ResolverChecker struct {
fetcher *Fetcher
timeout time.Duration
logFunc LogFunc
started atomic.Bool // guards against double-start
}
// NewResolverChecker creates a health checker for the resolvers in fetcher.
@@ -48,10 +50,14 @@ func (rc *ResolverChecker) Start(ctx context.Context) {
// StartAndNotify is like Start but calls onFirstDone (if non-nil) after the
// initial health-check pass finishes, before the periodic ticker begins.
// This lets callers sequence "DNS scan → metadata fetch" without races.
// Safe to call only once per checker instance; subsequent calls are no-ops.
func (rc *ResolverChecker) StartAndNotify(ctx context.Context, onFirstDone func()) {
if !rc.started.CompareAndSwap(false, true) {
return // already started — prevent duplicate scan goroutines
}
go func() {
rc.CheckNow()
if onFirstDone != nil {
rc.CheckNow(ctx)
if onFirstDone != nil && ctx.Err() == nil {
onFirstDone()
}
ticker := time.NewTicker(30 * time.Minute)
@@ -61,14 +67,15 @@ func (rc *ResolverChecker) StartAndNotify(ctx context.Context, onFirstDone func(
case <-ctx.Done():
return
case <-ticker.C:
rc.CheckNow()
rc.CheckNow(ctx)
}
}
}()
}
// CheckNow runs a single resolver health-check pass immediately.
func (rc *ResolverChecker) CheckNow() {
// ctx is used to abort in-flight probes early (e.g. when a profile is switched).
func (rc *ResolverChecker) CheckNow(ctx context.Context) {
resolvers := rc.fetcher.AllResolvers()
if len(resolvers) == 0 {
return
@@ -84,13 +91,17 @@ func (rc *ResolverChecker) CheckNow() {
sem := make(chan struct{}, 10) // probe up to 10 resolvers concurrently
for _, r := range resolvers {
// Stop launching new probes if context was cancelled.
if ctx.Err() != nil {
break
}
wg.Add(1)
go func(r string) {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
ok := rc.checkOne(r)
ok := rc.checkOne(ctx, r)
mu.Lock()
if ok {
healthy = append(healthy, r)
@@ -105,6 +116,10 @@ func (rc *ResolverChecker) CheckNow() {
}
wg.Wait()
if ctx.Err() != nil {
return // context cancelled — don't update resolver list
}
rc.fetcher.SetActiveResolvers(healthy)
if len(healthy) == 0 {
rc.log("Resolver check done: 0/%d healthy", len(resolvers))
@@ -118,7 +133,7 @@ func (rc *ResolverChecker) CheckNow() {
// checkOne probes a single resolver by sending a metadata channel query
// (channel 0, block 0). A successful DNS response (any rcode that isn't a
// network/timeout error) means the resolver is reachable and understands the domain.
func (rc *ResolverChecker) checkOne(resolver string) bool {
func (rc *ResolverChecker) checkOne(ctx context.Context, resolver string) bool {
if !strings.Contains(resolver, ":") {
resolver += ":53"
}
@@ -133,13 +148,16 @@ func (rc *ResolverChecker) checkOne(resolver string) bool {
return false
}
probeCtx, cancel := context.WithTimeout(ctx, rc.timeout)
defer cancel()
c := &dns.Client{Timeout: rc.timeout}
m := new(dns.Msg)
m.SetQuestion(dns.Fqdn(qname), dns.TypeTXT)
m.RecursionDesired = true
m.SetEdns0(4096, false)
resp, _, err := c.Exchange(m, resolver)
resp, _, err := c.ExchangeContext(probeCtx, m, resolver)
// We consider the resolver healthy if we get any DNS response back
// (even NXDOMAIN means the resolver forwarded the query to our server).
return err == nil && resp != nil
+3 -3
View File
@@ -59,9 +59,9 @@ func (pr *PublicReader) Run(ctx context.Context) error {
pr.feed.SetTelegramLoggedIn(false)
pr.fetchAll(ctx)
ticker := time.NewTicker(5 * time.Minute)
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
pr.feed.SetNextFetch(uint32(time.Now().Add(5 * time.Minute).Unix()))
pr.feed.SetNextFetch(uint32(time.Now().Add(10 * time.Minute).Unix()))
for {
select {
@@ -69,7 +69,7 @@ func (pr *PublicReader) Run(ctx context.Context) error {
return ctx.Err()
case <-ticker.C:
pr.fetchAll(ctx)
pr.feed.SetNextFetch(uint32(time.Now().Add(5 * time.Minute).Unix()))
pr.feed.SetNextFetch(uint32(time.Now().Add(10 * time.Minute).Unix()))
}
}
}
+5 -5
View File
@@ -140,10 +140,10 @@ func (tr *TelegramReader) Run(ctx context.Context) error {
tr.fetchAll(ctx, api)
// Periodic fetch loop
ticker := time.NewTicker(5 * time.Minute)
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
tr.feed.SetNextFetch(uint32(time.Now().Add(5 * time.Minute).Unix()))
tr.feed.SetNextFetch(uint32(time.Now().Add(10 * time.Minute).Unix()))
for {
select {
@@ -151,15 +151,15 @@ func (tr *TelegramReader) Run(ctx context.Context) error {
return ctx.Err()
case <-ticker.C:
tr.fetchAll(ctx, api)
tr.feed.SetNextFetch(uint32(time.Now().Add(5 * time.Minute).Unix()))
tr.feed.SetNextFetch(uint32(time.Now().Add(10 * time.Minute).Unix()))
case <-tr.refreshCh:
// Invalidate cache so fetchAll re-fetches everything.
tr.mu.Lock()
tr.cache = make(map[string]cachedMessages)
tr.mu.Unlock()
tr.fetchAll(ctx, api)
ticker.Reset(5 * time.Minute)
tr.feed.SetNextFetch(uint32(time.Now().Add(5 * time.Minute).Unix()))
ticker.Reset(10 * time.Minute)
tr.feed.SetNextFetch(uint32(time.Now().Add(10 * time.Minute).Unix()))
}
}
})