Hamed Tamadon

Reputation: 77

Using Delta Lake partitions when writing a Delta Lake table from an RDBMS table

I want to bring a huge table from an Oracle DB into Delta Lake day by day.

Each day's data is roughly 3 GB in volume.

I want the Delta Lake layout to have one folder per date, like this: Tablename/2020-12-10

from delta.tables import *
from datetime import datetime, timedelta

ip = '1.1.1.1'
port = '1111'
database = 'Test'
user = 'test'
password = 'test'
drivertype = 'oracle.jdbc.driver.OracleDriver'
start_date = "20210101"
stop_date  = "20210103"
start = datetime.strptime(start_date, "%Y%m%d")
stop = datetime.strptime(stop_date, "%Y%m%d")

# JDBC read parallelism: split each day's read into 4 partitions
# over the 0-24 range of the time_section column.
partitionColumn = 'time_section'
lowerBound = 0
upperBound = 24
partitions = 4

while start < stop:
    # Push the per-day filter down to Oracle as a subquery.
    SQLCommand = """(select /*+parallel(a,4)*/ a.* from lbi_app.mytble a
    where date = %s)""" % start.strftime('%Y%m%d')
    TempDFName = 'mytble'
    df = spark.read.format("jdbc")\
       .option("url", f"jdbc:oracle:thin:@//{ip}:{port}/{database}")\
       .option("dbtable", SQLCommand)\
       .option("fetchsize", 500000)\
       .option("user", user)\
       .option("numPartitions", partitions)\
       .option("lowerBound", lowerBound)\
       .option("upperBound", upperBound)\
       .option("partitionColumn", partitionColumn)\
       .option("oracle.jdbc.timezoneAsRegion", "false")\
       .option("oracle.jdbc.mapDateToTimestamp", "true")\
       .option("encoding", "UTF-8")\
       .option("characterEncoding", "UTF-8")\
       .option("useUnicode", "true")\
       .option("password", password)\
       .option("driver", drivertype)\
       .load()
    df.write.format("delta").partitionBy("date")\
       .option("overwriteSchema", "true")\
       .mode("overwrite")\
       .save("/delta/layer1/" + str(TempDFName))
    start = start + timedelta(days=1)

But when I run this code, all records are saved into a single folder: myTable/date=20210101

I want each date to have its own folder, like this:

myTable/date=20210101, myTable/date=20210102, myTable/date=20210103

What is the solution to this problem?

Upvotes: 2

Views: 432

Answers (1)

Alex Ott

Reputation: 87299

I think that you need to use

.option("mergeSchema", "true").mode("append")

instead of

.option("overwriteSchema","true").mode("overwrite")

otherwise you'll overwrite the whole table on each iteration, so only the most recently written date survives the loop.
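
As a minimal sketch (reusing the variable names from the question's loop), the write step would become:

# Append each day's data as a new set of files under its own
# date=... partition folder instead of replacing the whole table.
# mergeSchema lets the append succeed if a later day adds columns.
df.write.format("delta").partitionBy("date")\
   .option("mergeSchema", "true")\
   .mode("append")\
   .save("/delta/layer1/" + str(TempDFName))

One caveat: with append mode, re-running the loop over the same date range will duplicate rows. Delta's replaceWhere option (used with mode("overwrite")) is a common way to make such per-day loads idempotent.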

Upvotes: 1
