Reputation: 79
I'm trying to export text files to a Postgres database using Spark. I am using the code below to export individual text files. I have close to 200 text files in the same folder, and every text file has the same structure. Unfortunately the year value is not part of my input file, so I'm hard-coding it.
I'd like to upload all of these files at once, but I don't know how to do it. Does anyone have any suggestions?
from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)

lines = sc.textFile("/aaaa/bbbb/DataFile/t-1870.txt")
splits = lines.map(lambda l: l.split(","))
# year is hard-coded because it is not present in the input file
raw_data = splits.map(lambda b: Row(name=b[0], gender=b[1], count=int(b[2]), year=int(1870)))

schemaBabies = sqlContext.createDataFrame(raw_data)
schemaBabies.registerTempTable("raw_data")
df = sqlContext.sql("select * from raw_data")

pgurl = "jdbc:postgresql://localhost:5432/sparkling?user=XXXX&password=XXXX"
properties = {"user": "XXXX", "password": "XXXX", "driver": "org.postgresql.Driver", "mode": "append"}
df.write.jdbc(url=pgurl, table="EDW.raw_data", properties=properties)
Upvotes: 3
Views: 1191
Reputation: 330353
Let's assume your data looks like this:
import csv
import tempfile
import os
out = tempfile.mkdtemp()
data = [
("1870", [("Jane Doe", "F", 3)]),
("1890", [("John Doe", "M", 1)]),
]
for year, rows in data:
    with open(os.path.join(out, "t-{0}.txt".format(year)), "w") as fw:
        csv.writer(fw).writerows(rows)
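As a quick sanity check, pure Python with no Spark required, reading one of the generated files back should yield the original row (the filename below mirrors the sample layout above):

```python
import csv
import os
import tempfile

out = tempfile.mkdtemp()

# Write one sample file in the same shape as above: name,gender,count
with open(os.path.join(out, "t-1870.txt"), "w") as fw:
    csv.writer(fw).writerows([("Jane Doe", "F", 3)])

# Reading it back yields string fields; Spark will cast them via the schema
with open(os.path.join(out, "t-1870.txt")) as fr:
    rows = list(csv.reader(fr))
print(rows)  # [['Jane Doe', 'F', '3']]
```

Note that everything comes back as strings, which is why the explicit schema below matters.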
Start a PySpark session, or submit your script, passing the correct spark-csv package to the --packages argument, and load the data with an explicit schema:
from pyspark.sql.types import *
schema = StructType([
    StructField("name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("count", LongType(), True)
])
df = (sqlContext.read.format("com.databricks.spark.csv")
    .schema(schema)
    .load(out))
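For reference, a launch command could look like the following; the exact package coordinate (Scala suffix and version) is an assumption here and should be matched to your Spark build:

```shell
# spark-csv coordinate is an assumption -- pick the Scala version / release matching your cluster
pyspark --packages com.databricks:spark-csv_2.10:1.5.0
```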
Extract year from the file name and write:
from pyspark.sql.functions import input_file_name, regexp_extract
df_with_year = (df.withColumn(
    "year",
    regexp_extract(input_file_name(), "[1-2][0-9]{3}", 0).cast("int")))
df_with_year.show()
## +--------+------+-----+----+
## | name|gender|count|year|
## +--------+------+-----+----+
## |John Doe| M| 1|1890|
## |Jane Doe| F| 3|1870|
## +--------+------+-----+----+
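The year-extraction pattern can be sanity-checked in plain Python before running it through regexp_extract; the path below is only an illustrative example of what input_file_name() might return:

```python
import re

# Same pattern as in the regexp_extract call: a 4-digit year 1000-2999
pattern = r"[1-2][0-9]{3}"

# Hypothetical file path, just for illustration
path = "file:/tmp/data/t-1870.txt"
match = re.search(pattern, path)
year = int(match.group(0)) if match else None
print(year)  # 1870
```

If your filenames contain other 4-digit runs, tighten the pattern (e.g. anchor it to the "t-" prefix) accordingly.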
df_with_year.write.jdbc(...)
Important: In Spark < 2.0 this approach depends on not passing data between Python and the JVM. It won't work with Python UDFs or DataFrame.rdd.map.
Upvotes: 2