LoicM

Apache beam read schema from pubsub

I am very new to beam and reading streaming data so I hope my question is not too trivial.

I am using the beam Python SDK to read data from PubSub before writing it into some other files. As the data I receive is always in the same format, I tried to make use of the schema feature to parse the data I receive from PubSub.

The data received is always a dictionary of the form {"name": "my_name", "value": 42}, so my pipeline looks like this:

import typing

import apache_beam as beam
from apache_beam.io import ReadFromPubSub


class MySchema(typing.NamedTuple):
    name: str
    value: int

with beam.Pipeline() as pipeline:
    pipeline | ReadFromPubSub(topic=<my_subscription>).with_output_types(MySchema)

However, I then get the error apache_beam.typehints.decorators.TypeCheckError: Output type hint violation at ReadFromPubSub: expected <class '__main__.MassificationImage'>, got <class 'bytes'>

It makes sense as PubSub naturally gets bytes: I can just parse the data into a dictionary and then it seems to work.

import json

with beam.Pipeline() as pipeline:
    (pipeline
        | ReadFromPubSub(topic=<my_subscription>)
        | beam.Map(lambda x: json.loads(x.decode("utf8"))).with_output_types(MySchema))

It seems to work fine, but doesn't having to parse the data into a dictionary first kind of defeat the purpose of the schema? Is there a more straightforward way of doing this?

Upvotes: 0

Views: 856

Answers (1)

ningk

The Beam programming guide (https://beam.apache.org/documentation/programming-guide/#schemas) describes the purpose of Beam schemas. The underlying assumption is:

Often, the types of the records being processed have an obvious structure. Common Beam sources produce JSON, Avro, Protocol Buffer, or database row objects; all of these types have well defined structures ...

The schema then makes downstream transforms easier.

The confusion here is that Pub/Sub, as a source, does not provide structured data directly; at least, the Beam IO for Pub/Sub emits raw bytes.

Pub/Sub does support schemas: https://cloud.google.com/pubsub/docs/schemas. You could write a new IO that combines the byte read and the schema parse.
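
As a rough sketch of that last suggestion, the byte read and the parse step can be wrapped together in one helper. This assumes each message payload is UTF-8 encoded JSON with the name and value fields from the question. parse_message and read_parsed are hypothetical helper names, not part of Beam, and the topic string is whatever you pass in.

```python
import json
import typing


class MySchema(typing.NamedTuple):
    name: str
    value: int


def parse_message(data: bytes) -> MySchema:
    """Decode one Pub/Sub payload (assumed UTF-8 JSON) into a MySchema row."""
    record = json.loads(data.decode("utf-8"))
    return MySchema(name=str(record["name"]), value=int(record["value"]))


def read_parsed(pipeline, topic: str):
    """Hypothetical helper: read bytes from Pub/Sub and emit MySchema rows."""
    # Imported here so parse_message can be tested without Beam installed.
    import apache_beam as beam
    from apache_beam.io import ReadFromPubSub

    return (
        pipeline
        | "ReadBytes" >> ReadFromPubSub(topic=topic)
        | "ParseJson" >> beam.Map(parse_message).with_output_types(MySchema)
    )
```

The parsing still happens, but it now builds actual MySchema instances, so the output type hint matches what the transform emits instead of plain dictionaries. If your SDK version does not infer a schema from the NamedTuple, the programming guide describes registering it with beam.coders.registry.register_coder(MySchema, beam.coders.RowCoder).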

Upvotes: 1
