Ali. K

Reputation: 187

Using dask to read data from Hive

I am using the as_pandas utility from impala.util to read data fetched from Hive into a DataFrame. However, I don't think pandas will be able to handle a large amount of data, and it will also be slower. I have been reading about dask, which provides excellent functionality for reading large data files. How can I use it to efficiently fetch data from Hive?

def as_dask(cursor):
    """Return a DataFrame out of an impyla cursor.

    This will pull the entire result set into memory.  For richer pandas-like
    functionality on distributed data sets, see the Ibis project.

    Parameters
    ----------
    cursor : `HiveServer2Cursor`
        The cursor object that has a result set waiting to be fetched.

    Returns
    -------
    DataFrame
    """
    import pandas as pd
    import dask
    import dask.dataframe as dd

    names = [metadata[0] for metadata in cursor.description]
    dfs = dask.delayed(pd.DataFrame.from_records)(cursor.fetchall(),
                                                  columns=names)
    return dd.from_delayed(dfs).compute()

Upvotes: 2

Views: 2945

Answers (1)

mdurant

Reputation: 28694

There is currently no straightforward way to do this. You would do well to look at the implementation of dask.dataframe.read_sql_table and similar code in intake-sql. You will probably want a way to partition your data and have each of your workers fetch one partition via a call to delayed(). dd.from_delayed and dd.concat could then be used to stitch the pieces together.

-edit-

Your function has the delayed idea back to front. You are delaying and then immediately materialising the data within a function that operates on a single cursor, so it can't be parallelised and will exhaust your memory if the data is big (which is the reason you are trying this).
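To make the "back to front" point concrete, here is a minimal sketch of why passing cursor.fetchall() as an argument defeats the laziness: Python evaluates the argument before dask ever sees it. The functions fetch_all and to_rows are hypothetical stand-ins (not part of impyla or dask) so the example runs without a database.

```python
import dask

calls = []  # records each time the stand-in fetch actually runs


def fetch_all():
    # Hypothetical stand-in for cursor.fetchall()
    calls.append("fetch")
    return [(1, "a"), (2, "b")]


def to_rows(records):
    # Hypothetical stand-in for pd.DataFrame.from_records
    return list(records)


# The argument is evaluated immediately, so the data is already in memory
# before the delayed object is even created.
eager = dask.delayed(to_rows)(fetch_all())
calls_after_eager = len(calls)

# Wrapping the fetch itself keeps everything lazy until compute() is called.
lazy = dask.delayed(to_rows)(dask.delayed(fetch_all)())
calls_after_lazy = len(calls)

result = lazy.compute()
calls_after_compute = len(calls)
```

Only the second form postpones the fetch, which is why the fetch needs to live inside the delayed function rather than in its arguments.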

Let's suppose you can form a set of 10 queries, where each query gets a different part of the data. Do not use OFFSET; use a condition on some column that is indexed by Hive. You want to do something like:

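import pandas as pd
from impala.dbapi import connect
queries = [SQL_STATEMENT.format(i) for i in range(10)]
def query_to_df(query):
    cursor = connect(host="hive-host", port=10000).cursor()  # placeholder host and port
    cursor.execute(query)
    names = [metadata[0] for metadata in cursor.description]
    return pd.DataFrame.from_records(cursor.fetchall(), columns=names)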
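For the queries list above, one possible way to get non-overlapping pieces is to split the range of an integer column into contiguous intervals and put each interval in a WHERE clause. This is only a sketch: the table name events, the column id, its bounds, and the helper range_queries are all made up for illustration. In Hive, choosing a partition column for the condition is usually what lets each query skip unrelated data.

```python
def range_queries(table, column, lo, hi, n_parts):
    """Return at most n_parts SELECT statements covering lo <= column <= hi."""
    total = hi - lo + 1
    step = -(-total // n_parts)  # ceiling division, so the last piece may be shorter
    queries = []
    for start in range(lo, hi + 1, step):
        stop = min(start + step, hi + 1)
        # Half-open interval [start, stop): no row is fetched twice
        queries.append(
            f"SELECT * FROM {table} WHERE {column} >= {start} AND {column} < {stop}"
        )
    return queries


# Hypothetical table and column names
queries = range_queries("events", "id", 0, 999, 10)
```

Because the intervals are half-open and adjacent, every row with an id in the range lands in exactly one query.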

Now query_to_df is a function that returns one partition and has no dependence on global objects: it takes only a string as input.

import dask
import dask.dataframe as dd
parts = [dask.delayed(query_to_df)(q) for q in queries]
df = dd.from_delayed(parts)

Upvotes: 3
