diff --git a/internal/client/fetcher.go b/internal/client/fetcher.go index ef0591c..9667714 100644 --- a/internal/client/fetcher.go +++ b/internal/client/fetcher.go @@ -120,6 +120,21 @@ func (f *Fetcher) SetRateLimit(qps float64) { f.rateQPS = qps } +// ScanConcurrency returns how many resolvers the scanner should probe in +// parallel, derived from the configured rate limit. +// Rule: concurrency = max(1, floor(rateQPS)). +// If rateQPS is 0 (unlimited), falls back to the default of 10. +func (f *Fetcher) ScanConcurrency() int { + if f.rateQPS <= 0 { + return 10 + } + n := int(f.rateQPS) + if n < 1 { + n = 1 + } + return n +} + // SetTimeout sets the per-query DNS timeout. func (f *Fetcher) SetTimeout(d time.Duration) { f.timeout = d @@ -146,6 +161,7 @@ func (f *Fetcher) SetActiveResolvers(resolvers []string) { defer f.mu.Unlock() f.activeResolvers = make([]string, len(resolvers)) copy(f.activeResolvers, resolvers) + f.log("active resolvers updated: %d/%d healthy", len(resolvers), len(f.allResolvers)) } // SetResolvers replaces the full resolver list and resets the active pool. @@ -344,6 +360,7 @@ func (f *Fetcher) scatterQuery(ctx context.Context, resolvers []string, qname st // Call once per fetcher configuration; creating a new fetcher replaces the old one. func (f *Fetcher) Start(ctx context.Context) { if f.rateQPS > 0 { + f.log("fetcher started: %d configured resolvers, rate=%.1f q/s, scatter=%d", len(f.allResolvers), f.rateQPS, f.scatter) f.rateCh = make(chan struct{}, 1) go f.runRateLimiter(ctx) go f.runNoise(ctx) @@ -473,9 +490,6 @@ func (f *Fetcher) FetchBlock(ctx context.Context, channel, block uint16) ([]byte if err != nil { return nil, fmt.Errorf("encode query: %w", err) } - if f.debug { - f.log("[debug] query ch=%d blk=%d attempt=%d qname=%s", channel, block, attempt+1, qname) - } scatter := f.scatter if scatter < 1 { @@ -485,6 +499,10 @@ func (f *Fetcher) FetchBlock(ctx context.Context, channel, block uint16) ([]byte if len(picked) == 0 { return nil, fmt.Errorf("no active resolvers") } + if f.debug { + f.log("[debug] query ch=%d blk=%d attempt=%d qname=%s resolvers=[%s]", + channel, block, attempt+1, qname, strings.Join(picked, ",")) + } data, err := f.scatterQuery(ctx, picked, qname) if err == nil { diff --git a/internal/client/resolver.go b/internal/client/resolver.go index 2cfe731..d429d15 100644 --- a/internal/client/resolver.go +++ b/internal/client/resolver.go @@ -3,6 +3,7 @@ package client import ( "context" "fmt" + "math/rand" "strings" "sync" "sync/atomic" @@ -20,6 +21,7 @@ type ResolverChecker struct { fetcher *Fetcher timeout time.Duration logFunc LogFunc + onScanDone func([]string) // called after each completed scan with healthy resolvers started atomic.Bool // guards against double-start scanMu sync.Mutex // protects scanCancel scanRunMu sync.Mutex // only one CheckNow at a time (via TryLock) @@ -43,6 +45,12 @@ func (rc *ResolverChecker) SetLogFunc(fn LogFunc) { rc.logFunc = fn } +// SetOnScanDone registers a callback invoked after each completed CheckNow pass +// with the list of healthy resolver addresses. Not called when the scan is cancelled. +func (rc *ResolverChecker) SetOnScanDone(fn func([]string)) { + rc.onScanDone = fn +} + // Start begins the periodic health-check loop in the background. // An initial check runs immediately; subsequent checks happen every 10 minutes. // ctx controls the lifetime — cancel it to stop the checker. @@ -82,35 +90,51 @@ func (rc *ResolverChecker) StartAndNotify(ctx context.Context, onFirstDone func( onFirstDone() } - // Periodic re-check every 30 minutes. - ticker := time.NewTicker(30 * time.Minute) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - rc.CheckNow(ctx) - // If the periodic check leaves us with no resolvers, - // fall back into the retry-every-minute loop. - if ctx.Err() == nil && len(rc.fetcher.Resolvers()) == 0 { - rc.log("All resolvers lost — scanning every minute until one recovers...") - for { - select { - case <-ctx.Done(): - return - case <-time.After(1 * time.Minute): - } - rc.CheckNow(ctx) - if ctx.Err() != nil || len(rc.fetcher.Resolvers()) > 0 { - break - } - rc.log("Still no healthy resolvers — retrying in 1 minute...") + rc.runPeriodicLoop(ctx) + }() +} + +// StartPeriodic starts only the periodic 30-minute health-check loop without +// running an initial scan. Use when resolvers are already available (e.g. +// loaded from a saved last-scan file on startup). +// Safe to call only once per checker instance; subsequent calls are no-ops. +func (rc *ResolverChecker) StartPeriodic(ctx context.Context) { + if !rc.started.CompareAndSwap(false, true) { + return + } + go rc.runPeriodicLoop(ctx) +} + +// runPeriodicLoop is the shared 30-minute ticker loop used by both +// StartAndNotify and StartPeriodic. +func (rc *ResolverChecker) runPeriodicLoop(ctx context.Context) { + ticker := time.NewTicker(30 * time.Minute) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + rc.CheckNow(ctx) + // If the periodic check leaves us with no resolvers, + // fall back into the retry-every-minute loop. + if ctx.Err() == nil && len(rc.fetcher.Resolvers()) == 0 { + rc.log("All resolvers lost — scanning every minute until one recovers...") + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Minute): } + rc.CheckNow(ctx) + if ctx.Err() != nil || len(rc.fetcher.Resolvers()) > 0 { + break + } + rc.log("Still no healthy resolvers — retrying in 1 minute...") } } } - }() + } } // CancelCurrentScan cancels any in-progress CheckNow call, causing it to @@ -155,20 +179,43 @@ func (rc *ResolverChecker) CheckNow(ctx context.Context) bool { return true } + // Shuffle so each scan probes resolvers in a fresh random order, preventing + // the same resolvers from always being probed first (more even load distribution). + rand.Shuffle(len(resolvers), func(i, j int) { resolvers[i], resolvers[j] = resolvers[j], resolvers[i] }) + total := len(resolvers) + concurrency := rc.fetcher.ScanConcurrency() rc.log("RESOLVER_SCAN start %d", total) + rc.log("scanner started: probing %d resolvers (concurrency=%d, batch-pause every 50)", total, concurrency) var healthy []string var mu sync.Mutex var done int wg := &sync.WaitGroup{} - sem := make(chan struct{}, 10) // probe up to 10 resolvers concurrently + sem := make(chan struct{}, concurrency) + launched := 0 for _, r := range resolvers { // Stop launching new probes if context was cancelled. if scanCtx.Err() != nil { break } + // Rate-limit pause: every 50 launched probes, sleep 3-10 s so we don't + // flood resolver rate limits before moving to the next batch. + if launched > 0 && launched%50 == 0 { + pause := 3*time.Second + time.Duration(rand.Intn(8))*time.Second + timer := time.NewTimer(pause) + select { + case <-scanCtx.Done(): + timer.Stop() + break + case <-timer.C: + } + if scanCtx.Err() != nil { + break + } + } + launched++ wg.Add(1) go func(r string) { defer wg.Done() @@ -198,10 +245,13 @@ func (rc *ResolverChecker) CheckNow(ctx context.Context) bool { if len(healthy) == 0 { rc.log("Resolver check done: 0/%d healthy", len(resolvers)) rc.log("RESOLVER_SCAN done 0/%d", total) - return true + } else { + rc.log("Resolver check done: %d/%d healthy", len(healthy), len(resolvers)) + rc.log("RESOLVER_SCAN done %d/%d", len(healthy), total) + } + if rc.onScanDone != nil { + rc.onScanDone(healthy) } - rc.log("Resolver check done: %d/%d healthy", len(healthy), len(resolvers)) - rc.log("RESOLVER_SCAN done %d/%d", len(healthy), total) return true } diff --git a/internal/web/static/index.html b/internal/web/static/index.html index 3f277d0..ea525b8 100644 --- a/internal/web/static/index.html +++ b/internal/web/static/index.html @@ -20,7 +20,7 @@ input,textarea,select{font-family:inherit} /* ===== LAYOUT ===== */ .app{display:flex;height:100vh;direction:ltr} .sidebar{width:280px;min-width:280px;background:var(--sidebar-bg);display:flex;flex-direction:column;border-right:1px solid var(--border);overflow:hidden} -.chat-area{flex:1;display:flex;flex-direction:column;background:var(--bg);overflow:hidden} +.chat-area{flex:1;display:flex;flex-direction:column;background:var(--bg);overflow:hidden;position:relative} /* ===== SIDEBAR HEADER ===== */ .sidebar-header{padding:10px 12px;display:flex;flex-direction:column;gap:8px;border-bottom:1px solid var(--border)} @@ -104,6 +104,13 @@ input,textarea,select{font-family:inherit} .refresh-has-new{animation:badge-pulse 2s ease-in-out infinite;color:var(--accent)!important} .msg-copy-btn{background:none;border:none;color:var(--text-dim);font-size:14px;cursor:pointer;padding:0 3px;line-height:1;flex-shrink:0;opacity:.45;transition:opacity .15s}.msg-copy-btn:hover{opacity:1} +/* ===== SCROLL-TO-BOTTOM ===== */ +.scroll-down-btn{position:absolute;bottom:70px;right:16px;width:36px;height:36px;border:none;border-radius:50%;background:var(--surface2);color:var(--text);font-size:18px;display:none;align-items:center;justify-content:center;box-shadow:0 2px 10px rgba(0,0,0,.45);z-index:10;cursor:pointer;border:1px solid var(--border)} +.scroll-down-btn.visible{display:flex} +.scroll-down-btn:hover{background:var(--hover)} +.scroll-down-badge{position:absolute;top:-4px;right:-4px;background:var(--accent);color:#fff;font-size:9px;font-weight:700;min-width:16px;height:16px;border-radius:8px;display:none;align-items:center;justify-content:center;padding:0 3px} +.scroll-down-badge.visible{display:flex} + /* ===== LOG ===== */ .log-toggle{display:flex;align-items:center;justify-content:space-between;padding:3px 14px;background:var(--bg2);cursor:pointer;user-select:none;font-size:10px;color:var(--text-dim);letter-spacing:.5px;border-top:1px solid var(--border)} .log-toggle:hover{color:var(--text)} @@ -260,6 +267,7 @@ html[dir=ltr] .active-badge{margin-left:0;margin-right:6px} +