Reputation: 439
I want to write the values from key-value pairs to text files in GCS, one file per key, using FileIO
with writeDynamic()
in Apache Beam (Java).
So far, I'm reading the data from BigQuery, transforming it into key-value pairs, and then trying to use FileIO with writeDynamic()
to write the values into one file per key.
PCollection<TableRow> inputRows = p.apply(BigQueryIO.readTableRows()
    .from(tableSpec)
    .withMethod(Method.DIRECT_READ)
    .withSelectedFields(Lists.newArrayList("id", "string1", "string2", "string3", "int1")));

inputRows
    .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
        .via(tableRow -> KV.of((Integer) tableRow.get("id"), (String) tableRow.get("string1"))))
    .apply(FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        .withDestinationCoder(StringUtf8Coder.of())
        .via(Contextful.fn(KV::getValue), TextIO.sink())
        .to("gs://bucket/output")
        .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));
I get the error:
The method apply(PTransform<? super PCollection<KV<Integer,String>>,OutputT>) in the type PCollection<KV<Integer,String>> is not applicable for the arguments (FileIO.Write<String,KV<String,String>>)
Upvotes: 0
Views: 3719
Reputation: 7058
There is a type mismatch. Notice that the TableRow
element is parsed into a KV<Integer, String>
in MapElements
(i.e. the key is an Integer
), but the write step expects a String
key, as declared in .apply(FileIO.<String, KV<String, String>>writeDynamic())
:
inputRows
    .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
        .via(tableRow -> KV.of((Integer) tableRow.get("id"), (String) tableRow.get("string1"))))
    .apply(FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        ...
To avoid having to cast the key again inside .by(KV::getKey)
, I would recommend casting it to a String
beforehand:
inputRows
    .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
        .via(tableRow -> KV.of((String) tableRow.get("id"), (String) tableRow.get("name"))))
    .apply(FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
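Alternatively, if you would rather keep the Integer key, you can parameterize writeDynamic() with Integer and swap in a matching destination coder. A minimal sketch (not part of the original question's code; it assumes org.apache.beam.sdk.coders.VarIntCoder is imported):

    // Hypothetical variant that keeps the Integer key.
    inputRows
        .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
            .via(tableRow -> KV.of((Integer) tableRow.get("id"), (String) tableRow.get("string1"))))
        .apply(FileIO.<Integer, KV<Integer, String>>writeDynamic()
            .by(KV::getKey)                          // destinations are now Integer keys
            .withDestinationCoder(VarIntCoder.of())  // coder must match the destination type
            .via(Contextful.fn(KV::getValue), TextIO.sink())
            .to("gs://bucket/output")
            .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));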
As an example, I tested this with the public table bigquery-public-data:london_bicycles.cycle_stations
, writing each bike station to a different file:
$ cat output/file-746-00000-of-00004.txt
Lots Road, West Chelsea
$ bq query --use_legacy_sql=false "SELECT name FROM \`bigquery-public-data.london_bicycles.cycle_stations\` WHERE id = 746"
Waiting on bqjob_<ID> ... (0s) Current status: DONE
+-------------------------+
| name |
+-------------------------+
| Lots Road, West Chelsea |
+-------------------------+
Full code:
package com.dataflow.samples;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;

public abstract class DynamicGCSWrites {

    public interface Options extends PipelineOptions {
        @Validation.Required
        @Description("Output Path i.e. gs://BUCKET/path/to/output/folder")
        String getOutput();
        void setOutput(String s);
    }

    public static void main(String[] args) {

        DynamicGCSWrites.Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DynamicGCSWrites.Options.class);
        Pipeline p = Pipeline.create(options);

        String output = options.getOutput();

        // Read only the columns we need, via the BigQuery Storage API.
        PCollection<TableRow> inputRows = p
            .apply(BigQueryIO.readTableRows()
                .from("bigquery-public-data:london_bicycles.cycle_stations")
                .withMethod(Method.DIRECT_READ)
                .withSelectedFields(Lists.newArrayList("id", "name")));

        inputRows
            // Key each row by station id; both key and value are Strings.
            .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via(tableRow -> KV.of((String) tableRow.get("id"), (String) tableRow.get("name"))))
            // Route each element to a per-key destination and write the value as a text line.
            .apply(FileIO.<String, KV<String, String>>writeDynamic()
                .by(KV::getKey)
                .withDestinationCoder(StringUtf8Coder.of())
                .via(Contextful.fn(KV::getValue), TextIO.sink())
                .to(output)
                .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));

        p.run().waitUntilFinish();
    }
}
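One note on the output above: the name file-746-00000-of-00004.txt means the elements for key 746 were spread over 4 shards. If you need exactly one file per key, FileIO.Write also offers withNumShards(1), at the cost of per-destination write parallelism. A minimal variation of the write step, not part of the tested code above:

    // Same write step, forced to a single shard per destination,
    // so each key produces exactly one file: file-<key>-00000-of-00001.txt
    .apply(FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        .withDestinationCoder(StringUtf8Coder.of())
        .via(Contextful.fn(KV::getValue), TextIO.sink())
        .to(output)
        .withNumShards(1)
        .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));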
Upvotes: 1