I wrote Java code that reads JSON data from the local file system, and I want to send that data to Kafka as key-value pairs.
public static void main(String[] args) throws IOException {
    Stream<String> objec = Files.lines(Paths.get("path\\data.json"));
    String topicName = "test";
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092,localhost:9093");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> sampleProducer = new KafkaProducer<String, String>(props);
    objec.forEach(f -> {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, f);
        sampleProducer.send(record);
    });
    sampleProducer.close();
}
But when I run this program, it sends the data to the Kafka consumer as a plain String. How can I send the JSON data to the Kafka consumer as key-value pairs?
Here is the sample JSON file:
Any help would be appreciated. Thanks in advance.
Upvotes: 1
Views: 4879
Reputation: 4884
Read the JSON file as a JsonObject instead of a String, and then send it to the Kafka topic. I am using the Gson library for parsing (as sample code), but you can use any JSON parsing library of your choice.
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.stream.JsonReader;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

public class Main {
    static Gson gson = new Gson();
    public static JsonObject readJSON(String filePath) throws IOException {
        JsonReader reader = new JsonReader(new FileReader(filePath));
        return gson.fromJson(reader, JsonObject.class);
    }
    public static void main(String[] args) throws IOException {
        String topicName = "test";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> sampleProducer = new KafkaProducer<String, String>(props)) {
            sampleProducer.send(new ProducerRecord<String, String>(topicName, readJSON("data.json").toString()));
        }
    }
}
Also, if you just have to read the file and send it to the topic as is, without processing any of its content, you can read the whole file as a String in one go and send it, rather than streaming it line by line. This preserves the JSON structure of the data:
public static String readFileAsString(File file) throws IOException {
    try (InputStream fileInputStream = new FileInputStream(file)) {
        // Size the buffer from available(), then read the file content into it
        byte[] buffer = new byte[fileInputStream.available()];
        int length = fileInputStream.read(buffer);
        return new String(buffer, 0, length);
    }
}
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, readFileAsString(new File("data.json")));
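As a variation on the whole-file read above, here is a minimal sketch using java.nio instead of a manually sized buffer. It assumes the file is UTF-8 encoded and small enough to fit in memory; the class name WholeFileReader and the method readWholeFile are illustrative names, not part of the original code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class WholeFileReader {
    // Reads the entire file into one String, decoding it as UTF-8
    // (an assumption; use the charset the file was actually written in).
    static String readWholeFile(Path path) throws IOException {
        return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical demo: write a small multi-line JSON document
        // to a temp file and read it back as a single value.
        Path tmp = Files.createTempFile("data", ".json");
        Files.write(tmp, "{\n  \"name\": \"x\"\n}".getBytes(StandardCharsets.UTF_8));
        String json = readWholeFile(tmp);
        System.out.println(json);
        Files.delete(tmp);
    }
}
```

The returned String can be passed as the record value in the same way as the result of readFileAsString.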
To pass the JSON file data as key-value pairs to the Kafka topic, you still have to parse the file as a JSON object and then iterate over its properties. Please check the sample code below: I parse the JSON file into a Map using Jackson, and then iterate over its entries, sending each one to the topic as a separate record.
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

// read json file as map object (works for flat JSON whose values are scalars)
private static Map<String, String> readJsonFileAsMap(File file) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.readValue(file, new TypeReference<Map<String, String>>() {});
}

// stream data as key value pair
KafkaProducer<String, String> sampleProducer = new KafkaProducer<String, String>(props);
readJsonFileAsMap(new File("data.json")).forEach((k, v) -> {
    ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", k, v);
    sampleProducer.send(record);
});
sampleProducer.close();
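The Map<String,String> target above fits flat JSON only. If the file contains nested objects, one option (an assumption about what you might want, not something the original code does) is to parse into a Map<String,Object> and flatten it into dotted keys before sending. Below is a rough, stdlib-only sketch of just that flattening step; JsonFlattener and flatten are hypothetical names, and the parsing and Kafka send are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class JsonFlattener {
    // Flattens nested maps into "parent.child" keys with String values.
    // Non-map values (numbers, booleans, lists) are converted with String.valueOf.
    static Map<String, String> flatten(Map<String, ?> source) {
        Map<String, String> out = new LinkedHashMap<>();
        flattenInto("", source, out);
        return out;
    }

    private static void flattenInto(String prefix, Map<String, ?> source, Map<String, String> out) {
        for (Map.Entry<String, ?> e : source.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map) {
                // Assumes nested maps have String keys, as parsed JSON objects do.
                @SuppressWarnings("unchecked")
                Map<String, ?> nested = (Map<String, ?>) value;
                flattenInto(key, nested, out);
            } else {
                out.put(key, String.valueOf(value));
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical sample data standing in for a parsed JSON file.
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("city", "Pune");
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("name", "abc");
        root.put("address", address);
        // Prints "name:abc" and then "address.city:Pune".
        flatten(root).forEach((k, v) -> System.out.println(k + ":" + v));
    }
}
```

Each entry of the flattened map can then be sent as one record, with the dotted path as the key and the String as the value.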
If you are using the console consumer to verify the data, make sure to set print.key=true; optionally, you can also set a separator with key.separator=:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning --property "print.key=true" --property "key.separator=:"
Upvotes: 2