mirror of
https://github.com/sartoopjj/thefeed.git
synced 2026-05-19 07:44:34 +03:00
refactor: update refresh handling to support multiple concurrent operations
This commit is contained in:
@@ -3155,7 +3155,7 @@
|
||||
// Show immediate feedback progress bar
|
||||
showChannelFetchProgress(num, name);
|
||||
await loadMessages(num);
|
||||
await doRefresh(true);
|
||||
await doRefresh(false);
|
||||
}
|
||||
|
||||
function showChannelFetchProgress(num, name) {
|
||||
|
||||
+28
-32
@@ -115,11 +115,11 @@ type Server struct {
|
||||
fetcherCtx context.Context
|
||||
fetcherCancel context.CancelFunc
|
||||
|
||||
// refreshMu / refreshCancel allow a new refresh to cancel an in-progress one.
|
||||
// channelFetching tracks which channels are currently being fetched.
|
||||
refreshMu sync.Mutex
|
||||
refreshCancel context.CancelFunc
|
||||
channelFetching map[int]bool // prevents duplicate fetches for same channel
|
||||
// refreshMu / refreshCancels allow a new refresh to cancel an in-progress one.
|
||||
// Each channel gets its own cancel func so concurrent channel refreshes are allowed.
|
||||
// Key 0 is reserved for metadata-only refreshes.
|
||||
refreshMu sync.Mutex
|
||||
refreshCancels map[int]context.CancelFunc
|
||||
|
||||
logMu sync.RWMutex
|
||||
logLines []string
|
||||
@@ -148,15 +148,15 @@ func New(dataDir string, port int, password string) (*Server, error) {
|
||||
scanner := client.NewResolverScanner()
|
||||
|
||||
s := &Server{
|
||||
dataDir: dataDir,
|
||||
port: port,
|
||||
password: password,
|
||||
messages: make(map[int][]protocol.Message),
|
||||
clients: make(map[chan string]struct{}),
|
||||
channelFetching: make(map[int]bool),
|
||||
lastMsgIDs: make(map[int]uint32),
|
||||
lastHashes: make(map[int]uint32),
|
||||
scanner: scanner,
|
||||
dataDir: dataDir,
|
||||
port: port,
|
||||
password: password,
|
||||
messages: make(map[int][]protocol.Message),
|
||||
clients: make(map[chan string]struct{}),
|
||||
refreshCancels: make(map[int]context.CancelFunc),
|
||||
lastMsgIDs: make(map[int]uint32),
|
||||
lastHashes: make(map[int]uint32),
|
||||
scanner: scanner,
|
||||
}
|
||||
|
||||
// Migrate per-profile resolvers into the shared bank on first run.
|
||||
@@ -406,7 +406,7 @@ func (s *Server) handleRefresh(w http.ResponseWriter, r *http.Request) {
|
||||
chParam := r.URL.Query().Get("channel")
|
||||
if r.URL.Query().Get("quiet") == "1" && chParam == "" {
|
||||
s.refreshMu.Lock()
|
||||
running := s.refreshCancel != nil
|
||||
running := len(s.refreshCancels) > 0
|
||||
s.refreshMu.Unlock()
|
||||
if running {
|
||||
writeJSON(w, map[string]any{"ok": true, "skipped": true})
|
||||
@@ -443,9 +443,9 @@ func (s *Server) handleRescan(w http.ResponseWriter, r *http.Request) {
|
||||
// Cancel any in-progress metadata refresh so it doesn't race with the
|
||||
// scan — we want fresh resolver data before we hit DNS again.
|
||||
s.refreshMu.Lock()
|
||||
if s.refreshCancel != nil {
|
||||
s.refreshCancel()
|
||||
s.refreshCancel = nil
|
||||
for k, cancel := range s.refreshCancels {
|
||||
cancel()
|
||||
delete(s.refreshCancels, k)
|
||||
}
|
||||
s.refreshMu.Unlock()
|
||||
|
||||
@@ -941,10 +941,11 @@ func (s *Server) refreshMetadataOnly() {
|
||||
return
|
||||
}
|
||||
|
||||
// Cancel any in-progress refresh and start a new cancellable one.
|
||||
// Cancel any in-progress metadata refresh and start a new cancellable one.
|
||||
const metaKey = 0
|
||||
s.refreshMu.Lock()
|
||||
if s.refreshCancel != nil {
|
||||
s.refreshCancel()
|
||||
if prev, ok := s.refreshCancels[metaKey]; ok {
|
||||
prev()
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
@@ -954,18 +955,19 @@ func (s *Server) refreshMetadataOnly() {
|
||||
s.mu.RUnlock()
|
||||
|
||||
if fetcher == nil || basectx == nil {
|
||||
delete(s.refreshCancels, metaKey)
|
||||
s.refreshMu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Child context: cancelled either by the next refresh call or by a config change.
|
||||
ctx, cancel := context.WithCancel(basectx)
|
||||
s.refreshCancel = cancel
|
||||
s.refreshCancels[metaKey] = cancel
|
||||
s.refreshMu.Unlock()
|
||||
defer func() {
|
||||
cancel()
|
||||
s.refreshMu.Lock()
|
||||
s.refreshCancel = nil
|
||||
delete(s.refreshCancels, metaKey)
|
||||
s.refreshMu.Unlock()
|
||||
}()
|
||||
|
||||
@@ -1014,14 +1016,10 @@ func (s *Server) refreshMetadataOnly() {
|
||||
func (s *Server) refreshChannel(channelNum int) {
|
||||
// Prevent duplicate fetches for the same channel
|
||||
s.refreshMu.Lock()
|
||||
if s.channelFetching[channelNum] {
|
||||
if _, running := s.refreshCancels[channelNum]; running {
|
||||
s.refreshMu.Unlock()
|
||||
return
|
||||
}
|
||||
if s.refreshCancel != nil {
|
||||
s.refreshCancel()
|
||||
}
|
||||
s.channelFetching[channelNum] = true
|
||||
|
||||
s.mu.RLock()
|
||||
basectx := s.fetcherCtx
|
||||
@@ -1030,19 +1028,17 @@ func (s *Server) refreshChannel(channelNum int) {
|
||||
s.mu.RUnlock()
|
||||
|
||||
if fetcher == nil || basectx == nil {
|
||||
delete(s.channelFetching, channelNum)
|
||||
s.refreshMu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(basectx)
|
||||
s.refreshCancel = cancel
|
||||
s.refreshCancels[channelNum] = cancel
|
||||
s.refreshMu.Unlock()
|
||||
defer func() {
|
||||
cancel()
|
||||
s.refreshMu.Lock()
|
||||
s.refreshCancel = nil
|
||||
delete(s.channelFetching, channelNum)
|
||||
delete(s.refreshCancels, channelNum)
|
||||
s.refreshMu.Unlock()
|
||||
}()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user