Reputation: 95
I want to try out the MATCH_RECOGNIZE clause in Flink SQL from the SQL client. For this, I have set up the following source table definition in my environment file:
# A typical table source definition looks like:
- name: TaxiRides
  type: source
  update-mode: append
  connector:
    type: filesystem
    path: "/home/bitnami/Match_Recognize/TaxiRide.csv"
  format:
    type: csv
    fields:
      - name: rideId
        type: LONG
      - name: taxiId
        type: LONG
      - name: isStart
        type: BOOLEAN
      - name: lon
        type: FLOAT
      - name: lat
        type: FLOAT
      - name: rideTime
        type: TIMESTAMP
      - name: psgCnt
        type: INT
    line-delimiter: "\n"
    field-delimiter: ","
  schema:
    - name: rideId
      type: LONG
    - name: taxiId
      type: LONG
    - name: isStart
      type: BOOLEAN
    - name: lon
      type: FLOAT
    - name: lat
      type: FLOAT
    - name: rideTime
      type: TIMESTAMP
      rowtime:
        timestamps:
          type: "from-field"
          from: "eventTime"
        watermarks:
          type: "periodic-bounded"
          delay: "60000"
    - name: psgCnt
      type: INT
When I start the session, I get the following error:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
    ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
Reason: No factory supports all properties.
So, my question is: is it possible to read the source stream as a table from a file, or does it have to come from Kafka?
UPDATE: I am using Flink version 1.9.1
Upvotes: 0
Views: 803
Reputation: 3422
Unfortunately you are hitting a limitation of the CSV filesystem connector: it does not support rowtime attributes.
In 1.10 we started work on expressing watermarks and time attributes in a slightly different way. See https://issues.apache.org/jira/browse/FLINK-14320 for reference.
You can try creating a table from DDL with a WATERMARK declaration, as described here: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table It works only with the Blink planner, though (the Blink planner is the default implementation in the SQL client starting from the 1.10 release).
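For illustration, a DDL version of the table above could look roughly like the following. This is a sketch only: the connector/format property keys are assumptions mirroring the YAML setup, and whether the legacy filesystem connector accepts this exact combination in 1.10 would need to be verified.

```sql
-- Sketch of a DDL definition with a WATERMARK declaration (Flink 1.10, Blink planner).
-- Connector and format properties are assumptions based on the YAML setup above.
CREATE TABLE TaxiRides (
  rideId   BIGINT,
  taxiId   BIGINT,
  isStart  BOOLEAN,
  lon      FLOAT,
  lat      FLOAT,
  rideTime TIMESTAMP(3),
  psgCnt   INT,
  -- declares rideTime as the event-time attribute with a 60 s bounded delay,
  -- matching the "periodic-bounded" watermark with delay 60000 ms in the YAML
  WATERMARK FOR rideTime AS rideTime - INTERVAL '60' SECOND
) WITH (
  'connector.type' = 'filesystem',
  'connector.path' = '/home/bitnami/Match_Recognize/TaxiRide.csv',
  'format.type'    = 'csv'
);
```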
Another option you have is reading from Kafka with the CSV format.
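In the same environment-file style used in the question, a Kafka source could be declared along these lines. This is a sketch: the topic name and broker address are placeholders, and the property layout follows the 1.9 YAML convention.

```yaml
# Hypothetical Kafka source definition; topic and bootstrap.servers are placeholders.
- name: TaxiRides
  type: source
  update-mode: append
  connector:
    type: kafka
    version: "universal"
    topic: taxi-rides            # placeholder topic name
    properties:
      - key: bootstrap.servers
        value: localhost:9092    # placeholder broker address
  format:
    type: csv
    field-delimiter: ","
```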
BTW, this particular exception message was improved in Flink 1.10, as Flink now reports which properties are problematic.
Upvotes: 1