DUDANF

Reputation: 3000

Manipulate Nextflow variables outside of scripts

I have a process iterate_list. Process iterate_list takes a list and does something on each item in the list. When running the script, it takes two inputs: the list, and the item it needs to process (which it gets as a consumer from a RabbitMQ queue).

Currently, I give a Python script the entire list, and it iterates over each item, does the processing (as one big chunk), and returns after completion. This is fine; however, if the system restarts, it starts all over again.

I was wondering: how can I make it so that every time my Python script processes a single item, it returns the item, I remove it from the list, and then pass the new list back into the process? That way, in case of a system restart/crash, Nextflow knows where it left off and can continue from there.

import groovy.json.JsonSlurper

def jsonSlurper = new JsonSlurper()
def cfg_file = new File('/config.json')
def analysis_config = jsonSlurper.parse(cfg_file)
def cfg_json = cfg_file.getText()
def list_of_items_to_process = [] 

items = Channel.from(analysis_config.items.keySet())

for (String item : items) {
    list_of_items_to_process << item
}

process iterate_list{
    echo true

    input:
    val list_of_items_to_process

    output:
    val 1 into typing_cur

    script:
    """
    python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
    """ 
}

process signal_completion{

    echo true

    input:
    val typing_cur

    script:
    """
    echo "all done!"
    """
}

Basically, the process "iterate_list" takes one "item" from a queue in the message broker. Process iterate_list should look something like:

process iterate_list{
    echo true

    input:
    val list_of_items_to_process

    output:
    val 1 into typing_cur

    script:
    """
    python3.7 process_list_items.py ${my_queue} \'${list_of_items_to_process}\'
    list_of_items_to_process.remove(<output from python script>)
    """
}

And so for each item, it should run, remove the item it just processed, and restart with the new list.

initial_list = [1,2,3,4]
after_first_process_completes = [2,3,4]
and_eventually = [] <- This is when it should move on to the next process.
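Outside of Nextflow, the restart-safe behaviour described above could be sketched in plain Python with a checkpoint file that tracks the remaining items, so a crash resumes from wherever it left off. (This is just an illustration of the idea; the `checkpoint.json` filename and `process_items` helper are hypothetical, not part of the actual pipeline.)

```python
import json
import os

CHECKPOINT = "checkpoint.json"  # hypothetical file persisting the remaining items

def load_remaining(initial_list):
    # Resume from the checkpoint if one exists, otherwise start fresh.
    if os.path.exists(CHECKPOINT):
        with open(CHECKPOINT) as f:
            return json.load(f)
    return list(initial_list)

def process_items(initial_list, process_one):
    remaining = load_remaining(initial_list)
    while remaining:
        item = remaining[0]
        process_one(item)           # do the real work for one item
        remaining = remaining[1:]   # drop the item just processed
        with open(CHECKPOINT, "w") as f:
            json.dump(remaining, f)  # persist progress after every item
    return remaining
```

After `process_items([1, 2, 3, 4], do_work)` the checkpoint shrinks from `[2, 3, 4]` to `[]`, matching the list behaviour above; a re-run after a crash picks up from the checkpoint instead of the full list.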

Upvotes: 0

Views: 1931

Answers (1)

Steve

Reputation: 54502

Looks like what you're really trying to do is manipulate a global ArrayList from within a Nextflow process. AFAIK, there's no way to do that exactly. This is what channels are for.

It's not clear if you actually need to remove any items from your list of items to process. Nextflow can already resume from cached results using the -resume option. So why not just pass in the full list and a single item for processing?

items = Channel.from(['foo', 'bar', 'baz'])

items.into {
    items_ch1
    items_ch2
}

process iterate_list{

    input:
    val item from items_ch1
    val list_of_items_to_process from items_ch2.collect()

    """
    python3.7 process_list_items.py "${item}" '${list_of_items_to_process}'
    """
}

I can only guess as to how your Python script uses its arguments, but if your list of items to process is just a placeholder then you may even be able to input a single element list of items to process:

items = Channel.from(['foo', 'bar', 'baz'])

process iterate_list{

    input:
    val item from items

    """
    python3.7 process_list_items.py "${item}" '[${item}]'
    """
}

Upvotes: 1
