Reputation: 61
I've been working through the GNU Radio tutorials in order to understand how blocks are made.
I am trying to make an interpolation block (1 input : 2 outputs) that returns the input values multiplied by a constant in both output streams. I've implemented this as a class, multiply_out_fff, that inherits from gr.interp_block:
import numpy
from gnuradio import gr
class multiply_out_fff(gr.interp_block):
    """
    Interpolating float block with one input port and one output port.

    Each call to work() writes the input chunk, scaled by ``multiple``,
    twice back to back into the single output buffer. The block is created
    with interp = 2, so the output buffer it is handed is twice as long as
    the input buffer.

    Args:
        multiple: factor every input sample is multiplied by.
    """

    def __init__(self, multiple):
        # out_sig has a single entry, so the block exposes exactly one
        # output port (port 0); interp only sets the output/input item ratio.
        gr.interp_block.__init__(self,
            name="multiply_out_fff",
            in_sig=[numpy.float32],
            out_sig=[numpy.float32], interp = 2)
        self.multiple = multiple

    def work(self, input_items, output_items):
        """
        Fill the output buffer with two scaled copies of the input chunk.

        Args:
            input_items: list with one float32 array per input port.
            output_items: list with one float32 array per output port.

        Returns:
            The number of output items produced (the full output buffer).
        """
        in0 = input_items[0]
        out = output_items[0]
        print("the data coming in: ", in0)
        print("in shape ",in0.shape)

        # Floor division: a plain "/" yields a float on Python 3, which
        # range() and slicing both reject.
        half = len(out) // 2

        # First half of the output: scaled input, sample for sample.
        out[:half] = in0[:half] * self.multiple

        # Second half: the trailing input samples again (the original loop
        # indexed in0 with i - len(out), i.e. negative indices from the end).
        tail = len(out) - half
        if tail:  # in0[-0:] would select the whole array, so skip when empty
            out[half:] = in0[-tail:] * self.multiple

        print("the data going out: ", out)
        print("out shape ", out.shape)
        return len(out)
I've written a test for this block, and have managed to get it to pass, but not in the way I originally thought would work
from gnuradio import gr, gr_unittest
from gnuradio import blocks
from multiply_out_fff import multiply_out_fff
class qa_multiply_out_fff(gr_unittest.TestCase):
    """Flowgraph-level test for the multiply_out_fff block."""

    def setUp(self):
        # Every test gets its own, empty flowgraph.
        self.tb = gr.top_block()

    def tearDown(self):
        # Release the flowgraph once the test is done.
        self.tb = None

    def test_001_t(self):
        """Push four floats through the block (multiple = 2) and check sink 1."""
        samples = (0, 1, -2, 5.5)
        expected = (0, 2, -4, 11, 0, 2, -4, 11)
        print("---------testing----------")
        print("test data: ", samples)

        # Blocks: vector source -> block under test -> vector sink(s).
        self.src = blocks.vector_source_f(samples)
        self.mult = multiply_out_fff(2)
        self.snk1 = blocks.vector_sink_f()
        self.snk2 = blocks.vector_sink_f()

        # Wiring. Only output port 0 is connected; connecting port 1 fails
        # with "ValueError: port number 1 exceeds max of 0".
        self.tb.connect((self.src, 0), (self.mult, 0))
        self.tb.connect((self.mult, 0), (self.snk1, 0))
        #self.tb.connect((self.mult,1),(self.snk2,0))

        self.tb.run()

        received = self.snk1.data()
        #self.result_data2 = self.snk2.data()
        print("The output data: ", received)
        print("--------test complete-------------\n")

        # Compare what reached sink 1 with the expected stream.
        self.assertFloatTuplesAlmostEqual(expected, received)
        #self.assertFloatTuplesAlmostEqual(self.expected_result, self.result_data2)
# Run the QA suite when this file is executed directly as a script.
if __name__ == "__main__":
    gr_unittest.run(qa_multiply_out_fff, "qa_multiply_out_fff.xml")
Running the test script gives:
~/gnuradio/gr-tutorial/python$ python qa_multiply_out_fff.py
---------testing----------
('test data: ', (0, 1, -2, 5.5))
('the data coming in: ', array([ 0. , 1. , -2. , 5.5], dtype=float32))
('in shape ', (4,))
('the data going out: ', array([ 0., 2., -4., 11., 0., 2., -4., 11.], dtype=float32))
('out shape ', (8,))
('The output data: ', (0.0, 2.0, -4.0, 11.0, 0.0, 2.0, -4.0, 11.0))
--------test complete-------------
.
----------------------------------------------------------------------
Ran 1 test in 0.002s
OK
The concept I'm struggling to understand is how my block returns data. Right now, it returns both interpolation output streams in a single array with twice the length of the input. I thought it should have two independent output arrays, and allow me to have a separate sink connected to each output, like this (commented out in the test):
self.tb.connect((self.mult,0),(self.snk1,0))
self.tb.connect((self.mult,1),(self.snk2,0))
Instead, all my data flows into snk1. If I uncomment the second connection, I receive an error informing me that the self.mult block can't have more output connections.
ValueError: port number 1 exceeds max of 0
How can I make an interpolation block I can make multiple output connections with?
Upvotes: 3
Views: 2549
Reputation: 61
It looks like I misunderstood the 'interp' parameter: it specifies the ratio between the output length and the input length of a stream, not the number of output ports. The proper way to give this block multiple outputs is to make the 'out_sig' parameter a list with one output type per port, as shown below in the modified multiply_out_fff class.
import numpy
from gnuradio import gr
class multiply_out_fff(gr.interp_block):
    """
    Float block with one input port and two independent output ports.

    Output port 0 carries the input scaled by ``multiple``; output port 1
    carries the input scaled by ``multiple * 2``.

    Args:
        multiple: base factor applied to every input sample.
    """

    def __init__(self, multiple):
        # One out_sig entry per output port gives ports 0 and 1.
        # NOTE(review): with interp = 1 the item rate is 1:1, so
        # gr.sync_block would presumably express this more directly - confirm.
        gr.interp_block.__init__(self,
            name="multiply_out_fff",
            in_sig=[numpy.float32],
            out_sig=[numpy.float32, numpy.float32], interp=1)
        self.multiple = multiple

    def work(self, input_items, output_items):
        """
        Scale the input chunk onto both output ports.

        Args:
            input_items: list with one float32 array per input port.
            output_items: list with one float32 array per output port.

        Returns:
            The number of items produced on each output port.
        """
        in0 = input_items[0]
        out1 = output_items[0]
        out2 = output_items[1]

        # Whole-array NumPy operations instead of per-sample Python loops;
        # only the first len(in0) output slots are written, as before.
        count = len(in0)
        out1[:count] = in0 * self.multiple
        out2[:count] = in0 * self.multiple * 2

        return len(out1)
From the GNU Radio documentation: "If we wanted vectors, we could define those as in_sig=[(numpy.float32,4),numpy.float32]. This means there are two input ports, one for vectors of 4 floats and the other for scalars."
Upvotes: 3