Reputation: 4664
I'm not sure where to begin, so I'm looking for some guidance. I want to create some arrays/tables in one process and have them accessible (read-only) from another.
So I create a pyarrow.Table like this:
import pyarrow as pa

a1 = pa.array(list(range(3)))
a2 = pa.array(["foo", "bar", "baz"])
a1
# <pyarrow.lib.Int64Array object at 0x7fd7c4510100>
# [
#   0,
#   1,
#   2
# ]
a2
# <pyarrow.lib.StringArray object at 0x7fd7c5d6fa00>
# [
#   "foo",
#   "bar",
#   "baz"
# ]
tbl = pa.Table.from_arrays([a1, a2], names=["num", "name"])
tbl
# pyarrow.Table
# num: int64
# name: string
# ----
# num: [[0,1,2]]
# name: [["foo","bar","baz"]]
Now how do I read this from a different process? I thought I would use multiprocessing.shared_memory.SharedMemory, but that didn't quite work:
from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(name='pa_test', create=True, size=tbl.nbytes)
with pa.ipc.new_stream(shm.buf, tbl.schema) as out:
    for batch in tbl.to_batches():
        out.write(batch)
# TypeError: Unable to read from object of type: <class 'memoryview'>
Do I need to wrap the shm.buf with something?
Even if I get this to work, it seems very fiddly. How would I do this in a robust manner? Do I need something like zmq?
I'm not clear how this is zero copy though. When I write the record batches, isn't that serialisation? What am I missing?
In my real use case, I also want to talk to Julia, but maybe that should be a separate question when I come to it.
PS: I have gone through the docs, but they didn't clarify this part for me.
Upvotes: 9
Views: 2182
Reputation: 223
Do I need to wrap the shm.buf with something?
Yes, you can use pa.py_buffer() to wrap it:
size = calculate_ipc_size(table)
shm = shared_memory.SharedMemory(create=True, name=name, size=size)
stream = pa.FixedSizeBufferWriter(pa.py_buffer(shm.buf))
with pa.RecordBatchStreamWriter(stream, table.schema) as writer:
    writer.write_table(table)
Also, for size you need to calculate the size of the IPC output, which may be a bit larger than Table.nbytes. The function you can use for that is:
def calculate_ipc_size(table: pa.Table) -> int:
    sink = pa.MockOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.size()
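For example, with the small tbl from the question you can compare the two sizes like this (the exact numbers depend on the data and the pyarrow version, so treat it as illustrative only):
# Illustrative comparison: the IPC stream carries the schema, message
# headers and padding on top of the raw column buffers, so it comes out
# somewhat larger than Table.nbytes.
print(tbl.nbytes)               # raw size of the column data
print(calculate_ipc_size(tbl))  # size the IPC stream will actually need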
How would I do this in a robust manner?
Not sure of this part yet. In my experience the original process needs to stay alive while the others are reusing the buffers, but there might be a way to get around that. This is likely connected to this bug in CPython: https://bugs.python.org/issue38119
I'm not clear how this is zero copy though. When I write the record batches, isn't that serialisation? What am I missing?
You are correct that writing the Arrow data into an IPC buffer does involve copies. The zero-copy part is when other processes read the data from shared memory. The columns of the Arrow table will reference the relevant segments of the IPC buffer, rather than a copy.
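For completeness, here is a minimal sketch of what the reading side could look like, assuming the writer used the segment name 'pa_test' from the question and has already closed its stream writer (both of those are assumptions, not something the code above guarantees):
import pyarrow as pa
from multiprocessing import shared_memory

# Attach to the segment created by the writer process (create=False).
shm = shared_memory.SharedMemory(name='pa_test', create=False)

# Wrap the shared buffer and open it as an Arrow IPC stream. read_all()
# returns a Table whose columns point into the shared buffer, so the
# column data itself is not copied on this side.
with pa.ipc.open_stream(pa.py_buffer(shm.buf)) as reader:
    table = reader.read_all()

print(table)

# Note: keep shm (and the segment itself) alive for as long as the
# table is in use, since the table references that memory directly.
This is where the zero-copy behaviour shows up: only the writer pays for a copy into the IPC buffer, and every reader maps the same bytes.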
Upvotes: 10