Maria

Reputation: 604

Commit message in Spark Structured Streaming

I'm using Spark Structured Streaming (2.3) and Kafka 2.4.

I want to know how I can use the async and sync offset commit properties.

If I set enable.auto.commit to true, is the commit sync or async?

How can I define a commit callback in Spark Structured Streaming? Or how can I use sync or async commits in Spark Structured Streaming?

Thanks in Advance

My Code

package sparkProject;


import java.io.StringReader;
import java.util.*;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class XMLSparkStreamEntry {

    // Schema for the rows produced from each unmarshalled Employee
    static StructType structType = new StructType();

    static {
        structType = structType.add("FirstName", DataTypes.StringType, false);
        structType = structType.add("LastName", DataTypes.StringType, false);
        structType = structType.add("Title", DataTypes.StringType, false);
        structType = structType.add("ID", DataTypes.StringType, false);
        structType = structType.add("Division", DataTypes.StringType, false);
        structType = structType.add("Supervisor", DataTypes.StringType, false);
    }

    // Row encoder matching the schema above, used by the flatMap below
    static ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);

    public static void main(String[] args) throws StreamingQueryException {

        SparkConf conf = new SparkConf();
        SparkSession spark = SparkSession.builder().config(conf).appName("Spark Program").master("local[*]")
                .getOrCreate();

        // Read the Kafkademo topic as a streaming Dataset
        Dataset<Row> ds1 = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "Kafkademo").load();

        // Kafka's key and value columns are binary; cast them to strings
        Dataset<Row> ss = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

        // Parse each Kafka message (an XML document) into one Row per Employee
        Dataset<Row> finalOP = ss.flatMap(new FlatMapFunction<Row, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Row> call(Row t) throws Exception {

                // Unmarshal the XML payload in the "value" column via JAXB
                JAXBContext jaxbContext = JAXBContext.newInstance(FileWrapper.class);
                Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();

                StringReader reader = new StringReader(t.getAs("value"));
                FileWrapper person = (FileWrapper) unmarshaller.unmarshal(reader);

                // Turn each Employee into a Row matching the declared schema
                List<Employee> emp = new ArrayList<Employee>(person.getEmployees());
                List<Row> rows = new ArrayList<Row>();
                for (Employee e : emp) {
                    rows.add(RowFactory.create(e.getFirstname(), e.getLastname(), e.getTitle(), e.getId(),
                            e.getDivision(), e.getSupervisor()));
                }
                return rows.iterator();
            }
        }, encoder);


        // Count employees per first name ("FirstName", as declared in the schema)
        Dataset<Row> wordCounts = finalOP.groupBy("FirstName").count();

        // Print the running counts to the console after every micro-batch
        StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();
        System.out.println("SHOW SCHEMA");
        finalOP.printSchema(); // print the schema of the parsed stream
        query.awaitTermination();

    }

}

Can anyone please check where and how I can implement async and sync offset commits in my code above?

Thanks in advance!

Upvotes: 0

Views: 1408

Answers (2)

Ged

Reputation: 18098

Please read https://www.waitingforcode.com/apache-spark-structured-streaming/apache-spark-structured-streaming-apache-kafka-offsets-management/read. It is an excellent source, although it requires a little reading between the lines.

In short:

Structured Streaming ignores the offset commits in Apache Kafka. Instead, it relies on its own offset management on the driver side, which is responsible for distributing offsets to executors and for checkpointing them at the end of the processing round (epoch or micro-batch).
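
To make that concrete, here is a minimal sketch reusing the question's source definition. The Kafka source rejects commit-related consumer settings outright, because it never commits offsets back to Kafka, whether sync or async:

// Sketch: the Structured Streaming Kafka source manages offsets itself,
// so Kafka's commit-related consumer settings cannot be set on it.
Dataset<Row> ds1 = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "Kafkademo")
        // .option("kafka.enable.auto.commit", "true") // not allowed: the source
        // throws an exception, since it never commits offsets, sync or async
        .load();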

Batch Spark & Kafka integration works differently again.

Upvotes: 1

Gokulraj

Reputation: 520

Spark Structured Streaming doesn't support Kafka's commit offset feature. The option suggested in the official docs is to enable checkpointing:
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
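
For example, a minimal sketch of checkpointing applied to the writeStream call in your code (the checkpoint path is a hypothetical example):

// Sketch: with checkpointLocation set, Spark stores the processed offsets
// (and any state) in the checkpoint directory and resumes from it on restart.
StreamingQuery query = wordCounts.writeStream()
        .outputMode("complete")
        .format("console")
        .option("checkpointLocation", "/tmp/checkpoints/xml-demo") // hypothetical path
        .start();
query.awaitTermination();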

The other suggestion is to switch to Spark Streaming (the DStream API), which supports Kafka's commitAsync API: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
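
A rough sketch of that DStream pattern, adapted from the linked integration guide (the topic, group id, and processing step are placeholders):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class DStreamCommitAsyncSketch {

    public static void main(String[] args) throws InterruptedException {

        SparkConf conf = new SparkConf().setAppName("CommitAsyncSketch").setMaster("local[*]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "xml-demo-group"); // placeholder group id
        kafkaParams.put("enable.auto.commit", false);  // commit manually below

        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(
                        Collections.singletonList("Kafkademo"), kafkaParams));

        stream.foreachRDD(rdd -> {
            // Capture this batch's offset ranges before doing any work
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            rdd.foreach(record -> System.out.println(record.value())); // stand-in processing

            // Commit asynchronously once the batch's outputs are done; the
            // optional OffsetCommitCallback reports success or failure
            ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges,
                    (offsets, exception) -> {
                        if (exception != null) {
                            System.err.println("Offset commit failed: " + exception);
                        }
                    });
        });

        jssc.start();
        jssc.awaitTermination();
    }
}

The callback overload of commitAsync addresses the "how can I define a callback" part of the question; note this only exists in the DStream API, not in Structured Streaming.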

Upvotes: 0
