Hemanth Annavarapu

Reputation: 917

Recursively adding rows to a dataframe

I am new to Spark. I have some JSON data that comes as an HttpResponse. I need to store this data in Hive tables. Every HTTP GET request returns a JSON document that becomes a single row in the table. Because of this, I am having to write single rows as files in the Hive table directory.

But I feel that having too many small files will reduce speed and efficiency. Is there a way I can keep appending new rows to the DataFrame and write it to the Hive table directory all at once? I feel this would also reduce the runtime of my Spark code.

Example:

var df = hiveContext.read.json("path")       // first response
for (i <- 2 to 10) {
  val newDF = hiveContext.read.json("path")  // each response is a single-row DataFrame
  df = df.union(newDF)
}
df.write.insertInto("table")                 // write everything out at once

I understand that DataFrames are immutable. Is there a way to achieve this?

Any help would be appreciated. Thank you.

Upvotes: 1

Views: 1188

Answers (3)

Jack Leow

Reputation: 22497

You are mostly on the right track. What you want to do is obtain multiple single-record DataFrames as a Seq[DataFrame], and then reduce that Seq[DataFrame] to a single DataFrame by unioning them.

Going from the code you provided:

val BatchSize = 100
val HiveTableName = "table"

(0 until BatchSize).
  map(_ => hiveContext.read.json("path")). // one single-record DataFrame per request
  reduce(_ union _).                       // combine them into one DataFrame
  write.insertInto(HiveTableName)          // write all the rows in a single operation

Alternatively, if you want to perform the HTTP requests as you go, we can do that too. Let's assume you have a function that does the HTTP request and converts it into a DataFrame:

def obtainRecord(...): DataFrame = ???

You can do something along the lines of:

val HiveTableName = "table"
val OtherHiveTableName = "other_table"
val jsonArray = ???

val batched: DataFrame =
  jsonArray.
    map { parameter =>
      obtainRecord(parameter)  // one HTTP request per parameter, one DataFrame each
    }.
    reduce(_ union _)          // combine into a single DataFrame
batched.write.insertInto(HiveTableName)
batched.select($"...").write.insertInto(OtherHiveTableName)

Upvotes: 1

puhlen

Reputation: 8529

If you can batch-download all of the data first (for example with a script using curl or some other program) and store it in a file, or many files (Spark can load an entire directory at once), you can then load that file or files into Spark all at once and do your processing. I would also check whether the web API has any endpoints that fetch all the data you need, instead of just one record at a time.
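As a minimal sketch of that approach, assuming the responses have already been downloaded into a directory of JSON files (the path and table name below are hypothetical placeholders, not from the question):

// Assumption: the HTTP responses were saved beforehand (e.g. with curl)
// into /data/responses, one JSON document per file.
val allRecords = hiveContext.read.json("/data/responses") // reads every file in the directory
allRecords.write.insertInto("table")                       // one write instead of many small ones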

Upvotes: 0

user8320812

Reputation: 1

You are clearly misusing Spark. Apache Spark is an analytical system, not a database API. There is no benefit in using Spark to modify a Hive database like this. It will only bring a severe performance penalty, without benefiting from any of Spark's features, including distributed processing.

Instead, you should use a Hive client directly to perform transactional operations.
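For instance, a rough sketch of inserting a row through the Hive JDBC client; the connection URL, credentials, table name, and values are hypothetical and assume HiveServer2 is running and the table already exists:

import java.sql.DriverManager

// Assumption: HiveServer2 is reachable at localhost:10000; all names are placeholders.
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
val stmt = conn.createStatement()
stmt.execute("INSERT INTO TABLE my_table VALUES ('value1', 'value2')") // one transactional insert
stmt.close()
conn.close()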

Upvotes: 0
