marian

Reputation: 21

Spring Redis Stream consumer stops consuming messages (Address already in use)

I tried to test Redis Streams with spring-data-redis. I've implemented two applications: the first one adds records to the stream, and the second one consumes messages from the stream.

It works, but after some time (usually once 80000+ messages have been processed) the consumer app throws an exception: "org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379"

I have installed Redis 6.2.1 on Ubuntu (Windows 10 Subsystem for Linux) and run it with the default configuration (port 6379). The same happens when I run Redis in Docker.

I was testing Redis performance, so messages were added in a loop with no delay. I add messages to the stream like this:

ObjectRecord<String, String> record = StreamRecords.newRecord()
                .ofObject(message)
                .withStreamKey("my-stream");

redisTemplate.opsForStream()
                .add(record);

I consume the messages as:

StreamReceiver.StreamReceiverOptions<String, MapRecord<String, String, String>> options =
                StreamReceiver.StreamReceiverOptions.builder()
                        .pollTimeout(Duration.ofSeconds(2))
                        .build();

StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory, options);
Flux<MapRecord<String, String, String>> messages = receiver.receive(StreamOffset.fromStart("my-stream"));

messages.subscribe(new StreamSubscriber());

where StreamSubscriber is just my org.reactivestreams.Subscriber implementation. I also tried the StreamMessageListenerContainer approach, but the result is the same.

org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
    at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.translateException(LettuceConnectionFactory.java:1553)
    at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.lambda$getConnectionAsync$0(LettuceConnectionFactory.java:1491)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at io.lettuce.core.DefaultConnectionFuture.lambda$null$0(DefaultConnectionFuture.java:257)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
    at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
    at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:143)
    at io.lettuce.core.DefaultConnectionFuture.lambda$thenCompose$1(DefaultConnectionFuture.java:254)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at io.lettuce.core.AbstractRedisClient.lambda$initializeChannelAsync0$4(AbstractRedisClient.java:405)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
    at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
    at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
    ... 30 more
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
    at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:78)
    at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:56)
    at io.lettuce.core.RedisClient.lambda$transformAsyncConnectionException$20(RedisClient.java:767)
    at io.lettuce.core.DefaultConnectionFuture.lambda$thenCompose$1(DefaultConnectionFuture.java:253)
    ... 22 more
Caused by: java.util.concurrent.CompletionException: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
    ... 20 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)

RedisTemplate and LettuceConnectionFactory were created and autoconfigured by Spring Boot (I have also tried some custom configuration, but with no success).

What I have also observed is that the Redis connection is closed after a message is sent to the stream, and a new connection is created when the next message is about to be sent.

2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] sk.kedros.learn.camel.redis.Publisher    : sending message: {"obuSn":"eeee00a5-616e-46ad-80e4-50780fab1336","lon":0.0,"lat":0.0,"data":"99761 - provider1"}
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] o.s.d.redis.core.RedisConnectionUtils    : Fetching Redis Connection from RedisConnectionFactory
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] io.lettuce.core.RedisChannelHandler      : dispatching command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] i.lettuce.core.protocol.DefaultEndpoint  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, epid=0x1] write() writeAndFlush command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] i.lettuce.core.protocol.DefaultEndpoint  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, epid=0x1] write() done
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] write(ctx, AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandEncoder  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379] writing command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Received: 22 bytes, 1 commands in the stack
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Stack contains: 1 commands
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] i.l.core.protocol.RedisStateMachine      : Decode done, empty stack: true
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Completing command AsyncCommand [type=XADD, output=StatusOutput [output=1618125131455-2, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] o.s.d.redis.core.RedisConnectionUtils    : Closing Redis Connection.

This also affects the Redis connections of the consumer app, which are opened and closed periodically:

2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] onStreamMessage(MapBackedRecord{recordId=1618125112425-0, kvMap={payload={"obuSn":"d066bdd7-21b5-46f0-81ca-09afe4fd6596","lon":0.0,"lat":1.0,"data":"68541 - provider1"}}}): Emitting item, slow-path
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.RedisStateMachine      : Decode done, empty stack: true
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] Completing command SubscriptionCommand [type=XREAD, output=StreamReadOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisChannelHandler      : closeAsync()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] i.lettuce.core.protocol.DefaultEndpoint  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, epid=0x3bdf] closeAsync()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] onComplete()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] scheduleIfRequired()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] scheduleIfRequired(): Activating subscription
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] scheduleIfRequired(): Activating subscription, offset ReadOffset(offset=1618125112425-0)
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisClient              : Trying to get a Redis connection for: redis://localhost
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisClient              : Resolved SocketAddress localhost:6379 using redis://localhost
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.AbstractRedisClient      : Connecting to Redis at localhost:6379
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelInactive()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-8] io.lettuce.core.protocol.CommandHandler  : [channel=0x87e269ec, [id: 0x6e14c57c] (inactive), chid=0x3be0] channelRegistered()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.lettuce.core.protocol.DefaultEndpoint  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, epid=0x3bdf] deactivating endpoint handler
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelInactive() done
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.ConnectionWatchdog     : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, last known addr=localhost/127.0.0.1:6379] channelInactive()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.ConnectionWatchdog     : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, last known addr=localhost/127.0.0.1:6379] Reconnect scheduling disabled
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelUnregistered()
2021-04-11 09:11:52.432 DEBUG 21636 --- [ioEventLoop-4-8] io.lettuce.core.AbstractRedisClient      : Connecting to Redis at localhost:6379: localhost:6379

io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:na]

As I mentioned, it works fine for some time, but eventually it fails with "Address already in use: no further information".

Am I missing something? Is it a Redis installation/configuration problem, a Lettuce client problem, or something else?

Upvotes: 2

Views: 2642

Answers (2)

Florian Patzl

Reputation: 333

In my case, the error no longer occurred after enabling connection pooling in the underlying Lettuce Redis client.

In application.properties:

# Explicitly select our preferred driver instead of relying on automatic selection.
spring.data.redis.client-type=lettuce
# Spring seems to auto-detect commons-pool2 in the classpath and activate pooling,
# but we prefer to clearly state we are using pooling here.
spring.data.redis.lettuce.pool.enabled=true

and adding the dependency on commons-pool2 that is necessary for connection pooling:

implementation("org.apache.commons:commons-pool2")
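
A possible explanation for why pooling helps, offered as an assumption rather than a confirmed diagnosis: every outgoing TCP connection occupies a local ephemeral port, and a closed connection can keep that port reserved for a while. If the client opens a new connection for each poll, as the consumer log in the question suggests (the local port there has already reached 65535), the available port range can run out, and the next connect attempt may fail with "Address already in use". The sketch below uses only the JDK and a throwaway loopback server, not Redis or Lettuce; the class and method names are hypothetical. It compares how many distinct local ports are used when connecting per request versus reusing one connection.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashSet;
import java.util.Set;

// Hypothetical demo class; not part of Spring Data Redis or Lettuce.
public class LocalPortDemo {

    // Opens a new client socket for every request and records each local port.
    static Set<Integer> connectPerRequest(int serverPort, int requests) throws IOException {
        Set<Integer> localPorts = new HashSet<>();
        for (int i = 0; i < requests; i++) {
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), serverPort)) {
                localPorts.add(socket.getLocalPort());
            }
        }
        return localPorts;
    }

    // Opens one client socket and reuses it for every request.
    static Set<Integer> reuseConnection(int serverPort, int requests) throws IOException {
        Set<Integer> localPorts = new HashSet<>();
        try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), serverPort)) {
            for (int i = 0; i < requests; i++) {
                localPorts.add(socket.getLocalPort());
            }
        }
        return localPorts;
    }

    // Starts a throwaway loopback server and returns
    // {distinct ports with connect-per-request, distinct ports with reuse}.
    static int[] run(int requests) {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            Thread acceptor = new Thread(() -> {
                while (true) {
                    try (Socket accepted = server.accept()) {
                        // Accept and close immediately; no data is exchanged.
                    } catch (IOException e) {
                        return; // the server socket was closed
                    }
                }
            });
            acceptor.setDaemon(true);
            acceptor.start();

            int perRequest = connectPerRequest(server.getLocalPort(), requests).size();
            int reused = reuseConnection(server.getLocalPort(), requests).size();
            return new int[] {perRequest, reused};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = run(20);
        System.out.println("distinct local ports, new connection per request: " + result[0]);
        System.out.println("distinct local ports, one shared connection: " + result[1]);
    }
}
```

The shared connection always occupies exactly one local port, while the per-request count depends on how the operating system hands out and recycles ephemeral ports. A connection pool behaves like the second variant: a small, fixed set of connections is reused instead of a new socket being opened for every poll.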

Upvotes: 0

Juyel

Reputation: 131

Your Redis server is probably crashing due to a lack of memory. Allocate more memory to the server, or cap the stream length with MAXLEN so that the server's memory usage stays consistent.

Upvotes: 0
