user28291353

Reputation: 1

Can't connect/write stream from spark container to table in cassandra container

I am composing these services as separate Docker containers, all on the same Docker network (confluent):

  broker:
    image: confluentinc/cp-server:7.4.0
    hostname: broker
    container_name: broker
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    networks:
      - confluent
    healthcheck:
      test: [ "CMD", "bash", "-c", "nc -z localhost 9092" ]
      interval: 10s
      timeout: 5s
      retries: 5

  spark-master:
    image: bitnami/spark:latest
    volumes: 
      - ./spark_stream.py:/opt/bitnami/spark/spark_stream.py
    command: bin/spark-class org.apache.spark.deploy.master.Master
    ports:
      - "9090:8080"
      - "7077:7077"
    networks:
      - confluent

  spark-worker:
    image: bitnami/spark:latest
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 1g
      SPARK_MASTER_URL: spark://spark-master:7077
    networks:
      - confluent
    volumes: 
      - ./spark_stream.py:/opt/bitnami/spark/spark_stream.py

  cassandra_db:
    image: cassandra:latest
    container_name: cassandra
    hostname: cassandra
    ports:
      - "9042:9042"
    environment:
      - MAX_HEAP_SIZE=512M
      - HEAP_NEWSIZE=100M
      - CASSANDRA_USERNAME=cassandra
      - CASSANDRA_PASSWORD=cassandra
    networks:
      - confluent


# Define the networks
networks:
  confluent:

I create a spark connection like this:

from pyspark.sql import SparkSession

s_conn = SparkSession.builder \
            .appName('SparkDataStreaming') \
            .config('spark.cassandra.connection.host', 'cassandra') \
            .config("spark.cassandra.connection.port", "9042") \
            .config("spark.cassandra.auth.username", "cassandra") \
            .config("spark.cassandra.auth.password", "cassandra") \
            .getOrCreate()

And a Cassandra connection (via the Python driver) like this:

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster = Cluster(['localhost'], port=9042, auth_provider=auth_provider, connect_timeout=60)
cas_session = cluster.connect()

The spark connection reads from a kafka stream (also on same network) into a dataframe like this:

spark_df = s_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', 'broker:29092') \
            .option('subscribe', 'users_created') \
            .option('startingOffsets', 'earliest') \
            .load()

I write the dataframe to cassandra using this:

streaming_query = (spark_df.writeStream.format("org.apache.spark.sql.cassandra")
                               .option('checkpointLocation', '/tmp/checkpoint')
                               .option('keyspace', 'spark_streams')
                               .option('table', 'created_users')
                               .start())
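
One thing not shown above: after .start() I block the driver with the standard StreamingQuery call so the job keeps running:

streaming_query.awaitTermination()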

When I try writing the dataframe to cassandra I get this error:

ERROR CassandraConnectorConf: Unknown host 'cassandra'
java.net.UnknownHostException: cassandra: nodename nor servname provided, or not known

And this is how I call my Spark job (I believe with the right packages attached):

spark-submit --master spark://localhost:7077 \
    --packages "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3" \
    --conf spark.cassandra.connection.host=cassandra \
    --conf spark.cassandra.connection.port=9042 \
    spark_stream.py

I thought the Spark connection would connect to Cassandra, but it seems it can't resolve the host/contact point?
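
A minimal check with only the standard library shows what the UnknownHostException means; the name resolves inside containers attached to the confluent network, but not on the host, where Docker's embedded DNS is not in play:

import socket

# Raises socket.gaierror on the host; resolves to the container's IP when run
# inside a container attached to the confluent network.
socket.gethostbyname('cassandra')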

I tried changing spark.cassandra.connection.host to localhost or 127.0.0.1. For some reason localhost gives me a different error message:

ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
24/11/14 01:59:40 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 0, writer: CassandraBulkWrite(org.apache.spark.sql.SparkSession@105fe638,com.datastax.spark.connector.cql.CassandraConnector@4d392247,TableDef(spark_streams,created_users,ArrayBuffer(ColumnDef(id,PartitionKeyColumn,VarCharType)),ArrayBuffer(),Stream(ColumnDef(address,RegularColumn,VarCharType), ColumnDef(email,RegularColumn,VarCharType), ColumnDef(first_name,RegularColumn,VarCharType), ColumnDef(gender,RegularColumn,VarCharType), ColumnDef(last_name,RegularColumn,VarCharType), ColumnDef(phone,RegularColumn,VarCharType), ColumnDef(picture,RegularColumn,VarCharType), ColumnDef(post_code,RegularColumn,VarCharType), ColumnDef(registered_date,RegularColumn,VarCharType), ColumnDef(username,RegularColumn,VarCharType)),Stream(),false,false,Map()),WriteConf(RowsInBatch(5),1000,Partition,LOCAL_QUORUM,false,false,5,None,TTLOption(DefaultValue),TimestampOption(DefaultValue),true,None),StructType(StructField(id,StringType,true),StructField(first_name,StringType,true),StructField(last_name,StringType,true),StructField(gender,StringType,true),StructField(address,StringType,true),StructField(post_code,StringType,true),StructField(email,StringType,true),StructField(username,StringType,true),StructField(registered_date,StringType,true),StructField(phone,StringType,true),StructField(picture,StringType,true)),org.apache.spark.SparkConf@5ac2dede)] is aborting.

I don't believe it's a column-mismatch error, but I could be wrong. I can see the data appear in the Kafka topic through the Confluent Control Center I set up, so I know the data is making it that far.
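
A quick way to rule out a schema mismatch, using the cas_session from above (system_schema is a standard keyspace on Cassandra 3.x and later):

rows = cas_session.execute(
    "SELECT column_name, type FROM system_schema.columns "
    "WHERE keyspace_name = 'spark_streams' AND table_name = 'created_users'"
)
for row in rows:
    print(row.column_name, row.type)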

Any help would be appreciated.

Upvotes: 0

Views: 39

Answers (1)

Erick Ramirez

Reputation: 16313

This error indicates you are connecting to Cassandra from outside the Docker network confluent. The hostname cassandra is only known to other containers running inside Docker. Applications or services running outside Docker do not know about Docker-defined hostnames.

Since you have published the CQL port 9042 to the host (ports: "9042:9042"), you will be able to connect to the Cassandra instance from outside Docker using (a) the host's IP address, or (b) localhost.
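
For example, with the Python driver (a sketch reusing the code from the question; only the contact point changes):

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')

# From the host: port 9042 is published, so localhost reaches the container
cluster = Cluster(['localhost'], port=9042, auth_provider=auth_provider)

# From another container on the confluent network: the Docker hostname resolves
cluster = Cluster(['cassandra'], port=9042, auth_provider=auth_provider)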

The same thing applies to the services in the other containers. For example, the hostname broker is not known to spark-submit when it is not running inside a container on the same network.
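
The same reasoning applies to the Kafka bootstrap address: every process that opens the connection (the driver and each executor) must use an address reachable from where it runs. A sketch, assuming the listeners defined in the compose file above:

# Inside the Docker network (e.g. executors on the spark-worker container),
# the internal listener is reachable:
bootstrap_servers = 'broker:29092'
# From outside Docker (e.g. a driver started on the host), only the published
# PLAINTEXT_HOST listener is reachable:
# bootstrap_servers = 'localhost:9092'

spark_df = s_conn.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', bootstrap_servers) \
    .option('subscribe', 'users_created') \
    .option('startingOffsets', 'earliest') \
    .load()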

I also noted that you are connecting to broker:29092, but port 29092 is not published in the broker's ports mapping, so it is not reachable from outside Docker. From the host you would need either the published PLAINTEXT_HOST listener on localhost:9092, or to publish 29092 as well. Cheers!

Upvotes: 0
