Reputation: 3595
I have an unusual case in which I need to create many processes from my main process.
Each of these processes queues messages that it receives from a WebSocket.
At an interval, roughly every second, my main process polls these small processes. The language I use is D and the messaging library is ZMQD (which is just a wrapper around the C zmq library).
The minimal example I have for my main process:
    Socket*[] socketList;

    string sRecv( Socket* socket)
    {
        ubyte[256] buffer;
        immutable size = socket.receive(buffer);
        import std.algorithm: min;
        return buffer[0 .. min(size,256)].idup.asString();
    }

    void startServer( string servername )
    {
        auto pid = spawnProcess(["/home/erdem/eclipse-workspace/WebSocketDenemesi/websocketdenemesi",
                                 servername, "\n"]);
        auto requester = new Socket(SocketType.req);
        auto allName = "ipc:///tmp/" ~ servername;
        requester.connect(allName);
        socketList ~= requester;
    }

    void main() {
        import std.array : split;
        import std.algorithm : each;
        startServer("iotabtc@depth");
        startServer("iotabtc@aggTrade");
        startServer("ethbtc@depth");
        int counter = 30;
        while(counter--) {
            foreach ( requester; socketList)
            {
                requester.send("send");
            }
            foreach ( requester; socketList)
            {
                auto strList = sRecv(requester).split("\n");
                strList.each!( str => writefln("Received [%d]reply [%s]", strList.length, str) );
            }
            sleep(1000.msecs);
        }
        foreach ( requester; socketList)
        {
            requester.send("done");
        }
    }
And the minimal example I have for my small processes:
    WebSocket startSocket( string temp )
    {
        auto ws_url = URL(temp);
        auto ws = connectWebSocket(ws_url);
        if ( !ws.connected )
            return null;
        return ws;
    }

    void close( WebSocket ws )
    {
        int timeOut = 5;
        while ( ws && ws.connected && timeOut-- )
        {
            vibe.core.concurrency.async( { ws.close(); return true;} );
            sleep(5.msecs);
        }
    }

    string sRecv(ref Socket socket)
    {
        ubyte[256] buffer;
        immutable size = socket.tryReceive(buffer)[0];
        import std.algorithm: min;
        return size ? buffer[0 .. min(size,256)].idup.asString() : "";
    }

    void main( string[] args ) {
        auto responder = Socket(SocketType.rep);
        string seperatorChar = args[2];
        string temp = "ipc:///tmp/" ~ args[1];
        responder.bind(temp);
        string socketName = "wss://stream.binance.com:9443/ws/" ~ args[1];
        auto curSocket = startSocket(socketName);
        string curString;
        while (true) {
            auto result = responder.sRecv();
            if ( result == "send")
            {
                responder.send(curString);
                curString = "";
            }
            else if ( result == "done" )
            {
                break;
            }
            else
            {
                if ( curSocket.dataAvailableForRead )
                {
                    auto text = curSocket.receiveText();
                    if ( !curString.empty )
                        curString ~= seperatorChar;
                    curString ~= text;
                }
            }
            sleep(100.msecs);
        }
        writeln( "Shutting down: ", args[1]);
        curSocket.close();
    }
This is the first time I am using this messaging library, which is why I am using simple REQ/REP sockets. Is there a better way to achieve my requirement? Is there a better messaging pattern, for example one in which my small processes are not blocked by responder.receive( buffer ); ?
If there is one, then I will not need to listen to the WebSocket from another thread.
Upvotes: 2
Views: 130
Reputation: 1
Welcome to the ZeroMQ-based distributed-computing
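Is there a better messaging pattern, for example?
This depends on how your processes need to communicate. In short, using REQ/REP in blocking mode is almost the worst option on the menu: REQ/REP enforces a strict send-then-receive lockstep, so a peer blocked in a receive can do nothing else in the meantime.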
Given that your WebSocket just receives asynchronous pieces of information (a common way for markets to re-broadcast the flow of events), a pipelined event acquisition of the form ws.recv() + PUSHer.send() + if PULLer.poll(): PULLer.recv(), that is, PUSH/PULL propagation plus conditional re-processing, would best match the real-world behaviour.
Given that the footprint of your processing farm may grow beyond a single localhost, other transport classes for non-local nodes, { tipc:// | tcp:// | udp:// | pgm:// | epgm:// | norm:// | vmci:// }, might come into play, together with the ipc:// links on your current localhost. ZeroMQ's transparency in handling this mix is a cool benefit of mastering the Zen-of-Zero.
Given that latency is critical on a massive scale of processing distribution, a PUB/SUB Scalable Formal Communication Archetype Pattern may become beneficial, with an option to use .setsockopt( zmq.CONFLATE, 1 ) on non-logging nodes, where only the most recent prices are relevant for taking any responsive XTO action.
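As a rough illustration of the "if PULLer.poll(): PULLer.recv()" step from the first point, here is a minimal Python sketch of a non-blocking drain loop. It relies only on two methods that a pyzmq socket provides, poll(timeout) and recv_string(). FakePull is a hypothetical in-memory stand-in (not part of ZeroMQ) so that the sketch runs without pyzmq installed; in real code you would pass a zmq.PULL socket instead, and the message names are made up.

```python
from collections import deque


class FakePull:
    """Hypothetical stand-in exposing the two pyzmq Socket methods used below."""

    def __init__(self):
        self._queue = deque()

    def push(self, message):
        # Plays the role of a remote PUSH socket sending one message.
        self._queue.append(message)

    def poll(self, timeout=0):
        # pyzmq returns a non-zero event mask when a message is readable.
        return 1 if self._queue else 0

    def recv_string(self):
        return self._queue.popleft()


def drain(sock):
    """Collect every message that is already queued, without ever blocking."""
    messages = []
    while sock.poll(0):  # timeout 0: return immediately if nothing is waiting
        messages.append(sock.recv_string())
    return messages


if __name__ == "__main__":
    pull = FakePull()
    for tick in ("depth-1", "depth-2", "aggTrade-1"):
        pull.push(tick)
    print(drain(pull))  # everything queued so far
    print(drain(pull))  # nothing new has arrived, so an empty list
```

The main process would call drain() once per interval on each PULL socket, while every small process simply pushes each WebSocket message as it arrives and never waits for a request.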
Upvotes: 2