Reputation: 83
I am trying to build a custom transformer application using the guidelines provided here https://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#streams-dev-guide
I have started Kafka on my Windows machine.
I have the http source running on the Windows machine; it writes to the destination transformData.
Command: java -Dserver.port=8123 -Dhttp.path-pattern=/data -Dspring.cloud.stream.bindings.output.destination=transformData -jar http-source-kafka-10-1.3.1.RELEASE.jar
I have a transform application running that reads its input from transformData and writes its output to the destination transformedData.
Command:
java -Dserver.port=8090 -Dspring.cloud.stream.bindings.input.destination=transformData -Dspring.cloud.stream.bindings.output.destination=transformedData -jar transformer-0.0.1-SNAPSHOT.jar
I have the log sink running, which reads from the destination transformedData.
Command:
java -Dserver.port=8888 -Dspring.cloud.stream.bindings.input.destination=transformedData -jar log-sink-kafka-10-1.3.1.RELEASE.jar
Problem:
When I try to send this curl request:
curl -H "Content-Type: application/json" -X POST -d '{"id":"1", "temp":"400"}' http://172.20.24.47:8123/data
On the custom transformer console I see errors:
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token '▒': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"? contentType "text/plain"originalContentType "application/json;charset=UTF-8"{"id":"1", "temp":"400"}"; line: 1, column: 4]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679) ~[jackson-core-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526) ~[jackson-core-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621) ~[jackson-core-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826) ~[jackson-core-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723) ~[jackson-core-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141) ~[jackson-databind-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000) ~[jackson-databind-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3121) ~[jackson-databind-2.9.6.jar!/:2.9.6]
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertParameterizedType(ApplicationJsonMessageMarshallingConverter.java:114) ~[spring-cloud-stream-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
    ... 37 common frames omitted
Can anyone help?
Upvotes: 0
Views: 440
Reputation: 83
I finally got this to work. When generating the custom application with the Spring Initializr, instead of selecting the 2.0.4 release as the starter version, I reverted to the 1.5.15 release. With that change I no longer need to pass the headerMode property set to embeddedHeaders on the subscriber side, that is, to the custom app and the log sink app.
Upvotes: 1
Reputation: 5924
It appears that your custom app uses Spring Cloud Stream 2.0.0.RELEASE, while the prebuilt apps you run alongside it are 1.3.x. Can you set spring.cloud.stream.bindings.input.consumer.headerMode to embeddedHeaders in the processor app where it is failing? In 2.0, the default is not to embed headers in the payload, because Kafka now supports message headers out of the box. In versions prior to 2.0 (which the 1.3.x apps use), the default is to embed the headers. You need to set the header mode explicitly when combining the two.
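As a minimal sketch of that suggestion, the transformer command from the question could be started with the extra property added. Everything except the headerMode option is copied from the question; whether this alone is enough for your setup is an assumption, since the 1.3.x log sink downstream also expects embedded headers.

```shell
# Transformer command from the question, with the consumer header mode
# set so the 2.0 app parses the headers that the 1.3.x http source embeds in the payload.
java -Dserver.port=8090 \
  -Dspring.cloud.stream.bindings.input.destination=transformData \
  -Dspring.cloud.stream.bindings.output.destination=transformedData \
  -Dspring.cloud.stream.bindings.input.consumer.headerMode=embeddedHeaders \
  -jar transformer-0.0.1-SNAPSHOT.jar
```

If the log sink then fails to read the transformer's output, the analogous producer setting on the output binding (spring.cloud.stream.bindings.output.producer.headerMode=embeddedHeaders) may be worth trying, though I have not verified it against this exact combination.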
Upvotes: 0