mirror of
https://github.com/sartoopjj/thefeed.git
synced 2026-05-18 06:34:36 +03:00
feat: lazy background title fetch with disk cache and backoff
- Persist display names in per-channel JSON cache files (Name + DisplayName fields on cachedChannel). GetAllTitles reads all ch_*.json files; PutTitle updates a channel file in-place without losing messages. - Replace blocking FetchTitles calls in fetchMeta and refreshChannel with an instant disk read (GetAllTitles) applied before the SSE broadcast, so channels appear with cached titles immediately on every load. - ensureTitlesFetched runs in a single background goroutine (titlesMu + titlesLoading guard prevents duplicates). On success it persists titles and pushes an SSE update. On error or empty response it backs off for 5 minutes so an old server does not cause endless retries. - Block 0 of TitlesChannel now carries a uint16 total-block-count prefix (added in rebuildTitlesBlocks). FetchTitles reads the count from block 0 and fetches all remaining blocks in parallel instead of sequentially. - FetchTitles timeout raised from 10 s to 1 minute.
This commit is contained in:
@@ -54,8 +54,10 @@ type Cache struct {
|
||||
}
|
||||
|
||||
type cachedChannel struct {
|
||||
Messages []protocol.Message `json:"messages"`
|
||||
FetchedAt int64 `json:"fetched_at"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Messages []protocol.Message `json:"messages"`
|
||||
FetchedAt int64 `json:"fetched_at"`
|
||||
DisplayName string `json:"display_name,omitempty"`
|
||||
}
|
||||
|
||||
type cachedMeta struct {
|
||||
@@ -159,6 +161,55 @@ func (c *Cache) PutMetadata(meta *protocol.Metadata) error {
|
||||
return os.WriteFile(filepath.Join(c.dir, "metadata.json"), data, 0600)
|
||||
}
|
||||
|
||||
// GetAllTitles reads the display name from every ch_*.json cache file and returns
|
||||
// a map of original channel name → display name. Files without a stored name or
|
||||
// display name are skipped silently.
|
||||
func (c *Cache) GetAllTitles() map[string]string {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
entries, err := os.ReadDir(c.dir)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
titles := make(map[string]string)
|
||||
for _, e := range entries {
|
||||
if e.IsDir() || !strings.HasPrefix(e.Name(), "ch_") || !strings.HasSuffix(e.Name(), ".json") {
|
||||
continue
|
||||
}
|
||||
data, err := os.ReadFile(filepath.Join(c.dir, e.Name()))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
var cc cachedChannel
|
||||
if json.Unmarshal(data, &cc) != nil || cc.Name == "" || cc.DisplayName == "" {
|
||||
continue
|
||||
}
|
||||
titles[cc.Name] = cc.DisplayName
|
||||
}
|
||||
return titles
|
||||
}
|
||||
|
||||
// PutTitle persists a display name for a channel into its cache file.
|
||||
// If the file already exists it is updated in-place so that stored messages are preserved.
|
||||
func (c *Cache) PutTitle(channelName, title string) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
path := c.channelPath(channelName)
|
||||
var cc cachedChannel
|
||||
if data, err := os.ReadFile(path); err == nil {
|
||||
_ = json.Unmarshal(data, &cc)
|
||||
}
|
||||
cc.Name = channelName
|
||||
cc.DisplayName = title
|
||||
data, err := json.Marshal(cc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.WriteFile(path, data, 0600)
|
||||
}
|
||||
|
||||
// Cleanup removes channel cache files (ch_*.json) not modified in 7 days.
|
||||
func (c *Cache) Cleanup() error {
|
||||
c.mu.Lock()
|
||||
|
||||
+42
-23
@@ -683,41 +683,60 @@ func (f *Fetcher) FetchLatestVersion(ctx context.Context) (string, error) {
|
||||
}
|
||||
|
||||
// FetchTitles fetches and decodes the channel display name map from TitlesChannel.
|
||||
// Returns an empty map (not an error) when the server does not yet have any display names
|
||||
// or when the server is an older version that does not support TitlesChannel.
|
||||
// Uses a short deadline so that old servers (which return NXDOMAIN immediately) do not
|
||||
// cause the 20-retry backoff in FetchBlock to stall the caller for ~95 seconds.
|
||||
// Returns an empty map (not an error) when the server does not support TitlesChannel.
|
||||
// Block 0 carries a uint16 total-block count prefix; remaining blocks are fetched in
|
||||
// parallel so the overall fetch is bounded by the slowest single block, not the sum.
|
||||
func (f *Fetcher) FetchTitles(ctx context.Context) (map[string]string, error) {
|
||||
fetchCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
fetchCtx, cancel := context.WithTimeout(ctx, time.Minute)
|
||||
defer cancel()
|
||||
|
||||
data, err := f.FetchBlock(fetchCtx, protocol.TitlesChannel, 0)
|
||||
if err != nil {
|
||||
block0, err := f.FetchBlock(fetchCtx, protocol.TitlesChannel, 0)
|
||||
if err != nil || len(block0) < 2 {
|
||||
return map[string]string{}, nil
|
||||
}
|
||||
|
||||
titles, parseErr := protocol.DecodeTitlesData(data)
|
||||
if parseErr == nil {
|
||||
totalBlocks := int(binary.BigEndian.Uint16(block0))
|
||||
payload0 := block0[2:]
|
||||
|
||||
if totalBlocks <= 1 {
|
||||
titles, _ := protocol.DecodeTitlesData(payload0)
|
||||
if titles == nil {
|
||||
titles = map[string]string{}
|
||||
}
|
||||
return titles, nil
|
||||
}
|
||||
|
||||
// Titles may span multiple blocks — concatenate and retry.
|
||||
allData := make([]byte, len(data))
|
||||
copy(allData, data)
|
||||
for blk := uint16(1); blk < 10; blk++ {
|
||||
if fetchCtx.Err() != nil {
|
||||
// Fetch remaining blocks in parallel.
|
||||
type blockResult struct {
|
||||
data []byte
|
||||
err error
|
||||
}
|
||||
results := make([]blockResult, totalBlocks)
|
||||
results[0] = blockResult{data: payload0}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for blk := 1; blk < totalBlocks; blk++ {
|
||||
wg.Add(1)
|
||||
go func(blk int) {
|
||||
defer wg.Done()
|
||||
data, fetchErr := f.FetchBlock(fetchCtx, protocol.TitlesChannel, uint16(blk))
|
||||
results[blk] = blockResult{data: data, err: fetchErr}
|
||||
}(blk)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
var allData []byte
|
||||
for _, r := range results {
|
||||
if r.err != nil {
|
||||
return map[string]string{}, nil
|
||||
}
|
||||
block, fetchErr := f.FetchBlock(fetchCtx, protocol.TitlesChannel, blk)
|
||||
if fetchErr != nil {
|
||||
break
|
||||
}
|
||||
allData = append(allData, block...)
|
||||
if titles, parseErr = protocol.DecodeTitlesData(allData); parseErr == nil {
|
||||
return titles, nil
|
||||
}
|
||||
allData = append(allData, r.data...)
|
||||
}
|
||||
return map[string]string{}, nil
|
||||
titles, _ := protocol.DecodeTitlesData(allData)
|
||||
if titles == nil {
|
||||
titles = map[string]string{}
|
||||
}
|
||||
return titles, nil
|
||||
}
|
||||
|
||||
// ErrContentHashMismatch is returned when the fetched messages do not match
|
||||
|
||||
@@ -166,6 +166,8 @@ func (f *Feed) getTitlesBlock(block int) ([]byte, error) {
|
||||
}
|
||||
|
||||
// rebuildTitlesBlocks re-serializes the display name map and splits it into blocks.
|
||||
// Block 0 is prefixed with a uint16 total-block count so the client can fetch all
|
||||
// remaining blocks in parallel after reading the first one.
|
||||
// Must be called with f.mu held.
|
||||
func (f *Feed) rebuildTitlesBlocks() {
|
||||
titles := make(map[string]string, len(f.channels))
|
||||
@@ -175,7 +177,12 @@ func (f *Feed) rebuildTitlesBlocks() {
|
||||
titles[name] = dn
|
||||
}
|
||||
}
|
||||
f.titlesBlocks = protocol.SplitIntoBlocks(protocol.EncodeTitlesData(titles))
|
||||
blocks := protocol.SplitIntoBlocks(protocol.EncodeTitlesData(titles))
|
||||
if len(blocks) > 0 {
|
||||
prefix := []byte{byte(len(blocks) >> 8), byte(len(blocks))}
|
||||
blocks[0] = append(prefix, blocks[0]...)
|
||||
}
|
||||
f.titlesBlocks = blocks
|
||||
}
|
||||
|
||||
func (f *Feed) rebuildVersionBlocks() {
|
||||
|
||||
+106
-8
@@ -130,6 +130,12 @@ type Server struct {
|
||||
stopRefresh chan struct{}
|
||||
|
||||
scanner *client.ResolverScanner
|
||||
|
||||
// titlesMu guards the background title-fetch state.
|
||||
// Only one goroutine fetches titles at a time; errors impose a 5-minute backoff.
|
||||
titlesMu sync.Mutex
|
||||
titlesLoading bool
|
||||
titlesBackoffUntil time.Time
|
||||
}
|
||||
|
||||
// New creates a new web server.
|
||||
@@ -999,11 +1005,14 @@ func (s *Server) refreshMetadataOnly() {
|
||||
return
|
||||
}
|
||||
|
||||
titles, _ := fetcher.FetchTitles(ctx)
|
||||
channels := meta.Channels
|
||||
for i := range channels {
|
||||
if t, ok := titles[channels[i].Name]; ok && t != "" {
|
||||
channels[i].DisplayName = t
|
||||
if cache != nil {
|
||||
if cached := cache.GetAllTitles(); len(cached) > 0 {
|
||||
for i := range channels {
|
||||
if t := cached[channels[i].Name]; t != "" {
|
||||
channels[i].DisplayName = t
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1019,6 +1028,82 @@ func (s *Server) refreshMetadataOnly() {
|
||||
}
|
||||
|
||||
s.broadcast("event: update\ndata: \"channels\"\n\n")
|
||||
|
||||
needsFetch := false
|
||||
for _, ch := range channels {
|
||||
if ch.DisplayName == "" {
|
||||
needsFetch = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if needsFetch {
|
||||
go s.ensureTitlesFetched(basectx)
|
||||
}
|
||||
}
|
||||
|
||||
// ensureTitlesFetched fetches channel display names from TitlesChannel in the background.
|
||||
// At most one fetch runs at a time; errors impose a 5-minute backoff so that an
|
||||
// outdated server does not cause endless retries.
|
||||
func (s *Server) ensureTitlesFetched(ctx context.Context) {
|
||||
s.titlesMu.Lock()
|
||||
if s.titlesLoading || time.Now().Before(s.titlesBackoffUntil) {
|
||||
s.titlesMu.Unlock()
|
||||
return
|
||||
}
|
||||
s.titlesLoading = true
|
||||
s.titlesMu.Unlock()
|
||||
|
||||
defer func() {
|
||||
s.titlesMu.Lock()
|
||||
s.titlesLoading = false
|
||||
s.titlesMu.Unlock()
|
||||
}()
|
||||
|
||||
s.mu.RLock()
|
||||
fetcher := s.fetcher
|
||||
cache := s.cache
|
||||
s.mu.RUnlock()
|
||||
|
||||
if fetcher == nil {
|
||||
return
|
||||
}
|
||||
|
||||
titles, err := fetcher.FetchTitles(ctx)
|
||||
if err != nil && ctx.Err() == nil {
|
||||
s.titlesMu.Lock()
|
||||
s.titlesBackoffUntil = time.Now().Add(5 * time.Minute)
|
||||
s.titlesMu.Unlock()
|
||||
return
|
||||
}
|
||||
if len(titles) == 0 {
|
||||
// Server doesn't support TitlesChannel or has no titles yet; back off.
|
||||
s.titlesMu.Lock()
|
||||
s.titlesBackoffUntil = time.Now().Add(5 * time.Minute)
|
||||
s.titlesMu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
if cache != nil {
|
||||
for name, title := range titles {
|
||||
_ = cache.PutTitle(name, title)
|
||||
}
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
channels := s.channels
|
||||
updated := false
|
||||
for i := range channels {
|
||||
if t, ok := titles[channels[i].Name]; ok && t != "" && channels[i].DisplayName != t {
|
||||
channels[i].DisplayName = t
|
||||
updated = true
|
||||
}
|
||||
}
|
||||
s.channels = channels
|
||||
s.mu.Unlock()
|
||||
|
||||
if updated {
|
||||
s.broadcast("event: update\ndata: \"channels\"\n\n")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) refreshChannel(channelNum int) {
|
||||
@@ -1096,11 +1181,14 @@ func (s *Server) refreshChannel(channelNum int) {
|
||||
}
|
||||
return
|
||||
}
|
||||
titles, _ := fetcher.FetchTitles(ctx)
|
||||
channels := meta.Channels
|
||||
for i := range channels {
|
||||
if t, ok := titles[channels[i].Name]; ok && t != "" {
|
||||
channels[i].DisplayName = t
|
||||
if cache != nil {
|
||||
if cached := cache.GetAllTitles(); len(cached) > 0 {
|
||||
for i := range channels {
|
||||
if t := cached[channels[i].Name]; t != "" {
|
||||
channels[i].DisplayName = t
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
meta.Channels = channels
|
||||
@@ -1114,6 +1202,16 @@ func (s *Server) refreshChannel(channelNum int) {
|
||||
_ = cache.PutMetadata(meta)
|
||||
}
|
||||
s.broadcast("event: update\ndata: \"channels\"\n\n")
|
||||
needsFetch := false
|
||||
for _, ch := range channels {
|
||||
if ch.DisplayName == "" {
|
||||
needsFetch = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if needsFetch {
|
||||
go s.ensureTitlesFetched(basectx)
|
||||
}
|
||||
}
|
||||
|
||||
channels := meta.Channels
|
||||
|
||||
Reference in New Issue
Block a user