Reputation: 21
I am testing Redis Streams with spring-data-redis. I have implemented two applications: the first adds records to the stream, and the second consumes messages from it.
It works, but after some time (usually once more than 80,000 messages have been processed) the consumer app throws an exception: "org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379"
I have installed Redis 6.2.1 on Ubuntu (Windows 10 Subsystem for Linux) and run it with the default configuration (port 6379). The same happens when I run Redis in Docker.
I was testing Redis performance, so messages were added in a loop with no delay. I add messages to the stream like this:
    ObjectRecord<String, String> record = StreamRecords.newRecord()
            .ofObject(message)
            .withStreamKey("my-stream");
    redisTemplate.opsForStream()
            .add(record);
I consume the messages like this:
    StreamReceiver.StreamReceiverOptions<String, MapRecord<String, String, String>> options =
            StreamReceiver.StreamReceiverOptions.builder()
                    .pollTimeout(Duration.ofSeconds(2))
                    .build();
    StreamReceiver<String, MapRecord<String, String, String>> receiver =
            StreamReceiver.create(connectionFactory, options);
    // Read from the beginning of the same stream the publisher writes to.
    Flux<MapRecord<String, String, String>> messages =
            receiver.receive(StreamOffset.fromStart("my-stream"));
    messages.subscribe(new StreamSubscriber());
where StreamSubscriber is just my org.reactivestreams.Subscriber implementation. I also tried the StreamMessageListenerContainer approach, but the result is the same.
org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.translateException(LettuceConnectionFactory.java:1553)
at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.lambda$getConnectionAsync$0(LettuceConnectionFactory.java:1491)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at io.lettuce.core.DefaultConnectionFuture.lambda$null$0(DefaultConnectionFuture.java:257)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:143)
at io.lettuce.core.DefaultConnectionFuture.lambda$thenCompose$1(DefaultConnectionFuture.java:254)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at io.lettuce.core.AbstractRedisClient.lambda$initializeChannelAsync0$4(AbstractRedisClient.java:405)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
... 30 more
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:78)
at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:56)
at io.lettuce.core.RedisClient.lambda$transformAsyncConnectionException$20(RedisClient.java:767)
at io.lettuce.core.DefaultConnectionFuture.lambda$thenCompose$1(DefaultConnectionFuture.java:253)
... 22 more
Caused by: java.util.concurrent.CompletionException: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
... 20 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
RedisTemplate and LettuceConnectionFactory were created and auto-configured by Spring Boot (I have also tried some custom configuration, but with no success).
I have also observed that the Redis connection is closed after a message is sent to the stream, and a new connection is created when the next message is about to be sent.
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] sk.kedros.learn.camel.redis.Publisher : sending message: {"obuSn":"eeee00a5-616e-46ad-80e4-50780fab1336","lon":0.0,"lat":0.0,"data":"99761 - provider1"}
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] o.s.d.redis.core.RedisConnectionUtils : Fetching Redis Connection from RedisConnectionFactory
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] io.lettuce.core.RedisChannelHandler : dispatching command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] i.lettuce.core.protocol.DefaultEndpoint : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, epid=0x1] write() writeAndFlush command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] i.lettuce.core.protocol.DefaultEndpoint : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, epid=0x1] write() done
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] write(ctx, AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandEncoder : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379] writing command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Received: 22 bytes, 1 commands in the stack
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Stack contains: 1 commands
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] i.l.core.protocol.RedisStateMachine : Decode done, empty stack: true
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Completing command AsyncCommand [type=XADD, output=StatusOutput [output=1618125131455-2, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] o.s.d.redis.core.RedisConnectionUtils : Closing Redis Connection.
The same happens with the Redis connection of the consumer app, where connections are opened and closed periodically.
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver : [stream: loc] onStreamMessage(MapBackedRecord{recordId=1618125112425-0, kvMap={payload={"obuSn":"d066bdd7-21b5-46f0-81ca-09afe4fd6596","lon":0.0,"lat":1.0,"data":"68541 - provider1"}}}): Emitting item, slow-path
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.RedisStateMachine : Decode done, empty stack: true
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] Completing command SubscriptionCommand [type=XREAD, output=StreamReadOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisChannelHandler : closeAsync()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] i.lettuce.core.protocol.DefaultEndpoint : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, epid=0x3bdf] closeAsync()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver : [stream: loc] onComplete()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver : [stream: loc] scheduleIfRequired()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver : [stream: loc] scheduleIfRequired(): Activating subscription
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver : [stream: loc] scheduleIfRequired(): Activating subscription, offset ReadOffset(offset=1618125112425-0)
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisClient : Trying to get a Redis connection for: redis://localhost
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisClient : Resolved SocketAddress localhost:6379 using redis://localhost
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.AbstractRedisClient : Connecting to Redis at localhost:6379
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelInactive()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-8] io.lettuce.core.protocol.CommandHandler : [channel=0x87e269ec, [id: 0x6e14c57c] (inactive), chid=0x3be0] channelRegistered()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.lettuce.core.protocol.DefaultEndpoint : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, epid=0x3bdf] deactivating endpoint handler
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelInactive() done
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.ConnectionWatchdog : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, last known addr=localhost/127.0.0.1:6379] channelInactive()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.ConnectionWatchdog : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, last known addr=localhost/127.0.0.1:6379] Reconnect scheduling disabled
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelUnregistered()
2021-04-11 09:11:52.432 DEBUG 21636 --- [ioEventLoop-4-8] io.lettuce.core.AbstractRedisClient : Connecting to Redis at localhost:6379: localhost:6379
io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:na]
As I mentioned, it works fine for some time, but eventually it fails with "Address already in use: no further information".
Am I missing something? Is it a Redis installation or configuration problem? Is it a problem with the Lettuce client? Or something else?
Upvotes: 2
Views: 2642
Reputation: 333
In my case, the error no longer occurred after enabling connection pooling of the underlying Lettuce Redis client.
In application.properties:
# Explicitly select our preferred driver instead of relying on automatic selection.
spring.data.redis.client-type=lettuce
# Spring seems to auto-detect commons-pool2 in the classpath and activate pooling,
# but we prefer to clearly state we are using pooling here.
spring.data.redis.lettuce.pool.enabled=true
and adding the dependency on commons-pool2, which is necessary for connection pooling:
implementation("org.apache.commons:commons-pool2")
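As an illustration of why pooling plausibly helps, here is a minimal JDK-only sketch. It assumes that the "Address already in use" error on connect comes from the client running out of local (ephemeral) ports because it opens a new connection for nearly every operation; the logs in the question, where the local port reaches 65535, are consistent with that reading but do not prove it. The class and method names (PortUsageDemo, reconnectEachTime, reuseConnection, startServer) are hypothetical, and the throwaway local server only stands in for Redis.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashSet;
import java.util.Set;

public class PortUsageDemo {

    /** Opens and closes a new TCP connection per operation; returns the distinct local ports used. */
    public static Set<Integer> reconnectEachTime(int port, int operations) throws IOException {
        Set<Integer> localPorts = new HashSet<>();
        for (int i = 0; i < operations; i++) {
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port)) {
                localPorts.add(socket.getLocalPort());
            }
        }
        return localPorts;
    }

    /** Keeps one connection open for all operations, as a pooled or shared connection would. */
    public static Set<Integer> reuseConnection(int port, int operations) throws IOException {
        Set<Integer> localPorts = new HashSet<>();
        try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port)) {
            for (int i = 0; i < operations; i++) {
                localPorts.add(socket.getLocalPort());
            }
        }
        return localPorts;
    }

    /** Starts a throwaway loopback server (a stand-in for Redis) that accepts and drops connections. */
    public static ServerSocket startServer() throws IOException {
        ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        Thread acceptor = new Thread(() -> {
            while (!server.isClosed()) {
                try (Socket ignored = server.accept()) {
                    // Accept and close immediately; only the client side matters here.
                } catch (IOException e) {
                    return; // The server socket was closed, so stop accepting.
                }
            }
        });
        acceptor.setDaemon(true);
        acceptor.start();
        return server;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = startServer()) {
            int port = server.getLocalPort();
            System.out.println("reconnect: " + reconnectEachTime(port, 20).size() + " distinct local ports");
            System.out.println("reuse:     " + reuseConnection(port, 20).size() + " distinct local ports");
        }
    }
}
```

With reconnect-per-operation, each closed connection can leave its local port unavailable for a while (TIME_WAIT), so a tight loop over tens of thousands of messages can use up the ephemeral range. Reusing connections keeps the number of local ports in use small and constant.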
Upvotes: 0
Reputation: 131
Your Redis server is probably crashing due to a lack of memory. Either allocate more memory to the server, or cap the stream length with MAXLEN so that the memory used by the stream stays bounded.
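In Redis itself the cap is applied with XADD ... MAXLEN or with XTRIM; in spring-data-redis, opsForStream().trim(key, count) should do the equivalent. The sketch below is not Redis API: it is a small in-memory model, with the hypothetical name CappedStreamModel, that only shows the eviction behaviour an exact MAXLEN cap implies (append, then drop the oldest entries until the limit holds).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class CappedStreamModel {
    private final int maxLen;
    private final Deque<String> entries = new ArrayDeque<>();

    public CappedStreamModel(int maxLen) {
        if (maxLen <= 0) {
            throw new IllegalArgumentException("maxLen must be positive");
        }
        this.maxLen = maxLen;
    }

    /** Appends an entry, then evicts the oldest entries until at most maxLen remain. */
    public void add(String entry) {
        entries.addLast(entry);
        while (entries.size() > maxLen) {
            entries.removeFirst();
        }
    }

    /** Number of entries currently retained. */
    public int size() {
        return entries.size();
    }

    /** Copy of the retained entries, oldest first. */
    public List<String> snapshot() {
        return new ArrayList<>(entries);
    }
}
```

Note that with a cap in place, a consumer that falls behind by more than the cap will never see the evicted entries, so the limit should be chosen with the slowest consumer in mind.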
Upvotes: 0