Reputation: 4769
I want to create RSocket request endpoints using plan Java RSocket SDK and without using Spring Framework. I was able to create a sample request with the help of the below code snippet. This works only for the request tcp://localhost:7000
. But I want to create different endpoints similar to this.
public class ServerExample {
public static void main(String[] args) {
Hooks.onErrorDropped(e -> {});
RSocket handler = new RSocket() {
@Override
public Mono<Payload> requestResponse(Payload payload) {
System.out.println("RequestResponse: " + payload.getDataUtf8());
return Mono.just(payload);
}
@Override
public Flux<Payload> requestStream(Payload payload) {
System.out.println("RequestStream: " + payload.getDataUtf8());
return Flux.just("First", "Second").map(DefaultPayload::create);
}
@Override
public Mono<Void> fireAndForget(Payload payload) {
System.out.println("FireAndForget: " + payload.getDataUtf8());
return Mono.empty();
}
};
RSocketServer.create(SocketAcceptor.with(handler))
.bindNow(TcpServerTransport.create("localhost", 7000))
.onClose()
.doOnSubscribe(subscription -> System.out.println("RSocket Server listen on tcp://localhost:7000"))
.block();
}
}
Upvotes: 0
Views: 533
Reputation: 16354
You can leverage the RSocket protocol defined metadata and more explicitly the Routing
extension metadata.
Your server implementation would be updated to respond based on the particular request routing metadata value dynamically switching responders:
public class ServerExample {
public static void main(String[] args) {
Hooks.onErrorDropped(e -> {
});
final String userRoute = "/user"; // this defines one of the route values
final String organizationRoute = "/organization"; // this defines another route value
RSocket handler = new RSocket() {
@Override
public Mono<Payload> requestResponse(Payload payload) {
final String route;
if (payload.hasMetadata()) { // check if you have compulsory metadata
route = new RoutingMetadata(payload.metadata().slice()).iterator().next(); // read the routing metadata value
} else {
throw new IllegalStateException();
}
switch (route) { // based on the route value, you can respond accordingly
case userRoute: {
System.out.println("RequestResponse for route " + route + ": " + payload.getDataUtf8());
return Mono.just(DefaultPayload.create("Echo for: User"));
}
case organizationRoute: {
System.out.println("RequestResponse for route " + route + ": " + payload.getDataUtf8());
return Mono.just(DefaultPayload.create("Echo for: Organization"));
}
default: return Mono.just(DefaultPayload.create("Unsupported route"));
}
}
@Override
public Flux<Payload> requestStream(Payload payload) {
System.out.println("RequestStream: " + payload.getDataUtf8());
return Flux.just("First", "Second").map(DefaultPayload::create);
}
@Override
public Mono<Void> fireAndForget(Payload payload) {
System.out.println("FireAndForget: " + payload.getDataUtf8());
return Mono.empty();
}
};
RSocketServer.create(SocketAcceptor.with(handler))
.bindNow(TcpServerTransport.create("localhost", 7000))
.onClose()
.doOnSubscribe(subscription -> System.out.println("RSocket Server listen on tcp://localhost:7000"))
.block();
}
}
Within the client implementation, you need to provide the encoded routing
metadata as follows:
public class ClientExample {
public static void main(String[] args) {
Mono<RSocket> source =
RSocketConnector.create()
.reconnect(Retry.backoff(50, Duration.ofMillis(500)))
.connect(TcpClientTransport.create("localhost", 7000));
// User route request
ByteBuf userRouteMetadata = TaggingMetadataCodec.createRoutingMetadata(
UnpooledByteBufAllocator.DEFAULT,
Collections.singletonList("/user")
)
.getContent();
RSocketClient.from(source)
.requestResponse(Mono.just(DefaultPayload.create(Unpooled.buffer().writeBytes("Requesting user resource".getBytes()), userRouteMetadata)))
.doOnSubscribe(s -> logger.info("Executing Request"))
.doOnNext(
d -> {
logger.info("Received response data {}", d.getDataUtf8());
d.release();
})
.repeat(5)
.blockLast();
// Organization route request
ByteBuf organizationRouteMetadata = TaggingMetadataCodec.createRoutingMetadata(
UnpooledByteBufAllocator.DEFAULT,
Collections.singletonList("/organization")
)
.getContent();
RSocketClient.from(source)
.requestResponse(Mono.just(DefaultPayload.create(Unpooled.buffer().writeBytes("Requesting organization resource".getBytes()), organizationRouteMetadata)))
.doOnSubscribe(s -> logger.info("Executing Request"))
.doOnNext(
d -> {
logger.info("Received response data {}", d.getDataUtf8());
d.release();
})
.repeat(5)
.blockLast();
}
}
As you can note from this sample, the implementation is fairly complex and will grow in complexity as your implementation gathers more requirements. Special attention must be paid to proper
ByteBuf
manipulation as this can badly hurt your application memory and you can easily leak one of these by retaining a reference hence the need to rely on a solid implementation as the one provided by Spring.
Upvotes: 2