Maciek

Reputation: 792

How to load a big numpy array from a text file with Dask?

I have a text file containing data that I read into memory with numpy.genfromtxt, enforcing a custom numpy.dtype. Although the text file is smaller than the available RAM, I often get a MemoryError (which I don't understand, but it is not the point of this question). When looking for ways to resolve it, I came across dask. In the API I found methods for loading data, but none of them reads from plain text files, not to mention supporting the converters I need in genfromtxt().

I see there is a dask.dataframe.read_csv() method, but in my case I don't use pandas, but rather a plain numpy.array with custom dtypes and column names, as mentioned above. The text file I have is not CSV anyway (hence the above-mentioned use of converters in genfromtxt()).
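
For reference, a minimal sketch of the kind of call I mean (the file name, dtype, and converter are only illustrative, not my actual data):

import numpy

my_type = numpy.dtype([('f1', numpy.float64), ('f2', numpy.float64)])

# e.g. one column needs custom parsing because the file is not CSV
data = numpy.genfromtxt(
    'data.txt',
    dtype=my_type,
    encoding='utf-8',  # so the converter receives str rather than bytes
    converters={1: lambda s: float(s.rstrip('%'))},
)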

Any ideas on how I could handle this would be appreciated.

Upvotes: 0

Views: 1104

Answers (2)

Maciek

Reputation: 792

SO editors rejected my edit to @mdurant's answer, so I am posting the working code (based on that answer) here:

import numpy 
import dask
import dask.array as da
import io

fname = 'data.txt'
# data.txt is:
# 1 2
# 3 4
# 5 6

files = [fname]
_, blocks = dask.bytes.read_bytes(files, delimiter="\n")

my_type = numpy.dtype([
                    ('f1', numpy.float64),
                    ('f2', numpy.float64)
                    ])

used_type = numpy.float64
# If the below line is uncommented, then creating the dask array will work, but it won't
# be possible to perform any operations on it
# used_type = my_type

# Debug
# print('blocks', blocks)
# print('type(blocks)', type(blocks))
# print('blocks[0]', blocks[0])
# print('type(blocks[0])', type(blocks[0]))

@dask.delayed
def parse(block):
    # block is the list of byte chunks for one file (hence block[0]);
    # this small file fits in a single chunk
    r = numpy.genfromtxt(io.BytesIO(block[0]))
    print('parse() about to return:\n', r, '\n')
    return r

# Below I added shape, which is compulsory: dask cannot inspect a delayed
# object to infer its shape (or dtype) before it is computed.
# genfromtxt() returns a (3, 2) float array for this file
arrays = [da.from_delayed(value=parse(block), shape=(3, 2), dtype=used_type) for block in blocks]
# da.concatenate() did not work for me
arr = da.stack(arrays)
# The below will not work if used_type is set to my_type
arr += 1
# Nor would the below work; it raises NotImplementedError
# arr['f1'] += 1
arr_np = arr.compute()
print('numpy array incremented by one: \n', arr_np)
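
As a possible workaround for the structured-dtype limitation: the field assignment only fails at the dask level, so it can be pushed down into plain numpy with map_blocks(). A minimal sketch, assuming the same data.txt as above; parse_structured() and bump_f1() are names I made up for illustration:

import io
import numpy
import dask
import dask.array as da

_, blocks = dask.bytes.read_bytes(['data.txt'], delimiter="\n")

my_type = numpy.dtype([('f1', numpy.float64), ('f2', numpy.float64)])

@dask.delayed
def parse_structured(block):
    # Parse one file's byte chunks straight into a structured array
    return numpy.genfromtxt(io.BytesIO(block[0]), dtype=my_type)

def bump_f1(chunk):
    # Plain numpy inside a block: field assignment works here,
    # even though it raises NotImplementedError on the dask array
    chunk = chunk.copy()
    chunk['f1'] += 1
    return chunk

arrays = [da.from_delayed(parse_structured(block), shape=(3,), dtype=my_type)
          for block in blocks]
arr = da.stack(arrays)
result = arr.map_blocks(bump_f1, dtype=my_type).compute()
print(result['f1'])  # field access works again on the computed numpy array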

Upvotes: 1

mdurant

Reputation: 28673

You should use the function dask.bytes.read_bytes with delimiter="\n" to read your file(s) and split them into blocks at line endings. You get back a set of dask.delayed objects, which you can pass to numpy. Unfortunately, numpy wants a file-like object, so you must wrap the bytes again:

import io
import numpy
import dask
import dask.array as da

_, blocks = dask.bytes.read_bytes(files, delimiter="\n")

@dask.delayed
def parse(block):
    return numpy.genfromtxt(io.BytesIO(block), ...)

arrays = [da.from_delayed(parse(block), ...) for block in blocks]
arr = da.stack(arrays)  # or da.concatenate(arrays)

Upvotes: 1
