feat: Implement version tracking, fix telegram fetcher, hourly report, fix add channel bug

This commit is contained in:
Sarto
2026-04-11 15:45:59 +03:30
parent f360a14e46
commit 8c413f9ebf
16 changed files with 783 additions and 41 deletions
+2
View File
@@ -34,6 +34,7 @@ func main() {
maxPadding := flag.Int("padding", 32, "Max random padding bytes in DNS responses (anti-DPI, 0=disabled)")
msgLimit := flag.Int("msg-limit", 15, "Maximum messages to fetch per Telegram channel")
allowManage := flag.Bool("allow-manage", false, "Allow remote channel management and sending via DNS")
debug := flag.Bool("debug", false, "Log every decoded DNS query")
showVersion := flag.Bool("version", false, "Show version and exit")
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "thefeed-server %s\n\nServes Telegram channel content over encrypted DNS for censorship-resistant access.\n\nUsage:\n thefeed-server [flags]\n\nFlags:\n", version.Version)
@@ -140,6 +141,7 @@ func main() {
MsgLimit: *msgLimit,
NoTelegram: *noTelegram,
AllowManage: *allowManage,
Debug: *debug,
Telegram: server.TelegramConfig{
APIID: id,
APIHash: *apiHash,
+12 -1
View File
@@ -106,7 +106,7 @@ func NewFetcher(domain, passphrase string, resolvers []string) (*Fetcher, error)
// activeResolvers starts empty — the ResolverChecker fills it in after
// the first health-check scan so no fetch is attempted with unvalidated resolvers.
timeout: 25 * time.Second,
scatter: 2, // query 2 resolvers in parallel by default
scatter: 4, // query 4 resolvers in parallel by default
}
f.exchangeFn = func(ctx context.Context, m *dns.Msg, addr string) (*dns.Msg, time.Duration, error) {
c := &dns.Client{Timeout: f.timeout, Net: "udp"}
@@ -553,6 +553,17 @@ func (f *Fetcher) FetchMetadata(ctx context.Context) (*protocol.Metadata, error)
return nil, fmt.Errorf("could not parse metadata: %w", err)
}
// FetchLatestVersion fetches the latest release version from the dedicated
// version channel. The block is padded to a random size matching regular content
// blocks (DPI-resistant). Empty string means unknown/unavailable.
func (f *Fetcher) FetchLatestVersion(ctx context.Context) (string, error) {
data, err := f.FetchBlock(ctx, protocol.VersionChannel, 0)
if err != nil {
return "", fmt.Errorf("fetch version block: %w", err)
}
return protocol.DecodeVersionData(data)
}
// FetchChannel fetches all blocks for a channel and returns the parsed messages.
// Cancelling ctx immediately aborts any queued or in-flight block fetches.
// Each block is retried individually via FetchBlock before the channel fetch fails.
+3 -3
View File
@@ -94,7 +94,7 @@ func (rc *ResolverChecker) StartAndNotify(ctx context.Context, onFirstDone func(
}()
}
// StartPeriodic starts only the periodic 30-minute health-check loop without
// StartPeriodic starts only the periodic Hour health-check loop without
// running an initial scan. Use when resolvers are already available (e.g.
// loaded from a saved last-scan file on startup).
// Safe to call only once per checker instance; subsequent calls are no-ops.
@@ -105,10 +105,10 @@ func (rc *ResolverChecker) StartPeriodic(ctx context.Context) {
go rc.runPeriodicLoop(ctx)
}
// runPeriodicLoop is the shared 30-minute ticker loop used by both
// runPeriodicLoop is the shared Hour ticker loop used by both
// StartAndNotify and StartPeriodic.
func (rc *ResolverChecker) runPeriodicLoop(ctx context.Context) {
ticker := time.NewTicker(30 * time.Minute)
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for {
select {
+3
View File
@@ -29,6 +29,9 @@ const (
// UpstreamDataChannel carries one chunk of a chunked upstream session.
UpstreamDataChannel uint16 = 0xFFFB
// VersionChannel serves latest release version with random suffix.
VersionChannel uint16 = 0xFFFA
// MaxUpstreamBlockPayload keeps uploaded query chunks comfortably below DNS
// name limits across typical domains and resolver paths.
MaxUpstreamBlockPayload = 8
+35
View File
@@ -308,6 +308,41 @@ func randBlockSize() int {
return MinBlockPayload + int(n.Int64())
}
// EncodeVersionData encodes a version string into a single block padded to a
// random size in [MinBlockPayload, MaxBlockPayload], making it indistinguishable
// in size from regular content blocks for DPI resistance. Format:
//
// [2 bytes: version byte length][version bytes][random padding]
func EncodeVersionData(version string) ([]byte, error) {
raw := []byte(version)
if len(raw) > MaxBlockPayload-2 {
raw = raw[:MaxBlockPayload-2]
}
blockSize := randBlockSize()
if blockSize < 2+len(raw) {
blockSize = 2 + len(raw)
}
buf := make([]byte, blockSize)
binary.BigEndian.PutUint16(buf, uint16(len(raw)))
copy(buf[2:], raw)
if _, err := rand.Read(buf[2+len(raw):]); err != nil {
return nil, fmt.Errorf("version padding: %w", err)
}
return buf, nil
}
// DecodeVersionData extracts the version string from a block produced by EncodeVersionData.
func DecodeVersionData(block []byte) (string, error) {
if len(block) < 2 {
return "", fmt.Errorf("version block too short: %d bytes", len(block))
}
dataLen := int(binary.BigEndian.Uint16(block))
if 2+dataLen > len(block) {
return "", fmt.Errorf("version block truncated: need %d bytes, have %d", 2+dataLen, len(block))
}
return string(block[2 : 2+dataLen]), nil
}
const (
// compressionNone means no compression applied (raw serialized messages).
compressionNone byte = 0x00
+30
View File
@@ -118,3 +118,33 @@ func TestParseMetadataTooShort(t *testing.T) {
t.Error("expected error for short metadata")
}
}
func TestEncodeDecodeVersionData(t *testing.T) {
versions := []string{"", "v1.0.0", "v2.3.14", "1.0.0-beta"}
for _, ver := range versions {
block, err := EncodeVersionData(ver)
if err != nil {
t.Fatalf("EncodeVersionData(%q): %v", ver, err)
}
if len(block) < MinBlockPayload {
t.Errorf("EncodeVersionData(%q): block len %d < MinBlockPayload %d", ver, len(block), MinBlockPayload)
}
if len(block) > MaxBlockPayload {
t.Errorf("EncodeVersionData(%q): block len %d > MaxBlockPayload %d", ver, len(block), MaxBlockPayload)
}
got, err := DecodeVersionData(block)
if err != nil {
t.Fatalf("DecodeVersionData(%q): %v", ver, err)
}
if got != ver {
t.Errorf("round-trip: got %q, want %q", got, ver)
}
}
}
func TestDecodeVersionDataTooShort(t *testing.T) {
_, err := DecodeVersionData([]byte{0x01})
if err == nil {
t.Error("expected error for 1-byte block")
}
}
+209 -13
View File
@@ -3,9 +3,12 @@ package server
import (
"bufio"
"context"
"encoding/json"
"fmt"
"log"
"net"
"os"
"sort"
"strings"
"sync"
"time"
@@ -15,11 +18,15 @@ import (
"github.com/sartoopjj/thefeed/internal/protocol"
)
const reportChannelBuffer = 4096
const topResolverLimit = 20
// DNSServer serves feed data over DNS TXT queries.
type DNSServer struct {
domain string
feed *Feed
reader *TelegramReader // nil when --no-telegram
channelCtl channelRefresher
queryKey [protocol.KeySize]byte
responseKey [protocol.KeySize]byte
listenAddr string
@@ -29,6 +36,27 @@ type DNSServer struct {
sessionsMu sync.Mutex
sessions map[uint16]*uploadSession
reportCh chan reportEvent
debug bool
}
type channelFetchStats struct {
Queries int64 `json:"queries"`
}
type reportEvent struct {
channel uint16
resolver string
}
type hourlyFetchReport struct {
windowStart time.Time
totalQueries int64
metadataQueries int64
versionQueries int64
perChannel map[uint16]*channelFetchStats
perResolver map[string]int64
}
type uploadSession struct {
@@ -40,12 +68,18 @@ type uploadSession struct {
expiresAt time.Time
}
type channelRefresher interface {
UpdateChannels(channels []string)
RequestRefresh()
}
// NewDNSServer creates a DNS server for the given domain.
func NewDNSServer(listenAddr, domain string, feed *Feed, queryKey, responseKey [protocol.KeySize]byte, maxPadding int, reader *TelegramReader, allowManage bool, channelsFile string) *DNSServer {
func NewDNSServer(listenAddr, domain string, feed *Feed, queryKey, responseKey [protocol.KeySize]byte, maxPadding int, reader *TelegramReader, allowManage bool, channelsFile string, debug bool) *DNSServer {
s := &DNSServer{
domain: strings.TrimSuffix(domain, "."),
feed: feed,
reader: reader,
channelCtl: reader,
queryKey: queryKey,
responseKey: responseKey,
listenAddr: listenAddr,
@@ -53,10 +87,18 @@ func NewDNSServer(listenAddr, domain string, feed *Feed, queryKey, responseKey [
allowManage: allowManage,
channelsFile: channelsFile,
sessions: make(map[uint16]*uploadSession),
reportCh: make(chan reportEvent, reportChannelBuffer),
debug: debug,
}
return s
}
// SetChannelRefresher allows wiring a non-Telegram channel source (e.g. public reader)
// for admin update/refresh operations.
func (s *DNSServer) SetChannelRefresher(channelCtl channelRefresher) {
s.channelCtl = channelCtl
}
// ListenAndServe starts the DNS server on UDP, shutting down when ctx is cancelled.
func (s *DNSServer) ListenAndServe(ctx context.Context) error {
mux := dns.NewServeMux()
@@ -74,6 +116,8 @@ func (s *DNSServer) ListenAndServe(ctx context.Context) error {
server.Shutdown()
}()
go s.runHourlyReports(ctx)
log.Printf("[dns] listening on %s (domain: %s)", s.listenAddr, s.domain)
return server.ListenAndServe()
}
@@ -125,6 +169,11 @@ func (s *DNSServer) handleQuery(w dns.ResponseWriter, r *dns.Msg) {
return
}
s.trackRequestStart(channel, resolverHost(w.RemoteAddr()))
if s.debug {
log.Printf("[dns] query ch=%d blk=%d from=%s name=%s", channel, block, resolverHost(w.RemoteAddr()), q.Name)
}
data, err := s.feed.GetBlock(int(channel), int(block))
if err != nil {
log.Printf("[dns] get block ch=%d blk=%d: %v", channel, block, err)
@@ -457,11 +506,13 @@ func (s *DNSServer) adminAddChannel(username string) (string, error) {
log.Printf("[admin] added channel @%s", username)
// Update the live reader and trigger immediate fetch.
if s.reader != nil {
all, _ := loadChannelsFromFile(s.channelsFile)
s.reader.UpdateChannels(all)
s.reader.RequestRefresh()
all, err := loadChannelsFromFile(s.channelsFile)
if err == nil {
s.feed.SetChannels(all)
if s.channelCtl != nil {
s.channelCtl.UpdateChannels(all)
s.channelCtl.RequestRefresh()
}
}
return "OK", nil
@@ -503,10 +554,10 @@ func (s *DNSServer) adminRemoveChannel(username string) (string, error) {
log.Printf("[admin] removed channel @%s", username)
// Update the live reader and trigger immediate fetch.
if s.reader != nil {
s.reader.UpdateChannels(remaining)
s.reader.RequestRefresh()
s.feed.SetChannels(remaining)
if s.channelCtl != nil {
s.channelCtl.UpdateChannels(remaining)
s.channelCtl.RequestRefresh()
}
return "OK", nil
@@ -521,10 +572,10 @@ func (s *DNSServer) adminListChannels() (string, error) {
}
func (s *DNSServer) adminRefresh() (string, error) {
if s.reader == nil {
return "", fmt.Errorf("no Telegram reader")
if s.channelCtl == nil {
return "", fmt.Errorf("no active channel reader")
}
s.reader.RequestRefresh()
s.channelCtl.RequestRefresh()
log.Printf("[admin] hard refresh requested")
return "OK", nil
}
@@ -547,3 +598,148 @@ func loadChannelsFromFile(path string) ([]string, error) {
}
return channels, scanner.Err()
}
func resolverHost(addr net.Addr) string {
if addr == nil {
return ""
}
host, _, err := net.SplitHostPort(addr.String())
if err == nil {
return host
}
return addr.String()
}
func (s *DNSServer) trackRequestStart(channel uint16, resolver string) {
// Exclude special control channels from per-channel content reporting.
if channel == protocol.UpstreamInitChannel || channel == protocol.UpstreamDataChannel || channel == protocol.SendChannel || channel == protocol.AdminChannel {
return
}
s.reportCh <- reportEvent{channel: channel, resolver: resolver}
}
func (s *DNSServer) runHourlyReports(ctx context.Context) {
rep := newHourlyFetchReport(time.Now())
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
s.drainReportEvents(rep)
s.emitHourlyReport(rep, true)
return
case event := <-s.reportCh:
recordReportQuery(rep, event)
case <-ticker.C:
s.emitHourlyReport(rep, false)
rep = newHourlyFetchReport(time.Now())
}
}
}
func newHourlyFetchReport(start time.Time) *hourlyFetchReport {
return &hourlyFetchReport{
windowStart: start,
perChannel: make(map[uint16]*channelFetchStats),
perResolver: make(map[string]int64),
}
}
func recordReportQuery(rep *hourlyFetchReport, event reportEvent) {
rep.totalQueries++
if event.resolver != "" {
rep.perResolver[event.resolver]++
}
channel := event.channel
if channel == protocol.MetadataChannel {
rep.metadataQueries++
return
}
if channel == protocol.VersionChannel {
rep.versionQueries++
return
}
stats := rep.perChannel[channel]
if stats == nil {
stats = &channelFetchStats{}
rep.perChannel[channel] = stats
}
stats.Queries++
}
func (s *DNSServer) drainReportEvents(rep *hourlyFetchReport) {
for {
select {
case event := <-s.reportCh:
recordReportQuery(rep, event)
default:
return
}
}
}
func (s *DNSServer) emitHourlyReport(rep *hourlyFetchReport, final bool) {
names := s.feed.ChannelNames()
chs := make([]uint16, 0, len(rep.perChannel))
for ch := range rep.perChannel {
chs = append(chs, ch)
}
sort.Slice(chs, func(i, j int) bool { return chs[i] < chs[j] })
type channelEntry struct {
Channel uint16 `json:"channel"`
Name string `json:"name,omitempty"`
Queries int64 `json:"queries"`
}
entries := make([]channelEntry, 0, len(chs))
for _, ch := range chs {
st := rep.perChannel[ch]
name := ""
if int(ch) >= 1 && int(ch) <= len(names) {
name = names[int(ch)-1]
}
entries = append(entries, channelEntry{
Channel: ch,
Name: name,
Queries: st.Queries,
})
}
type resolverEntry struct {
Resolver string `json:"resolver"`
Queries int64 `json:"queries"`
}
resolvers := make([]resolverEntry, 0, len(rep.perResolver))
for resolver, queries := range rep.perResolver {
resolvers = append(resolvers, resolverEntry{Resolver: resolver, Queries: queries})
}
sort.Slice(resolvers, func(i, j int) bool {
if resolvers[i].Queries == resolvers[j].Queries {
return resolvers[i].Resolver < resolvers[j].Resolver
}
return resolvers[i].Queries > resolvers[j].Queries
})
if len(resolvers) > topResolverLimit {
resolvers = resolvers[:topResolverLimit]
}
payload := map[string]any{
"type": "dns_hourly_report",
"from": rep.windowStart.UTC().Format(time.RFC3339),
"to": time.Now().UTC().Format(time.RFC3339),
"totalDnsQueries": rep.totalQueries,
"totalMetadataQueries": rep.metadataQueries,
"totalVersionQueries": rep.versionQueries,
"channels": entries,
"topResolvers": resolvers,
"finalFlush": final,
}
b, err := json.Marshal(payload)
if err != nil {
log.Printf("[dns_hourly] marshal error: %v", err)
return
}
log.Printf("[dns_hourly] %s", string(b))
}
+34
View File
@@ -20,9 +20,11 @@ type Feed struct {
chatTypes map[int]protocol.ChatType
canSend map[int]bool
metaBlocks [][]byte // metadata for all channels
versionBlocks [][]byte // channel for latest server-known release version
updated time.Time
telegramLoggedIn bool
nextFetch uint32
latestVersion string
}
// NewFeed creates a new Feed with the given channel names.
@@ -37,6 +39,7 @@ func NewFeed(channels []string) *Feed {
}
f.rotateMarker()
f.rebuildMetaBlocks()
f.rebuildVersionBlocks()
return f
}
@@ -75,6 +78,9 @@ func (f *Feed) GetBlock(channel, block int) ([]byte, error) {
if channel == protocol.MetadataChannel {
return f.getMetadataBlock(block)
}
if channel == int(protocol.VersionChannel) {
return f.getVersionBlock(block)
}
ch, ok := f.blocks[channel]
if !ok {
@@ -86,6 +92,18 @@ func (f *Feed) GetBlock(channel, block int) ([]byte, error) {
return ch[block], nil
}
func (f *Feed) getVersionBlock(block int) ([]byte, error) {
blocks := f.versionBlocks
if len(blocks) == 0 {
f.rebuildVersionBlocks()
blocks = f.versionBlocks
}
if block < 0 || block >= len(blocks) {
return nil, fmt.Errorf("version block %d out of range (%d blocks)", block, len(blocks))
}
return blocks[block], nil
}
func (f *Feed) getMetadataBlock(block int) ([]byte, error) {
blocks := f.metaBlocks
if len(blocks) == 0 {
@@ -128,6 +146,22 @@ func (f *Feed) rebuildMetaBlocks() {
f.metaBlocks = protocol.SplitIntoBlocks(protocol.SerializeMetadata(&meta))
}
func (f *Feed) rebuildVersionBlocks() {
block, err := protocol.EncodeVersionData(f.latestVersion)
if err != nil {
block = make([]byte, protocol.MinBlockPayload)
}
f.versionBlocks = [][]byte{block}
}
// SetLatestVersion stores latest known release version for the dedicated version channel.
func (f *Feed) SetLatestVersion(v string) {
f.mu.Lock()
defer f.mu.Unlock()
f.latestVersion = v
f.rebuildVersionBlocks()
}
// ChannelNames returns the configured channel names.
func (f *Feed) ChannelNames() []string {
f.mu.RLock()
+57 -4
View File
@@ -30,6 +30,8 @@ type PublicReader struct {
mu sync.RWMutex
cache map[string]cachedMessages
cacheTTL time.Duration
refreshCh chan struct{}
}
// NewPublicReader creates a reader for public channels without Telegram login.
@@ -48,9 +50,10 @@ func NewPublicReader(channelUsernames []string, feed *Feed, msgLimit int) *Publi
client: &http.Client{
Timeout: 30 * time.Second,
},
baseURL: "https://t.me/s",
cache: make(map[string]cachedMessages),
cacheTTL: 10 * time.Minute,
baseURL: "https://t.me/s",
cache: make(map[string]cachedMessages),
cacheTTL: 10 * time.Minute,
refreshCh: make(chan struct{}, 1),
}
}
@@ -70,10 +73,38 @@ func (pr *PublicReader) Run(ctx context.Context) error {
case <-ticker.C:
pr.fetchAll(ctx)
pr.feed.SetNextFetch(uint32(time.Now().Add(10 * time.Minute).Unix()))
case <-pr.refreshCh:
pr.mu.Lock()
pr.cache = make(map[string]cachedMessages)
pr.mu.Unlock()
pr.fetchAll(ctx)
ticker.Reset(10 * time.Minute)
pr.feed.SetNextFetch(uint32(time.Now().Add(10 * time.Minute).Unix()))
}
}
}
// RequestRefresh signals the fetch loop to re-fetch immediately.
func (pr *PublicReader) RequestRefresh() {
select {
case pr.refreshCh <- struct{}{}:
default:
}
}
// UpdateChannels replaces the channel list and updates Feed metadata.
func (pr *PublicReader) UpdateChannels(channels []string) {
cleaned := make([]string, len(channels))
for i, u := range channels {
cleaned[i] = strings.TrimPrefix(strings.TrimSpace(u), "@")
}
pr.mu.Lock()
pr.channels = cleaned
pr.cache = make(map[string]cachedMessages)
pr.mu.Unlock()
pr.feed.SetChannels(cleaned)
}
func (pr *PublicReader) fetchAll(ctx context.Context) {
for i, username := range pr.channels {
chNum := i + 1
@@ -175,7 +206,7 @@ func parsePublicMessages(body []byte) ([]protocol.Message, error) {
if err != nil {
return
}
text := strings.TrimSpace(extractMessageText(findFirstByClass(n, "tgme_widget_message_text")))
text := strings.TrimSpace(extractMessageText(findMessageBodyNode(n)))
mediaPrefix := ""
switch {
case findFirstByClass(n, "tgme_widget_message_photo_wrap") != nil:
@@ -326,6 +357,28 @@ func findFirstElement(n *html.Node, tag string) *html.Node {
return found
}
// findMessageBodyNode returns the main post text node while skipping reply preview
// snippets. In Telegram public HTML, quoted/replied text may appear before the
// real message body and can otherwise be mistakenly parsed as the post text.
func findMessageBodyNode(n *html.Node) *html.Node {
var found *html.Node
visitNodes(n, func(cur *html.Node) {
if found != nil || !hasClass(cur, "tgme_widget_message_text") {
return
}
for p := cur.Parent; p != nil; p = p.Parent {
if hasClass(p, "tgme_widget_message_reply") {
return
}
}
found = cur
})
if found != nil {
return found
}
return findFirstByClass(n, "tgme_widget_message_text")
}
func extractMessageText(n *html.Node) string {
if n == nil {
return ""
+24
View File
@@ -100,3 +100,27 @@ func TestMergeMessages(t *testing.T) {
t.Fatalf("merged[2].ID = %d, want 99", merged[2].ID)
}
}
func TestParsePublicMessagesReplyPreviewUsesMainBody(t *testing.T) {
body := []byte(`
<html><body>
<div class="tgme_widget_message" data-post="testchan/201">
<div class="tgme_widget_message_reply">
<div class="tgme_widget_message_text">old replied message preview</div>
</div>
<div class="tgme_widget_message_text">this is the real new post</div>
</div>
</body></html>
`)
msgs, err := parsePublicMessages(body)
if err != nil {
t.Fatalf("parsePublicMessages: %v", err)
}
if len(msgs) != 1 {
t.Fatalf("len(msgs) = %d, want 1", len(msgs))
}
if msgs[0].Text != "this is the real new post" {
t.Fatalf("msgs[0].Text = %q, want %q", msgs[0].Text, "this is the real new post")
}
}
+10 -1
View File
@@ -21,6 +21,7 @@ type Config struct {
MsgLimit int // max messages per channel (0 = default 15)
NoTelegram bool // if true, fetch public channels without Telegram login
AllowManage bool // if true, remote channel management and sending via DNS is allowed
Debug bool // if true, log every decoded DNS query
Telegram TelegramConfig
}
@@ -54,6 +55,9 @@ func (s *Server) Run(ctx context.Context) error {
return fmt.Errorf("derive keys: %w", err)
}
go startLatestVersionTracker(ctx, s.feed)
var channelCtl channelRefresher
// Handle login-only mode
if s.cfg.Telegram.LoginOnly {
reader := NewTelegramReader(s.cfg.Telegram, s.feed.ChannelNames(), s.feed, 15)
@@ -68,6 +72,7 @@ func (s *Server) Run(ctx context.Context) error {
}
reader := NewTelegramReader(s.cfg.Telegram, s.feed.ChannelNames(), s.feed, msgLimit)
s.reader = reader
channelCtl = reader
go func() {
if err := reader.Run(ctx); err != nil {
log.Printf("[telegram] error: %v", err)
@@ -79,6 +84,7 @@ func (s *Server) Run(ctx context.Context) error {
msgLimit = 15
}
publicReader := NewPublicReader(s.feed.ChannelNames(), s.feed, msgLimit)
channelCtl = publicReader
go func() {
if err := publicReader.Run(ctx); err != nil && ctx.Err() == nil {
log.Printf("[public] error: %v", err)
@@ -92,7 +98,10 @@ func (s *Server) Run(ctx context.Context) error {
if maxPad == 0 {
maxPad = protocol.DefaultMaxPadding
}
dnsServer := NewDNSServer(s.cfg.ListenAddr, s.cfg.Domain, s.feed, queryKey, responseKey, maxPad, s.reader, s.cfg.AllowManage, s.cfg.ChannelsFile)
dnsServer := NewDNSServer(s.cfg.ListenAddr, s.cfg.Domain, s.feed, queryKey, responseKey, maxPad, s.reader, s.cfg.AllowManage, s.cfg.ChannelsFile, s.cfg.Debug)
if channelCtl != nil {
dnsServer.SetChannelRefresher(channelCtl)
}
return dnsServer.ListenAndServe(ctx)
}
+72
View File
@@ -0,0 +1,72 @@
package server
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
)
const latestReleaseURL = "https://api.github.com/repos/sartoopjj/thefeed/releases/latest"
type githubRelease struct {
TagName string `json:"tag_name"`
}
// startLatestVersionTracker periodically fetches latest GitHub release version
// and stores it in the dedicated version channel.
func startLatestVersionTracker(ctx context.Context, feed *Feed) {
update := func() {
v, err := fetchLatestReleaseVersion(ctx)
if err != nil {
return
}
feed.SetLatestVersion(v)
}
update()
ticker := time.NewTicker(6 * time.Hour)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
update()
}
}
}
func fetchLatestReleaseVersion(parent context.Context) (string, error) {
ctx, cancel := context.WithTimeout(parent, 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, latestReleaseURL, nil)
if err != nil {
return "", err
}
req.Header.Set("User-Agent", "thefeed-server")
req.Header.Set("Accept", "application/vnd.github+json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("latest release status: %s", resp.Status)
}
var rel githubRelease
if err := json.NewDecoder(resp.Body).Decode(&rel); err != nil {
return "", err
}
v := strings.TrimSpace(rel.TagName)
v = strings.TrimPrefix(v, "v")
if v == "" {
return "", fmt.Errorf("empty latest release tag")
}
return v, nil
}
+121 -17
View File
@@ -1390,6 +1390,16 @@
<span data-i18n="version">Version</span>
<span id="appVersionEl" style="font-family:monospace;color:var(--text)">-</span>
</div>
<div
style="margin-top:10px;font-size:11px;color:var(--text-dim);display:flex;align-items:center;justify-content:space-between">
<span data-i18n="latest_version">Latest Version</span>
<span id="latestVersionEl" style="font-family:monospace;color:var(--text)">-</span>
</div>
<div style="margin-top:10px;display:flex;align-items:center;justify-content:space-between">
<span style="font-size:12px;color:var(--text-dim)" data-i18n="check_latest_version">Check for Updates</span>
<button class="btn btn-outline" id="checkVersionBtn" onclick="checkLatestVersion()"
style="font-size:11px;padding:4px 12px" data-i18n="check_now">Check Now</button>
</div>
<div style="margin-top:10px;display:flex;align-items:center;justify-content:space-between">
<span style="font-size:12px;color:var(--text-dim)" data-i18n="clear_cache">Clear Cache</span>
<button class="btn btn-flat" onclick="clearCache()"
@@ -1447,10 +1457,10 @@
<option value="double">Multi-label (hex)</option>
</select>
</div>
<div class="form-group"><label data-i18n="rate_limit">Rate Limit (q/s, 0=unlimited)</label><input type="number"
id="peRateLimit" value="5" min="0" step="0.1"></div>
<div class="form-group"><label data-i18n="scatter">Concurrent requests</label><input type="number" id="peScatter"
value="2" min="1" max="5" title="How many resolvers to query simultaneously per DNS request"></div>
<div class="form-group"><label data-i18n="rate_limit">Concurrent block fetches</label><input type="number"
id="peRateLimit" value="6" min="0" step="0.1"></div>
<div class="form-group"><label data-i18n="scatter">Parallel resolvers per block</label><input type="number" id="peScatter"
value="4" min="1" max="5" title="How many resolvers are queried at the same time for one block"></div>
<!-- Channel Management (editing only) -->
<div id="peChannelSection" style="display:none">
<hr class="section-divider">
@@ -1494,10 +1504,17 @@
import_success: 'وارد شد! پروفایل "{d}" ساخته شد.', import_error: 'خطا در وارد کردن',
new_profile: 'پروفایل جدید', edit_profile: 'ویرایش پروفایل',
nickname: 'نام مستعار', domain: 'دامنه', passphrase: 'رمز رمزنگاری',
resolvers: 'Resolvers (یک در هر خط)', query_mode: 'حالت کوئری', rate_limit: 'محدودیت نرخ (ک/ث، ۰=بدون محدودیت)',
resolvers: 'Resolvers (یک در هر خط)', query_mode: 'حالت کوئری', rate_limit: 'تعداد همزمانی دریافت بلاک‌ها',
channels: 'کانال\u200cها', add: 'افزودن', remove: 'حذف',
scatter: 'درخواست‌های همزمان',
channel_mgmt_note: 'مدیریت کانال نیاز به پشتیبانی سمت سرور دارد. اگر توسط ادمین غیرفعال شده باشد، افزودن/حذف کار نمی\u200cکند.',
scatter: 'تعداد ریزالور همزمان برای هر بلاک',
update_available: 'نسخه جدید موجود است: {v}',
latest_version: 'آخرین نسخه قابل دانلود',
check_latest_version: 'بررسی نسخه جدید',
check_now: 'بررسی',
checking_version: 'در حال بررسی...',
version_up_to_date: 'نسخه شما به‌روز است: {v}',
version_check_failed: 'بررسی نسخه ناموفق بود',
channel_mgmt_note: 'قابلیت مدیریت کانال نیاز به فعال سازی سمت سرور دارد. اگر توسط ادمین غیرفعال شده باشد، افزودن/حذف کار نمی\u200cکند.',
channel_mgmt_inactive: 'برای مدیریت کانال\u200cها، ابتدا این پروفایل را فعال کنید.',
channel_placeholder: 'نام کاربری کانال',
version: 'نسخه',
@@ -1536,9 +1553,16 @@
import_success: 'Imported! Profile "{d}" created.', import_error: 'Import error',
new_profile: 'New Profile', edit_profile: 'Edit Profile',
nickname: 'Nickname', domain: 'Domain', passphrase: 'Passphrase',
resolvers: 'Resolvers (one per line)', query_mode: 'Query Mode', rate_limit: 'Rate Limit (q/s, 0=unlimited)',
resolvers: 'Resolvers (one per line)', query_mode: 'Query Mode', rate_limit: 'Concurrent block fetches',
channels: 'Channels', add: 'Add', remove: 'Remove',
scatter: 'Concurrent requests',
scatter: 'Parallel resolvers per block',
update_available: 'New version available: {v}',
latest_version: 'Latest Version',
check_latest_version: 'Check for Updates',
check_now: 'Check Now',
checking_version: 'Checking...',
version_up_to_date: 'You are up to date: {v}',
version_check_failed: 'Version check failed',
channel_mgmt_note: 'Channel management requires server-side support. If disabled by the server admin, adding/removing channels will not work.',
channel_mgmt_inactive: 'Switch to this profile first to manage its channels.',
channel_placeholder: 'channel_username',
@@ -1581,6 +1605,7 @@
// ===== STATE =====
var selectedChannel = 0, channels = [], eventSource = null, autoRefreshTimer = null, telegramLoggedIn = false, logVisible = false;
var serverNextFetch = 0, nextFetchInterval = null, previousMsgIDs = {}, currentMsgTexts = [];
var appVersion = '', latestVersion = '';
var profiles = null, activeProfileId = '', editingProfileId = null, resolverScanHint = '', resolverScanHealthy = 0, resolverScanDone = 0, resolverScanTotal = 0;
// ===== MOBILE NAV =====
@@ -1604,6 +1629,8 @@
checkAndShowSavedResolversPrompt(st);
telegramLoggedIn = !!st.telegramLoggedIn;
serverNextFetch = st.nextFetch || 0;
latestVersion = st.latestVersion || '';
renderLatestVersion();
updateNextFetchDisplay();
await loadChannels();
if (channels && channels.length > 0) await selectChannel(1);
@@ -1622,13 +1649,56 @@
document.getElementById('fontSizeVal').textContent = s.fontSize;
}
if (s.debug) document.getElementById('cfgDebug').checked = true;
if (s.version) { var vEl = document.getElementById('appVersionEl'); if (vEl) vEl.textContent = s.version + (s.commit && s.commit !== 'unknown' ? ' (' + s.commit.slice(0, 7) + ')' : ''); }
if (s.version) { appVersion = s.version; renderAppVersion(s.version, s.commit); }
renderLatestVersion();
} catch (e) { }
}
function renderAppVersion(v, commit) {
var vEl = document.getElementById('appVersionEl');
if (!vEl) return;
if (!v) { vEl.textContent = '-'; return; }
vEl.textContent = v + (commit && commit !== 'unknown' ? ' (' + commit.slice(0, 7) + ')' : '');
}
function renderLatestVersion() {
var vEl = document.getElementById('latestVersionEl');
if (vEl) vEl.textContent = latestVersion || '-';
}
function normalizeVersion(v) {
if (!v) return '';
v = String(v).trim().replace(/^v/i, '');
return v;
}
function compareSemver(a, b) {
a = normalizeVersion(a); b = normalizeVersion(b);
if (!a || !b || a === 'dev' || b === 'dev') return 0;
var as = a.split('.'); var bs = b.split('.');
var n = Math.max(as.length, bs.length);
for (var i = 0; i < n; i++) {
var ai = parseInt(as[i] || '0', 10); if (isNaN(ai)) ai = 0;
var bi = parseInt(bs[i] || '0', 10); if (isNaN(bi)) bi = 0;
if (ai > bi) return 1;
if (ai < bi) return -1;
}
return 0;
}
function maybeWarnNewVersion() {
if (!latestVersion || !appVersion) return;
if (compareSemver(latestVersion, appVersion) <= 0) return;
var seenKey = 'thefeed_seen_update_' + normalizeVersion(latestVersion);
if (localStorage.getItem(seenKey) === '1') return;
localStorage.setItem(seenKey, '1');
showToast(t('update_available').replace('{v}', latestVersion));
addLogLine('Warning: ' + t('update_available').replace('{v}', latestVersion));
}
function previewFontSize(v) { document.documentElement.style.setProperty('--font-size', v + 'px'); document.getElementById('fontSizeVal').textContent = v }
// ===== SETTINGS =====
function openSettings() { document.getElementById('settingsModal').classList.add('active') }
function openSettings() { renderLatestVersion(); document.getElementById('settingsModal').classList.add('active') }
function closeSettings() { document.getElementById('settingsModal').classList.remove('active') }
async function saveSettings() {
var fs = parseInt(document.getElementById('fontSizeSlider').value) || 14;
@@ -1636,6 +1706,39 @@
try { await fetch('/api/settings', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ fontSize: fs, debug: dbg }) }) } catch (e) { }
closeSettings();
}
async function checkLatestVersion() {
var btn = document.getElementById('checkVersionBtn');
var prevText = btn ? btn.textContent : '';
if (btn) {
btn.disabled = true;
btn.textContent = t('checking_version');
}
try {
var r = await fetch('/api/version-check', { method: 'POST' });
var text = await r.text();
var data = {};
try { data = JSON.parse(text) } catch (e) { }
if (!r.ok) {
showToast(text || t('version_check_failed'));
return;
}
latestVersion = data.latestVersion || '';
renderLatestVersion();
if (!latestVersion) {
showToast(t('version_check_failed'));
return;
}
if (compareSemver(latestVersion, appVersion) > 0) maybeWarnNewVersion();
else showToast(t('version_up_to_date').replace('{v}', latestVersion));
} catch (e) {
showToast(e.message || t('version_check_failed'));
} finally {
if (btn) {
btn.disabled = false;
btn.textContent = prevText || t('check_now');
}
}
}
async function clearCache() {
try { var r = await fetch('/api/cache/clear', { method: 'POST' }); var j = await r.json(); if (j.ok) { alert(t('cache_cleared')) } } catch (e) { }
}
@@ -1816,7 +1919,7 @@
params.split('&').forEach(function (kv) { var p = kv.split('='); if (p[0] === 'r' && p[1]) resolvers = decodeURIComponent(p[1]).split(',').filter(Boolean) });
if (!domain || !key) { errEl.textContent = t('uri_missing'); errEl.style.display = 'block'; return }
if (!resolvers.length) resolvers = ['8.8.8.8', '1.1.1.1'];
var profile = { id: '', nickname: domain, config: { domain: domain, key: key, resolvers: resolvers, queryMode: 'single', rateLimit: 5 } };
var profile = { id: '', nickname: domain, config: { domain: domain, key: key, resolvers: resolvers, queryMode: 'single', rateLimit: 6 } };
var r = await fetch('/api/profiles', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ action: 'create', profile: profile }) });
if (!r.ok) throw new Error('save failed');
okEl.textContent = t('import_success').replace('{d}', domain); okEl.style.display = 'block';
@@ -1841,8 +1944,8 @@
document.getElementById('peKey').value = p.config.key || '';
document.getElementById('peResolvers').value = (p.config.resolvers || []).join('\n');
document.getElementById('peQueryMode').value = p.config.queryMode || 'single';
document.getElementById('peRateLimit').value = p.config.rateLimit || 5;
document.getElementById('peScatter').value = p.config.scatter || 2;
document.getElementById('peRateLimit').value = p.config.rateLimit || 6;
document.getElementById('peScatter').value = p.config.scatter || 4;
}
document.getElementById('peChannelSection').style.display = '';
var isActive = id === activeProfileId;
@@ -1863,8 +1966,8 @@
document.getElementById('peKey').value = '';
document.getElementById('peResolvers').value = '';
document.getElementById('peQueryMode').value = 'single';
document.getElementById('peRateLimit').value = '5';
document.getElementById('peScatter').value = '2';
document.getElementById('peRateLimit').value = '6';
document.getElementById('peScatter').value = '4';
document.getElementById('peChannelSection').style.display = 'none';
}
}
@@ -1913,7 +2016,7 @@
var key = document.getElementById('peKey').value;
var resolvers = document.getElementById('peResolvers').value.trim().split(/[\n,]+/).map(function (s) { return s.trim() }).filter(Boolean);
if (!domain || !key || !resolvers.length) { errEl.textContent = t('resolvers') + ' / ' + t('domain') + ' / ' + t('passphrase'); errEl.style.display = 'block'; return }
var profile = { id: editingProfileId || '', nickname: nick || domain, config: { domain: domain, key: key, resolvers: resolvers, queryMode: document.getElementById('peQueryMode').value, rateLimit: parseFloat(document.getElementById('peRateLimit').value) || 5, scatter: parseInt(document.getElementById('peScatter').value) || 2 } };
var profile = { id: editingProfileId || '', nickname: nick || domain, config: { domain: domain, key: key, resolvers: resolvers, queryMode: document.getElementById('peQueryMode').value, rateLimit: parseFloat(document.getElementById('peRateLimit').value) || 6, scatter: parseInt(document.getElementById('peScatter').value) || 4 } };
var action = editingProfileId ? 'update' : 'create';
var wasFirst = !profiles || !profiles.profiles || profiles.profiles.length === 0;
try {
@@ -1970,6 +2073,7 @@
var sr = await fetch('/api/status'); var st = await sr.json();
telegramLoggedIn = !!st.telegramLoggedIn;
if (st.nextFetch) { serverNextFetch = st.nextFetch; updateNextFetchDisplay() }
if (st.latestVersion) { latestVersion = st.latestVersion; renderLatestVersion() }
} catch (e) { }
}
+79 -1
View File
@@ -38,7 +38,7 @@ type Config struct {
// Also used as the resolver health-check probe timeout.
Timeout float64 `json:"timeout,omitempty"`
// Scatter is the number of resolvers queried simultaneously per DNS block request
// (0 or 1 = sequential, 2 = default parallel pair).
// (0 or 1 = sequential, 4 = default parallel pair).
Scatter int `json:"scatter,omitempty"`
}
@@ -78,6 +78,7 @@ type Server struct {
messages map[int][]protocol.Message
telegramLoggedIn bool
nextFetch uint32
latestVersion string
lastMsgIDs map[int]uint32 // last seen message IDs per channel
lastHashes map[int]uint32 // last seen content hashes per channel
@@ -178,6 +179,7 @@ func (s *Server) Run() error {
mux.HandleFunc("/api/profiles", s.handleProfiles)
mux.HandleFunc("/api/profiles/switch", s.handleProfileSwitch)
mux.HandleFunc("/api/settings", s.handleSettings)
mux.HandleFunc("/api/version-check", s.handleVersionCheck)
mux.HandleFunc("/api/cache/clear", s.handleClearCache)
mux.HandleFunc("/api/resolvers/apply-saved", s.handleApplySavedResolvers)
mux.HandleFunc("/", s.handleIndex)
@@ -251,6 +253,7 @@ func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
status["channels"] = s.channels
status["telegramLoggedIn"] = s.telegramLoggedIn
status["nextFetch"] = s.nextFetch
status["latestVersion"] = s.latestVersion
// Include last resolver scan if recent (<24 h) so the frontend can offer a quick-start.
if ls := s.loadLastScan(); ls != nil {
status["lastScan"] = map[string]any{
@@ -684,6 +687,57 @@ func (s *Server) initFetcher() error {
return nil
}
func (s *Server) checkLatestVersion(ctx context.Context) (string, error) {
s.mu.RLock()
cfg := s.config
s.mu.RUnlock()
if cfg == nil {
return "", fmt.Errorf("no config")
}
fetcher, err := client.NewFetcher(cfg.Domain, cfg.Key, cfg.Resolvers)
if err != nil {
return "", fmt.Errorf("create fetcher: %w", err)
}
if cfg.QueryMode == "double" {
fetcher.SetQueryMode(protocol.QueryMultiLabel)
}
var debug bool
if pl, err := s.loadProfiles(); err == nil {
debug = pl.Debug
}
fetcher.SetDebug(debug)
if cfg.RateLimit > 0 {
fetcher.SetRateLimit(cfg.RateLimit)
}
if cfg.Scatter > 1 {
fetcher.SetScatter(cfg.Scatter)
}
timeout := 15 * time.Second
if cfg.Timeout > 0 {
timeout = time.Duration(cfg.Timeout * float64(time.Second))
}
fetcher.SetTimeout(timeout)
fetcher.SetLogFunc(func(msg string) {
s.addLog(msg)
})
fetcher.SetActiveResolvers(cfg.Resolvers)
checkCtx, cancel := context.WithTimeout(ctx, timeout*3)
defer cancel()
fetcher.Start(checkCtx)
v, err := fetcher.FetchLatestVersion(checkCtx)
if err != nil {
return "", err
}
s.mu.Lock()
s.latestVersion = v
s.mu.Unlock()
return v, nil
}
// startCheckerThenRefresh runs the resolver health-check pass synchronously
// (in a new goroutine), then starts the periodic checker and fetches metadata.
// This ensures fresh resolver data is used for the very first metadata query.
@@ -1358,6 +1412,30 @@ func (s *Server) handleSettings(w http.ResponseWriter, r *http.Request) {
}
}
func (s *Server) handleVersionCheck(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", 405)
return
}
v, err := s.checkLatestVersion(r.Context())
if err != nil {
if strings.Contains(err.Error(), "no config") {
http.Error(w, "no config", 400)
return
}
errStr := err.Error()
if strings.Contains(errStr, "integrity check failed") || strings.Contains(errStr, "message authentication failed") || strings.Contains(errStr, "cipher") {
http.Error(w, "invalid passphrase", 400)
return
}
http.Error(w, fmt.Sprintf("version check failed: %v", err), 502)
return
}
writeJSON(w, map[string]any{"ok": true, "latestVersion": v})
}
// handleClearCache deletes all files in the cache directory.
func (s *Server) handleClearCache(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
+1 -1
View File
@@ -76,7 +76,7 @@ func startDNSServerEx(t *testing.T, domain, passphrase string, allowManage bool,
channelsFile = f.Name()
}
dnsServer := server.NewDNSServer(addr, domain, feed, qk, rk, protocol.DefaultMaxPadding, nil, allowManage, channelsFile)
dnsServer := server.NewDNSServer(addr, domain, feed, qk, rk, protocol.DefaultMaxPadding, nil, allowManage, channelsFile, false)
ctx, cancel := context.WithCancel(context.Background())
+91
View File
@@ -3,11 +3,13 @@ package e2e_test
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"
"github.com/sartoopjj/thefeed/internal/protocol"
"github.com/sartoopjj/thefeed/internal/web"
)
@@ -112,3 +114,92 @@ func TestE2E_Settings_MethodNotAllowed(t *testing.T) {
t.Errorf("expected 405, got %d", resp.StatusCode)
}
}
func TestE2E_VersionCheck_NoConfig(t *testing.T) {
base, _ := startWebServer(t)
resp := postJSON(t, base+"/api/version-check", `{}`)
defer resp.Body.Close()
if resp.StatusCode != 400 {
t.Fatalf("POST /api/version-check: expected 400, got %d", resp.StatusCode)
}
}
func TestE2E_VersionCheck_MethodNotAllowed(t *testing.T) {
base, _ := startWebServer(t)
req, _ := http.NewRequest(http.MethodGet, base+"/api/version-check", nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("GET /api/version-check: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != 405 {
t.Fatalf("GET /api/version-check: expected 405, got %d", resp.StatusCode)
}
}
func TestE2E_VersionCheck_Success(t *testing.T) {
domain := "test.example.com"
passphrase := "testpass"
resolver, feed, cancel := startDNSServerEx(t, domain, passphrase, false, []string{"news"}, map[int][]protocol.Message{})
defer cancel()
feed.SetLatestVersion("v9.9.9")
dataDir := t.TempDir()
port := findFreePort(t, "tcp")
srv, err := web.New(dataDir, port, "")
if err != nil {
t.Fatalf("create web server: %v", err)
}
go srv.Run()
time.Sleep(200 * time.Millisecond)
base := fmt.Sprintf("http://127.0.0.1:%d", port)
cfg := fmt.Sprintf(`{"domain":%q,"key":%q,"resolvers":[%q],"queryMode":"single","rateLimit":10}`, domain, passphrase, resolver)
resp := postJSON(t, base+"/api/config", cfg)
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(resp.Body)
t.Fatalf("POST /api/config status=%d body=%s", resp.StatusCode, body)
}
resp2 := postJSON(t, base+"/api/version-check", `{}`)
defer resp2.Body.Close()
if resp2.StatusCode != 200 {
body, _ := io.ReadAll(resp2.Body)
t.Fatalf("POST /api/version-check status=%d body=%s", resp2.StatusCode, body)
}
m := decodeJSON(t, resp2)
if m["latestVersion"] != "v9.9.9" {
t.Fatalf("latestVersion = %v, want v9.9.9", m["latestVersion"])
}
resp3 := getJSON(t, base+"/api/status")
defer resp3.Body.Close()
status := decodeJSON(t, resp3)
if status["latestVersion"] != "v9.9.9" {
t.Fatalf("status latestVersion = %v, want v9.9.9", status["latestVersion"])
}
}
func TestE2E_SettingsPage_HasVersionControls(t *testing.T) {
base, _ := startWebServer(t)
resp, err := http.Get(base + "/")
if err != nil {
t.Fatalf("GET /: %v", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("read body: %v", err)
}
html := string(body)
if !strings.Contains(html, `id="latestVersionEl"`) {
t.Fatalf("settings page missing latestVersionEl")
}
if !strings.Contains(html, `id="checkVersionBtn"`) {
t.Fatalf("settings page missing checkVersionBtn")
}
}