Reputation: 17272
This is an observable sequence that retrieves paginated data from a web service. Each web service response contains a nextRecordsUrl
that indicates where to get the next set of records. What is the best way to convert this Observable to something more reusable?
Web services setup:
var auth = new AuthenticationClient ();
await auth.UsernamePasswordAsync (consumerKey, consumerSecret, userName, password + passwordSecurityToken);
var forceClient = new ForceClient (auth.InstanceUrl, auth.AccessToken, auth.ApiVersion);
The Observable:
var observable = Observable.Create<QueryResult<Account>> (async (IObserver<QueryResult<Account>> o) =>
{
try
{
var queryResult = await forceClient.QueryAsync<Account> ("SELECT Id, Name from Account");
if (queryResult != null)
{
o.OnNext (queryResult);
while (!string.IsNullOrEmpty (queryResult.nextRecordsUrl))
{
queryResult = await forceClient.QueryContinuationAsync<Account> (queryResult.nextRecordsUrl);
if (queryResult != null)
{
o.OnNext (queryResult);
}
}
}
o.OnCompleted ();
}
catch (Exception ex)
{
o.OnError (ex);
}
return () => {};
});
Subscribing to the observable and collecting the results:
var accounts = new List<Account> ();
observable.Subscribe (
observer => accounts.AddRange (observer.records),
ex => Console.WriteLine (ex.Message),
() => {});
EDIT: Using Brandon's solution I can now generate the list of results with Aggregate
List<Account> accounts = await forceClient.QueryPages<Account> ("SELECT Id, Name from Account")
.Aggregate (new List<Account> (), (list, value) =>
{
list.AddRange (value.records);
return list;
});
Upvotes: 1
Views: 356
Reputation: 18125
Believe it or not, the Rx-Expiremental library (also maintained by MS) has an operator for this called Expand
. Expand is used to take each element from an observable and run it through a function which produces another observable of the same type. That observable is then flattened in to the original, and each item from that goes through the same process.
Imagine being given a tree node with an observable of child nodes. You could use expand to easily traverse this tree. Since a linked-list is just a constrained version of a tree, and since what you have is effectively a linked list where each node is an observable, you can use expand.
public static IObservable<QueryResult<TResult>> QueryPages<TResult>(this ForceClient forceClient, string query)
{
return Observable.FromAsync(() => forceClient.QueryAsync<TResult>(query))
.Where(QueryResultIsValid)
.Expand(result =>
Observable.FromAsync(() => forceClient.QueryContinuationAsync<TResult>(queryResult.nextRecordsUrl))
.Where(QueryResultIsValid)
);
}
public static bool QueryResultIsValid(QueryResult<TResult> result)
{
return result != null;
}
Upvotes: 2
Reputation: 39182
Is something like this what you are looking for?
public static IObservable<QueryResult<TResult>> QueryPages<TResult>(this ForceClient forceClient, string query)
{
return Observable.Create<QueryResult<T>> (async (observer, token) =>
{
// No need for try/catch. Create() will call OnError if your task fails.
// Also no need for OnCompleted(). Create() calls it when your task completes
var queryResult = await forceClient.QueryAsync<TResult> (query);
while (queryResult != null)
{
observer.OnNext (queryResult);
// check the token *after* we call OnNext
// because if an observer unsubscribes
// it typically occurs during the notification
// e.g. they are using .Take(..) or
// something.
if (string.IsNullOrEmpty(queryResult.nextRecordsUrl) ||
token.IsCancellationRequested)
{
break;
}
queryResult = await forceClient.QueryContinuationAsync<TResult> (queryResult.nextRecordsUrl);
}
// No need to return anything. Just the Task itself is all that Create() wants.
}
});
// Usage:
var forceClient = // ...
var foos = forceClient.QueryPages<Foo>("SELECT A, B, C FROM Foo");
Notice I switched it to the overload that provides a cancellation token so that you can stop fetching pages if the observer unsubscribes (Your original version would have continued fetching pages even though the observer had stopped listening). Also note that the async Create
awaits your Task
and calls OnError
or OnCompleted
for you so you do not need to worry about that most of the time.
Upvotes: 2