Reputation: 462
I was asked to look into using Apache Spark's MemoryStream to simulate a Kafka stream in a Java Spring Boot service. The documentation and online community coverage of this topic is a bit thin, so I am seeking help.
This is my implementation code:
final DataStreamReader kafkaDataStreamReader = kafkaLoader.getStream(sparkSession, options);
final Dataset<Row> df = kafkaDataStreamReader.load();
return df.writeStream().foreachBatch((batch, batchId) -> {
    // Process a batch of data received from Kafka
    updateData(name, customizerFunction, avroSchema, batch);
    ...
Note that my implementation code expects a returned object of type DataStreamReader, which might be the reason why I'm struggling to create a MemoryStream.
@Slf4j
@Service
@Profile("it")
public class ItKafkaLoader extends KafkaLoader {

    @Autowired
    SparkSession sparkSession;

    @SneakyThrows
    @Override
    public DataStreamReader getStream(SparkSession sparkSession, Map<String, Object> options) {
        options = Map.of();
        MemoryStream<String> stream = null;
        try {
            stream = new MemoryStream<>(1, sparkSession.sqlContext(), null, Encoders.STRING());
            String jsonString = "{..}";
            Seq<String> seq = JavaConverters
                .collectionAsScalaIterableConverter(List.of(jsonString))
                .asScala()
                .toSeq();
            Offset currentOffset = stream.addData(seq);
            stream.commit(currentOffset);
        } catch (Exception e) {
            log.warn("Error creating MemoryStream: ", e);
            return new DataStreamReader(sparkSession);
        }
        Dataset<Row> data = stream.toDF();
        log.debug("Stream enabled [t/f]: {}", data.isStreaming());
        return data
            .sqlContext()
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "test-servers")
            .option("subscribe", "test-data");
    }
}
ItKafkaLoader is called when I'm running integration tests, hence ActiveProfiles is set to it here, and this is where I'm struggling to create a MemoryStream. Because my implementation code is expecting a returned object of type DataStreamReader, I believe I need to call readStream(), since that is of type DataStreamReader? However, when I just try readStream(), Spark throws an exception about my path not being defined:
java.lang.IllegalArgumentException: 'path' is not specified
at org.apache.spark.sql.errors.QueryExecutionErrors$.dataPathNotSpecifiedError
When searching this error I tend to see that I need to set my format to Kafka; doing that, Spark then expects a topic and a broker. I was hoping that since I was using MemoryStream, Spark would just recognize that this is a dummy Kafka cluster & topic and go about kicking off my simulated Kafka stream through my MemoryStream. That doesn't happen, and when I run my integration test I get these errors:
- Query [id = 4ebacd71-d..., runId = 1a2c4...] terminated with error
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
- Invalid url in bootstrap.servers: test-servers
Ideally, I would like to figure out how to fix getStream() in ItKafkaLoader; however, I have a slight feeling that I don't understand what MemoryStream is really for and might need to do something different.
Update: I have seen that in newer versions of Spark you can just set the format to memory; however, it appears that my Spark version (v2.12) does not support that, and I do not have the green light to upgrade my Spark version.
Upvotes: 1
Views: 145
Reputation: 462
Ok, I figured out how to make MemoryStream work for my need of simulating a stream. I now rely on writeStream() in my implementation code to kick off / utilize my simulated stream, and I changed my getStream() function to be of type Dataset<Row> instead of a DataStreamReader. Here are my code changes:
@Slf4j
@Service
@Profile("it")
public class ItKafkaLoader extends KafkaLoader {

    @SneakyThrows
    @Override
    public Dataset<Row> getStream(SparkSession sparkSession, Map<String, String> options) {
        MemoryStream<Data> stream;
        try {
            Encoder<Data> encoder = Encoders.bean(Data.class);
            stream = new MemoryStream<>(1, sparkSession.sqlContext(), null, encoder);
            List<Data> data = getData();
            Dataset<Data> df = sparkSession.createDataset(data, encoder);
            Seq<Data> seqT = JavaConverters
                .asScalaIteratorConverter(df.toLocalIterator())
                .asScala()
                .toSeq();
            stream.addData(seqT);
        } catch (Exception e) {
            log.warn("Error creating MemoryStream: ", e);
            return sparkSession.emptyDataFrame();
        }
        Dataset<Row> data = stream.toDF();
        log.debug("Stream enabled [t/f]: {}", data.isStreaming());
        return data;
    }
}
Some things to point out:
- If I had stuck with MemoryStream<String>, my schema would've just had a default column of value, where a row is the whole json object instead of my required attributes inside of my custom Data object. Using MemoryStream<Data> with Encoders.bean(Data.class) gives the stream one column per Data attribute.
- The addData() function in Java Spark requires a Scala Seq type, so the data has to be converted first (a filled-in example follows this list):
  Seq<Data> seq = JavaConverters
      .asScalaIteratorConverter(<list / collection here ...>)
      .asScala()
      .toSeq();
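To make that concrete, here is a hypothetical filled-in version of the placeholder above; the Data fields, setters, and values are made up for illustration. Encoders.bean(Data.class) expects Data to be a bean-style POJO with a no-arg constructor and getters/setters, and derives one column per property:
// Illustrative only: build a bean-style Data object by hand
Data row = new Data();
row.setId("123");        // made-up attribute
row.setName("example");  // made-up attribute

// addData() wants a Scala Seq, so convert a plain java.util.List first
Seq<Data> seq = JavaConverters
    .collectionAsScalaIterableConverter(List.of(row))
    .asScala()
    .toSeq();
stream.addData(seq);
If the test data already lives in a Java list like this, the createDataset() / toLocalIterator() round trip in the class above isn't strictly required; converting the list directly produces the same Seq.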
Here is how my updated implementation code looks
Dataset<Row> df = kafkaLoader.getStream(sparkSession, options);
StreamingQuery streamingDf = df.writeStream().foreachBatch((batch, batchId) -> {
        // Process a batch of data received from Kafka
        updateData(name, customizerFunction, avroSchema, batch);
        ...
    })
    .option("checkpointLocation", "checkpointname-" + name)
    .start();
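One detail worth adding for integration tests (not from the original post): start() only launches the query asynchronously, so the test may need to block until the rows sitting in the MemoryStream have actually been processed before asserting anything. StreamingQuery's processAllAvailable() method is meant for exactly this kind of testing use:
// Block until everything currently available in the source has been processed
streamingDf.processAllAvailable();
Once the assertions are done, the query can be shut down with streamingDf.stop().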
Upvotes: 0