Reputation: 497
I am new to Dataflow, and I'm going to migrate the following snippet from Java SDK 1.9.0 to 2.3.0:
//SDK 1.9.0
PCollection<MyType> pubsub = p.apply(
    PubsubIO.Read.named("Read from Pubsub")
        .topic(myTopic)
        .withCoder(SerializableCoder.of(MyType.class))
        .timestampLabel("myDate"));
I'd convert it as follows:
//SDK 2.3.0
PCollection<MyType> pubsub = p.apply("Read from Pubsub",
        PubsubIO.<MyType>read() // <-- COMPILE ERROR here, private method
            .fromTopic(myTopic)
            .withTimestampAttribute("myDate"))
    .setCoder(SerializableCoder.of(MyType.class));
but the PubsubIO.read() method is private as of Java SDK 2.3.0.
I need to consume messages carrying serialized instances of MyType, but the methods exposed by PubsubIO seem meant only for text messages, Avro, Protobuf and so on.
How can I read from PubsubIO topics where messages contain serialized java objects?
UPDATE:
I could work around it like this (not tested yet):
PCollection<MyType> pubsub = p
    .apply("Read from Pubsub",
        PubsubIO.readMessagesWithAttributes()
            .fromTopic(myTopic)
            .withTimestampAttribute("myDate"))
    .apply(MapElements.via(new SimpleFunction<PubsubMessage, MyType>() {
        @Override
        public MyType apply(final PubsubMessage message) {
            // The payload holds the Java-serialized bytes of a MyType instance
            final byte[] payload = message.getPayload();
            try (final ObjectInputStream stream = new ObjectInputStream(new ByteArrayInputStream(payload))) {
                return (MyType) stream.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
        }
    }))
    // Set the coder explicitly, as the 1.9.0 code did with withCoder(...)
    .setCoder(SerializableCoder.of(MyType.class));
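The decoding in `apply` only works if the publisher wrote each payload with plain Java serialization (`ObjectOutputStream.writeObject`). As a Beam-independent sketch for checking that both sides agree on the byte format, the class below round-trips an object through it. `PayloadCodec`, `encode`, `decode` and the nested `MyType` (a stand-in with a single `String` field) are hypothetical names, and the publisher side is an assumption about how the messages were produced.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class PayloadCodec {

    // Hypothetical stand-in for the question's MyType; the real class must implement Serializable
    public static class MyType implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;

        public MyType(String name) {
            this.name = name;
        }
    }

    // Publisher side (assumed): Java-serialize the object into the message payload bytes
    public static byte[] encode(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed (and flushed) before the bytes are read out
        return bytes.toByteArray();
    }

    // Subscriber side: same logic as the SimpleFunction body, generalized to any expected type
    public static <T> T decode(byte[] payload, Class<T> type) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return type.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MyType copy = decode(encode(new MyType("example")), MyType.class);
        System.out.println(copy.name); // prints "example"
    }
}
```

In the generic `decode`, `type.cast` is used because a plain `(T)` cast would be unchecked; a payload of the wrong class therefore fails with a `ClassCastException` inside `decode` rather than somewhere downstream.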
Upvotes: 0
Views: 601
Reputation: 7493
Your updated code looks like it should work. Note that there is also PubsubIO.readPubsubMessagesWithoutAttributes() if you are not using the attribute map.
The previous functionality was removed in PR #2634, which replaced it with specialized methods for the most common encoding types (Protobuf, Avro, and Strings).
I suspect arbitrary object decoding via SerializableCoder wasn't retained due to the inherent dangers of relying on Java serialization. See the SerializableCoder javadoc or the related question "Java serialization - advantages and disadvantages, use or avoid?". However, if you feel the API is lacking, the Beam SDK is open source and the community welcomes contributions.
Upvotes: 2