Malcolm Rest

Reputation: 103

Manipulating audio buffers in real time - Python 3.7

Using the sounddevice library, I built a Python 3.7 program that receives a buffer (512 samples) from an audio device, processes it, and sends it back out to the same device. The "audio device" is an audio card, so one can connect a microphone, process the input, and send the result to speakers in real time.

The processing is stereo (two channels), and I'm trying to make it more efficient. It is a simple low-pass filter.

Receive buffer: 512 samples in the shape:

[
[XLeft1 , XRight1]
[XLeft2 , XRight2]
...
[XLeft512 , XRight512]
]

Process: has to run sample by sample, one channel at a time: [XLeft1, XLeft2, ..., XLeft512], and then the same for the right channel.

Out buffer: has to have the same shape as the input buffer. So I need to do some data transformations here, which I'm trying to minimize.

What is the most CPU-efficient way to accomplish this?
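
For illustration, the layout described above can be reproduced with a NumPy array of shape (512, 2). This is only a sketch: it assumes the buffer arrives as a NumPy array (as the `indata[:, i]` indexing in the code further down suggests), and the sample values are made up.

```python
import numpy as np

FRAMES, CHANNELS = 512, 2  # buffer size and stereo, as described in the question

# Made-up sample data: row n holds [XLeft_n, XRight_n]
indata = np.arange(FRAMES * CHANNELS, dtype=np.float32).reshape(FRAMES, CHANNELS)

# Slicing a column gives one channel as a 1-D array of 512 samples
left = indata[:, 0]
right = indata[:, 1]

# Basic slices are views into the same memory, so no samples are copied here
left_is_view = np.shares_memory(left, indata)

# The transpose is also a view; it has one row per channel
per_channel = indata.T
```

Because both the column slices and the transpose are views, splitting the buffer into channels does not by itself require converting to Python lists.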

The code I'm using now:

def do_process(self, indata):
    Y = []
    for i in range(0, len(indata[0])):  # two channels for stereo
        v = indata[:, i]  # i is the channel index; v is the current channel
        if i == 0:
            current_chunk_filter = self.low_pass1.do_calculation(v)
        if i == 1:
            current_chunk_filter = self.low_pass2.do_calculation(v)
        Y.insert(i, current_chunk_filter)

    outdata = list(map(list, zip(*Y)))  # transpose the list
    dimensioned_numpy_array = np.reshape(outdata, (int(len(outdata)), 2))  # make the list vertical
    return dimensioned_numpy_array

How can I avoid (or make more efficient) the round trip through list, transpose, reshape, etc.?

Upvotes: 2

Views: 1722

Answers (1)

Jon Nordby

Reputation: 6299

Your output chunks do not change size, and only one exists at a time. So pre-allocate it and then put the output data directly into the correct location.

Something like:

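import numpy

channels = 2
samples = 512
# Allocate the output buffer once and reuse it for every chunk
out = numpy.zeros(shape=(samples, channels))
# Assumes this runs where `self` is available, e.g. inside the class
filters = [self.low_pass1, self.low_pass2]

def do_process(indata, out):
    # Write each filtered channel straight into its column of `out`
    for channel in range(indata.shape[1]):
        out[:, channel] = filters[channel].do_calculation(indata[:, channel])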
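
To try the pre-allocation idea without the original filter classes, here is a minimal, self-contained sketch. `OnePoleLowPass` and its `alpha` parameter are hypothetical stand-ins for the question's `low_pass1`/`low_pass2` (whose implementation is not shown); only the `do_calculation` method name is taken from the question. The `callback` function assumes the argument order that sounddevice uses for `Stream` callbacks.

```python
import numpy as np

class OnePoleLowPass:
    """Hypothetical stand-in for the question's low-pass filter class."""

    def __init__(self, alpha=0.2):
        self.alpha = alpha  # smoothing factor, assumed to lie in (0, 1]
        self.state = 0.0    # last output sample, carried over between buffers

    def do_calculation(self, x):
        # Sample-by-sample recursion: y[n] = y[n-1] + alpha * (x[n] - y[n-1])
        y = np.empty(len(x))
        prev = self.state
        for n, sample in enumerate(x):
            prev = prev + self.alpha * (sample - prev)
            y[n] = prev
        self.state = prev
        return y

CHANNELS = 2
SAMPLES = 512
filters = [OnePoleLowPass(), OnePoleLowPass()]  # one filter per channel
out = np.zeros((SAMPLES, CHANNELS))             # allocated once, reused

def do_process(indata, out):
    # Filter each channel and write it directly into its output column;
    # no intermediate lists, transposes, or reshapes are created.
    for channel in range(indata.shape[1]):
        out[:, channel] = filters[channel].do_calculation(indata[:, channel])
    return out

def callback(indata, outdata, frames, time, status):
    # sounddevice passes in an already allocated `outdata`, so it can be
    # filled in place instead of using a separate buffer.
    do_process(indata, outdata)
```

With a sounddevice stream, `outdata` has the same shape as `indata`, so the separate `out` array is only needed when calling `do_process` outside a callback.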


Upvotes: 1
