igx

Reputation: 4231

Trying to read a file from S3 with Flink from the IDE: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

I am trying to read a file from S3 using Flink from IntelliJ and I am getting the following exception:

Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

This is how my code looks:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory

class ParquetSourceFunction extends SourceFunction[String]{
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val inputPath = "s3a://path-to-bucket/"
    val outputPath = "s3a://path-to-output-bucket/"
    val conf = new Configuration()
    conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
    val readFooter = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(inputPath), conf))
    val metadata = readFooter.getFileMetaData
    val schema = metadata.getSchema
    val parquetFileReader = new ParquetFileReader(conf, metadata, new Path(inputPath), readFooter.getRowGroups, schema.getColumns)
    //    val parquetFileReader2 = new ParquetFileReader(new Path(inputPath), ParquetReadOptions)
    var pages: PageReadStore = null

    try {
      while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
        val rows = pages.getRowCount
        val columnIO = new ColumnIOFactory().getColumnIO(schema)
        val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
        (0L until rows).foreach { _ =>
          val group = recordReader.read()
          val myString = group.getString("field_name", 0)
          ctx.collect(myString)
        }
      }
    }
  }

  override def cancel(): Unit = ???
}

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    lazy val env = StreamExecutionEnvironment.getExecutionEnvironment

    lazy val stream = env.addSource(new ParquetSourceFunction)
    stream.print()
    env.execute()
  }
}

sbt dependencies:

val flinkVersion = "1.12.1"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion,// % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion,// % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided")
  "org.apache.flink" %% "flink-parquet" % flinkVersion)


lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0" ,
    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
    libraryDependencies += "org.apache.flink" %% "flink-table-planner-blink" % "1.12.1" //% "provided"
  )

Upvotes: 2

Views: 1741

Answers (1)

Arvid Heise

Reputation: 3634

S3 is only supported by adding the respective flink-s3-fs-hadoop jar to your plugins folder, as described in the plugins docs. For a local setup in the IDE, the root that should contain the plugins dir is the working directory by default. You can override it with the env var FLINK_PLUGINS_DIR.
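For illustration, the layout the plugin loader looks for (relative to the working directory when nothing else is configured) would be roughly the following; the jar name depends on your Flink version and the s3-fs-hadoop subfolder name is just a convention:

plugins/
  s3-fs-hadoop/
    flink-s3-fs-hadoop-1.12.1.jar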

To get flink-s3-fs-hadoop into the plugins folder, I'm guessing some sbt glue is necessary (or you copy it there once manually). In Gradle, I'd define a plugin scope and copy the jars to the plugins dir in a custom task.
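A rough, untested sbt sketch of that idea (the flinkPlugin configuration and the copyFlinkPlugins task are made-up names, not something Flink or sbt provide out of the box):

lazy val FlinkPlugin = config("flinkPlugin").hide
lazy val copyFlinkPlugins = taskKey[Unit]("Copy the Flink filesystem plugin jar into ./plugins")

lazy val root = (project in file("."))
  .configs(FlinkPlugin)
  .settings(
    libraryDependencies ++= flinkDependencies,
    // fetched only for the plugins dir, deliberately kept off the compile classpath
    libraryDependencies += "org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion % FlinkPlugin,
    copyFlinkPlugins := {
      val pluginDir = baseDirectory.value / "plugins" / "s3-fs-hadoop"
      IO.createDirectory(pluginDir)
      // copy every jar resolved in the flinkPlugin configuration next to the job
      update.value
        .select(configurationFilter(FlinkPlugin.name))
        .foreach(jar => IO.copyFile(jar, pluginDir / jar.getName))
    }
  )

Running sbt copyFlinkPlugins once before starting the job from IntelliJ should then place the jar where the default plugin discovery (the working directory) picks it up.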

Upvotes: 1
