Reputation: 5724
I have a function to replace a single entity in a repository (e.g. a database) with another entity. This returns a Task, the result of which indicates if a replacement was successful.
// Single Replace
Task<bool> Replace(Entity e, Entity Other);
Now say I want to create a similar function that can replace several entities at once. My idea for a function signature was:
// Multiple Replace
IObservable<KeyValuePair<Entity, bool>> Replace(IDictionary<Entity, Entity> Map);
Map contains a dictionary whose keys are the entities which shall be replaced, and its values are the new entities that shall replace the old ones.
Each KeyValuePair in this Map dictionary should produce a KeyValuePair<Entity, bool>, where the key is the key from Map (the Entity which shall be replaced) and the value is a bool that indicates whether the replacement was successful.
I'd like to return this as a stream, hence I chose IObservable<KeyValuePair<Entity, bool>>, pushing out a KeyValuePair<Entity, bool> for each result as it becomes available.
For now, I'd just like the "Multiple Replace" function to call the "Single Replace" function for each entry; but how can I make these calls and return the results in the required format?
Task<bool> Replace(Entity e, Entity Other)
{
    // ... do the work ...
}
IObservable<KeyValuePair<Entity, bool>> Replace(IDictionary<Entity, Entity> Map)
{
    // How this should work:
    // for each KeyValuePair in Map, call the Single Replace function
    // and - once available - return its result (bool) mapped to the Entity in the
    // resulting Observable stream
    IDictionary<Entity, Task<bool>> dict = Map.ToDictionary(x => x.Key, x => Replace(x.Key, x.Value));
    // .. now what? How can I now convert the dict into an IObservable<KeyValuePair<Entity,bool>> ?
}
Would appreciate any pointers!
Upvotes: 1
Views: 352
Reputation: 117027
I'm not sure if I've missed anything, but I think this might work for you:
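// Requires: using System.Reactive.Linq; using System.Reactive.Threading.Tasks;
public IObservable<KeyValuePair<T, bool>> Convert<T>(IDictionary<T, Task<bool>> source)
{
    return
        Observable
            .Create<KeyValuePair<T, bool>>(o =>
                (
                    // For each dictionary entry, wait for its task and pair the result with the key
                    from s in source.ToObservable()
                    from v in s.Value.ToObservable()
                    select new KeyValuePair<T, bool>(s.Key, v)
                ).Subscribe(o));
}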
Upvotes: 1
Reputation: 5724
Based on Peter's solution, here is a small modification that leverages Task.ToObservable:
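// Requires: using System.Reactive.Linq; using System.Reactive.Threading.Tasks;
IObservable<Tuple<Entity, bool>> Replace(IDictionary<Entity, Entity> Map)
{
    // Allocate an array with one slot per entry (note the square brackets)
    Task<Tuple<Entity, bool>>[] tasks = new Task<Tuple<Entity, bool>>[Map.Count];
    int i = 0;
    foreach (var kvp in Map)
    {
        // Each task starts running as soon as ReplaceAsync is called
        tasks[i++] = ReplaceAsync(kvp.Key, kvp.Value);
    }
    return tasks
        .Select(x => x.ToObservable())
        .Merge(); // or use .Concat to emit the results in array order
}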
Seems to work fine, does anyone spot any potential issues?
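Using .Merge gives an IObservable that provides results as the tasks finish and completes when all tasks are done. Note that the tasks are already running by the time Merge is called, because ReplaceAsync starts each one inside the foreach loop.
Alternatively I can use .Concat(), which emits the results in the order of the tasks array. Because all tasks were already started in the loop, this only orders the results; to actually start the second task after the first has finished, the call to ReplaceAsync has to be deferred, e.g. with Observable.FromAsync.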
The nice thing is that error handling comes for free, I believe: whenever any of the tasks throws an exception, OnError of the subscribed observer is automatically called.
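If the replacements really need to run one at a time, the call has to be deferred until subscription, since a Task returned from an async method is already running. Below is a minimal sketch, assuming the System.Reactive package is referenced. ReplaceSequentially and the replaceAsync parameter are hypothetical names introduced here so that the snippet compiles on its own; in the real code the delegate would simply be ReplaceAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;   // from the System.Reactive NuGet package
using System.Threading.Tasks;

public static class ReplaceSketch
{
    // replaceAsync stands in for ReplaceAsync (assumption: same shape as the helper
    // in this thread); it is passed in only to keep the sketch self-contained.
    public static IObservable<Tuple<TEntity, bool>> ReplaceSequentially<TEntity>(
        IDictionary<TEntity, TEntity> map,
        Func<TEntity, TEntity, Task<Tuple<TEntity, bool>>> replaceAsync)
    {
        return map
            // FromAsync does not invoke the delegate until the observable is subscribed to.
            .Select(kvp => Observable.FromAsync(() => replaceAsync(kvp.Key, kvp.Value)))
            // Concat subscribes to the next inner observable only after the previous
            // one has completed, so the replacements run one after another.
            .Concat();
    }
}
```

Swapping .Concat() for .Merge() in this sketch would start all replacements upon subscription and emit results as they finish.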
Upvotes: 1
Reputation: 70652
Without seeing a good Minimal, Complete, and Verifiable example that clearly shows what you're doing, and especially how you intend to implement the IObservable<T> object returned from your method, it's impossible to offer specific advice. However, I can provide some suggestions that should at least point you in a useful direction.
First, I will suggest that it is more useful to have a Task<Tuple<Entity, bool>> (or Task<KeyValuePair<Entity, bool>>, though I am not very fond of using dictionary-related types for non-dictionary-dependent code…i.e. where you're really just pairing things rather than having a true key/value pairing). That way, when the task completes, you know from the task itself all of the information you need.
This can be accomplished with a simple helper method, e.g.:
async Task<Tuple<Entity, bool>> ReplaceAsync(Entity e, Entity other)
{
return Tuple.Create(e, await Replace(e, other));
}
I used Tuple here, but of course you may create a named type for the purpose, and doing so may in fact help readability of the code.
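As a rough sketch of the named-type option: ReplaceResult, its members, and the Repository wrapper class are hypothetical names chosen for illustration, and the Entity class and Replace stub below are placeholders for the types in the question.

```csharp
using System;
using System.Threading.Tasks;

// Placeholder standing in for the question's Entity type.
public sealed class Entity { }

// Hypothetical named result type used in place of Tuple<Entity, bool>.
public sealed class ReplaceResult
{
    public ReplaceResult(Entity original, bool succeeded)
    {
        Original = original;
        Succeeded = succeeded;
    }

    public Entity Original { get; }   // the entity that was to be replaced
    public bool Succeeded { get; }    // whether the replacement succeeded
}

public static class Repository
{
    // Stub for the question's single Replace; it always reports success here.
    public static Task<bool> Replace(Entity e, Entity other) => Task.FromResult(true);

    // Same helper as above, but returning the named type.
    public static async Task<ReplaceResult> ReplaceAsync(Entity e, Entity other)
    {
        return new ReplaceResult(e, await Replace(e, other));
    }
}
```

Callers can then read result.Original and result.Succeeded instead of Item1 and Item2.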
Having done that, you can then just enumerate all of the items in your input dictionary (or other more appropriate collection :) ), and then use Task.WhenAny() to iteratively remove and publish each completed item as they occur:
IObservable<Tuple<Entity, bool>> Replace(IDictionary<Entity, Entity> Map)
{
    // Allocate an array with one slot per entry (note the square brackets)
    Task<Tuple<Entity, bool>>[] tasks = new Task<Tuple<Entity, bool>>[Map.Count];
    int i = 0;
    foreach (var kvp in Map)
    {
        tasks[i++] = ReplaceAsync(kvp.Key, kvp.Value);
    }
    // Create the IObservable object somehow. Make sure it has the
    // array of tasks in it. E.g. (see below for TaskObservable example)
    TaskObservable<Tuple<Entity, bool>> observable =
        new TaskObservable<Tuple<Entity, bool>>(tasks);
    // Now, start running the observable object. Note: not really all that great
    // to ignore the returned Task object but without more context in your question
    // I can't offer anything more specific. You will probably want to store the
    // Task object *somewhere*, await it, wrap all that in try/catch, etc. to
    // make sure you are correctly monitoring the progress of the task.
    var _ = observable.Run();
    return observable;
}
The IObservable<T> implementation might look something like:
class TaskObservable<T> : IObservable<T>
{
    // Subscription handle; disposing it unsubscribes the observer.
    private class ObserverItem : IDisposable
    {
        public readonly IObserver<T> Observer;
        public readonly TaskObservable<T> Owner;
        public ObserverItem(IObserver<T> observer, TaskObservable<T> owner)
        {
            Observer = observer;
            Owner = owner;
        }
        public void Dispose()
        {
            if (!Owner._observerItems.Contains(this))
            {
                throw new InvalidOperationException(
                    "This observer is no longer subscribed");
            }
            Owner._observerItems.Remove(this);
        }
    }
    private readonly Task<T>[] _tasks;
    private readonly List<ObserverItem> _observerItems = new List<ObserverItem>();
    public TaskObservable(Task<T>[] tasks)
    {
        _tasks = tasks;
    }
    public IDisposable Subscribe(IObserver<T> observer)
    {
        ObserverItem item = new ObserverItem(observer, this);
        _observerItems.Add(item);
        return item;
    }
    private void Publish(T t)
    {
        foreach (ObserverItem item in _observerItems)
        {
            item.Observer.OnNext(t);
        }
    }
    private void Complete()
    {
        foreach (ObserverItem item in _observerItems)
        {
            item.Observer.OnCompleted();
        }
    }
    // Public so that the Replace method can start it.
    public async Task Run()
    {
        // Track the tasks that have not been published yet. Without removing each
        // completed task, WhenAny would keep returning the same finished task.
        List<Task<T>> remaining = new List<Task<T>>(_tasks);
        while (remaining.Count > 0)
        {
            Task<T> completedTask = await Task.WhenAny(remaining);
            remaining.Remove(completedTask);
            Publish(completedTask.Result);
        }
        Complete();
    }
}
Upvotes: 1