Reputation: 792
I have a text file containing data that I read into memory with numpy.genfromtxt, enforcing a custom numpy.dtype. Although the text file is smaller than the available RAM, I often get a MemoryError (which I don't understand, but that is not the point of this question). While looking for ways to resolve it, I came across dask. In the API I found methods for loading data, but none of them reads from text files, let alone supports the converters I rely on in genfromtxt().
I see there is a dask.dataframe.read_csv() method, but I don't use pandas; I use plain numpy.array objects with custom dtypes and column names, as mentioned above. The text file is not CSV anyway (hence the use of converters in genfromtxt()).
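As an illustration of the kind of call described above, here is a minimal sketch. The sample data, the field names and the strip_unit converter are hypothetical, chosen only to show a custom dtype combined with a converter on a whitespace-separated file.

```python
import io

import numpy as np

# Hypothetical whitespace-separated data; the second column carries a unit suffix.
text = b"1 2.5kg\n3 4.0kg\n5 6.5kg\n"

# Custom structured dtype with named fields.
my_type = np.dtype([("f1", np.float64), ("f2", np.float64)])


def strip_unit(value):
    # Depending on the NumPy version and encoding, the converter may
    # receive bytes or str, so handle both.
    if isinstance(value, bytes):
        value = value.decode()
    return float(value.rstrip("kg"))


# Column 1 (zero-based) goes through the converter before being stored.
data = np.genfromtxt(io.BytesIO(text), dtype=my_type, converters={1: strip_unit})
```

The result is a one-dimensional record array, so data["f2"] gives the converted second column.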
Any ideas on how I could handle this would be appreciated.
Upvotes: 0
Views: 1104
Reputation: 792
SO editors rejected my edit to @mdurant's answer, so I am posting the working code (based on that answer) here:
import numpy
import dask
import dask.bytes
import dask.array as da
import io
fname = 'data.txt'
# data.txt is:
# 1 2
# 3 4
# 5 6
files = [fname]
# read_bytes returns one list of delayed byte blocks per input file
_, blocks = dask.bytes.read_bytes(files, delimiter=b"\n")
my_type = numpy.dtype([
('f1', numpy.float64),
('f2', numpy.float64)
])
native_type = float
used_type = numpy.float64
# If the below line is uncommented, then creating the dask array will work, but it won't
# be possible to perform any operations on it
# used_type = my_type
# Debug
# print('blocks', blocks)
# print('type(blocks)', type(blocks))
# print('blocks[0]', blocks[0])
# print('type(blocks[0])', type(blocks[0]))
@dask.delayed
def parse(block):
    # block is the list of byte blocks for one file; this small file has a single block
    r = numpy.genfromtxt(io.BytesIO(block[0]))
    print('parse() about to return:\n', r, '\n')
    return r
# Below I added shape, which seems compulsory, the reason for which I don't understand
# (3, 2) matches the three rows and two columns that parse() returns
arrays = [da.from_delayed(value=parse(block), shape=(3, 2), dtype=used_type) for block in blocks]
# da.concat did not work for me (dask.array provides da.concatenate instead)
arr = da.stack(arrays)
# The below will not work if used_type is set to my_type
arr += 1
# Neither would the below work; it raises NotImplementedError
# arr['f1'] += 1
arr_np = arr.compute()
print('numpy array incremented by one: \n', arr_np)
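For the structured dtype, one possible route is to read a field out of the dask array and do the arithmetic out of place, rather than assigning into the field. The sketch below assumes two hypothetical in-memory blocks with known row counts standing in for the delayed blocks from read_bytes; parse_structured is an illustrative name, not part of dask.

```python
import io

import dask
import dask.array as da
import numpy as np

my_type = np.dtype([("f1", np.float64), ("f2", np.float64)])

# Hypothetical in-memory blocks standing in for what read_bytes would yield.
raw_blocks = [b"1 2\n3 4\n", b"5 6\n"]
rows_per_block = [2, 1]  # assumed to be known in this sketch


@dask.delayed
def parse_structured(block):
    # atleast_1d guards against genfromtxt returning a 0-d array
    # when a block holds a single line.
    return np.atleast_1d(np.genfromtxt(io.BytesIO(block), dtype=my_type))


# With a structured dtype each block is one-dimensional: one record per row.
arrays = [
    da.from_delayed(parse_structured(block), shape=(n,), dtype=my_type)
    for block, n in zip(raw_blocks, rows_per_block)
]
records = da.concatenate(arrays)

# Selecting a field yields a plain float64 dask array, so ordinary
# (out-of-place) arithmetic is available on it.
f1_plus_one = (records["f1"] + 1).compute()
```

Here the incremented values live in a new array; the original records are left unchanged.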
Upvotes: 1
Reputation: 28673
You should use the function dask.bytes.read_bytes with delimiter=b"\n" to read your file(s) and split them into blocks at line endings. You get back lists of dask.delayed objects (one list per file), which you can pass to numpy. Unfortunately, numpy wants a file-like object, so you must pack the bytes again:
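import io
import numpy
import dask
import dask.bytes
import dask.array as da
_, blocks = dask.bytes.read_bytes(files, delimiter=b"\n")
@dask.delayed
def parse(block):
    return numpy.genfromtxt(io.BytesIO(block), ...)
# blocks holds one list of delayed blocks per file, so flatten it
arrays = [da.from_delayed(parse(block), ...) for file_blocks in blocks for block in file_blocks]
arr = da.concatenate(arrays)  # or da.stack(arrays) to add a new leading axis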
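A fuller sketch of the same idea, with the elided arguments filled in under stated assumptions: the file has two numeric columns (N_COLS is a hypothetical constant), the number of rows per block is not known in advance and is therefore declared as NaN, and a deliberately tiny blocksize is used so that the small example file may be split into several blocks.

```python
import io
import os
import tempfile

import dask
import dask.array as da
import numpy as np
from dask.bytes import read_bytes

N_COLS = 2  # assumed number of columns in the file

# Write a small example file to a temporary directory.
tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, "data.txt")
with open(path, "wb") as f:
    f.write(b"1 2\n3 4\n5 6\n")

# Blocks are cut at line endings; the delimiter is given as bytes.
_, blocks = read_bytes(path, delimiter=b"\n", blocksize=8)


@dask.delayed
def parse(block):
    # reshape keeps the result two-dimensional even for a one-line
    # or empty block.
    return np.genfromtxt(io.BytesIO(block)).reshape(-1, N_COLS)


# The row count per block is unknown until the block is parsed,
# so it is declared as NaN; the column count is known.
arrays = [
    da.from_delayed(parse(block), shape=(np.nan, N_COLS), dtype=np.float64)
    for file_blocks in blocks
    for block in file_blocks
]
arr = da.concatenate(arrays, axis=0)
result = (arr + 1).compute()
```

Because the row counts are unknown, arr.shape contains NaN until the array is computed; operations that need the exact length along the first axis would require the chunk sizes to be computed first.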
Upvotes: 1