Damien GOUYETTE

Reputation: 497

Inconsistent behavior of Dataflow templates?

When I create a Dataflow template, the characteristics of its runtime parameters are not persisted in the template file. At runtime, if I try to pass a value for such a parameter, I get a 400 error.

I'm using Scio 0.3.2 and Scala 2.11.11 with Apache Beam 0.6.

My parameters are the following:

import org.apache.beam.sdk.options.{PipelineOptions, ValueProvider}

trait XmlImportJobParameters extends PipelineOptions {
  def getInput: ValueProvider[String]
  def setInput(value: ValueProvider[String]): Unit
}

They are registered with this code:

val options = PipelineOptionsFactory
  .fromArgs(cmdlineArgs: _*)
  .withValidation()
  .as(classOf[XmlImportJobParameters])
PipelineOptionsFactory.register(classOf[XmlImportJobParameters])
implicit val (sc, args) = ContextAndArgs(cmdlineArgs)
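(As a side note, Beam option getters can also carry annotations such as @Description, which surface in --help output; a minimal sketch of the same trait with one added, purely for illustration:)

import org.apache.beam.sdk.options.{Description, PipelineOptions, ValueProvider}

trait XmlImportJobParameters extends PipelineOptions {
  @Description("GCS path of the XML file to import")
  def getInput: ValueProvider[String]
  def setInput(value: ValueProvider[String]): Unit
}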

To create the template, I call sbt with these parameters:

run-main jobs.XmlImportJob --runner=DataflowRunner --project=MyProject --templateLocation=gs://myBucket/XmlImportTemplate --tempLocation=gs://myBucket/staging --instance=myInstance

If I pass --input explicitly, it becomes a StaticValueProvider instead of a RuntimeValueProvider, and in that case I can see it in the template file.
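For reference, the difference between the two provider kinds can be seen directly in Beam's ValueProvider API (a minimal sketch; the gs:// path is only an illustration, and XmlImportJobParameters is the trait defined above):

import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider

// A value supplied at template-creation time is baked in as a
// StaticValueProvider: it is accessible immediately and serialized as-is.
val staticInput = StaticValueProvider.of("gs://myBucket/file.xml")
staticInput.isAccessible // true

// An option left unset at template creation is proxied as a
// RuntimeValueProvider: it is not accessible until the job actually runs,
// which is why it should be recorded in the template as a deferred parameter.
val runtimeInput = PipelineOptionsFactory.fromArgs()
  .as(classOf[XmlImportJobParameters])
  .getInput
runtimeInput.isAccessible // false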

The template is invoked from a Google Cloud Function watching a storage bucket (inspired by https://shinesolutions.com/2017/03/23/triggering-dataflow-pipelines-with-cloud-functions/):

...
dataflow.projects.templates.create({
    projectId: projectId,
    resource: {
        parameters: {
            input: `gs://${file.bucket}/${file.name}`
        },
        jobName: jobs[job].name,
        gcsPath: 'gs://MyBucket/MyTemplate'
    }
}
...

The 400 error:

problem running dataflow template, error was: { Error: (109c1c52dc52fec7): The workflow could not be created. Causes: (109c1c52dc52fb8e): Found unexpected parameters: ['input' (perhaps you meant 'runner')]
    at Request._callback (/user_code/node_modules/googleapis/node_modules/google-auth-library/lib/transporters.js:85:15)
    at Request.self.callback (/user_code/node_modules/googleapis/node_modules/request/request.js:188:22)
    at emitTwo (events.js:106:13)
    at Request.emit (events.js:191:7)
    at Request.<anonymous> (/user_code/node_modules/googleapis/node_modules/request/request.js:1171:10)
    at emitOne (events.js:96:13)
    at Request.emit (events.js:188:7)
    at IncomingMessage.<anonymous> (/user_code/node_modules/googleapis/node_modules/request/request.js:1091:12)
    at IncomingMessage.g (events.js:291:16)
    at emitNone (events.js:91:20)
  code: 400,
  errors: [ { message: '(109c1c52dc52fec7): The workflow could not be created. Causes: (109c1c52dc52fb8e): Found unexpected parameters: [\'input\' (perhaps you meant \'runner\')]', domain: 'global', reason: 'badRequest' } ] }

I get the same error when I try this:

gcloud beta dataflow jobs run xmlJobImport --gcs-location gs://MyBucket/MyTemplate --parameters input=gs://MyBucket/file.csv

=>

(gcloud.beta.dataflow.jobs.run) INVALID_ARGUMENT: (260a4f3f738a8ad9): The workflow could not be created. Causes: (260a4f3f738a8f96): Found unexpected parameters: ['input' (perhaps you meant 'runner'), 'projectid' (perhaps you meant 'project'), 'table' (perhaps you meant 'zone')]

The current settings are:

Current Settings:
  appName: XmlImportJob$
  autoscalingAlgorithm: THROUGHPUT_BASED
  input: RuntimeValueProvider{propertyName=input, default=null, value=null}
  instance: StaticValueProvider{value=staging}
  jobName: xml-import-job
  maxNumWorkers: 1
  network: staging
  numWorkers: 1
  optionsId: 0
  project: myProjectId
  projectid: StaticValueProvider{value=myProjectId}
  provenance: StaticValueProvider{value=ac3}
  record: StaticValueProvider{value=BIEN}
  root: StaticValueProvider{value=LISTEPA}
  runner: class org.apache.beam.runners.dataflow.DataflowRunner
  stableUniqueNames: WARNING
  streaming: false
  subnetwork: regions/europe-west1/subnetworks/net-staging
  table: StaticValueProvider{value=annonce}
  tempLocation: gs://-flux/staging/xmlImportJob/
  templateLocation: gs://-flux-templates/XmlImportTemplate
  workerMachineType: n1-standard-1
  zone: europe-west1-c

Environment

Upvotes: 4

Views: 1630

Answers (1)

rav

Reputation: 3688

Copying the answer from the issue:

Scio does not currently expose ValueProvider-based APIs; there is now an issue open for this: #696

A working example would be something like:

import com.spotify.scio._
import org.apache.beam.sdk.io.TextIO

object WordCount {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    // Read the input through the ValueProvider so it stays a runtime parameter
    sc.customInput("input", TextIO.read().from(sc.optionsAs[XmlImportJobParameters].getInput))
      .map(_.toUpperCase)
      .saveAsTextFile(args("output"))
    sc.close()
  }
}
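The key difference from the question is that the input is threaded into the pipeline as a ValueProvider, by dropping down to Beam's TextIO through sc.customInput, rather than being resolved eagerly through args; Dataflow can then defer its resolution until job submission instead of rejecting it as an unexpected parameter.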

For the job above, to create the template:

run-main com.example.WordCount --runner=DataflowRunner --project=<project> --templateLocation=gs://<template-bucket> --tempLocation=gs://<temp-location> --output=gs://<example-of-static-arg-output>

To submit a job:

gcloud beta dataflow jobs run rav-test --gcs-location=gs://<template-bucket> --parameters=input=gs://<runtime-value>
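With a template created this way, the Cloud Function call from the question should work unchanged as well, since input is now recorded in the template as a runtime parameter, while --output stays baked in as a static value.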

Upvotes: 4
