James S
James S

Reputation: 3374

Finding a series of patterns within a data stream

(This is in in Python, and code would be great, but I'm primarily interested in the algorithm.)

I'm monitoring an audio stream (PyAudio) and looking for a series of 5 pops (see the bottom for a visualization). I'm read()ing the stream and getting the RMS value for the block that I've just read (similar to this question). My problem is that I'm not looking for a single event, but instead a series of events (pops) that have some characteristics but aren't nearly as boolean as I'd like. What's the most straightforward (and performant) way to detect these five pops?

The RMS function gives me a stream like this:

0.000580998485254, 0.00045098391298, 0.00751436443973, 0.002733730043, 0.00160775708652, 0.000847808804511

It looks a bit more useful if I round (a similar stream) for you:

0.001, 0.001, 0.018, 0.007, 0.003, 0.001, 0.001

You can see the pop in item 3, and presumably as it quiets down in item 4, and maybe the tail end was during a fraction of item 5.

I want to detect 5 of those in a row.

My naive approach is to: a) define what a pop is: Block's RMS is over .002. For at least 2 blocks but no more than 4 blocks. Started with silence and ends with silence.

Additionally, I'm tempted to define what silence is (to ignore the not quite loud but not quite silent blocks, but I'm not sure this makes more sense then considering 'pop' to be boolean).

b) Then have a state machine that keeps track of a bunch of variables and has a bunch of if statements. Like:

while True:
  is_pop = isRMSAmplitudeLoudEnoughToBeAPop(stream.read())

  if is_pop:
    if state == 'pop':
      #continuation of a pop (or maybe this continuation means
      #that it's too long to be a pop
      if num_pop_blocks <= MAX_POP_RECORDS:
        num_pop_blocks += 1
      else:
        # too long to be a pop
        state = 'waiting'
        num_sequential_pops = 0
    else if state == 'silence':
      #possible beginning of a pop
      state = 'pop'
      num_pop_blocks += 1
      num_silence_blocks = 0
  else:
    #silence
    if state = 'pop':
      #we just transitioned from pop to silence
      num_sequential_pops += 1

      if num_sequential_pops == 5:
        # we did it
        state = 'waiting'
        num_sequential_pops = 0
        num_silence_blocks = 0

        fivePopsCallback()
    else if state = 'silence':
      if num_silence_blocks >= MAX_SILENCE_BLOCKS:
        #now we're just waiting
        state = 'waiting'
        num_silence_blocks = 0
        num_sequential_pops = 0

That code is not at all complete (and might have a bug or two), but illustrates my line of thinking. It's certainly more complex than I'd like it to be, which is why I'm asking for suggestions.

Waveform

Upvotes: 1

Views: 305

Answers (2)

James S
James S

Reputation: 3374

I ended up with what, to me, feels like a naive approach with an ongoing loop and a few variables to maintain and transition to new states. It occurred to me after finishing, though, that I should have explored hotword detection because 5 consecutive clicks are basically a hotword. And they have a pattern that I have to look for.

Anyways, here's my code:

POP_MIN_MS = 50
POP_MAX_MS = 150

POP_GAP_MIN_MS = 50
POP_GAP_MAX_MS = 200

POP_BORDER_MIN_MS = 500

assert POP_BORDER_MIN_MS > POP_GAP_MAX_MS

POP_RMS_THRESHOLD_MIN = 100

FORMAT = pyaudio.paInt16
CHANNELS = 2
RATE = 44100 # Sampling Rate -- frames per second
INPUT_BLOCK_TIME_MS = 50
INPUT_FRAMES_PER_BLOCK = int(RATE*INPUT_BLOCK_TIME_MS/1000)

POP_MIN_BLOCKS = POP_MIN_MS / INPUT_BLOCK_TIME_MS
POP_MAX_BLOCKS = POP_MAX_MS / INPUT_BLOCK_TIME_MS

POP_GAP_MIN_BLOCKS = POP_GAP_MIN_MS / INPUT_BLOCK_TIME_MS
POP_GAP_MAX_BLOCKS = POP_GAP_MAX_MS / INPUT_BLOCK_TIME_MS

