morganics

Reputation: 1249

Dask - Reading partitions in order using itertuples

I'm using Dask to read in a table with around 14 million rows using read_sql_table. When I iterate over the dataframe using itertuples, the index (which is ordered in the table) is not read out sequentially for one or two partitions. How can I enforce this? The row_id is generated by row_number() on the view and is used as the index when generating the dataframe. I know Pandas has a sorted=True arg; is there anything similar?
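For reference, the setup looks roughly like this (the view name and connection URI are placeholders):

import dask.dataframe as dd

# "my_view" is a hypothetical view exposing row_number() AS row_id
ddf = dd.read_sql_table(
    "my_view",
    "postgresql://user:pass@host/db",  # placeholder connection URI
    index_col="row_id",
)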

This is what happens at the moment while reading the data in (the number of rows read should match the current index):

INFO - Read 11870000 Rows (index: 11870000)
INFO - Read 11880000 Rows (index: 11880000)
INFO - Read 11890000 Rows (index: 11890000)
INFO - Read 11900000 Rows (index: 11900000)
INFO - Read 11910000 Rows (index: 12159912)  <-- index jumps ahead here
INFO - Read 11920000 Rows (index: 12169912)
INFO - Read 11930000 Rows (index: 12179912)
INFO - Read 11940000 Rows (index: 12189912)

All is good until the 11,900,000th row; at that point the read switches to the wrong partition.
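Checking whether Dask even knows the partition boundaries may be relevant here; both attributes below are standard on a Dask dataframe:

# True only if Dask knows the index boundaries of every partition
print(ddf.known_divisions)
# The boundary values themselves (a tuple of Nones when unknown)
print(ddf.divisions)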

Upvotes: 1

Views: 1592

Answers (1)

morganics

Reputation: 1249

This might be an answer to the issue (which is perhaps rare), but the software that reads the stream requires a monotonically increasing index. I can only assume that the multiple calls to the DB are being resolved at different speeds, so another option might be to use the single-threaded scheduler on the compute call after read_sql_table.
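For the scheduler route, a minimal sketch (the scheduler keyword is standard Dask):

# Run all partition reads on one thread, one task at a time
df = ddf.compute(scheduler="single-threaded")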

First of all, I get the first index in each partition;

def _order_partitions(self, ddf):
    # Map each partition number to the first index value in that partition
    ordering = {}
    for partition in range(ddf.npartitions):
        ordering[partition] = int(ddf.get_partition(partition).head(1).index[0])

    # Partition numbers, sorted by their first index value
    return sorted(ordering, key=ordering.get)
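Wiring it up is one line (a sketch; it assumes the dataframe is in scope as ddf):

self._ordered_partitions = self._order_partitions(ddf)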

With the result stored in self._ordered_partitions, I then recreate Dask's itertuples call (which is quite simple);

def _generator(self):
    # Visit the partitions in index order rather than Dask's stored order
    for i in range(self._ddf.npartitions):
        ordered_partition = self._ordered_partitions[i]
        df = self._ddf.get_partition(ordered_partition).compute()
        for row in df.itertuples():
            yield row

The only change is the addition of the ordered_partition lookup. I've not fully tested it yet, so I'll mark this as the answer once I'm happy with it.
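As a quick sanity check while testing, the stream can be asserted to be monotonically increasing (row.Index is the row_id here, since it is the dataframe index):

prev = -1
for row in self._generator():
    # the consumer requires a strictly increasing index
    assert row.Index > prev, f"index went backwards at {row.Index}"
    prev = row.Index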

Upvotes: 1
