Reputation: 2643
I'm playing around with Flink (1.6, 1.7) for the first time, using the data from the GitHub Archive at https://www.gharchive.org/ as a streaming data source.
My simple example counts all the events per user over a daily window, and I'm trying to replicate the same example using TableEnvironment and the SQL support instead.
However, I am encountering the following error, shown in the log output below:
class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit does not contain a setter for field modificationTime
18-12-04 14:17:02:115 INFO main exploration.StreamingTableApp:32 - Starting Streaming Table Flink App Example...
18-12-04 14:17:02:174 INFO main typeutils.TypeExtractor:1818 - class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit does not contain a setter for field modificationTime
18-12-04 14:17:02:176 INFO main typeutils.TypeExtractor:1857 - Class class org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
18-12-04 14:17:02:937 INFO main exploration.StreamingTableApp:74 - Finished...
I'm reading the source file as a DataStream of text lines, using Gson to parse parts of each JSON line and mapping those attributes to a Tuple.
Does anyone have any ideas about, or experience with, this?
main method:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Mapped in docker compose file too.
DataStreamSource<String> input = env.readTextFile("/some/path/github/");
// Set up the stream
DataStream<Tuple4<String, Integer, String, Long>> stream = input
        .map(new GithubTupleConverter())
        .assignTimestampsAndWatermarks(new TupleTimestampExtractor());
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
Table tableFromStream = tEnv.fromDataStream(stream, "user_id, kount, basic_date, event_date");
TupleTimestampExtractor
public class TupleTimestampExtractor
        extends BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, Integer, String, Long>> {
    private static final long serialVersionUID = 3737675541965835242L;

    public TupleTimestampExtractor() {
        // Tolerate events arriving up to 30 seconds out of order.
        super(Time.seconds(30L));
    }

    @Override
    public long extractTimestamp(Tuple4<String, Integer, String, Long> element) {
        // Field 3 holds the event time as epoch milliseconds.
        return element.getField(3);
    }
}
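For readers unfamiliar with what the 30-second bound in this extractor means, here is a rough, Flink-free sketch of the bounded-out-of-orderness idea: the watermark trails the largest timestamp seen so far by the configured bound. The class and method names (`BoundedWatermarkSketch`, `observe`, `currentWatermark`) are hypothetical and only illustrate the concept; this is not Flink's own implementation.

```java
class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp;

    BoundedWatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low, but high enough that the subtraction below cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Records an event timestamp (epoch millis) and returns it unchanged. */
    long observe(long eventTimestampMillis) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMillis);
        return eventTimestampMillis;
    }

    /** The watermark lags the largest observed timestamp by the allowed bound. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch sketch = new BoundedWatermarkSketch(30_000L);
        sketch.observe(100_000L);
        sketch.observe(90_000L); // arrives out of order, but within the 30 s bound
        System.out.println(sketch.currentWatermark());
    }
}
```

In this sketch an out-of-order event never moves the watermark backwards, because only the maximum timestamp is tracked.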
GithubTupleConverter.java
public class GithubTupleConverter implements MapFunction<String, Tuple4<String, Integer, String, Long>> {
    private static final Gson g = new Gson();

    @Override
    public Tuple4<String, Integer, String, Long> map(String value) throws Exception {
        // Treat each line as a JSON object.
        JsonObject o = g.fromJson(value, JsonObject.class);
        // Extract the user id.
        String userId = o.get("actor").getAsJsonObject().get("login").getAsString();
        // Extract the event type (commit, pull request, fork event).
        String type = o.get("type").getAsString();
        // Get the event date-time string.
        String dateTime = o.get("created_at").getAsString();
        // Parse the date-time string into a LocalDateTime.
        LocalDateTime eventTime = LocalDateTime.parse(dateTime, DateTimeFormatter.ISO_DATE_TIME);
        // Format the date so it can be used in the output.
        DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE;
        // Emit (user id, count of 1, date string, event time in epoch millis).
        return Tuple4.of(userId, 1, formatter.format(eventTime), eventTime.toInstant(ZoneOffset.UTC).toEpochMilli());
    }
}
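The date handling in the converter can be checked in isolation, without Flink or Gson. The following is a minimal sketch, assuming `created_at` values look like `2018-12-04T14:17:02Z` (an assumed sample value, not taken from the question's data). The class and method names (`CreatedAtSketch`, `toEpochMillis`, `toBasicDate`) are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class CreatedAtSketch {

    /** Parses an ISO date-time string and returns epoch millis, treating the local part as UTC. */
    static long toEpochMillis(String createdAt) {
        LocalDateTime eventTime = LocalDateTime.parse(createdAt, DateTimeFormatter.ISO_DATE_TIME);
        return eventTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** Returns only the date part, formatted as yyyy-MM-dd. */
    static String toBasicDate(String createdAt) {
        LocalDateTime eventTime = LocalDateTime.parse(createdAt, DateTimeFormatter.ISO_DATE_TIME);
        return DateTimeFormatter.ISO_DATE.format(eventTime);
    }

    public static void main(String[] args) {
        String sample = "2018-12-04T14:17:02Z"; // assumed sample value
        System.out.println(toBasicDate(sample));
        System.out.println(toEpochMillis(sample));
    }
}
```

Note that parsing into a `LocalDateTime` discards any offset in the input, so this approach is only correct when the timestamps are already in UTC, as the trailing `Z` in the assumed sample indicates.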
Upvotes: 5
Views: 5902
Reputation: 18987
The logs you shared do not show an error. The entries are at INFO
level and no exception is thrown (at least not in the provided logs).
The log entry just says that the class TimestampedFileInputSplit
cannot be treated as a POJO and will be processed as a GenericType instead. In general this message indicates that serialization performance may not be optimal, but in this particular case it is not a problem.
Do you get any other error message?
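To illustrate why a missing setter triggers this message, here is a simplified sketch of the kind of check involved, based on the POJO rules in the Flink documentation: a public class, a public no-argument constructor, and every field either public or reachable through a getter and a setter. This is an approximation written with plain reflection, not Flink's actual TypeExtractor logic, and all names in it (`PojoRuleSketch`, `looksLikePojo`, `WithSetter`, `WithoutSetter`) are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;

class PojoRuleSketch {

    /** Private field with both getter and setter: passes the simplified check. */
    public static class WithSetter {
        private long modificationTime;
        public WithSetter() {}
        public long getModificationTime() { return modificationTime; }
        public void setModificationTime(long t) { this.modificationTime = t; }
    }

    /** Private field with a getter only: fails the simplified check. */
    public static class WithoutSetter {
        private final long modificationTime;
        public WithoutSetter() { this.modificationTime = 0L; }
        public long getModificationTime() { return modificationTime; }
    }

    static boolean hasPublicMethod(Class<?> type, String name, int paramCount) {
        return Arrays.stream(type.getMethods())
                .anyMatch(m -> m.getName().equals(name) && m.getParameterCount() == paramCount);
    }

    /** Simplified approximation; only inspects fields declared directly on the class. */
    static boolean looksLikePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            type.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : type.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || Modifier.isPublic(mods)) {
                continue; // skipped or directly accessible
            }
            String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
            boolean getter = hasPublicMethod(type, "get" + suffix, 0)
                    || hasPublicMethod(type, "is" + suffix, 0);
            boolean setter = hasPublicMethod(type, "set" + suffix, 1);
            if (!getter || !setter) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(WithSetter.class));
        System.out.println(looksLikePojo(WithoutSetter.class));
    }
}
```

A class that fails such a check is still usable; as the log message says, it is processed as a GenericType rather than as a POJO.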
Upvotes: 2