Reputation: 279
I want to produce a message into kafka topic. That message should have this pattern:
{"targetFileInfo":{"path":"2018-05-07-10/row01-small-01.txt.ready"}}
I know that is a json pattern, so how can i convert that json in String?
I use a maven project, so which dependencies are needed to use
String stringData = JSON.stringify({"targetFileInfo":{"path":"2018-05-07-10/row01-small-01.txt.ready"}});
So I think it is better don't convert Json to string and send indeed that massage into kafka topic.
My Code is like that, it can send a String but i don't know how i can modify my code to send the massage above. maybe you can help me.
Producer<String, String> producer = null;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
String msg = "welcome";
producer.send(new ProducerRecord<String, String>("event", msg));
producer.close();
Upvotes: 4
Views: 47999
Reputation: 279
That solved my problem:
Producer<String, String> producer = null;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
try {
producer = new KafkaProducer<String, String>(props);
} catch (Exception e) {
e.printStackTrace();
}
blobStorageChecker = new BlobStorageChecker();
String folder = blobStorageChecker.getCurrentDateUTC();
String msg = "{\"targetFileInfo\":{\"path\":\"test/"+folder+"row01-small.txt\"},\"sourceFileInfo\":{\"lastModifiedTime\":1525437960000,\"file\":\"/row01-small-01.txt\",\"filename\":\"/data/row01/row01-small.txt\",\"size\":19728,\"remoteUri\":\"ftp://waws-prod-am2-191.ftp.net/data/orsted-real/inbound/row01\",\"contentEncoding\":\"\",\"contentType\":\"\"}}";
ProducerRecord<String, String> record = new ProducerRecord<String, String>("event-orsted-v1", null, msg);
if (producer != null) {
try {
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
} catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
}
}
producer.close();
Upvotes: 5
Reputation: 3260
As per the comment you need to send JsonNode
as message on kafka.
Write a custom Serializer / Deserializer for the same.
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public class JsonNodeSerDes implements Serializer<JsonNode>, Deserializer<JsonNode> {
private ObjectMapper mapper = new ObjectMapper();
@Override
public byte[] serialize(String topic, JsonNode data) {
try {
return mapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
return new byte[0];
}
}
@Override
public JsonNode deserialize(String topic, byte[] data) {
try {
return mapper.readValue(data, JsonNode.class);
} catch (IOException e) {
return null;
}
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public void close() {
}
}
I wrote serializer / deserializer in the same class. You can separate them in two class (one implementing Serializer
, another implementing Deserializer
).
While creating KafkaProducer
you need to provide "value.serializer" config and "value.deserializer" config for KafkaConsumer
.
External Dependencies used:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.8</version>
</dependency>
Upvotes: 3