Mathias Rönnlund

Reputation: 4855

Writing stream in Databricks with toTable doesn't execute foreachBatch

The code below works as it should, i.e. data is written to the output table and is selectable from the table within 10 seconds. The problem is that foreachBatch is never executed.

When I tested it with .format("console") and called .start(), foreachBatch was run. So it feels like .toTable() is to blame here.
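For reference, here is roughly the console variant I tested where foreachBatch does run (the checkpoint path here is made up; the rest matches the sink below):

# Console sink with an explicit .start(): foreachBatch fires for every micro-batch
myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .format("console") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints_console/") \
    .start()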

This code uses the Kafka connector, but the same problem existed with the Event Hubs connector.

If I try to add .start() after toTable() I get the error

'StreamingQuery' object has no attribute 'start'
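For clarity, this is the variant that raises the error (a sketch):

# toTable() returns a StreamingQuery, so chaining .start() after it raises
# AttributeError: 'StreamingQuery' object has no attribute 'start'
myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .toTable("myunitycatalog.bronze.mytable") \
    .start()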

Here is the code, which works except that foreachBatch never runs:

TOPIC = "myeventhub"
BOOTSTRAP_SERVERS = "myeventhub.servicebus.windows.net:9093"
EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=mykeyname;SharedAccessKey=mykey;EntityPath=myentitypath;\";"

df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

n = 100   # run OPTIMIZE every n micro-batches
count = 0

def run_command(batchDF, epoch_id):
    global count
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE firstcatalog.bronze.factorydatas3 ZORDER BY (readtimestamp)")

...Omitted code where I transform the data in the value column to strongly typed data...

myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .partitionBy("somecolumn") \
    .toTable("myunitycatalog.bronze.mytable")

Upvotes: 1

Views: 1850

Answers (1)

Alex Ott

Reputation: 87259

You either use foreachBatch or toTable, but not both. toTable() already starts the query and returns a StreamingQuery, which is why calling .start() on its result fails. You can move the table write inside the foreachBatch function; just make sure the writes are idempotent, because a batch can be restarted. Change your code to this:

def run_command(batchDF, epoch_id):
    global count
    # Idempotent write: with txnAppId/txnVersion, Delta skips a batch it has
    # already committed for this application id, so a restarted batch does
    # not duplicate rows.
    batchDF.write.format("delta") \
        .option("txnVersion", epoch_id) \
        .option("txnAppId", "my_app") \
        .partitionBy("somecolumn") \
        .mode("append") \
        .saveAsTable("myunitycatalog.bronze.mytable")
    count += 1
    if count % n == 0:
        spark.sql("OPTIMIZE myunitycatalog.bronze.mytable ZORDER BY (readtimestamp)")

myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .start()
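
Note that start() returns a StreamingQuery handle; if the notebook or job should block on the stream, keep it (a minimal sketch):

# start() returns a StreamingQuery; keep it to monitor or block on the stream
query = myTypedDF.writeStream \
    .foreachBatch(run_command) \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/delta/events/_checkpoints/") \
    .start()

query.awaitTermination()  # block until the stream stops or fails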

Upvotes: 3
