I have a problem accessing a Pub/Sub message's attributes.
The error message is:
Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a #structuralValue method which does not return true when the encoding of the elements is equal.
stackTrace: [org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage.getAttribute(PubsubMessage.java:56),
transform1$3.processElement(transform1.java:37),
transform1$3$DoFnInvoker.invokeProcessElement(Unknown Source),
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:218),
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183),
org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78),
org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:216),
org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54),
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160),
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124),
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511),
java.util.concurrent.FutureTask.run(FutureTask.java:266),
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149),
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624),
java.lang.Thread.run(Thread.java:748)]
I'm using the Dataflow Eclipse SDK to run the pipeline locally:
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
</dependency>
The line of code which produces the error is this:
String fieldId = c.element().getAttribute("evId");
The full code of the ptransform is the following:
public class transform1 extends DoFn<PubsubMessage, Event> {
    public static TupleTag<ErrorHandler> failuresTag = new TupleTag<ErrorHandler>(){};
    public static TupleTag<Event> validTag = new TupleTag<Event>(){};

    public static PCollectionTuple process(PCollection<PubsubMessage> logStrings)
    {
        return logStrings.apply("Create PubSub objects", ParDo.of(new DoFn<PubsubMessage, Event>()
        {
            @ProcessElement
            public void processElement(ProcessContext c)
            {
                try
                {
                    Event event = new Event();
                    String fieldId = c.element().getAttribute("evId");
                    event.evId = "asa"; // this line is just a test to set a value
                    c.output(event);
                    <...>
I have seen a similar question, but I'm not sure how to fix it.
The main pipeline code (if needed):
public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(options);
    /*
     * Step 1: Read from PubSub
     */
    PCollection<PubsubMessage> messages = null;
    if (options.getUseSubscription()) {
        messages = pipeline.apply("ReadPubSubSubscription", PubsubIO.readMessagesWithAttributes()
            .fromSubscription(options.getInputSubscription()).withIdAttribute("messageId"));
    } else {
        messages = pipeline.apply("ReadPubSubTopic", PubsubIO.readMessagesWithAttributes()
            .fromTopic(options.getInputTopic()).withIdAttribute("messageId"));
    }
    /*
     * Step 2: Transform PubSubMessage to Event
     */
    PCollectionTuple eventCollections = transform1.process(messages);
PubSub message body:
{ "evId":"id", "payload":"payload" }
I also tried it as a quoted string:
"{ "evId":"id", "payload":"payload" }"
To test the pipeline, I published the message from the Pub/Sub dashboard, with the JSON in the message body.
After more testing, the way I was publishing to Pub/Sub seems to be the source of the error: if I add evId as an attribute instead of putting it in the message body, the problem disappears.
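To make the body versus attribute distinction concrete, here is a minimal, stdlib-only Java sketch. `Message` and `hasAttribute` are hypothetical stand-ins, not Beam's `PubsubMessage`; they only model the fact that a Pub/Sub message carries a payload and a separate attribute map, so an `evId` written inside the JSON body never shows up as an attribute. Modelling "published with no attributes" as a `null` map is an assumption, based on the stack trace failing inside `PubsubMessage.getAttribute`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

public class BodyVersusAttributes {

    // Hypothetical stand-in for a Pub/Sub message: the body (payload) and the
    // attributes travel separately; JSON inside the body never becomes an attribute.
    static final class Message {
        final byte[] payload;
        final Map<String, String> attributes; // null models "published with no attributes" (assumption)

        Message(byte[] payload, Map<String, String> attributes) {
            this.payload = payload;
            this.attributes = attributes;
        }
    }

    // True only if the key was sent as an attribute, regardless of the body contents.
    static boolean hasAttribute(Message m, String key) {
        return m.attributes != null && m.attributes.containsKey(key);
    }

    public static void main(String[] args) {
        byte[] body = "{ \"evId\":\"id\", \"payload\":\"payload\" }".getBytes(StandardCharsets.UTF_8);

        // The dashboard test: evId exists only inside the JSON body.
        Message bodyOnly = new Message(body, null);
        // The variant that worked: evId also sent as an attribute.
        Message withAttribute = new Message(body, Collections.singletonMap("evId", "id"));

        System.out.println(new String(bodyOnly.payload, StandardCharsets.UTF_8).contains("evId")); // true
        System.out.println(hasAttribute(bodyOnly, "evId"));      // false
        System.out.println(hasAttribute(withAttribute, "evId")); // true
    }
}
```

In other words, if `evId` is meant to live in the body, the DoFn has to decode and parse the payload; `getAttribute` only ever sees what was published as an attribute.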
The cause was that I was trying to access an attribute here:
String fieldId = c.element().getAttribute("evId");
However, when I sent the message through the Pub/Sub dashboard I didn't add any attributes, and that crashed the whole pipeline.
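One way to keep a message without attributes from taking down the pipeline is to look the attribute up defensively and send misses to a failure branch instead of throwing. The sketch below is plain Java so it runs on its own; `SafeAttributeLookup` and `attribute` are hypothetical names, and the two lists only mimic the question's `validTag` / `failuresTag` split.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class SafeAttributeLookup {

    // Null-safe replacement for a bare getAttribute(key) call: a missing attribute
    // map is treated the same as a missing key instead of throwing.
    static Optional<String> attribute(Map<String, String> attributes, String key) {
        if (attributes == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(attributes.get(key));
    }

    public static void main(String[] args) {
        // Two incoming attribute maps: a dashboard publish with no attributes (null),
        // and one where evId was set as an attribute.
        List<Map<String, String>> incoming = new ArrayList<>();
        incoming.add(null);
        Map<String, String> tagged = new HashMap<>();
        tagged.put("evId", "id");
        incoming.add(tagged);

        // Mirrors the validTag / failuresTag split: good elements one way, the rest the other.
        List<String> valid = new ArrayList<>();
        List<String> failures = new ArrayList<>();
        for (Map<String, String> attrs : incoming) {
            Optional<String> evId = attribute(attrs, "evId");
            if (evId.isPresent()) {
                valid.add(evId.get());
            } else {
                failures.add("missing evId attribute");
            }
        }
        System.out.println(valid);    // [id]
        System.out.println(failures); // [missing evId attribute]
    }
}
```

Inside the actual DoFn, the map would presumably come from `c.element().getAttributeMap()` (check the nullability of `PubsubMessage` in your Beam version), and the failure branch would call `c.output(failuresTag, ...)` with whatever `ErrorHandler` the elided code builds, rather than letting the exception propagate.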