AkhilaV

Reputation: 483

How to read PDF files and XML files in Apache Spark with Scala?

My sample code for reading a text file is:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD

val text = sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], sc.defaultMinPartitions)
val rddwithPath = text.asInstanceOf[HadoopRDD[LongWritable, Text]].mapPartitionsWithInputSplit { (inputSplit, iterator) =>
  val file = inputSplit.asInstanceOf[FileSplit]
  iterator.map { tpl => (file.getPath.toString, tpl._2.toString) }
}.reduceByKey((a, b) => a)

How can I read PDF and XML files in a similar way?

Upvotes: 6

Views: 17177

Answers (4)

Mykola Melnyk

Reputation: 71

You can use PDF DataSource for Apache Spark to read PDF files into a DataFrame. It supports both digital and scanned PDF files.

Here is an example of reading PDF files in Scala:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Spark PDF Example")
  .master("local[*]")
  .config("spark.jars.packages", "com.stabrise:spark-pdf_2.12:0.1.7")
  .getOrCreate()
   
val df = spark.read.format("pdf")
  .option("imageType", "BINARY")
  .option("resolution", "200")
  .option("pagePerPartition", "2")
  .option("reader", "pdfBox")
  .load("path to the pdf file(s)")

df.show()

The output DataFrame contains the following columns:

  • path: path to the file
  • page_number: page number of the document
  • text: extracted text from the text layer of the PDF page
  • image: image representation of the page
  • document: the OCR-extracted text from the rendered image (calls Tesseract OCR)
  • partition_number: partition number

Upvotes: 0

cognitive

Reputation: 41

You can use spark-shell with Tika and run the code below, either sequentially or in a distributed manner, depending on your use case:

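spark-shell --jars tika-app-1.8.jar

// Then, inside the shell: read every file under /data/ as (path, PortableDataStream) pairs
val binRDD = sc.binaryFiles("/data/")
// Extract plain text from each file; Tika detects the file format automatically
val textRDD = binRDD.map(file => new org.apache.tika.Tika().parseToString(file._2.open()))
textRDD.saveAsTextFile("/output/")
System.exit(0)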

Upvotes: 1

jitendra prajapati

Reputation: 41

PDFs can be parsed in PySpark as follows:

If the PDFs are stored in HDFS, read them with sc.binaryFiles(), since PDF is a binary format. The binary content can then be passed to pdfminer for parsing.

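import io

import pdfminer
from pdfminer.pdfparser import PDFParser
from pdfminer.pdfdocument import PDFDocument

def return_device_content(cont):
    # Wrap the raw bytes in a file-like object, which PDFParser expects
    fp = io.BytesIO(cont)
    parser = PDFParser(fp)
    document = PDFDocument(parser)
    # Return the parsed document so that later steps can work with it
    return document

filesPath = "/user/root/*.pdf"
# binaryFiles yields (path, content) pairs; keep only the content bytes
fileData = sc.binaryFiles(filesPath)
file_content = fileData.map(lambda content: content[1])
file_content1 = file_content.map(return_device_content)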

Further parsing can be done using the functionality provided by pdfminer.

Upvotes: 4

Ram Ghadiyaram

Reputation: 29185

PDF & XML can be parsed using Tika:

Look at Apache Tika, a content analysis toolkit:
- https://tika.apache.org/1.9/api/org/apache/tika/parser/xml/
- http://tika.apache.org/0.7/api/org/apache/tika/parser/pdf/PDFParser.html
- https://tika.apache.org/1.9/api/org/apache/tika/parser/AutoDetectParser.html

Below is an example integration of Spark with Tika:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.input.PortableDataStream
import org.apache.tika.metadata._
import org.apache.tika.parser._
import org.apache.tika.sax.WriteOutContentHandler
import java.io._

object TikaFileParser {

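  // Parses one (path, stream) pair from sc.binaryFiles and prints the extracted text.
  def tikaFunc (a: (String, PortableDataStream)): Unit = {

    val myparser : AutoDetectParser = new AutoDetectParser()
    // Open the stream through Spark rather than java.io.File, so non-local paths (e.g. HDFS) also work
    val stream : InputStream = a._2.open()
    // A write limit of -1 disables the limit on extracted characters
    val handler : WriteOutContentHandler = new WriteOutContentHandler(-1)
    val metadata : Metadata = new Metadata()
    val context : ParseContext = new ParseContext()

    try {
      myparser.parse(stream, handler, metadata, context)
    } finally {
      stream.close()
    }

    // This runs on the executors, so the output goes to executor stdout
    println(handler.toString())
    println("------------------------------------------------")
  }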


  def main(args: Array[String]): Unit = {

    val filesPath = "/home/user/documents/*"
    val conf = new SparkConf().setAppName("TikaFileParser")
    val sc = new SparkContext(conf)
    val fileData = sc.binaryFiles(filesPath)
    fileData.foreach( x => tikaFunc(x))
  }
}
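
For plain XML files, a full Tika parse may be more than is needed. As a minimal sketch (not part of the original answer, and swapping Tika for the JDK's built-in DOM parser), the helper below returns the text content of one XML document held as bytes. The names `XmlTextExtractor` and `extract` are hypothetical and chosen only for illustration.

```scala
import java.io.ByteArrayInputStream
import javax.xml.parsers.DocumentBuilderFactory

object XmlTextExtractor {
  // Parse one XML document held in memory and return the concatenated
  // text of all its elements, with leading/trailing whitespace removed.
  def extract(bytes: Array[Byte]): String = {
    val factory = DocumentBuilderFactory.newInstance()
    // Reject DOCTYPE declarations so untrusted files cannot trigger external-entity lookups
    factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true)
    val doc = factory.newDocumentBuilder().parse(new ByteArrayInputStream(bytes))
    doc.getDocumentElement.getTextContent.trim
  }
}

// Assumed usage with Spark (path is illustrative):
//   sc.binaryFiles("/home/user/documents/*.xml")
//     .mapValues(stream => XmlTextExtractor.extract(stream.toArray()))
```

This loads each file fully into memory, so it is probably best suited to many small XML files rather than a few very large ones.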

Upvotes: 9
