basilikum

Reputation: 10528

Processing an audio stream with Python (resampling)

I have a Python script that receives chunks of raw binary audio data. I would like to change the sample rate of those chunks to 16000 Hz and then pipe them to another component.

I tried my luck with audiotools but without success:

# f is a filelike FIFO buffer
reader = PCMFileReader(f, 44100, 1, 1, 16)
conv = PCMConverter(reader, 16000, 1, 1, 16)

Then I write to the buffer whenever I get a new chunk:

f.write(msg)

And read from the buffer in another thread:

while not reader.file.closed:
    fl = conv.read(10)
    chunk = fl.to_bytes(False, True)

The problem is that I get the following ValueError, which seems to come from "samplerate.c":

ValueError: SRC_DATA->data_out is NULL

This error only occurs with resampling. If I turn off that step, then everything works fine and I get playable audio.

Therefore my question: what would be a good tool for this task? And if audiotools turns out to be the right answer, how do I use it correctly?

Upvotes: 1

Views: 2750

Answers (1)

Oleg Kokorin

Reputation: 2672

Here is a simplified resampler.

dataFormat is the number of bytes per sample frame in the stream (for example, 16-bit stereo is 4). original_samples is the number of frames in the source byte string, and desired_samples is the number of frames wanted in the result (for example, going from 16 kHz to 44.1 kHz, original = 160 and desired = 441). pcm is the source byte string, and the function returns the resampled byte string:

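import itertools

def resampleSimplified(pcm, desired_samples, original_samples, dataFormat):
    # Crude resampler: repeats frames to stretch and skips frames to shrink.
    # No interpolation or filtering is applied.

    samples_to_pad = desired_samples - original_samples

    # repeat factor when extending, rounded up so there are enough frames
    q, r = divmod(desired_samples, original_samples)
    times_to_pad_up = q + int(bool(r))
    # stride when shrinking: keep every n-th frame
    times_to_pad_down = original_samples // desired_samples

    # split the byte string into frames of dataFormat bytes each
    pcmList = [pcm[i:i+dataFormat] for i in range(0, len(pcm), dataFormat)]

    if samples_to_pad > 0:
        # extending pcm: repeat every frame times_to_pad_up times
        pcmListPadded = list(itertools.chain.from_iterable(
            itertools.repeat(x, times_to_pad_up) for x in pcmList)
            )
    else:
        # shrinking pcm: keep every times_to_pad_down-th frame
        if times_to_pad_down > 0:
            pcmListPadded = pcmList[::(times_to_pad_down)]
        else:
            pcmListPadded = pcmList

    # pcm is bytes, so join with b'' (joining with '' fails on Python 3)
    padded_pcm = b''.join(pcmListPadded[:desired_samples])

    return padded_pcm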
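
Because the function above repeats or skips whole frames and then truncates, the output can sound rough when the ratio is not an integer, as with 44100 -> 16000. Below is a minimal sketch of a different technique, linear interpolation, that also keeps state between chunks so it can be fed a stream piece by piece. The class name LinearResampler and its process method are hypothetical, not part of audiotools or any other library. It assumes signed 16-bit little-endian mono PCM and applies no low-pass filter, so downsampling may still alias.

```python
import array
import sys


class LinearResampler:
    """Streaming linear-interpolation resampler (hypothetical helper).

    Assumes signed 16-bit little-endian mono PCM. No low-pass filter is
    applied, so downsampling can alias.
    """

    def __init__(self, src_rate, dst_rate):
        self.src_rate = src_rate
        self.dst_rate = dst_rate
        # Position of the next output sample, measured in units of
        # 1/dst_rate input samples from the start of the buffer.
        self.pos = 0
        self.carry = array.array("h")  # input samples not yet consumed
        self.odd = b""                 # dangling byte of a split sample

    def process(self, chunk):
        """Resample one chunk of bytes and return the output bytes."""
        # Chunks may split a 2-byte sample; keep the odd byte for later.
        data = self.odd + chunk
        cut = len(data) - len(data) % 2
        self.odd = data[cut:]

        new = array.array("h")
        new.frombytes(data[:cut])
        if sys.byteorder == "big":
            new.byteswap()  # input is assumed little-endian
        buf = self.carry + new

        out = array.array("h")
        while True:
            i, rem = divmod(self.pos, self.dst_rate)
            if i + 1 >= len(buf):
                break  # need both neighbours to interpolate
            a, b = buf[i], buf[i + 1]
            # Integer interpolation; the result always lies between a and b.
            out.append(a + (b - a) * rem // self.dst_rate)
            self.pos += self.src_rate

        # Drop the input that is no longer needed and rebase the position.
        consumed = min(self.pos // self.dst_rate, len(buf))
        self.carry = buf[consumed:]
        self.pos -= consumed * self.dst_rate

        if sys.byteorder == "big":
            out.byteswap()  # emit little-endian again
        return out.tobytes()
```

For the case in the question, one would create a single LinearResampler(44100, 16000) and call process(msg) on every incoming chunk, passing the returned bytes on to the next component. The position is tracked with integers rather than floats so that it does not drift over a long stream.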

Upvotes: 2
