Reputation: 83
I am trying to load an on-prem transaction log table and use it to update a Databricks table. In the source table, the Req Details column holds all the information for each request. Except for ProductID, the remaining keys are dynamic: they are optional/nullable, and not every key is present in every request. Could you please let me know how to do this using Python in Databricks? I explored StructType, but it needs specific mandatory columns.
Source Table
Req ID  Type    Req Details                                              Status
1       Update  ProductID=234;ProductName=LawnMover;Price=58             True
2       Update  ProductID=874;Price=478                                  True
3       Update  ProductID=678;ProductParentgroup=Watersuppuly;Price=1.6  True
Target table before Update
ProductID  ProductParentgroup  ProductName  Price
234        Utility             Mover        86
874        HOA                 Sink         450
678        Water               Filters      1.2
Target table after Update
ProductID  ProductParentgroup  ProductName  Price
234        Utility             LawnMover    58
874        HOA                 Sink         478
678        Watersupply         Filters      1.6
Upvotes: 3
Views: 74
Reputation: 70538
Create a procedure whose optional parameters default to NULL, then pass in either the value you received or NULL. The update below uses COALESCE so that a NULL parameter keeps the existing column value and a non-NULL parameter replaces it.
CREATE PROCEDURE example
(
    @productid integer,
    @parentgroup varchar(200) = NULL,
    @productname varchar(200) = NULL,
    @price decimal(10, 2) = NULL  -- prices such as 1.6 need a non-integer type
)
AS
BEGIN
    SET NOCOUNT ON;
    -- A NULL parameter leaves the existing column value unchanged.
    UPDATE A
    SET A.parentgroup = COALESCE(@parentgroup, A.parentgroup),
        A.productname = COALESCE(@productname, A.productname),
        A.price = COALESCE(@price, A.price)
    FROM tablenameyoudidnottellusthename AS A
    WHERE A.productid = @productid;
END
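To call a procedure like this from Python, each request string first has to be turned into one value per parameter, with None standing in for keys that are absent. The following is only a minimal sketch, assuming the ReqDetails format shown in the question (semicolon-separated key=value pairs). The helper name parse_req_details and the PARAM_NAMES mapping are hypothetical, and the actual database call is left out.

```python
# Hypothetical mapping from ReqDetails keys to the procedure's parameter names.
PARAM_NAMES = {
    "ProductID": "productid",
    "ProductParentgroup": "parentgroup",
    "ProductName": "productname",
    "Price": "price",
}


def parse_req_details(req_details):
    """Turn 'k1=v1;k2=v2' into a dict of procedure parameters.

    Keys missing from the request come back as None, so COALESCE
    in the procedure keeps the existing column value for them.
    """
    params = {name: None for name in PARAM_NAMES.values()}
    for pair in req_details.split(";"):
        if "=" not in pair:
            continue  # skip empty or malformed fragments
        key, value = pair.split("=", 1)
        key = key.strip()
        if key in PARAM_NAMES:
            params[PARAM_NAMES[key]] = value.strip()
    return params


print(parse_req_details("ProductID=874;Price=478"))
```

The values are left as strings here; converting them to the parameter types (for example the price to a decimal) would be up to the calling code.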
Upvotes: 1
Reputation: 14895
Split ReqDetails from the source table using regexp_extract, join the source and target tables, and then merge the respective columns using coalesce.
If regexp_extract does not match a value in ReqDetails, it returns an empty string.
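The empty-string behaviour matters because coalesce only skips nulls, so an empty string has to be turned into a null before merging. As a rough illustration outside Spark, the same idea can be emulated with Python's re module. The helper extract_or_none is hypothetical and only mimics regexp_extract followed by replacing '' with None; it is not Spark's API.

```python
import re


def extract_or_none(pattern, text):
    """Mimic regexp_extract(..., 1) followed by replacing '' with None."""
    match = re.search(pattern, text)
    value = match.group(1) if match else ""  # "" stands in for "no match"
    return value or None


req = "ProductID=874;Price=478"
print(extract_or_none(r"ProductID=(.*?);", req))    # 874
print(extract_or_none(r"ProductName=(.*?);", req))  # None
print(extract_or_none(r"Price=(.*)", req))          # 478
```

Note that a key that is present but empty (for example ProductName=;) also ends up as None with this approach, so it is treated the same as a missing key.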
from pyspark.sql import functions as F

srcDf = ...
targetDf = ...

# Extract each key from ReqDetails into its own column.
# The patterns assume Price is the last key and every other key is followed by ';'.
src2Df = srcDf.withColumn('ProductID', F.regexp_extract('ReqDetails', 'ProductID=(.*?);', 1)) \
    .withColumn('SrcProductParentgroup', F.regexp_extract('ReqDetails', 'ProductParentgroup=(.*?);', 1)) \
    .withColumn('SrcProductName', F.regexp_extract('ReqDetails', 'ProductName=(.*?);', 1)) \
    .withColumn('SrcPrice', F.regexp_extract('ReqDetails', 'Price=(.*)', 1)) \
    .na.replace('', None)  # turn "no match" ('') into null so coalesce can skip it

# Left join keeps every target row; coalesce prefers the source value when one exists.
targetDf.join(src2Df, on='ProductID', how='left') \
    .withColumn('ProductParentgroup', F.coalesce('SrcProductParentgroup', 'ProductParentgroup')) \
    .withColumn('ProductName', F.coalesce('SrcProductName', 'ProductName')) \
    .withColumn('Price', F.coalesce('SrcPrice', 'Price')) \
    .select('ProductID', 'ProductParentgroup', 'ProductName', 'Price') \
    .show()
Output:
+---------+------------------+-----------+-----+
|ProductID|ProductParentgroup|ProductName|Price|
+---------+------------------+-----------+-----+
| 234| Utility| LawnMover| 58|
| 678| Watersuppuly| Filters| 1.6|
| 874| HOA| Sink| 478|
+---------+------------------+-----------+-----+
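The merge step can be checked by hand for a single row. This is a plain-Python sketch of the coalesce logic only, not Spark code; the coalesce helper below is a hypothetical stand-in, and the dictionaries reuse the values for ProductID 874 from the question.

```python
def coalesce(*values):
    """Return the first value that is not None, like SQL COALESCE."""
    return next((v for v in values if v is not None), None)


# Existing target row for ProductID 874.
target = {"ProductID": "874", "ProductParentgroup": "HOA", "ProductName": "Sink", "Price": "450"}
# Parsed request for the same product; keys absent from the request are None.
source = {"ProductParentgroup": None, "ProductName": None, "Price": "478"}

# Prefer the source value when present, otherwise keep the target value.
updated = {col: coalesce(source.get(col), value) for col, value in target.items()}
print(updated)
```

Only Price changes here, because it is the only key the request supplied.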
Upvotes: 1