BenFradet

Reputation: 453

Dataflow job doesn't consume from PubSub when launched from template

I currently have a job that writes the contents of a Pub/Sub topic to a Cloud Storage folder, and it works fine when I launch the jar directly.

However, whenever I try to launch the job using the template I uploaded, no messages go through the pipeline.

It's very similar to the Google-provided template, except that it reads from a subscription instead of a topic.

Here's my configuration:

trait Options extends PipelineOptions with StreamingOptions {
  @Description("The Cloud Pub/Sub subscription to read from")
  @Default.String("projects/project/subscriptions/subscription")
  def getInputSubscription: String
  def setInputSubscription(value: String): Unit

  @Description("The Cloud Storage directory to output files to, ends with /")
  @Default.String("gs://tmp/")
  def getOutputDirectory: String
  def setOutputDirectory(value: String): Unit

  @Description("The Cloud Storage prefix to output files to")
  @Default.String("subscription-")
  def getOutputFilenamePrefix: String
  def setOutputFilenamePrefix(value: String): Unit

  @Description("The shard template which will be part of the filenames")
  @Default.String("-W-P-SSSSS-of-NNNNN")
  def getShardTemplate: String
  def setShardTemplate(value: String): Unit

  @Description("The suffix of the filenames written out")
  @Default.String(".txt")
  def getOutputFilenameSuffix: String
  def setOutputFilenameSuffix(value: String): Unit

  @Description("The window duration in minutes, defaults to 5")
  @Default.Integer(5)
  def getWindowDuration: Int
  def setWindowDuration(value: Int): Unit

  @Description("The compression used (gzip, bz2 or none), bz2 can't be loaded into BigQuery")
  @Default.String("none")
  def getCompression: String
  def setCompression(value: String): Unit

  @Description("The maximum number of output shards produced when writing")
  @Default.Integer(1)
  def getNumShards: Int
  def setNumShards(value: Int): Unit
}
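
The pipeline itself closely follows the Google-provided template; roughly this (a simplified sketch, not the exact code):

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.windowing.{FixedWindows, Window}
import org.joda.time.Duration

object Storage {
  def main(args: Array[String]): Unit = {
    val options = PipelineOptionsFactory
      .fromArgs(args: _*)
      .withValidation()
      .as(classOf[Options])
    options.setStreaming(true)

    val pipeline = Pipeline.create(options)
    pipeline
      // Read raw messages from the Pub/Sub subscription
      .apply("ReadPubSub", PubsubIO.readStrings().fromSubscription(options.getInputSubscription))
      // Fixed windows so the streaming output can be written as files
      .apply("Window", Window.into[String](
        FixedWindows.of(Duration.standardMinutes(options.getWindowDuration.toLong))))
      // Windowed text file writes to Cloud Storage (compression handling omitted)
      .apply("WriteFiles", TextIO.write()
        .to(options.getOutputDirectory + options.getOutputFilenamePrefix)
        .withShardNameTemplate(options.getShardTemplate)
        .withSuffix(options.getOutputFilenameSuffix)
        .withWindowedWrites()
        .withNumShards(options.getNumShards))

    pipeline.run()
  }
}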

and here's how I launch the template:

   gcloud dataflow jobs run storage \
     --gcs-location gs://bucket/templates/Storage \
     --parameters runner=DataflowRunner,project=project,streaming=true,inputSubscription=projects/project/subscriptions/sub,outputDirectory=gs://bucket/
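
The template itself was staged beforehand with something along these lines (paths are the same placeholders as above):

./storage \
  --runner=DataflowRunner \
  --project=project \
  --streaming=true \
  --gcpTempLocation=gs://tmp-bucket/ \
  --templateLocation=gs://bucket/templates/Storage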

Here's how I launch the job without the template:

./storage \
  --runner=DataflowRunner \
  --project=project \
  --streaming=true \
  --gcpTempLocation=gs://tmp-bucket/ \
  --inputSubscription=projects/project/subscriptions/sub  \
  --outputDirectory=gs://bucket/

Upvotes: 1

Views: 801

Answers (1)

Guillermo Cacheda

Reputation: 2232

As @GuillemXercavins stated in a comment, the parameters have to use the ValueProvider interface as their type. This allows the pipeline options to be set and accessed at runtime; using plain types is what caused the issue.
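
As a rough illustration (not the exact code from the comments), the options would look something like this once the runtime parameters are switched to ValueProvider:

import org.apache.beam.sdk.options.{Default, Description, PipelineOptions, StreamingOptions, ValueProvider}

trait Options extends PipelineOptions with StreamingOptions {
  // Parameters resolved at job submission time are wrapped in ValueProvider
  // so the template can defer reading them until runtime.
  @Description("The Cloud Pub/Sub subscription to read from")
  @Default.String("projects/project/subscriptions/subscription")
  def getInputSubscription: ValueProvider[String]
  def setInputSubscription(value: ValueProvider[String]): Unit

  @Description("The Cloud Storage directory to output files to, ends with /")
  @Default.String("gs://tmp/")
  def getOutputDirectory: ValueProvider[String]
  def setOutputDirectory(value: ValueProvider[String]): Unit

  // ... same pattern for the other file-naming parameters ...

  // Values consumed while the pipeline graph is built (e.g. the window
  // duration passed to FixedWindows.of) cannot be deferred and stay as plain types.
  @Description("The window duration in minutes, defaults to 5")
  @Default.Integer(5)
  def getWindowDuration: Int
  def setWindowDuration(value: Int): Unit
}

The IO transforms then have to consume the providers directly, e.g. PubsubIO.readStrings().fromSubscription(options.getInputSubscription) and TextIO.write().to(...), both of which have ValueProvider overloads, and the template needs to be re-staged for the change to take effect.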

It's worth pointing out, as you already did in a comment, that ValueProvider seems to be unsupported in Scio.


Edit:

Scio example provided by @BenFradet in a comment below.

Upvotes: 1
