Ruofan Kong

Reputation: 1070

Loading data from AWS S3 through Apache-Spark

I have written a Python script to load files from Amazon Web Services (AWS) S3 through Apache Spark. Specifically, the code creates an RDD and loads all CSV files from the directory data in my bucket ruofan-bucket on AWS S3 using SparkContext().wholeTextFiles("s3n://ruofan-bucket/data"). The code is shown below:

import os, sys, inspect

### Current directory path.
curr_dir = os.path.split(inspect.getfile(inspect.currentframe()))[0]

### Setup the environment variables
spark_home_dir = os.path.realpath(os.path.abspath(os.path.join(curr_dir, "../spark-1.4.0")))
python_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
os.environ["SPARK_HOME"] = spark_home_dir
os.environ["PYTHONPATH"] = python_dir

### Setup pyspark directory path
pyspark_dir = os.path.realpath(os.path.abspath(os.path.join(spark_home_dir, "./python")))
sys.path.append(pyspark_dir)

### Import pyspark
from pyspark import SparkConf, SparkContext

def main():
    ### Initialize the SparkConf and SparkContext
    conf = SparkConf().setAppName("ruofan").setMaster("local")
    sc = SparkContext(conf = conf)

    ### Create an RDD of (filename, content) pairs for the files in the directory "data"
    datafile = sc.wholeTextFiles("s3n://ruofan-bucket/data")    ### Read data directory from S3 storage.

    ### Collect files from the RDD
    datafile.collect()


if __name__ == "__main__":
    main()

Before running my code, I had already exported the environment variables AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID. But when I run the code, the following error shows up:

IOError: [Errno 2] No such file or directory: 's3n://ruofan-bucket/data/test1.csv'

I'm sure the directory and the files exist on AWS S3, and I have no idea what is causing the error. I would really appreciate it if anyone could help me solve this problem.
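For reference, the credentials can also be handed to the s3n connector programmatically instead of through environment variables. A minimal, unverified sketch (the angle-bracket values are placeholders, not real keys):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("ruofan").setMaster("local")
sc = SparkContext(conf=conf)

### Set the S3 credentials on the Hadoop configuration used by the s3n filesystem.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.awsAccessKeyId", "<AWS_ACCESS_KEY_ID>")
hadoop_conf.set("fs.s3n.awsSecretAccessKey", "<AWS_SECRET_ACCESS_KEY>")

### Read the data directory as before.
datafile = sc.wholeTextFiles("s3n://ruofan-bucket/data")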

Upvotes: 3

Views: 4736

Answers (2)

Mez

Reputation: 4726

You can try the following to load data from S3 into an RDD, then loop over the results and print them out. Afterwards you can apply any transformation using Spark SQL.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructField, StructType, StringType}

val spark = SparkSession
  .builder()
  .appName("Spark SQL POC")
  .master("local")
  .getOrCreate()

import spark.implicits._

val sparkContext = spark.sparkContext

// accessKey, secret and region are assumed to be defined elsewhere (e.g. read from configuration)
sparkContext.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sparkContext.hadoopConfiguration.set("fs.s3.awsAccessKeyId", accessKey)
sparkContext.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", secret)
sparkContext.hadoopConfiguration.set("fs.s3.endpoint", region)

// The schema is encoded in a string
val schemaString = "country displayName alias status userName userID exportTime city title email registrationDate firstName lastName dateOfBirth address1 address2 address3 postCode telephoneNumber nickName exclusionExpiryDate exclusionPeriod blocked blockReason lastLoginDate gender mobileNumber marketingSms marketingEmail affiliateMarker depositLimitDate depositLimitPeriod depositLimitAmount depositLimitCurrency depositLimitWaitForUpdate depositLimitUpdatePeriod depositLimitUpdateAmount depositLimitUpdateCurrency userModified selfExclusionModified userBlockModified registeredBy userNote"

// Generate the schema based on the schema string
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))

val schema = StructType(fields)

val s3Users = spark.sqlContext.read.schema(schema).json("s3://asgaard-data/users/*/*/*/*/").rdd

// Apply the schema to the RDD
val usersDataFrame = spark.createDataFrame(s3Users, schema)

// Create a temporary view from the DataFrame
usersDataFrame.createOrReplaceTempView("users")

// SQL can be run over a temporary view created from a DataFrame
val results = spark.sql("SELECT userName FROM users limit 10")

results.map(attributes => "UserName: " + attributes(0)).show()
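Since the question uses PySpark, a rough Python equivalent of the same approach might look like the following. This is only a sketch assuming Spark 2.x (matching the dependencies below); the column names and credential placeholders are hypothetical, and the path is the bucket from the question:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType

spark = SparkSession.builder.appName("Spark SQL POC").master("local").getOrCreate()

# Pass the credentials to the S3 connector, mirroring the Scala snippet above.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_conf.set("fs.s3.awsAccessKeyId", "<access key>")
hadoop_conf.set("fs.s3.awsSecretAccessKey", "<secret key>")

# Build a simple all-string schema (these field names are placeholders).
schema = StructType([StructField(name, StringType(), True)
                     for name in ["userName", "country", "city"]])

# Read the CSV files from the bucket in the question, register a view,
# and query it with Spark SQL.
df = spark.read.schema(schema).csv("s3n://ruofan-bucket/data/*.csv")
df.createOrReplaceTempView("users")
spark.sql("SELECT userName FROM users LIMIT 10").show()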

The versions (Maven dependencies) are as follows:

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.4</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.1.0</version>
</dependency>

Upvotes: 0

John Rotenstein

Reputation: 270089

It would appear that wholeTextFiles does not work with Amazon S3.

See:

However, there may be differences between Hadoop versions, so don't take it as definite.
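If wholeTextFiles really does fail against S3, one workaround sometimes used is to list the objects with boto and feed their contents to Spark directly. A sketch, assuming boto 2 and the bucket layout from the question (not a verified fix):

import boto
from pyspark import SparkContext

sc = SparkContext("local", "ruofan")

# boto picks up AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the environment.
conn = boto.connect_s3()
bucket = conn.get_bucket("ruofan-bucket")

# Download each CSV under data/ and build (path, contents) pairs,
# mimicking what wholeTextFiles would have returned.
pairs = [("s3n://ruofan-bucket/" + key.name, key.get_contents_as_string())
         for key in bucket.list(prefix="data/") if key.name.endswith(".csv")]

datafile = sc.parallelize(pairs)
print(datafile.collect())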

Upvotes: 1
