FrancoisDuCoq

Reputation: 65

Read a Pub/Sub message to decide which BigQuery table to read

I would like to read a BigQuery table whose name comes from the content of a Pub/Sub message:

subscription = 'my_subscription'

tablename = (p
              | 'receive msg from PubSub' >> beam.io.ReadFromPubSub(subscription=subscription)
              | "convert msg to dict" >> beam.Map(lambda x: json.loads(x))
              | "extract tablename" >> beam.Map(lambda x: x['tablename'])
             )
(p
  | 'Read from BQ Table' >> beam.io.ReadFromBigQuery(table=tablename)
  # more transformations...
)

Deploying this as a template with a template_metadata file fails with the following error:

"TypeError: expected string or buffer"

Upvotes: 0

Views: 130

Answers (1)

Mazlum Tosun

Reputation: 6582

When called with with_attributes=True, ReadFromPubSub returns PubsubMessage objects; with the default (with_attributes=False) it returns only the message payload, as bytes.

To retrieve the data inside the PubsubMessage object, change your code as follows:

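import json

import apache_beam as beam

subscription = 'my_subscription'

tablename = (p
             # with_attributes=True makes ReadFromPubSub emit PubsubMessage objects
             # (the default, with_attributes=False, emits the raw payload as bytes)
             | 'receive msg from PubSub' >> beam.io.ReadFromPubSub(subscription=subscription, with_attributes=True)
             # msg.data holds the payload as bytes: decode it to str, then parse the JSON into a dict
             | "convert msg to dict" >> beam.Map(lambda msg: json.loads(msg.data.decode()))
             | "extract tablename" >> beam.Map(lambda x: x['tablename'])
             )
(p
 | 'Read from BQ Table' >> beam.io.ReadFromBigQuery(table=tablename)
 # more transformations...
)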

The "convert msg to dict" step now calls json.loads(msg.data.decode()) to turn each PubsubMessage into a dict.

msg.data holds the payload as bytes; decode() turns it into a str, and json.loads then parses that string into a dict.
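The conversion step can be tried outside Beam and GCP with a stdlib-only sketch. FakePubsubMessage is a hypothetical stand-in for Beam's PubsubMessage (only its data and attributes fields are mimicked), and message_to_dict is a hypothetical helper name. It accepts either the raw bytes that ReadFromPubSub emits by default or an object with a data attribute, as emitted with with_attributes=True.

```python
import json
from dataclasses import dataclass, field


@dataclass
class FakePubsubMessage:
    """Hypothetical stand-in for apache_beam.io.gcp.pubsub.PubsubMessage.

    Only the `data` (payload bytes) and `attributes` fields are mimicked.
    """
    data: bytes
    attributes: dict = field(default_factory=dict)


def message_to_dict(msg):
    """Hypothetical helper: turn one Pub/Sub element into a dict.

    Accepts raw payload bytes (ReadFromPubSub's default output) or any
    object with a `data` attribute (its output with with_attributes=True).
    """
    payload = msg.data if hasattr(msg, "data") else msg
    # Decode the UTF-8 bytes to str, then parse the JSON text into a dict.
    return json.loads(payload.decode("utf-8"))


if __name__ == "__main__":
    raw = b'{"tablename": "my_dataset.my_table"}'
    print(message_to_dict(raw)["tablename"])  # prints my_dataset.my_table
    print(message_to_dict(FakePubsubMessage(data=raw))["tablename"])  # prints my_dataset.my_table
```

Because beam.Map accepts any callable, a function like this could be used as beam.Map(message_to_dict) in place of the lambda, so the same code works whichever way with_attributes is set.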

Your last transformation, x['tablename'], should then work and return the table name from the dict.
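Decoding alone may not make the TypeError from the question go away. ReadFromBigQuery expects table to be a string such as 'dataset.table' or 'project:dataset.table' (or a ValueProvider), whereas tablename above is a PCollection whose contents only exist while the pipeline runs. Beam parses the table spec with a regular expression, and Python's re module raises a TypeError ("expected string or buffer" on Python 2, "expected string or bytes-like object" on Python 3) when it is handed something that is not a string. The stdlib sketch below reproduces that failure mode; parse_table_spec, its pattern, and FakePCollection are simplified hypothetical stand-ins, not Beam's actual parser. For table names that arrive at runtime, recent Beam versions also offer ReadAllFromBigQuery, which consumes a PCollection of ReadFromBigQueryRequest elements; whether it fits a streaming pipeline is worth checking against the Beam documentation.

```python
import re

# Simplified, hypothetical pattern for 'dataset.table' or 'project:dataset.table'.
# Beam's real parser differs; this only illustrates the failure mode.
TABLE_SPEC = re.compile(r"^(?:(?P<project>[^:]+):)?(?P<dataset>\w+)\.(?P<table>[-\w$]+)$")


def parse_table_spec(spec):
    """Hypothetical helper: split a table spec string into (project, dataset, table)."""
    match = TABLE_SPEC.match(spec)  # raises TypeError if spec is not a str
    if match is None:
        raise ValueError(f"not a valid table spec: {spec!r}")
    return match.group("project"), match.group("dataset"), match.group("table")


class FakePCollection:
    """Hypothetical stand-in for a deferred PCollection (not a string)."""


if __name__ == "__main__":
    print(parse_table_spec("my_project:my_dataset.my_table"))  # prints ('my_project', 'my_dataset', 'my_table')
    try:
        parse_table_spec(FakePCollection())
    except TypeError as exc:
        # Same kind of error as in the question: a regex applied to a non-string.
        print("TypeError:", exc)
```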

Upvotes: 2
