Reputation: 4289
I am receiving messages via Pub/Sub. Each message should be stored in its own file in GCS as raw data, then processed, and the result saved to BigQuery with the GCS file name included in the data.
The data should be visible in BQ immediately after it is received.
Example:
data published to pubsub : {a:1, b:2}
data saved to GCS file UUID: A1F432
data processing : {a:1, b:2} -> {a:11, b: 22} -> {fileName: A1F432, data: {a:11, b: 22}}
data in BQ : {fileName: A1F432, data: {a:11, b: 22}}
The idea is that the processed data stored in BQ carries a link to the raw data stored in GCS.
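The shape of the record in the example can be sketched in plain Java, without any pipeline code. This is only an illustration: the names `LinkedRecordSketch`, `LinkedRecord` and `link` are hypothetical, the processing step is passed in as a placeholder because the real transformation is not shown, and writing the raw file is left out. The point it makes is that the file name has to be generated per message, inside the per-element step, rather than once when the pipeline is built.

```java
import java.util.UUID;
import java.util.function.UnaryOperator;

class LinkedRecordSketch {
    /** Pairs a generated raw-file name with the processed payload. */
    static final class LinkedRecord {
        final String fileName;
        final String data;

        LinkedRecord(String fileName, String data) {
            this.fileName = fileName;
            this.data = data;
        }

        /** Renders {fileName: ..., data: ...}; assumes data is already valid JSON. */
        String toJson() {
            return "{\"fileName\": \"" + fileName + "\", \"data\": " + data + "}";
        }
    }

    // Called once per message, so every message gets its own file name.
    static LinkedRecord link(String rawMessage, UnaryOperator<String> process) {
        String fileName = UUID.randomUUID().toString();
        return new LinkedRecord(fileName, process.apply(rawMessage));
    }

    public static void main(String[] args) {
        // Identity function as a placeholder for the real processing step.
        LinkedRecord record = link("{\"a\": 1, \"b\": 2}", s -> s);
        System.out.println(record.toJson());
    }
}
```

The same `fileName` value would be used both as the GCS object name for the raw message and as the field written to BQ.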
Here is my code.
public class BotPipline {
    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);
        options.setProject(MY_PROJECT);
        options.setStagingLocation(MY_STAGING_LOCATION);
        options.setStreaming(true);
        Pipeline pipeline = Pipeline.create(options);

        // Read each Pub/Sub message as a String
        PCollection<String> input = pipeline.apply(PubsubIO.Read.subscription(MY_SUBSCRIBTION));

        String uuid = ...;
        // This is the step that fails: TextIO.Write does not accept an unbounded collection
        input.apply(TextIO.Write.to(MY_STORAGE_LOCATION + uuid));

        input
            .apply(ParDo.of(new DoFn<String,String>(){..}).named("updateJsonAndInsertUUID"))
            .apply(convertToTableRow(...).named("convertJsonStringToTableRow"))
            .apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema));

        pipeline.run();
    }
}
My code doesn't run because writing unbounded collections with TextIO.Write is not supported. After some research I found a few options to work around this issue:
I have no idea how to start. Can anyone provide code for one of these solutions, or suggest a different solution that fits my case (with code)?
Upvotes: 1
Views: 769
Reputation: 17913
The best option is #2: a simple DoFn that creates the files according to your data. Something like this:
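import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.transforms.DoFn;

class CreateFileFn extends DoFn<String, Void> {
    @ProcessElement
    public void process(ProcessContext c) throws IOException {
        String filename = ...generate filename from element...;
        // Create a new file (not a directory) and open a channel to it
        try (WritableByteChannel channel = FileSystems.create(
                FileSystems.matchNewResource(filename, false),
                "text/plain")) {
            OutputStream out = Channels.newOutputStream(channel);
            ...write the element to out...
        }
    }
}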
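To show the write step on its own, here is a minimal sketch that uses only `java.nio` on the local file system as a stand-in for `FileSystems.create`. The class name `PerElementFileSketch`, the method `writeElement`, the UUID-based file name and the UTF-8 encoding are all assumptions made for illustration, not part of the snippet above.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

class PerElementFileSketch {
    /** Writes one element to its own new file under dir and returns the file name. */
    static String writeElement(Path dir, String element) throws IOException {
        // Assumption: a random UUID is an acceptable per-element file name.
        String fileName = UUID.randomUUID().toString();
        // CREATE_NEW fails if the file already exists, so nothing is overwritten.
        try (WritableByteChannel channel = Files.newByteChannel(
                dir.resolve(fileName),
                StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            OutputStream out = Channels.newOutputStream(channel);
            out.write(element.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
        return fileName;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("raw-messages");
        String name = writeElement(dir, "{\"a\": 1, \"b\": 2}");
        System.out.println("wrote " + dir.resolve(name));
    }
}
```

Returning the file name is a deliberate choice here: if the DoFn emitted the name together with the element instead of `Void`, the downstream BigQuery step could attach it to the row, which is what the question asks for.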
Upvotes: 3