Reputation: 276
I have a lot of files with text data, pushed by Azure IoT to blob storage in a lot of folders, and I want to read them and build a Delta Lake table with one row for each line of a file. I used to read them file by file, but it takes too much time, so I want to use Spark to speed up this processing. It needs to fit into a Databricks workflow written in R.
I found the spark_read_text function to read text files, but it cannot read a directory recursively: it only works if all the files are in one directory.
Here is an example of a file path (appid/partition/year/month/day/hour/minute/file): app_id/10/2023/02/06/08/42/gdedir22hccjq
The partition is a seemingly random folder (there are around 30 of them right now) that Azure IoT creates to process data in parallel, so data for the same date can be split across several folders, which does not make reading any more efficient.
So the only function I found that can do this is spark.textFile, which accepts wildcards and handles directories recursively. The only problem is that it returns an RDD, and I can't find a way to transform it into a Spark DataFrame that could ultimately be accessed through a tbl_spark R object.
Here is what I have done so far:
You need to set the config to read folders recursively (here I do this on Databricks in a dedicated Python cell):
%py
sc._jsc.hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
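As a side note, the same setting could probably be applied directly from R instead of a Python cell, by invoking hadoopConfiguration() on the SparkContext through sparklyr (untested sketch):
# same Hadoop setting as the %py cell above, done from R via invoke()
spark_context(sc) %>%
  invoke("hadoopConfiguration") %>%
  invoke("set", "mapreduce.input.fileinputformat.input.dir.recursive", "true")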
Then I can create an RDD:
j_rdd <- spark_context(sc) %>%
invoke("textFile", "/mnt/my_cont/app_id/*/2022/11/17/*", 10L)
This works to create the RDD, and as you can see I can match all the partition folders (before the year) with a "*", as well as the hour and minute folders recursively with the "*" at the end.
I can collect it and create an R data frame:
lst <- invoke(j_rdd, "collect")
data.frame(row = unlist(lst))
This correctly gets my data: one column of text and one row for each line of each file (I can't show an example for privacy reasons, but that's not important).
The problem is that I don't want to collect the data; I want to update a Delta table with it, and I can't find a way to get a sparklyr object that I can use. The j_rdd object I got looks like this:
>j_rdd
<jobj[2666]>
org.apache.spark.rdd.MapPartitionsRDD
/mnt/my_cont/app_id/*/2022/11/17/* MapPartitionsRDD[80] at textFile at NativeMethodAccessorImpl.java:0
The closest I have gotten so far: I tried to copy code from here to convert the data to a DataFrame using invoke, but I don't seem to be doing it correctly:
contents_field <- invoke_static(sc, "sparklyr.SQLUtils", "createStructField", "contents", "character", TRUE)
schema <- invoke_static(sc, "sparklyr.SQLUtils", "createStructType", list(contents_field))
j_df <- invoke(hive_context(sc), "createDataFrame", j_rdd, schema)
invoke(j_df, "createOrReplaceTempView", "tmp_test")
dfs <- tbl(sc, "tmp_test")
dfs %>% sdf_nrow()
I only have one column with character data in it, so I thought it would work, but I get this error:
Error : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 25.0 failed 4 times, most recent failure: Lost task 14.3 in stage 25.0 (TID 15158) (10.221.193.133 executor 2): java.lang.RuntimeException: Error while encoding: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, contents), StringType, false), true, false, true) AS contents#366
at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1192)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:236)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:208)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:95)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:233)
... 28 more
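From the stack trace, my guess is that createDataFrame with a schema expects an RDD of Row objects, while textFile returns an RDD of plain strings, hence the ClassCastException. A direction I thought about but could not verify is to skip the Row schema and build a Dataset[String] with a string encoder instead (untested sketch, names are mine):
# hypothetical: wrap the RDD of strings in a Dataset[String], then expose it as a tbl_spark
string_encoder <- invoke_static(sc, "org.apache.spark.sql.Encoders", "STRING")
j_ds <- spark_session(sc) %>%
  invoke("createDataset", invoke(j_rdd, "rdd"), string_encoder)  # JavaRDD -> RDD[String] -> Dataset[String]
dfs <- sdf_register(invoke(j_ds, "toDF"), "tmp_test")            # one string column named "value"
I have not managed to confirm whether this actually works, though.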
Does anyone have an idea how to convert this RDD object (using R/sparklyr), which I got in return from the invoke function, into something usable without collecting the data?
Upvotes: 0
Views: 185
Reputation: 276
Finally, I found that spark_read_text can also read multiple files with wildcards, but you have to put a wildcard for each directory and file level; it cannot discover folders recursively.
For example:
dfs <- spark_read_text(sc, "/mnt/container/app_id/10/2023/02/06/*")
...doesn't work. But:
dfs <- spark_read_text(sc, "/mnt/container/app_id/10/2023/02/06/*/*/*")
...works. Also:
dfs <- spark_read_text(sc, "/mnt/container/app_id/*/2023/02/06/*/*/*")
...with a wildcard above the date (at the partition level) also works.
As the directory depth doesn't change in my case, that's enough for me.
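From there, the resulting tbl_spark can be written to the Delta table with spark_write_delta, for example (sketch; the target path is just an example):
# append the lines read by spark_read_text to a Delta table
spark_write_delta(dfs, path = "/mnt/container/delta/iot_lines", mode = "append")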
Upvotes: 0