girish b

Reputation: 21

Unable to create a stream in spark-streaming using kinesis stream

I am new to Kinesis and am trying to process Kinesis stream data with Spark Streaming (PySpark), but I am running into the error below.

I am pushing Twitter data to my Kinesis stream and trying to process it with Spark Streaming. I tried including --jars with all the dependencies but still hit the same issue. I tried Spark 2.4.3 and also 2.3.3, each with the matching spark-streaming-kinesis-asl-assembly jar.
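The submit command I am using looks roughly like this (the jar path and script name here are placeholders):

spark-submit --jars /path/to/spark-streaming-kinesis-asl-assembly_2.11-2.4.3.jar kinesis_test.py

Below is my code: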

from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
from pyspark import StorageLevel
from pyspark.streaming import StreamingContext


spark_session = SparkSession.builder.getOrCreate()
ssc = StreamingContext(spark_session.sparkContext, 10)
sc = spark_session.sparkContext
Kinesis_app_name = "test"
Kinesis_stream_name = "python-stream"
endpoint_url = "https://kinesis.us-east-1.amazonaws.com"
region_name = "us-east-1"

data = KinesisUtils.createStream(
    ssc, Kinesis_app_name, Kinesis_stream_name, endpoint_url,
    region_name, InitialPositionInStream.LATEST, 10, StorageLevel.MEMORY_AND_DISK_2)


data.pprint()


ssc.start()  # Start the computation
ssc.awaitTermination()

I would like to process the stream with Spark Streaming, but I get the error below:

File "C:\spark-2.3.3-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\kinesis.py", line 92, in createStream
          File "C:\spark-2.3.3-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
          File "C:\spark-2.3.3-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
        py4j.protocol.Py4JJavaError: An error occurred while calling o27.createStream.
        : java.lang.NoClassDefFoundError: com/amazonaws/services/kinesis/model/Record
                at java.lang.Class.getDeclaredMethods0(Native Method)
                at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
                at java.lang.Class.getDeclaredMethods(Class.java:1975)
                at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:232)
                at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
                at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
                at org.apache.spark.streaming.kinesis.KinesisUtils$.createStream(KinesisUtils.scala:127)
                at org.apache.spark.streaming.kinesis.KinesisUtils$.createStream(KinesisUtils.scala:554)
                at org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper.createStream(KinesisUtils.scala:616)
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:498)
                at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
                at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
                at py4j.Gateway.invoke(Gateway.java:282)
                at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
                at py4j.commands.CallCommand.execute(CallCommand.java:79)
                at py4j.GatewayConnection.run(GatewayConnection.java:238)
                at java.lang.Thread.run(Thread.java:748)
        Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.kinesis.model.Record
                at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
                at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
                at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
                ... 20 more

Upvotes: 2

Views: 1199

Answers (2)

Henry Arjet

Reputation: 46

I ran into the same issue. It turned out I had included only the spark-streaming-kinesis-asl jar, which, as far as I'm aware, does not contain the Kinesis SDK. I fixed it by removing the lone jar and pulling the dependency in through the package manager instead, with the spark-submit argument --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:2.4.4. If you use the package manager but do not remove the offending jar, the program will not work. I hope this helps anyone who comes across this error in the future.
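For a Spark 2.4.3 install built against Scala 2.11 (the default prebuilt binaries), the submit would look something like this (the script name is just a placeholder):

spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.3 kinesis_test.py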

Upvotes: 2

Joshua Marango

Reputation: 11

Please see the solution below:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream, StorageLevel

if __name__ == "__main__":

    kinesisConf = {...} # I put all my credentials in here

    batchInterval = 2000  # StreamingContext batch duration, in seconds
    kinesisCheckpointInterval = batchInterval  # Kinesis checkpoint interval, also in seconds

    sc = SparkContext(appName="kinesis-stream")
    ssc = StreamingContext(sc, batchInterval)

    data = KinesisUtils.createStream(
        ssc=ssc,
        kinesisAppName=kinesisConf['appName'],
        streamName=kinesisConf['streamName'],
        endpointUrl=kinesisConf['endpointUrl'],
        regionName=kinesisConf['regionName'],
        initialPositionInStream=InitialPositionInStream.LATEST,
        checkpointInterval=kinesisCheckpointInterval,
        storageLevel=StorageLevel.MEMORY_AND_DISK_2,
        awsAccessKeyId=kinesisConf['awsAccessKeyId'],
        awsSecretKey=kinesisConf['awsSecretKey']
    )

    data.pprint()

    ssc.start()
    ssc.awaitTermination()

And when you run it, do it like so:

spark-submit --master local[8] --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.0.0-preview ./streaming.py

2.12 refers to the Scala version; 3.0.0 refers to the Spark version.
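If you are not sure which versions your install uses, you can check from PySpark itself (a quick sketch; the _jvm call reaches into the JVM via py4j):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
print(spark.version)  # Spark version, e.g. 2.4.3
print(spark.sparkContext._jvm.scala.util.Properties.versionString())  # Scala version, e.g. "version 2.11.12"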

Look the package up (on Maven Central, for example) and make sure you select the correct params for that package.

Upvotes: 1
