From a14f765ff5fe2faac42cae870c9152d7b7fd19d2 Mon Sep 17 00:00:00 2001 From: AnduinXue Date: Tue, 28 Nov 2023 09:03:47 +0000 Subject: [PATCH] Observer based listening. --- .../Aiursoft.ChessServer.csproj | 1 + .../Controllers/GamesController.cs | 54 +++++++------------ .../Data/InMemoryDatabase.cs | 13 ++--- src/Aiursoft.ChessServer/Models/Channel.cs | 29 ++++------ 4 files changed, 35 insertions(+), 62 deletions(-) diff --git a/src/Aiursoft.ChessServer/Aiursoft.ChessServer.csproj b/src/Aiursoft.ChessServer/Aiursoft.ChessServer.csproj index 37671f8..01391c6 100644 --- a/src/Aiursoft.ChessServer/Aiursoft.ChessServer.csproj +++ b/src/Aiursoft.ChessServer/Aiursoft.ChessServer.csproj @@ -10,6 +10,7 @@ enable + diff --git a/src/Aiursoft.ChessServer/Controllers/GamesController.cs b/src/Aiursoft.ChessServer/Controllers/GamesController.cs index 6309318..feff9ad 100644 --- a/src/Aiursoft.ChessServer/Controllers/GamesController.cs +++ b/src/Aiursoft.ChessServer/Controllers/GamesController.cs @@ -1,23 +1,20 @@ -using Aiursoft.ChessServer.Data; +using AiurObserver; +using Aiursoft.ChessServer.Data; using Aiursoft.ChessServer.Models; using Aiursoft.ChessServer.Services; -using Aiursoft.CSTools.Services; using Microsoft.AspNetCore.Mvc; namespace Aiursoft.ChessServer.Controllers; public class GamesController : Controller { - private readonly Counter _counter; private readonly WebSocketPusher _pusher; private readonly InMemoryDatabase _database; public GamesController( - Counter counter, WebSocketPusher pusher, InMemoryDatabase database) { - _counter = counter; _pusher = pusher; _database = database; } @@ -32,7 +29,7 @@ public class GamesController : Controller [Route("games/{id}")] public IActionResult GetInfo([FromRoute] int id) { - var game = _database.GetOrAdd(id); + var game = _database.GetOrAddBoard(id); return Ok(new { Turn = game.Turn.AsChar, @@ -50,47 +47,41 @@ public class GamesController : Controller { "html", $"games/{id}/html"}, {"websocket", $"games/{id}/websocket"}, { "move-post", $"games/{id}/move/{{player}}/{{move_algebraic_notation}}"} - } + }, + Listeners = _database.GetOrAddChannel(id).GetListenerCount() }); } [Route("games/{id}/ws")] public async Task GetWebSocket([FromRoute] int id) { - var lastReadId = _counter.GetCurrent; - var (channel, blocker) = _database.ListenChannel(id); + var channel= _database.GetOrAddChannel(id); + IDisposable? subscription = null; await _pusher.Accept(HttpContext); try { await Task.Factory.StartNew(_pusher.PendingClose); + subscription = channel.Subscribe(async t => + { + await _pusher.SendMessage(t.Content); + }); while (_pusher.Connected) { - await blocker.WaitAsync(); - var nextMessages = channel - .GetMessagesFrom(lastReadId) - .ToList(); - var messageToPush = nextMessages.MinBy(t => t.Id); - await _pusher.SendMessage(messageToPush!.Content); - lastReadId = messageToPush.Id; + await Task.Delay(int.MaxValue, HttpContext.RequestAborted); } } finally { await _pusher.Close(); - if (!channel.UnRegister(out blocker)) - { - throw new InvalidOperationException("Failed to unregister blocker!"); - } + subscription?.Dispose(); } - - // System.InvalidOperationException: StatusCode cannot be set because the response has already started. } [Route("games/{id}/ascii")] public IActionResult GetAscii([FromRoute] int id) { - var game = _database.GetOrAdd(id); + var game = _database.GetOrAddBoard(id); return Ok(game.ToAscii()); } @@ -103,35 +94,30 @@ public class GamesController : Controller [Route("games/{id}/fen")] public IActionResult GetFen([FromRoute] int id) { - var game = _database.GetOrAdd(id); + var game = _database.GetOrAddBoard(id); return Ok(game.ToFen()); } [Route("games/{id}/pgn")] public IActionResult GetPgn([FromRoute]int id) { - var game = _database.GetOrAdd(id); + var game = _database.GetOrAddBoard(id); return Ok(game.ToPgn()); } [HttpPost] [Route("games/{id}/move/{player}/{move}")] - public IActionResult Move([FromRoute]int id, [FromRoute]string player, [FromRoute]string move) + public async Task Move([FromRoute]int id, [FromRoute]string player, [FromRoute]string move) { - var game = _database.GetOrAdd(id); + var game = _database.GetOrAddBoard(id); try { if (game.IsValidMove(move) && !game.IsEndGame && game.Turn.AsChar.ToString() == player) { game.Move(move); var fen = game.ToFen(); - if (_database.Channels.TryGetValue(id, out var channel)) - { - channel.Push(new Message(fen) - { - Id = _counter.GetUniqueNo(), - }); - } + var channel = _database.GetOrAddChannel(id); + await channel.Push(new Message(fen)); return Ok(fen); } return BadRequest(); diff --git a/src/Aiursoft.ChessServer/Data/InMemoryDatabase.cs b/src/Aiursoft.ChessServer/Data/InMemoryDatabase.cs index 841843e..6760db1 100644 --- a/src/Aiursoft.ChessServer/Data/InMemoryDatabase.cs +++ b/src/Aiursoft.ChessServer/Data/InMemoryDatabase.cs @@ -13,8 +13,8 @@ public class Game public class InMemoryDatabase : ISingletonDependency { - public ConcurrentDictionary Boards { get; } = new(); - public ConcurrentDictionary Channels { get; } = new(); + private ConcurrentDictionary Boards { get; } = new(); + private ConcurrentDictionary Channels { get; } = new(); public Game[] GetActiveGames() { @@ -25,16 +25,13 @@ public class InMemoryDatabase : ISingletonDependency }).ToArray(); } - public ChessBoard GetOrAdd(int id) + public ChessBoard GetOrAddBoard(int id) { return Boards.GetOrAdd(id, _ => new ChessBoard()); } - public (Channel, SemaphoreSlim) ListenChannel(int id) + public Channel GetOrAddChannel(int id) { - var channel = Channels.GetOrAdd(id, _ => new Channel()); - var blocker = new SemaphoreSlim(0); - channel.HasNewMessageBlocker.Add(blocker); - return (channel, blocker); + return Channels.GetOrAdd(id, _ => new Channel()); } } diff --git a/src/Aiursoft.ChessServer/Models/Channel.cs b/src/Aiursoft.ChessServer/Models/Channel.cs index e15c6b5..7aeef5b 100644 --- a/src/Aiursoft.ChessServer/Models/Channel.cs +++ b/src/Aiursoft.ChessServer/Models/Channel.cs @@ -1,4 +1,4 @@ -using System.Collections.Concurrent; +using AiurObserver; namespace Aiursoft.ChessServer.Models; @@ -9,32 +9,21 @@ public class Message Content = content; } - public int Id { get; init; } public string Content { get; set; } } -public class Channel +public class Channel : AsyncObservable { - private ConcurrentBag Messages { get; } = new(); - - public ConcurrentBag HasNewMessageBlocker { get; } = new(); - - public IEnumerable GetMessagesFrom(int lastReadId) + public Channel() { - return Messages.Where(t => t.Id > lastReadId); + Messages.AddLast(new Message("placeholder")); } - public void Push(Message message) + private LinkedList Messages { get; } = new(); + + public async Task Push(Message message) { - Messages.Add(message); - foreach (var blocker in HasNewMessageBlocker) - { - blocker.Release(); - } - } - - public bool UnRegister(out SemaphoreSlim blocker) - { - return HasNewMessageBlocker.TryTake(out blocker!); + Messages.AddLast(message); + await Task.WhenAll(Broadcast(message)); } } \ No newline at end of file