Reputation: 10538
I'm trying to listen for connections using the standard System.Net socket API, and I'm planning to use Reactive Extensions to bridge the gap and create an intuitive way of listening for those connections.
Here's my code so far:
public RxConnectionListener(int port, Socket socket, IScheduler scheduler)
{
    _socket = socket;
    // TODO: Lazy binding?
    _socket.Bind(new IPEndPoint(IPAddress.Any, port));
    _socket.Listen(0);
    var task = Task.Factory.FromAsync(
        socket.BeginAccept,
        result => socket.EndAccept(result),
        null);
    _connections = Observable.Defer(() => Observable.FromAsync(() => task))
        .Select(s => new RxConnection(s))
        .ObserveOn(scheduler)
        .Repeat();
}
Now, the socket listening IS working as planned: I'm receiving connections with no problem. The issue is that the first connection is being received more than once (i.e., it appears Observable.FromAsync is caching the result of the async task object). I know this is obviously due to the Repeat() statement, but I was under the impression that wrapping Observable.FromAsync inside Observable.Defer and then invoking Repeat on the deferred observable would circumvent the caching. What am I doing wrong?
Subscription code is simply:
listener
.Connections
.Subscribe(OnNewConnection);
Where listener.Connections is a property on an instance of RxConnectionListener, backed by _connections.
OnNewConnection is as follows:
protected virtual void OnNewConnection(IConnection connection)
{
    Console.WriteLine(connection.RemoteAddress);
}
Observed (pun intended) output after trying to connect via TCP once:
::ffff:127.0.0.1
::ffff:127.0.0.1
::ffff:127.0.0.1
::ffff:127.0.0.1
..
(to infinity and beyond)
Edit: for completeness, I'm using the EventLoopScheduler, although commenting out the ObserveOn call makes no difference.
Upvotes: 1
Views: 420
Reputation: 1438
I'm quite sure the caching comes from the task you're building: it is created once, so Observable.FromAsync is handed the same, already completed task on every resubscription, and Repeat just gives you back that task's Result instead of a new connection. To get a repeatable sequence of connections, you need to rebuild the task every time. That way the socket result is no longer cached.
var _connections = Observable.Defer(() => Observable.FromAsync(() =>
        Task.Factory.FromAsync(
            socket.BeginAccept,
            result => socket.EndAccept(result),
            null)))
    .Select(s => new RxConnection(s))
    .Repeat();
Upvotes: 0
Reputation: 4744
By writing
var task = Task.Factory.FromAsync(
socket.BeginAccept,
result => socket.EndAccept(result),
null);
you created a single task that gets the next connected socket. If you ask for that task's result twice, you will get the same result both times. That's normal, because a task always behaves this way: it runs until it completes, then "caches" its outcome (be it a normal result or an exception).
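To make the difference concrete, here is a minimal sketch that uses only the BCL (no sockets, no Rx). The class TaskCachingDemo and the method NextAsync are hypothetical stand-ins for the accept call, not part of the original code; NextAsync simply returns the next integer each time it is invoked.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskCachingDemo
{
    private static int _counter;

    // Hypothetical stand-in for an accept call:
    // every invocation produces the next number.
    private static Task<int> NextAsync() => Task.FromResult(++_counter);

    public static async Task<int[]> RunAsync()
    {
        _counter = 0;

        // One task: the operation runs once and the result is stored.
        Task<int> task = NextAsync();
        int a = await task;
        int b = await task;      // same task object, same result as a

        // A task factory: every call starts a fresh operation.
        Func<Task<int>> factory = NextAsync;
        int c = await factory();
        int d = await factory(); // a new task, hence a new result

        return new[] { a, b, c, d };
    }
}
```

Awaiting the same task twice yields the same value, while each call to the factory yields a new one. That is the behavior Repeat() needs in order to see a new connection on every resubscription.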
What you meant to do was to create a function that creates a Task, like this:
Func<Task<Socket>> acceptTask = () =>
{
    return Task.Factory.FromAsync(
        socket.BeginAccept,
        result => socket.EndAccept(result),
        null);
};
Now you can easily create an observable from this Task factory: Observable.FromAsync(acceptTask)
Note that it is probably a bad idea to create an observable from a task that you yourself created from the async (Begin/End) pattern, since there are methods that create the observable directly from that pattern. Save the creation of observables from tasks for the cases where the operations you want to observe are already tasks.
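As a rough sketch of that direct route, assuming the System.Reactive package is referenced and that its Observable.FromAsyncPattern operator is the kind of method meant here (the answer does not name one): the class AcceptObservable and the method Connections below are hypothetical names, and the listener socket is assumed to be bound and listening already.

```csharp
using System;
using System.Net.Sockets;
using System.Reactive.Linq;

public static class AcceptObservable
{
    // Hypothetical helper; 'listener' must already be bound and listening.
    public static IObservable<Socket> Connections(Socket listener)
    {
        // FromAsyncPattern returns a function. Each call to that function
        // starts a new BeginAccept and exposes its result as an observable.
        Func<IObservable<Socket>> accept = Observable.FromAsyncPattern<Socket>(
            (callback, state) => listener.BeginAccept(callback, state),
            asyncResult => listener.EndAccept(asyncResult));

        // Defer calls accept() on every subscription, so each resubscription
        // made by Repeat() starts a fresh accept rather than replaying one.
        return Observable.Defer(accept).Repeat();
    }
}
```

No Task is involved in this version, and the Select and ObserveOn steps from the question could be appended to the returned sequence as before.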
Upvotes: 1