Reputation: 1070
I have written Python code to load files from Amazon Web Services (AWS) S3 through Apache Spark. Specifically, the code creates an RDD and loads all CSV files from the directory data in my bucket ruofan-bucket on AWS S3 using SparkContext().wholeTextFiles("s3n://ruofan-bucket/data"). The code is shown below:
import os, sys, inspect
### Current directory path.
curr_dir = os.path.split(inspect.getfile(inspect.currentframe()))[0]
### Setup the environment variables
spark_home_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../spark-1.4.0")))
python_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
os.environ["SPARK_HOME"] = spark_home_dir
os.environ["PYTHONPATH"] = python_dir
### Setup pyspark directory path
pyspark_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
sys.path.append(pyspark_dir)
### Import the pyspark
from pyspark import SparkConf, SparkContext
def main():
    ### Initialize the SparkConf and SparkContext
    conf = SparkConf().setAppName("ruofan").setMaster("local")
    sc = SparkContext(conf = conf)

    ### Create an RDD containing metadata about files in directory "data"
    datafile = sc.wholeTextFiles("s3n://ruofan-bucket/data") ### Read data directory from S3 storage.

    ### Collect files from the RDD
    datafile.collect()

if __name__ == "__main__":
    main()
Before running my code, I have already exported the environment variables AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID. But when I run the code, it shows the error:
IOError: [Errno 2] No such file or directory: 's3n://ruofan-bucket/data/test1.csv'
I'm sure I have the directory as well as the files on AWS S3, and I have no idea what is causing the error. I would really appreciate it if anyone could help me solve this problem.
Upvotes: 3
Views: 4736
Reputation: 4726
You can try the following to load data from S3 into an RDD, then loop over the results and print them out. You can apply any transformation afterwards using Spark SQL.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession
.builder()
.appName("Spark SQL POC")
.master("local")
.getOrCreate()
import spark.implicits._
val sparkContext = spark.sparkContext
sparkContext.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sparkContext.hadoopConfiguration.set("fs.s3.awsAccessKeyId", accessKey)
sparkContext.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", secret)
sparkContext.hadoopConfiguration.set("fs.s3.endpoint",region)
// The schema is encoded in a string
val schemaString = "country displayName alias status userName userID exportTime city title email registrationDate firstName lastName dateOfBirth address1 address2 address3 postCode telephoneNumber nickName exclusionExpiryDate exclusionPeriod blocked blockReason lastLoginDate gender mobileNumber marketingSms marketingEmail affiliateMarker depositLimitDate depositLimitPeriod depositLimitAmount depositLimitCurrency depositLimitWaitForUpdate depositLimitUpdatePeriod depositLimitUpdateAmount depositLimitUpdateCurrency userModified selfExclusionModified userBlockModified registeredBy userNote"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
var s3Users = spark.sqlContext.read.schema(schema).json("s3://asgaard-data/users/*/*/*/*/").rdd
// Apply the schema to the RDD
val usersDataFrame = spark.createDataFrame(s3Users, schema)
// Creates a temporary view using the DataFrame
usersDataFrame.createOrReplaceTempView("users")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT userName FROM users limit 10")
results.map(attributes => "UserName: " + attributes(0)).show()
The versions are as follows:
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.1.0</version>
</dependency>
Upvotes: 0
Reputation: 270089
It would appear that wholeTextFiles does not work with Amazon S3.
See:
However, there may be differences between Hadoop versions, so don't take it as definite.
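If you do need the (filename, content) pairs that wholeTextFiles would give you, one workaround is to list the objects yourself and parallelize them. Below is a minimal sketch, assuming boto3 is installed and the same AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables are set; the bucket name and data prefix are taken from the question.
import boto3
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("ruofan").setMaster("local")
sc = SparkContext(conf = conf)

### List every object under the "data/" prefix, pull its contents on the
### driver, and parallelize (filename, content) pairs so the result looks
### like what wholeTextFiles() would have returned.
s3 = boto3.resource("s3")            ### Uses the exported AWS credentials
bucket = s3.Bucket("ruofan-bucket")
pairs = [(obj.key, obj.get()["Body"].read().decode("utf-8"))
         for obj in bucket.objects.filter(Prefix="data/")]

datafile = sc.parallelize(pairs)
print(datafile.collect())
The rest of the question's code stays the same; only the way the (filename, content) pairs are built differs. Alternatively, if you only need the file contents and not the file names, sc.textFile("s3n://ruofan-bucket/data/*.csv") may work where wholeTextFiles does not, though that again depends on the Hadoop version as noted above.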
Upvotes: 1