Reputation: 501
I want to combine Measurements by type and device name.
public class Measurement implements Serializable {
private String measurementType;
private String device;
private Long ts;
private Double reading;
}
I am doing an average with Combine.perKey(...)
successfully by the type. But I want to have basically a compound key of device and measurementType.
Right now my KvDoFn looks like this:
public class KvByMeasurementType extends DoFn<Measurement, KV<String, Measurement>> implements Serializable {
@DoFn.ProcessElement
public void processElement(DoFn<Measurement, KV<String, Measurement>>.ProcessContext context) {
Measurement measurement = context.element();
context.output(KV.of(measurement.getMeasurementType(), measurement));
}
}
How to extend it to create a compound key of two values?
Upvotes: 2
Views: 620
Reputation: 2024
You can simply create a new object and make that the key. For example,
public class MyKey implements Serializable {
private String measurementType;
private String device;
}
And then generate and output KV
s of type MyKey
from your KvByMeasurementType
.
Also, define a Beam CombineFn that performs the combining based on this key. See here for more information on the Beam's Combine
transform.
Upvotes: 2