Reputation: 77
I want to bring a huge table from an Oracle DB into Delta Lake, day by day.
Each day has a volume of about 3 GB.
I want to land it in Delta Lake with the following layout, one folder per date:
Tablename/2020-12-10
from datetime import datetime, timedelta
from delta.tables import *

# Oracle connection settings
ip = '1.1.1.1'
port = '1111'
database = 'Test'
user = 'test'
password = 'test'
drivertype = 'oracle.jdbc.driver.OracleDriver'

# date range to ingest, one day per iteration
start_date = "20210101"
stop_date = "20210103"
start = datetime.strptime(start_date, "%Y%m%d")
stop = datetime.strptime(stop_date, "%Y%m%d")

# JDBC parallel-read settings
partitionColumn = 'time_section'
lowerBound = 0
upperBound = 24
partitions = 4

# assumes an existing SparkSession named `spark` (e.g. on Databricks)
while start < stop:
    # select one day's worth of rows from Oracle
    SQLCommand = """(select /*+parallel(a,4)*/ a.* from lbi_app.mytble a
                     where date = %s)""" % start.strftime('%Y%m%d')
    TempDFName = 'mytble'
    df = spark.read.format("jdbc") \
        .option("url", f"jdbc:oracle:thin:@//{ip}:{port}/{database}") \
        .option("dbtable", SQLCommand) \
        .option("fetchsize", 500000) \
        .option("user", user) \
        .option("password", password) \
        .option("numPartitions", partitions) \
        .option("lowerBound", lowerBound) \
        .option("upperBound", upperBound) \
        .option("partitionColumn", partitionColumn) \
        .option("oracle.jdbc.timezoneAsRegion", "false") \
        .option("oracle.jdbc.mapDateToTimestamp", "true") \
        .option("encoding", "UTF-8") \
        .option("characterEncoding", "UTF-8") \
        .option("useUnicode", "true") \
        .option("driver", drivertype) \
        .load()

    # write the day's data to Delta Lake, partitioned by date
    df.write.format("delta").partitionBy("date") \
        .option("overwriteSchema", "true") \
        .mode("overwrite") \
        .save("/delta/layer1/" + str(TempDFName))

    start = start + timedelta(days=1)
But when I run this code, all records are saved into the folder myTable/date=20210101.
I want each date to have its own folder, like below:
myTable/date=20210101, myTable/date=20210102, myTable/date=20210103
What is the solution to this problem?
Upvotes: 2
Views: 432
Reputation: 87299
I think that you need to use
.option("mergeSchema", "true").mode("append")
instead of
.option("overwriteSchema","true").mode("overwrite")
otherwise you'll overwrite the whole table on each iteration.
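For example, a minimal sketch of the corrected write, assuming the same df and target path as in the question:

# append each day's data instead of overwriting the whole table
df.write.format("delta").partitionBy("date") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .save("/delta/layer1/" + str(TempDFName))

Note that with append, re-running the loop for a day you've already loaded will duplicate that day's rows, so you may want to clear or de-duplicate a date partition before re-loading it.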
Upvotes: 1