Sam
Sam

Reputation: 61

GnuRadio python interpolation blocks

I've been working through Gnuradio tutorials in order to understand how blocks are made.

I am trying to make an interpolation block (1 input : 2 outputs) that returns the input values multiplied by a constant in both output streams. I've implemented this as a class, multiply_out_fff() that inherits from an interp_block

import numpy
from gnuradio import gr

class multiply_out_fff(gr.interp_block):
    """ 
    docstring for block multiply_out_fff
    """
    def __init__(self, multiple):
        gr.interp_block.__init__(self,
            name="multiply_out_fff",
            in_sig=[numpy.float32],
            out_sig=[numpy.float32], interp = 2)
        self.multiple = multiple


    def work(self, input_items, output_items):
        in0 = input_items[0]
        out = output_items[0]

        print("the data coming in: ", in0)
        print("in shape ",in0.shape)

        for i in range(0,len(out)/2):
          out[i] = in0[i] * self.multiple
        for i in range(len(out)/2,len(out)):
          out[i] = in0[i-len(out)] * self.multiple

        print("the data going out: ", out)
        print("out shape ", out.shape)

        return len(output_items[0])

I've written a test for this block, and have managed to get it to pass, but not in the way I originally thought would work

from gnuradio import gr, gr_unittest
from gnuradio import blocks
from multiply_out_fff import multiply_out_fff

class qa_multiply_out_fff (gr_unittest.TestCase):

    def setUp (self):
        self.tb = gr.top_block ()

    def tearDown (self):
        self.tb = None

    def test_001_t (self):
        # set up fg
        self.data = (0,1,-2,5.5)
        self.expected_result = (0,2,-4,11,0,2,-4,11)

        print("---------testing----------")
        print("test data: ", self.data)

        #make blocks
        self.src = blocks.vector_source_f(self.data)
        self.mult = multiply_out_fff(2)
        self.snk1 = blocks.vector_sink_f()
        self.snk2 = blocks.vector_sink_f()

        #make connections
        self.tb.connect((self.src,0),(self.mult,0))
        self.tb.connect((self.mult,0),(self.snk1,0))
        #self.tb.connect((self.mult,1),(self.snk2,0))
        self.tb.run ()

        self.result_data1 = self.snk1.data()
        #self.result_data2 = self.snk2.data()

        print("The output data: ", self.result_data1)
        print("--------test complete-------------\n")

        # check data
        self.assertFloatTuplesAlmostEqual(self.expected_result, self.result_data1)
        #self.assertFloatTuplesAlmostEqual(self.expected_result, self.result_data2)


if __name__ == '__main__':
    gr_unittest.run(qa_multiply_out_fff, "qa_multiply_out_fff.xml")

running the test script gives:

 ~/gnuradio/gr-tutorial/python$ python qa_multiply_out_fff.py 
---------testing----------
('test data: ', (0, 1, -2, 5.5))
('the data coming in: ', array([ 0. ,  1. , -2. ,  5.5], dtype=float32))
('in shape ', (4,))
('the data going out: ', array([  0.,   2.,  -4.,  11.,   0.,   2.,  -4.,  11.], dtype=float32))
('out shape ', (8,))
('The output data: ', (0.0, 2.0, -4.0, 11.0, 0.0, 2.0, -4.0, 11.0))
--------test complete-------------

.
----------------------------------------------------------------------
Ran 1 test in 0.002s

OK

The concept I'm struggling to understand is how my block returns data. Right now, it returns both interpolation output streams in a single array with twice the length of the input. I thought it should have two independent output arrays, and allow me to have a separate sink connected to each output, like this (commented out in the test):

    self.tb.connect((self.mult,0),(self.snk1,0))
    self.tb.connect((self.mult,1),(self.snk2,0))

Instead, all my data flows into snk1. If I uncomment the second connection, I receive an error informing me that the self.mult block can't have more output connections.

ValueError: port number 1 exceeds max of 0

How can I make an interpolation block I can make multiple output connections with?

Upvotes: 3

Views: 2549

Answers (1)

Sam
Sam

Reputation: 61

Looks like I misunderstood the 'interp' parameter, which specifies the ratio between input vector length and output vector length. The proper way to make multiple outputs for this block is to modify the 'out_sig' parameter to be a list of output types, as shown below in the modified multiply_out_fff class.

import numpy
from gnuradio import gr

class multiply_out_fff(gr.interp_block):
    """ 
    docstring for block multiply_out_fff
    """
    def __init__(self, multiple):
        gr.interp_block.__init__(self,
            name="multiply_out_fff",
            in_sig=[numpy.float32],
            out_sig=[numpy.float32,numpy.float32], interp = 1)
        self.multiple = multiple


    def work(self, input_items, output_items):
        in0 = input_items[0]
        out1 = output_items[0]
        out2 = output_items[1]    
#        print("the data coming in: ", in0)
#        print("in shape ",in0.shape)

        for i in range(len(in0)):
          out1[i] = in0[i] * self.multiple

        for i in range(len(in0)):
          out2[i] = in0[i] * self.multiple * 2 

#        print("the data going out: ", out)
#        print("out shape ", out.shape)

        return len(output_items[0])

Gnuradio Reference

Gnuradio - "If we wanted vectors, we could define those as in_sig=[(numpy.float32,4),numpy.float32]. This means there are two input ports, one for vectors of 4 floats and the other for scalars."

Upvotes: 3

Related Questions