Observer based listening.

This commit is contained in:
AnduinXue
2023-11-28 09:03:47 +00:00
parent 4ee0a1aaf4
commit a14f765ff5
4 changed files with 35 additions and 62 deletions
@@ -10,6 +10,7 @@
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Aiursoft.AiurObserver" Version="7.0.3" />
<PackageReference Include="Aiursoft.Scanner" Version="7.0.1" />
<PackageReference Include="Aiursoft.WebTools" Version="7.0.10" />
<PackageReference Include="Gera.Chess" Version="1.0.5" />
@@ -1,23 +1,20 @@
using Aiursoft.ChessServer.Data;
using AiurObserver;
using Aiursoft.ChessServer.Data;
using Aiursoft.ChessServer.Models;
using Aiursoft.ChessServer.Services;
using Aiursoft.CSTools.Services;
using Microsoft.AspNetCore.Mvc;
namespace Aiursoft.ChessServer.Controllers;
public class GamesController : Controller
{
private readonly Counter _counter;
private readonly WebSocketPusher _pusher;
private readonly InMemoryDatabase _database;
public GamesController(
Counter counter,
WebSocketPusher pusher,
InMemoryDatabase database)
{
_counter = counter;
_pusher = pusher;
_database = database;
}
@@ -32,7 +29,7 @@ public class GamesController : Controller
[Route("games/{id}")]
public IActionResult GetInfo([FromRoute] int id)
{
var game = _database.GetOrAdd(id);
var game = _database.GetOrAddBoard(id);
return Ok(new
{
Turn = game.Turn.AsChar,
@@ -50,47 +47,41 @@ public class GamesController : Controller
{ "html", $"games/{id}/html"},
{"websocket", $"games/{id}/websocket"},
{ "move-post", $"games/{id}/move/{{player}}/{{move_algebraic_notation}}"}
}
},
Listeners = _database.GetOrAddChannel(id).GetListenerCount()
});
}
[Route("games/{id}/ws")]
public async Task GetWebSocket([FromRoute] int id)
{
var lastReadId = _counter.GetCurrent;
var (channel, blocker) = _database.ListenChannel(id);
var channel= _database.GetOrAddChannel(id);
IDisposable? subscription = null;
await _pusher.Accept(HttpContext);
try
{
await Task.Factory.StartNew(_pusher.PendingClose);
subscription = channel.Subscribe(async t =>
{
await _pusher.SendMessage(t.Content);
});
while (_pusher.Connected)
{
await blocker.WaitAsync();
var nextMessages = channel
.GetMessagesFrom(lastReadId)
.ToList();
var messageToPush = nextMessages.MinBy(t => t.Id);
await _pusher.SendMessage(messageToPush!.Content);
lastReadId = messageToPush.Id;
await Task.Delay(int.MaxValue, HttpContext.RequestAborted);
}
}
finally
{
await _pusher.Close();
if (!channel.UnRegister(out blocker))
{
throw new InvalidOperationException("Failed to unregister blocker!");
}
subscription?.Dispose();
}
// System.InvalidOperationException: StatusCode cannot be set because the response has already started.
}
[Route("games/{id}/ascii")]
public IActionResult GetAscii([FromRoute] int id)
{
var game = _database.GetOrAdd(id);
var game = _database.GetOrAddBoard(id);
return Ok(game.ToAscii());
}
@@ -103,35 +94,30 @@ public class GamesController : Controller
[Route("games/{id}/fen")]
public IActionResult GetFen([FromRoute] int id)
{
var game = _database.GetOrAdd(id);
var game = _database.GetOrAddBoard(id);
return Ok(game.ToFen());
}
[Route("games/{id}/pgn")]
public IActionResult GetPgn([FromRoute]int id)
{
var game = _database.GetOrAdd(id);
var game = _database.GetOrAddBoard(id);
return Ok(game.ToPgn());
}
[HttpPost]
[Route("games/{id}/move/{player}/{move}")]
public IActionResult Move([FromRoute]int id, [FromRoute]string player, [FromRoute]string move)
public async Task<IActionResult> Move([FromRoute]int id, [FromRoute]string player, [FromRoute]string move)
{
var game = _database.GetOrAdd(id);
var game = _database.GetOrAddBoard(id);
try
{
if (game.IsValidMove(move) && !game.IsEndGame && game.Turn.AsChar.ToString() == player)
{
game.Move(move);
var fen = game.ToFen();
if (_database.Channels.TryGetValue(id, out var channel))
{
channel.Push(new Message(fen)
{
Id = _counter.GetUniqueNo(),
});
}
var channel = _database.GetOrAddChannel(id);
await channel.Push(new Message(fen));
return Ok(fen);
}
return BadRequest();
@@ -13,8 +13,8 @@ public class Game
public class InMemoryDatabase : ISingletonDependency
{
public ConcurrentDictionary<int, ChessBoard> Boards { get; } = new();
public ConcurrentDictionary<int, Channel> Channels { get; } = new();
private ConcurrentDictionary<int, ChessBoard> Boards { get; } = new();
private ConcurrentDictionary<int, Channel> Channels { get; } = new();
public Game[] GetActiveGames()
{
@@ -25,16 +25,13 @@ public class InMemoryDatabase : ISingletonDependency
}).ToArray();
}
public ChessBoard GetOrAdd(int id)
public ChessBoard GetOrAddBoard(int id)
{
return Boards.GetOrAdd(id, _ => new ChessBoard());
}
public (Channel, SemaphoreSlim) ListenChannel(int id)
public Channel GetOrAddChannel(int id)
{
var channel = Channels.GetOrAdd(id, _ => new Channel());
var blocker = new SemaphoreSlim(0);
channel.HasNewMessageBlocker.Add(blocker);
return (channel, blocker);
return Channels.GetOrAdd(id, _ => new Channel());
}
}
+9 -20
View File
@@ -1,4 +1,4 @@
using System.Collections.Concurrent;
using AiurObserver;
namespace Aiursoft.ChessServer.Models;
@@ -9,32 +9,21 @@ public class Message
Content = content;
}
public int Id { get; init; }
public string Content { get; set; }
}
public class Channel
public class Channel : AsyncObservable<Message>
{
private ConcurrentBag<Message> Messages { get; } = new();
public ConcurrentBag<SemaphoreSlim> HasNewMessageBlocker { get; } = new();
public IEnumerable<Message> GetMessagesFrom(int lastReadId)
public Channel()
{
return Messages.Where(t => t.Id > lastReadId);
Messages.AddLast(new Message("placeholder"));
}
public void Push(Message message)
private LinkedList<Message> Messages { get; } = new();
public async Task Push(Message message)
{
Messages.Add(message);
foreach (var blocker in HasNewMessageBlocker)
{
blocker.Release();
}
}
public bool UnRegister(out SemaphoreSlim blocker)
{
return HasNewMessageBlocker.TryTake(out blocker!);
Messages.AddLast(message);
await Task.WhenAll(Broadcast(message));
}
}