Tom

Reputation: 6342

Ignite and Kafka Integration

I am trying the Ignite and Kafka integration to bring Kafka messages into an Ignite cache.

My message key is a random string (to work with Ignite, the Kafka message key can't be null), and the value is a JSON string representation of a Person (a Java class).

When Ignite receives such a message, it looks like Ignite uses the message's key (the random string in my case) as the cache key.

Is it possible to change the message key to the person's ID, so that I can put the Person into the cache under that ID?

It looks like streamer.receiver(new StreamReceiver) works:

        streamer.receiver(new StreamReceiver<String, String>() {
            @Override
            public void receive(IgniteCache<String, String> cache, Collection<Map.Entry<String, String>> entries) throws IgniteException {
                for (Map.Entry<String, String> entry : entries) {
                    // Parse the JSON payload of the Kafka message into a Person.
                    Person p = fromJson(entry.getValue());
                    // Ignore the message key and use the person ID as the cache key.
                    cache.put(p.getId(), p);
                }
            }
        });

Is this the recommended way? I am not sure whether calling cache.put in a StreamReceiver is correct, since the receiver is only a pre-processing step before writing to the cache.

Upvotes: 2

Views: 935

Answers (1)

a_gura

Reputation: 410

The data streamer will map all your keys to cache affinity nodes, create batches of entries, and send the batches to those affinity nodes. After that, the StreamReceiver will receive your entries, get the Person's ID, and invoke cache.put(K, V). Putting an entry leads to mapping your key to the corresponding cache affinity node and sending an update request to that node.

Everything looks good. But the result of mapping your random key from Kafka and the result of mapping the Person's ID will be different (most likely different nodes). As a result, you will get poor performance due to redundant network hops.
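To make the extra hop concrete, here is a minimal sketch. It assumes a toy affinity function (a hash modulo the node count), which is only a stand-in for illustration and not Ignite's actual affinity implementation; the class name, the node count and the "person-42" ID are all hypothetical.

```java
import java.util.UUID;

public class AffinitySketch {
    // Toy stand-in for an affinity function: hash the key onto one of `nodes` nodes.
    // This is an assumption for illustration, not how Ignite really maps keys.
    public static int nodeFor(Object key, int nodes) {
        return Math.floorMod(key.hashCode(), nodes);
    }

    public static void main(String[] args) {
        int nodes = 4;                                   // hypothetical cluster size
        String kafkaKey = UUID.randomUUID().toString();  // random Kafka message key
        String personId = "person-42";                   // hypothetical Person ID

        int streamerTarget = nodeFor(kafkaKey, nodes);   // where the streamer sends the batch
        int putTarget = nodeFor(personId, nodes);        // where cache.put routes the update

        System.out.println("batch sent to node " + streamerTarget);
        System.out.println("cache.put routed to node " + putTarget);
        System.out.println(streamerTarget == putTarget
            ? "same node, no extra hop"
            : "different nodes, extra network hop");
    }
}
```

Because the two keys are unrelated, the two targets coincide only by chance, so most entries would be sent over the network twice.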

Unfortunately, the current KafkaStreamer implementation doesn't support stream tuple extractors (see e.g. the StreamSingleTupleExtractor class). But you can easily create your own Kafka streamer implementation using the existing one as an example.
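The extraction step of such a custom streamer could look roughly like the sketch below. To keep it self-contained, it declares its own TupleExtractor interface, a hypothetical local type that only mirrors the idea of a single-tuple extractor and is not the Ignite class. It also assumes the Person JSON is a flat object with a string "id" field and pulls it out with a regex; real code would use a proper JSON library.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PersonKeyExtractorSketch {
    /** Hypothetical local interface: turns one raw message into a cache key/value pair. */
    public interface TupleExtractor<T, K, V> {
        Map.Entry<K, V> extract(T msg);
    }

    // Assumption: the payload contains a string field such as "id":"p-17".
    private static final Pattern ID = Pattern.compile("\"id\"\\s*:\\s*\"([^\"]*)\"");

    /** Uses the person ID found in the JSON payload as the key, and the payload as the value. */
    public static final TupleExtractor<String, String, String> BY_PERSON_ID = json -> {
        Matcher m = ID.matcher(json);
        if (!m.find())
            throw new IllegalArgumentException("No \"id\" field in message: " + json);
        return new AbstractMap.SimpleImmutableEntry<>(m.group(1), json);
    };

    public static void main(String[] args) {
        // Hypothetical Kafka message value; the random Kafka message key is simply ignored.
        String kafkaValue = "{\"id\":\"p-17\",\"name\":\"Alice\"}";
        Map.Entry<String, String> entry = BY_PERSON_ID.extract(kafkaValue);
        System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
}
```

In a custom streamer, the extracted pair would be handed to the data streamer (for example through addData(key, value)) instead of the raw Kafka key, so that batches are routed by Person ID from the start and no StreamReceiver workaround is needed.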

You can also try using KafkaStreamer's keyDecoder and valDecoder to extract the Person's ID from the Kafka message. I'm not sure, but it may help.

Upvotes: 2
