user14634476

Reputation: 11

PySpark read from Azure Blob Storage in Colab - Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found

I have been trying to read json data from Azure Blob Storage using pyspark in Jupyter notebooks/google colab and continually run into the same error - java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found

The following code is how I set things up in Google Colab; the differences in Jupyter are minimal.

My setup:

I am using an older version of Spark, as I also faced this issue with the most current version (3.4.0).

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In the Spark jars directory - "/content/spark-3.1.1-bin-hadoop3.2/jars" - I have placed the hadoop-azure-3.3.5.jar and azure-storage-8.6.6.jar jars. My session setup:

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

conf = pyspark.SparkConf()
conf.set(
    "spark.jars.packages",
    "org.apache.hadoop:hadoop-azure-3.3.5,com.microsoft.azure:azure-storage-8.6.6"
    # I have also tried hadoop-azure:3.3.5 and azure-storage:8.6.6 (: instead of -)
)

conf.set(
    "fs.azure.account.key.<STORAGE_ACCOUNT>.blob.core.windows.net", "<TOKEN>")

spark = SparkSession.builder.master("local[*]")\
  .config(conf=conf)\
  .getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark
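For reference, spark.jars.packages expects full Maven coordinates in group:artifact:version form, so my understanding is the correctly formatted setting would be the following (this is the ":" variant mentioned in the comment above, which still did not resolve the error for me):

conf.set(
    "spark.jars.packages",
    # Maven coordinates are group:artifact:version - ':' before the version, not '-'
    "org.apache.hadoop:hadoop-azure:3.3.5,com.microsoft.azure:azure-storage:8.6.6"
)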

Then I attempt reads in a few different ways, for both JSON and CSV data:

df1 = spark.read.format('json').\
    load("wasbs://{CONTAINER}@{ACCOUNT}.blob.core.windows.net/{FILE_PATH}.json")
df2 = spark.read.json("wasbs://{CONTAINER}@{ACCOUNT}.blob.core.windows.net/{FILE_PATH}.json")
df3 = spark.read.csv("wasbs://{CONTAINER}@{ACCOUNT}.blob.core.windows.net/{FILE_PATH}.csv")

And each of the above throws the error:

Py4JJavaError: An error occurred while calling o233.load.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2595)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3269)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
    at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:376)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2499)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2593)
    ... 25 more

I have made sure my Azure account information, keys and file paths are correct and that reads work for pyspark in general - verified with:

df = spark.read.csv('sample_data/california_housing_test.csv', header=True, sep=";")
df.show(5)

I have followed instructions from here, here, and here to try to resolve my issue, to no avail. Please advise!

Upvotes: 1

Views: 1731

Answers (3)

Rahul Sahoo

Reputation: 159

This works for Spark version 3.5.1; I have tested it on my local machine.

spark = (
    SparkSession.builder.appName("hive-metastore-example")
    .config("hive.metastore.uris", "thrift://localhost:9083")
    .config(
        "spark.jars.packages",
        "io.delta:delta-spark_2.12:3.1.0,org.apache.hadoop:hadoop-azure:3.3.4,org.apache.hadoop:hadoop-azure-datalake:3.3.4,com.microsoft.azure:azure-storage:7.0.1",
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    .config("fs.azure.account.key.<storage-account>.blob.core.windows.net", "<access_key>")
    .config("fs.azure.createRemoteFileSystemDuringInitialization", "false")
    .config("spark.sql.warehouse.dir", "/tmp/warehouse/")
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.AzureLogStore")
    .config("spark.sql.catalogImplementation", "hive")
    .enableHiveSupport()
    .getOrCreate()
)

Be careful with the jar versions, as using other versions will give you a class-not-found error.
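As a quick sanity check (the container, account and path below are placeholders, not tested against your data), a read over wasbs:// with the session above would look like:

# Placeholder container/account/path - substitute your own values
df = spark.read.json(
    "wasbs://<container>@<storage-account>.blob.core.windows.net/path/to/file.json"
)
df.show(5)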

Upvotes: 0

user14634476

Reputation: 11

After playing around with the configuration settings and how I brought the jars into the environment, I was able to get it working:

!curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.5/hadoop-azure-3.3.5.jar --output /content/spark-3.1.1-bin-hadoop3.2/jars/hadoop-azure-3.3.5.jar
!curl https://repo1.maven.org/maven2/com/microsoft/azure/azure-storage/8.6.6/azure-storage-8.6.6.jar --output /content/spark-3.1.1-bin-hadoop3.2/jars/azure-storage-8.6.6.jar

Note "spark.jars.packages" changed to "spark.jars", and then pointing to the filepath as defined above.

conf = pyspark.SparkConf()
conf.set(
    "spark.jars",
    "/content/spark-3.1.1-bin-hadoop3.2/jars/hadoop-azure-3.3.5.jar, /content/spark-3.1.1-bin-hadoop3.2/jars/azure-storage-8.6.6.jar"
)
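For completeness, the rest of the session setup and the read are unchanged from my question, so the whole thing looks roughly like this (same placeholder account, container and token):

# Account key and session construction exactly as in the question,
# just with spark.jars pointing at the downloaded jars instead of spark.jars.packages
conf.set(
    "fs.azure.account.key.<STORAGE_ACCOUNT>.blob.core.windows.net", "<TOKEN>")

spark = SparkSession.builder.master("local[*]")\
  .config(conf=conf)\
  .getOrCreate()

df = spark.read.json("wasbs://{CONTAINER}@{ACCOUNT}.blob.core.windows.net/{FILE_PATH}.json")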

Note that this fix worked in Colab; I still haven't quite gotten Jupyter notebooks functioning.

Upvotes: 0

Bhavani

Reputation: 5317

I tried to read data from Azure Blob Storage in Google Colab using the code below:

from pyspark.sql import SparkSession

account_name = '<storageaccountName>'
account_key = '<accesskey>'
container_name = '<containerName>'

spark = SparkSession.builder \
    .master("spark://spark-master-svc:7077") \
    .config(f"spark.hadoop.fs.azure.account.key.{account_name}.blob.core.windows.net", account_key) \
    .getOrCreate()

df = spark.read.json(f"wasbs://{container_name}@{account_name}.blob.core.windows.net/input.json")

This gave me an error (the screenshot is omitted here).

I then tried the code below, and with it I am able to read data from Blob Storage:

from datetime import datetime, timedelta
from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions
import pandas as pd

account_name = '<storageaccount>'
account_key = '<AccessKey>'
container_name = '<containerName>'

connect_str = 'DefaultEndpointsProtocol=https;AccountName=' + account_name + ';AccountKey=' + account_key + ';EndpointSuffix=core.windows.net'
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
container_client = blob_service_client.get_container_client(container_name)

blob_list = []
for blob_i in container_client.list_blobs():
    blob_list.append(blob_i.name)

for blob_i in blob_list:
    sas_i = generate_blob_sas(account_name=account_name,
        container_name=container_name,
        blob_name=blob_i,
        account_key=account_key,
        permission=BlobSasPermissions(read=True),
        expiry=datetime.utcnow() + timedelta(hours=1))

    sas_url = 'https://' + account_name + '.blob.core.windows.net/' + container_name + '/' + blob_i + '?' + sas_i
    df = pd.read_csv(sas_url)
    print(df)
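Since the question is about PySpark, the pandas DataFrame read this way can also be converted to a Spark DataFrame if needed; a minimal sketch, assuming a working SparkSession named spark:

# Convert the pandas DataFrame (read via the SAS URL) to a Spark DataFrame
spark_df = spark.createDataFrame(df)
spark_df.show(5)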


Upvotes: 0
