Siddharth B
Siddharth B

Reputation: 366

Flink SQL table backed by CSV with header

I have been searching on google for the last 20 minutes. I have gone through the Apache Flink documentation too specifically the CSV format but haven't found a way to skip the first header row.

The CSV looks like this:

AccountKey,InstrumentCode,Quantity,BuySell,Price
SB11,MSFT,100,Buy,250.57
SB11,AMZN,125,Sell,309

I really am looking for a property to set which will make the CSV reader skip the header row. The following table definition does not work:

CREATE TABLE trades (
    AccountNo varchar(10),
    Symbol varchar(10),
    Quantity integer,
    BuySell varchar(4),
    Price decimal    
) WITH ( 
    'connector' = 'filesystem',
    'path' = '/mnt/d/Work/Github/Flink-Samples/data/trade-data.csv',
    'format' = 'csv'  
);

The error I see on sql-client.sh is:

[ERROR] Could not execute SQL statement. Reason:
java.lang.NumberFormatException: For input string: "Quantity"

I expect the CSV reader reads the rows when I execute a

select * from trades

So far, the only way I have found is to put a # character to make the header appear as a comment and use the following table definition:

CREATE TABLE trades (
    AccountNo varchar(10),
    Symbol varchar(10),
    Quantity integer,
    BuySell varchar(4),
    Price decimal
) WITH ( 
    'connector' = 'filesystem',
    'path' = '/mnt/d/Work/Github/Flink-Samples/data/trade-data.csv',
    'format' = 'csv',    
    'csv.allow-comments' = 'true'
);

Upvotes: 2

Views: 1915

Answers (4)

Hunter Medney
Hunter Medney

Reputation: 371

This may be treading into hack territory, but one option to stay in the SQL context is to create a view that excludes the header row.

For example, the view below excludes the header row since column AccountNo would (likely) only equal AccountKey on the header row in the CSV.

CREATE VIEW trades_no_header AS SELECT * FROM trades WHERE AccountNo <> 'AccountKey';

Upvotes: 1

twalthr
twalthr

Reputation: 2664

Flink's connectors are designed for big amounts of data. Usually, CSV files are split into multiple files for efficient parallel processing.

In this case, a header would not make much sense because it would not be clear if the header is only located in the first file (which one is the first file?) and or in every file.

If there are headers in your file, you have the following options:

  • Remove the header beforehand with a different tool.
  • Pre-process the CSV file using a CSV format configured with a single STRING column and use OFFSET / LIMIT to skip first row before writing into a CSV file again. This is mostly useful for batch processing.
  • Enable ignoring parse errors and use OFFSET / LIMIT to definitely skip the first row.

Upvotes: 3

BenoitParis
BenoitParis

Reputation: 3184

What eshirvana said, plus this workaround:

Use the csv.ignore-parse-errors option so that NumberFormatException fails silently.

Upvotes: 0

eshirvana
eshirvana

Reputation: 24633

when you make csv-backed tables , you are just saying to read data from csv file , so you can't have headers in the file .

if you want to import csv file to an already existed table , you need ETL ,It's almost the case in any sql engine.

this blog is helpful how to transfer csv file into flink : From Streams to Tables and Back Again

Upvotes: 0

Related Questions