Acech

Reputation: 1

How to make a connection exclusive in Redisson when sending blocking Redis commands such as BLPOP?

I'm facing a problem with the Redisson client and a Redis cluster behind a proxy infrastructure. As far as I know, Redisson works with a connection pool and an asynchronous mechanism, so it may write commands from multiple threads to the same TCP socket (let's assume it does) when those threads share a connection.
However, redis-cluster-proxy currently doesn't handle BLPOP (or other blocking commands) well.
If we send a BLPOP command to the proxy, the commands that follow it on the same connection fail. In other words, while BLPOP is waiting for the cluster to respond, any subsequent command on that connection is rejected because of a structural limitation of the proxy.

    @Test
    public void testProxySocket() throws Exception {
        final Socket redisSocket = new Socket("the.proxy.domain", 6379);

        final InputStream in = redisSocket.getInputStream();
        final OutputStream out = redisSocket.getOutputStream();

        out.write("AUTH <PASSWORD>\r\n".getBytes(StandardCharsets.UTF_8));
        final ExecutorService pool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int idx = i;
            pool.submit(() -> {
                try {
                    out.write("BLPOP test:list 2\r\nSET test:attr 3 EX 3\r\nPING\r\n".getBytes(StandardCharsets.UTF_8));
                    System.out.println("Sent pipe commands: " + idx);
                } catch (Exception e) {
                    System.out.println("Sent pipe commands: " + idx + " failed! " + e.getMessage());
                }
            });
        }

        final byte[] buffer = new byte[1024];
        int len;
        while ((len = in.read(buffer)) != -1) {
            System.out.println("Read data: " + new String(buffer, 0, len, StandardCharsets.UTF_8));
        }
        // output will be
        // Read data: *-1
        // -ERR, the proxy connection is blocking
        // -ERR, the proxy connection is blocking
        // -ERR, the proxy connection is blocking
        // ......

        pool.shutdown();
    }

The obvious workaround for BLPOP (or other blocking commands) is to run the command on an exclusive socket connection.
But when I use Redisson's RBlockingQueue for BLPOP, e.g. the timed 'poll(timeout, unit)', it always sends the BLPOP command asynchronously and then waits for the BLPOP future to complete.

    RBlockingQueue<String> bq = redissonClient.getBlockingQueue("test:blocking:queue");
    // The timed poll is the variant that issues BLPOP (see the implementation below).
    String item = bq.poll(2, TimeUnit.SECONDS);

    // The RedissonBlockingQueue implementation:
    public V poll(long timeout, TimeUnit unit) throws InterruptedException {
        return commandExecutor.getInterrupted(pollAsync(timeout, unit));
    }
    public RFuture<V> pollAsync(long timeout, TimeUnit unit) {
        return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), toSeconds(timeout, unit));
    }

    // Execution of BLPOP eventually reaches RedisExecutor.execute():
    connectionFuture.whenComplete((connection, e) -> {
        if (connectionFuture.isCancelled()) {
            connectionManager.getShutdownLatch().release();
            return;
        }

        if (connectionFuture.isDone() && connectionFuture.isCompletedExceptionally()) {
            connectionManager.getShutdownLatch().release();
            exception = convertException(connectionFuture);
            return;
        }

        // The command is written to the acquired connection without waiting for earlier replies.
        sendCommand(attemptPromise, connection);

        writeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                checkWriteFuture(writeFuture, attemptPromise, connection);
            }
        });
    });
    // ......

So, how can I force the BLPOP command, or the whole RedissonBlockingQueue, to use an "exclusive" connection that blocks other threads from sending commands on it?
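
To make clear what I mean by "exclusive", here is a minimal sketch of the behavior with plain streams instead of Redisson. `ExclusiveConnection` is a hypothetical helper I made up for illustration, not a Redisson class: it holds a lock from the moment a command is written until its complete reply has been read, so no other thread can put a command on the wire while BLPOP is pending. The reply parser is a simplified RESP2 reader and assumes bulk strings contain no line breaks.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper (not part of Redisson): one request/reply pair at a time per connection.
class ExclusiveConnection {
    private final BufferedReader in;
    private final OutputStream out;
    private final ReentrantLock lock = new ReentrantLock();

    ExclusiveConnection(InputStream in, OutputStream out) {
        this.in = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        this.out = out;
    }

    // Sends one inline command and blocks until its full reply has been read.
    String execute(String inlineCommand) throws IOException {
        lock.lock(); // other threads wait here while a blocking command is pending
        try {
            out.write((inlineCommand + "\r\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
            return readReply();
        } finally {
            lock.unlock();
        }
    }

    // Simplified RESP2 reader: simple strings, errors, integers, bulk strings, arrays.
    // Assumption: bulk string payloads do not contain CR/LF.
    private String readReply() throws IOException {
        String line = in.readLine();
        if (line == null || line.isEmpty()) {
            throw new IOException("connection closed or malformed reply");
        }
        char type = line.charAt(0);
        if (type == '$') {
            int length = Integer.parseInt(line.substring(1));
            return length < 0 ? null : in.readLine(); // "$-1" is a null bulk string
        }
        if (type == '*') {
            int count = Integer.parseInt(line.substring(1));
            if (count < 0) {
                return null; // "*-1" is what BLPOP returns on timeout
            }
            StringBuilder sb = new StringBuilder("[");
            for (int i = 0; i < count; i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(readReply());
            }
            return sb.append("]").toString();
        }
        return line; // "+OK", "-ERR ...", ":1" are returned as-is
    }
}
```

With a real socket this would be constructed as `new ExclusiveConnection(socket.getInputStream(), socket.getOutputStream())`. The cost is that every other caller on that connection stalls for the whole BLPOP timeout, which is exactly the trade-off I'm willing to accept; what I don't know is whether Redisson can be configured to behave like this.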

Upvotes: 0

Views: 47

Answers (0)
