krishnab

Reputation: 10060

Deserializing messages without loading entire file into memory?

I am using Google Protocol Buffers and Python to decode some large data files (200 MB each). The code below shows how to decode a delimited stream, and it works just fine. However, it uses read(), which loads the whole file into memory and then iterates over it.

import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32

with open('/home/working/data/feed.pb', 'rb') as f:
    buf = f.read() ## PROBLEM: loads the entire file into memory.
    n = 0
    while n < len(buf):
        msg_len, new_pos = _DecodeVarint32(buf, n)
        n = new_pos
        msg_buf = buf[n:n+msg_len]
        n += msg_len
        read_row = sfeed.standard_feed()
        read_row.ParseFromString(msg_buf)
        # do something with read_row
        print(read_row)

Note that this code comes from another SO post, but I don't remember the exact URL. I was wondering whether there is a readlines() equivalent for protocol buffers that would let me read and decode one delimited message at a time. I basically want a pipeline that is not limited by the amount of RAM available for loading the file.

It seems there was a pystream-protobuf package that supported some of this functionality, but it has not been updated in a year or two. There is also a post from 7 years ago that asked a similar question (below), but I was wondering whether there is any newer information since then.

python example for reading multiple protobuf messages from a stream

Upvotes: 6

Views: 1833

Answers (2)

jpa

Reputation: 12176

If it is ok to load one full message at a time, this is quite simple to implement by modifying the code you posted:

import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32

with open('/home/working/data/feed.pb', 'rb') as f:
    buf = f.read(10) # Maximum length of length prefix
    while buf:
        msg_len, new_pos = _DecodeVarint32(buf, 0)
        buf = buf[new_pos:]
        # read the rest of the message (if it is not already in the buffer)
        if len(buf) < msg_len:
            buf += f.read(msg_len - len(buf))
        read_row = sfeed.standard_feed()
        read_row.ParseFromString(buf[:msg_len])
        buf = buf[msg_len:]
        # do something with read_row
        print(read_row)
        # read length prefix for next message
        buf += f.read(10 - len(buf))

This reads 10 bytes, which is enough to parse the length prefix, and then reads the rest of the message once its length is known.

String mutations are not very efficient in Python (they make a lot of copies of the data), so using bytearray can improve performance if your individual messages are also large.
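
For example, here is a minimal sketch of the same loop rewritten around a bytearray buffer (untested; it assumes _DecodeVarint32 accepts a bytes-like object, which the pure-Python decoder does, and it copies each message out with bytes() before parsing):

import feed_pb2 as sfeed
from google.protobuf.internal.decoder import _DecodeVarint32

with open('/home/working/data/feed.pb', 'rb') as f:
    buf = bytearray(f.read(10))  # maximum length of the length prefix
    while buf:
        msg_len, new_pos = _DecodeVarint32(buf, 0)
        del buf[:new_pos]                      # drop the length prefix in place
        if len(buf) < msg_len:
            buf += f.read(msg_len - len(buf))  # read the rest of the message
        read_row = sfeed.standard_feed()
        read_row.ParseFromString(bytes(buf[:msg_len]))
        del buf[:msg_len]                      # drop the parsed message in place
        # do something with read_row
        print(read_row)
        buf += f.read(10 - len(buf))           # read the next length prefix

The del statements shrink the existing buffer in place instead of building a new string on every iteration, which is where the savings come from.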

Upvotes: 5

jrouquie

Reputation: 4405

https://github.com/cartoonist/pystream-protobuf/ was updated 6 months ago. I haven't tested it much so far, but it seems to work fine as it is, without any need for an update. It provides optional gzip compression and async I/O.
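
For reference, reading with it looks roughly like this (adapted from my memory of the project's README, so double-check the current docs; note that the library uses its own framing, which as far as I know groups messages under a count prefix, so it is meant for files written by the same library rather than the plain varint-delimited format in the question):

import stream            # pip install pystream-protobuf
import feed_pb2 as sfeed

# Messages are pulled from the file lazily, a group at a time,
# so the whole file never has to be held in memory.
with stream.open('/home/working/data/feed.pb', 'rb') as istream:
    for data in istream:
        read_row = sfeed.standard_feed()
        read_row.ParseFromString(data)
        print(read_row)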

Upvotes: 2
