Files
2026-05-01 21:42:37 +03:30

760 lines
20 KiB
Go

package server
import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"io"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	"github.com/sartoopjj/thefeed/internal/protocol"
)
// githubAPI is the base URL of the GitHub REST API. It is a variable rather
// than a constant so tests can point the relay at a fake server.
var githubAPI = "https://api.github.com"

// flushBatchLimit is the queue length at which Upload stops waiting for a
// later flush and starts a background flush of its own.
const flushBatchLimit = 100
// GitHubRelay uploads encrypted media to a GitHub repo. Domain and object
// names are HMAC'd; blobs are AES-256-GCM. Uploads are batched into one
// Git Data API commit per flush.
//
// Construct it with NewGitHubRelay, which returns nil when the relay is not
// configured. The exported methods check for a nil receiver and degrade to
// zero values / no-ops (Upload returns an error).
type GitHubRelay struct {
// cfg holds the relay configuration: repo, token, branch, size cap, TTL
// and state path.
cfg GitHubRelayConfig
// passphrase is kept so object names can be derived per upload via
// protocol.RelayObjectName.
passphrase string
// domain is the HMAC'd path segment (protocol.RelayDomainSegment) under
// which every object of this relay is stored in the repo.
domain string
// relayKey is the key passed to protocol.EncryptRelayBlob.
relayKey [protocol.KeySize]byte
// branch is the ref that upload and prune commits advance ("main" by default).
branch string
// client performs every GitHub API request.
client *http.Client
// mu guards known, pending and dirty.
mu sync.Mutex
// known maps object name -> entry for objects already committed to branch.
known map[string]*ghEntry
// pending maps object name -> encrypted blob waiting for the next commit.
pending map[string]*pendingUpload
// statePath is the JSON file that persists known; empty disables persistence.
statePath string
// dirty marks that known has changed and the state file should be rewritten.
dirty bool
// commitMu serialises ref-advancing operations so concurrent flushes
// don't race on updateRef.
commitMu sync.Mutex
}
// ghEntry describes an object that has been committed to the relay repo.
type ghEntry struct {
size int64 // plaintext length in bytes (len of the body given to Upload)
crc uint32 // CRC-32 (IEEE) of the plaintext body
lastSeen time.Time // last commit, Upload dedupe hit, or Touch; PruneStale drops entries older than its cutoff
}
// pendingUpload is an object queued by Upload for the next batched commit.
type pendingUpload struct {
blob []byte // already-encrypted bytes (protocol.EncryptRelayBlob output) to store as the Git blob
size int64 // plaintext length, copied into the ghEntry once committed
crc uint32 // plaintext CRC-32 (IEEE), copied into the ghEntry once committed
}
// NewGitHubRelay builds a relay for the upstream domain, keyed by
// passphrase.
//
// It returns nil, meaning "relay disabled", in these cases:
//   - cfg is not active;
//   - domain or passphrase is empty;
//   - the relay key cannot be derived from passphrase.
//
// The last case is logged so that a bad passphrase is not mistaken for a
// relay that was switched off on purpose.
//
// The branch defaults to "main". When cfg.StatePath is set, previously
// committed entries are loaded from it. A load failure is logged and the
// relay starts with an empty index.
func NewGitHubRelay(cfg GitHubRelayConfig, domain, passphrase string) *GitHubRelay {
	if !cfg.Active() || domain == "" || passphrase == "" {
		return nil
	}
	relayKey, err := protocol.DeriveRelayKey(passphrase)
	if err != nil {
		// Don't fail silently: without this line the relay simply
		// vanishes and looks unconfigured.
		log.Printf("[gh-relay] disabled: derive relay key: %v", err)
		return nil
	}
	branch := cfg.Branch
	if branch == "" {
		branch = "main"
	}
	r := &GitHubRelay{
		cfg:        cfg,
		passphrase: passphrase,
		domain:     protocol.RelayDomainSegment(domain, passphrase),
		relayKey:   relayKey,
		branch:     branch,
		client:     &http.Client{Timeout: 2 * time.Minute},
		known:      make(map[string]*ghEntry),
		pending:    make(map[string]*pendingUpload),
		statePath:  cfg.StatePath,
	}
	if r.statePath != "" {
		if err := r.loadState(); err != nil {
			log.Printf("[gh-relay] load state %s: %v", r.statePath, err)
		}
	}
	return r
}
// persistedEntry is the JSON form of a ghEntry as stored in the state file.
// The file holds a map from object name to persistedEntry.
type persistedEntry struct {
Size int64 `json:"size"`
CRC uint32 `json:"crc"`
LastSeen time.Time `json:"lastSeen"`
}
// loadState merges the JSON index at g.statePath into g.known. Entries with
// the same key are overwritten.
//
// A missing file is treated as "no saved state" and returns nil. Any other
// open or decode error is returned unchanged, and nothing is merged in
// that case.
func (g *GitHubRelay) loadState() error {
	f, err := os.Open(g.statePath)
	if os.IsNotExist(err) {
		return nil
	}
	if err != nil {
		return err
	}
	defer f.Close()

	var saved map[string]persistedEntry
	if decodeErr := json.NewDecoder(f).Decode(&saved); decodeErr != nil {
		return decodeErr
	}

	g.mu.Lock()
	defer g.mu.Unlock()
	for name, pe := range saved {
		g.known[name] = &ghEntry{
			size:     pe.Size,
			crc:      pe.CRC,
			lastSeen: pe.LastSeen,
		}
	}
	log.Printf("[gh-relay] loaded %d entries from %s", len(saved), g.statePath)
	return nil
}
// saveStateLocked persists g.known to g.statePath as indented JSON. It is a
// no-op returning nil when no state path is configured. The caller must
// hold g.mu.
//
// The write goes through a temp file in the same directory. That file is
// fsynced, closed and then renamed over statePath, so a crash mid-write
// leaves either the old file or the complete new one, never a truncated one.
//
// Every failure path removes the temp file. g.dirty is cleared only after
// the rename succeeds, so a failed save is retried by the next save attempt.
func (g *GitHubRelay) saveStateLocked() error {
	if g.statePath == "" {
		return nil
	}
	out := make(map[string]persistedEntry, len(g.known))
	for k, e := range g.known {
		out[k] = persistedEntry{Size: e.size, CRC: e.crc, LastSeen: e.lastSeen}
	}
	dir := filepath.Dir(g.statePath)
	if err := os.MkdirAll(dir, 0o700); err != nil {
		return err
	}
	tmp, err := os.CreateTemp(dir, "gh-relay-*.json")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()

	enc := json.NewEncoder(tmp)
	enc.SetIndent("", " ")
	if err := enc.Encode(out); err != nil {
		tmp.Close()
		os.Remove(tmpName)
		return err
	}
	// Flush the data to stable storage before the rename publishes it.
	// Otherwise a power loss could leave statePath naming an empty file.
	if err := tmp.Sync(); err != nil {
		tmp.Close()
		os.Remove(tmpName)
		return err
	}
	if err := tmp.Close(); err != nil {
		os.Remove(tmpName)
		return err
	}
	if err := os.Rename(tmpName, g.statePath); err != nil {
		// Don't leak the temp file, and keep dirty set so we retry.
		os.Remove(tmpName)
		return err
	}
	g.dirty = false
	return nil
}
// Repo reports the configured "owner/repo" slug. The discovery channel uses
// it to advertise where the relay lives without exposing the token. A nil
// relay yields "".
func (g *GitHubRelay) Repo() string {
	if g != nil {
		return g.cfg.Repo
	}
	return ""
}
// MaxBytes reports the per-file upload cap in bytes. Zero means "no cap",
// and is also what a nil relay reports.
func (g *GitHubRelay) MaxBytes() int64 {
	var limit int64
	if g != nil {
		limit = g.cfg.MaxBytes
	}
	return limit
}
// TTL converts the configured TTLMinutes into a time.Duration object
// lifetime. A nil relay yields 0.
func (g *GitHubRelay) TTL() time.Duration {
	if g != nil {
		return time.Minute * time.Duration(g.cfg.TTLMinutes)
	}
	return 0
}
// Domain reports the HMAC'd path segment under which this relay stores its
// objects inside the repo. A nil relay yields "".
func (g *GitHubRelay) Domain() string {
	if g != nil {
		return g.domain
	}
	return ""
}
// Upload encrypts body and queues it for the next batched commit.
//
// Return values:
//   - ErrTooLarge when a positive MaxBytes cap is exceeded.
//   - nil without doing any work when the same (size, CRC) object is
//     already committed (its lastSeen is refreshed) or already queued.
//   - nil after queueing the encrypted blob otherwise.
//
// Encryption runs without g.mu held. The dedupe check is therefore repeated
// afterwards, in case another goroutine queued the same object meanwhile.
//
// When the queue reaches flushBatchLimit, a background flush starts with its
// own 5-minute timeout. ctx is currently unused.
func (g *GitHubRelay) Upload(ctx context.Context, body []byte) error {
	if g == nil {
		return errors.New("github relay disabled")
	}
	size := int64(len(body))
	if limit := g.cfg.MaxBytes; limit > 0 && size > limit {
		return ErrTooLarge
	}
	crc := crc32.ChecksumIEEE(body)
	key := protocol.RelayObjectName(size, crc, g.passphrase)

	// alreadyTracked reports whether key is committed (touching it) or
	// queued. Caller must hold g.mu.
	alreadyTracked := func() bool {
		if e, ok := g.known[key]; ok {
			e.lastSeen = time.Now()
			g.dirty = true
			return true
		}
		_, queued := g.pending[key]
		return queued
	}

	g.mu.Lock()
	tracked := alreadyTracked()
	g.mu.Unlock()
	if tracked {
		return nil
	}

	blob, err := protocol.EncryptRelayBlob(g.relayKey, body)
	if err != nil {
		return fmt.Errorf("encrypt relay blob: %w", err)
	}

	g.mu.Lock()
	if alreadyTracked() {
		g.mu.Unlock()
		return nil
	}
	g.pending[key] = &pendingUpload{blob: blob, size: size, crc: crc}
	full := len(g.pending) >= flushBatchLimit
	g.mu.Unlock()

	if full {
		go func() {
			flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
			defer cancel()
			if err := g.flushPending(flushCtx); err != nil {
				log.Printf("[gh-relay] limit flush: %v", err)
			}
		}()
	}
	return nil
}
// Has reports whether the object identified by (size, crc) is already
// committed or is waiting in the queue for the next commit. A nil relay
// reports false.
func (g *GitHubRelay) Has(size int64, crc uint32) bool {
	if g == nil {
		return false
	}
	name := protocol.RelayObjectName(size, crc, g.passphrase)
	g.mu.Lock()
	_, committed := g.known[name]
	_, queued := g.pending[name]
	g.mu.Unlock()
	return committed || queued
}
// Touch marks an already-committed object as recently seen without
// re-uploading it. This is used when upstream delivers a file the relay
// already holds. Queued-only or unknown objects are left alone, and a nil
// relay does nothing.
func (g *GitHubRelay) Touch(size int64, crc uint32) {
	if g == nil {
		return
	}
	name := protocol.RelayObjectName(size, crc, g.passphrase)
	g.mu.Lock()
	defer g.mu.Unlock()
	entry, ok := g.known[name]
	if !ok {
		return
	}
	entry.lastSeen = time.Now()
	g.dirty = true
}
// PruneStale removes every file in `known` whose lastSeen is older than
// cutoff. All stale files are deleted from the repo in one commit, and the
// number removed is returned. Deletion works by sending tree entries with a
// null SHA on top of the current head tree.
//
// Selection happens INSIDE commitMu so concurrent prunes from
// different readers can't pick the same files and race the resulting
// commits (which used to produce 422 BadObjectState).
//
// g.known is only updated (and the state file saved) after the branch ref
// has moved. On any earlier error the index is left unchanged and 0 is
// returned. A nil relay, or nothing stale, returns (0, nil) without
// contacting GitHub.
//
// NOTE(review): g.mu is released between selection and the final delete.
// An Upload/Touch that refreshes a selected entry's lastSeen in that window
// does not save the entry from being pruned.
func (g *GitHubRelay) PruneStale(ctx context.Context, cutoff time.Time) (int, error) {
if g == nil {
return 0, nil
}
// Hold commitMu across the whole select -> commit -> forget sequence.
g.commitMu.Lock()
defer g.commitMu.Unlock()
// Snapshot the stale keys under g.mu. Each one becomes a deletion entry
// (SHA nil -> JSON null) for the path the object was uploaded to.
g.mu.Lock()
var entries []treeEntry
var keys []string
for k, e := range g.known {
if e.lastSeen.Before(cutoff) {
entries = append(entries, treeEntry{
Path: g.domain + "/" + k,
Mode: "100644",
Type: "blob",
SHA: nil,
})
keys = append(keys, k)
}
}
g.mu.Unlock()
if len(entries) == 0 {
return 0, nil
}
log.Printf("[gh-relay] starting prune of %d file(s)", len(entries))
// Build a tree on top of the current head that drops the stale paths,
// commit it, and move the branch to it (updateRef sends force=false).
headSHA, err := g.getRef(ctx, g.branch)
if err != nil {
return 0, fmt.Errorf("get ref: %w", err)
}
parentTree, err := g.getCommitTree(ctx, headSHA)
if err != nil {
return 0, fmt.Errorf("get commit %s: %w", headSHA, err)
}
newTree, err := g.createTree(ctx, parentTree, entries)
if err != nil {
return 0, fmt.Errorf("create tree: %w", err)
}
msg := fmt.Sprintf("prune %d", len(entries))
commitSHA, err := g.createCommit(ctx, msg, newTree, []string{headSHA})
if err != nil {
return 0, fmt.Errorf("create commit: %w", err)
}
if err := g.updateRef(ctx, g.branch, commitSHA); err != nil {
return 0, fmt.Errorf("update ref %s: %w", g.branch, err)
}
// The ref moved, so forget the pruned keys locally. A save failure is
// logged but does not fail the prune.
g.mu.Lock()
for _, k := range keys {
delete(g.known, k)
}
g.dirty = true
if err := g.saveStateLocked(); err != nil {
log.Printf("[gh-relay] save state after prune: %v", err)
}
g.mu.Unlock()
return len(entries), nil
}
// --- Flush loop -------------------------------------------------------------
// Run blocks until ctx is cancelled, doing housekeeping in the meantime:
//   - every 5 minutes it saves the known-object index if it has unsaved
//     changes;
//   - every 10 minutes it commits any queued uploads, with a 5-minute
//     timeout derived from ctx. This is a backstop for when no fetch cycle
//     has flushed in a long time (e.g. all channels were skipped from
//     cache).
//
// Normal flushing and pruning are driven by Feed.AfterFetchCycle.
//
// On cancellation Run makes a final flush attempt with a 30-second budget
// that is independent of the cancelled ctx. It then saves the index if it
// is dirty, and returns. Run is a no-op on a nil relay.
func (g *GitHubRelay) Run(ctx context.Context) {
	if g == nil {
		return
	}
	backstop := time.NewTicker(10 * time.Minute)
	defer backstop.Stop()
	persist := time.NewTicker(5 * time.Minute)
	defer persist.Stop()

	// saveIfDirty writes the index when it has unsaved changes and a state
	// path is configured, logging a failure with the given format.
	saveIfDirty := func(format string) {
		g.mu.Lock()
		defer g.mu.Unlock()
		if !g.dirty || g.statePath == "" {
			return
		}
		if err := g.saveStateLocked(); err != nil {
			log.Printf(format, err)
		}
	}

	for {
		select {
		case <-persist.C:
			saveIfDirty("[gh-relay] periodic save: %v")
		case <-backstop.C:
			if g.queueSize() == 0 {
				continue
			}
			flushCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
			if err := g.flushPending(flushCtx); err != nil {
				log.Printf("[gh-relay] backstop flush: %v", err)
			}
			cancel()
		case <-ctx.Done():
			finalCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
			if err := g.flushPending(finalCtx); err != nil {
				log.Printf("[gh-relay] shutdown flush: %v", err)
			}
			cancel()
			saveIfDirty("[gh-relay] shutdown save: %v")
			return
		}
	}
}
// queueSize reports how many uploads are waiting for the next commit.
func (g *GitHubRelay) queueSize() int {
	g.mu.Lock()
	defer g.mu.Unlock()
	return len(g.pending)
}
// Flush commits every queued upload right away instead of waiting for the
// next scheduled flush. It is intended for tests and graceful shutdown. An
// empty queue or a nil relay makes it a no-op that returns nil.
func (g *GitHubRelay) Flush(ctx context.Context) error {
	if g != nil {
		return g.flushPending(ctx)
	}
	return nil
}
// flushPending takes the whole pending queue and commits it as a single Git
// Data API commit.
//
// On success, every object moves into g.known with lastSeen set to the
// commit time, and the index is saved. A save failure is only logged.
//
// On failure, the batch goes back into the queue so a later flush retries
// it. An entry queued for the same key in the meantime is kept in
// preference to the old one.
func (g *GitHubRelay) flushPending(ctx context.Context) error {
	g.mu.Lock()
	batch := g.pending
	if len(batch) == 0 {
		g.mu.Unlock()
		return nil
	}
	g.pending = make(map[string]*pendingUpload)
	g.mu.Unlock()

	if err := g.commitBatch(ctx, batch); err != nil {
		g.mu.Lock()
		for k, v := range batch {
			if _, newer := g.pending[k]; newer {
				continue
			}
			g.pending[k] = v
		}
		g.mu.Unlock()
		return err
	}

	committedAt := time.Now()
	g.mu.Lock()
	for k, p := range batch {
		g.known[k] = &ghEntry{size: p.size, crc: p.crc, lastSeen: committedAt}
	}
	g.dirty = true
	if err := g.saveStateLocked(); err != nil {
		log.Printf("[gh-relay] save state: %v", err)
	}
	g.mu.Unlock()

	log.Printf("[gh-relay] committed %d file(s)", len(batch))
	return nil
}
// treeEntry is the Git Data API tree-item shape used by both upload
// (SHA = newly-created blob) and delete (SHA = nil → entry removed from
// the resulting tree).
type treeEntry struct {
Path string `json:"path"` // repo-relative path; this file always uses domain + "/" + object name
Mode string `json:"mode"` // git file mode; this file always sends "100644"
Type string `json:"type"` // object type; this file always sends "blob"
SHA *string `json:"sha"` // pointer so nil serialises as JSON `null`
}
// commitBatch uploads every object in batch and publishes them all in ONE
// commit on g.branch, using the Git Data API:
//
//	GET ref → POST blobs → POST tree (with base_tree) → POST commit → PATCH ref.
//
// Each object is stored at "<domain>/<object name>". The sequence runs under
// commitMu so it cannot interleave with another ref update from this relay.
// The first failing step aborts the batch with a wrapped error. Blobs
// created before that point are not referenced by any commit.
func (g *GitHubRelay) commitBatch(ctx context.Context, batch map[string]*pendingUpload) error {
	if len(batch) == 0 {
		return nil
	}
	g.commitMu.Lock()
	defer g.commitMu.Unlock()

	log.Printf("[gh-relay] starting upload of %d file(s)", len(batch))

	headSHA, err := g.getRef(ctx, g.branch)
	if err != nil {
		return fmt.Errorf("get ref: %w", err)
	}
	baseTree, err := g.getCommitTree(ctx, headSHA)
	if err != nil {
		return fmt.Errorf("get commit %s: %w", headSHA, err)
	}

	entries := make([]treeEntry, 0, len(batch))
	for name, item := range batch {
		sha, blobErr := g.createBlob(ctx, item.blob)
		if blobErr != nil {
			return fmt.Errorf("create blob %s: %w", name, blobErr)
		}
		entries = append(entries, treeEntry{
			Path: g.domain + "/" + name,
			Mode: "100644",
			Type: "blob",
			SHA:  &sha,
		})
	}

	treeSHA, err := g.createTree(ctx, baseTree, entries)
	if err != nil {
		return fmt.Errorf("create tree: %w", err)
	}
	commitSHA, err := g.createCommit(ctx, fmt.Sprintf("upload %d", len(batch)), treeSHA, []string{headSHA})
	if err != nil {
		return fmt.Errorf("create commit: %w", err)
	}
	if err := g.updateRef(ctx, g.branch, commitSHA); err != nil {
		return fmt.Errorf("update ref %s: %w", g.branch, err)
	}
	return nil
}
// --- Git Data API plumbing --------------------------------------------------
// getRef returns the commit SHA that refs/heads/<branch> points at.
//
// Two non-2xx answers are handed to bootstrapEmptyRepo, whose commit SHA is
// returned instead:
//   - any 404;
//   - a 409 whose body mentions "Repository is empty" (a repo with no
//     commits).
//
// A 409 for any other reason, and every other non-2xx status, becomes an
// error carrying the status and a trimmed body.
//
// NOTE(review): a 404 presumably also covers a missing branch on a
// non-empty repo, or a repo the token cannot see. Those are bootstrapped
// too rather than reported directly; confirm this is intended.
func (g *GitHubRelay) getRef(ctx context.Context, branch string) (string, error) {
	req, err := g.newReq(ctx, http.MethodGet, "/repos/"+g.cfg.Repo+"/git/ref/heads/"+branch, nil)
	if err != nil {
		return "", err
	}
	resp, err := g.client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode/100 == 2 {
		var ref struct {
			Object struct {
				SHA string `json:"sha"`
			} `json:"object"`
		}
		if err := json.NewDecoder(resp.Body).Decode(&ref); err != nil {
			return "", err
		}
		return ref.Object.SHA, nil
	}

	raw, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
	msg := string(raw)
	switch {
	case resp.StatusCode == http.StatusNotFound:
		return g.bootstrapEmptyRepo(ctx, branch)
	case resp.StatusCode == http.StatusConflict && strings.Contains(msg, "Repository is empty"):
		return g.bootstrapEmptyRepo(ctx, branch)
	}
	return "", fmt.Errorf("%s — %s", resp.Status, trimErrBody(msg))
}
// bootstrapEmptyRepo gives a commit-less repo its first commit, so the Git
// Data API has a ref to build on.
//
// It uses the Contents API, the only endpoint that works without an
// existing Git ref, to PUT a one-byte ".gitkeep" on branch. That creates
// the branch together with the initial commit. The new commit's SHA is
// returned so the caller can use it as the parent of its own commit.
//
// A non-2xx answer is reported with the status and a short, log-safe body.
func (g *GitHubRelay) bootstrapEmptyRepo(ctx context.Context, branch string) (string, error) {
	log.Printf("[gh-relay] bootstrapping empty repo on branch %s", branch)
	payload := map[string]any{
		"message": "init",
		"content": base64.StdEncoding.EncodeToString([]byte{'\n'}),
		"branch":  branch,
	}
	// Marshal cannot fail here: every value is a string.
	body, _ := json.Marshal(payload)
	req, err := g.newReq(ctx, http.MethodPut, "/repos/"+g.cfg.Repo+"/contents/.gitkeep", bytes.NewReader(body))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := g.client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		// Use the same bounded, HTML-filtering reader as every other call
		// site. An unbounded ReadAll would pull a whole HTML error page
		// into memory and into the log.
		return "", fmt.Errorf("bootstrap put: %s — %s", resp.Status, ghErrorBody(resp))
	}
	var out struct {
		Commit struct {
			SHA string `json:"sha"`
		} `json:"commit"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return "", fmt.Errorf("bootstrap decode: %w", err)
	}
	if out.Commit.SHA == "" {
		return "", errors.New("bootstrap: no commit SHA in response")
	}
	return out.Commit.SHA, nil
}
// getCommitTree fetches commit commitSHA and returns the SHA of its root
// tree. Callers use that tree as base_tree for the next tree they create.
func (g *GitHubRelay) getCommitTree(ctx context.Context, commitSHA string) (string, error) {
	path := "/repos/" + g.cfg.Repo + "/git/commits/" + commitSHA
	req, err := g.newReq(ctx, http.MethodGet, path, nil)
	if err != nil {
		return "", err
	}
	resp, err := g.client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return "", fmt.Errorf("%s — %s", resp.Status, ghErrorBody(resp))
	}
	var commit struct {
		Tree struct {
			SHA string `json:"sha"`
		} `json:"tree"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&commit); err != nil {
		return "", err
	}
	return commit.Tree.SHA, nil
}
// createBlob stores content as a new Git blob, sent base64-encoded, and
// returns the blob SHA reported by GitHub.
func (g *GitHubRelay) createBlob(ctx context.Context, content []byte) (string, error) {
	encoded := base64.StdEncoding.EncodeToString(content)
	payload, _ := json.Marshal(map[string]any{
		"encoding": "base64",
		"content":  encoded,
	})
	req, err := g.newReq(ctx, http.MethodPost, "/repos/"+g.cfg.Repo+"/git/blobs", bytes.NewReader(payload))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := g.client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return "", fmt.Errorf("%s — %s", resp.Status, ghErrorBody(resp))
	}
	var created struct {
		SHA string `json:"sha"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&created); err != nil {
		return "", err
	}
	return created.SHA, nil
}
// createTree creates a Git tree from entries, sent as the "tree" array, and
// returns the new tree's SHA.
//
// A non-empty baseTree is sent as base_tree, so entries are applied on top
// of that tree. An empty baseTree omits the field, so the tree is built
// from entries alone.
func (g *GitHubRelay) createTree(ctx context.Context, baseTree string, entries any) (string, error) {
	payload := map[string]any{"tree": entries}
	if len(baseTree) > 0 {
		payload["base_tree"] = baseTree
	}
	encoded, _ := json.Marshal(payload)
	req, err := g.newReq(ctx, http.MethodPost, "/repos/"+g.cfg.Repo+"/git/trees", bytes.NewReader(encoded))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := g.client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return "", fmt.Errorf("%s — %s", resp.Status, ghErrorBody(resp))
	}
	var tree struct {
		SHA string `json:"sha"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&tree); err != nil {
		return "", err
	}
	return tree.SHA, nil
}
// createCommit creates a commit object with the given message, pointing at
// treeSHA and having parents as its parent commits. It returns the new
// commit's SHA. A nil parents slice is sent as an empty JSON array rather
// than null.
func (g *GitHubRelay) createCommit(ctx context.Context, message, treeSHA string, parents []string) (string, error) {
	parentList := parents
	if parentList == nil {
		parentList = []string{}
	}
	payload, _ := json.Marshal(map[string]any{
		"message": message,
		"tree":    treeSHA,
		"parents": parentList,
	})
	req, err := g.newReq(ctx, http.MethodPost, "/repos/"+g.cfg.Repo+"/git/commits", bytes.NewReader(payload))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := g.client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return "", fmt.Errorf("%s — %s", resp.Status, ghErrorBody(resp))
	}
	var commit struct {
		SHA string `json:"sha"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&commit); err != nil {
		return "", err
	}
	return commit.SHA, nil
}
// updateRef moves refs/heads/<branch> to commitSHA.
//
// It sends "force": false, so GitHub accepts only a fast-forward. A ref that
// moved underneath us therefore fails, with an error carrying the status
// and a trimmed body.
//
// On success the response body (the updated ref) is not needed. It is
// still drained, with a bound, before Close so net/http can return the
// connection to its keep-alive pool.
func (g *GitHubRelay) updateRef(ctx context.Context, branch, commitSHA string) error {
	body, _ := json.Marshal(map[string]any{
		"sha":   commitSHA,
		"force": false,
	})
	req, err := g.newReq(ctx, http.MethodPatch, "/repos/"+g.cfg.Repo+"/git/refs/heads/"+branch, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := g.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("%s — %s", resp.Status, ghErrorBody(resp))
	}
	// Best-effort drain: the ref update has already succeeded, so a read
	// error here is irrelevant to the caller.
	_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 64<<10))
	return nil
}
// --- HTTP plumbing ----------------------------------------------------------
// ghErrorBody reads a short, log-safe error body from a non-2xx GitHub
// response. GitHub's 5xx pages ("Unicorn") are full HTML documents — we
// don't want them in the log. Truncate to 200 chars and replace HTML
// blobs with a one-line summary.
func ghErrorBody(resp *http.Response) string {
raw, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
return trimErrBody(string(raw))
}
func trimErrBody(s string) string {
s = strings.TrimSpace(s)
lower := strings.ToLower(s)
if strings.HasPrefix(lower, "<!doctype") || strings.HasPrefix(lower, "<html") {
return "(HTML response — GitHub backend issue, retry later)"
}
if len(s) > 200 {
s = s[:200] + "…"
}
return s
}
// newReq builds an authenticated GitHub REST request.
//
// urlPath is appended verbatim to githubAPI (minus any trailing slash), so
// it should begin with "/"; every caller in this file passes "/repos/...".
// Each request carries the bearer token, the GitHub JSON media type, the
// pinned API version and a fixed User-Agent.
func (g *GitHubRelay) newReq(ctx context.Context, method, urlPath string, body io.Reader) (*http.Request, error) {
	base := strings.TrimRight(githubAPI, "/")
	req, err := http.NewRequestWithContext(ctx, method, base+urlPath, body)
	if err != nil {
		return nil, err
	}
	headers := [...][2]string{
		{"Authorization", "Bearer " + g.cfg.Token},
		{"Accept", "application/vnd.github+json"},
		{"X-GitHub-Api-Version", "2022-11-28"},
		{"User-Agent", "git-client/1.0"},
	}
	for _, h := range headers {
		req.Header.Set(h[0], h[1])
	}
	return req, nil
}