Reputation: 23
I have a multi-output channel producing as one output list of "1" and empty strings. I'd like to use these values to filter another output with the corresponding datasets (1=invalid, empty=valid). For better context:
channel.datasets = [data1, data2, data3, data4]
channel.status = ['1','1','','']
What I'd like to have is the following:
valid_datasets = [data3, data4]
invalid_datasets = [data1, data2]
I expected this to be relatively simple, but I think I'm not fully into the Nextflow logic yet.
What I tried to do was to first associate the ones and empty strings to boolean values:
invalid = channel.status.map { it == "1" }
valid = channel.status.map { it != "1" }
And then use these boolean values for filtering:
invalid_data = channel.datasets.filter(invalid)
valid_data = channel.datasets.filter(valid)
Unfortunately this didn't work.
I also considered using branch
but I can't find a way to use the status
channel as a way to branch the datasets
channel.
Note that I'd like to keep the datasets
channel separate because I'll need to process datasets on their own downstream. Beforehand I tried to use tuple
to combine status
and datasets
, but I couldn't find a way to access only datasets
after filtering.
I would appreciate any suggestion or recommendations.
Upvotes: 2
Views: 923
Reputation: 54502
The problem here is that these two channels do not share a matching key. They could be 'combined' using the merge
operator and then filtered, but this approach is likely not what you want (emphasis mine):
In general, the use of the
merge
operator is discouraged. Processes and channel operators are not guaranteed to emit items in the order that they were received, due to their parallel and asynchronous nature. Therefore, if you try to merge output channels from different processes, the resulting channel may be different on each run, which will cause resumed runs to not work properly.You should always use a matching key (e.g. sample ID) to merge multiple channels, so that they are combined in a deterministic way. For this purpose, you can use the join operator.
If the upstream processes instead produced channels with a shared key (or the keys could be associated in some way), they could be joined and branched to get what you want, for example:
workflow {
datasets_ch = Channel.of( ['foo', 'data1'], ['bar', 'data2'], ['baz', 'data3'] )
status_ch = Channel.of( ['bar', '1'], ['baz', ''], ['foo', '1'] )
datasets_ch
.join( status_ch )
.branch { key, dataset, status ->
valid: status
return tuple( key, dataset )
invalid: !status
return tuple( key, dataset )
}
.set { result }
result.valid.view { key, dataset ->
"Valid dataset: ${dataset} (${key})"
}
result.invalid.view { key, dataset ->
"Invalid dataset: ${dataset} (${key})"
}
}
Results:
$ nextflow run main.nf
N E X T F L O W ~ version 23.04.1
Launching `main.nf` [dreamy_roentgen] DSL2 - revision: 959ee8cd84
Valid dataset: data2 (bar)
Invalid dataset: data3 (baz)
Valid dataset: data1 (foo)
Upvotes: 1