mirror of
https://github.com/sartoopjj/thefeed.git
synced 2026-05-19 08:04:42 +03:00
feat: enhance logging for fetch cycles in public, telegram, xpublic readers and version tracker
This commit is contained in:
@@ -110,6 +110,9 @@ func (pr *PublicReader) UpdateChannels(channels []string) {
|
||||
}
|
||||
|
||||
func (pr *PublicReader) fetchAll(ctx context.Context) {
|
||||
log.Printf("[public] fetch cycle started for %d channels", len(pr.channels))
|
||||
start := time.Now()
|
||||
var fetched, failed int
|
||||
for i, username := range pr.channels {
|
||||
chNum := pr.baseCh + i
|
||||
|
||||
@@ -123,6 +126,7 @@ func (pr *PublicReader) fetchAll(ctx context.Context) {
|
||||
msgs, err := pr.fetchChannel(ctx, username)
|
||||
if err != nil {
|
||||
log.Printf("[public] fetch %s: %v", username, err)
|
||||
failed++
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -140,8 +144,10 @@ func (pr *PublicReader) fetchAll(ctx context.Context) {
|
||||
|
||||
pr.feed.UpdateChannel(chNum, msgs)
|
||||
pr.feed.SetChatInfo(chNum, protocol.ChatTypeChannel, false)
|
||||
fetched++
|
||||
log.Printf("[public] updated %s: %d messages", username, len(msgs))
|
||||
}
|
||||
log.Printf("[public] fetch cycle done in %s: %d fetched, %d failed, %d total", time.Since(start).Round(time.Millisecond), fetched, failed, len(pr.channels))
|
||||
}
|
||||
|
||||
func (pr *PublicReader) fetchChannel(ctx context.Context, username string) ([]protocol.Message, error) {
|
||||
|
||||
@@ -84,8 +84,11 @@ func (s *Server) Run(ctx context.Context) error {
|
||||
s.reader = reader
|
||||
channelCtl = reader
|
||||
go func() {
|
||||
if err := reader.Run(ctx); err != nil {
|
||||
log.Printf("[telegram] error: %v", err)
|
||||
log.Println("[telegram] reader goroutine started")
|
||||
if err := reader.Run(ctx); err != nil && ctx.Err() == nil {
|
||||
log.Printf("[telegram] reader goroutine STOPPED with error: %v", err)
|
||||
} else {
|
||||
log.Println("[telegram] reader goroutine exited")
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
@@ -99,8 +102,11 @@ func (s *Server) Run(ctx context.Context) error {
|
||||
publicReader := NewPublicReader(s.telegramChannels, s.feed, msgLimit, 1)
|
||||
channelCtl = publicReader
|
||||
go func() {
|
||||
log.Println("[public] reader goroutine started")
|
||||
if err := publicReader.Run(ctx); err != nil && ctx.Err() == nil {
|
||||
log.Printf("[public] error: %v", err)
|
||||
log.Printf("[public] reader goroutine STOPPED with error: %v", err)
|
||||
} else {
|
||||
log.Println("[public] reader goroutine exited")
|
||||
}
|
||||
}()
|
||||
log.Println("[server] running without Telegram login; fetching public channels via t.me")
|
||||
@@ -114,8 +120,11 @@ func (s *Server) Run(ctx context.Context) error {
|
||||
}
|
||||
xReader = NewXPublicReader(s.xAccounts, s.feed, msgLimit, len(s.telegramChannels)+1, s.cfg.XRSSInstances)
|
||||
go func() {
|
||||
log.Println("[x] reader goroutine started")
|
||||
if err := xReader.Run(ctx); err != nil && ctx.Err() == nil {
|
||||
log.Printf("[x] error: %v", err)
|
||||
log.Printf("[x] reader goroutine STOPPED with error: %v", err)
|
||||
} else {
|
||||
log.Println("[x] reader goroutine exited")
|
||||
}
|
||||
}()
|
||||
log.Printf("[server] enabled X source for %d accounts", len(s.xAccounts))
|
||||
|
||||
@@ -218,6 +218,9 @@ func (tr *TelegramReader) authenticate(ctx context.Context, client *telegram.Cli
|
||||
}
|
||||
|
||||
func (tr *TelegramReader) fetchAll(ctx context.Context, api *tg.Client) {
|
||||
log.Printf("[telegram] fetch cycle started for %d channels", len(tr.channels))
|
||||
start := time.Now()
|
||||
var fetched, failed int
|
||||
for i, username := range tr.channels {
|
||||
chNum := tr.baseCh + i
|
||||
|
||||
@@ -232,7 +235,8 @@ func (tr *TelegramReader) fetchAll(ctx context.Context, api *tg.Client) {
|
||||
// Resolve peer to get chat type info
|
||||
rp, err := tr.resolvePeer(ctx, api, username)
|
||||
if err != nil {
|
||||
log.Printf("[telegram] fetch %s: %v", username, err)
|
||||
log.Printf("[telegram] fetch %s: resolve peer failed: %v", username, err)
|
||||
failed++
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -241,14 +245,16 @@ func (tr *TelegramReader) fetchAll(ctx context.Context, api *tg.Client) {
|
||||
Limit: tr.msgLimit,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("[telegram] fetch %s: get history: %v", username, err)
|
||||
log.Printf("[telegram] fetch %s: get history failed: %v", username, err)
|
||||
failed++
|
||||
continue
|
||||
}
|
||||
|
||||
userNames := buildUserMap(hist)
|
||||
msgs, err := tr.extractMessages(hist, rp.chatType, userNames)
|
||||
if err != nil {
|
||||
log.Printf("[telegram] fetch %s: %v", username, err)
|
||||
log.Printf("[telegram] fetch %s: extract messages failed: %v", username, err)
|
||||
failed++
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -260,8 +266,10 @@ func (tr *TelegramReader) fetchAll(ctx context.Context, api *tg.Client) {
|
||||
// Update feed with messages and chat type info
|
||||
tr.feed.UpdateChannel(chNum, msgs)
|
||||
tr.feed.SetChatInfo(chNum, rp.chatType, rp.canSend)
|
||||
fetched++
|
||||
log.Printf("[telegram] updated %s: %d messages (type=%d, canSend=%v)", username, len(msgs), rp.chatType, rp.canSend)
|
||||
}
|
||||
log.Printf("[telegram] fetch cycle done in %s: %d fetched, %d failed, %d total", time.Since(start).Round(time.Millisecond), fetched, failed, len(tr.channels))
|
||||
}
|
||||
|
||||
// resolvePeer resolves a Telegram username to an InputPeer, handling channels,
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -21,6 +22,7 @@ func startLatestVersionTracker(ctx context.Context, feed *Feed) {
|
||||
update := func() {
|
||||
v, err := fetchLatestReleaseVersion(ctx)
|
||||
if err != nil {
|
||||
log.Printf("[version] check latest release failed: %v", err)
|
||||
return
|
||||
}
|
||||
feed.SetLatestVersion(v)
|
||||
|
||||
@@ -158,6 +158,10 @@ func (xr *XPublicReader) SetBaseCh(baseCh int) {
|
||||
}
|
||||
|
||||
func (xr *XPublicReader) fetchAll(ctx context.Context) {
|
||||
log.Printf("[x] fetch cycle started for %d accounts (instances: %v)", len(xr.accounts), xr.instances)
|
||||
start := time.Now()
|
||||
var fetched, failed int
|
||||
|
||||
xr.mu.RLock()
|
||||
baseCh := xr.baseCh
|
||||
xr.mu.RUnlock()
|
||||
@@ -180,7 +184,8 @@ func (xr *XPublicReader) fetchAll(ctx context.Context) {
|
||||
|
||||
msgs, err := xr.fetchAccount(ctx, account)
|
||||
if err != nil {
|
||||
log.Printf("[x] fetch %s: %v", account, err)
|
||||
log.Printf("[x] fetch @%s: all instances failed: %v", account, err)
|
||||
failed++
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -196,8 +201,10 @@ func (xr *XPublicReader) fetchAll(ctx context.Context) {
|
||||
xr.mu.Unlock()
|
||||
|
||||
xr.feed.UpdateChannel(chNum, msgs)
|
||||
fetched++
|
||||
log.Printf("[x] updated @%s: %d posts", account, len(msgs))
|
||||
}
|
||||
log.Printf("[x] fetch cycle done in %s: %d fetched, %d failed, %d total", time.Since(start).Round(time.Millisecond), fetched, failed, len(xr.accounts))
|
||||
}
|
||||
|
||||
func (xr *XPublicReader) fetchAccount(ctx context.Context, username string) ([]protocol.Message, error) {
|
||||
@@ -206,6 +213,7 @@ func (xr *XPublicReader) fetchAccount(ctx context.Context, username string) ([]p
|
||||
u := strings.TrimSuffix(instance, "/") + "/" + url.PathEscape(username) + "/rss"
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
|
||||
if err != nil {
|
||||
log.Printf("[x] @%s: instance %s: request build error: %v", username, instance, err)
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
@@ -214,6 +222,7 @@ func (xr *XPublicReader) fetchAccount(ctx context.Context, username string) ([]p
|
||||
|
||||
resp, err := xr.client.Do(req)
|
||||
if err != nil {
|
||||
log.Printf("[x] @%s: instance %s: network error: %v", username, instance, err)
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
@@ -222,16 +231,19 @@ func (xr *XPublicReader) fetchAccount(ctx context.Context, username string) ([]p
|
||||
log.Printf("[x] close response body: %v", cerr)
|
||||
}
|
||||
if readErr != nil {
|
||||
log.Printf("[x] @%s: instance %s: body read error: %v", username, instance, readErr)
|
||||
lastErr = readErr
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
log.Printf("[x] @%s: instance %s: HTTP %s", username, instance, resp.Status)
|
||||
lastErr = fmt.Errorf("%s: unexpected HTTP status %s", instance, resp.Status)
|
||||
continue
|
||||
}
|
||||
|
||||
msgs, err := parseXRSSMessages(body, username)
|
||||
if err != nil {
|
||||
log.Printf("[x] @%s: instance %s: parse error: %v", username, instance, err)
|
||||
lastErr = fmt.Errorf("%s: %w", instance, err)
|
||||
continue
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user