Reputation: 187
I am using the as_pandas utility from impala.util to read data fetched from Hive into a pandas DataFrame. However, I don't think pandas will be able to handle a large amount of data, and it will also be slower. I have been reading about Dask, which provides excellent functionality for reading large data files. How can I use it to efficiently fetch data from Hive? This is what I have tried so far:
def as_dask(cursor):
    """Return a DataFrame out of an impyla cursor.

    This will pull the entire result set into memory. For richer pandas-
    like functionality on distributed data sets, see the Ibis project.

    Parameters
    ----------
    cursor : `HiveServer2Cursor`
        The cursor object that has a result set waiting to be fetched.

    Returns
    -------
    DataFrame
    """
    import pandas as pd
    import dask
    import dask.dataframe as dd
    names = [metadata[0] for metadata in cursor.description]
    dfs = dask.delayed(pd.DataFrame.from_records)(cursor.fetchall(),
                                                  columns=names)
    return dd.from_delayed(dfs).compute()
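For comparison, my current as_pandas flow looks roughly like this (the connection settings and table name are just placeholders):

from impala.dbapi import connect
from impala.util import as_pandas

# placeholder connection settings
conn = connect(host='hive-host', port=10000, auth_mechanism='PLAIN')
cursor = conn.cursor()
cursor.execute('SELECT * FROM my_table')
df = as_pandas(cursor)  # pulls the entire result set into a single pandas DataFrame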
Upvotes: 2
Views: 2945
Reputation: 28694
There is no straightforward way to do this at the moment. You would do well to look at the implementation of dask.dataframe.read_sql_table and similar code in intake-sql - you will probably want a way to partition your data, and have each of your workers fetch one partition via a call to delayed(). dd.from_delayed and dd.concat could then be used to stitch the pieces together.
-edit-
Your function has the delayed idea back to front. You are delaying and then immediately materialising the data, within a function that operates on a single cursor - it cannot be parallelised, and it will exhaust your memory if the data is big (which is the reason you are trying this in the first place).
Let's suppose you can form a set of 10 queries, where each query gets a different part of the data; do not use OFFSET, use a condition on some column that is indexed by Hive. You want to do something like:
queries = [SQL_STATEMENT.format(i) for i in range(10)]

def query_to_df(query):
    cursor = impyla.execute(query)
    return pd.DataFrame.from_records(cursor.fetchall())
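For example, if the table has a column Hive can filter on efficiently (say part_id, a hypothetical name), SQL_STATEMENT could be a template like:

# hypothetical query template: each query selects one slice of the data
# by filtering on an indexed/partitioned column, not by OFFSET
SQL_STATEMENT = "SELECT * FROM my_table WHERE part_id = {}"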
Now you have a function that returns a partition and has no dependence on global objects - it only takes a string as input.
parts = [dask.delayed(query_to_df)(q) for q in queries]
df = dd.from_delayed(parts)
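Putting these pieces together, a fuller sketch might look like the following. The host, port and authentication settings are placeholders, and the helper opens a fresh impyla connection inside each task so the workers can fetch their partitions independently:

import pandas as pd
import dask
import dask.dataframe as dd
from impala.dbapi import connect   # impyla's DB-API entry point

def query_to_df(query, host='hive-host', port=10000):
    # open a connection per task so each worker fetches its own partition
    conn = connect(host=host, port=port, auth_mechanism='PLAIN')
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        names = [md[0] for md in cursor.description]
        return pd.DataFrame.from_records(cursor.fetchall(), columns=names)
    finally:
        conn.close()

queries = [SQL_STATEMENT.format(i) for i in range(10)]
parts = [dask.delayed(query_to_df)(q) for q in queries]
df = dd.from_delayed(parts)   # lazy dask DataFrame with 10 partitions

From here, df behaves like a lazy pandas DataFrame; only call .compute() once you have reduced the data to something that fits in memory.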
Upvotes: 3