Thinus Marloth

Reputation: 51

Kafka time difference last two records, KSQL or other?

So I'm evaluating Kafka. In our use case, we would have to create new topics containing the time elapsed from one event to the next, since each sensor reports its state as "on" or "off" into Kafka. So, given the timestamp, sensor name and state, we want to create new topics with the duration of each "on" and "off" state.

  1. Is that doable in KSQL, and how?
  2. Or should one really leave this to consumers or stream processors to figure out?

My data is something like this:

{ 2019:02:15 00:00:00, sensor1, off}
{ 2019:02:15 00:00:30, sensor1, on} 

to get the result

{ 2019:02:15 00:30:00, sensor1, off, 30sec }

Essentially, we have to combine the states of multiple sensors to determine the combined state of a machine. There are hundreds, if not eventually thousands, of sensors in a factory.

Upvotes: 5

Views: 2051

Answers (2)

Holger Brandl

Reputation: 11192

Following up on the idea from https://github.com/confluentinc/ksql/issues/2562 to use a self join, I came up with the following solution:

  1. Create the data
#kafka-topics --bootstrap-server localhost:9092  --delete --topic temptest
echo '{"temp": 3.0, "counter": 1}' | kafkacat -b localhost -t temptest
echo '{"temp": 4.0, "counter": 2}' | kafkacat -b localhost -t temptest
echo '{"temp": 6.0, "counter": 3}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.0, "counter": 4}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.1, "counter": 6}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.1, "counter": 5}' | kafkacat -b localhost -t temptest

Here we assume that consecutive events already have a counter property. Such a counter could also be added with ksql by simply aggregating event counts over time.
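Such a counter is essentially a running count per key, taken in arrival order. As a plain-Java illustration only (not KSQL; `EventCounter` and `next` are hypothetical names), assigning it could look like this:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: assigns a consecutive counter per key in arrival order,
// i.e. what a running count aggregation over the events would produce.
class EventCounter {
    private final Map<String, Integer> counts = new HashMap<>();

    // Returns 1 for the first event of a key, 2 for the second, and so on.
    int next(String key) {
        return counts.merge(key, 1, Integer::sum);
    }
}
```

Note that this only yields gap-free, ordered counters if the events of a key arrive in order; in a real deployment the count would also have to live in a fault-tolerant store rather than an in-memory map.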

  2. Differentiate the function
-- import the topic into ksql
CREATE STREAM temp_json (ingesttime BIGINT, row VARCHAR, temp DOUBLE, counter INTEGER) WITH (kafka_topic='temptest', value_format='JSON', KEY='counter');

--- change the format to Avro and repartition
CREATE STREAM temp WITH (VALUE_FORMAT='AVRO') AS SELECT temp, counter, CAST(counter AS VARCHAR) as counter_key FROM temp_json PARTITION BY counter_key;

--- create second stream with shifted counter
CREATE STREAM temp_shift AS SELECT temp, counter as counter_orig, counter + 1 as counter from temp PARTITION BY counter;

-- join the streams by counter
CREATE STREAM temp_diff AS SELECT
  prev.temp-cur.temp as temp_difference, cur.temp as temp,  prev.temp as prev_temp, cur.counter as counter
  FROM temp cur
  LEFT JOIN temp_shift prev WITHIN 2 HOURS
  ON cur.counter = prev.counter;

  3. Test it

ksql> SELECT * FROM temp_diff LIMIT 4;
1574321370281 | 1 | null | 3.0 | null | 1
1574321372307 | 2 | -1.0 | 4.0 | 3.0 | 2
1574321372319 | 3 | -2.0 | 6.0 | 4.0 | 3
1574321372331 | 4 | 3.0 | 3.0 | 6.0 | 4
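To see why the shifted self-join produces the rows above, it can be mimicked in memory: `temp_shift` re-keys every event to `counter + 1`, so looking up `counter` in it yields the predecessor. A minimal sketch with a hypothetical `SelfJoinSketch` class, ignoring windowing and partitioning:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SelfJoinSketch {
    // Returns counter -> (prev.temp - cur.temp), or null when there is no
    // predecessor, mirroring temp_difference from the LEFT JOIN.
    static Map<Integer, Double> differences(Map<Integer, Double> tempByCounter) {
        // Equivalent of temp_shift: every event re-keyed to counter + 1.
        Map<Integer, Double> shifted = new LinkedHashMap<>();
        tempByCounter.forEach((counter, temp) -> shifted.put(counter + 1, temp));

        // Equivalent of the LEFT JOIN on cur.counter = prev.counter.
        Map<Integer, Double> result = new LinkedHashMap<>();
        tempByCounter.forEach((counter, temp) -> {
            Double prev = shifted.get(counter);
            result.put(counter, prev == null ? null : prev - temp);
        });
        return result;
    }
}
```

Feeding it the six sample events gives null, -1.0, -2.0 and 3.0 for counters 1 to 4, matching the query output; since the lookup is by key, the out-of-order counters 6 and 5 are paired correctly as well.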

The sensor itself is left out to keep the solution simple. However, it could easily be added by using a compound key for the partitioning, as described in https://www.confluent.io/stream-processing-cookbook/ksql-recipes/creating-composite-key
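Conceptually, adding the sensor means keying by the pair (sensor, counter) instead of the counter alone, so the predecessor lookup never crosses sensors. A minimal Java 16+ illustration of that idea only (the `SensorCounter` record and `CompositeKeyLookup` class are hypothetical; this does not show how the linked KSQL recipe builds its key):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical composite key: equal counters of different sensors stay distinct.
record SensorCounter(String sensor, int counter) {
    // Key of the event this one should be joined with (its predecessor).
    SensorCounter previous() {
        return new SensorCounter(sensor, counter - 1);
    }
}

class CompositeKeyLookup {
    private final Map<SensorCounter, Double> tempByKey = new HashMap<>();

    void add(String sensor, int counter, double temp) {
        tempByKey.put(new SensorCounter(sensor, counter), temp);
    }

    // Temperature of the predecessor event of the same sensor, or null if unknown.
    Double previousTemp(String sensor, int counter) {
        return tempByKey.get(new SensorCounter(sensor, counter).previous());
    }
}
```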

Upvotes: 0

eik

Reputation: 4610

This is pretty easy in Kafka Streams, so I would opt for 2.

First, you have to model your input data properly. Your example uses local time without a time zone, which makes durations between two timestamps unreliable (for example across a daylight saving time change). Use something like epoch time instead.
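To make this concrete, here is a sketch that parses the question's `yyyy:MM:dd HH:mm:ss` timestamps and compares a naive local-time difference with one computed on instants. The zone `Europe/Berlin` is an assumption for illustration; any zone with daylight saving time shows the same effect.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class LocalVsEpoch {
    // Timestamp format used in the question, e.g. "2019:02:15 00:00:30".
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy:MM:dd HH:mm:ss");
    // Assumed zone of the factory, for illustration only.
    static final ZoneId ZONE = ZoneId.of("Europe/Berlin");

    // Naive difference between two local timestamps, in seconds.
    static long localSeconds(String from, String to) {
        return Duration.between(LocalDateTime.parse(from, FORMAT),
                LocalDateTime.parse(to, FORMAT)).getSeconds();
    }

    // Difference after converting both timestamps to instants (epoch time), in seconds.
    static long instantSeconds(String from, String to) {
        var start = LocalDateTime.parse(from, FORMAT).atZone(ZONE).toInstant();
        var end = LocalDateTime.parse(to, FORMAT).atZone(ZONE).toInstant();
        return Duration.between(start, end).getSeconds();
    }
}
```

For the question's example both methods return 30 seconds, but from `2019:03:31 01:59:59` to `2019:03:31 03:00:00` (the 2019 spring-forward night in that zone) the local difference is 3601 seconds while only 1 second actually elapsed.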

Start with a source data model like

interface SensorState {
  String getId();
  Instant getTime();
  State getState();
  enum State {
    OFF,
    ON
  }
}

and a target of

interface SensorStateWithDuration {
  SensorState getEvent();
  Duration getDuration();
}

Now that you have defined your input and output streams (but see “Data Types and Serialization”), you just need to transform the values (“Applying processors and transformers”) by defining a ValueTransformer.

It has to do 2 things:

  1. Check the state store for historical data for the sensor and update it with new data, if necessary

  2. When historical data is available, calculate the difference between the timestamps and emit the data together with the calculated duration
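These two steps do not depend on Kafka itself. As a sketch that runs without a broker, here is the same logic with a plain `HashMap` standing in for the `SensorStates` store and Java 16+ records standing in for the interfaces above (`Event`, `EventWithDuration` and `DurationTracker` are hypothetical names):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

enum State { OFF, ON }

record Event(String id, Instant time, State state) {}

record EventWithDuration(Event event, Duration duration) {}

class DurationTracker {
    // Stand-in for the key-value state store: last state change per sensor ID.
    private final Map<String, Event> store = new HashMap<>();

    // Returns the previous state with its duration so far, or empty if this
    // is the first event seen for the sensor.
    Optional<EventWithDuration> accept(Event event) {
        Event old = store.get(event.id());
        // Step 1: remember the event if it is the first one or the state changed.
        if (old == null || old.state() != event.state()) {
            store.put(event.id(), event);
        }
        // Step 2: with history available, emit the old state plus elapsed time.
        return Optional.ofNullable(old)
                .map(o -> new EventWithDuration(o, Duration.between(o.time(), event.time())));
    }
}
```

With the question's data, the first `off` event yields nothing and the `on` event 30 seconds later yields the `off` state with a duration of 30 seconds. Repeated `on` events keep the original start time, so they report the duration of the current state so far.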

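import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// SensorState and SensorStateWithDuration are the interfaces defined above (same package)
class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
  // Name of the state store holding the last known state per sensor ID
  static final String SENSOR_STATES = "SensorStates";

  KeyValueStore<String, SensorState> store;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
    this.store = (KeyValueStore<String, SensorState>) context.getStateStore(SENSOR_STATES);
  }

  @Override
  public SensorStateWithDuration transform(SensorState sensorState) {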
    // Nothing to do
    if (sensorState == null) {
      return null;
    }

    // Check for the previous state, update if necessary
    var oldState = checkAndUpdateSensorState(sensorState);

    // When we have historical data, return duration so far. Otherwise return null
    return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
  }

  @Override
  public void close() {}

  /**
   * Checks the state store for historical state based on sensor ID and updates it, if necessary.
   *
   * @param sensorState The new sensor state
   * @return The old sensor state
   */
  Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
    // The Sensor ID is our index
    var index = sensorState.getId();

    // Get the historical state (might be null)
    var oldState = store.get(index);
    if (needToUpdate(oldState, sensorState)) {
      // Update the state store to the new state
      store.put(index, sensorState);
    }
    return Optional.ofNullable(oldState);
  }

  /**
   * Check if we need to update the state in the state store.
   *
   * <p>Either we have no historical data, or the state has changed.
   *
   * @param oldState The old sensor state
   * @param sensorState The new sensor state
   * @return Flag whether we need to update
   */
  boolean needToUpdate(SensorState oldState, SensorState sensorState) {
    return oldState == null || oldState.getState() != sensorState.getState();
  }

  /**
   * Wraps the old state together with how long it has lasted so far.
   *
   * @param oldState The state of the sensor so far
   * @param sensorState The new state of the sensor
   * @return Wrapped old state with duration
   */
  SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
    var duration = Duration.between(oldState.getTime(), sensorState.getTime());
    // Assumes an implementation of SensorStateWithDuration that provides a builder (not shown)
    return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
  }
}

Putting everything together (“Connecting Processors and State Stores”) in a simple Topology:

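import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

// inputSerde, storeSerde and resultSerde are Serdes for SensorState and
// SensorStateWithDuration (definitions not shown)
var builder = new StreamsBuilder();

// Our state store
var storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("SensorStates"),
        Serdes.String(),
        storeSerde);

// Register the store builder
builder.addStateStore(storeBuilder);

// The input should be keyed by sensor ID, so all events of a sensor reach the same store instance
builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
    .transformValues(DurationProcessor::new, DurationProcessor.SENSOR_STATES)
    // Drop events for which no historical data was available (transform returned null)
    .filter((sensorId, withDuration) -> withDuration != null)
    .to("result-topic", Produced.with(Serdes.String(), resultSerde));

var topology = builder.build();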

A full application is at github.com/melsicon/kafka-sensors.

Upvotes: 2
