Reputation: 527
I'm trying to build an event processing stream using Apache Beam.
Steps which happen in my stream:
Now let's dive into the code.
KafkaIO.<String, AvroGenericRecord>read()
.withBootstrapServers(bootstrapServers)
.withConsumerConfigUpdates(configUpdates)
.withTopics(inputTopics)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder(BeamKafkaAvroGenericDeserializer.class, AvroGenericCoder.of(serDeConfig()))
.commitOffsetsInFinalize()
.withoutMetadata();
records.apply(Window.<AvroGenericRecord>into(FixedWindows.of(Duration.standardHours(1)))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(10)))
.withLateFirings(AfterPane.elementCountAtLeast(1))
)
.withAllowedLateness(Duration.standardMinutes(5))
.discardingFiredPanes()
)
What I want to achieve with this window is to group data by event time into 1-hour windows and fire the trigger every 10 minutes.
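To make the intent concrete, here is a minimal plain-Java sketch of how a one-hour fixed window is picked from an element's timestamp. It is not Beam's own code: `FixedWindowSketch` and `windowStart` are hypothetical names used only for illustration, it uses `java.time` rather than the Joda types Beam works with, and it assumes windows aligned to the epoch with no offset. The point is that the window depends only on whichever timestamp the source attaches to the element.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Beam: mirrors the arithmetic of fixed windows.
final class FixedWindowSketch {

    // Returns the start of the fixed window that contains the given timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long ts = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch too.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
    }

    public static void main(String[] args) {
        Duration oneHour = Duration.ofHours(1);
        // Assumed example value standing in for the timestamp field of a record.
        Instant eventTime = Instant.parse("2020-05-01T10:42:17Z");
        Instant start = windowStart(eventTime, oneHour);
        // Prints the half-open window [start, end) the element falls into.
        System.out.println(start + " .. " + start.plus(oneHour));
    }
}
```

If the timestamp passed in is the time the record was read rather than the time stored in the record, the same arithmetic yields processing-time windows instead.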
public class WriteAvroFilesTr extends PTransform<PCollection<AvroGenericRecord>, WriteFilesResult<AvroDestination>> {
private String baseDir;
private int numberOfShards;
public WriteAvroFilesTr(String baseDir, int numberOfShards) {
this.baseDir = baseDir;
this.numberOfShards = numberOfShards;
}
@Override
public WriteFilesResult<AvroDestination> expand(PCollection<AvroGenericRecord> input) {
ResourceId tempDir = getTempDir(baseDir);
return input.apply(AvroIO.<AvroGenericRecord>writeCustomTypeToGenericRecords()
.withTempDirectory(tempDir)
.withWindowedWrites()
.withNumShards(numberOfShards)
.to(new DynamicAvroGenericRecordDestinations(baseDir, Constants.FILE_EXTENSION))
);
}
private ResourceId getTempDir(String baseDir) {
return FileSystems.matchNewResource(baseDir + "/temp", true);
}
}
And
public class DynamicAvroGenericRecordDestinations extends DynamicAvroDestinations<AvroGenericRecord, AvroDestination, GenericRecord> {
private static final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
private final String baseDir;
private final String fileExtension;
public DynamicAvroGenericRecordDestinations(String baseDir, String fileExtension) {
this.baseDir = baseDir;
this.fileExtension = fileExtension;
}
@Override
public Schema getSchema(AvroDestination destination) {
return new Schema.Parser().parse(destination.jsonSchema);
}
@Override
public GenericRecord formatRecord(AvroGenericRecord record) {
return record.getRecord();
}
@Override
public AvroDestination getDestination(AvroGenericRecord record) {
Schema schema = record.getRecord().getSchema();
return AvroDestination.of(record.getName(), record.getDate(), record.getVersionId(), schema.toString());
}
@Override
public AvroDestination getDefaultDestination() {
return new AvroDestination();
}
@Override
public FileBasedSink.FilenamePolicy getFilenamePolicy(AvroDestination destination) {
String pathStr = baseDir + "/" + destination.name + "/" + destination.date + "/" + destination.name;
return new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(pathStr), destination.version, fileExtension);
}
private static class WindowedFilenamePolicy extends FileBasedSink.FilenamePolicy {
final ResourceId outputFilePrefix;
final String fileExtension;
final Integer version;
WindowedFilenamePolicy(ResourceId outputFilePrefix, Integer version, String fileExtension) {
this.outputFilePrefix = outputFilePrefix;
this.version = version;
this.fileExtension = fileExtension;
}
@Override
public ResourceId windowedFilename(
int shardNumber,
int numShards,
BoundedWindow window,
PaneInfo paneInfo,
FileBasedSink.OutputFileHints outputFileHints) {
IntervalWindow intervalWindow = (IntervalWindow) window;
String filenamePrefix =
outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), "");
String filename =
String.format("%s-%s(%s-%s)-(%s-of-%s)%s", filenamePrefix,
version,
formatter.print(intervalWindow.start()),
formatter.print(intervalWindow.end()),
shardNumber,
numShards - 1,
fileExtension);
ResourceId result = outputFilePrefix.getCurrentDirectory();
return result.resolve(filename, RESOLVE_FILE);
}
@Override
public ResourceId unwindowedFilename(
int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("Expecting windowed outputs only");
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add(
DisplayData.item("fileNamePrefix", outputFilePrefix.toString())
.withLabel("File Name Prefix"));
}
}
}
That is my whole pipeline. It mostly works, but I'm not sure whether I'm actually handling events by event time.
Could someone review my code (especially steps 1 and 2, where I read from Kafka and group into windows) and tell me whether it windows by event time or not?
P.S. Every record in Kafka has a timestamp field inside it.
UPD
Thanks, jjayadeep.
I added a custom TimestampPolicy to KafkaIO:
static class CustomTimestampPolicy extends TimestampPolicy<String, AvroGenericRecord> {
protected Instant currentWatermark;
CustomTimestampPolicy(Optional<Instant> previousWatermark) {
this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
}
@Override
public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, AvroGenericRecord> record) {
currentWatermark = Instant.ofEpochMilli(record.getKV().getValue().getTimestamp());
return currentWatermark;
}
@Override
public Instant getWatermark(PartitionContext ctx) {
return currentWatermark;
}
}
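One possible caveat with the policy above, assuming records within a partition can arrive out of order: the watermark is overwritten with the timestamp of the latest record, so an older record would move it backwards. The plain-Java sketch below shows one way to guard against that by only ever advancing the watermark. `MonotonicWatermarkSketch`, `observe` and `current` are hypothetical names, and the sketch uses `java.time.Instant` instead of the Joda `Instant` that Beam uses, so it illustrates the idea rather than being a drop-in replacement.

```java
import java.time.Instant;

// Hypothetical helper, not a Beam class: tracks a watermark that never moves backwards.
final class MonotonicWatermarkSketch {
    private Instant watermark;

    MonotonicWatermarkSketch(Instant initial) {
        this.watermark = initial;
    }

    // Takes the event time of one record, advances the watermark if the record
    // is newer, and returns the event time unchanged as the record's timestamp.
    Instant observe(Instant eventTime) {
        if (eventTime.isAfter(watermark)) {
            watermark = eventTime;
        }
        return eventTime;
    }

    // Returns the highest event time seen so far.
    Instant current() {
        return watermark;
    }

    public static void main(String[] args) {
        MonotonicWatermarkSketch tracker = new MonotonicWatermarkSketch(Instant.EPOCH);
        tracker.observe(Instant.parse("2020-05-01T10:05:00Z"));
        // An older record arrives late; the watermark stays where it was.
        tracker.observe(Instant.parse("2020-05-01T10:01:00Z"));
        System.out.println(tracker.current());
    }
}
```

The same comparison could be applied inside getTimestampForRecord so that getWatermark only ever returns a non-decreasing value.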
Upvotes: 1
Views: 1209
Reputation: 2825
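According to the documentation [1], KafkaIO by default uses processing time as the record timestamp (event time), so your windows are not based on the timestamp field inside your records unless you configure a timestamp policy: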
By default, record timestamp (event time) is set to processing time in KafkaIO reader and source watermark is current wall time. If a topic has Kafka server-side ingestion timestamp enabled ('LogAppendTime'), it can enabled with KafkaIO.Read.withLogAppendTime(). A custom timestamp policy can be provided by implementing TimestampPolicyFactory. See KafkaIO.Read.withTimestampPolicyFactory(TimestampPolicyFactory) for more information.
The same documentation also notes, in a code comment, that processing time is the default timestamp method:
// set event times and watermark based on LogAppendTime. To provide a custom
// policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
[1] https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
Upvotes: 1