Kafka integration with PySpark Structured Streaming (Windows)

After installing Anaconda on my Windows 10 machine, I followed this tutorial to set up PySpark and run it with Jupyter: https://changhsinlee.com/install-pyspark-windows-jupyter/

import findspark

findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time

kafka_topic_name = "test_spark"
kafka_bootstrap_servers = '192.168.1.3:9092'

spark = SparkSession \
    .builder \
    .appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
    .master("local[*]") \
    .getOrCreate()

# Construct a streaming DataFrame that reads from the test_spark topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .option("startingOffsets", "latest") \
    .load()
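For reference, a minimal way to check that messages actually arrive (assuming the kafka source resolves and a broker is reachable at that address) is to cast the binary value column to a string and write it to the console. The helper below just builds the same option dict as the code above in plain Python, so it can be sanity-checked without a running cluster; the streaming part is sketched in comments since it needs a live broker:

```python
def kafka_read_options(bootstrap_servers, topic, starting_offsets="latest"):
    # Plain dict of the readStream options used above; no Spark needed to build it
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": starting_offsets,
    }

opts = kafka_read_options("192.168.1.3:9092", "test_spark")

# With a reachable broker and the kafka package loaded, you would continue:
# df = spark.readStream.format("kafka").options(**opts).load()
# query = (df.selectExpr("CAST(value AS STRING)")
#            .writeStream.format("console").start())
# query.awaitTermination()
```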

and here it shows me an error that I need to deploy the connector :

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide"

I went to that page and found the deploy command, which is: ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 ...
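As an aside, the --packages coordinate has the form group:artifact:version, where the suffix on the artifact name encodes the Scala version; it must match your Spark build (here Scala 2.12 and Spark 3.1.2, which fits spark-3.1.2-bin-hadoop3.2). The pieces can be pulled apart with plain string handling:

```python
coordinate = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2"

# Maven coordinates are group:artifact:version
group, artifact, version = coordinate.split(":")

# Spark artifacts append the Scala version after an underscore
scala_version = artifact.rsplit("_", 1)[1]

print(group)          # org.apache.spark
print(scala_version)  # 2.12
print(version)        # 3.1.2
```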

PS D:\Spark\spark-3.1.2-bin-hadoop3.2> .\bin\spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 ...
:: loading settings :: url = jar:file:/D:/Spark/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: C:\Users\T460S\.ivy2\cache
The jars for the packages stored in: C:\Users\T460S\.ivy2\jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-03401043-c6c7-40dd-8667-8001083bfb4c;1.0
        confs: [default]
        found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
        found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
        found org.apache.kafka#kafka-clients;2.6.0 in central
        found com.github.luben#zstd-jni;1.4.8-1 in central
        found org.lz4#lz4-java;1.7.1 in central
        found org.xerial.snappy#snappy-java;1.1.8.2 in central
        found org.slf4j#slf4j-api;1.7.30 in local-m2-cache
        found org.spark-project.spark#unused;1.0.0 in central
        found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 595ms :: artifacts dl 19ms
        :: modules in use:
        com.github.luben#zstd-jni;1.4.8-1 from central in [default]
        org.apache.commons#commons-pool2;2.6.2 from central in [default]
        org.apache.kafka#kafka-clients;2.6.0 from central in [default]
        org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 from central in [default]
        org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 from central in [default]
        org.lz4#lz4-java;1.7.1 from central in [default]
        org.slf4j#slf4j-api;1.7.30 from local-m2-cache in [default]
        org.spark-project.spark#unused;1.0.0 from central in [default]
        org.xerial.snappy#snappy-java;1.1.8.2 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   9   |   0   |   0   |   0   ||   9   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-03401043-c6c7-40dd-8667-8001083bfb4c
        confs: [default]
        0 artifacts copied, 9 already retrieved (0kB/19ms)
21/06/23 19:32:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" org.apache.spark.SparkException: Failed to get main class in JAR with error 'D:\Spark\spark-3.1.2-bin-hadoop3.2\... (Accès refusé)'.  Please specify one with --class.
        at org.apache.spark.deploy.SparkSubmit.error(SparkSubmit.scala:968)
        at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:486)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I tried searching for a solution, but nothing worked. I don't know what to pass as the --class parameter they are telling me to add, and "Accès refusé" means ACCESS DENIED, which I don't understand either. Can anyone tell me what to do, please?

Upvotes: 2

Views: 3457

Answers (1)

OneCricketeer

Reputation: 192023

Similar error (and same answer) - Spark Kafka Data Consuming Package

Did you literally write ... after the --packages option?

The error is telling you to pass a .py file, or --class along with a JAR file containing your application code.

And if you did give one, then it would seem the Spark user cannot access the D:\ drive path that you gave, and you likely need to use winutils chmod to fix its permissions.
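A sketch of that permissions fix, assuming HADOOP_HOME points at your winutils installation and using a placeholder application path (substitute the actual directory from your error message):

```shell
:: Windows cmd; grants full access recursively to the application directory
%HADOOP_HOME%\bin\winutils.exe chmod -R 777 D:\Spark\your-app-directory
```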


If you want to run the code in Jupyter, you can add --packages there too

import os

SCALA_VERSION = '2.12'
SPARK_VERSION = '3.1.2'

# Must be set before pyspark is imported
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION} pyspark-shell'

import findspark
findspark.init()

import pyspark

...
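Since a typo in either variable name would break the f-string (or, worse, a wrong version would pull an incompatible package), it can be worth printing or asserting the assembled value once before starting the session:

```python
SCALA_VERSION = '2.12'
SPARK_VERSION = '3.1.2'

# Same f-string as above, built standalone so it can be inspected
submit_args = (
    f'--packages org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION}'
    ' pyspark-shell'
)

print(submit_args)
# --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell
```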

Or use findspark.add_packages() - https://github.com/minrk/findspark/pull/11

Upvotes: 1
