I would like to read a BigQuery table whose name comes from the content of a Pub/Sub message:
import json
import apache_beam as beam

subscription = 'my_subscription'
tablename = (p
| 'receive msg from PubSub' >> beam.io.ReadFromPubSub(subscription=subscription)
| "convert msg to dict" >> beam.Map(lambda x: json.loads(x))
| "extract tablename" >> beam.Map(lambda x: x['tablename'])
)
(p
| 'Read from BQ Table' >> beam.io.ReadFromBigQuery(table=tablename)
# | more transformations...
)
Deploying this with a template_metadata file fails with the following error:
"TypeError: expected string or buffer"
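A likely cause of this error (an assumption, not confirmed in the thread): ReadFromBigQuery expects table to be a string such as 'PROJECT:DATASET.TABLE' (or a ValueProvider) that is known when the pipeline is constructed, and it parses that string with a regular expression. tablename above is a PCollection, a handle to data that only exists while the pipeline runs, so the regex call receives a non-string object and raises TypeError ("expected string or buffer" is the Python 2 wording; Python 3 says "expected string or bytes-like object"). The plain-Python sketch below reproduces this class of error; TABLE_PATTERN, parse_table_spec and FakePCollection are hypothetical stand-ins, not Beam's own code. For table names that only arrive at runtime, recent Beam SDK releases offer ReadAllFromBigQuery, which consumes a PCollection of ReadFromBigQueryRequest elements; check the documentation for your SDK version and runner before relying on it.

```python
import re

# Hypothetical, simplified table-spec pattern; Beam's real pattern differs.
TABLE_PATTERN = r"^((?P<project>[^:]+):)?(?P<dataset>\w+)\.(?P<table>[\w$-]+)$"


class FakePCollection:
    """Hypothetical stand-in for a PCollection: a deferred handle, not a str."""


def parse_table_spec(table):
    """Split '[PROJECT:]DATASET.TABLE' into its parts (sketch only)."""
    match = re.match(TABLE_PATTERN, table)  # raises TypeError for non-strings
    if match is None:
        raise ValueError("invalid table spec: %r" % (table,))
    return match.group("project"), match.group("dataset"), match.group("table")


print(parse_table_spec("my-project:my_dataset.my_table"))
# ('my-project', 'my_dataset', 'my_table')

try:
    parse_table_spec(FakePCollection())
except TypeError as err:
    print("TypeError:", err)  # exact wording depends on the Python version
```

The point of the sketch is the timing: a string can be parsed while the pipeline graph is being built, whereas the contents of a PCollection cannot.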
The ReadFromPubSub IO returns PubsubMessage objects when it is created with with_attributes=True (by default it emits only the message payload as bytes).
If you want to retrieve the data inside the PubsubMessage objects, change your code as follows:
subscription = 'my_subscription'
tablename = (p
| 'receive msg from PubSub' >> beam.io.ReadFromPubSub(subscription=subscription, with_attributes=True)
| "convert msg to dict" >> beam.Map(lambda msg: json.loads(msg.data.decode()))
| "extract tablename" >> beam.Map(lambda x: x['tablename'])
)
(p
# note: table expects a string such as 'PROJECT:DATASET.TABLE', not a PCollection
| 'Read from BQ Table' >> beam.io.ReadFromBigQuery(table=tablename)
# | more transformations...
)
I changed the transformation from PubsubMessage to dict:
json.loads(msg.data.decode())
msg.data.decode() returns the message payload as a str, and json.loads then converts it to a dict.
The last transformation should then work, and you can retrieve the tablename from the dict.
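A small helper can make the decoding step independent of how ReadFromPubSub is configured. This is a sketch under two assumptions: the payload is UTF-8 JSON with a tablename field, as in the question, and each element is either raw bytes (the default) or an object exposing the payload as a .data attribute (with_attributes=True). extract_tablename is a hypothetical name, and SimpleNamespace only stands in for a real PubsubMessage in the example.

```python
import json
from types import SimpleNamespace


def extract_tablename(element):
    """Return the 'tablename' field of a Pub/Sub element (hypothetical helper).

    Accepts raw bytes (ReadFromPubSub's default output), a str, or any object
    with a bytes ``.data`` attribute, such as a PubsubMessage read with
    with_attributes=True.
    """
    payload = getattr(element, "data", element)  # unwrap message objects
    if isinstance(payload, bytes):
        payload = payload.decode("utf-8")  # bytes -> str
    return json.loads(payload)["tablename"]  # str -> dict -> field


raw = b'{"tablename": "my-project:my_dataset.my_table"}'
print(extract_tablename(raw))                        # my-project:my_dataset.my_table
print(extract_tablename(SimpleNamespace(data=raw)))  # my-project:my_dataset.my_table

# In the pipeline this could replace the two Map steps, e.g.:
#   | "extract tablename" >> beam.Map(extract_tablename)
```

Folding the decode and the field lookup into one named function also makes the step easy to unit-test without running a pipeline.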