Maer007

Reputation: 63

How to use Observable.ToAsync with IEnumerable

I have run into a problem with Rx. After each item is processed on a new thread, I need to send the result to the main thread. I have done this another way, but how can I solve it with Rx? Here is the code:

Observable.ToAsync<string, IEnumerable<UserState>>(Run)(path)
    .ObserveOnDispatcher<IEnumerable<UserState>>()
    .Subscribe(
        (o) =>
        {
            // need to run in Main Thread
            foreach (var item in o)
            {
                WriteLog(item.StatusCode, item.Url);
            }
        },
        (ex) => { },
        () => { });

// need to run in New Thread
private IEnumerable<UserState> Run(string sitemap)
{
    ....
    foreach (var url in urls)
    {
        var result = new UserState
        {
            Url = url.Value,
            TotalItems = urls.Count
        };
        ....
        yield return result;
    }
}

Upvotes: 1

Views: 2307

Answers (2)

Ankur

Reputation: 33637

You want to generate the IEnumerable on a background thread and then process each user object from it on the main UI thread. If that understanding is correct, you can do something like this:

// NOTE: This doesn't execute your Run method yet. Run will only execute
// when you start enumerating the users values.
var users = Run(path);
users.ToObservable(System.Concurrency.Scheduler.ThreadPool) // The enumerator will be scheduled on a separate thread
    .ObserveOn(frm) // Observe on the UI thread of the WinForms form
    .Subscribe(s => {}); // This will run on the UI thread for each user object

Upvotes: 1

Enigmativity

Reputation: 117027

It would be good if you could describe the issue that you have.

What I can tell you at the moment is that you have a couple of potential issues.

First, ObserveOnDispatcher is meant to work with the System.Windows.Threading.Dispatcher (usually created by WPF). If you're running your code outside of WPF it effectively means "the current thread", and this could leave your subscription unable to run if the current thread is busy. In other words, you might create a dead-lock.

I ran your code in both WPF and LINQPad and it worked fine in WPF, but dead-locked in LINQPad. If I observed on another thread then it worked fine in LINQPad and failed in WPF.

Secondly, you're turning an iterator method into an async observable, and that won't work as you expect. An iterator doesn't run any code until you actually iterate through the enumerable. Essentially you return from Run almost instantly and only execute the body of your Run method in the Subscribe code, and that's the wrong thread!
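To make the deferred execution concrete, here is a minimal sketch using only the base class library (no Rx). Produce is a hypothetical stand-in for Run, and the dedicated Thread plays the role of the background thread that ToAsync would use; neither name comes from the question. It records which thread actually executes the iterator body.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class DeferredIteratorDemo
{
    // Hypothetical stand-in for Run: logs the thread that executes each step of the body.
    public static IEnumerable<int> Produce(List<int> bodyThreadIds)
    {
        for (var i = 0; i < 3; i++)
        {
            bodyThreadIds.Add(Thread.CurrentThread.ManagedThreadId);
            yield return i;
        }
    }

    public static (int RunsBeforeEnumeration, int RunsOnWorker, int RunsOnEnumerator) Observe()
    {
        var bodyThreadIds = new List<int>();
        IEnumerable<int> sequence = Array.Empty<int>();
        var workerThreadId = 0;

        // Call the iterator method on a worker thread. This only creates the
        // enumerable; none of the body runs here.
        var worker = new Thread(() =>
        {
            workerThreadId = Thread.CurrentThread.ManagedThreadId;
            sequence = Produce(bodyThreadIds);
        });
        worker.Start();
        worker.Join();

        var runsBeforeEnumeration = bodyThreadIds.Count;

        // Enumerate on the current thread, as the Subscribe callback would.
        var enumeratorThreadId = Thread.CurrentThread.ManagedThreadId;
        foreach (var item in sequence) { }

        return (
            runsBeforeEnumeration,
            bodyThreadIds.FindAll(id => id == workerThreadId).Count,
            bodyThreadIds.FindAll(id => id == enumeratorThreadId).Count);
    }
}
```

Calling DeferredIteratorDemo.Observe() should report no runs before enumeration, none on the worker thread, and all three on the thread that performs the foreach.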

What you need to do is force immediate execution of the enumerable - at the very least - change your code to look like this:

private UserState[] Run(string sitemap)
{
    ...
    Func</* url type */, UserState> create = url =>
    {
        var result = new UserState
        {
            Url = url.Value,
            TotalItems = urls.Count
        };
        ....
        return result;
    };
    return (from url in urls select create(url)).ToArray();
}
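As a rough illustration of why the ToArray call matters, the sketch below counts how often a projection delegate is invoked. The create delegate and the integer input are placeholders I've assumed for the example, not the real URL type. The query alone invokes nothing; ToArray runs the projection once per item on whichever thread calls it, and reading the array afterwards invokes nothing further.

```csharp
using System;
using System.Linq;

public static class EagerMaterializationDemo
{
    // Returns the number of create calls observed at three points:
    // after building the query, after ToArray, and after reusing the array.
    public static int[] CallCounts()
    {
        var calls = 0;
        Func<int, int> create = x => { calls++; return x * 2; };

        // Building the query does not invoke create (LINQ is lazy).
        var query = from x in new[] { 1, 2, 3 } select create(x);
        var afterQuery = calls;

        // ToArray enumerates the query immediately, on the calling thread.
        var array = query.ToArray();
        var afterToArray = calls;

        // The array holds finished results, so reading it does not call create again.
        var sum = array.Sum();
        var afterReuse = calls;

        return new[] { afterQuery, afterToArray, afterReuse };
    }
}
```

Because the work happens inside ToArray, returning the array from Run means the work is done before Run returns, rather than later inside the subscriber.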

Your main code needs a little clean-up:

Observable.ToAsync<string, UserState[]>(Run)(path)
    .ObserveOnDispatcher()
    .Subscribe(o =>
    {
        foreach (var item in o)
        {
            WriteLog(item.StatusCode, item.Url);
        }                        
    });

Let me know if any of this helps.


EDIT: Added sample FromEventPattern code as per OP request in the comments.

Here's an example Windows Forms use of FromEventPattern. The first part creates a way to clean up subscriptions when the form closes.

public partial class Form1 : Form
{
    public Form1()
    {
        InitializeComponent();

        // Create a collection of IDisposable
        // to allow clean-up of subscriptions
        var subscriptions =
            new System.Reactive.Disposables.CompositeDisposable();

        var formClosings = Observable
            .FromEventPattern<FormClosingEventHandler, FormClosingEventArgs>(
                h => this.FormClosing += h,
                h => this.FormClosing -= h);

        // Add a subscription that cleans up subscriptions
        // when the form closes
        subscriptions.Add(
            formClosings
                .Subscribe(ea => subscriptions.Dispose()));

This next part watches for mouse drags on a picture box and creates messages to let the user know how far they've dragged.

        var pictureBox1MouseDowns = Observable
            .FromEventPattern<MouseEventHandler, MouseEventArgs>(
                h => pictureBox1.MouseDown += h,
                h => pictureBox1.MouseDown -= h);

        var pictureBox1MouseMoves = Observable
            .FromEventPattern<MouseEventHandler, MouseEventArgs>(
                h => pictureBox1.MouseMove += h,
                h => pictureBox1.MouseMove -= h);

        var pictureBox1MouseUps = Observable
            .FromEventPattern<MouseEventHandler, MouseEventArgs>(
                h => pictureBox1.MouseUp += h,
                h => pictureBox1.MouseUp -= h);

        var pictureBox1MouseDrags =
            from md in pictureBox1MouseDowns
            from mm in pictureBox1MouseMoves.TakeUntil(pictureBox1MouseUps)
            let dx = mm.EventArgs.Location.X - md.EventArgs.Location.X
            let dy = mm.EventArgs.Location.Y - md.EventArgs.Location.Y
            select new Point(dx, dy);

        var pictureBox1MouseDragMessages =
            from md in pictureBox1MouseDrags
            let f = "You've dragged ({0}, {1}) from your starting point"
            select String.Format(f, md.X, md.Y);

The next part tracks the number of times a button is clicked and creates a message to display to the user.

        var button1ClickCount = 0;

        var button1Clicks = Observable
            .FromEventPattern(
                h => button1.Click += h,
                h => button1.Click -= h);

        var button1ClickCounts =
            from c in button1Clicks
            select ++button1ClickCount;

        var button1ClickMessages =
            from cc in button1ClickCounts
            let f = "You clicked the button {0} time{1}"
            select String.Format(f, cc, cc == 1 ? "" : "s");

Finally, the two message observables are merged together and subscribed to, placing each message in a label.

        var messages = pictureBox1MouseDragMessages
            .Merge(button1ClickMessages);

        // Add a subscription to display the
        // merged messages in the label
        subscriptions.Add(
            messages
                .Subscribe(m => label1.Text = m));
    }
}

Keep in mind that all of this resides in the form's constructor, no module-level fields or properties are used, and all the event handlers are removed when the form closes. Very neat stuff.

Upvotes: 1
