francollado99

Reputation: 29

Spark is not inserting data into Cassandra when using `writeStream`

I'm trying to create a streaming pipeline that calls an API using Airflow, processes the data with Kafka, and inserts it into Cassandra using Spark. I'm struggling with the insert into Cassandra. In the following code I create the keyspace and table, and I can see that they are created correctly in Cassandra, but no data is being inserted, even though Airflow's DAG is sending data correctly to the Control Server.

```python
if __name__ == "__main__":
    # create spark connection
    spark_conn = create_spark_connection()

    if spark_conn is not None:
        # connect to kafka with spark connection
        spark_df = connect_to_kafka(spark_conn)
        selection_df = create_selection_df_from_kafka(spark_df)
        session = create_cassandra_connection()

        if session is not None:
            create_keyspace(session)
            create_table(session)

            logging.info("Streaming is being started...")

            streaming_query = (selection_df.writeStream.format("org.apache.spark.sql.cassandra")
                               .option('checkpointLocation', '/tmp/checkpoint')
                               .option('keyspace', 'spark_streams')
                               .option('table', 'created_users')
                               .start())

            streaming_query.awaitTermination()
```

Can I get some feedback? Thanks.

Upvotes: 0

Views: 81

Answers (0)
