Reputation: 63
I have run into some problem with Rx. I need that after processing each item in a new thread send result to main thread. I did it with other way. How can i solve this task with Rx? Here is code:
Observable.ToAsync<string, IEnumerable<UserState>>(Run)(path)
.ObserveOnDispatcher<IEnumerable<UserState>>()
.Subscribe(
(o) =>
{ // need to run in Main Thread
foreach (var item in o)
{
WriteLog(item.StatusCode, item.Url);
}
},
(ex) =>{ },
() =>{ } );
// need to run in New Thread
private IEnumerable<UserState> Run(string sitemap)
{
....
foreach (var url in urls)
{
var result = new UserState
{
Url = url.Value,
TotalItems = urls.Count
};
....
yield return result;
}
}
Upvotes: 1
Views: 2307
Reputation: 33637
You want to generate the IEnumerable on some other background thread and then Process each users object in this enumerable on Main UI thread, if this understanding is correct then you can do something like this:
var users = Run(path); //NOTE: This doesn't execute your run method yet, Run will only execute when you start enumerating the users values
users.ToObservable(System.Concurrency.Scheduler.ThreadPool) //The enumerator will be scheduled on separate thread
.ObserveOn(frm) //Observe on UI thread of win form
.Subscribe(s => {}) //This will run in UI thread for each user object
Upvotes: 1
Reputation: 117027
It would be good if you could describe the issue that you have.
What I can tell you at the moment is that you have a couple of potential issues.
First, using ObserveOnDispatcher
is meant to work with the System.Windows.Threading.Dispatcher
(usually created with WPF). If you're running your code outside of WPF it effectively means "the current thread" and this could lead to your subscription not being able to run if the current thread is busy. In other words, you might create a dead-lock.
I ran your code in both WPF and LINQPad and it worked fine in WPF, but dead-locked in LINQPad. If I observed on another thread then it worked fine in LINQPad and failed in WPF.
Secondly, you're turning an iterator method into an async observable and that won't work as you expect. An iterator doesn't actually run any code until you actually iterate through the enumerable. Essentially you return from Run
almost instantly and you only execute the body of you Run
method in the Subscribe
code - and that's the wrong thread!
What you need to do is force immediate execution of the enumerable - at the very least - change your code to look like this:
private UserState[] Run(string sitemap)
{
...
Func</* url type */, UserState> create = url =>
{
var result = new UserState
{
Url = url.Value,
TotalItems = urls.Count
};
....
return result;
};
return (from url in urls select create(url)).ToArray();
}
Your main code needs to have a little clean up:
Observable.ToAsync<string, UserState[]>(Run)(path)
.ObserveOnDispatcher()
.Subscribe(o =>
{
foreach (var item in o)
{
WriteLog(item.StatusCode, item.Url);
}
});
Let me know if any of this helps.
EDIT: Added sample FromEventPattern
code as per OP request in the comments.
Here's an example Windows Forms use of FromEventPattern
. The first part creates a way to clean up subscriptions when the form closes.
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
// Create a collection of IDisposable
// to allow clean-up of subscriptions
var subscriptions =
new System.Reactive.Disposables.CompositeDisposable();
var formClosings = Observable
.FromEventPattern<FormClosingEventHandler, FormClosingEventArgs>(
h => this.FormClosing += h,
h => this.FormClosing -= h);
// Add a subscription that cleans up subscriptions
// when the form closes
subscriptions.Add(
formClosings
.Subscribe(ea => subscriptions.Dispose()));
This next part watches for mouse drags on a picture box and creates messages to let the user know how far they've dragged.
var pictureBox1MouseDowns = Observable
.FromEventPattern<MouseEventHandler, MouseEventArgs>(
h => pictureBox1.MouseDown += h,
h => pictureBox1.MouseDown -= h);
var pictureBox1MouseMoves = Observable
.FromEventPattern<MouseEventHandler, MouseEventArgs>(
h => pictureBox1.MouseMove += h,
h => pictureBox1.MouseMove -= h);
var pictureBox1MouseUps = Observable
.FromEventPattern<MouseEventHandler, MouseEventArgs>(
h => pictureBox1.MouseUp += h,
h => pictureBox1.MouseUp -= h);
var pictureBox1MouseDrags =
from md in pictureBox1MouseDowns
from mm in pictureBox1MouseMoves.TakeUntil(pictureBox1MouseUps)
let dx = mm.EventArgs.Location.X - md.EventArgs.Location.X
let dy = mm.EventArgs.Location.Y - md.EventArgs.Location.Y
select new Point(dx, dy);
var pictureBox1MouseDragMessages =
from md in pictureBox1MouseDrags
let f = "You've dragged ({0}, {1}) from your starting point"
select String.Format(f, md.X, md.Y);
The next part tracks the number of times a button is clicked and creates a messages to display to the user.
var button1ClickCount = 0;
var button1Clicks = Observable
.FromEventPattern(
h => button1.Click += h,
h => button1.Click -= h);
var button1ClickCounts =
from c in button1Clicks
select ++button1ClickCount;
var button1ClickMessages =
from cc in button1ClickCounts
let f = "You clicked the button {0} time{1}"
select String.Format(f, cc, cc == 1 ? "" : "s");
Finally the two message obervables are merged together and are subscribed to, placing the message in a label.
var messages = pictureBox1MouseDragMessages
.Merge(button1ClickMessages);
// Add a subscription to display the
// merged messages in the label
subscriptions.Add(
messages
.Subscribe(m => label1.Text = m));
}
}
Keep in mind that all of this resides in the form's constructor and no module level fields or properties are used and all the events handlers are removed when the form closes. Very neat stuff.
Upvotes: 1