Reputation: 7591
I have read: Nanomsg multicast bandwidth issue
But I don't need true multicast IP (e.g. 239.0.0.0:3000). I also have a very light load, so I am not that concerned about backpressure.
Yes, I could use the bus paradigm. But say I want to test first with the pubsub.
What URL would the sender use for TCP in order to send to multiple clients?
(I am actually using the next gen nanomsg):
https://nanomsg.github.io/nng/man/v1.0.0/nng_tcp.7.html
Can I send to tcp://*:3000?
Can I bind the subscribers to that address?
Upvotes: 1
Views: 902
Reputation: 7591
The info that I have gotten from gdamore seems to indicate that one cannot use tcp://*:3000, since you need to bind the producer/publisher to some interface. However, with a fixed endpoint I now have a 1-to-1 publisher-to-subscriber setup working over TCP.
(this was discussed in: https://www.freelists.org/post/nanomsg/does-nanomsg-support-multi-producer-in-pubsub-mode,10 )
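For reference, a minimal sketch of that fixed-endpoint 1-to-1 setup, assuming an example publisher address of tcp://192.168.1.10:3000 (substitute your real interface); error handling is omitted for brevity:

#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>

// publisher process: listen ( bind ) on one concrete interface, not on tcp://*:3000
void publisher_side( void )
{
    nng_socket pub;
    nng_pub0_open( &pub );
    nng_listen( pub, "tcp://192.168.1.10:3000", NULL, 0 );
    nng_send( pub, "hello", 6, 0 );                      // every subscriber already connected receives a copy
    nng_close( pub );
}

// subscriber process: dial that fixed endpoint and subscribe to all topics
void subscriber_side( void )
{
    nng_socket sub;
    char *     buf = NULL;
    size_t     sz;
    nng_sub0_open( &sub );
    nng_setopt( sub, NNG_OPT_SUB_SUBSCRIBE, "", 0 );     // empty topic prefix == receive everything
    nng_dial( sub, "tcp://192.168.1.10:3000", NULL, 0 );
    nng_recv( sub, &buf, &sz, NNG_FLAG_ALLOC );          // nng allocates buf; release it with nng_free()
    nng_free( buf, sz );
    nng_close( sub );
}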
Now that I see that multicast is not really possible (until a UDP transport is added), I have revised my question and put it in the context of my final solution. See:
multiple publishers & subscribers in nanomsg (nng)
Upvotes: 1
Reputation: 1
Can I send to tcp://*:3000 ?
Yes.
Can I bind the subscribers to that address ?
Yes. Sure, each subscriber has to own its own endpoint ( one obviously cannot "share" an exclusively owned resource ).
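A quick sketch of that "exclusively owned" point: two sockets cannot listen on the same TCP address, so the second nng_listen() fails, typically with an address-in-use error ( the loopback address below is just an example ):

#include <stdio.h>
#include <nng/nng.h>
#include <nng/protocol/pubsub0/sub.h>

void exclusive_endpoint_sketch( void )
{
    nng_socket sub_A, sub_B;
    int        rv;

    nng_sub0_open( &sub_A );
    nng_sub0_open( &sub_B );

    nng_listen( sub_A, "tcp://127.0.0.1:3000", NULL, 0 );              // the first owner of the endpoint succeeds
    rv = nng_listen( sub_B, "tcp://127.0.0.1:3000", NULL, 0 );         // the second one cannot "share" it
    if ( rv != 0 )
        printf( "second nng_listen(): %s\n", nng_strerror( rv ) );     // typically an address-in-use error
}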
Yet, the PUB will have to "knit-the-net" with properly configured .connect()-s ( nng_dial()-s ), so as to reach all those remote AccessPoint-s, which it otherwise need not be aware of on its own.
The freedom to use a reverse .bind()/.connect() arrangement was a common practice in ZeroMQ and nanomsg; check whether nng keeps this freedom of choice.
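For illustration, a sketch of such a reverse wiring, assuming two example subscriber addresses ( same headers as above; in practice each socket lives in its own process, and error handling is omitted ):

// each SUB owns and listens on its own endpoint, the single PUB dials them all
void reverse_wiring_sketch( void )
{
    nng_socket pub, sub_A, sub_B;

    nng_sub0_open( &sub_A );
    nng_setopt(    sub_A, NNG_OPT_SUB_SUBSCRIBE, "", 0 );
    nng_listen(    sub_A, "tcp://192.168.1.11:3001", NULL, 0 );   // subscriber A binds its own address

    nng_sub0_open( &sub_B );
    nng_setopt(    sub_B, NNG_OPT_SUB_SUBSCRIBE, "", 0 );
    nng_listen(    sub_B, "tcp://192.168.1.12:3001", NULL, 0 );   // subscriber B binds its own address

    nng_pub0_open( &pub );
    nng_dial(      pub,   "tcp://192.168.1.11:3001", NULL, 0 );   // the PUB "knits-the-net"
    nng_dial(      pub,   "tcp://192.168.1.12:3001", NULL, 0 );   //   by dialing each known subscriber
}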
...want to test first with the pubsub. What URL would the sender use for TCP in order to send to multiple clients ?
Well, things may seem a bit confusing if one has just started with ZeroMQ / nanomsg / nng.
First :
Both the initial ZeroMQ API v2.x and nng will not send to just some selected clients; they will always send to all clients ever visible to the Socket-archetype engine ( possible configuration-tweaking details are beyond the scope of this post ).
In other words, the PUB-side application code does not decide at all who will or will not receive a message. All clients, once seen, are on the "delivery-list", and the network has to carry a replica of every message towards each such client that managed to get ad-hoc connected ( some sooner, some later; those that never became visible are a separate concern, and also not your issue, as posted in your previous question ).
The PUB/SUB subscription-mechanics is an important issue here. Both nng and the early ZeroMQ v2.x perform the subscription TOPIC-filtering in a deferred fashion: it takes place only after each message has been delivered to the remote client-side, as a remote {PASS|FAIL} rejection based on that remote client's subscribed TOPIC(s). So all messages flow towards all clients by design.
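A subscriber-side sketch of that TOPIC-filtering, using "temperature." as an example topic prefix ( same headers as above ):

void subscriber_topic_filter_sketch( void )
{
    nng_socket sub;

    nng_sub0_open( &sub );
    // PASS only the messages whose leading bytes match the subscribed prefix;
    // every other message is dropped on the subscriber side, after delivery
    nng_setopt( sub, NNG_OPT_SUB_SUBSCRIBE, "temperature.", 12 );
    // nng_setopt( sub, NNG_OPT_SUB_SUBSCRIBE, "", 0 );           // an empty prefix == receive everything
    nng_dial( sub, "tcp://192.168.1.10:3000", NULL, 0 );
}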
This means that a network-level multicast would be of practically no benefit for this kind of local-(only)-network ( as was posted in your previous question ) PUB/SUB Scalable Formal Communication Pattern Archetype. Also, nng has so far not published support for such a transport-class ( whereas ZeroMQ has ).
Next :
it seems that a short read into the main conceptual differences in the [ ZeroMQ hierarchy in less than a five seconds ] Section might help disambiguate both the terminology and the concept of the LEGO-style infrastructure setup.
The code below is obviously schematic and incomplete, but exemplary to read and comprehend:
// PUB_sender( "<tcp4>://<A.B.C.D>:<PORT>" ); // will launch it
//             <tcp6>://[::1]:<PORT>
//             <tls+tcp4>://<___aTransportClass_specific_AccessPoint_ADDRESS_>:<_aTransportClass_specific_AccessPoint_SPECIFIER>
//
// -------------------------------------------------------------------
// SUB_client( "..." ); // anyone ( all ) who will "dial-in" will receive messages

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <time.h>

#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/supplemental/util/platform.h>             // nng_msleep()
#define PUT64( ptr, u ) \
do { \
(ptr)[0] = (uint8_t)(((uint64_t)(u)) >> 56); \
(ptr)[1] = (uint8_t)(((uint64_t)(u)) >> 48); \
(ptr)[2] = (uint8_t)(((uint64_t)(u)) >> 40); \
(ptr)[3] = (uint8_t)(((uint64_t)(u)) >> 32); \
(ptr)[4] = (uint8_t)(((uint64_t)(u)) >> 24); \
(ptr)[5] = (uint8_t)(((uint64_t)(u)) >> 16); \
(ptr)[6] = (uint8_t)(((uint64_t)(u)) >> 8); \
(ptr)[7] = (uint8_t)((uint64_t)(u)); \
} while (0)
static void fatal( const char *aFUNC, int aRetVAL )      // minimal helper: report the nng error and exit
{
     fprintf( stderr, "%s: %s\n", aFUNC, nng_strerror( aRetVAL ) );
     exit( 1 );
}

static void showdate( time_t aNOW )                      // minimal helper: print a human-readable timestamp
{
     printf( "%s", ctime( &aNOW ) );                     // ctime() appends the newline itself
}

int
PUB_sender( const char *aTransportClassAccessPointURL )
{
     int        aRetVAL = -1;
     nng_socket aSOCKET = NNG_SOCKET_INITIALIZER;
     // ---------------------------------aSOCKET = Context().socket( PUB );
     if ( ( aRetVAL = nng_pub0_open( &aSOCKET )
            ) != 0 ) fatal( "nng_pub0_open",
                             aRetVAL
                             );
     // ---------------------------------aSOCKET.bind( aTransportClassAccessPointURL );
     if ( ( aRetVAL = nng_listen( aSOCKET,
                                   aTransportClassAccessPointURL,
                                   NULL,
                                   NNG_FLAG_NONBLOCK
                                   )
            ) != 0 ) fatal( "nng_listen",
                             aRetVAL
                             );
     // ---------------------------------aSOCKET.setsockopt( LINGER, 0 );
     //                                  ( a legacy option: recent nng releases deprecate NNG_OPT_LINGER )
     if ( ( aRetVAL = nng_setopt_int( aSOCKET,
                                       NNG_OPT_LINGER,
                                       0
                                       )
            ) != 0 ) fatal( "nng_setopt_int",
                             aRetVAL
                             );
     for (;;)
     {    char   aBUF[ sizeof( uint64_t ) ];             // a writable 8-byte payload buffer
          time_t aNOW;

          aNOW = time( &aNOW ); printf( "PUB.send()-s: " ); showdate( aNOW );
          PUT64( aBUF, (uint64_t) aNOW );                // pack the timestamp in network byte-order
          // ----------------------------aSOCKET.send( aBUF, ... );
          if ( ( aRetVAL = nng_send( aSOCKET,
                                      aBUF,
                                      sizeof( aBUF ),
                                      0                  // plain copy-send; NNG_FLAG_ALLOC expects an nng_alloc()-ed buffer
                                      )
                 ) != 0 ) fatal( "nng_send",
                                  aRetVAL
                                  );
          nng_msleep( 1000 );                            // pace the publisher at ~1 message per second
     }
     nng_close( aSOCKET );                               // never reached: the loop above runs forever
     return aRetVAL;
}
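And, for completeness, a matching SUB_client() sketch in the same schematic style ( GET64 simply mirrors the PUT64 packing above; the SUB protocol header <nng/protocol/pubsub0/sub.h> is additionally needed ):

#define GET64( ptr, v )                                     \
        v = ( ( (uint64_t)( (uint8_t)(ptr)[0] ) ) << 56 ) + \
            ( ( (uint64_t)( (uint8_t)(ptr)[1] ) ) << 48 ) + \
            ( ( (uint64_t)( (uint8_t)(ptr)[2] ) ) << 40 ) + \
            ( ( (uint64_t)( (uint8_t)(ptr)[3] ) ) << 32 ) + \
            ( ( (uint64_t)( (uint8_t)(ptr)[4] ) ) << 24 ) + \
            ( ( (uint64_t)( (uint8_t)(ptr)[5] ) ) << 16 ) + \
            ( ( (uint64_t)( (uint8_t)(ptr)[6] ) ) <<  8 ) + \
            ( ( (uint64_t)( (uint8_t)(ptr)[7] ) )       )
int
SUB_client( const char *aTransportClassAccessPointURL )
{
     int        aRetVAL = -1;
     nng_socket aSOCKET = NNG_SOCKET_INITIALIZER;
     // ---------------------------------aSOCKET = Context().socket( SUB );
     if ( ( aRetVAL = nng_sub0_open( &aSOCKET )
            ) != 0 ) fatal( "nng_sub0_open", aRetVAL );
     // ---------------------------------aSOCKET.setsockopt( SUBSCRIBE, "" ); // an empty TOPIC == receive all
     if ( ( aRetVAL = nng_setopt( aSOCKET, NNG_OPT_SUB_SUBSCRIBE, "", 0 )
            ) != 0 ) fatal( "nng_setopt", aRetVAL );
     // ---------------------------------aSOCKET.connect( aTransportClassAccessPointURL );
     if ( ( aRetVAL = nng_dial( aSOCKET, aTransportClassAccessPointURL, NULL, 0 )
            ) != 0 ) fatal( "nng_dial", aRetVAL );
     for (;;)
     {    char *   aBUF  = NULL;
          size_t   aSIZE = 0;
          uint64_t aVAL;
          // ----------------------------aSOCKET.recv( ... );
          if ( ( aRetVAL = nng_recv( aSOCKET, &aBUF, &aSIZE, NNG_FLAG_ALLOC )
                 ) != 0 ) fatal( "nng_recv", aRetVAL );
          if ( aSIZE == sizeof( uint64_t ) )
          {    GET64( aBUF, aVAL );
               printf( "SUB.recv()-ed: " ); showdate( (time_t) aVAL );
          }
          nng_free( aBUF, aSIZE );                       // release the nng_alloc()-ed buffer
     }
     nng_close( aSOCKET );                               // never reached: the loop above runs forever
     return aRetVAL;
}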
Upvotes: 0