When I create a Dataflow template, the characteristics of runtime parameters are not persisted in the template file. At runtime, if I try to pass a value for this parameter, I get a 400 error.
I'm using Scio 0.3.2 and Scala 2.11.11 with Apache Beam 0.6.
My parameters are the following:
import org.apache.beam.sdk.options.{PipelineOptions, ValueProvider}

trait XmlImportJobParameters extends PipelineOptions {
  def getInput: ValueProvider[String]
  def setInput(value: ValueProvider[String]): Unit
}
They are registered with this code:
import com.spotify.scio.ContextAndArgs
import org.apache.beam.sdk.options.PipelineOptionsFactory
val options = PipelineOptionsFactory.fromArgs(cmdlineArgs: _*).withValidation().as(classOf[XmlImportJobParameters])
implicit val (sc, args) = ContextAndArgs(cmdlineArgs)
To create the template, I call sbt with these parameters:
run-main jobs.XmlImportJob --runner=DataflowRunner --project=MyProject --templateLocation=gs://myBucket/XmlImportTemplate --tempLocation=gs://myBucket/staging --instance=myInstance
If I pass --input explicitly, it becomes a StaticValueProvider instead of a RuntimeValueProvider, and this time I can see it in the template file.
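To make the static/runtime distinction concrete, here is a deliberately simplified, hypothetical model in Scala. `Provider`, `StaticProvider`, `RuntimeProvider` and `launch` are invented for illustration and are not Beam's classes; the sketch only shows that a static value is fixed when the template is built, while a runtime value is looked up by name once the job is launched.

```scala
// Hypothetical, simplified model of the two provider kinds seen in the
// "Current Settings" dump. These are NOT Beam's ValueProvider classes.
sealed trait Provider[T] {
  def isAccessible: Boolean
  def get: T
}

// Value fixed when the pipeline (and therefore the template) is constructed.
final case class StaticProvider[T](value: T) extends Provider[T] {
  val isAccessible: Boolean = true
  def get: T = value
}

// Value resolved by name only after the job is launched with runtime parameters.
final class RuntimeProvider(val propertyName: String) extends Provider[String] {
  private var runtimeParams: Option[Map[String, String]] = None

  // Hypothetical stand-in for the service supplying --parameters at launch time.
  def launch(params: Map[String, String]): Unit = runtimeParams = Some(params)

  def isAccessible: Boolean = runtimeParams.isDefined

  def get: String = runtimeParams match {
    case Some(params) =>
      params.getOrElse(propertyName, throw new NoSuchElementException(s"no value for $propertyName"))
    case None =>
      throw new IllegalStateException(s"$propertyName is only available at run time")
  }
}
```

In Beam itself the corresponding types are `ValueProvider.StaticValueProvider` and `ValueProvider.RuntimeValueProvider`, both exposing `get()` and `isAccessible()`; the model above only mirrors that split.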
The template is launched from a Google Cloud Function that watches a Cloud Storage bucket (inspired by: …):
projectId: projectId,
resource: {
  parameters: {
    input: `gs://${file.bucket}/${}`
  },
  jobName: jobs[job].name
},
gcsPath: 'gs://MyBucket/MyTemplate'
The 400 error:
problem running dataflow template, error was: { Error: (109c1c52dc52fec7): The workflow could not be created. Causes: (109c1c52dc52fb8e): Found unexpected parameters: ['input' (perhaps you meant 'runner')]
    at Request._callback (/user_code/node_modules/googleapis/node_modules/google-auth-library/lib/transporters.js:85:15)
    at Request.self.callback (/user_code/node_modules/googleapis/node_modules/request/request.js:188:22)
    at emitTwo (events.js:106:13)
    at Request.emit (events.js:191:7)
    at Request.<anonymous> (/user_code/node_modules/googleapis/node_modules/request/request.js:1171:10)
    at emitOne (events.js:96:13)
    at Request.emit (events.js:188:7)
    at IncomingMessage.<anonymous> (/user_code/node_modules/googleapis/node_modules/request/request.js:1091:12)
    at IncomingMessage.g (events.js:291:16)
    at emitNone (events.js:91:20)
  code: 400,
  errors: [ { message: '(109c1c52dc52fec7): The workflow could not be created. Causes: (109c1c52dc52fb8e): Found unexpected parameters: [\'input\' (perhaps you meant \'runner\')]', domain: 'global', reason: 'badRequest' } ] }
I get the same error when I try this:
gcloud beta dataflow jobs run xmlJobImport --gcs-location gs://MyBucket/MyTemplate --parameters input=gs://MyBucket/file.csv
INVALID_ARGUMENT: (260a4f3f738a8ad9): The workflow could not be created. Causes: (260a4f3f738a8f96): Found unexpected parameters: ['input' (perhaps you meant 'runner'), 'projectid' (perhaps you meant 'project'), 'table' (perhaps you meant 'zone')]
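Both messages say the launched template does not recognise `input` (nor, in the gcloud case, `projectid` and `table`), which is consistent with the template file having persisted none of these runtime parameters. A hypothetical pre-flight check in Scala can surface the same mismatch before calling the launch API; `LaunchParamCheck` and the `declared` set are assumptions, and you would fill `declared` with the parameter names you expect the template to accept.

```scala
// Hypothetical client-side pre-check (not part of any Google or Beam API):
// report requested parameter names that the template is not expected to declare.
object LaunchParamCheck {
  /** Names in `requested` that are missing from `declared`, sorted for stable output. */
  def unexpected(declared: Set[String], requested: Map[String, String]): List[String] =
    requested.keys.filterNot(declared).toList.sorted

  /** A human-readable message when something would be rejected, otherwise None. */
  def report(declared: Set[String], requested: Map[String, String]): Option[String] =
    unexpected(declared, requested) match {
      case Nil   => None
      case names => Some(names.map(n => s"'$n'").mkString("Unexpected parameters: [", ", ", "]"))
    }
}
```

With an empty `declared` set (a template that persisted no parameters), every name passed to gcloud above is flagged, mirroring the service's response.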
The current settings are:
Current Settings:
appName: XmlImportJob$
autoscalingAlgorithm: THROUGHPUT_BASED
input: RuntimeValueProvider{propertyName=input, default=null, value=null}
instance: StaticValueProvider{value=staging}
jobName: xml-import-job
maxNumWorkers: 1
network: staging
numWorkers: 1
optionsId: 0
project: myProjectId
projectid: StaticValueProvider{value=myProjectId}
provenance: StaticValueProvider{value=ac3}
record: StaticValueProvider{value=BIEN}
root: StaticValueProvider{value=LISTEPA}
runner: class org.apache.beam.runners.dataflow.DataflowRunner
stableUniqueNames: WARNING
streaming: false
subnetwork: regions/europe-west1/subnetworks/net-staging
table: StaticValueProvider{value=annonce}
tempLocation: gs://-flux/staging/xmlImportJob/
templateLocation: gs://-flux-templates/XmlImportTemplate
workerMachineType: n1-standard-1
zone: europe-west1-c
Copying the answer from the issue:
Scio does not currently expose ValueProvider-based APIs; we now have an issue open for this: #696.
A working example would be something like:
object WordCount {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    // ...
  }
}
For the job above, to create the template:
run-main com.example.WordCount --runner=DataflowRunner --project=<project> --templateLocation=gs://<template-bucket> --tempLocation=gs://<temp-location> --output=gs://<example-of-static-arg-output>
To submit the job:
gcloud beta dataflow jobs run rav-test --gcs-location=gs://<template-bucket> --parameters=input=gs://<runtime-value>
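The answer's workflow passes static options such as `--output` when creating the template and supplies ValueProvider options such as `input` only at launch (passing them at creation time turns them into static values, as noted in the question). A hypothetical Scala helper that applies this split to a list of `--name=value` arguments might look like the following; `TemplateArgs`, `split`, and `runtimeNames` are illustrative names, and values containing commas are not escaped for gcloud's comma-separated `--parameters` list.

```scala
// Hypothetical helper (not a Scio or gcloud API): split "--name=value" args into
// those used when creating the template and a KEY=VALUE list for --parameters.
object TemplateArgs {
  // Name is everything up to the first '=', value is the rest (may itself contain '=').
  private val Flag = "--([^=]+)=(.*)".r

  final case class Split(creationArgs: List[String], launchParameters: String)

  def split(args: List[String], runtimeNames: Set[String]): Split = {
    val (runtime, static) = args.partition {
      case Flag(name, _) => runtimeNames(name)
      case _             => false // anything unparsed stays with the creation args
    }
    val params = runtime.collect { case Flag(name, value) => s"$name=$value" }
    Split(static, params.mkString(","))
  }
}
```

The `creationArgs` would go to `run-main` with `--templateLocation`, and `launchParameters` to `gcloud beta dataflow jobs run ... --parameters=`.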