Reputation: 65
I am trying to read a file with 34 fields in it to be printed on console using Netbeans. However, all I am able to print is the schema. Because the option to print is missing in this particular version of Flink used with csvreader.
Please see the code and help me understand where I should correct. I would have used CSVReader
, inbuilt API but turns out it does not support more than 22 fields and hence resorted to using Table API. Also tried using CsvTableSource
version 1.5.1 Flink, but no luck with syntax. As the .field("%CPU", Types.FLOAT())
kept giving error for type float not recognised symbol.
My main aim is to just to be able to read the CSV file and then send to a Kafka topic, but before that i want to check if the file is read, no luck yet.
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.Types;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.api.java.Slide;
public class CsvReader {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
CsvTableSource csvTableSource = new CsvTableSource("/home/merlin/Experiments/input_container/container_data1.csv",
new String[] { "%CPU", "MEM", "VSZ", "RSS", "timestamp",
"OOM_Score", "io_read_count", "io_write_count", "io_read_bytes", "io_write_bytes",
"io_read_chars", "io_write_chars", "num_fds", "num_ctx_switches_voluntary", "num_ctx_switches_involuntary",
"mem_rss", "mem_vms", "mem_shared", "mem_text", "mem_lib", "mem_data", "mem_dirty", "mem_uss", "mem_pss",
"mem_swap", "num_threads", "cpu_time_user", "cpu_time_system", "cpu_time_children_user",
"cpu_time_children_system", "container_nr_sleeping", "container_nr_running",
"container_nr_stopped", "container_nr_uninterruptible","container_nr_iowait" },
new TypeInformation<?>[] {
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT()
});// lenient
tEnv.registerTableSource("container", csvTableSource);
Table result = tEnv
.scan("container");
System.out.println(result);
result.printSchema();
}
}
/*tEnv.toAppendStream(result, Row.class).print();
result.writeToSink(null);print();
env.execute();*/
This is the output
root
|-- %CPU: Float
|-- MEM: Float
|-- VSZ: Float
|-- RSS: Float
|-- timestamp: Float
|-- OOM_Score: Float
|-- io_read_count: Float
|-- io_write_count: Float
|-- io_read_bytes: Float
|-- io_write_bytes: Float
|-- io_read_chars: Float
|-- io_write_chars: Float
|-- num_fds: Float
|-- num_ctx_switches_voluntary: Float
|-- num_ctx_switches_involuntary: Float
|-- mem_rss: Float
|-- mem_vms: Float
|-- mem_shared: Float
|-- mem_text: Float
|-- mem_lib: Float
|-- mem_data: Float
|-- mem_dirty: Float
|-- mem_uss: Float
|-- mem_pss: Float
|-- mem_swap: Float
|-- num_threads: Float
|-- cpu_time_user: Float
|-- cpu_time_system: Float
|-- cpu_time_children_user: Float
|-- cpu_time_children_system: Float
|-- container_nr_sleeping: Float
|-- container_nr_running: Float
|-- container_nr_stopped: Float
|-- container_nr_uninterruptible: Float
|-- container_nr_iowait: Float
And this is another version of code, which too did not work
package wikiedits;
import static com.sun.xml.internal.fastinfoset.alphabet.BuiltInRestrictedAlphabets.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableEnvironment;
public class Csv {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
//TableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
CsvTableSource csvTableSource = CsvTableSource
.builder()
.path("home/merlin/Experiments/input_container/container_data1.csv")
.field("%CPU", Types.FLOAT)
.field("MEM", Types.FLOAT)
.field("VSZ", Types.FLOAT)
.field("RSS", Types.FLOAT)
.field("timestamp", Types.FLOAT)
.field("OOM_Score", Types.FLOAT)
.field("io_read_count", Types.FLOAT)
.field("io_write_count", Types.FLOAT)
.field("io_read_bytes", Types.FLOAT)
.field("io_write_bytes", Types.FLOAT)
.field("io_read_chars", Types.FLOAT)
.field("io_write_chars", Types.FLOAT)
.field("num_fds", Types.FLOAT)
.field("num_ctx_switches_voluntary", Types.FLOAT)
.field("num_ctx_switches_involuntary", Types.FLOAT)
.field("mem_rss", Types.FLOAT)
.field("mem_vms", Types.FLOAT)
.field("mem_shared", Types.FLOAT)
.field("mem_text", Types.FLOAT)
.field("mem_lib", Types.FLOAT)
.field("mem_data", Types.FLOAT)
.field("mem_dirty", Types.FLOAT)
.field("mem_uss", Types.FLOAT)
.field("mem_pss", Types.FLOAT)
.field("mem_swap", Types.FLOAT)
.field("num_threads", Types.FLOAT)
.field("cpu_time_user", Types.FLOAT)
.field("cpu_time_system", Types.FLOAT)
.field("cpu_time_children_user", Types.FLOAT)
.field("cpu_time_children_system", Types.FLOAT)
.field("container_nr_sleeping", Types.FLOAT)
.field("container_nr_running", Types.FLOAT)
.field("container_nr_stopped", Types.FLOAT)
.field("container_nr_uninterruptible", Types.FLOAT)
.field("container_nr_iowait", Types.FLOAT)
.fieldDelimiter(",")
.lineDelimiter("\n")
.ignoreFirstLine()
.ignoreParseErrors()
.commentPrefix("%")
.build();
// name your table source
tEnv.registerTableSource("container", csvTableSource);
Table table = tEnv.scan("container");
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
// define the sink as common print on console here
stream.print();
// env.execute();
}
}
New edit If I had to pass it onto either a Kafka topic and then into a function call? This is what I tried:
DataStreamSink<Row> stream = addSink(new FlinkKafkaProducer09<>( "my-second-topic", new SimpleStringSchema(), properties));
//DataStreamSink<Row> stream = tEnv.toAppendStream(table, Row.class).print();
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
public String map(String value) throws Exception {
SendToRestAPI sendrest= new SendToRestAPI(value);
String String1= sendrest.converttoJson();
return "Stream Value: " + String1;
}
})
.addSink(new FlinkKafkaProducer09<>( "my-second-topic", new SimpleStringSchema(), properties)); /*.print();*/
env.execute();
}
}
The stream.map line throws error:
Cannot find symbol:method Map
Upvotes: 2
Views: 3288
Reputation: 680
This code:
package example.flink;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;
public class TestFlink {
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
CsvTableSource csvTableSource = CsvTableSource
.builder()
.path("container_data.csv")
.field("%CPU", Types.FLOAT)
.field("MEM", Types.FLOAT)
.field("VSZ", Types.FLOAT)
.field("RSS", Types.FLOAT)
.field("timestamp", Types.FLOAT)
.field("OOM_Score", Types.FLOAT)
.field("io_read_count", Types.FLOAT)
.field("io_write_count", Types.FLOAT)
.field("io_read_bytes", Types.FLOAT)
.field("io_write_bytes", Types.FLOAT)
.field("io_read_chars", Types.FLOAT)
.field("io_write_chars", Types.FLOAT)
.field("num_fds", Types.FLOAT)
.field("num_ctx_switches_voluntary", Types.FLOAT)
.field("num_ctx_switches_involuntary", Types.FLOAT)
.field("mem_rss", Types.FLOAT)
.field("mem_vms", Types.FLOAT)
.field("mem_shared", Types.FLOAT)
.field("mem_text", Types.FLOAT)
.field("mem_lib", Types.FLOAT)
.field("mem_data", Types.FLOAT)
.field("mem_dirty", Types.FLOAT)
.field("mem_uss", Types.FLOAT)
.field("mem_pss", Types.FLOAT)
.field("mem_swap", Types.FLOAT)
.field("num_threads", Types.FLOAT)
.field("cpu_time_user", Types.FLOAT)
.field("cpu_time_system", Types.FLOAT)
.field("cpu_time_children_user", Types.FLOAT)
.field("cpu_time_children_system", Types.FLOAT)
.field("container_nr_sleeping", Types.FLOAT)
.field("container_nr_running", Types.FLOAT)
.field("container_nr_stopped", Types.FLOAT)
.field("container_nr_uninterruptible", Types.FLOAT)
.field("container_nr_iowait", Types.FLOAT)
.fieldDelimiter(",")
.lineDelimiter("\n")
.ignoreFirstLine()
.ignoreParseErrors()
.commentPrefix("%")
.build();
// name your table source
tEnv.registerTableSource("container", csvTableSource);
Table table = tEnv.scan("container");
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
// define the sink as common print on console here
stream.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
With these libraries (some likely redundant):
compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25'
compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-java', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-runtime_2.11', version: '1.5.1'
Is running at least. I'm not sure if it gives the output you need, but it is almost exactly what you put in your latest edit, but is working in an IDE. Does that help?
If your delimiter is still a space remember to change .fieldDelimiter(",")
Upvotes: 1
Reputation: 18987
You have to convert the Table
into a DataStream
to print it. The easiest way to do this is to convert it into a DataStream<Row>
as follows:
DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);
// print the stream & execute the program
stream.print();
env.execute();
See the documentation for more details.
Upvotes: 2