Matt Roberts

Reputation: 26877

How to subscribe to long-running source of messages

I'm trying ReactiveUI, and I don't understand how to make a simple scenario work: I have a method that listens for messages in a chat room. It's long-running and fires an event whenever a message is found.

Using ReactiveUI, I want to kick off this long-running method when the window opens and have new messages appear on screen in a list box. Because I'm using Rx, I assumed I'd need an IObservable version of the long-running method, so I made one like this:

    public static IObservable<Message> ObservableStream(int roomid, CancellationToken token)
    {
        return Observable.Create<Message>(
            async (IObserver<Message> observer) =>
                {
                 ...
                }
         );
    }

But I've no idea how to plumb this into ReactiveUI. Would I need an ObservableAsPropertyHelper<List<Message>>? At the moment I just kick off the long-running method with Task.Factory.StartNew, and on each event I manually add to a list of messages, which is bound to the front-end list box. This works, but it's not using any ReactiveUI, and it strikes me that there should be a ReactiveUI way to do this:

public class MainWindowViewModel : ReactiveObject
{
    private ThreadSafeObservableCollection<Message> _Messages;
    public ThreadSafeObservableCollection<Message> Messages
    {
        get { return _Messages; }
        set { 
            this.RaiseAndSetIfChanged(x => x._Messages, value);
        }
    }


    public MainWindowViewModel()
    {
        Client.NewMessage += (sender, args) => Messages.Add(args.Message);
        var task = Task.Factory.StartNew(() => Client.GetStream(token), token, TaskCreationOptions.LongRunning, TaskScheduler.Current);

    }
}


// IN the code-behind

this.OneWayBind(ViewModel, x => x.Messages, x => x.MessageList.ItemsSource);

Upvotes: 2

Views: 311

Answers (1)

Ana Betts

Reputation: 74654

How about:

var Messages = ObservableStream(...).CreateCollection();

Then you can listen to Messages for ItemsAdded et al., or just bind it via OneWayBind and the UI will update automatically.

"Would I need a ObservableAsPropertyHelper<List<Message>> ?"

So, normally this would be a good idea for most Web API calls, but since you're streaming the list instead of replacing the list every time, you need to create a collection at startup and add items to it as they come in. Another way you could do this is:

var Messages = new ReactiveList<Message>();
ObservableStream(...).ObserveOn(RxApp.MainThreadScheduler).Subscribe(x => Messages.Add(x));
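
For what it's worth, here is a rough sketch of how that second approach might sit inside the view model from the question. It assumes a ReactiveUI version that still ships ReactiveList<T>. The Message and Client types below are placeholders, not the real ones, and the roomId constructor parameter and the Dispose handling are my own additions for illustration:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using ReactiveUI;

// Placeholder types so the sketch compiles; substitute the real ones.
public class Message { }

public static class Client
{
    // Stand-in for the question's ObservableStream; the real body is elided there.
    public static IObservable<Message> ObservableStream(int roomId, CancellationToken token)
    {
        return Observable.Never<Message>();
    }
}

public class MainWindowViewModel : ReactiveObject, IDisposable
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly IDisposable _subscription;

    // Created once at startup; items are appended as they arrive,
    // so the property needs no setter or change notification.
    public ReactiveList<Message> Messages { get; } = new ReactiveList<Message>();

    public MainWindowViewModel(int roomId)
    {
        _subscription = Client.ObservableStream(roomId, _cts.Token)
            // Marshal each message onto the UI thread before touching the bound list.
            .ObserveOn(RxApp.MainThreadScheduler)
            .Subscribe(
                message => Messages.Add(message),
                error => { /* surface or log the failure as appropriate */ });
    }

    public void Dispose()
    {
        // Stop the long-running listener, then drop the subscription.
        _cts.Cancel();
        _subscription.Dispose();
        _cts.Dispose();
    }
}
```

The OneWayBind call in the code-behind can stay as it is, since Messages is still a collection property on the view model. Because ObserveOn moves the Add calls to the main thread, a thread-safe collection type should no longer be necessary.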

Upvotes: 2
