GrozaFry


How to monitor a combination of IO sensors in a stateful manner

My data source emits IoT data with the following structure:

io_id,value,timestamp
232,1223,1718191205
321,671,1718191254
54,2313,1718191275
232,432,1718191315
321,983,1718191394
........
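
A minimal sketch, assuming the rows arrive as CSV text with the header shown above, of turning them into the `(io_id, value, timestamp)` integer tuples that the Flink job below works with. `parse_readings` and `SAMPLE` are hypothetical names used only for illustration.

```python
import csv
import io
from typing import List, Tuple

# Sample copied from the question (header plus the first five rows).
SAMPLE = """io_id,value,timestamp
232,1223,1718191205
321,671,1718191254
54,2313,1718191275
232,432,1718191315
321,983,1718191394
"""


def parse_readings(text: str) -> List[Tuple[int, int, int]]:
    """Turn CSV text into (io_id, value, timestamp) tuples of ints."""
    reader = csv.DictReader(io.StringIO(text))
    return [
        (int(row["io_id"]), int(row["value"]), int(row["timestamp"]))
        for row in reader
    ]


readings = parse_readings(SAMPLE)
print(readings[0])  # (232, 1223, 1718191205)
```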

There are two things I want to achieve with this data using Flink:

  1. I want to monitor changes in the value of each individual io_id. The code below works for this purpose.

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
    from pyflink.datastream.state import ValueStateDescriptor


    class ValueChangeMonitor(KeyedProcessFunction):
        def __init__(self):
            self.previous_value_state = None

        def open(self, runtime_context: RuntimeContext):
            # One "previous value" slot per key (io_id), managed by Flink state.
            self.previous_value_state = runtime_context.get_state(
                ValueStateDescriptor("previous_value", Types.INT())
            )

        def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
            # Each element is (io_id, value, timestamp).
            io_id, io_value, _timestamp = value
            previous_value = self.previous_value_state.value()

            if previous_value is not None:
                change = abs(io_value - previous_value)
                if change > 100:
                    # Emit downstream so that .print() in main() shows it.
                    yield f"Significant change detected for IO {io_id}: {change}"

            self.previous_value_state.update(io_value)


    def main():
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)

        # Three fields per row, so the tuple type needs three entries.
        data_stream = env.from_collection([
            (232, 1223, 1718191205),
            (321, 671, 1718191254),
            (54, 2313, 1718191275),
            (232, 432, 1718191315),
            (321, 983, 1718191394)
        ], type_info=Types.TUPLE([Types.INT(), Types.INT(), Types.LONG()]))

        keyed_stream = data_stream.key_by(lambda x: x[0], key_type=Types.INT())
        keyed_stream.process(ValueChangeMonitor(), output_type=Types.STRING()).print()

        env.execute("IO Value Change Monitor")


    if __name__ == '__main__':
        main()

  2. Instead of monitoring individual io_id values, I want to create a dummy IO that is a combination of IO sensors, e.g. dummy_io = (io_1 == 234 && io_2 == 423). I then want to monitor changes in the value of this dummy IO, which is built from the IOs in the data: when io_1 is 234 and io_2 is 423, dummy_io is True (1); in all other cases it is False (0). I want to generate an event whenever the value of dummy_io changes.
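
To check what item 1 should emit for the sample rows, the same per-key logic can be replayed in plain Python. This is a sketch, not Flink code: the dictionary `previous` stands in for the `previous_value` ValueState, which Flink keeps separately for each io_id after `key_by`. `detect_changes` and `THRESHOLD` are hypothetical names.

```python
from typing import Dict, Iterable, List, Tuple

THRESHOLD = 100  # same threshold as ValueChangeMonitor


def detect_changes(readings: Iterable[Tuple[int, int, int]]) -> List[Tuple[int, int]]:
    """Return (io_id, change) for every reading that differs from the
    previous reading of the same io_id by more than THRESHOLD."""
    previous: Dict[int, int] = {}  # stands in for Flink's per-key ValueState
    events = []
    for io_id, value, _timestamp in readings:
        if io_id in previous:
            change = abs(value - previous[io_id])
            if change > THRESHOLD:
                events.append((io_id, change))
        previous[io_id] = value
    return events


sample = [
    (232, 1223, 1718191205),
    (321, 671, 1718191254),
    (54, 2313, 1718191275),
    (232, 432, 1718191315),
    (321, 983, 1718191394),
]
print(detect_changes(sample))  # [(232, 791), (321, 312)]
```

IO 54 appears only once, so it never has a previous value to compare against and produces no event.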

How can I achieve this? Is Flink the right tool for this use case?


Answers (1)

zander_


Flink is a good tool for stream processing. Whether it is the right tool for you depends on where it will run and on your (or your team's) proficiency with running and managing it.

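To answer the question: create a class that inherits from KeyedProcessFunction, initialize the dummy IO state in it, and put your update logic in an overridden process_element method. Because keyed state is scoped to the current key, the stream has to be keyed so that readings from every sensor in the combination reach the same key (for example, a constant or a dummy IO identifier) rather than keyed by io_id.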
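
A minimal sketch of that state and update logic, written in plain Python so it can be checked without a Flink cluster. `DummyIoMonitor`, `DUMMY_IO_CONDITION`, the io_ids 1 and 2, and the example stream are all hypothetical. Inside Flink, `latest` would become a `MapState` and `previous` a `ValueState` obtained in `open()`, and `process_element` would `yield` events instead of returning a list.

```python
from typing import Dict, List, Tuple

# Hypothetical encoding of dummy_io = (io_1 == 234 && io_2 == 423):
# maps each participating io_id to the value it must have. The ids 1 and 2
# are placeholders for the real sensor ids.
DUMMY_IO_CONDITION: Dict[int, int] = {1: 234, 2: 423}


class DummyIoMonitor:
    """Plain-Python model of the state a KeyedProcessFunction would keep."""

    def __init__(self, condition: Dict[int, int]):
        self.condition = condition
        # Latest value seen for each participating io_id (a MapState in Flink).
        self.latest: Dict[int, int] = {}
        # Last computed dummy_io value (a ValueState in Flink). Assumed False
        # initially, since no sensor has matched before any data arrives.
        self.previous: bool = False

    def process_element(self, reading: Tuple[int, int, int]) -> List[Tuple[int, int]]:
        """Return [(new_dummy_io_value, timestamp)] if dummy_io changed, else []."""
        io_id, value, timestamp = reading
        if io_id not in self.condition:
            return []  # this sensor is not part of the dummy IO
        self.latest[io_id] = value
        # True only when every participating sensor currently has its target value.
        current = all(
            self.latest.get(sensor) == expected
            for sensor, expected in self.condition.items()
        )
        events = []
        if current != self.previous:
            events.append((int(current), timestamp))
        self.previous = current
        return events


monitor = DummyIoMonitor(DUMMY_IO_CONDITION)
stream = [(1, 234, 100), (2, 423, 101), (3, 999, 102),
          (2, 500, 103), (1, 234, 104), (2, 423, 105)]
events = [event for reading in stream for event in monitor.process_element(reading)]
print(events)  # [(1, 101), (0, 103), (1, 105)]
```

The sketch assumes readings arrive in timestamp order; with out-of-order data the "latest value" per sensor would need to be chosen by timestamp rather than by arrival.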

Since you are using Python, you should also check out Python-native stream processors such as Bytewax and Quix. They should fit your needs and may be easier to use, run, and manage.

