I have a Nextflow process that looks like this:
process TileImages {
    input:
    val run_id
    val group_id
    val sample_id
    path image
    path masks

    output:
    val run_id, emit: run_id
    val group_id, emit: group_id
    val sample_id, emit: sample_id

    script:
    """
    python ${projectDir}/bin/run-tile-images.py \
        --image ${image} \
        --masks ${masks}
    """
}
This calls a Python script that ends like this:
i = 0
for wsi_patch, mask_patch in zip(wsi_patch_extractor, mask_patch_extractor):
    np.save(f"{args.prefix}_wsi_{i}.npy", wsi_patch)
    np.save(f"{args.prefix}_mask_{i}.npy", mask_patch)
    i += 1
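For illustration, the saving loop can be reduced to a self-contained sketch. This is a hypothetical stand-in, not the original script: the helper name `save_pairs` is invented, the patches are placeholder bytes rather than NumPy arrays (so it runs without NumPy), and `enumerate` replaces the manual counter. It shows that the wsi and mask files of each pair share the same trailing index, which is the natural key for pairing them up again later.

```python
import tempfile
from pathlib import Path


def save_pairs(prefix, patches, outdir):
    """Hypothetical stand-in for the script's saving loop.

    `patches` is an iterable of (wsi_patch, mask_patch) pairs; each patch is
    raw bytes here instead of a NumPy array, so the '.npy' files are only
    placeholders. Returns the written file names in write order.
    """
    written = []
    # enumerate replaces the manual `i = 0 ... i += 1` counter
    for i, (wsi_patch, mask_patch) in enumerate(patches):
        wsi_path = Path(outdir) / f"{prefix}_wsi_{i}.npy"
        mask_path = Path(outdir) / f"{prefix}_mask_{i}.npy"
        wsi_path.write_bytes(wsi_patch)
        mask_path.write_bytes(mask_patch)
        written.extend([wsi_path.name, mask_path.name])
    return written


with tempfile.TemporaryDirectory() as d:
    names = save_pairs("S1", [(b"w0", b"m0"), (b"w1", b"m1")], d)
    print(names)
    # ['S1_wsi_0.npy', 'S1_mask_0.npy', 'S1_wsi_1.npy', 'S1_mask_1.npy']
```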
Rather than saving and passing on a single file, this saves roughly 40 file pairs (the exact number varies from image to image). How can I write the Nextflow process so that each of these pairs becomes a separate entry in the output channel?
That is, a single task would in effect produce many outputs, each containing the run_id, group_id and sample_id together with one of the saved wsi/mask pairs.
One approach is to emit all of each task's files together, group them by their numeric suffix (the patch index), and use that key to pick out each mask/wsi pair. The flatMap operator then emits each item of the resulting collection as a separate channel entry. For example:
process test {
    input:
    val sample_id

    output:
    tuple val(sample_id), path("${sample_id}_{wsi,mask}_*.npy")

    """
    for i in {1..3} ; do
        touch "${sample_id}_wsi_\${i}.npy"
        touch "${sample_id}_mask_\${i}.npy"
    done
    """
}
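workflow {
    samples = Channel.of( 'foo', 'bar', 'baz' )
    test( samples )

    // Each task emits [sample, [all npy files]]; flatMap turns the list
    // returned by the closure into one channel item per pair
    test.out.flatMap { sample, files ->
        files
            // group by the trailing patch index, e.g. 'bar_mask_1' -> '1'
            .groupBy { it.baseName.split('_')[-1] }
            // build one (sample, mask, wsi) tuple per index
            .collect { key, values ->
                def mask = values.find { it.baseName.endsWith("_mask_${key}") }
                def wsi = values.find { it.baseName.endsWith("_wsi_${key}") }
                tuple( sample, mask, wsi )
            }
    }.view()
}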
Results:
$ nextflow run main.nf
N E X T F L O W ~ version 23.04.1
Launching `main.nf` [scruffy_feynman] DSL2 - revision: 8d78366a9a
executor > local (3)
[d4/0acae5] process > test (3) [100%] 3 of 3 ✔
[bar, /path/to/work/c5/228fa4aaf6e65675b93158397d4657/bar_mask_1.npy, /path/to/work/c5/228fa4aaf6e65675b93158397d4657/bar_wsi_1.npy]
[bar, /path/to/work/c5/228fa4aaf6e65675b93158397d4657/bar_mask_2.npy, /path/to/work/c5/228fa4aaf6e65675b93158397d4657/bar_wsi_2.npy]
[bar, /path/to/work/c5/228fa4aaf6e65675b93158397d4657/bar_mask_3.npy, /path/to/work/c5/228fa4aaf6e65675b93158397d4657/bar_wsi_3.npy]
[foo, /path/to/work/89/d26cde15264689c2194027b59042c3/foo_mask_1.npy, /path/to/work/89/d26cde15264689c2194027b59042c3/foo_wsi_1.npy]
[foo, /path/to/work/89/d26cde15264689c2194027b59042c3/foo_mask_2.npy, /path/to/work/89/d26cde15264689c2194027b59042c3/foo_wsi_2.npy]
[foo, /path/to/work/89/d26cde15264689c2194027b59042c3/foo_mask_3.npy, /path/to/work/89/d26cde15264689c2194027b59042c3/foo_wsi_3.npy]
[baz, /path/to/work/d4/0acae55ebb3ef87ab49d12ae988041/baz_mask_1.npy, /path/to/work/d4/0acae55ebb3ef87ab49d12ae988041/baz_wsi_1.npy]
[baz, /path/to/work/d4/0acae55ebb3ef87ab49d12ae988041/baz_mask_2.npy, /path/to/work/d4/0acae55ebb3ef87ab49d12ae988041/baz_wsi_2.npy]
[baz, /path/to/work/d4/0acae55ebb3ef87ab49d12ae988041/baz_mask_3.npy, /path/to/work/d4/0acae55ebb3ef87ab49d12ae988041/baz_wsi_3.npy]
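The grouping done inside the flatMap closure can be mirrored in plain Python, which may help when checking the pairing logic outside Nextflow. This is a sketch under assumptions: `pair_patches` is a hypothetical helper, `PurePath.stem` stands in for Nextflow's `baseName`, and the run/group IDs in the usage lines are made-up values. Passing a `(run_id, group_id, sample_id)` tuple as `meta` gives the one-entry-per-pair shape asked for in the question. Note that the glob `${sample_id}_{wsi,mask}_*.npy` only matches if the script's `--prefix` is set to the sample ID, and the question's process does not currently pass `--prefix`.

```python
from collections import defaultdict
from pathlib import PurePath


def pair_patches(meta, files):
    """Return one (*meta, mask, wsi) tuple per patch index.

    Mirrors the groupBy/collect closure in the workflow: `meta` is a tuple
    such as (run_id, group_id, sample_id) and `files` are names like
    '<prefix>_wsi_<i>.npy' and '<prefix>_mask_<i>.npy'.
    """
    groups = defaultdict(list)
    for f in files:
        # stem drops the '.npy' extension; the last '_' token is the index
        key = PurePath(f).stem.split("_")[-1]
        groups[key].append(f)

    pairs = []
    for key, values in groups.items():
        # next(..., None) yields None for a missing partner, like Groovy's find
        mask = next((v for v in values if PurePath(v).stem.endswith(f"_mask_{key}")), None)
        wsi = next((v for v in values if PurePath(v).stem.endswith(f"_wsi_{key}")), None)
        pairs.append((*meta, mask, wsi))
    return pairs


# Hypothetical IDs; the file names follow the example above
files = ["bar_mask_1.npy", "bar_wsi_1.npy", "bar_mask_2.npy", "bar_wsi_2.npy"]
for item in pair_patches(("run1", "grpA", "bar"), files):
    print(item)
# ('run1', 'grpA', 'bar', 'bar_mask_1.npy', 'bar_wsi_1.npy')
# ('run1', 'grpA', 'bar', 'bar_mask_2.npy', 'bar_wsi_2.npy')
```

As in the Groovy version, grouping by the index means the number of pairs per task does not need to be known in advance.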