Reputation: 23
I'm trying to implement a Spring Cloud Stream application that polls GZipped files from an SFTP server, splits each file into individual records, and publishes each record as a message to RabbitMQ.
I've tried some different approaches, but I must be misunderstanding some things.
I've tried to model this case with a simple application, which works perfectly (generated with Spring Initializr, with the relevant changes shown below):
pom.xml
<properties>
    <java.version>21</java.version>
    <spring-cloud.version>2023.0.4</spring-cloud.version>
    <spring-function.version>5.0.1</spring-function.version>
</properties>
<!-- ... -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
Java code
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.support.MessageBuilder;

@Configuration
public class CloudFunctionConfiguration {

    private final StreamBridge streamBridge;

    public CloudFunctionConfiguration(final StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @Bean
    Supplier<String> source() {
        return () -> "a,1,b,2,3,c";
    }

    @Bean
    Consumer<String> splitter() {
        return string -> {
            Arrays.asList(string.split(",")).forEach(s -> {
                final var message = MessageBuilder
                        .withPayload("{\"attribute\": \"".concat(s).concat("\"}"))
                        .setHeader("some-header", "some-content")
                        .build();
                streamBridge.send("output", message);
            });
        };
    }
}
application.properties
spring.application.name=sftp-demo
logging.level.root=DEBUG
logging.file.name=app.log
spring.cloud.function.definition=source|splitter
spring.cloud.stream.output-bindings=output
spring.cloud.stream.bindings.output.producer.required-groups=app
spring.cloud.stream.bindings.output.destination=ex.docs
spring.cloud.stream.rabbit.bindings.source|splitter-out-0.producer.declareExchange=false
spring.cloud.stream.rabbit.bindings.source|splitter-out-0.consumer.declareExchange=false
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
This works as expected; all messages are published each and every time the polled supplier is invoked.
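As an aside, stripped of the Spring messaging plumbing, the splitter's payload-building step is plain string work. A minimal standalone sketch (class and method names are mine, not from the project):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Standalone version of the splitter's payload logic: each comma-separated
// token becomes its own small JSON payload, one message per token.
public class SplitterLogic {

    public static List<String> payloads(String input) {
        return Arrays.stream(input.split(","))
                .map(token -> "{\"attribute\": \"" + token + "\"}")
                .collect(Collectors.toList());
    }
}
```

For the supplier's value `a,1,b,2,3,c` this yields six payloads, which matches the six messages published per poll.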
But things stop working when I change that simple String supplier for the SFTP supplier function.
I've generated a sample GZipped file with three records, and then I made these changes:
pom.xml (added)
<dependency>
    <groupId>org.springframework.cloud.fn</groupId>
    <artifactId>spring-sftp-supplier</artifactId>
    <version>${spring-function.version}</version><!-- 5.0.1 -->
</dependency>
Java code (new version)
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration
public class CloudFunctionConfiguration {

    protected static final Logger LOGGER = LoggerFactory.getLogger(CloudFunctionConfiguration.class);

    private final GzipMessageParser parser;
    private final StreamBridge streamBridge;

    public CloudFunctionConfiguration(
            final GzipMessageParser parser,
            final StreamBridge streamBridge) {
        this.parser = parser;
        this.streamBridge = streamBridge;
    }

    @Bean
    Consumer<Message<byte[]>> messageParser() {
        return gzipFile -> {
            parser.parse(gzipFile).forEach(message -> {
                LOGGER.info("SENDING MESSAGE {}", message);
                streamBridge.send("output", message);
            });
        };
    }
}
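GzipMessageParser is the asker's own class and isn't shown; here is a hypothetical stand-in for what such a parser might look like, assuming one record per line inside the GZipped payload and plain Strings instead of the Spring Message objects the real class presumably builds:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

// Hypothetical stand-in for the asker's GzipMessageParser: decompresses the
// GZIP payload and yields one record per non-blank line. The real class
// presumably wraps each record in a Spring Message before returning it.
public class GzipRecordParser {

    public static List<String> parse(byte[] gzipBytes) throws IOException {
        final List<String> records = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(gzipBytes)),
                StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.isBlank()) {
                    records.add(line);
                }
            }
        }
        return records;
    }
}
```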
application.properties (changes) [edit: fixed a copy/paste mistake]
# changed
spring.cloud.function.definition=sftpSupplier|messageParser
spring.cloud.stream.rabbit.bindings.sftpSupplier|messageParser-out-0.producer.declareExchange=false
spring.cloud.stream.rabbit.bindings.sftpSupplier|messageParser-out-0.consumer.declareExchange=false
# added
spring.cloud.config.enabled=false
file.consumer.mode=contents
sftp.supplier.delay-when-empty=10s
sftp.supplier.filename-regex=^some-prefix.*?GZ$
sftp.supplier.remote-dir=upload
sftp.supplier.stream=true
sftp.supplier.factory.host=localhost
sftp.supplier.factory.port=2222
sftp.supplier.factory.username=sftpuser
sftp.supplier.factory.password=sftppwd
sftp.supplier.factory.private-key=file:///opt/keys/ssh_host_rsa_key
sftp.supplier.factory.allow-unknown-keys=true
It's actually a little more complicated than that (I've also configured a database and a metadata store), but I'll omit those extra configurations, as they seem to work fine.
When I run this app, all messages are sent to RabbitMQ, but I get some errors. This is what's in the log.
First, the file is fetched and converted (passed from the sftpSupplier to the messageParser function):
(...) DEBUG 781757 --- [sftp-demo] [boundedElastic-1] c.f.c.c.BeanFactoryAwareFunctionRegistry : Converted Message: GenericMessage [payload=byte[528], headers={file_remoteHostPort=localhost:2222, file_remoteFileInfo={"directory":false,"filename":"some-prefix_doc-files.GZ","link":false,"modified":1733178662000,"permissions":"rw-r--r--","remoteDirectory":"upload","size":528}, file_remoteDirectory=upload, id=8b2b9553-cb47-fdd3-111f-6c4d39af4593, contentType=application/octet-stream, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@6b5b5c4b, file_remoteFile=some-prefix_doc-files.GZ, timestamp=1733261637557}] to: GenericMessage [payload=byte[528], headers={file_remoteHostPort=localhost:2222, file_remoteFileInfo={"directory":false,"filename":"some-prefix_doc-files.GZ","link":false,"modified":1733178662000,"permissions":"rw-r--r--","remoteDirectory":"upload","size":528}, file_remoteDirectory=upload, id=8b2b9553-cb47-fdd3-111f-6c4d39af4593, contentType=application/octet-stream, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@6b5b5c4b, file_remoteFile=some-prefix_doc-files.GZ, timestamp=1733261637557}]
I see the LOGGER.info("SENDING MESSAGE {}", message) entry, then a preSend hook log, a Publishing message (...) log, and a postSend hook log.
All looks nice, but then I see this:
(...) DEBUG 781757 --- [sftp-demo] [scheduling-1] c.f.c.c.BeanFactoryAwareFunctionRegistry : Invoking function sftpSupplier|messageParser
(...) DEBUG 781757 --- [sftp-demo] [scheduling-1] o.s.i.e.SourcePollingChannelAdapter : Poll resulted in Message: GenericMessage [payload=MonoMap, headers={id=e531d10a-8edc-88fd-fe07-3bba0b950a48, timestamp=1733261637927}]
(...)
(...) DEBUG 823084 --- [sftp-demo] [scheduling-1] o.s.integration.handler.LoggingHandler : bean '_org.springframework.integration.errorLogger.handler' for component '_org.springframework.integration.errorLogger' received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@4841e6], failedMessage=GenericMessage [payload=MonoMap, headers={id=c5903f2f-f4a7-1ca5-18e0-b76cb2f98fa6, timestamp=1733263993106}], headers={id=cb9ce0d7-bac6-0ba2-6e0e-7ac40ad989a1, timestamp=1733263993106}] for original GenericMessage [payload=MonoMap, headers={id=364cff1e-6120-f005-ee63-a34865009323, timestamp=1733263993106}]
(...) ERROR 823084 --- [sftp-demo] [scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@4841e6], failedMessage=GenericMessage [payload=MonoMap, headers={id=c5903f2f-f4a7-1ca5-18e0-b76cb2f98fa6, timestamp=1733263993106}]
(...)
Caused by: java.lang.IllegalArgumentException: SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: reactor.core.publisher.MonoMap
That is, Spring Cloud Stream is polling the function definition and automatically binding it to the RabbitMQ Binder. Since the poll results in a message with a Mono payload (I don't know why), the binder tries to convert the message and send it - and then it fails.
Curiously, my first working example does this polling as well, but this is the result:
DEBUG 759414 --- [demo] [scheduling-1] o.s.i.e.SourcePollingChannelAdapter : Received no Message during the poll, returning 'false'
And that's why it works, I think.
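This behavior can be modeled in plain Java terms: chaining a Consumer onto a Supplier's value leaves nothing for the binding to publish, so the poll has no message. A rough sketch (all names are mine, not Spring Cloud Function internals):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Rough model of what source|splitter composes to: the Consumer swallows the
// supplied value, so the composed pipeline produces no output of its own --
// which lines up with the "Received no Message during the poll" log entry.
public class ComposeModel {

    static <T> Runnable compose(Supplier<T> source, Consumer<T> sink) {
        return () -> sink.accept(source.get());
    }

    public static List<String> demo() {
        final List<String> seen = new ArrayList<>();
        final Runnable pipeline = compose(() -> "a,1,b,2,3,c", seen::add);
        pipeline.run(); // side effect only; nothing flows downstream
        return seen;
    }
}
```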
Then I've also tried the functional batch producer approach (by creating a Function<Message<String>, List<Message<String>>>), but I couldn't get the application to send the messages individually; RabbitMQ just gets one message with the serialized List.
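One direction that may be worth exploring for the individual-message requirement is a reactive function, since (as I understand it) the binder sends each element emitted by the output Flux as its own message. Conceptually it is a flatMap over the records, sketched here with java.util.stream standing in for Reactor's Flux (all names are mine):

```java
import java.util.List;
import java.util.stream.Stream;

// flatMap is the shape of the idea: one incoming file fans out into N records,
// each of which would become its own outgoing message. A reactive
// Function<Flux<Message<byte[]>>, Flux<Message<String>>> would do the same
// with Flux.flatMap instead of Stream.flatMap.
public class FlatMapShape {

    public static List<String> fanOut(List<String> files) {
        return files.stream()
                .flatMap(file -> Stream.of(file.split(",")))
                .toList();
    }
}
```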
It looks like I shouldn't be doing it this way, but reading the docs gave me the impression that I could (and should?) compose functions (Spring Cloud Functions with my own) to easily build integration applications.
So how could I build such an application? Should I just do it with Spring Integration, or is there something I can do in code or configuration?
Upvotes: 2
Views: 73
Reputation: 121552
So, here is some kind of answer to your struggle: https://github.com/spring-cloud/spring-functions-catalog/pull/108.
Honestly, even I hit some obstacles figuring out how to make everything work together.
With that sample, you can replace the simple fileSupplier with your sftpSupplier requirements, and so on.
I didn't try it with StreamBridge, but that should be possible anyway, although it would somewhat defeat the purpose of function composition.
Upvotes: 2