nembleton

Reputation: 2502

How to implement a custom file parser in Google Cloud Dataflow for a Google Cloud Storage file

I have a file in a custom format in Google Cloud Storage, and I want to read it from a Google Cloud Dataflow pipeline.

I've implemented a Source and a Reader by subclassing FileBasedSource and FileBasedReader, but then I realized they don't seem to support reading from Google Cloud Storage (whereas FileBasedSink does), so I'm not sure what the best way to solve this is.

I tried to subclass TextIO, but I couldn't get anywhere with that, as it doesn't seem to be designed for subclassing.

Any ideas on how to deal with this?

Thanks.

Update (in response to the comments)

File pattern used: gs://mybucket/my.json

Implemented the Source class by extending FileBasedSource:

MessageSource<T> extends FileBasedSource<T>

Implemented the Reader class (the part I really care about here) by extending FileBasedReader:

MessageReader<T> extends FileBasedReader<T>
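The question doesn't show MessageReader's body. As a stdlib-only sketch of the parsing part, here is a standalone class whose method names mirror the abstract hooks of FileBasedReader as I understand them (startReading(ReadableByteChannel), readNextRecord(), getCurrentOffset(), getCurrent()). It assumes my.json holds one JSON message per line and that reading starts at byte 0 of the file; it deliberately does not extend the SDK class, so check the exact signatures against your SDK version before porting it.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;

// Hypothetical standalone sketch; it does NOT extend the SDK's FileBasedReader.
// Assumes newline-delimited records (one JSON message per line) read from offset 0.
public class MessageRecordReaderSketch {
    private InputStream in;
    private long nextOffset;       // byte offset where the next record starts
    private long currentOffset = -1;
    private String current;        // null when there is no current record

    /** Mirrors startReading(ReadableByteChannel): keep a buffered stream over the channel. */
    protected void startReading(ReadableByteChannel channel) {
        this.in = new BufferedInputStream(Channels.newInputStream(channel));
        this.nextOffset = 0;
    }

    /** Mirrors readNextRecord(): read one newline-terminated record; false at end of input. */
    protected boolean readNextRecord() throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        long start = nextOffset;
        boolean sawAnyByte = false;
        int b;
        while ((b = in.read()) != -1) {
            nextOffset++;
            sawAnyByte = true;
            if (b == '\n') {
                break;             // end of this record; the newline is not part of it
            }
            buf.write(b);
        }
        if (!sawAnyByte) {
            current = null;        // no more records
            return false;
        }
        currentOffset = start;
        current = new String(buf.toByteArray(), StandardCharsets.UTF_8);
        return true;
    }

    /** Byte offset at which the current record starts. */
    protected long getCurrentOffset() {
        if (current == null) throw new NoSuchElementException();
        return currentOffset;
    }

    /** The current record's text (here still unparsed JSON). */
    public String getCurrent() {
        if (current == null) throw new NoSuchElementException();
        return current;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "{\"id\":1}\n{\"id\":2}\n".getBytes(StandardCharsets.UTF_8);
        MessageRecordReaderSketch reader = new MessageRecordReaderSketch();
        reader.startReading(Channels.newChannel(new ByteArrayInputStream(data)));
        while (reader.readNextRecord()) {
            System.out.println(reader.getCurrentOffset() + ": " + reader.getCurrent());
        }
    }
}
```

Tracking the start offset of each record matters because a file-based reader is expected to report where each record begins so the file can be split into byte ranges; this sketch only handles the single, unsplit case.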

The reading code is:

MySource source = // instantiate source
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from(options.getSource()).named("ReadFileData"))
     .apply(ParDo.of(new DoFn<String, String>() {
         // processElement(...) omitted
     }));

The value of options.getSource() comes from this command-line parameter (verified to be correct):

    --source=gs://${BUCKET_NAME}/my.json \

Am I missing anything?

2nd UPDATE

When I call source.getEstimatedSizeBytes(options), it fails because no handler is found:

java.io.IOException: Unable to find handler for gs://mybucket/my.json
at com.google.cloud.dataflow.sdk.util.IOChannelUtils.getFactory(IOChannelUtils.java:186)
at com.google.cloud.dataflow.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:182)
at com.etc.TrackingDataPipeline.main(TrackingDataPipeline.java:66)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)

I thought FileBasedSource was supposed to handle GCS?

Upvotes: 2

Views: 510

Answers (1)

Kenn Knowles

Reputation: 6023

From the stack trace you show in "2nd Update", it looks like you have called getEstimatedSizeBytes directly from your main() method. This is expected to lead to the error you see.

The standard URL scheme handlers are registered when a pipeline runner is constructed. In your example code, that would happen when you call Pipeline.create(options) (this calls PipelineRunner.fromOptions(options), where the standard handlers are registered).
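To make the mechanism concrete, here is a toy, stdlib-only model of a scheme-to-handler registry. It is hypothetical (HandlerRegistry, registerStandardHandlers and getHandler are illustrative names, not the Dataflow SDK's code); it only shows why looking up gs://mybucket/my.json fails until something has registered a handler for the gs scheme, which in the real SDK happens as a side effect of constructing the runner.

```java
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model only; NOT the Dataflow SDK's IOChannelUtils implementation.
public class SchemeRegistryDemo {
    /** Hypothetical registry mapping URI schemes ("gs", "file", ...) to handlers. */
    static final class HandlerRegistry {
        private final Map<String, Function<String, String>> handlers = new HashMap<>();

        /** Installs the handlers a runner would normally register (toy version: only "gs"). */
        void registerStandardHandlers() {
            handlers.put("gs", spec -> "GCS handler for " + spec);
        }

        /** Resolves a spec by its URI scheme, failing like the stack trace in the question. */
        Function<String, String> getHandler(String spec) throws IOException {
            String scheme = URI.create(spec).getScheme();   // "gs" for gs://mybucket/my.json
            Function<String, String> handler = handlers.get(scheme);
            if (handler == null) {
                throw new IOException("Unable to find handler for " + spec);
            }
            return handler;
        }
    }

    public static void main(String[] args) {
        String spec = "gs://mybucket/my.json";
        HandlerRegistry registry = new HandlerRegistry();
        try {
            registry.getHandler(spec);                       // nothing registered yet
        } catch (IOException e) {
            System.out.println("Before registration: " + e.getMessage());
        }
        registry.registerStandardHandlers();                 // analogous to runner construction
        try {
            System.out.println("After registration: " + registry.getHandler(spec).apply(spec));
        } catch (IOException e) {
            System.out.println("Unexpected: " + e.getMessage());
        }
    }
}
```

In the question's main(), the getEstimatedSizeBytes call corresponds to the first lookup: it runs before anything has populated the registry.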

If you want the standard URL schemes registered outside of running a pipeline, you can explicitly call IOChannelUtils.registerStandardIOFactories(). Note that this is not a supported API; it reaches a bit "under the hood", so it may change at any time.

Upvotes: 3