mirror of
https://github.com/sartoopjj/thefeed.git
synced 2026-05-18 06:24:34 +03:00
feat: add TitlesChannel (0xFFF9) for per-channel display names
- Add dedicated TitlesChannel (0xFFF9) following the VersionChannel pattern - Server encodes name→title map via EncodeTitlesData/DecodeTitlesData - Metadata wire format unchanged for backward compatibility with old clients - All three fetchers (public Telegram, MTProto, X/Nitter) extract and store display names - Client fetches TitlesChannel with a 10s deadline; falls back to channel handles gracefully on old servers - Old clients are unaffected — they never query 0xFFF9
This commit is contained in:
@@ -682,6 +682,44 @@ func (f *Fetcher) FetchLatestVersion(ctx context.Context) (string, error) {
|
||||
return protocol.DecodeVersionData(data)
|
||||
}
|
||||
|
||||
// FetchTitles fetches and decodes the channel display name map from TitlesChannel.
|
||||
// Returns an empty map (not an error) when the server does not yet have any display names
|
||||
// or when the server is an older version that does not support TitlesChannel.
|
||||
// Uses a short deadline so that old servers (which return NXDOMAIN immediately) do not
|
||||
// cause the 20-retry backoff in FetchBlock to stall the caller for ~95 seconds.
|
||||
func (f *Fetcher) FetchTitles(ctx context.Context) (map[string]string, error) {
|
||||
fetchCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
data, err := f.FetchBlock(fetchCtx, protocol.TitlesChannel, 0)
|
||||
if err != nil {
|
||||
return map[string]string{}, nil
|
||||
}
|
||||
|
||||
titles, parseErr := protocol.DecodeTitlesData(data)
|
||||
if parseErr == nil {
|
||||
return titles, nil
|
||||
}
|
||||
|
||||
// Titles may span multiple blocks — concatenate and retry.
|
||||
allData := make([]byte, len(data))
|
||||
copy(allData, data)
|
||||
for blk := uint16(1); blk < 10; blk++ {
|
||||
if fetchCtx.Err() != nil {
|
||||
return map[string]string{}, nil
|
||||
}
|
||||
block, fetchErr := f.FetchBlock(fetchCtx, protocol.TitlesChannel, blk)
|
||||
if fetchErr != nil {
|
||||
break
|
||||
}
|
||||
allData = append(allData, block...)
|
||||
if titles, parseErr = protocol.DecodeTitlesData(allData); parseErr == nil {
|
||||
return titles, nil
|
||||
}
|
||||
}
|
||||
return map[string]string{}, nil
|
||||
}
|
||||
|
||||
// ErrContentHashMismatch is returned when the fetched messages do not match
|
||||
// the expected content hash from metadata. This typically means the server
|
||||
// regenerated its blocks between the metadata fetch and the block fetch
|
||||
|
||||
@@ -32,6 +32,9 @@ const (
|
||||
// VersionChannel serves latest release version with random suffix.
|
||||
VersionChannel uint16 = 0xFFFA
|
||||
|
||||
// TitlesChannel serves per-channel human-readable display names.
|
||||
TitlesChannel uint16 = 0xFFF9
|
||||
|
||||
// MaxUpstreamBlockPayload keeps uploaded query chunks comfortably below DNS
|
||||
// name limits across typical domains and resolver paths.
|
||||
MaxUpstreamBlockPayload = 8
|
||||
|
||||
@@ -105,16 +105,12 @@ func ContentHashOf(msgs []Message) uint32 {
|
||||
|
||||
// SerializeMetadata encodes metadata into bytes for channel 0 blocks.
|
||||
// Format: marker(3) + timestamp(4) + nextFetch(4) + flags(1) + channelCount(2) + per-channel data
|
||||
// Per-channel: nameLen(1) + name + blocks(2) + lastMsgID(4) + contentHash(4) + chatType(1) + flags(1) + displayNameLen(1) + displayName
|
||||
// Per-channel: nameLen(1) + name + blocks(2) + lastMsgID(4) + contentHash(4) + chatType(1) + flags(1)
|
||||
func SerializeMetadata(m *Metadata) []byte {
|
||||
// 3 marker + 4 timestamp + 4 nextFetch + 1 flags + 2 channel count + per-channel data
|
||||
size := MarkerSize + 4 + 4 + 1 + 2
|
||||
for _, ch := range m.Channels {
|
||||
dn := ch.DisplayName
|
||||
if len(dn) > 255 {
|
||||
dn = dn[:255]
|
||||
}
|
||||
size += 1 + len(ch.Name) + 2 + 4 + 4 + 1 + 1 + 1 + len(dn)
|
||||
size += 1 + len(ch.Name) + 2 + 4 + 4 + 1 + 1
|
||||
}
|
||||
buf := make([]byte, size)
|
||||
off := 0
|
||||
@@ -161,14 +157,6 @@ func SerializeMetadata(m *Metadata) []byte {
|
||||
}
|
||||
buf[off] = chFlags
|
||||
off++
|
||||
dnBytes := []byte(ch.DisplayName)
|
||||
if len(dnBytes) > 255 {
|
||||
dnBytes = dnBytes[:255]
|
||||
}
|
||||
buf[off] = byte(len(dnBytes))
|
||||
off++
|
||||
copy(buf[off:], dnBytes)
|
||||
off += len(dnBytes)
|
||||
}
|
||||
|
||||
return buf
|
||||
@@ -226,19 +214,8 @@ func ParseMetadata(data []byte) (*Metadata, error) {
|
||||
chFlags := data[off]
|
||||
off++
|
||||
|
||||
var displayName string
|
||||
if off < len(data) {
|
||||
dnLen := int(data[off])
|
||||
off++
|
||||
if off+dnLen <= len(data) {
|
||||
displayName = string(data[off : off+dnLen])
|
||||
off += dnLen
|
||||
}
|
||||
}
|
||||
|
||||
m.Channels = append(m.Channels, ChannelInfo{
|
||||
Name: name,
|
||||
DisplayName: displayName,
|
||||
Blocks: blocks,
|
||||
LastMsgID: lastID,
|
||||
ContentHash: contentHash,
|
||||
@@ -409,6 +386,79 @@ func CompressMessages(data []byte) []byte {
|
||||
return append([]byte{compressionDeflate}, compressed...)
|
||||
}
|
||||
|
||||
// EncodeTitlesData encodes a name→title map into bytes for TitlesChannel blocks.
|
||||
// Format: count(2) + [nameLen(1)+name+titleLen(1)+title]*count
|
||||
func EncodeTitlesData(titles map[string]string) []byte {
|
||||
size := 2
|
||||
for name, title := range titles {
|
||||
n := name
|
||||
if len(n) > 255 {
|
||||
n = n[:255]
|
||||
}
|
||||
t := title
|
||||
if len([]byte(t)) > 255 {
|
||||
t = string([]byte(t)[:255])
|
||||
}
|
||||
size += 1 + len(n) + 1 + len([]byte(t))
|
||||
}
|
||||
buf := make([]byte, size)
|
||||
binary.BigEndian.PutUint16(buf, uint16(len(titles)))
|
||||
off := 2
|
||||
for name, title := range titles {
|
||||
nb := []byte(name)
|
||||
if len(nb) > 255 {
|
||||
nb = nb[:255]
|
||||
}
|
||||
tb := []byte(title)
|
||||
if len(tb) > 255 {
|
||||
tb = tb[:255]
|
||||
}
|
||||
buf[off] = byte(len(nb))
|
||||
off++
|
||||
copy(buf[off:], nb)
|
||||
off += len(nb)
|
||||
buf[off] = byte(len(tb))
|
||||
off++
|
||||
copy(buf[off:], tb)
|
||||
off += len(tb)
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
// DecodeTitlesData decodes a name→title map from bytes produced by EncodeTitlesData.
|
||||
func DecodeTitlesData(data []byte) (map[string]string, error) {
|
||||
if len(data) < 2 {
|
||||
return nil, fmt.Errorf("titles data too short: %d bytes", len(data))
|
||||
}
|
||||
count := int(binary.BigEndian.Uint16(data))
|
||||
titles := make(map[string]string, count)
|
||||
off := 2
|
||||
for i := 0; i < count; i++ {
|
||||
if off >= len(data) {
|
||||
return nil, fmt.Errorf("truncated titles data at entry %d", i)
|
||||
}
|
||||
nameLen := int(data[off])
|
||||
off++
|
||||
if off+nameLen > len(data) {
|
||||
return nil, fmt.Errorf("truncated title name at entry %d", i)
|
||||
}
|
||||
name := string(data[off : off+nameLen])
|
||||
off += nameLen
|
||||
if off >= len(data) {
|
||||
return nil, fmt.Errorf("truncated titles data at title %d", i)
|
||||
}
|
||||
titleLen := int(data[off])
|
||||
off++
|
||||
if off+titleLen > len(data) {
|
||||
return nil, fmt.Errorf("truncated title value at entry %d", i)
|
||||
}
|
||||
title := string(data[off : off+titleLen])
|
||||
off += titleLen
|
||||
titles[name] = title
|
||||
}
|
||||
return titles, nil
|
||||
}
|
||||
|
||||
// DecompressMessages decompresses data produced by CompressMessages.
|
||||
// Reads the 1-byte header to determine the compression type.
|
||||
func DecompressMessages(data []byte) ([]byte, error) {
|
||||
|
||||
+31
-2
@@ -22,6 +22,7 @@ type Feed struct {
|
||||
canSend map[int]bool
|
||||
metaBlocks [][]byte // metadata for all channels
|
||||
versionBlocks [][]byte // channel for latest server-known release version
|
||||
titlesBlocks [][]byte // channel for per-channel display names
|
||||
updated time.Time
|
||||
telegramLoggedIn bool
|
||||
nextFetch uint32
|
||||
@@ -42,6 +43,7 @@ func NewFeed(channels []string) *Feed {
|
||||
f.rotateMarker()
|
||||
f.rebuildMetaBlocks()
|
||||
f.rebuildVersionBlocks()
|
||||
f.rebuildTitlesBlocks()
|
||||
return f
|
||||
}
|
||||
|
||||
@@ -83,6 +85,9 @@ func (f *Feed) GetBlock(channel, block int) ([]byte, error) {
|
||||
if channel == int(protocol.VersionChannel) {
|
||||
return f.getVersionBlock(block)
|
||||
}
|
||||
if channel == int(protocol.TitlesChannel) {
|
||||
return f.getTitlesBlock(block)
|
||||
}
|
||||
|
||||
ch, ok := f.blocks[channel]
|
||||
if !ok {
|
||||
@@ -137,7 +142,6 @@ func (f *Feed) rebuildMetaBlocks() {
|
||||
}
|
||||
meta.Channels = append(meta.Channels, protocol.ChannelInfo{
|
||||
Name: name,
|
||||
DisplayName: f.displayNames[chNum],
|
||||
Blocks: blockCount,
|
||||
LastMsgID: f.lastIDs[chNum],
|
||||
ContentHash: f.contentHashes[chNum],
|
||||
@@ -149,6 +153,31 @@ func (f *Feed) rebuildMetaBlocks() {
|
||||
f.metaBlocks = protocol.SplitIntoBlocks(protocol.SerializeMetadata(&meta))
|
||||
}
|
||||
|
||||
func (f *Feed) getTitlesBlock(block int) ([]byte, error) {
|
||||
blocks := f.titlesBlocks
|
||||
if len(blocks) == 0 {
|
||||
f.rebuildTitlesBlocks()
|
||||
blocks = f.titlesBlocks
|
||||
}
|
||||
if block < 0 || block >= len(blocks) {
|
||||
return nil, fmt.Errorf("titles block %d out of range (%d blocks)", block, len(blocks))
|
||||
}
|
||||
return blocks[block], nil
|
||||
}
|
||||
|
||||
// rebuildTitlesBlocks re-serializes the display name map and splits it into blocks.
|
||||
// Must be called with f.mu held.
|
||||
func (f *Feed) rebuildTitlesBlocks() {
|
||||
titles := make(map[string]string, len(f.channels))
|
||||
for i, name := range f.channels {
|
||||
chNum := i + 1
|
||||
if dn := f.displayNames[chNum]; dn != "" {
|
||||
titles[name] = dn
|
||||
}
|
||||
}
|
||||
f.titlesBlocks = protocol.SplitIntoBlocks(protocol.EncodeTitlesData(titles))
|
||||
}
|
||||
|
||||
func (f *Feed) rebuildVersionBlocks() {
|
||||
block, err := protocol.EncodeVersionData(f.latestVersion)
|
||||
if err != nil {
|
||||
@@ -229,5 +258,5 @@ func (f *Feed) SetChannelDisplayName(channelNum int, displayName string) {
|
||||
return
|
||||
}
|
||||
f.displayNames[channelNum] = displayName
|
||||
f.rebuildMetaBlocks()
|
||||
f.rebuildTitlesBlocks()
|
||||
}
|
||||
|
||||
+17
-10
@@ -184,13 +184,17 @@ func (xr *XPublicReader) fetchAll(ctx context.Context) {
|
||||
continue
|
||||
}
|
||||
|
||||
msgs, err := xr.fetchAccount(ctx, account)
|
||||
msgs, title, err := xr.fetchAccount(ctx, account)
|
||||
if err != nil {
|
||||
log.Printf("[x] fetch @%s: all instances failed: %v", account, err)
|
||||
failed++
|
||||
continue
|
||||
}
|
||||
|
||||
if title != "" {
|
||||
xr.feed.SetChannelDisplayName(chNum, title)
|
||||
}
|
||||
|
||||
if ok && len(cached.msgs) > 0 {
|
||||
msgs = mergeMessages(cached.msgs, msgs)
|
||||
}
|
||||
@@ -209,7 +213,7 @@ func (xr *XPublicReader) fetchAll(ctx context.Context) {
|
||||
log.Printf("[x] fetch cycle done in %s: %d fetched, %d failed, %d total", time.Since(start).Round(time.Millisecond), fetched, failed, len(xr.accounts))
|
||||
}
|
||||
|
||||
func (xr *XPublicReader) fetchAccount(ctx context.Context, username string) ([]protocol.Message, error) {
|
||||
func (xr *XPublicReader) fetchAccount(ctx context.Context, username string) ([]protocol.Message, string, error) {
|
||||
var lastErr error
|
||||
for _, instance := range xr.instances {
|
||||
u := strings.TrimSuffix(instance, "/") + "/" + url.PathEscape(username) + "/rss"
|
||||
@@ -243,7 +247,7 @@ func (xr *XPublicReader) fetchAccount(ctx context.Context, username string) ([]p
|
||||
continue
|
||||
}
|
||||
|
||||
msgs, err := parseXRSSMessages(body, username)
|
||||
msgs, title, err := parseXRSSMessages(body, username)
|
||||
if err != nil {
|
||||
log.Printf("[x] @%s: instance %s: parse error: %v", username, instance, err)
|
||||
lastErr = fmt.Errorf("%s: %w", instance, err)
|
||||
@@ -262,16 +266,17 @@ func (xr *XPublicReader) fetchAccount(ctx context.Context, username string) ([]p
|
||||
lastErr = fmt.Errorf("%s: all %d messages were garbled", instance, len(msgs))
|
||||
continue
|
||||
}
|
||||
return cleaned, nil
|
||||
return cleaned, title, nil
|
||||
}
|
||||
if lastErr == nil {
|
||||
lastErr = fmt.Errorf("no Nitter instances configured")
|
||||
}
|
||||
return nil, lastErr
|
||||
return nil, "", lastErr
|
||||
}
|
||||
|
||||
type xRSS struct {
|
||||
Channel struct {
|
||||
Title string `xml:"title"`
|
||||
Items []xRSSItem `xml:"item"`
|
||||
} `xml:"channel"`
|
||||
}
|
||||
@@ -285,16 +290,18 @@ type xRSSItem struct {
|
||||
PubDate string `xml:"pubDate"`
|
||||
}
|
||||
|
||||
func parseXRSSMessages(body []byte, feedUser string) ([]protocol.Message, error) {
|
||||
func parseXRSSMessages(body []byte, feedUser string) ([]protocol.Message, string, error) {
|
||||
body = sanitizeUTF8(body)
|
||||
var feed xRSS
|
||||
if err := xml.Unmarshal(body, &feed); err != nil {
|
||||
return nil, fmt.Errorf("parse rss: %w", err)
|
||||
return nil, "", fmt.Errorf("parse rss: %w", err)
|
||||
}
|
||||
if len(feed.Channel.Items) == 0 {
|
||||
return nil, fmt.Errorf("empty rss feed")
|
||||
return nil, "", fmt.Errorf("empty rss feed")
|
||||
}
|
||||
|
||||
title := strings.TrimSpace(feed.Channel.Title)
|
||||
|
||||
feedUserLower := strings.ToLower(strings.TrimPrefix(feedUser, "@"))
|
||||
msgs := make([]protocol.Message, 0, len(feed.Channel.Items))
|
||||
for _, item := range feed.Channel.Items {
|
||||
@@ -325,9 +332,9 @@ func parseXRSSMessages(body []byte, feedUser string) ([]protocol.Message, error)
|
||||
msgs = append(msgs, protocol.Message{ID: id, Timestamp: ts, Text: text})
|
||||
}
|
||||
if len(msgs) == 0 {
|
||||
return nil, fmt.Errorf("no parseable posts")
|
||||
return nil, "", fmt.Errorf("no parseable posts")
|
||||
}
|
||||
return msgs, nil
|
||||
return msgs, title, nil
|
||||
}
|
||||
|
||||
// extractLinkUsername extracts the username from a Nitter/X status URL.
|
||||
|
||||
@@ -24,7 +24,7 @@ func TestParseXRSSMessages(t *testing.T) {
|
||||
</item>
|
||||
</channel></rss>`)
|
||||
|
||||
msgs, err := parseXRSSMessages(body, "test")
|
||||
msgs, _, err := parseXRSSMessages(body, "test")
|
||||
if err != nil {
|
||||
t.Fatalf("parseXRSSMessages: %v", err)
|
||||
}
|
||||
@@ -51,7 +51,7 @@ func TestParseXRSSMessages_MediaOnlyFallback(t *testing.T) {
|
||||
</item>
|
||||
</channel></rss>`)
|
||||
|
||||
msgs, err := parseXRSSMessages(body, "test")
|
||||
msgs, _, err := parseXRSSMessages(body, "test")
|
||||
if err != nil {
|
||||
t.Fatalf("parseXRSSMessages: %v", err)
|
||||
}
|
||||
@@ -75,7 +75,7 @@ func TestParseXRSSMessages_AlternateIDFormat(t *testing.T) {
|
||||
</item>
|
||||
</channel></rss>`)
|
||||
|
||||
msgs, err := parseXRSSMessages(body, "test")
|
||||
msgs, _, err := parseXRSSMessages(body, "test")
|
||||
if err != nil {
|
||||
t.Fatalf("parseXRSSMessages: %v", err)
|
||||
}
|
||||
@@ -161,7 +161,7 @@ func TestParseXRSSMessages_Retweet(t *testing.T) {
|
||||
</item>
|
||||
</channel></rss>`)
|
||||
|
||||
msgs, err := parseXRSSMessages(body, "myaccount")
|
||||
msgs, _, err := parseXRSSMessages(body, "myaccount")
|
||||
if err != nil {
|
||||
t.Fatalf("parseXRSSMessages: %v", err)
|
||||
}
|
||||
@@ -186,7 +186,7 @@ func TestParseXRSSMessages_RetweetByFormat(t *testing.T) {
|
||||
</item>
|
||||
</channel></rss>`)
|
||||
|
||||
msgs, err := parseXRSSMessages(body, "myaccount")
|
||||
msgs, _, err := parseXRSSMessages(body, "myaccount")
|
||||
if err != nil {
|
||||
t.Fatalf("parseXRSSMessages: %v", err)
|
||||
}
|
||||
@@ -211,7 +211,7 @@ func TestParseXRSSMessages_QuoteTweet(t *testing.T) {
|
||||
</item>
|
||||
</channel></rss>`)
|
||||
|
||||
msgs, err := parseXRSSMessages(body, "account")
|
||||
msgs, _, err := parseXRSSMessages(body, "account")
|
||||
if err != nil {
|
||||
t.Fatalf("parseXRSSMessages: %v", err)
|
||||
}
|
||||
@@ -241,7 +241,7 @@ func TestParseXRSSMessages_PureRetweet(t *testing.T) {
|
||||
</item>
|
||||
</channel></rss>`)
|
||||
|
||||
msgs, err := parseXRSSMessages(body, "RezaVaisi")
|
||||
msgs, _, err := parseXRSSMessages(body, "RezaVaisi")
|
||||
if err != nil {
|
||||
t.Fatalf("parseXRSSMessages: %v", err)
|
||||
}
|
||||
|
||||
+18
-2
@@ -999,8 +999,16 @@ func (s *Server) refreshMetadataOnly() {
|
||||
return
|
||||
}
|
||||
|
||||
titles, _ := fetcher.FetchTitles(ctx)
|
||||
channels := meta.Channels
|
||||
for i := range channels {
|
||||
if t, ok := titles[channels[i].Name]; ok && t != "" {
|
||||
channels[i].DisplayName = t
|
||||
}
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.channels = meta.Channels
|
||||
s.channels = channels
|
||||
s.telegramLoggedIn = meta.TelegramLoggedIn
|
||||
s.nextFetch = meta.NextFetch
|
||||
s.metaFetchedAt = time.Now()
|
||||
@@ -1088,8 +1096,16 @@ func (s *Server) refreshChannel(channelNum int) {
|
||||
}
|
||||
return
|
||||
}
|
||||
titles, _ := fetcher.FetchTitles(ctx)
|
||||
channels := meta.Channels
|
||||
for i := range channels {
|
||||
if t, ok := titles[channels[i].Name]; ok && t != "" {
|
||||
channels[i].DisplayName = t
|
||||
}
|
||||
}
|
||||
meta.Channels = channels
|
||||
s.mu.Lock()
|
||||
s.channels = meta.Channels
|
||||
s.channels = channels
|
||||
s.telegramLoggedIn = meta.TelegramLoggedIn
|
||||
s.nextFetch = meta.NextFetch
|
||||
s.metaFetchedAt = time.Now()
|
||||
|
||||
Reference in New Issue
Block a user