melon

Reputation: 1

Executors for PySpark app always finish with "state KILLED exitStatus 143"

I get this problem when running:

spark-submit --master spark://localhost:7077 \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 \
--conf spark.cassandra.connection.host=localhost \
consume_n_stream.py

but on the other hand

spark-submit --master "local[*]" \
--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 \
--conf spark.cassandra.connection.host=localhost \
consume_n_stream.py

works.

This is consume_n_stream.py:

import logging

from pyspark.sql import SparkSession
from cassandra.cluster import Cluster
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

def create_spark_connection(url: str):
    s_conn = None
    
    try:
        s_conn = SparkSession.builder \
                .appName('SparkDataStreaming') \
                .config('spark.jars.packages', 
                        "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,"
                        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
                .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \
                .config('spark.cassandra.connection.host', url) \
                .config("spark.cassandra.connection.port", "9042") \
                .getOrCreate()

        s_conn.sparkContext.setLogLevel("ERROR")
        logging.info("Spark connection created successfully!")
    except Exception as e:
        logging.error(f"Couldn't create the spark session due to exception {e}")

    return s_conn

def connect_to_kafka(spark_conn, bootstrap_servers, topic):
    
    spark_df = None
    try:
        spark_df = spark_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', bootstrap_servers) \
            .option('subscribe', topic) \
            .option('startingOffsets', 'earliest') \
            .load()
            
        logging.info("kafka dataframe created successfully")
    except Exception as e:
        logging.warning(f"kafka dataframe could not be created because: {e}")

    return spark_df

def create_cassandra_connection(url: str):
    cas_session = None
    try:
        cluster = Cluster(contact_points=[url])
        cas_session = cluster.connect()
        logging.info("connect to Cassandra is successfully")
    except Exception as e:
        logging.error(f"Could not create cassandra connection due to {e}")
    
    return cas_session

def create_keyspace(session):
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS spark_streams
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """)

    logging.info("Keyspace created successfully!")


def create_table(session):
    session.execute("""
    CREATE TABLE IF NOT EXISTS spark_streams.test_streaming (
        id UUID PRIMARY KEY,
        first_name TEXT,
        last_name TEXT,
        gender TEXT,
        address TEXT,
        post_code TEXT,
        email TEXT,
        username TEXT,
        registered_date TEXT,
        phone TEXT,
        picture TEXT);
    """)

    logging.info("Table created successfully!")

def select_df_from_kafka(spark_df):
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])

    sel = spark_df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col('value'), schema).alias('data')).select("data.*")

    return sel


if __name__ == "__main__":
        
    spark_conn = create_spark_connection("localhost")

    if spark_conn is not None:
        spark_df = connect_to_kafka(spark_conn, "localhost:9092", "test_streaming")
        print(f"spark_df: {spark_df}")
        selected_df = select_df_from_kafka(spark_df)
        print(f"selected_df: {selected_df}")
        session = create_cassandra_connection("localhost")
        print(f"session: {session}")

        if session is not None:
            create_keyspace(session)
            print("HERE1")
            create_table(session)
            print("HERE2")

            streaming_query = (selected_df.writeStream.format("org.apache.spark.sql.cassandra")
                            .option('keyspace', 'spark_streams')
                            .option('table', 'test_streaming')
                            .option('checkpointLocation', '/tmp/checkpoint')
                            .start())
            
            print("HERE3")
            streaming_query.awaitTermination()

This is my log output:

25/01/05 17:57:05 INFO Worker: Asked to launch executor app-20250105175705-0010/0 for SparkDataStreaming
25/01/05 17:57:05 INFO SecurityManager: Changing view acls to: spark
25/01/05 17:57:05 INFO SecurityManager: Changing modify acls to: spark
25/01/05 17:57:05 INFO SecurityManager: Changing view acls groups to: 
25/01/05 17:57:05 INFO SecurityManager: Changing modify acls groups to: 
25/01/05 17:57:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: spark; groups with view permissions: EMPTY; users with modify permissions: spark; groups with modify permissions: EMPTY
25/01/05 17:57:05 INFO ExecutorRunner: Launch command: "/opt/bitnami/java/bin/java" "-cp" "/opt/bitnami/spark/conf/:/opt/bitnami/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=42227" "-Dspark.cassandra.connection.port=9042" "-Djava.net.preferIPv6Addresses=false" "-XX:+IgnoreUnrecognizedVMOptions" "--add-opens=java.base/java.lang=ALL-UNNAMED" "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED" "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED" "--add-opens=java.base/java.io=ALL-UNNAMED" "--add-opens=java.base/java.net=ALL-UNNAMED" "--add-opens=java.base/java.nio=ALL-UNNAMED" "--add-opens=java.base/java.util=ALL-UNNAMED" "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED" "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED" "--add-opens=java.base/sun.security.action=ALL-UNNAMED" "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED" "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED" "-Djdk.reflect.useDirectMethodHandle=false" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://[email protected]:42227" "--executor-id" "0" "--hostname" "172.18.0.8" "--cores" "2" "--app-id" "app-20250105175705-0010" "--worker-url" "spark://[email protected]:35967" "--resourceProfileId" "0"
25/01/05 17:57:17 INFO Worker: Asked to kill executor app-20250105175705-0010/0
25/01/05 17:57:17 INFO ExecutorRunner: Runner thread for executor app-20250105175705-0010/0 interrupted
25/01/05 17:57:17 INFO ExecutorRunner: Killing process!
25/01/05 17:57:17 INFO Worker: Executor app-20250105175705-0010/0 finished with state KILLED exitStatus 143
25/01/05 17:57:17 INFO ExternalShuffleBlockResolver: Clean up non-shuffle and non-RDD files associated with the finished executor 0
25/01/05 17:57:17 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20250105175705-0010, execId=0)
25/01/05 17:57:17 INFO ExternalShuffleBlockResolver: Application app-20250105175705-0010 removed, cleanupLocalDirs = true
25/01/05 17:57:17 INFO Worker: Cleaning up local directories for application app-20250105175705-0010
25/01/05 17:57:05 INFO Master: Start scheduling for app app-20250105175705-0010 with rpId: 0
25/01/05 17:57:17 INFO Master: Received unregister request from application app-20250105175705-0010
25/01/05 17:57:17 INFO Master: Removing app app-20250105175705-0010
25/01/05 17:57:17 INFO Master: 172.18.0.1:43216 got disassociated, removing it.
25/01/05 17:57:17 INFO Master: 192.168.65.3:42227 got disassociated, removing it.
25/01/05 17:57:17 WARN Master: Got status update for unknown executor app-20250105175705-0010/0
:: loading settings :: url = jar:file:/usr/local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
spark_df: DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]
selected_df: DataFrame[id: string, first_name: string, last_name: string, gender: string, address: string, post_code: string, email: string, username: string, registered_date: string, phone: string, picture: string]
WARNING:cassandra.cluster:Cluster.__init__ called with contact_points specified, but no load_balancing_policy. In the next major version, this will raise an error; please specify a load-balancing policy. (contact_points = ['127.0.0.1'], lbp = None)
WARNING:cassandra.cluster:Downgrading core protocol version from 66 to 65 for 127.0.0.1:9042. To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version
WARNING:cassandra.cluster:Downgrading core protocol version from 65 to 5 for 127.0.0.1:9042. To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version
session: <cassandra.cluster.Session object at 0x7f383127ec80>
HERE1
HERE2
HERE3
.
.
.
Traceback (most recent call last):
  File "/workspace/consume_n_stream.py", line 32, in <module>
    streaming_query.awaitTermination()
  File "/usr/local/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/streaming/query.py", line 221, in awaitTermination
  File "/usr/local/lib/python3.11/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/local/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 185, in deco
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 446ae8f0-92b7-426b-8a9b-67e2c98eb9dc, runId = 8b2ca83a-5465-42e1-8f3e-b2c3a00c2a42] terminated with exception: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (172.18.0.7 executor 0): java.io.IOException: Failed to open native connection to Cassandra at {localhost:9042} :: Could not reach any contact point, make sure you've provided valid addresses (showing first 2 nodes, use getAllErrors() for more): Node(endPoint=localhost/127.0.0.1:9042, hostId=null, hashCode=3bb98041): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s3|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException)], Node(endPoint=localhost/[0:0:0:0:0:0:0:1]:9042, hostId=null, hashCode=357e7946): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s3|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException)]
.
.
.
    spark-master:
        image: bitnami/spark:3.5.1
        container_name: spark-master
        command: bin/spark-class org.apache.spark.deploy.master.Master
        ports:
            - "9090:8080"
            - "7077:7077"
        networks:
            - confluent
    spark-worker:
        image: bitnami/spark:3.5.1
        container_name: spark-worker
        command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
        depends_on:
            - spark-master
        environment:
            SPARK_MODE: worker
            SPARK_WORKER_CORES: 2
            SPARK_WORKER_MEMORY: 1g
            SPARK_MASTER_URL: spark://spark-master:7077
        networks:
            - confluent

    cassandra_db:
        image: cassandra:latest
        container_name: cassandra
        hostname: cassandra
        ports:
            - "9042:9042"
        environment:
            - MAX_HEAP_SIZE=512M
            - HEAP_NEWSIZE=100M
            - CASSANDRA_USERNAME=cassandra
            - CASSANDRA_PASSWORD=cassandra
        networks:
            - confluent

networks:
  confluent:
    driver: bridge
FROM python:3.11

# default-jdk provides Java 17 on this image
RUN apt-get update
RUN apt-get install default-jdk -y

WORKDIR /workspace

RUN pip install pyspark==3.5.1 cassandra-driver==3.29.2

Then I start my workspace container:

docker run -it --rm --name=workspace --network=host -v $(pwd):/workspace --shm-size=4g workspace bash

Goal: the Spark workers should be able to write data to the Cassandra database.

Upvotes: 0

Views: 101

Answers (1)

Erick Ramirez

Reputation: 16293

You didn't provide the contents of consume_n_stream.py so it's hard to know why your app is failing. As a friendly reminder, it's always a good idea to provide some minimal code that reproduces the problem so you're more likely to get responses to your question (see How to ask a good question for guidance).

I would recommend doing an isolation test to reduce the surface area you have to troubleshoot. A simple thing to do is to verify connectivity between Spark and Cassandra by instantiating a PySpark shell. For example:

$ pyspark \
  --master <spark_master_url> \
  --conf spark.cassandra.connection.host=localhost \
  --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1 \
  --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

Assuming the PySpark shell loaded successfully, you can then try reading from one of the Cassandra tables. For example:

>>> spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="table_name", keyspace="ks_name") \
    .load().show()

Performing this simple test should help you narrow down your investigation. Cheers!

UPDATE: Instead of spark://localhost:7077, try using spark://spark-master:7077. Also set spark.cassandra.connection.host=cassandra. localhost is not routable in your configuration.
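
Putting both changes together, the submit command would look roughly like the sketch below. This assumes the container you run spark-submit from is attached to the same confluent Docker network as the other services, so the spark-master and cassandra hostnames resolve:

spark-submit --master spark://spark-master:7077 \
  --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 \
  --conf spark.cassandra.connection.host=cassandra \
  consume_n_stream.py

Note that consume_n_stream.py also passes the hard-coded "localhost" to create_spark_connection() and create_cassandra_connection() in __main__, so that value would need to be updated to whichever hostname is actually reachable from your driver container.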

Upvotes: 1
