user2998365

Reputation: 11

How to: TBB node with multiple asynchronous inputs and multiple outputs

I am new to Threading Building Blocks (TBB) and need to implement the following logic with TBB nodes:

A node of type N receives two inputs, for instance:
1. std::vector // data
2. bool // flag

These inputs come asynchronously.

If the input is of type 1, process the data owned by the node of type N to produce two outputs, for instance:
a. std::vector
b. int

If the input is of type 2, process the data owned by the node of type N to produce one output, say a std::vector.

I have been trying to formulate the input part using a tbb::flow::or_node, and the output part using tbb::flow::multifunction_node.

With one input and multiple outputs, this logic can be written with tbb::flow::multifunction_node (I tested it, and it works). With multiple inputs and one output, I found code examples illustrating solutions. However, it is not clear to me how to implement the case of multiple asynchronous inputs and multiple outputs with the TBB framework. Suggestions are welcome.

Upvotes: 1

Views: 1444

Answers (1)

cahuson

Reputation: 846

You should be able to do what you want with the current implementation of or_node. (We are re-designing the output of the or_node to make it more friendly, but we need input from users like you on issues with the or_node Community Preview Feature.)

One thing to remember is to turn on the CPF when compiling code that uses the or_node. The compiler switch is -DTBB_PREVIEW_GRAPH_NODES=1 (alternatively, define the macro before including the header, as in the code below).

#define TBB_PREVIEW_GRAPH_NODES 1  // necessary to turn on the or_node Community Preview Feature.
#include "tbb/flow_graph.h"
#include <vector>

using namespace tbb::flow;

// The output format of the or_node is a struct that contains
//   1. the index of the input that the message appeared on, and
//   2. a tuple, the (i-1)th element of which is the message received

typedef or_node<tuple<std::vector<double>, bool> > my_or_node_type;

// it wasn't clear from the description if you wanted to differentiate between the vectors output with
// an input of type 1. or type 2.  If you need to do that you can add an extra output port to the multifunction_node.
typedef multifunction_node<my_or_node_type::output_type, tuple<std::vector<double>, int> > my_mf_node_type;

struct mf_node_body {
    void operator()(const my_or_node_type::output_type &in, my_mf_node_type::output_ports_type &op) {
        switch(in.indx) {
        case 0: {
                // Do the operation for the first input (the std::vector). The vector will be in
                // get<0>(in.result). Remember you are copying vectors here, so if you have big
                // vectors you will probably want to do some buffer management on your own and
                // pass refs to the vector instead.
            }
            break;
        case 1: {
                // do the operation signaled by the second input (the bool.)  The value of the
                // input is in get<1>(in.result).
            }
            break;
        }
    }
};


int main() {
    graph g;
    my_or_node_type multi_in(g);
    my_mf_node_type multi_out(g, unlimited, mf_node_body());

    // if the vector-producing node is called vpn, you attach it to the 0-th input of the or_node with
    //     make_edge(vpn, input_port<0>(multi_in));
    //
    // the bool-producing node bn can be attached similarly:
    //     make_edge(bn, input_port<1>(multi_in));
    //
    // attach the multi-in to the multi-out:
    //     make_edge(multi_in, multi_out);
    //
    // attach the vector consumer node vcn
    //     make_edge(output_port<0>(multi_out), vcn);
    //
    // attach the integer output to the int consuming node icn
    //     make_edge(output_port<1>(multi_out), icn);
    // 
    // start up the graph and make sure to do a wait_for_all() at the end.
    g.wait_for_all();
}

Remember that the multifunction_node body is invoked in parallel, so the work it does should not have race conditions (unless you want race conditions for some reason). You can make the node body execute serially by constructing the node with serial instead of unlimited. The only way to ensure you can safely destroy the graph is to make sure no tasks are executing any of the nodes; the best way to do this is to call g.wait_for_all().
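If the node keeps unlimited concurrency, the data "owned by the node" needs its own protection. The following is only a minimal sketch in standard C++, not TBB API: NodeState, on_data and on_flag are hypothetical names, and the processing (store the vector and report its size, or return the stored vector when the flag is true) is an assumption, since the question does not say what the real processing is. Because flow graph nodes copy the body they are given, the sketch assumes the body would hold a pointer or reference to one shared NodeState rather than owning it.

```cpp
#include <cassert>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical state owned by the node of type N. Nothing here is TBB API.
class NodeState {
public:
    // Input of type 1 (data): store the vector, then produce two outputs:
    // a copy of the stored data and its element count.
    std::pair<std::vector<double>, int> on_data(const std::vector<double>& v) {
        std::lock_guard<std::mutex> lock(mutex_);
        data_ = v;
        return std::make_pair(data_, static_cast<int>(data_.size()));
    }

    // Input of type 2 (flag): produce one output, a copy of the stored data
    // if the flag is true and an empty vector otherwise.
    std::vector<double> on_flag(bool flag) {
        std::lock_guard<std::mutex> lock(mutex_);
        return flag ? data_ : std::vector<double>();
    }

private:
    std::mutex mutex_;          // guards data_ against concurrent body calls
    std::vector<double> data_;  // data owned by the node
};
```

In mf_node_body, case 0 could then call on_data(get<0>(in.result)) and try_put the two results on get<0>(op) and get<1>(op), while case 1 could call on_flag(get<1>(in.result)) and try_put the vector on get<0>(op). With a serial node the mutex becomes redundant, at the cost of the parallelism.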

Regards, Chris

P.S. One addendum: if the multifunction_node is constructed as serial, it will have an input buffer unless you explicitly exclude it (by passing the rejecting policy as the node's third template argument). This may change the behavior of your graph if you are not expecting the buffer to be there.

Upvotes: 3
