I have a zip file containing a CSV file and a JSON mapping file. I would like to read the CSV into a Spark DataFrame and the JSON mapping file into a dictionary. I have done the latter like this:
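import io, json, zipfile
import boto3
s3 = boto3.client('s3')
obj = s3.get_object(Bucket='bucket', Key='key')
z = zipfile.ZipFile(io.BytesIO(obj["Body"].read()))
files = z.namelist()  # assumed order: files[0] is the CSV, files[1] the JSON mapping
csvjson = json.loads(z.open(files[1]).read().decode('utf-8'))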
Ideally, I would like to do something like the following to load the CSV file into a DataFrame:
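dfRaw = (spark.read
    .format("csv")  # "csv", not "text": header/inferSchema/whitespace options only apply to the CSV source
    .option("multiLine", "true")
    .option("inferSchema", "false")
    .option("header", "true")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .load(z.open(files[0]).read().decode('utf-8')))  # fails: load() expects a path, not the file contents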
However, this doesn't work, because load() expects a file path rather than the file's contents. How can I read this file from the zip archive into a Spark DataFrame?
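PySpark's DataFrameReader.csv documents that its path argument may also be an RDD of strings, each storing one CSV row. Under that assumption, the decoded lines can be handed to the regular CSV reader, so options such as header keep working and no temporary file is needed. A sketch only; csv_lines_from_zip and zip_csv_to_df are hypothetical helper names, and spark is assumed to be an existing SparkSession:

```python
import io
import zipfile


def csv_lines_from_zip(zip_bytes, member):
    """Decode one zip member as UTF-8 and return its lines without line endings."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as z:
        # splitlines() strips both \n and \r\n endings
        return z.read(member).decode("utf-8").splitlines()


def zip_csv_to_df(spark, zip_bytes, member):
    """Parse a CSV member of an in-memory zip into a Spark DataFrame.

    spark.read.csv accepts an RDD of strings (one CSV row per element),
    so the header row is used for column names instead of becoming data.
    """
    rdd = spark.sparkContext.parallelize(csv_lines_from_zip(zip_bytes, member))
    return spark.read.csv(
        rdd,
        header=True,
        inferSchema=False,
        ignoreLeadingWhiteSpace=True,
        ignoreTrailingWhiteSpace=True,
    )
```

Because each RDD element is treated as one record, this cannot reproduce multiLine: a quoted field containing a line break would be split across two elements. The whole file is also decoded on the driver before being distributed, so this only suits files that fit in driver memory.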
Since you are unzipping the CSV file manually and already have its contents as strings, you can distribute the lines with parallelize and parse each one with from_csv, as follows:
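from pyspark.sql import functions as F, types as T

# if obj["Body"] was already read (as in the question), reuse that z: the stream is empty once consumed
z = zipfile.ZipFile(io.BytesIO(obj["Body"].read()))
# one string per CSV line, with the \n or \r\n ending stripped
csv = [l.decode('utf-8').rstrip('\r\n') for l in z.open(files[0]).readlines()]
(spark
    .sparkContext
    .parallelize(csv)
    .toDF(T.StringType())  # a single string column, named 'value'
    .withColumn('value', F.from_csv('value', 'ID int, Trxn_Date string'))  # your schema goes here
    .select('value.*')
    .show(10, False)
)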
# Output
+----+----------+
|ID |Trxn_Date |
+----+----------+
|null|Trxn_Date |
|100 |2021-03-24|
|133 |2021-01-22|
+----+----------+
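In the output above, the header line is parsed like any other row: the string ID is not a valid int, so it becomes null, while Trxn_Date is kept as a plain string. A minimal sketch of one way to avoid this, assuming the first line is the header: split it off before calling parallelize, and optionally build an all-string DDL schema from it (matching inferSchema=false). split_header is a hypothetical helper name:

```python
import csv


def split_header(lines):
    """Separate the CSV header from the data rows.

    Returns (columns, data_lines, ddl), where ddl is an all-string schema
    such as "`ID` string, `Trxn_Date` string" suitable for from_csv.
    """
    header, *data = lines
    columns = next(csv.reader([header]))  # handles quoted header fields
    # Backtick-quote names so spaces or dots survive DDL parsing; double any embedded backticks.
    ddl = ", ".join("`{}` string".format(c.replace("`", "``")) for c in columns)
    return columns, data, ddl
```

The data lines would then go to .parallelize(data) in the pipeline above, together with F.from_csv('value', ddl) (from_csv requires Spark 3.0 or later); individual columns can be cast afterwards where a type other than string is needed.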