Based on the WordCount example, I am trying to read my own JSON data (instead of the Shakespeare text files).
I am running the pipeline with:
mvn compile exec:java -Dexec.mainClass=myPkg.myClass -Dexec.args=" \
--project=myProj \
--stagingLocation=gs://myBkt/stage \
--runner=BlockingDataflowPipelineRunner \
--output=gs://myBkt/output/out \
--defaultWorkerLogLevel=DEBUG"
The console output is as follows:
<date> com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 68 files. Enable logging at DEBUG level to see which files will be staged.
<date> myPkg$GroupPublished apply
<date> myPkg$GroupPublished apply
INFO: GroupPublished/JsonToDatePosPlatKeyFn.out [PCollection]
<date> myPkg main
main
public static void main(String[] args) {
  ...
  Pipeline p = Pipeline.create(options);
  p.apply(TextIO.Read.named("ReadJson").from(options.getInputFile()))
      .apply(new GroupPublished())
      .apply(ParDo.of(new FormatAsStringFn()))
      .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
}
GroupPublished transformation
static class GroupPublished extends PTransform<PCollection<String>,
    PCollection<KV<DatePosPlatKey, Long>>> {
  @Override
  public PCollection<KV<DatePosPlatKey, Long>> apply(PCollection<String> lines) {
    PCollection<DatePosPlatKey> keyList =
        lines.apply(ParDo.of(new JsonToDatePosPlatKeyFn()));
    PCollection<KV<DatePosPlatKey, Long>> keysCounted =
        keyList.apply(Count.<DatePosPlatKey>perElement());
    return keysCounted;
  }
}
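For intuition about what GroupPublished hands downstream: Count.perElement() yields one KV per distinct element together with that element's occurrence count. The plain-Java sketch below computes the same thing over an in-memory list using the streams API. It is only an analogy, not how Dataflow executes the transform, and the `Key` record (with assumed date, position and platform fields) is a hypothetical stand-in for DatePosPlatKey, whose real fields are elided above. Records need Java 16 or later.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CountPerElementSketch {
    // Hypothetical stand-in for DatePosPlatKey; the real fields are elided in the question.
    record Key(String date, int pos, String platform) {}

    // Plain-Java analogue of Count.perElement(): one map entry per distinct element,
    // mapped to the number of times that element occurs in the input.
    static <T> Map<T, Long> countPerElement(List<T> elements) {
        return elements.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Key> keys = List.of(
                new Key("2015-01-01", 1, "web"),
                new Key("2015-01-01", 1, "web"),
                new Key("2015-01-02", 2, "ios"));
        Map<Key, Long> counts = countPerElement(keys);
        System.out.println(counts.get(new Key("2015-01-01", 1, "web"))); // 2
        System.out.println(counts.get(new Key("2015-01-02", 2, "ios"))); // 1
        System.out.println(counts.size()); // 2
    }
}
```

If the upstream DoFn emits nothing, the counted collection is simply empty, so an empty output file can come from any earlier stage as well.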
JSON row processing
static class JsonToDatePosPlatKeyFn extends DoFn<String, DatePosPlatKey> {
  @Override
  public void processElement(ProcessContext c) throws Exception {
    JsonNode root = mapper.readTree(c.element());
    for (JsonNode jsonFact : root) {
      DatePosPlatKey key = new DatePosPlatKey(...construct...);
      ...manipulate...
      c.output(key);
    }
  }
}
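TextIO.Read emits one element per line of the input file, so JsonToDatePosPlatKeyFn only receives a parseable document if each JSON value fits on a single line. A quick local sanity check on a sample of the input can rule out pretty-printed (multi-line) JSON. The stdlib-only sketch below is deliberately naive: it assumes each line should be a JSON object, as in the `{...}\n{...}` format described in the question, and it only inspects the first and last characters, so it is not a JSON validator. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class JsonLinesCheck {
    // Returns the 1-based numbers of non-blank lines that do not look like a complete
    // single-line JSON object. Only the first and last characters are checked.
    static List<Integer> suspiciousLines(String text) {
        List<Integer> bad = new ArrayList<>();
        String[] lines = text.split("\n", -1);
        for (int i = 0; i < lines.length; i++) {
            String line = lines[i].trim(); // trim also drops a trailing '\r'
            if (line.isEmpty()) {
                continue; // blank lines (e.g. after the final newline) are skipped
            }
            if (!(line.startsWith("{") && line.endsWith("}"))) {
                bad.add(i + 1);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        String oneObjectPerLine = "{\"a\":1}\n{\"a\":2}\n";
        String prettyPrinted = "{\n  \"a\": 1\n}\n";
        System.out.println(suspiciousLines(oneObjectPerLine)); // []
        System.out.println(suspiciousLines(prettyPrinted));    // [1, 2, 3]
    }
}
```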
data class
@DefaultCoder(AvroCoder.class)
public static class DatePosPlatKey { ... }
Things I've checked so far:
- defaultWorkerLogLevel doesn't seem to make any difference to the console output.
- The input data looks like: {...}\n{...}\n...
How can I better debug a complete lack of data? Can you see what I've done wrong?
Upon offline discussion, it turned out the code was missing a call to p.run(), so the pipeline was only constructed but never executed.
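The underlying point is deferred execution: each apply(...) only adds a step to the pipeline graph, and nothing reads input or writes output until the pipeline is run. The stdlib-only toy below illustrates that model. `ToyPipeline` is a hypothetical class, not the Dataflow SDK's Pipeline, and the list `sink` stands in for a write step such as TextIO.Write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class DeferredPipelineSketch {
    // Hypothetical toy pipeline: apply() only records a step; run() executes all steps in order.
    static class ToyPipeline {
        private final List<String> input;
        private final List<UnaryOperator<List<String>>> steps = new ArrayList<>();

        ToyPipeline(List<String> input) {
            this.input = input;
        }

        ToyPipeline apply(UnaryOperator<List<String>> step) {
            steps.add(step); // nothing is computed here
            return this;
        }

        List<String> run() {
            List<String> data = input;
            for (UnaryOperator<List<String>> step : steps) {
                data = step.apply(data);
            }
            return data;
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        ToyPipeline p = new ToyPipeline(List.of("a", "b"))
                .apply(xs -> xs.stream().map(String::toUpperCase).collect(Collectors.toList()))
                .apply(xs -> { sink.addAll(xs); return xs; }); // stands in for a write step
        System.out.println(sink); // [] : the graph is built but not executed
        p.run();
        System.out.println(sink); // [A, B]
    }
}
```

In the question's main, the corresponding fix is to call p.run() after the final apply(...) that writes the output.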