Ahmed Awad

Reputation: 95

Flink SQL Client environment configuration to read CSV file as source streaming table

I want to try out the MATCH_RECOGNIZE clause in Flink SQL from the SQL client. For this, I have set up the source table as follows:

# A typical table source definition looks like:
 - name: TaxiRides
   type: source
   update-mode: append
   connector: 
       type: filesystem
       path: "/home/bitnami/Match_Recognize/TaxiRide.csv"
   format: 
       type: csv
       fields:
           - name: rideId
             type: LONG
           - name: taxiId
             type: LONG
           - name: isStart
             type: BOOLEAN
           - name: lon
             type: FLOAT
           - name: lat
             type: FLOAT
           - name: rideTime
             type: TIMESTAMP
           - name: psgCnt
             type: INT
       line-delimiter: "\n"
       field-delimiter: ","

   schema:
    - name: rideId
      type: LONG
    - name: taxiId
      type: LONG
    - name: isStart
      type: BOOLEAN
    - name: lon
      type: FLOAT
    - name: lat
      type: FLOAT
    - name: rideTime
      type: TIMESTAMP
      rowtime:
        timestamps:
          type: "from-field"
          from: "eventTime"
        watermarks:
          type: "periodic-bounded"
          delay: "60000"
    - name: psgCnt
      type: INT

When I start the session, I get the following error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
        at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
        at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
        at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
        ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No factory supports all properties.

So, my question is: Is it possible to read the source stream as a table from a file or does it have to be from Kafka?

UPDATE: I am using Flink version 1.9.1

Upvotes: 0

Views: 803

Answers (1)

Dawid Wysakowicz

Reputation: 3422

Unfortunately, you are hitting a limitation of the CSV filesystem connector: it does not support rowtime attributes.

In 1.10 we started work on expressing watermarks and time attributes in a slightly different way. For reference, see https://issues.apache.org/jira/browse/FLINK-14320.

You can try creating a table from DDL with a WATERMARK declaration, as described here: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table. Note that this works only with the blink planner, which is the default planner in the SQL client starting from the 1.10 release.
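As a rough, untested sketch of what such a DDL could look like for the table in the question: it assumes Flink 1.10 with the blink planner, reuses the column names and file path from the question, maps LONG to BIGINT, declares rideTime as TIMESTAMP(3) (the WATERMARK clause requires that precision), and translates the 60000 ms delay into a 60-second interval. Whether the filesystem connector accepts the watermark in your setup is something you would need to verify.

```sql
-- Sketch only: assumes Flink 1.10 and the blink planner.
CREATE TABLE TaxiRides (
  rideId   BIGINT,
  taxiId   BIGINT,
  isStart  BOOLEAN,
  lon      FLOAT,
  lat      FLOAT,
  rideTime TIMESTAMP(3),
  psgCnt   INT,
  -- Marks rideTime as the event-time attribute, with a 60-second bounded delay
  WATERMARK FOR rideTime AS rideTime - INTERVAL '60' SECOND
) WITH (
  'connector.type' = 'filesystem',
  'connector.path' = '/home/bitnami/Match_Recognize/TaxiRide.csv',
  'format.type' = 'csv',
  'format.field-delimiter' = ','
);
```

With the watermark declared this way, rideTime can be used in the ORDER BY of a MATCH_RECOGNIZE clause.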

Another option you have is reading from Kafka with a CSV format.
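For the Kafka route, a minimal, untested sketch in the 1.10 DDL style might look like the following. The topic name `taxi-rides` and the broker address `localhost:9092` are hypothetical placeholders, the column names are taken from the question, and the CSV format and Kafka connector jars are assumed to be on the SQL client classpath.

```sql
-- Sketch only: assumes Flink 1.10, the blink planner, and the universal Kafka connector.
CREATE TABLE TaxiRides (
  rideId   BIGINT,
  taxiId   BIGINT,
  isStart  BOOLEAN,
  lon      FLOAT,
  lat      FLOAT,
  rideTime TIMESTAMP(3),
  psgCnt   INT,
  -- Event-time attribute with a 60-second bounded delay
  WATERMARK FOR rideTime AS rideTime - INTERVAL '60' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'taxi-rides',                              -- hypothetical topic
  'connector.properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical broker
  'connector.startup-mode' = 'earliest-offset',
  'format.type' = 'csv'
);
```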

BTW, this particular exception message was improved in Flink 1.10: Flink now reports which properties are problematic.

Upvotes: 1
