stefanmi

Nextflow: filtering channel with values from other channel

I have a process with multiple output channels, and one of them emits a list of "1"s and empty strings. I'd like to use these values to filter another output that holds the corresponding datasets (1 = invalid, empty = valid). For context:

channel.datasets = [data1, data2, data3, data4]
channel.status = ['1','1','','']

What I'd like to have is the following:

valid_datasets = [data3, data4]
invalid_datasets = [data1, data2]

I expected this to be relatively simple, but I don't think I fully understand the Nextflow logic yet.

My first attempt was to map the ones and empty strings to boolean values:

invalid = channel.status.map { it == "1" }
valid = channel.status.map { it != "1" }

And then use these boolean values for filtering:

invalid_data = channel.datasets.filter(invalid)
valid_data = channel.datasets.filter(valid)

Unfortunately, this didn't work. I also considered using branch, but I couldn't find a way to use the status channel to branch the datasets channel.
Note that I'd like to keep the datasets channel separate, because I'll need to process the datasets on their own downstream. Earlier I tried using a tuple to combine status and datasets, but I couldn't find a way to access only the datasets after filtering.

I would appreciate any suggestions or recommendations.

Answers (1)

Steve

The problem here is that these two channels do not share a matching key. They could be 'combined' using the merge operator and then filtered, but this approach is probably not what you want, as the Nextflow documentation for merge warns:

In general, the use of the merge operator is discouraged. Processes and channel operators are not guaranteed to emit items in the order that they were received, due to their parallel and asynchronous nature. Therefore, if you try to merge output channels from different processes, the resulting channel may be different on each run, which will cause resumed runs to not work properly.

You should always use a matching key (e.g. sample ID) to merge multiple channels, so that they are combined in a deterministic way. For this purpose, you can use the join operator.
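For completeness, here is a minimal sketch of the merge-then-branch approach mentioned above. It assumes, hypothetically, that each channel emits one plain value per dataset in the same order as in the question. Because it pairs items purely by position, it is subject to exactly the ordering caveat quoted above, so treat it as an illustration rather than a recommendation.

```nextflow
// Sketch only: merge pairs items by emission order, which is not
// guaranteed to be deterministic when the channels come from processes.
workflow {
    // Hypothetical stand-ins for the two process outputs in the question
    datasets_ch = Channel.of( 'data1', 'data2', 'data3', 'data4' )
    status_ch   = Channel.of( '1', '1', '', '' )

    datasets_ch
        .merge( status_ch )              // emits [ dataset, status ] pairs by position
        .branch { dataset, status ->
            invalid: status == '1'       // '1' marks an invalid dataset
                return dataset
            valid: true                  // anything else (the empty string) is valid
                return dataset
        }
        .set { merged }

    merged.valid.view { "Valid dataset: ${it}" }
    merged.invalid.view { "Invalid dataset: ${it}" }
}
```

If the two outputs come from different processes (or from one process running on several inputs), the pairing can change between runs, which is why a shared key and join are the safer option.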

If the upstream processes instead produced channels with a shared key (or the keys could be associated in some way), they could be joined and branched to get what you want, for example:

workflow {

    datasets_ch = Channel.of( ['foo', 'data1'], ['bar', 'data2'], ['baz', 'data3'] )
    status_ch = Channel.of( ['bar', '1'], ['baz', ''], ['foo', '1'] )

    datasets_ch
        .join( status_ch )
        .branch { key, dataset, status ->
            // Groovy truth: '1' is true (invalid), '' is false (valid)
            valid: !status
                return tuple( key, dataset )
            invalid: status
                return tuple( key, dataset )
        }
        .set { result }

    result.valid.view { key, dataset ->
        "Valid dataset: ${dataset} (${key})"
    }
    result.invalid.view { key, dataset ->
        "Invalid dataset: ${dataset} (${key})"
    }
}

Results:

$ nextflow run main.nf
N E X T F L O W  ~  version 23.04.1
Launching `main.nf` [dreamy_roentgen] DSL2 - revision: 959ee8cd84
Invalid dataset: data2 (bar)
Valid dataset: data3 (baz)
Invalid dataset: data1 (foo)
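On the follow-up point in the question (getting at only the datasets after filtering): a minimal sketch, reusing the example channels from this answer, that keeps the key through the join and branch and then drops it with map. The names valid_datasets and invalid_datasets are illustrative, not part of any API.

```nextflow
workflow {
    datasets_ch = Channel.of( ['foo', 'data1'], ['bar', 'data2'], ['baz', 'data3'] )
    status_ch   = Channel.of( ['bar', '1'], ['baz', ''], ['foo', '1'] )

    datasets_ch
        .join( status_ch )                      // emits [ key, dataset, status ]
        .branch { key, dataset, status ->
            invalid: status == '1'              // '1' marks an invalid dataset
                return tuple( key, dataset )
            valid: true                         // empty status means valid
                return tuple( key, dataset )
        }
        .set { result }

    // Drop the key so downstream steps receive only the dataset itself
    valid_datasets   = result.valid.map { key, dataset -> dataset }
    invalid_datasets = result.invalid.map { key, dataset -> dataset }

    valid_datasets.view { "Valid dataset: ${it}" }
    invalid_datasets.view { "Invalid dataset: ${it}" }
}
```

Stripping the key only at the point of use is often convenient, because the key is what lets you join results from different processes back together later.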
