Reputation: 1
I am composing these services as separate Docker containers, all on the same confluent network:
broker:
  image: confluentinc/cp-server:7.4.0
  hostname: broker
  container_name: broker
  depends_on:
    zookeeper:
      condition: service_healthy
  ports:
    - "9092:9092"
    - "9101:9101"
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    KAFKA_JMX_PORT: 9101
    KAFKA_JMX_HOSTNAME: localhost
    KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
    CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    CONFLUENT_METRICS_ENABLE: 'false'
    CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  networks:
    - confluent
  healthcheck:
    test: [ "CMD", "bash", "-c", 'nc -z localhost 9092' ]
    interval: 10s
    timeout: 5s
    retries: 5

spark-master:
  image: bitnami/spark:latest
  volumes:
    - ./spark_stream.py:/opt/bitnami/spark/spark_stream.py
  command: bin/spark-class org.apache.spark.deploy.master.Master
  ports:
    - "9090:8080"
    - "7077:7077"
  networks:
    - confluent

spark-worker:
  image: bitnami/spark:latest
  command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
  depends_on:
    - spark-master
  environment:
    SPARK_MODE: worker
    SPARK_WORKER_CORES: 2
    SPARK_WORKER_MEMORY: 1g
    SPARK_MASTER_URL: spark://spark-master:7077
  networks:
    - confluent
  volumes:
    - ./spark_stream.py:/opt/bitnami/spark/spark_stream.py

cassandra_db:
  image: cassandra:latest
  container_name: cassandra
  hostname: cassandra
  ports:
    - "9042:9042"
  environment:
    - MAX_HEAP_SIZE=512M
    - HEAP_NEWSIZE=100M
    - CASSANDRA_USERNAME=cassandra
    - CASSANDRA_PASSWORD=cassandra
  networks:
    - confluent

# Define the networks
networks:
  confluent:
I create a Spark session like this (spark_conn is the same session used in the read below):

from pyspark.sql import SparkSession

spark_conn = SparkSession.builder \
    .appName('SparkDataStreaming') \
    .config('spark.cassandra.connection.host', 'cassandra') \
    .config('spark.cassandra.connection.port', '9042') \
    .config('spark.cassandra.auth.username', 'cassandra') \
    .config('spark.cassandra.auth.password', 'cassandra') \
    .getOrCreate()
And a Cassandra connection (via the Python driver) like this:

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster = Cluster(['localhost'], port=9042, auth_provider=auth_provider, connect_timeout=60)
cas_session = cluster.connect()
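I use that session up front to create the keyspace and table the stream writes into, roughly like this (the column set matches the TableDef that shows up in the error further down):

# the stream writes into spark_streams.created_users, created ahead of time
cas_session.execute("""
    CREATE KEYSPACE IF NOT EXISTS spark_streams
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
""")

cas_session.execute("""
    CREATE TABLE IF NOT EXISTS spark_streams.created_users (
        id TEXT PRIMARY KEY,
        first_name TEXT,
        last_name TEXT,
        gender TEXT,
        address TEXT,
        post_code TEXT,
        email TEXT,
        username TEXT,
        registered_date TEXT,
        phone TEXT,
        picture TEXT
    );
""")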
The Spark session (spark_conn) reads from a Kafka topic (also on the same network) into a streaming dataframe like this:
spark_df = spark_conn.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'broker:29092') \
.option('subscribe', 'users_created') \
.option('startingOffsets', 'earliest') \
.load()
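Between reading and writing I parse the binary Kafka value column into the table's columns; roughly this (the field list matches the write schema visible in the error further down):

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

# the raw Kafka 'value' column is binary, so cast it to a string and
# parse it as JSON into the columns the Cassandra table expects
schema = StructType([
    StructField('id', StringType()),
    StructField('first_name', StringType()),
    StructField('last_name', StringType()),
    StructField('gender', StringType()),
    StructField('address', StringType()),
    StructField('post_code', StringType()),
    StructField('email', StringType()),
    StructField('username', StringType()),
    StructField('registered_date', StringType()),
    StructField('phone', StringType()),
    StructField('picture', StringType()),
])

spark_df = (spark_df.selectExpr("CAST(value AS STRING)")
            .select(from_json(col('value'), schema).alias('data'))
            .select('data.*'))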
I write the dataframe to Cassandra using this:
streaming_query = (spark_df.writeStream.format("org.apache.spark.sql.cassandra")
.option('checkpointLocation', '/tmp/checkpoint')
.option('keyspace', 'spark_streams')
.option('table', 'created_users')
.start())
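and keep the query alive afterwards so micro-batches keep running:

# block the driver until the streaming query stops or fails
streaming_query.awaitTermination()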
When I try writing the dataframe to Cassandra I get this error:
ERROR CassandraConnectorConf: Unknown host 'cassandra'
java.net.UnknownHostException: cassandra: nodename nor servname provided, or not known
And this is how I submit my Spark job (I believe with the right packages attached):
spark-submit --master spark://localhost:7077 --packages "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3" --conf spark.cassandra.connection.host=cassandra --conf spark.cassandra.connection.port=9042 spark_stream.py
I thought the Spark session would connect to Cassandra, but it seems it can't find the host or contact point.
I tried changing spark.cassandra.connection.host to localhost or 127.0.0.1. For some reason localhost gives me a different error message:
ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
24/11/14 01:59:40 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 0, writer: CassandraBulkWrite(org.apache.spark.sql.SparkSession@105fe638,com.datastax.spark.connector.cql.CassandraConnector@4d392247,TableDef(spark_streams,created_users,ArrayBuffer(ColumnDef(id,PartitionKeyColumn,VarCharType)),ArrayBuffer(),Stream(ColumnDef(address,RegularColumn,VarCharType), ColumnDef(email,RegularColumn,VarCharType), ColumnDef(first_name,RegularColumn,VarCharType), ColumnDef(gender,RegularColumn,VarCharType), ColumnDef(last_name,RegularColumn,VarCharType), ColumnDef(phone,RegularColumn,VarCharType), ColumnDef(picture,RegularColumn,VarCharType), ColumnDef(post_code,RegularColumn,VarCharType), ColumnDef(registered_date,RegularColumn,VarCharType), ColumnDef(username,RegularColumn,VarCharType)),Stream(),false,false,Map()),WriteConf(RowsInBatch(5),1000,Partition,LOCAL_QUORUM,false,false,5,None,TTLOption(DefaultValue),TimestampOption(DefaultValue),true,None),StructType(StructField(id,StringType,true),StructField(first_name,StringType,true),StructField(last_name,StringType,true),StructField(gender,StringType,true),StructField(address,StringType,true),StructField(post_code,StringType,true),StructField(email,StringType,true),StructField(username,StringType,true),StructField(registered_date,StringType,true),StructField(phone,StringType,true),StructField(picture,StringType,true)),org.apache.spark.SparkConf@5ac2dede)] is aborting.
I don't believe it's a column-mismatch error, but I could be wrong. I can see the data appear in the Kafka topic through the Confluent Control Center I set up, so I know the data is making it that far.
Any help would be appreciated.
Upvotes: 0
Views: 39
Reputation: 16313
This error indicates you are connecting to Cassandra from outside the Docker network confluent. The hostname cassandra is only known to other containers running inside Docker; applications or services running outside Docker do not know about Docker-defined hostnames.
Since you have exposed the CQL port 9042 outside the Docker network, you will be able to connect to the Cassandra instance using (a) the host's IP address, or (b) localhost.
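In other words, when the driver runs on the host, the contact point has to be something the host can resolve. A minimal sketch of the same builder under that assumption (note that executors running inside the worker container resolve localhost to themselves, so the host's actual IP is the safer choice there):

from pyspark.sql import SparkSession

# sketch: a contact point reachable from outside the Docker network;
# 'localhost' works for a host-side driver because 9042 is published,
# while executors inside containers would need the host's real IP
spark_conn = (SparkSession.builder
              .appName('SparkDataStreaming')
              .config('spark.cassandra.connection.host', 'localhost')  # or the host's IP
              .config('spark.cassandra.connection.port', '9042')
              .config('spark.cassandra.auth.username', 'cassandra')
              .config('spark.cassandra.auth.password', 'cassandra')
              .getOrCreate())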
The same thing will apply to the services in the other containers. For example, the hostname broker is not known to spark-submit if you are not running it inside the Spark container.
I also noted that you are connecting to broker:29092, but port 29092 is not published by the broker container, so while other containers on the confluent network can reach it, nothing running on the host (such as your spark-submit) can.
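If you do keep running the job from the host, your compose file already publishes 9092 and advertises PLAINTEXT_HOST://localhost:9092 for exactly this case, so a host-side client would bootstrap against that listener instead; a sketch (not a drop-in fix, since containerized executors again resolve localhost to themselves):

# sketch: bootstrap via the published PLAINTEXT_HOST listener, the only
# one a client outside the Docker network can both resolve and reach
spark_df = (spark_conn.readStream
            .format('kafka')
            .option('kafka.bootstrap.servers', 'localhost:9092')
            .option('subscribe', 'users_created')
            .option('startingOffsets', 'earliest')
            .load())

Cheers!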
Upvotes: 0