diff --git a/src/Aiursoft.ChessServer/Aiursoft.ChessServer.csproj b/src/Aiursoft.ChessServer/Aiursoft.ChessServer.csproj
index 37671f8..01391c6 100644
--- a/src/Aiursoft.ChessServer/Aiursoft.ChessServer.csproj
+++ b/src/Aiursoft.ChessServer/Aiursoft.ChessServer.csproj
@@ -10,6 +10,7 @@
enable
+
diff --git a/src/Aiursoft.ChessServer/Controllers/GamesController.cs b/src/Aiursoft.ChessServer/Controllers/GamesController.cs
index 6309318..feff9ad 100644
--- a/src/Aiursoft.ChessServer/Controllers/GamesController.cs
+++ b/src/Aiursoft.ChessServer/Controllers/GamesController.cs
@@ -1,23 +1,20 @@
-using Aiursoft.ChessServer.Data;
+using AiurObserver;
+using Aiursoft.ChessServer.Data;
using Aiursoft.ChessServer.Models;
using Aiursoft.ChessServer.Services;
-using Aiursoft.CSTools.Services;
using Microsoft.AspNetCore.Mvc;
namespace Aiursoft.ChessServer.Controllers;
public class GamesController : Controller
{
- private readonly Counter _counter;
private readonly WebSocketPusher _pusher;
private readonly InMemoryDatabase _database;
public GamesController(
- Counter counter,
WebSocketPusher pusher,
InMemoryDatabase database)
{
- _counter = counter;
_pusher = pusher;
_database = database;
}
@@ -32,7 +29,7 @@ public class GamesController : Controller
[Route("games/{id}")]
public IActionResult GetInfo([FromRoute] int id)
{
- var game = _database.GetOrAdd(id);
+ var game = _database.GetOrAddBoard(id);
return Ok(new
{
Turn = game.Turn.AsChar,
@@ -50,47 +47,41 @@ public class GamesController : Controller
{ "html", $"games/{id}/html"},
{"websocket", $"games/{id}/websocket"},
{ "move-post", $"games/{id}/move/{{player}}/{{move_algebraic_notation}}"}
- }
+ },
+ Listeners = _database.GetOrAddChannel(id).GetListenerCount()
});
}
[Route("games/{id}/ws")]
public async Task GetWebSocket([FromRoute] int id)
{
- var lastReadId = _counter.GetCurrent;
- var (channel, blocker) = _database.ListenChannel(id);
+ var channel= _database.GetOrAddChannel(id);
+ IDisposable? subscription = null;
await _pusher.Accept(HttpContext);
try
{
await Task.Factory.StartNew(_pusher.PendingClose);
+ subscription = channel.Subscribe(async t =>
+ {
+ await _pusher.SendMessage(t.Content);
+ });
while (_pusher.Connected)
{
- await blocker.WaitAsync();
- var nextMessages = channel
- .GetMessagesFrom(lastReadId)
- .ToList();
- var messageToPush = nextMessages.MinBy(t => t.Id);
- await _pusher.SendMessage(messageToPush!.Content);
- lastReadId = messageToPush.Id;
+ await Task.Delay(int.MaxValue, HttpContext.RequestAborted);
}
}
finally
{
await _pusher.Close();
- if (!channel.UnRegister(out blocker))
- {
- throw new InvalidOperationException("Failed to unregister blocker!");
- }
+ subscription?.Dispose();
}
-
- // System.InvalidOperationException: StatusCode cannot be set because the response has already started.
}
[Route("games/{id}/ascii")]
public IActionResult GetAscii([FromRoute] int id)
{
- var game = _database.GetOrAdd(id);
+ var game = _database.GetOrAddBoard(id);
return Ok(game.ToAscii());
}
@@ -103,35 +94,30 @@ public class GamesController : Controller
[Route("games/{id}/fen")]
public IActionResult GetFen([FromRoute] int id)
{
- var game = _database.GetOrAdd(id);
+ var game = _database.GetOrAddBoard(id);
return Ok(game.ToFen());
}
[Route("games/{id}/pgn")]
public IActionResult GetPgn([FromRoute]int id)
{
- var game = _database.GetOrAdd(id);
+ var game = _database.GetOrAddBoard(id);
return Ok(game.ToPgn());
}
[HttpPost]
[Route("games/{id}/move/{player}/{move}")]
- public IActionResult Move([FromRoute]int id, [FromRoute]string player, [FromRoute]string move)
+ public async Task Move([FromRoute]int id, [FromRoute]string player, [FromRoute]string move)
{
- var game = _database.GetOrAdd(id);
+ var game = _database.GetOrAddBoard(id);
try
{
if (game.IsValidMove(move) && !game.IsEndGame && game.Turn.AsChar.ToString() == player)
{
game.Move(move);
var fen = game.ToFen();
- if (_database.Channels.TryGetValue(id, out var channel))
- {
- channel.Push(new Message(fen)
- {
- Id = _counter.GetUniqueNo(),
- });
- }
+ var channel = _database.GetOrAddChannel(id);
+ await channel.Push(new Message(fen));
return Ok(fen);
}
return BadRequest();
diff --git a/src/Aiursoft.ChessServer/Data/InMemoryDatabase.cs b/src/Aiursoft.ChessServer/Data/InMemoryDatabase.cs
index 841843e..6760db1 100644
--- a/src/Aiursoft.ChessServer/Data/InMemoryDatabase.cs
+++ b/src/Aiursoft.ChessServer/Data/InMemoryDatabase.cs
@@ -13,8 +13,8 @@ public class Game
public class InMemoryDatabase : ISingletonDependency
{
- public ConcurrentDictionary Boards { get; } = new();
- public ConcurrentDictionary Channels { get; } = new();
+ private ConcurrentDictionary Boards { get; } = new();
+ private ConcurrentDictionary Channels { get; } = new();
public Game[] GetActiveGames()
{
@@ -25,16 +25,13 @@ public class InMemoryDatabase : ISingletonDependency
}).ToArray();
}
- public ChessBoard GetOrAdd(int id)
+ public ChessBoard GetOrAddBoard(int id)
{
return Boards.GetOrAdd(id, _ => new ChessBoard());
}
- public (Channel, SemaphoreSlim) ListenChannel(int id)
+ public Channel GetOrAddChannel(int id)
{
- var channel = Channels.GetOrAdd(id, _ => new Channel());
- var blocker = new SemaphoreSlim(0);
- channel.HasNewMessageBlocker.Add(blocker);
- return (channel, blocker);
+ return Channels.GetOrAdd(id, _ => new Channel());
}
}
diff --git a/src/Aiursoft.ChessServer/Models/Channel.cs b/src/Aiursoft.ChessServer/Models/Channel.cs
index e15c6b5..7aeef5b 100644
--- a/src/Aiursoft.ChessServer/Models/Channel.cs
+++ b/src/Aiursoft.ChessServer/Models/Channel.cs
@@ -1,4 +1,4 @@
-using System.Collections.Concurrent;
+using AiurObserver;
namespace Aiursoft.ChessServer.Models;
@@ -9,32 +9,21 @@ public class Message
Content = content;
}
- public int Id { get; init; }
public string Content { get; set; }
}
-public class Channel
+public class Channel : AsyncObservable
{
- private ConcurrentBag Messages { get; } = new();
-
- public ConcurrentBag HasNewMessageBlocker { get; } = new();
-
- public IEnumerable GetMessagesFrom(int lastReadId)
+ public Channel()
{
- return Messages.Where(t => t.Id > lastReadId);
+ Messages.AddLast(new Message("placeholder"));
}
- public void Push(Message message)
+ private LinkedList Messages { get; } = new();
+
+ public async Task Push(Message message)
{
- Messages.Add(message);
- foreach (var blocker in HasNewMessageBlocker)
- {
- blocker.Release();
- }
- }
-
- public bool UnRegister(out SemaphoreSlim blocker)
- {
- return HasNewMessageBlocker.TryTake(out blocker!);
+ Messages.AddLast(message);
+ await Task.WhenAll(Broadcast(message));
}
}
\ No newline at end of file