Reputation: 1301
Using spring cloud stream with RabbitMQ version. I want to achieve consumer auto-scaling as explained here. with the below configuration, I have 2 concurrent consumers. But SimpleMessageListenerContainer couldn't add consumers dynamically. I had checked SMLC instance is initialized with both concurrency and max-concurrency properties. Is there anything, I am missing to get dynamic scaling?
Note: I had published 200 messages in input1 exchange in rabbitmq.
pom.xml file:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>consumerex</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>graphql</name>
<url>http://maven.apache.org</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>3.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml file:
spring:
cloud:
stream:
bindings:
input1:
destination: input1
binder: local_rabbit
group: logMessageConsumers
consumer:
concurrency: 2
input2:
destination: input2
binder: local_rabbit
group: logMessageConsumers
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
rabbit:
bindings:
input1:
consumer:
maxConcurrency: 10
server:
port: 0
management:
health:
binders:
enabled: true
App.java file:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
@SpringBootApplication
@EnableBinding(MyProcessor.class)
public class App
{
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
@StreamListener("input1")
public void enrichLogMessage(String log) {
//code to keep current consumer busy. So auto scalling can happen.
long time = System.currentTimeMillis() + 120000;
while(time> System.currentTimeMillis()) {
}
System.out.println(Thread.currentThread().getName() + " input1 " + log);
}
@StreamListener("input2")
public void input2(String log) {
System.out.println(Thread.currentThread().getName() + " input2 " + log);
}
}
MyProcessor.java file:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface MyProcessor {
@Input("input1")
SubscribableChannel input1();
@Input("input2")
SubscribableChannel input2();
}
Upvotes: 0
Views: 1946
Reputation: 174484
The algorithm to increase the concurrency runs on the listener thread(s) so sleeping the thread like that will prevent it from working.
The following application shows it working as designed...
@SpringBootApplication
@EnableBinding(Sink.class)
public class So59723356Application {
private static final Logger log = LoggerFactory.getLogger(So59723356Application.class);
public static void main(String[] args) {
SpringApplication.run(So59723356Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in) throws InterruptedException {
log.info(in);
Thread.sleep(1_000);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> IntStream.range(0, 50).forEach(i -> template.convertAndSend("input", "input", "foo"));
}
}
spring.cloud.stream.bindings.input.group=group
spring.cloud.stream.bindings.input.consumer.concurrency=1
spring.cloud.stream.rabbit.bindings.input.consumer.max-concurrency=5
and
2020-01-14 14:20:51.849 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:20:51.852 INFO 71536 --- [ main] com.example.demo.So59723356Application : Started So59723356Application in 1.729 seconds (JVM running for 2.123)
2020-01-14 14:20:52.855 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:20:53.862 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:20:54.869 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:20:55.874 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:20:56.882 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:20:57.885 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:20:58.889 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:20:59.894 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:21:00.901 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:21:01.906 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:21:02.911 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:21:03.917 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:21:03.917 INFO 71536 --- [ input.group-2] com.example.demo.So59723356Application : foo
2020-01-14 14:21:04.922 INFO 71536 --- [ input.group-2] com.example.demo.So59723356Application : foo
2020-01-14 14:21:04.922 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:21:05.924 INFO 71536 --- [ input.group-2] com.example.demo.So59723356Application : foo
2020-01-14 14:21:05.924 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
2020-01-14 14:21:06.930 INFO 71536 --- [ input.group-2] com.example.demo.So59723356Application : foo
2020-01-14 14:21:06.930 INFO 71536 --- [ input.group-1] com.example.demo.So59723356Application : foo
See the new thread start at 14:21:03.917
.
Upvotes: 2