ZeMi

Reputation: 45

Flink: read data from HDFS

I'm new to Flink and I'm wondering how to read data from HDFS. Can anybody give me some advice or a few simple examples? Thank you all.

Upvotes: 2

Views: 6377

Answers (3)

Erkan Şirin

Reputation: 2095

With Flink 1.13, Hadoop 3.1.2 and Java 1.8.0 on a CentOS 7 machine, I was able to read from HDFS. HADOOP_HOME and HADOOP_CLASSPATH were already exported. I think something changed from version 1.11 onward, and I couldn't find even a simple example, so I'm sharing mine.

I added the following dependency to pom.xml:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.2</version>
</dependency>

My Scala code:

package com.vbo.datastreamapi

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object ReadWriteHDFS extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Each line of the HDFS file becomes one String record in the stream
  val stream = env.readTextFile("hdfs://localhost:9000/user/train/datasets/Advertising.csv")

  // Print every record to stdout
  stream.print()

  // Nothing runs until execute() is called
  env.execute("Read Write HDFS")
}

Upvotes: 0

HusnainKhan

Reputation: 11

Flink can read HDFS data in various formats, such as text, JSON or Avro. Support for Hadoop input/output formats is part of the flink-java Maven module, which is required anyway when writing Flink jobs; the HadoopInputs helper used in Sample 2 additionally comes from the flink-hadoop-compatibility module.

Sample 1: read a text file named JsonSeries.txt and print it to the console

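import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Each line of the file becomes one String element of the DataSet
DataSet<String> lines = env.readTextFile("hdfs://localhost:9000/user/hadoop/input/JsonSeries.txt")
        .name("HDFS File read");
lines.print(); // in the DataSet API, print() also triggers job execution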

Sample 2: using a Hadoop InputFormat

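import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs; // from flink-hadoop-compatibility
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
DataSet<Tuple2<LongWritable, Text>> inputHadoop = env.createInput(HadoopInputs.readHadoopFile(
        new TextInputFormat(), LongWritable.class, Text.class, "hdfs://localhost:9000/user/hadoop/input/JsonSeries.txt"));
inputHadoop.print(); // each record is (byte offset of the line, line text)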
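In all of these samples the string passed to readTextFile or readHadoopFile is a full HDFS URI rather than a local path. Its authority part (localhost:9000 here) is the NameNode RPC address of the answerer's setup; on your cluster it is normally the host and port configured as fs.defaultFS in core-site.xml. The minimal sketch below uses only java.net.URI to show how such a path breaks down. HdfsPathParts and split are hypothetical names for illustration, not part of the Flink or Hadoop API.

```java
import java.net.URI;

// Hypothetical helper for illustration only; not a Flink or Hadoop class.
public class HdfsPathParts {

    /** Splits an hdfs:// URI into {scheme, host, port, path}. */
    static String[] split(String hdfsUri) {
        URI uri = URI.create(hdfsUri);
        return new String[] {
            uri.getScheme(),                // "hdfs" selects the HDFS file system
            uri.getHost(),                  // NameNode host (null if the URI has no authority)
            String.valueOf(uri.getPort()),  // NameNode RPC port, -1 if omitted
            uri.getPath()                   // absolute path of the file inside HDFS
        };
    }

    public static void main(String[] args) {
        String[] parts = split("hdfs://localhost:9000/user/hadoop/input/JsonSeries.txt");
        System.out.println(String.join(" | ", parts));
    }
}
```

Running main prints `hdfs | localhost | 9000 | /user/hadoop/input/JsonSeries.txt`. A URI such as `hdfs:///user/train/datasets/Advertising.csv` has no authority, so the port comes back as -1; in that case Hadoop (and, as far as I know, Flink's HDFS support) takes the NameNode address from the configured default file system instead.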

Upvotes: 0

Chiwan Park

Reputation: 86

If your files are plain text files, you can use the readTextFile method of the ExecutionEnvironment object.

The Flink documentation has examples of the various data sources: (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#data-sources)

Upvotes: 1
