using System.Text.Json;
using Npgsql;
using Sonex.Data.Database;

namespace Sonex.Data.Messages;

/// <summary>
/// Sends and receives <see cref="Message"/> notifications over PostgreSQL LISTEN/NOTIFY.
/// A single dedicated connection is used for listening; every outgoing notification
/// opens its own short-lived connection.
/// </summary>
public sealed class MessagesService : IAsyncDisposable
{
    /// <summary>Name of the channel shared by all instances.</summary>
    public const string GlobalMessagesChannel = "global";

    private const string InstanceChannelPrefix = "instance_";

    private static readonly JsonSerializerOptions JsonOptions = new() { PropertyNameCaseInsensitive = true };

    // Serializes Start/Stop so the three listener fields below always change together.
    private readonly SemaphoreSlim _sync = new(1, 1);
    private CancellationTokenSource? _cts;
    private Task? _listenerTask;
    private NpgsqlConnection? _listenerConnection;

    // 0 = alive, 1 = DisposeAsync has been called.
    private int _disposed;

    /// <summary>Raised for every notification delivered on the listener connection.</summary>
    public event EventHandler<MessageEventArgs>? NotificationReceived;

    /// <summary>
    /// Raised when the listener loop ends with an error or when handling a received
    /// notification throws.
    /// </summary>
    public event EventHandler<Exception>? ListenerError;

    /// <summary>True while the background listener loop has been started and has not completed.</summary>
    public bool IsRunning => _listenerTask is { IsCompleted: false };

