Hi, I'm learning to use Akka.NET, and I want to create a simple TCP server that periodically sends data over the TCP connection (which will then be picked up by a processingjs client and displayed in its output).
Not sure what I'm missing here. Could any of you experts shed some light on the issue?
These are my actors:
using Akka.Actor;
using Akka.IO;
using System;
using System.Text;
using System.Threading.Tasks;

namespace ActorTcpProcessing
{
    public class ProcessingServer : ReceiveActor
    {
        public ProcessingServer()
        {
            Receive<Tcp.Bound>(TcpBoundHandler);
            Receive<Tcp.Connected>(TcpConnectedHandler);
        }

        internal static Props Create()
        {
            return Props.Create(() => new ProcessingServer());
        }

        private void TcpBoundHandler(Tcp.Bound bound)
        {
            Console.WriteLine("Listening on {0}", bound.LocalAddress);
        }

        private void TcpConnectedHandler(Tcp.Connected connected)
        {
            var tcpConnection = Sender;
            var connectionHandler = Context.ActorOf(SensorReader.CreateReader(tcpConnection));
            tcpConnection.Tell(new Tcp.Register(connectionHandler));
        }
    }

    internal class SensorReader : ReceiveActor
    {
        private readonly IActorRef _tcpConnection;

        public SensorReader(IActorRef tcpConnection)
        {
            _tcpConnection = tcpConnection;
            Receive<SensorDataReceived>(TcpWriteSensorData);
        }

        internal static Props CreateReader(IActorRef connectionHandler)
        {
            return Props.Create(() => new SensorReader(connectionHandler));
        }

        private void TcpWriteSensorData(SensorDataReceived receivedData)
        {
            _tcpConnection.Tell(Tcp.Write.Create(ByteString.FromString(receivedData.SensorData + "\n")));
        }
    }
}
I am under the impression that once I register the SensorReader on a Tcp.Connected message, all I need to do is publish a SensorData message to the event stream, like so:
_sys.EventStream.Publish(new SensorData("sensor-data[x]"));
However, this doesn't seem to work. I get the following log message:
[INFO][11/09/2021 10:22:05 AM][Thread 0004][akka://tcp-processing/deadLetters] Message [Bound] from [akka://tcp-processing/system/IO-TCP/$a#1070400244] to [akka://tcp-processing/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then [akka://tcp-processing/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
This is how I'm sending the Tcp.Bind message to my server.
using System;
using Akka.IO;
using Akka.Actor;
using System.Net;
using Topshelf;

namespace ActorTcpProcessing
{
    class StreamTcpService : ServiceControl
    {
        ActorSystem _sys;

        public bool Start(HostControl hostControl)
        {
            _sys = ActorSystem.Create("tcp-processing");
            var server = _sys.ActorOf(ProcessingServer.Create(), nameof(ProcessingServer));
            var tcpManager = _sys.Tcp();
            tcpManager.Tell(new Tcp.Bind(
                server,
                new IPEndPoint(IPAddress.Any, 10002)));
            _sys.EventStream.Publish(new SensorData("sensor-data[x]"));
            return true;
        }

        public bool Stop(HostControl hostControl)
        {
            _sys.Terminate().ContinueWith(task =>
            {
                if (task.IsCompleted && !task.IsCanceled)
                    Console.WriteLine("Stream Tcp stopped");
            });
            return true;
        }
    }
}
As you can see, I am specifying the server as the handler of the Bound message. Or am I completely wrong?
Also, I should mention that when I start my processingjs client, my ProcessingServer does receive a Tcp.Connected message, which in turn creates the SensorReader and registers it as the handler.
I'd also like to know whether I am using the EventStream publish correctly:
_sys.EventStream.Publish(new SensorData("sensor-data[x]"));
Any help getting this working is highly appreciated. Thank you.
Reputation: 278
Your problem is on this line:
tcpManager.Tell(new Tcp.Bind(server,new IPEndPoint(IPAddress.Any, 10002)));
The API is a bit misleading here: the server argument in the Tcp.Bind constructor is the handler for all incoming connections, which arrive as Tcp.Connected messages. It is not the actor that will receive the Tcp.Bound message.
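The Tcp manager actor replies with Tcp.Bound to the Sender of the Tcp.Bind message. Because you call Tell from outside any actor, there is no sender, so the reply goes to dead letters, which is exactly what your log shows. To fix your problem, pass the IActorRef of the actor that should receive the Tcp.Bound message as the explicit sender argument of Tell. In your case: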
tcpManager.Tell(new Tcp.Bind(server, new IPEndPoint(IPAddress.Any, 10002)), server);
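Another option, instead of passing the sender explicitly, is to send Tcp.Bind from inside the actor itself, so that the implicit sender is already the actor and Tcp.Bound comes back to it. Below is a minimal sketch of that idea, not a drop-in replacement: the endpoint constructor parameter is my own assumption, and the Tcp.Connected handler body is left as a placeholder for your existing registration logic.

```csharp
using System;
using System.Net;
using Akka.Actor;
using Akka.IO;

public class ProcessingServer : ReceiveActor
{
    private readonly EndPoint _endpoint;

    // The endpoint parameter is an assumption made for this sketch;
    // the original actor has a parameterless constructor.
    public ProcessingServer(EndPoint endpoint)
    {
        _endpoint = endpoint;

        Receive<Tcp.Bound>(bound =>
            Console.WriteLine("Listening on {0}", bound.LocalAddress));

        Receive<Tcp.Connected>(connected =>
        {
            // Placeholder: create and register the connection handler
            // here, as in the original TcpConnectedHandler.
        });
    }

    protected override void PreStart()
    {
        // Tell is called from inside the actor, so the implicit sender
        // is Self and the Tcp.Bound reply is delivered to this actor.
        Context.System.Tcp().Tell(new Tcp.Bind(Self, _endpoint));
    }
}
```

With this shape, the service would only create the actor, for example with Props.Create(() => new ProcessingServer(new IPEndPoint(IPAddress.Any, 10002))), and would no longer talk to the TCP manager directly.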
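On the EventStream part of the question: Publish only delivers to actors that have subscribed to that message type, and registering an actor with Tcp.Register does not subscribe it to anything. Note also that the Publish call in Start runs before any client has connected, so no SensorReader exists yet to receive it. The sketch below shows one way the reader could subscribe itself. The SensorData class here is hypothetical, since its definition is not shown in the question, and I am assuming the reader is meant to handle SensorData rather than SensorDataReceived.

```csharp
using Akka.Actor;
using Akka.IO;

// Hypothetical message type; the question does not show its definition.
public sealed class SensorData
{
    public SensorData(string value) { Value = value; }
    public string Value { get; }
}

internal class SensorReader : ReceiveActor
{
    private readonly IActorRef _tcpConnection;

    public SensorReader(IActorRef tcpConnection)
    {
        _tcpConnection = tcpConnection;

        // Forward each published reading to the TCP connection actor.
        Receive<SensorData>(data =>
            _tcpConnection.Tell(
                Tcp.Write.Create(ByteString.FromString(data.Value + "\n"))));
    }

    protected override void PreStart()
    {
        // Without this subscription, published SensorData messages
        // never reach this actor.
        Context.System.EventStream.Subscribe(Self, typeof(SensorData));
    }

    protected override void PostStop()
    {
        // Remove all of this actor's subscriptions when it stops.
        Context.System.EventStream.Unsubscribe(Self);
    }
}
```

Anything published after a client has connected and the reader has started would then be written to that connection.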