POP_BORDER_MIN_BLOCKS = POP_BORDER_MIN_MS / INPUT_BLOCK_TIME_MS


def listen(self):
    pops = 0
    sequential_loud_blocks = 0
    sequential_notloud_blocks = 0

    stream = self.pa.open(
      format=FORMAT,
      channels=CHANNELS,
      rate=RATE,
      input=True,
      frames_per_buffer=INPUT_FRAMES_PER_BLOCK
    )

    states = {
      'PENDING': 1,
      'POPPING': 2,
      'ENDING': 3,
    }

    state = states['PENDING']

    while True:
      amp = audioop.rms(stream.read(INPUT_FRAMES_PER_BLOCK), 2)

      is_loud = (amp >= POP_RMS_THRESHOLD_MIN)

      if state == states['PENDING']:
        if is_loud:
          # Only switch to POPPING if it's been quiet for at least the border
          #   period. Otherwise stay in PENDING.
          if sequential_notloud_blocks >= POP_BORDER_MIN_BLOCKS:
            state = states['POPPING']
            sequential_loud_blocks = 1

          # If it's now loud then reset the # of notloud blocks
          sequential_notloud_blocks = 0
        else:
          sequential_notloud_blocks += 1

      elif state == states['POPPING']:

        if is_loud:
          sequential_loud_blocks += 1
          # TODO: Is this necessary?
          sequential_notloud_blocks = 0

          if sequential_loud_blocks > POP_MAX_BLOCKS:
            # it's been loud for too long; this isn't a pop
            state = states['PENDING']
            pops = 0
            #print "loud too long"
            # since it has been loud and remains loud then no reason to reset
            #   the notloud_blocks count

        else:
          # not loud
          if sequential_loud_blocks:
            # just transitioned from loud. was that a pop?
            # we know it wasn't too long, or we would have transitioned to
            #   PENDING during the pop
            if sequential_loud_blocks < POP_MIN_BLOCKS:
              # wasn't long enough
              # go to PENDING
              state = states['PENDING']
              pops = 0
              #print "not loud long enough"
            else:
              # just right
              pops += 1
              logging.debug("POP #%s", pops)

            sequential_loud_blocks = 0
            sequential_notloud_blocks += 1

          else:
            # it has been quiet. and it's still quiet
            sequential_notloud_blocks += 1

            if sequential_notloud_blocks > POP_GAP_MAX_BLOCKS:
              # it was quiet for too long
              # we're no longer popping, but we don't know if this is the
              #   border at the end
              state = states['ENDING']

      elif state == states['ENDING']:
        if is_loud:
          # a loud block before the required border gap. reset
          # since there wasn't a gap, this couldn't be a valid pop anyways
          #   so just go back to PENDING and let it monitor for the border
          sequential_loud_blocks = 1
          sequential_notloud_blocks = 0
          pops = 0

          state = states['PENDING']
        else:
          sequential_notloud_blocks += 1

          # Is the border time (500 ms right now) enough of a delay?
          if sequential_notloud_blocks >= POP_BORDER_MIN_BLOCKS:
            # that's a bingo!
            if pops == 5:

              stream.stop_stream()

              # assume that starting now the channel is not silent
              start_time = time.time()


              print ">>>>> 5 POPS"

              elapsed = time.time() - start_time

              #time.time() may return fractions of a second, which is ideal    
              stream.start_stream()

              # do whateve we need to do

            state = states['PENDING']
            pops = 0

It needs some formal testing. I found an issue just last night in which it wasn't resetting itself after a pop and then too-long quiet. My plan is to refactor and then feed it a stream of simulated RMS' (e.g., (0, 0, 0, 500, 200, 0, 200, 0, ...)) and ensure it detects (or doesn't detect) appropriately.

Upvotes: 1

Paddy3118
Paddy3118

Reputation: 4772

You might want to compute the simple moving average of the last P points, where P ~= 4 and plot the result together with your raw input data.

You could then use the maxima of the smoothed average as a pop. Define a maximum interval in which to see five pops and that could be what your after.

Adjust P for best fit.

I wouldn't be surprised if there wasn't already a Python module for this, but I haven't looked.

Upvotes: 1

Related Questions