    /// <summary>
    /// Opens the listener connection, issues <c>LISTEN</c> for each distinct non-blank channel
    /// and starts the background wait loop. Does nothing when the listener is already running.
    /// </summary>
    /// <param name="channels">Channel names to listen on; blank entries are ignored.</param>
    /// <exception cref="ArgumentException">No usable channel name was supplied.</exception>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    public async Task StartAsync(params string[] channels)
    {
        if (channels is null || channels.Length == 0)
        {
            throw new ArgumentException("At least one channel is required.", nameof(channels));
        }

        // Channel names are sent as quoted identifiers, which PostgreSQL compares
        // case-sensitively, so de-duplication must be case-sensitive as well.
        var channelNames = channels
            .Where(name => !string.IsNullOrWhiteSpace(name))
            .Distinct(StringComparer.Ordinal)
            .ToArray();

        if (channelNames.Length == 0)
        {
            throw new ArgumentException("At least one channel is required.", nameof(channels));
        }

        ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposed) != 0, this);

        await _sync.WaitAsync().ConfigureAwait(false);
        try
        {
            if (IsRunning)
            {
                return;
            }

            // A previous loop may have ended with an error without StopAsync being called;
            // release what it left behind before the fields are overwritten.
            var staleConnection = _listenerConnection;
            var staleCts = _cts;
            _listenerTask = null;
            _listenerConnection = null;
            _cts = null;
            await ReleaseListenerResourcesAsync(staleConnection, staleCts).ConfigureAwait(false);

            var cts = new CancellationTokenSource();
            NpgsqlConnection? connection = null;
            try
            {
                connection = DB.OpenNpgsqlConnection();
                connection.Notification += ListenerConnection_Notification;

                foreach (var channelName in channelNames)
                {
                    await using var command = new NpgsqlCommand($"LISTEN {QuoteIdentifier(channelName)};", connection);
                    await command.ExecuteNonQueryAsync(cts.Token).ConfigureAwait(false);
                }
            }
            catch
            {
                // Setup failed half-way: do not leave an open connection or a live token source behind.
                await ReleaseListenerResourcesAsync(connection, cts).ConfigureAwait(false);
                throw;
            }

            // Publish the new state only once every LISTEN has succeeded.
            _cts = cts;
            _listenerConnection = connection;
            _listenerTask = RunListenerLoopAsync(connection, cts.Token);
        }
        finally
        {
            _sync.Release();
        }
    }

    /// <summary>
    /// Cancels the listener loop, waits for it to finish and disposes the listener connection.
    /// Does nothing when the listener is not running or the service has been disposed.
    /// </summary>
    public async Task StopAsync()
    {
        if (Volatile.Read(ref _disposed) != 0)
        {
            // DisposeAsync already stopped the listener and disposed the semaphore.
            return;
        }

        await StopCoreAsync().ConfigureAwait(false);
    }

    /// <summary>Publishes <paramref name="notification"/> on the channel of one instance.</summary>
    /// <param name="instanceId">Instance identifier; see <see cref="GetInstanceChannel"/> for normalization.</param>
    /// <param name="notification">Message to send.</param>
    /// <param name="ct">Token that cancels the database command.</param>
    public Task NotifyInstanceAsync(string instanceId, Message notification, CancellationToken ct = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(instanceId);
        ArgumentNullException.ThrowIfNull(notification);

        return NotifyAsync(GetInstanceChannel(instanceId), notification, ct);
    }

    /// <summary>
    /// Builds the channel name for an instance: the instance prefix followed by the
    /// trimmed, lower-cased (invariant culture) identifier.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="instanceId"/> is null or blank.</exception>
    public static string GetInstanceChannel(string instanceId)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(instanceId);

        return InstanceChannelPrefix + instanceId.Trim().ToLowerInvariant();
    }

    /// <summary>
    /// Serializes <paramref name="notification"/> to JSON and publishes it with <c>pg_notify</c>.
    /// The <c>Sender</c> property of the payload is replaced on the server with the database
    /// <c>current_user</c>, so any value supplied by the caller is overwritten.
    /// </summary>
    /// <param name="channel">Target channel name.</param>
    /// <param name="notification">Message to send; it is copied, not modified.</param>
    /// <param name="ct">Token that cancels the database command.</param>
    public async Task NotifyAsync(string channel, Message notification, CancellationToken ct = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(channel);
        ArgumentNullException.ThrowIfNull(notification);

        var payload = JsonSerializer.Serialize(PrepareOutgoing(notification), JsonOptions);

        await using var connection = DB.OpenNpgsqlConnection();
        await using var command = new NpgsqlCommand(
            "SELECT pg_notify(@channel, jsonb_set(@payload::jsonb, '{Sender}', to_jsonb(current_user::text))::text);",
            connection);
        command.Parameters.AddWithValue("channel", channel);
        command.Parameters.AddWithValue("payload", payload);

        await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
    }

    /// <summary>Stops the listener and releases the semaphore. Safe to call more than once.</summary>
    public async ValueTask DisposeAsync()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        try
        {
            await StopCoreAsync().ConfigureAwait(false);
        }
        finally
        {
            _sync.Dispose();
        }
    }

    // Detaches the current listener state under the lock, then tears it down outside the lock.
    private async Task StopCoreAsync()
    {
        Task? listenerTask;
        CancellationTokenSource? cts;
        NpgsqlConnection? connection;

        await _sync.WaitAsync().ConfigureAwait(false);
        try
        {
            listenerTask = _listenerTask;
            cts = _cts;
            connection = _listenerConnection;

            _listenerTask = null;
            _cts = null;
            _listenerConnection = null;
        }
        finally
        {
            _sync.Release();
        }

        // Stop forwarding notifications before the loop is cancelled.
        if (connection is not null)
        {
            connection.Notification -= ListenerConnection_Notification;
        }

        try
        {
            cts?.Cancel();

            if (listenerTask is not null)
            {
                try
                {
                    await listenerTask.ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    // Expected outcome of cancelling the wait.
                }
            }
        }
        finally
        {
            // Runs even when the listener task faulted, so the connection is never leaked.
            await ReleaseListenerResourcesAsync(connection, cts).ConfigureAwait(false);
        }
    }

    // Unsubscribes from and disposes a listener connection, then disposes its token source.
    private async Task ReleaseListenerResourcesAsync(NpgsqlConnection? connection, CancellationTokenSource? cts)
    {
        try
        {
            if (connection is not null)
            {
                connection.Notification -= ListenerConnection_Notification;
                await connection.DisposeAsync().ConfigureAwait(false);
            }
        }
        finally
        {
            cts?.Dispose();
        }
    }

    // Keeps waiting on the connection until cancelled; any other failure ends the loop
    // and is reported through ListenerError.
    private async Task RunListenerLoopAsync(NpgsqlConnection connection, CancellationToken ct)
    {
        try
        {
            while (!ct.IsCancellationRequested)
            {
                await connection.WaitAsync(ct).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Normal shutdown.
        }
        catch (Exception ex)
        {
            ListenerError?.Invoke(this, ex);
        }
    }

    // Converts a raw notification into a MessageEventArgs and raises NotificationReceived;
    // failures (including exceptions thrown by subscribers) are routed to ListenerError.
    private void ListenerConnection_Notification(object sender, NpgsqlNotificationEventArgs args)
    {
        try
        {
            var notification = Deserialize(args.Payload);
            NotificationReceived?.Invoke(this, new MessageEventArgs(
                args.Channel,
                args.PID,
                args.Payload,
                notification));
        }
        catch (Exception ex)
        {
            ListenerError?.Invoke(this, ex);
        }
    }

    // Parses a payload as a JSON Message. A blank payload yields an empty Message.
    private static Message Deserialize(string payload)
    {
        if (string.IsNullOrWhiteSpace(payload))
        {
            return new Message();
        }

        try
        {
            return JsonSerializer.Deserialize<Message>(payload, JsonOptions) ?? new Message();
        }
        catch (Exception)
        {
            // Deliberate best effort: a payload that cannot be parsed is delivered as plain text.
            return new Message { Text = payload };
        }
    }

    // Copies the message and defaults a blank Action to "message"; the caller's instance is untouched.
    private static Message PrepareOutgoing(Message notification)
    {
        var copy = Clone(notification);
        if (string.IsNullOrWhiteSpace(copy.Action))
        {
            copy.Action = "message";
        }

        return copy;
    }

    // Copies the Action, Title, Text and Sender properties into a new Message.
    private static Message Clone(Message source)
    {
        return new Message
        {
            Action = source.Action,
            Title = source.Title,
            Text = source.Text,
            Sender = source.Sender
        };
    }

    // Wraps a value in double quotes, doubling embedded quotes, so it can be used as an
    // SQL identifier (LISTEN does not accept a parameter for the channel name).
    private static string QuoteIdentifier(string value)
    {
        return "\"" + value.Replace("\"", "\"\"", StringComparison.Ordinal) + "\"";
    }
}