My question is about how Apache Spark Streaming can handle an output operation that takes a long time, either by improving parallelization or by combining many writes into a single, larger write. In this case the write is a Cypher request to Neo4j, but the question could apply to other data stores.
I have an Apache Spark Streaming application in Java that writes to two datastores: Elasticsearch and Neo4j. Here are the versions:
The Elasticsearch output was easy enough as I used the Elasticsearch-Hadoop for Apache Spark library.
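For context, a minimal sketch of that write using elasticsearch-hadoop's streaming support (the index/type resource name is a placeholder, not our real index):

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;

// Write each micro-batch of the stream to Elasticsearch;
// elasticsearch-hadoop handles serialization and bulk indexing.
JavaEsSparkStreaming.saveToEs(dataStream, "our-index/our-type");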
Our input is a stream from Kafka received on a particular topic, and I deserialize the elements of the stream through a map function to create a JavaDStream<OurMessage> dataStream. I then transform each message into a Cypher query String cypherRequest (using an OurMessage-to-String transformation) that is sent to a singleton managing the Bolt driver connection to Neo4j (I know I should use a connection pool, but maybe that's another question). The Cypher query produces a number of nodes and/or edges based on the contents of OurMessage.
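For reference, a minimal sketch of what that singleton might look like with the official Neo4j Java driver (the URI, credentials, and the 1.x package name are assumptions):

import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session;

// Hypothetical singleton holding a single Bolt driver instance.
public final class BoltDriverSingleton {
    private static final BoltDriverSingleton INSTANCE = new BoltDriverSingleton();
    private final Driver driver;

    private BoltDriverSingleton() {
        // Placeholder connection details.
        driver = GraphDatabase.driver("bolt://localhost:7687",
                AuthTokens.basic("neo4j", "password"));
    }

    public static BoltDriverSingleton getInstance() {
        return INSTANCE;
    }

    public void update(String cypherQuery) {
        // One session (and one network round trip) per query; the Driver
        // itself is thread-safe and meant to be shared.
        try (Session session = driver.session()) {
            session.run(cypherQuery);
        }
    }
}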
The code looks something like the following.
dataStream.foreachRDD( rdd -> {
    rdd.foreach( cypherQuery -> {
        // One Bolt round trip per message -- this is the slow part.
        BoltDriverSingleton.getInstance().update(cypherQuery);
    });
});
I have two thoughts about how to improve throughput: parallelize the writes (for example, per partition instead of per record), or combine many writes into a single, larger write; a sketch of the first idea follows.
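For the parallelization idea, a minimal sketch (assuming the same singleton) that iterates per partition, so each executor issues its partition's writes without per-record lambda overhead:

dataStream.foreachRDD( rdd -> {
    rdd.foreachPartition( partition -> {
        // Runs once per partition on the executors; all records in the
        // partition reuse the same driver instance.
        while (partition.hasNext()) {
            BoltDriverSingleton.getInstance().update(partition.next());
        }
    });
});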
Am I overthinking this? I've tried to research so much that I might be in too deep.
Thanks to anyone who might be able to help; this is my first StackOverflow question in a long time, so please leave feedback and I will be responsive and correct this question as needed.
I think all we need is a simple Map/Reduce. The following should allow us to parse each message in the RDD and then write it to the Graph DB all at once.
dataStream.map( message -> {
    return Neo4JMessageParser.parse(message);
}).foreachRDD( rdd -> {
    // Pull the whole micro-batch back to the driver and issue one write.
    List<ParseResult> parseResults = rdd.collect();
    String cypherQuery = Neo4JMessageParser.buildQuery(parseResults);
    Neo4JRepository.update(cypherQuery);
    // commit offsets
});
By doing this, we should be able to reduce the overhead associated with doing a write for each incoming message. The trade-off is that rdd.collect() brings every parsed result for the batch back to the driver, so this only works while each micro-batch fits comfortably in driver memory.
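For illustration, a hypothetical buildQuery could fold the whole batch into one statement with Cypher's UNWIND; the node label, the id property, and the toCypherMap helper are assumptions, not the actual parser's output:

// Hypothetical batch builder: emits one UNWIND statement for the whole
// batch instead of one CREATE/MERGE per message.
public static String buildQuery(List<ParseResult> results) {
    StringBuilder rows = new StringBuilder();
    for (int i = 0; i < results.size(); i++) {
        if (i > 0) rows.append(", ");
        // toCypherMap() is an assumed helper that renders one result as a
        // Cypher map literal, e.g. {id: '42', name: 'foo'}.
        rows.append(results.get(i).toCypherMap());
    }
    return "UNWIND [" + rows + "] AS row "
         + "MERGE (n:OurNode {id: row.id}) SET n += row";
}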