mpatek

Dynamic "Fan In" in Argo Workflows

Argo permits the dynamic generation of parallel workflow steps based on outputs from previous steps.

An example of this dynamic workflow generation is provided here: https://github.com/argoproj/argo-workflows/blob/master/examples/loops-param-result.yaml
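
The withParam expansion in that example can be modeled roughly like this: the JSON list printed by the generate step is parsed, and one invocation of the looped template is created per element, with that element substituted for {{item}}. This Python sketch only illustrates the shape of the fan-out. expand_with_param is a hypothetical helper, and the dictionaries it returns are not Argo's internal representation.

```python
import json

# Stand-in for {{steps.generate.outputs.result}}: the JSON list that the
# gen-number-list script prints to stdout.
generate_result = json.dumps(list(range(20, 31)))


def expand_with_param(result_json, template, param_name):
    """Hypothetical model of withParam: one step invocation per list item.

    Each element of the JSON list plays the role of {{item}}, which the
    step's arguments bind to the named input parameter (as a string).
    """
    items = json.loads(result_json)
    return [
        {"template": template, "arguments": {param_name: str(item)}}
        for item in items
    ]


steps = expand_with_param(generate_result, "output-number", "number")
print(len(steps))  # 11
print(steps[0])    # {'template': 'output-number', 'arguments': {'number': '20'}}
```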

I'm trying to create a similar workflow with a final 'fan-in' step that will read outputs from the dynamically created parallel steps. Here's a stab at it:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-param-result-
spec:
  entrypoint: loop-param-result-example
  templates:
  - name: loop-param-result-example
    steps:
    - - name: generate
        template: gen-number-list
    - - name: write
        template: output-number
        arguments:
          parameters:
          - name: number
            value: "{{item}}"
        withParam: "{{steps.generate.outputs.result}}"
    - - name: fan-in
        template: fan-in
        arguments:
          parameters:
          - name: numbers
            value: "{{steps.write.outputs.parameters.number}}"

  - name: gen-number-list
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)

  - name: output-number
    inputs:
      parameters:
      - name: number
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo {{inputs.parameters.number}} > /tmp/number.txt"]
    outputs:
      parameters:
        - name: number
          valueFrom:
            path: /tmp/number.txt

  - name: fan-in
    inputs:
      parameters:
        - name: numbers
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo received {{inputs.parameters.numbers}}"]

I'm able to submit this workflow, and it runs successfully. Unfortunately, the output of the final fan-in step looks like this:

fan-in: received {{steps.write.outputs.parameters.number}}

The value of the numbers input parameter is not being interpolated. Any ideas about how to get this working?

Answers (1)

crenshaw-dev

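Aggregated step output parameters are accessible via steps.STEP-NAME.outputs.parameters: when a step runs with withItems or withParam, that variable holds a JSON array containing the output parameter map of each iteration. It's not possible to access the aggregated values of a single parameter by name (e.g. steps.write.outputs.parameters.number), which is why that reference was left uninterpolated.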
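
To make the shape concrete: each iteration of the write step produces its own outputs.parameters map, and the aggregated value is a JSON list holding one such map per iteration. The Python sketch below models this. aggregate_outputs and pluck are hypothetical helpers, and treating every value as a string is an assumption. pluck does in the consumer what a by-name reference such as steps.write.outputs.parameters.number would have done if Argo supported it.

```python
import json

# Per-iteration outputs of the `write` step: each iteration writes its number
# to /tmp/number.txt, which becomes that iteration's `number` output parameter.
per_iteration_outputs = [{"number": str(n)} for n in range(20, 31)]


def aggregate_outputs(outputs):
    """Model of {{steps.write.outputs.parameters}}: a JSON array with one
    parameter map per iteration (exact serialization is an assumption)."""
    return json.dumps(outputs, separators=(",", ":"))


def pluck(aggregated_json, name):
    """Hypothetical helper: collect one named parameter across all
    iterations, i.e. what a by-name aggregated reference would return."""
    return [entry[name] for entry in json.loads(aggregated_json)]


aggregated = aggregate_outputs(per_iteration_outputs)
numbers = pluck(aggregated, "number")
print(numbers[0], numbers[-1], len(numbers))  # 20 30 11
```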

This slight change to your workflow should get you what you need:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-param-result-
spec:
  entrypoint: loop-param-result-example
  templates:
  - name: loop-param-result-example
    steps:
    - - name: generate
        template: gen-number-list
    - - name: write
        template: output-number
        arguments:
          parameters:
          - name: number
            value: "{{item}}"
        withParam: "{{steps.generate.outputs.result}}"
    - - name: fan-in
        template: fan-in
        arguments:
          parameters:
          - name: numbers
            value: "{{steps.write.outputs.parameters}}"

  - name: gen-number-list
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)

  - name: output-number
    inputs:
      parameters:
      - name: number
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo {{inputs.parameters.number}} > /tmp/number.txt"]
    outputs:
      parameters:
        - name: number
          valueFrom:
            path: /tmp/number.txt

  - name: fan-in
    inputs:
      parameters:
        - name: numbers
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo received {{inputs.parameters.numbers}}"]

The only change was to remove .number from {{steps.write.outputs.parameters.number}}.

This is the new output:

received [{number:20},{number:21},{number:22},{number:23},{number:24},{number:25},{number:26},{number:27},{number:28},{number:29},{number:30}]
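
The double quotes are most likely missing from that output because the value is spliced unquoted into the sh -c command line, so the shell removes them; the parameter itself is a JSON array. If the fan-in step needs the individual values, one option is to write it as a Python script template that parses the list. A minimal sketch of that parsing logic, assuming the value arrives as a string like [{"number":"20"},{"number":"21"},...] (fan_in is a hypothetical name, and summing is just an arbitrary example of a fan-in computation):

```python
import json


def fan_in(aggregated_json):
    """Parse aggregated output parameters and combine their values.

    Assumes a JSON array with one object per iteration and string values,
    e.g. [{"number":"20"},{"number":"21"}]. int() also tolerates any
    surrounding whitespace, such as a trailing newline from the file.
    """
    numbers = [int(entry["number"]) for entry in json.loads(aggregated_json)]
    return numbers, sum(numbers)


# Stand-in for {{inputs.parameters.numbers}}. In an Argo script template the
# placeholder could be substituted into the source instead, e.g.
# raw = '{{inputs.parameters.numbers}}' (hypothetical wiring).
raw = json.dumps([{"number": str(n)} for n in range(20, 31)])

numbers, total = fan_in(raw)
print("received", numbers, "sum", total)  # sum is 275
```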

Here is the GitHub issue where output parameter aggregation was discussed and introduced.

I've put in an enhancement proposal for accessing aggregated output parameters by name.
