IoT user

Reputation: 1300

getAttribute from PubsubMessage in Dataflow

I have a problem accessing the attributes of a Pub/Sub message.

The error message is the following:

Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. 

stackTrace: [org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage.getAttribute(PubsubMessage.java:56),
transform1$3.processElement(transform1.java:37),
transform1$3$DoFnInvoker.invokeProcessElement(Unknown Source),
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218),
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183),
org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78),
org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:216),
org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54),
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160),
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124),
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511),
java.util.concurrent.FutureTask.run(FutureTask.java:266),
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149),
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624),
java.lang.Thread.run(Thread.java:748)]

I'm using the Dataflow Eclipse SDK to run the pipeline locally:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>${beam.version}</version>
  <scope>runtime</scope>
</dependency>

The line of code which produces the error is this:

String fieldId = c.element().getAttribute("evId");

The full code of the PTransform is the following:

public class transform1 extends DoFn<PubsubMessage, Event> {

    public static TupleTag<ErrorHandler> failuresTag=new TupleTag<ErrorHandler>(){};
    public static TupleTag<Event> validTag = new TupleTag<Event>(){};

    public static PCollectionTuple process(PCollection<PubsubMessage> logStrings)
    {
        return logStrings.apply("Create PubSub objects", ParDo.of(new DoFn<PubsubMessage, Event>()
        {
            @ProcessElement
            public void processElement(ProcessContext c)
            {
                try 
                {
                    Event event = new Event();
                    String fieldId = c.element().getAttribute("evId");
                    event.evId = "asa"; //this line is just to test to set a value
                    c.output(event);
                    <...>

I have seen a similar question, but I'm not sure how to apply it to fix this.


The main pipeline code (if needed):

    public static PipelineResult run(Options options) {

        Pipeline pipeline = Pipeline.create(options);

        /*
         * Step 1: Read from PubSub
         */
        PCollection<PubsubMessage> messages = null;
        if (options.getUseSubscription()) {
            messages = pipeline.apply("ReadPubSubSubscription", PubsubIO.readMessagesWithAttributes()
                    .fromSubscription(options.getInputSubscription()).withIdAttribute("messageId"));
        } else {
            messages = pipeline.apply("ReadPubSubTopic", PubsubIO.readMessagesWithAttributes()
                    .fromTopic(options.getInputTopic()).withIdAttribute("messageId"));
        }


        /*
         * Step 2: Transform PubSubMessage to Event
         */
        PCollectionTuple eventCollections = transform1.process(messages);

PubSub message:

{ "evId":"id", "payload":"payload" }

I also tried it as:

"{ "evId":"id", "payload":"payload" }"

This is how I publish the message in Pub/Sub to test the pipeline:

[Image: publish]


After more testing, the way I was publishing to Pub/Sub seems to be the source of the error: if I add evId as an attribute instead of putting it in the message body, the problem disappears.
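For comparison, when evId is sent in the message body rather than as an attribute, it has to be read from the payload bytes, not from the attributes. Below is a minimal sketch in plain Java, assuming the body is the UTF-8 JSON shown above. `PayloadField` and `extractField` are hypothetical names, and the regex is only for illustration (it does not handle escaped quotes or nested objects; a real pipeline would use a proper JSON parser). In Beam the bytes would come from `PubsubMessage.getPayload()`.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Beam: reads a field from the message body.
final class PayloadField {

    private PayloadField() {}

    // Returns the value of the first string-valued occurrence of `field`
    // in a flat JSON body, or null when the field is not present.
    static String extractField(byte[] payload, String field) {
        String body = new String(payload, StandardCharsets.UTF_8);
        Pattern pattern = Pattern.compile(
                "\"" + Pattern.quote(field) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher matcher = pattern.matcher(body);
        return matcher.find() ? matcher.group(1) : null;
    }

    public static void main(String[] args) {
        // Same body as the test message published from the dashboard.
        byte[] payload = "{ \"evId\":\"id\", \"payload\":\"payload\" }"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(extractField(payload, "evId")); // prints: id
    }
}
```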

Upvotes: 0

Views: 1525

Answers (1)

IoT user

Reputation: 1300

The reason was that I was trying to access an attribute here:

String fieldId = c.element().getAttribute("evId");

But when I sent the message through the Pub/Sub dashboard, I didn't add any attributes, and that caused the whole pipeline to crash.
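A defensive version of that lookup could look like the sketch below. It is plain Java so it runs without Beam on the classpath: the `Map<String, String>` stands in for what `PubsubMessage.getAttributeMap()` returns, and `AttributeLookup` and `attributeOrNull` are hypothetical names. It assumes the map can be null, or simply lack the key, when a message is published without attributes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Beam: null-safe attribute lookup.
final class AttributeLookup {

    private AttributeLookup() {}

    // Returns the attribute value, or null when the message carries no
    // attributes at all (null map) or does not carry this particular key.
    static String attributeOrNull(Map<String, String> attributes, String key) {
        if (attributes == null) {
            return null;
        }
        return attributes.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> withAttribute = new HashMap<>();
        withAttribute.put("evId", "id");

        System.out.println(attributeOrNull(withAttribute, "evId")); // prints: id
        System.out.println(attributeOrNull(null, "evId"));          // prints: null
    }
}
```

Inside the DoFn, the call would presumably become `attributeOrNull(c.element().getAttributeMap(), "evId")`, with a null result sent to the `failuresTag` output so that one message without attributes does not take down the pipeline.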

Upvotes: 1
