Reputation: 84
Because I could not find any implementation that reads a stream's content without a loop, I started to implement one, but I'm facing several problems. Maybe some of you can point me in the right direction.
The implementation uses a combination of Pub/Sub and the stream:
* log -> the stream
* log:notify -> Pub/Sub channel
* log:lastReadMessage -> contains the last read key from the stream
    // Publisher
    static async Task Main(string[] args)
    {
        var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
        var redisDb = connectionMultiplexer.GetDatabase(1);
        while (true) // loop assumed from the one-second delay: add one entry per second
        {
            var value = new NameValueEntry[]
            {
                new NameValueEntry("id", Guid.NewGuid().ToString()),
                new NameValueEntry("timestamp", DateTime.UtcNow.ToString())
            };
            redisDb.StreamAdd("log", value);
            var publisher = connectionMultiplexer.GetSubscriber();
            publisher.Publish("log:notify", string.Empty, CommandFlags.None);
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    }
    // Subscriber
    static async Task Main(string[] args)
    {
        var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
        var redisDb = connectionMultiplexer.GetDatabase(1);
        var observableStream = CreateTaskFromStream(connectionMultiplexer, redisDb, "log")
            .Subscribe(x => {
                // Body cut off in the original post; each formatted entry is consumed here.
            });
    }
    private static SemaphoreSlim taskFromStreamBlocker = new SemaphoreSlim(1);
    private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel)
    {
        var lastReadMessage = "0-0";
        var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
        if (string.IsNullOrEmpty(lastReadMessageData))
            redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
        else
            lastReadMessage = lastReadMessageData;
        return Observable.Create<string>(obs =>
        {
            var subscriber = connection.GetSubscriber();
            subscriber.Subscribe($"{channel}:notify", async (ch, msg) =>
            {
                // The original call was cut off here; a non-blocking WaitAsync(0) is assumed.
                var locker = await taskFromStreamBlocker.WaitAsync(0);
                if (!locker) return; // another notification is already being handled
                try
                {
                    var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
                    foreach (var message in messages)
                    {
                        obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                        lastReadMessage = message.Id;
                        redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
                    }
                }
                finally { taskFromStreamBlocker.Release(); }
            });
            // Unsubscribe from the notification channel, not from the stream key.
            return Disposable.Create(() => subscriber.Unsubscribe($"{channel}:notify"));
        });
    }
Why the semaphore?
Because lots of messages could be added to the stream, and I don't want the same message to be processed twice.
If we have unprocessed messages in the stream, how can we process them without an event from the Pub/Sub? When we start, we can check whether there are unprocessed messages and process them. But if a new message is added to the stream during this time, and we are not yet subscribed to the Pub/Sub, the subscriber will not process that message until the next notification arrives through the Pub/Sub.
The semaphore is important so that the same message is not processed twice, but at the same time it's a curse. While one message is being processed, another can be added to the stream. When that happens, the subscriber will not process it right away, but only the next time it is notified (at which point it will process two messages).
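One possible way around the dropped notification, sketched under assumptions: instead of discarding a notification that arrives while a read is in flight, remember it and read once more when the current read finishes. `CoalescingTrigger` and its `drain` delegate are hypothetical names, not part of StackExchange.Redis or Rx, and the sketch does contain a small internal loop.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: runs `drain` at most once at a time and never loses a signal.
public sealed class CoalescingTrigger
{
    private readonly object _gate = new object();
    private readonly Func<Task> _drain;
    private bool _running;
    private bool _pending;

    public CoalescingTrigger(Func<Task> drain) => _drain = drain;

    // Call from the Pub/Sub handler, and once right after subscribing.
    public async Task SignalAsync()
    {
        lock (_gate)
        {
            // A read is in flight: record that one more pass is needed and leave.
            if (_running) { _pending = true; return; }
            _running = true;
        }

        while (true)
        {
            try
            {
                await _drain();
            }
            catch
            {
                // Reset so that a later signal can start a new pass.
                lock (_gate) { _running = false; _pending = false; }
                throw;
            }

            lock (_gate)
            {
                // Nothing arrived while draining: go idle.
                if (!_pending) { _running = false; return; }
                // Something arrived: clear the flag and drain again.
                _pending = false;
            }
        }
    }
}
```

In the subscriber above, `drain` would be the `StreamReadAsync` plus `OnNext` part of the handler. Calling `SignalAsync()` once after `Subscribe` returns would also pick up entries that were added before the subscription existed.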
How would you implement this? Is there an implementation of Redis streams using Rx only? The solution should not use any kind of loop and should be memory efficient. Is this possible?
Best wishes
Paulo Aboim Pinto
Upvotes: 1
Views: 4833
Reputation: 1317
I use a tight loop that just does an XRANGE and saves a position. KISS. If there is no work it backs off, so it's pretty fast: when there is a lot going on, it's a tight loop.
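A minimal sketch of such a loop, assuming the stream access is hidden behind delegates. `BackoffPoller`, `readAfter`, `handle`, `savePosition` and the 10 ms / 1 s delay bounds are all hypothetical choices for illustration; in practice `readAfter` could wrap something like `StreamRangeAsync` or `StreamReadAsync`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BackoffPoller
{
    // readAfter: returns the entries (ID -> payload) that come after the given position.
    public static async Task RunAsync(
        Func<string, Task<IReadOnlyList<KeyValuePair<string, string>>>> readAfter,
        Func<KeyValuePair<string, string>, Task> handle,
        Func<string, Task> savePosition,
        string position,
        CancellationToken cancellationToken)
    {
        var minDelay = TimeSpan.FromMilliseconds(10);
        var maxDelay = TimeSpan.FromSeconds(1);
        var delay = minDelay;

        while (!cancellationToken.IsCancellationRequested)
        {
            var batch = await readAfter(position);
            if (batch.Count > 0)
            {
                foreach (var entry in batch)
                {
                    await handle(entry);
                    position = entry.Key;
                }
                await savePosition(position);
                delay = minDelay; // work was found: read again without waiting
                continue;
            }

            // No work: wait, then double the wait up to the cap.
            try { await Task.Delay(delay, cancellationToken); }
            catch (OperationCanceledException) { break; }
            delay = TimeSpan.FromTicks(Math.Min(delay.Ticks * 2, maxDelay.Ticks));
        }
    }
}
```

Under load the loop never sleeps, and when the stream is idle it settles at one read per second.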
If you need higher performance you could, for example, read while processing; however, I would caution against this for most cases.
I don't use distributed locks / semaphores anymore.
If you're dealing with commands (e.g. "do something") rather than events ("xyz has happened"), these can fail. Again, it is the consumer that should deal with the case where the command has already been applied, not the Redis / stream-reading part.
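As a rough illustration of handling the "already happened" case on the consumer side, the sketch below skips a command whose ID has been applied before. `IdempotentHandler` is a hypothetical name, and the in-memory `HashSet` is only a stand-in for whatever durable store the consumer would really use; it is also not thread-safe.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical consumer-side wrapper; the in-memory set stands in for durable storage.
public sealed class IdempotentHandler
{
    private readonly HashSet<string> _applied = new HashSet<string>();
    private readonly Func<string, Task> _apply;

    public IdempotentHandler(Func<string, Task> apply) => _apply = apply;

    // Returns true if the command ran, false if it was a duplicate delivery.
    public async Task<bool> HandleAsync(string commandId)
    {
        if (_applied.Contains(commandId)) return false;
        await _apply(commandId);
        // Recorded only after success, so a failed command can be retried.
        _applied.Add(commandId);
        return true;
    }
}
```

With this in place the stream reader can redeliver an entry after a crash or timeout without any locking of its own.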
Some libraries with magic callbacks don't solve these issues: the callbacks will be retried on timeout, may run on any node, etc. The complexity and the issues are still there; they just move somewhere else.
You may have an observable on top for consumers, but this is basically cosmetic. It does not solve the problem, and if you look under many implementations you will see the same loop somewhere. I would not use this; instead, get the consumer to register an action.
    public interface IStreamSubscriber
    {
        void RegisterEventCallBack(Func<object, IReadOnlyDictionary<string, string>, Task> callback);
        void RegisterBatchEventCallBack(Func<IEnumerable<(object msg, IReadOnlyDictionary<string, string> metaData)>, Task> batchCallback);
        void Start();
    }
In your case the callback could feed the observable and not use the loop itself, but there is a low-level loop underneath, which can also do the message-to-object conversion for the consumer.
Upvotes: 1
Reputation: 84
And this is another solution, using a timer with a 200 ms interval:
    private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
    {
        var lastReadMessage = "0-0";
        var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
        if (string.IsNullOrEmpty(lastReadMessageData))
            redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
        else
            lastReadMessage = lastReadMessageData;
        var instance = ThreadPoolScheduler.Instance;
        return Observable.Create<string>(obs =>
        {
            var disposable = Observable
                .Interval(TimeSpan.FromMilliseconds(200), instance)
                .Subscribe(async _ =>
                {
                    // Note: a slow read can overlap with the next tick.
                    var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
                    foreach (var message in messages)
                    {
                        obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                        lastReadMessage = message.Id;
                        redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
                    }
                });
            // The timer is stopped only through the token, not by disposing the subscription.
            cancellationToken.Register(() => disposable.Dispose());
            return Disposable.Empty;
        });
    }
Upvotes: 0
Reputation: 84
This is the solution with a while loop that I want to avoid:
    private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
    {
        var lastReadMessage = "0-0";
        var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
        if (string.IsNullOrEmpty(lastReadMessageData))
            redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
        else
            lastReadMessage = lastReadMessageData;
        return Observable.Create<string>(async obs =>
        {
            // The loop line was lost from the original post; this condition is assumed.
            while (!cancellationToken.IsCancellationRequested)
            {
                var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
                foreach (var message in messages)
                {
                    obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                    lastReadMessage = message.Id;
                    redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
                }
                await Task.Delay(TimeSpan.FromMilliseconds(500));
            }
            return Disposable.Empty;
        });
    }
Upvotes: 0