Reputation: 16124
I assumed it would be fairly easy to write a PCollection of serialized protobuf messages to text files and read them back, but I failed after a few attempts. I would appreciate any comments.
// definition of proto.
syntax = "proto3";
package test;
message PhoneNumber {
  string number = 1;
  string country = 2;
}
The Python code below implements a simple Beam pipeline that converts text lines into serialized protobufs and writes them out.
# Test python code
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import phone_pb2
class ToProtoFn(beam.DoFn):
    def process(self, element):
        phone = phone_pb2.PhoneNumber()
        phone.number, phone.country = element.strip().split(',')
        yield phone.SerializeToString()

with beam.Pipeline(options=PipelineOptions()) as p:
    lines = (p
             | beam.Create(["123-456-789,us", "345-567-789,ca"])
             | beam.ParDo(ToProtoFn())
             | beam.io.WriteToText('/Users/greeness/data/phone-pb'))
The pipeline runs successfully and produces a file with this content:
$ cat ~/data/phone-pb-00000-of-00001
123-456-789us
345-567-789ca
Then I wrote another pipeline to read the serialized protobufs and parse them with a ParDo.
class ToCsvFn(beam.DoFn):
    def process(self, element):
        phone = phone_pb2.PhoneNumber()
        phone.ParseFromString(element)
        yield ",".join([phone.number, phone.country])

with beam.Pipeline(options=PipelineOptions()) as p:
    lines = (p
             | beam.io.ReadFromText('/Users/greeness/data/phone*')
             | beam.ParDo(ToCsvFn())
             | beam.io.WriteToText('/Users/greeness/data/phone-csv'))
I got this error message when running it.
  File "/Library/Python/2.7/site-packages/apache_beam/runners/common.py", line 458, in process_outputs
    for result in results:
  File "phone_example.py", line 37, in process
    phone.ParseFromString(element)
  File "/Library/Python/2.7/site-packages/google/protobuf/message.py", line 185, in ParseFromString
    self.MergeFromString(serialized)
  File "/Library/Python/2.7/site-packages/google/protobuf/internal/python_message.py", line 1069, in MergeFromString
    raise message_mod.DecodeError('Truncated message.')
DecodeError: Truncated message. [while running 'ParDo(ToCsvFn)']
So it looks like the serialized protobuf string cannot be parsed. Am I missing something? Thanks for any help!
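A likely explanation, sketched here without needing Beam or protobuf installed: TextIO separates records with the newline byte, and in the protobuf wire format the tag of a length-delimited field with number 1 is (1 << 3) | 2 = 0x0A, which is that same byte. Every serialized PhoneNumber would therefore start with "\n", so a line-based reader hands ToCsvFn fragments instead of whole messages. The helper names below (encode_varint, encode_string_field, encode_phone) are hypothetical and hand-roll the encoding only for illustration; real code would keep calling SerializeToString().

```python
def encode_varint(value):
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string_field(field_number, text):
    """Encode one length-delimited (wire type 2) string field."""
    data = text.encode("utf-8")
    tag = (field_number << 3) | 2
    return encode_varint(tag) + encode_varint(len(data)) + data


def encode_phone(number, country):
    """Hand-rolled stand-in for the PhoneNumber message (fields 1 and 2)."""
    return encode_string_field(1, number) + encode_string_field(2, country)


record = encode_phone("123-456-789", "us")

# Simulate a newline-delimited sink followed by a line-based reader.
file_bytes = record + b"\n"
lines_read = file_bytes.split(b"\n")[:-1]  # drop the empty tail after the final "\n"

print(repr(record))
print(lines_read)
```

The first byte of record is the newline, so the reader sees an empty line followed by a message with its leading tag missing, and neither piece is the original message.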
Upvotes: 5
Views: 3788
Reputation: 27
TFRecord is a detail here, which means you can still get it working with TextIO.
The trick is the Coder, which Beam uses to encode and decode a type during pipeline runs. Generally you should specify one unless the type is built-in or trivial. In the protobuf case, using ProtoCoder is simply the right thing to do.
from google.protobuf.timestamp_pb2 import Timestamp
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class ToProtoFn(beam.DoFn):
    def process(self, element):
        timestamp = Timestamp()
        timestamp.seconds, timestamp.nanos = [int(x) for x in element.strip().split(',')]
        print(timestamp)
        # Yield the message object itself; the coder handles serialization.
        yield timestamp

with beam.Pipeline(options=PipelineOptions()) as p:
    lines = (p
             | beam.Create(["1586753000,222333000", "1586754000,222333000"])
             | beam.ParDo(ToProtoFn())
             | beam.io.WriteToText('time-pb',
                                   coder=beam.coders.ProtoCoder(Timestamp)))

class ToCsvFn(beam.DoFn):
    def process(self, element):
        # element is already a decoded Timestamp message.
        print(element)
        yield ",".join([str(element.seconds), str(element.nanos)])

with beam.Pipeline(options=PipelineOptions()) as p:
    lines = (p
             | beam.io.ReadFromText('time-pb-00000-of-00001',
                                    coder=beam.coders.ProtoCoder(Timestamp))
             | beam.ParDo(ToCsvFn())
             | beam.io.WriteToText('time-csv'))
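One caveat worth checking: TextIO still delimits records with newlines whatever coder it is given, so this approach depends on the encoded bytes never containing 0x0A. A possible workaround, which is an assumption of this note and not part of the answer above, is to base64-encode each serialized message so that every record becomes a newline-free ASCII line. The helpers to_text_line and from_text_line are hypothetical names; in a pipeline they would be applied after SerializeToString() and before ParseFromString().

```python
import base64


def to_text_line(serialized):
    """Turn arbitrary bytes into a single newline-free ASCII line."""
    return base64.b64encode(serialized)


def from_text_line(line):
    """Recover the original bytes from a line written by to_text_line."""
    return base64.b64decode(line)


# Example payloads that contain the newline byte (0x0A) themselves.
payloads = [
    b"\n\x0b123-456-789\x12\x02us",
    b"\n\x0b345-567-789\x12\x02ca",
]

# Simulate writing one record per line, then reading the lines back.
file_bytes = b"".join(to_text_line(p) + b"\n" for p in payloads)
recovered = [from_text_line(line) for line in file_bytes.split(b"\n") if line]

print(recovered == payloads)
```

The cost is roughly a one-third increase in file size, since base64 emits four output characters for every three input bytes.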
Upvotes: -1
Reputation: 16124
I found a temporary workaround using the existing tfrecordio.py.
The code below works, but I am still open to any comment that solves the original problem.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import phone_pb2

def WriteTextToTFRecord():
    class ToProtoFn(beam.DoFn):
        def process(self, element):
            phone = phone_pb2.PhoneNumber()
            phone.number, phone.country = element.strip().split(',')
            # Yield the message itself; ProtoCoder serializes it on write.
            yield phone

    with beam.Pipeline(options=PipelineOptions()) as p:
        lines = p | beam.Create(["123-456-789,us", "345-567-789,ca"])
        processed = (
            lines
            | beam.ParDo(ToProtoFn())
            | beam.io.WriteToTFRecord('/Users/greeness/data/phone-pb',
                                      coder=beam.coders.ProtoCoder(phone_pb2.PhoneNumber)))

def ReadTFRecordAndSaveAsCSV():
    class ToCsvFn(beam.DoFn):
        def process(self, element):
            # element arrives as a decoded PhoneNumber message.
            yield ','.join([element.number, element.country])

    with beam.Pipeline(options=PipelineOptions()) as p:
        lines = (p
                 | beam.io.ReadFromTFRecord('/Users/greeness/data/phone-pb-*',
                                            coder=beam.coders.ProtoCoder(phone_pb2.PhoneNumber))
                 | beam.ParDo(ToCsvFn())
                 | beam.io.WriteToText('/Users/greeness/data/phone-csv'))

if __name__ == '__main__':
    WriteTextToTFRecord()
    ReadTFRecordAndSaveAsCSV()
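The reason TFRecord copes with arbitrary bytes is that it frames each record with its length instead of relying on a delimiter, so a 0x0A inside the payload is harmless. The sketch below shows only that length-prefix idea in plain Python. It is not the actual TFRecord layout, which also stores CRC checksums alongside the length and the data, and write_records and read_records are hypothetical names.

```python
import io
import struct


def write_records(stream, records):
    """Write each record as an 8-byte little-endian length followed by the data."""
    for record in records:
        stream.write(struct.pack("<Q", len(record)))
        stream.write(record)


def read_records(stream):
    """Yield records written by write_records, using the length prefix to find boundaries."""
    while True:
        header = stream.read(8)
        if not header:
            return  # clean end of stream
        if len(header) < 8:
            raise ValueError("truncated length header")
        (length,) = struct.unpack("<Q", header)
        data = stream.read(length)
        if len(data) != length:
            raise ValueError("truncated record")
        yield data


# Payloads that contain newline bytes, which would break a line-based format.
payloads = [
    b"\n\x0b123-456-789\x12\x02us",
    b"\n\x0b345-567-789\x12\x02ca",
]

buffer = io.BytesIO()
write_records(buffer, payloads)
buffer.seek(0)
recovered = list(read_records(buffer))

print(recovered == payloads)
```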
Upvotes: 5