Reputation: 1362
So i want to implement simple application which send notification kafka producer to kafka consumer.So far i have successfully send String message to producer to consumer.But when i try to send notification object kafka consumer didn't receive any objects.This is the code i have used.
public class Notification implements Serializable{
private String name;
private String message;
private long currentTimeStamp;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public long getCurrentTimeStamp() {
return currentTimeStamp;
}
public void setCurrentTimeStamp(long currentTimeStamp) {
this.currentTimeStamp = currentTimeStamp;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Notification that = (Notification) o;
if (currentTimeStamp != that.currentTimeStamp) return false;
if (message != null ? !message.equals(that.message) : that.message != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return true;
}
@Override
public int hashCode() {
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (message != null ? message.hashCode() : 0);
result = 31 * result + (int) (currentTimeStamp ^ (currentTimeStamp >>> 32));
return result;
}
@Override
public String toString() {
return "Notification{" +
"name='" + name + '\'' +
", message='" + message + '\'' +
", currentTimeStamp=" + currentTimeStamp +
'}';
}
}
And this is producer
public class KafkaProducer {
static String topic = "kafka-tutorial";
public static void main(String[] args) {
System.out.println("Start Kafka producer");
Properties properties = new Properties();
properties.put("metadata.broker.list", "localhost:9092");
properties.put("serializer.class", "dev.innova.kafka.tutorial.producer.CustomSerializer");
ProducerConfig producerConfig = new ProducerConfig(properties);
kafka.javaapi.producer.Producer<String, Notification> producer = new kafka.javaapi.producer.Producer<String, Notification>(producerConfig);
KeyedMessage<String, Notification> message = new KeyedMessage<String, Notification>(topic, createNotification());
System.out.println("send Message to broker");
producer.send(message);
producer.close();
}
private static Notification createNotification(){
Notification notification = new Notification();
notification.setMessage("Sample Message");
notification.setName("Sajith");
notification.setCurrentTimeStamp(System.currentTimeMillis());
return notification;
}
}
And this is consumer
public class KafkaConcumer extends Thread {
final static String clientId = "SimpleConsumerDemoClient";
final static String TOPIC = "kafka-tutorial";
ConsumerConnector consumerConnector;
public KafkaConcumer() {
Properties properties = new Properties();
properties.put("zookeeper.connect","localhost:2181");
properties.put("group.id","test-group");
properties.put("serializer.class", "dev.innova.kafka.tutorial.producer.CustomSerializer");
properties.put("zookeeper.session.timeout.ms", "400");
properties.put("zookeeper.sync.time.ms", "200");
properties.put("auto.commit.interval.ms", "1000");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
}
@Override
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
System.out.println("It :" + it.size());
while(it.hasNext()){
System.out.println(new String(it.next().message()));
}
}
private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
for(MessageAndOffset messageAndOffset: messageSet) {
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(new String(bytes, "UTF-8"));
}
}
}
And finally i have used customserializer to serialize and deserialize object.
public class CustomSerializer implements Encoder<Notification>, Decoder<Notification> {
public CustomSerializer(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(Notification o) {
return new byte[0];
}
@Override
public Notification fromBytes(byte[] bytes) {
return null;
}
}
Can someone tell me what is the issue ? is this the right way ?
Upvotes: 1
Views: 3698
Reputation: 36
I strongly suggest you to convert your object to an Avro object before sending it.
It is not that difficult and is the Kafka way of transmitting objects.
Upvotes: 1
Reputation: 77
Create a custom deserializer , Kafka need a way to serialize and deserialize .We have to provide both of these implementations so far Need to add library to get the object mapper class
FasterXML jackson – 2.8.6
Example - serializer
public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer {
@Override
public byte[] serialize(String arg0, Object arg1) {
byte[] retVal = null;
ObjectMapper objectMapper = new ObjectMapper();
TestModel model =(TestModel) arg1;
try {
retVal = objectMapper.writeValueAsString(model).getBytes();
} catch (Exception e) {
e.printStackTrace();
}
return retVal;
}
@Override
public void close() {
}
@Override
public void configure(Map map, boolean bln) {
}
}
Deserializer
public class PayloadDeserializer implements Deserializer {
@Override
public void close() {
}
@Override
public TestModel deserialize(String arg0, byte[] arg1) {
ObjectMapper mapper = new ObjectMapper();
TestModel testModel = null;
try {
testModel = mapper.readValue(arg1, TestModel.class);
} catch (Exception e) {
e.printStackTrace();
}
return testModel;
}
@Override
public void configure(Map map, boolean bln) {
}
}
Finally we have to pass deserializer class to the receiver
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG - PayloadDeserializer.class
or
deserializer.class - classpath.PayloadDeserializer
Upvotes: 1
Reputation: 16392
You have two problems.
First, your deserializer doesn't have any logic. It returns an empty byte array for each object it serializes and returns a null object whenever it's asked to deserialize an object. You need to put code there that actually serializes and deserializes your objects.
Second, if you plan to use the native JVM serialization and deserialization logic from the JVM, you'll need to add a serialVersionUID to your beans that will be transported. Something like this:
private static final long serialVersionUID = 123L;
You can use any value you like. When an object is deserialized by the JVM the serialVersionId in the object is compared to the value specified in the loaded class definition. If the two are different then the JVM assumes that even though you have a class definition loaded you don't have the correct version of the class definition loaded and serialization will fail. If you don't specify a value for serialVersionID in your class definition then the JVM will make one up for you and two different JVM's (the one with the producer and the one with the consumer) will almost certainly make up different values for you.
EDIT
You'd need to make your serializer look something like this if you want to leverage the default Java serialization:
public class CustomSerializer implements Encoder<Notification>, Decoder<Notification> {
public CustomSerializer(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(Notification o) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(o);
oos.close();
byte[] b = baos.toByteArray();
return b;
} catch (IOException e) {
return new byte[0];
}
}
@Override
public Notification fromBytes(byte[] bytes) {
try {
return (Notification) new ObjectInputStream(new ByteArrayInputStream(b)).readObject();
} catch (Exception e) {
return null;
}
}
Upvotes: 3