psykidellic

Reputation: 71

Accessing pubsublite message attributes in beam pipeline - Java

We have been using PubSubLite in our Go program without any issues and I just started using the Java library with Beam.

Using the PubSubLite IO, we get a PCollection of SequencedMessage, specifically: https://cloud.google.com/java/docs/reference/google-cloud-pubsublite/latest/com.google.cloud.pubsublite.proto.SequencedMessage

Now, from it I can get the data by doing something like:

message.getMessage().getData().toByteArray()

and then doing the normal conversion.

But for attributes, I cannot work out how to get just the value. In Go, I could do:

msg.Attributes["attrKey"]

but when I do:

message.getMessage().getAttributesMap().get("attrKey")

I get an object that I cannot seem to convert to its plain string value. As far as I understand, getAttributesMap() returns a Map<String, AttributeValues>, and these types all seem to be wrappers over the internal protobuf. Also, Map is an interface, so how do I get to the actual implementation to read the underlying value of each attribute?

Upvotes: 1

Views: 164

Answers (1)

Daniel Collins

Reputation: 812

The SequencedMessage attributes represent a multimap of string to bytes, not a map of string to string as in standard Pub/Sub. By default, the Go client returns an error if there are multiple values for a given key or if any value is not valid UTF-8, which is why it can present a map[string]string interface.

When you call message.getMessage().getAttributesMap().get("attrKey"), you have a value of type AttributeValues which is a holder for a list of ByteStrings. To convert this to a single String, you would need to throw if the list is not of length 1, then call toStringUtf8 on the byte string element with index 0.
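The check described above can be sketched with plain JDK types so that it runs without the Pub/Sub Lite dependency. This is only an illustration: AttributeDecoder and singleUtf8 are hypothetical names, and List<byte[]> stands in for the list of ByteStrings held by AttributeValues.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Pub/Sub Lite or Beam libraries.
// List<byte[]> is a stand-in for the ByteString list inside AttributeValues.
final class AttributeDecoder {
    private AttributeDecoder() {}

    // Returns the single value stored under key, decoded as UTF-8.
    // Throws if the key is missing or has more than one value.
    static String singleUtf8(Map<String, List<byte[]>> attributes, String key) {
        List<byte[]> values = attributes.get(key);
        if (values == null || values.size() != 1) {
            int count = (values == null) ? 0 : values.size();
            throw new IllegalArgumentException(
                "Expected exactly one value for attribute '" + key + "', found " + count);
        }
        // Lenient decoding: malformed bytes become replacement characters.
        return new String(values.get(0), StandardCharsets.UTF_8);
    }
}
```

With the real types, the same logic would presumably check getValuesCount() on the AttributeValues and then call getValues(0).toStringUtf8(). Note that this decoding is lenient, so it does not reproduce the Go client's rejection of invalid UTF-8.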

If you wish to work with the standard Pub/Sub message format, as you would in Go, you can convert to it by doing:

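import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsublite.CloudPubsubTransforms;
import org.apache.beam.sdk.values.PCollection;

// Output of the Pub/Sub Lite read transform.
PCollection<SequencedMessage> messages = ...
// Convert each SequencedMessage to the standard Pub/Sub message type.
PCollection<PubsubMessage> transformed = messages.apply(CloudPubsubTransforms.toCloudPubsubMessages());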

Upvotes: 1
