Reputation: 2644
I am trying to find the distinct combinations of values in two of the columns of my pyarrow table.
What I am currently doing is:
import numpy as np
import pandas as pd
import pyarrow as pa

# Small example table: col1/col2 are the key columns, col3 is payload.
my_table = pa.Table.from_pandas(
    pd.DataFrame(
        {
            'col1': ['a', 'a', 'a', 'a', 'b', 'b', 'b', 'b'],
            'col2': [1, 1, 2, 2, 1, 1, 2, 3],
            'col3': [1, 2, 3, 4, 5, 6, 7, 8],
        }
    )
)

# Convert each key column to a NumPy string array so that both columns share
# one dtype and can be stacked into a single 2-D array (one row per column).
a = [i.to_numpy().astype('str') for i in my_table.select(['col1', 'col2']).columns]

# axis=1 deduplicates whole columns of the stacked array, i.e. the
# (col1, col2) pairs.
unique = np.unique(np.array(a), axis=1)
Which returns the expected result of:
unique
>array([['a', 'a', 'b', 'b', 'b'],
['1', '2', '1', '2', '3']], dtype='<U21')
However, this is quite slow for larger tables, and I am hoping there is a faster way.
Alternatively, what I would really like to know is: when I write a partitioned dataset, how can I know in advance which directories it will write to (i.e. which partitions have some data in my table)?
EDIT:
It can be faster to convert to pandas (instead of to multiple NumPy arrays) and then use drop_duplicates():
# Keep only the two key columns, convert them to a pandas DataFrame, and let
# pandas drop the repeated (col1, col2) rows.
my_table.select(['col1', 'col2']).to_pandas().drop_duplicates()
Upvotes: 1
Views: 4715
Reputation: 11
You could also use the table's group-by aggregation with an empty list of aggregations, which leaves just the distinct key combinations:
table.group_by([<column_names>]).aggregate([])
Upvotes: 1
Reputation: 1406
Support for encoding structs directly is tracked by https://issues.apache.org/jira/browse/ARROW-3978
In the meantime, here's a workaround which is computationally similar to pandas' unique-ing functionality, but avoids the conversion-to-pandas costs by using pyarrow's own compute kernels.
import pyarrow as pa
import pyarrow.compute as pc
def _dictionary_and_indices(column):
    """Return ``(dictionary, indices)`` for a chunked column, with no null indices.

    A column that is not already dictionary-typed is dictionary-encoded with
    ``null_encoding_behavior='encode'``. The dictionary returned is the one
    attached to the first chunk; the indices of all chunks are gathered into
    a single chunked array.

    If the gathered indices still contain nulls (possible when the column
    arrived already dictionary-encoded), the column is decoded with
    ``pc.take`` and the function is re-run on the decoded values.
    """
    assert isinstance(column, pa.ChunkedArray)

    needs_encoding = not isinstance(column.type, pa.DictionaryType)
    if needs_encoding:
        column = pc.dictionary_encode(column, null_encoding_behavior='encode')

    first_chunk = column.chunk(0)
    dictionary = first_chunk.dictionary
    indices = pa.chunked_array([chunk.indices for chunk in column.chunks])

    if indices.null_count == 0:
        return dictionary, indices

    # Nulls must live in the dictionary so that the indices can be
    # meaningfully multiplied by the caller; decode and encode again.
    decoded = pc.take(dictionary, indices)
    return _dictionary_and_indices(decoded)
def unique(table):
    """Produce a table containing only the unique rows from the input.

    Every column is reduced to a dictionary plus integer indices. The
    per-column indices of a row are packed into one integer key (a
    mixed-radix number whose radix for each column is that column's
    dictionary length), ``pc.unique`` is applied to the single key array, and
    the surviving keys are unpacked and decoded back into columns.

    Args:
        table: the ``pa.Table`` to deduplicate.

    Returns:
        A ``pa.Table`` with the same column names containing the distinct
        rows, or ``None`` when ``table`` has no columns.

    Raises:
        pa.ArrowInvalid: expected from the checked arithmetic kernels if the
            packed key overflows its integer type (many columns and/or very
            large dictionaries), instead of silently wrapping around and
            returning wrong rows.
    """
    if table.num_columns == 0:
        return None

    table = table.unify_dictionaries()

    dictionaries = []
    fused_indices = None
    for column in table.columns:
        dictionary, indices = _dictionary_and_indices(column)
        if fused_indices is None:
            fused_indices = indices
        else:
            # Pack this column's indices into fused_indices. The checked
            # kernels are used so an overflowing key raises rather than
            # wrapping and corrupting the result.
            fused_indices = pc.add_checked(
                pc.multiply_checked(fused_indices, len(dictionary)),
                indices)
        dictionaries.append(dictionary)

    # pc.unique can now be invoked on the single array of fused indices.
    fused_indices = pc.unique(fused_indices)

    # Unpack from the last column to the first (the reverse of packing),
    # collecting the decoded columns in that order and flipping at the end.
    uniques = []
    for dictionary in reversed(dictionaries):
        quotient = pc.divide(fused_indices, len(dictionary))
        remainder = pc.subtract(fused_indices,
                                pc.multiply(quotient, len(dictionary)))
        # Decode this column's unique values.
        uniques.append(pc.take(dictionary, remainder))
        fused_indices = quotient
    uniques.reverse()

    return pa.Table.from_arrays(uniques, names=table.column_names)
if __name__ == '__main__':
    # Self-check: deduplicating on (col1, col2) must yield the five distinct
    # pairs present in the sample data.
    my_table = pa.table({
        'col1': ['a', 'a', 'a', 'a', 'b', 'b', 'b', 'b'],
        'col2': [1, 1, 2, 2, 1, 1, 2, 3],
        'col3': [1, 2, 3, 4, 5, 6, 7, 8],
    })

    key_columns = my_table.select(['col1', 'col2'])
    actual = unique(key_columns)
    expected = pa.table({
        'col1': ['a', 'a', 'b', 'b', 'b'],
        'col2': [1, 2, 1, 2, 3],
    })
    assert actual.equals(expected)
Upvotes: 3