Reputation: 51
So I'm evaluating Kafka. In our use case we would have to create new topics containing the "time elapsed" from one event to the next, since each sensor reports its state as "on" or "off" into Kafka. So, given the timestamp, sensor name and state, we want to create new topics with the duration of the "on" and "off" states.
My data is something like this:
{ 2019:02:15 00:00:00, sensor1, off}
{ 2019:02:15 00:00:30, sensor1, on}
and from these I want to get a result like
{ 2019:02:15 00:30:00, sensor1, off, 30sec }.
Essentially, we have to combine the states of multiple sensors to determine the combined state of a machine. There are hundreds, and eventually possibly thousands, of sensors in a factory.
Upvotes: 5
Views: 2051
Reputation: 11192
Following up on the idea from https://github.com/confluentinc/ksql/issues/2562 to use a self join, I came up with the following solution:
#kafka-topics --bootstrap-server localhost:9092 --delete --topic temptest
echo '{"temp": 3.0, "counter": 1}' | kafkacat -b localhost -t temptest
echo '{"temp": 4.0, "counter": 2}' | kafkacat -b localhost -t temptest
echo '{"temp": 6.0, "counter": 3}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.0, "counter": 4}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.1, "counter": 6}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.1, "counter": 5}' | kafkacat -b localhost -t temptest
Here we assume that consecutive events already have a counter property. Such a counter could also be added with KSQL by aggregating the event counts over time.
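As a plain-Java illustration of what such a counter looks like (outside KSQL), each event can be numbered per key in arrival order. This is a minimal sketch, assuming the events for one key arrive in order; the `SequenceNumbering` class is hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

public class SequenceNumbering {
    // Last counter handed out per key (e.g. per sensor); an in-memory stand-in for an aggregation
    private final Map<String, Integer> lastCounter = new HashMap<>();

    /** Returns the next counter for the key, starting at 1. */
    public int next(String key) {
        // Puts 1 for a new key, otherwise increments the stored value and returns it
        return lastCounter.merge(key, 1, Integer::sum);
    }

    public static void main(String[] args) {
        SequenceNumbering seq = new SequenceNumbering();
        System.out.println(seq.next("sensor1")); // 1
        System.out.println(seq.next("sensor1")); // 2
        System.out.println(seq.next("sensor2")); // 1
    }
}
```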
-- import the topic into ksql
CREATE STREAM temp_json (ingesttime BIGINT, row VARCHAR, temp DOUBLE, counter INTEGER) WITH (kafka_topic='temptest', value_format='JSON', KEY='counter');
--- change format to Avro and repartition
CREATE STREAM temp WITH (VALUE_FORMAT='AVRO') AS SELECT temp, counter, CAST(counter AS VARCHAR) as counter_key FROM temp_json PARTITION BY counter_key;
--- create second stream with shifted counter
CREATE STREAM temp_shift AS SELECT temp, counter AS counter_orig, counter + 1 AS counter FROM temp PARTITION BY counter;
-- join the streams by counter
CREATE STREAM temp_diff AS SELECT
prev.temp-cur.temp as temp_difference, cur.temp as temp, prev.temp as prev_temp, cur.counter as counter
FROM temp cur
LEFT JOIN temp_shift prev WITHIN 2 HOURS
ON cur.counter = prev.counter;
Test it
ksql> SELECT * FROM temp_diff LIMIT 4;
1574321370281 | 1 | null | 3.0 | null | 1
1574321372307 | 2 | -1.0 | 4.0 | 3.0 | 2
1574321372319 | 3 | -2.0 | 6.0 | 4.0 | 3
1574321372331 | 4 | 3.0 | 3.0 | 6.0 | 4
The sensor itself is left out to keep the solution simple. However, it could easily be added by using a compound key for the partitioning, as described in https://www.confluent.io/stream-processing-cookbook/ksql-recipes/creating-composite-key
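The self join pairs each reading with the one whose counter is one lower: `temp_shift` files every reading under `counter + 1`, so joining on `counter` finds its predecessor. The same idea in plain Java, with the sensor added to the key as suggested above, might look like the following. It is a minimal batch sketch: `ShiftJoin`, `Reading`, `Diff` and the `sensor|counter` key format are hypothetical, and the `WITHIN 2 HOURS` window is not modeled:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShiftJoin {
    // One reading; counters are assumed to be consecutive per sensor, as in the answer
    record Reading(String sensor, int counter, double temp) {}

    // Joined row; prevTemp and tempDifference are null without a predecessor (LEFT JOIN)
    record Diff(String sensor, int counter, double temp, Double prevTemp, Double tempDifference) {}

    // Hypothetical compound key "sensor|counter", mirroring a composite partition key
    static String key(String sensor, int counter) {
        return sensor + "|" + counter;
    }

    static List<Diff> join(List<Reading> readings) {
        // Like temp_shift: index every reading under the key of its successor (counter + 1)
        Map<String, Reading> shifted = new HashMap<>();
        for (Reading r : readings) {
            shifted.put(key(r.sensor(), r.counter() + 1), r);
        }
        // Like "temp cur LEFT JOIN temp_shift prev ON cur.counter = prev.counter"
        List<Diff> result = new ArrayList<>();
        for (Reading cur : readings) {
            Reading prev = shifted.get(key(cur.sensor(), cur.counter()));
            result.add(prev == null
                    ? new Diff(cur.sensor(), cur.counter(), cur.temp(), null, null)
                    : new Diff(cur.sensor(), cur.counter(), cur.temp(),
                            prev.temp(), prev.temp() - cur.temp()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Reading> readings = List.of(
                new Reading("sensor1", 1, 3.0),
                new Reading("sensor1", 2, 4.0),
                new Reading("sensor1", 3, 6.0),
                new Reading("sensor1", 4, 3.0));
        join(readings).forEach(System.out::println);
    }
}
```

Because the sensor is part of the key, a reading from one sensor can never be paired with a reading from another sensor that happens to have the adjacent counter.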
Upvotes: 0
Reputation: 4610
This is pretty easy in Kafka Streams, so I would opt for 2.
First, you have to model your input data properly. Your example uses local time without a time zone, which makes durations between two timestamps ambiguous (for example, across daylight-saving changes). Use something like epoch time instead.
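A minimal sketch of that conversion, assuming the factory runs in the `Europe/Berlin` time zone (substitute the real one): the question's timestamps are parsed with an explicit zone into `Instant`s, which map directly to epoch milliseconds. The `EpochTime` class and its pattern are hypothetical. The last lines show why a zone-less local time is ambiguous: across a daylight-saving switch, the wall-clock difference and the real duration differ.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class EpochTime {
    // Layout of the question's timestamps, e.g. "2019:02:15 00:00:30" (assumed year:month:day)
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy:MM:dd HH:mm:ss");

    /** Interprets a zone-less local timestamp in the given zone and converts it to an Instant. */
    static Instant toInstant(String local, ZoneId zone) {
        return LocalDateTime.parse(local, FORMAT).atZone(zone).toInstant();
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Europe/Berlin"); // assumed factory time zone
        Instant off = toInstant("2019:02:15 00:00:00", zone);
        Instant on = toInstant("2019:02:15 00:00:30", zone);
        // Epoch milliseconds are unambiguous and easy to store in a Kafka record
        System.out.println(off.toEpochMilli());
        System.out.println(Duration.between(off, on)); // PT30S

        // On 2019-03-31 the wall clock jumps from 02:00 to 03:00 in this zone,
        // so two hours on the clock are only one hour of real time
        Instant before = toInstant("2019:03:31 01:30:00", zone);
        Instant after = toInstant("2019:03:31 03:30:00", zone);
        System.out.println(Duration.between(before, after)); // PT1H
    }
}
```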
Start with a source data model like
interface SensorState {
String getId();
Instant getTime();
State getState();
enum State {
OFF,
ON
}
}
and a target model of
interface SensorStateWithDuration {
SensorState getEvent();
Duration getDuration();
}
Now that you have defined your input and output streams (but see “Data Types and Serialization”), you just need to transform the values (“Applying processors and transformers”) by defining a ValueTransformer.
It has to do 2 things:
Check the state store for historical data for the sensor and update it with new data, if necessary
When historical data is available, calculate the difference between the timestamps and emit the data together with the calculated duration
import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
  // Name of the state store; must match the name the store was registered under
  static final String SENSOR_STATES = "SensorStates";

  private KeyValueStore<String, SensorState> store;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
    this.store = (KeyValueStore<String, SensorState>) context.getStateStore(SENSOR_STATES);
  }

  @Override
  public SensorStateWithDuration transform(SensorState sensorState) {
    // Nothing to do
    if (sensorState == null) {
      return null;
    }
    // Check for the previous state, update if necessary
    var oldState = checkAndUpdateSensorState(sensorState);
    // When we have historical data, return the duration so far. Otherwise return null
    return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
  }

  @Override
  public void close() {}

  /**
   * Checks the state store for historical state based on sensor ID and updates it, if necessary.
   *
   * @param sensorState The new sensor state
   * @return The old sensor state, if any
   */
  Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
    // The sensor ID is our index
    var index = sensorState.getId();
    // Get the historical state (might be null)
    var oldState = store.get(index);
    if (needToUpdate(oldState, sensorState)) {
      // Update the state store to the new state
      store.put(index, sensorState);
    }
    return Optional.ofNullable(oldState);
  }

  /**
   * Checks if we need to update the state in the state store.
   *
   * <p>Either we have no historical data, or the state has changed.
   *
   * @param oldState The old sensor state
   * @param sensorState The new sensor state
   * @return Flag whether we need to update
   */
  boolean needToUpdate(SensorState oldState, SensorState sensorState) {
    return oldState == null || oldState.getState() != sensorState.getState();
  }

  /**
   * Wraps the old state with how long it lasted.
   *
   * @param oldState The state of the sensor so far
   * @param sensorState The new state of the sensor
   * @return Wrapped old state with duration
   */
  SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
    var duration = Duration.between(oldState.getTime(), sensorState.getTime());
    // Assumes a builder-style implementation of SensorStateWithDuration (not shown here)
    return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
  }
}
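To check the transformer's logic without a running Kafka cluster, the same two steps can be exercised with a plain `HashMap` standing in for the `SensorStates` store. This is a minimal sketch; the `DurationLogic` class and the `Event` and `WithDuration` records are hypothetical simplifications of `SensorState` and `SensorStateWithDuration`, not part of Kafka Streams:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class DurationLogic {
    enum State { OFF, ON }

    // Simplified, hypothetical stand-ins for SensorState and SensorStateWithDuration
    record Event(String id, Instant time, State state) {}
    record WithDuration(Event event, Duration duration) {}

    // A HashMap takes the place of the "SensorStates" KeyValueStore
    private final Map<String, Event> store = new HashMap<>();

    /** Returns the stored event plus the time elapsed since it, or empty for a sensor's first event. */
    Optional<WithDuration> transform(Event event) {
        Event old = store.get(event.id());
        // Step 1: store the new event if there is no history or the state changed
        if (old == null || old.state() != event.state()) {
            store.put(event.id(), event);
        }
        // Step 2: with history, emit the old event together with the elapsed time
        return Optional.ofNullable(old)
                .map(o -> new WithDuration(o, Duration.between(o.time(), event.time())));
    }

    public static void main(String[] args) {
        DurationLogic logic = new DurationLogic();
        Instant t0 = Instant.parse("2019-02-15T00:00:00Z");
        // First event of sensor1: no history yet, prints Optional.empty
        System.out.println(logic.transform(new Event("sensor1", t0, State.OFF)));
        // State change after 30 s: emits the OFF event with a duration of PT30S
        System.out.println(logic.transform(new Event("sensor1", t0.plusSeconds(30), State.ON)));
    }
}
```

Because the store only changes when the state flips, repeated ON events keep measuring from the start of the ON run. For testing the real topology, Kafka Streams ships `TopologyTestDriver` in the `kafka-streams-test-utils` artifact.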
Putting everything together (“Connecting Processors and State Stores”) in a simple Topology:
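import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

// storeSerde, inputSerde (Serde<SensorState>) and resultSerde (Serde<SensorStateWithDuration>)
// are assumed to be defined elsewhere
var builder = new StreamsBuilder();
// Must be the same name the transformer looks up in init()
var storeName = "SensorStates";
// Our state store, keyed by sensor ID
var storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(storeName),
        Serdes.String(),
        storeSerde);
// Register the store builder
builder.addStateStore(storeBuilder);
// The input records should be keyed by sensor ID, so that all events of one sensor
// land in the same partition and are processed against the same store
builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
    .transformValues(DurationProcessor::new, storeName)
    .to("result-topic", Produced.with(Serdes.String(), resultSerde));
var topology = builder.build();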
A full application is at github.com/melsicon/kafka-sensors.
Upvotes: 2