Reputation: 225
I'm trying to track down the source of a bug on Spark 2.0.0. I have a map that holds table names as keys and DataFrames as values; I loop through it and use spark-avro (3.0.0-preview2) to write each one out to an S3 directory. It runs perfectly locally (with a local path instead of an S3 path, of course), but when I run it on Amazon EMR it runs for a while and then terminates with an error saying the folder already exists (which would mean the same key is being used in that for loop more than once, right?). Is this possibly a threading issue?
for ((k, v) <- tableMap) {
  val currTable: DataFrame = tableMap(k)
  val decryptedCurrTable = currTable.withColumn("data", decryptUDF(currTable("data")))
  val decryptedCurrTableData = sparkSession.sqlContext.read.json(decryptedCurrTable.select("data").rdd.map(row => row.toString()))
  decryptedCurrTable.write.avro(s"s3://..../$k/table")
  decryptedCurrTableData.write.avro(s"s3://..../$k/tableData")
}
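For reference, Spark's DataFrameWriter defaults to SaveMode.ErrorIfExists, so an unqualified write aborts as soon as the target path already exists. A minimal sketch of what the writes in the loop above are implicitly doing (same variables and elided S3 paths as in the snippet):

import com.databricks.spark.avro._  // provides the .avro(...) extension on DataFrameWriter
import org.apache.spark.sql.SaveMode

// The unqualified write above is equivalent to spelling out the default mode:
decryptedCurrTable.write.mode(SaveMode.ErrorIfExists).avro(s"s3://..../$k/table")
// This fails with "path already exists" the moment the prefix is already present,
// whether because the same key $k comes around twice or because earlier output
// was left behind for any other reason.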
Upvotes: 2
Views: 1996
Reputation: 225
I think it was a concurrency issue. I changed my code to:
decryptedCurrTable.write.mode("append").avro(s"s3://..../$k/table")
decryptedCurrTableData.write.mode("append").avro(s"s3://..../$k/tableData")
And everything worked fine.
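For completeness, here is a sketch of how the whole loop might look with the save mode spelled out; tableMap, decryptUDF, sparkSession, and the elided S3 paths are the ones from the question, so treat this as an illustration rather than a drop-in replacement:

import com.databricks.spark.avro._  // .avro(...) extension from spark-avro 3.0.0-preview2
import org.apache.spark.sql.{DataFrame, SaveMode}

// Same loop as in the question, with the save mode made explicit.
for ((k, v) <- tableMap) {
  val currTable: DataFrame = v
  val decryptedCurrTable = currTable.withColumn("data", decryptUDF(currTable("data")))
  val decryptedCurrTableData =
    sparkSession.sqlContext.read.json(decryptedCurrTable.select("data").rdd.map(row => row.toString()))

  // Append never errors on an existing prefix; Overwrite would replace it instead.
  decryptedCurrTable.write.mode(SaveMode.Append).avro(s"s3://..../$k/table")
  decryptedCurrTableData.write.mode(SaveMode.Append).avro(s"s3://..../$k/tableData")
}

Note that append means re-runs accumulate files under the same prefix; if each key should only ever have one copy of its output, SaveMode.Overwrite is the other option that avoids the "folder already exists" error.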
Upvotes: 3