Davide Cavestro

Reputation: 497

Dataflow SDK 2.x: how to consume from PubsubIO using Java serialization

I am new to Dataflow, and I'm going to migrate the following snippet from Java SDK 1.9.0 to 2.3.0:

//SDK 1.9.0
PCollection<MyType> pubsub = p.apply(
  PubsubIO.Read.named("Read from Pubsub")
  .topic(myTopic)
  .withCoder(SerializableCoder.of(MyType.class))
  .timestampLabel("myDate"));

I would convert it to:

//SDK 2.3.0
PCollection<MyType> pubsub = p.apply("Read from Pubsub",
  PubsubIO.<MyType>read () // <-- COMPILE ERROR here, private method
  .fromTopic(myTopic)
  .withTimestampAttribute ("myDate"))
.setCoder(SerializableCoder.of(MyType.class));

However, the PubsubIO.read() method is private as of Java SDK 2.3.0, so this does not compile.

I need to consume messages that carry serialized instances of MyType, but the methods exposed by PubsubIO seem to be meant only for text, Avro, protobuf and similar encodings.

How can I read from PubsubIO topics where messages contain serialized java objects?

UPDATE:

I could tweak it this way (not tried yet)...

PCollection<MyType> pubsub = p.apply("Read from Pubsub",
  PubsubIO.readMessagesWithAttributes()
  .fromTopic(myTopic)
  .withTimestampAttribute("myDate"))
.apply(MapElements.via(new SimpleFunction<PubsubMessage, MyType>() {
        @Override
        public MyType apply(final PubsubMessage message) {
            // The payload is expected to hold a Java-serialized MyType instance.
            final byte[] payload = message.getPayload();
            try (final ObjectInputStream stream = new ObjectInputStream(new ByteArrayInputStream(payload))) {
                return (MyType) stream.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
        }
    }));

Upvotes: 0

Views: 601

Answers (1)

Scott Wegner

Reputation: 7493

Your updated code looks like it should work. Note that there is also PubsubIO.readPubsubMessagesWithoutAttributes() if you are not using the attribute map.
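If you want to check the decoding step before wiring it into a pipeline, the body of your SimpleFunction can be exercised on its own with plain JDK classes. The following is only a minimal sketch: MyType here is a hypothetical stand-in for your class, and PayloadCodec is an illustrative name, not part of the Beam SDK.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the MyType class from the question.
class MyType implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;

    MyType(String name) {
        this.name = name;
    }
}

// Illustrative helper; not part of the Beam SDK.
class PayloadCodec {
    // Produces the bytes a publisher would put in the message payload.
    static byte[] encode(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Mirrors the deserialization done inside the SimpleFunction.
    static MyType decode(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return (MyType) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Round trip: serialize an instance, then decode it from the raw bytes.
        MyType decoded = decode(encode(new MyType("example")));
        System.out.println(decoded.name);
    }
}
```

Inside the pipeline, the apply method of your SimpleFunction could then simply return PayloadCodec.decode(message.getPayload()).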

The previous functionality was removed in PR#2634, which replaced it with specialized methods for the most common encoding types (proto, avro, Strings).

I suspect arbitrary object decoding via SerializableCoder wasn't retained due to the inherent dangers of relying on Java Serialization. See the SerializableCoder javadoc or the related question "Java serialization - advantages and disadvantages, use or avoid?". However, if you feel the API is lacking, the Beam SDK is open source and the community welcomes contributions.
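If you do stay with Java serialization, one way to limit the exposure (assuming Java 9 or later) is to attach an ObjectInputFilter to the stream so that only the classes you expect are deserialized. This is a rough sketch rather than a complete defence; FilteredDecoder, MyType and Other are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Set;

// Illustrative helper; not part of the Beam SDK.
class FilteredDecoder {
    // Hypothetical payload type that the pipeline expects.
    static class MyType implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;

        MyType(String name) {
            this.name = name;
        }
    }

    // Hypothetical type that should never be accepted from the topic.
    static class Other implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Classes that may appear in a payload; everything else is rejected.
    private static final Set<Class<?>> ALLOWED = Set.of(MyType.class, String.class);

    private static final ObjectInputFilter ALLOW_LIST = info -> {
        Class<?> type = info.serialClass();
        if (type == null) {
            // Not a class check (for example a back reference): leave undecided.
            return ObjectInputFilter.Status.UNDECIDED;
        }
        return ALLOWED.contains(type)
                ? ObjectInputFilter.Status.ALLOWED
                : ObjectInputFilter.Status.REJECTED;
    };

    static byte[] encode(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decodes a payload, failing if it contains a class outside the allow list.
    static MyType decode(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            in.setObjectInputFilter(ALLOW_LIST); // must be set before the first read
            return (MyType) in.readObject();
        } catch (IOException e) {
            // A rejected class surfaces here as an InvalidClassException.
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With this in place, a payload containing a serialized Other instance fails in decode before the object is instantiated, while MyType payloads decode as before. If MyType has fields of other serializable types, those classes would need to be added to the allow list as well.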

Upvotes: 2
