I have created a package that transfers data from an old database to a new one. The table mappings look something like this:
tblSupplier -> Suppliers
tblNaturalKey -> InvoiceNaturalKey
tblInvoice -> Invoices ...
Table names are not an issue, as they can be hard-coded. I want to automate this for many clients, and each client has its own natural key for the invoice table,
for example [InvoiceNumber], [AccountNumber], ...
So for each client I have created a package with an Execute SQL task that retrieves the column information from tblNaturalKey and stores it in an Object variable; a Foreach ADO loop then reads the object and creates each custom column in the new natural key table. I hoped the transfer package could then pick up the mappings based on column-name equality. This approach is dynamic, as I can just enter the old and new server and database as environment variables and run the job from the server catalog. But the columns won't map and I get metadata errors. My source Biml code is attached below. Please help! KR, Marcus
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
    <Packages>
        <Package Name="0-0-Nat_Key_Columns" ConstraintMode="Linear">
            <Connections>
                <Connection ConnectionName="OldDatabase"></Connection>
                <Connection ConnectionName="NewDatabase"></Connection>
            </Connections>
            <Variables>
                <Variable Name="Columns" DataType="Object" Namespace="User"></Variable>
                <Variable Name="ColName" DataType="String" Namespace="User">op</Variable>
                <Variable Name="DataType" DataType="String" Namespace="User">int</Variable>
                <Variable Name="DTdesc" DataType="String" Namespace="User"></Variable>
            </Variables>
            <Tasks>
                <ExecuteSQL Name="ES-GetMissingCols" ConnectionName="OldDatabase" ResultSet="Full">
                    <DirectInput>
                        SELECT i.COLUMN_NAME, i.DATA_TYPE,
                               CASE
                                   WHEN i.DATA_TYPE = 'varchar' THEN CAST(CONCAT('(', i.CHARACTER_MAXIMUM_LENGTH, ')') AS varchar(10))
                                   WHEN i.DATA_TYPE = 'decimal' THEN CAST(CONCAT('(', i.NUMERIC_PRECISION, ',', i.NUMERIC_SCALE, ')') AS varchar(10))
                                   ELSE ''
                               END AS DTdesc
                        FROM INFORMATION_SCHEMA.COLUMNS i
                        WHERE i.TABLE_NAME = 'tblsys_naturalkey_lu'
                    </DirectInput>
                    <Results>
                        <Result VariableName="User.Columns" Name="0"></Result>
                    </Results>
                </ExecuteSQL>
                <ForEachAdoLoop Name="ForEachADO-ColumnLoop" SourceVariableName="User.Columns" EnumerationMode="EnumerateRowsInFirstTable">
                    <VariableMappings>
                        <VariableMapping Name="0" VariableName="User.ColName"></VariableMapping>
                        <VariableMapping Name="1" VariableName="User.DataType"></VariableMapping>
                        <VariableMapping Name="2" VariableName="User.DTdesc"></VariableMapping>
                    </VariableMappings>
                    <Tasks>
                        <ExecuteSQL Name="ES-AddMissingCols" ConnectionName="NewDatabase">
                            <!-- Placeholder statement; the expression below replaces it at run time -->
                            <DirectInput>SELECT 1</DirectInput>
                            <Expressions>
                                <Expression ExternalProperty="SqlStatementSource">"IF NOT EXISTS (
                                    SELECT *
                                    FROM INFORMATION_SCHEMA.COLUMNS
                                    WHERE TABLE_NAME = 'InvoiceNaturalKey' AND COLUMN_NAME = '" + @[User::ColName] + "')
                                BEGIN
                                    ALTER TABLE InvoiceNaturalKey
                                    ADD [" + @[User::ColName] + "] " + @[User::DataType] + @[User::DTdesc] + "
                                END;"</Expression>
                            </Expressions>
                        </ExecuteSQL>
                    </Tasks>
                </ForEachAdoLoop>
            </Tasks>
        </Package>
        <Package Name="1-3-Nat_Key" ConstraintMode="Linear" ProtectionLevel="DontSaveSensitive">
            <Tasks>
                <Dataflow Name="DFT-NatKey Transfer">
                    <Transformations>
                        <OleDbSource Name="ODS-NK" ConnectionName="OldDatabase" ValidateExternalMetadata="false">
                            <DirectInput>SELECT * FROM [dbo].[tblsys_naturalkey_lu]</DirectInput>
                        </OleDbSource>
                        <OleDbDestination Name="ODD-NK" ConnectionName="NewDatabase" KeepIdentity="true">
                            <SqlCommandOutput>SELECT * FROM InvoiceNaturalKey</SqlCommandOutput>
                        </OleDbDestination>
                    </Transformations>
                </Dataflow>
            </Tasks>
        </Package>
    </Packages>
</Biml>
<#@ template language="C#" tier="2"#>
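To make the dynamic DDL easier to follow, here is a minimal Python sketch (not part of the package) that mirrors the SqlStatementSource expression above and shows the statement one loop iteration would execute. The three values are hypothetical stand-ins for User::ColName, User::DataType and User::DTdesc.

# Minimal sketch mirroring the SqlStatementSource expression in ES-AddMissingCols.
# col_name, data_type and dt_desc are hypothetical values for one loop iteration.
col_name, data_type, dt_desc = "InvoiceNumber", "varchar", "(50)"

sql = (
    "IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.COLUMNS "
    "WHERE TABLE_NAME = 'InvoiceNaturalKey' AND COLUMN_NAME = '" + col_name + "') "
    "BEGIN ALTER TABLE InvoiceNaturalKey ADD [" + col_name + "] "
    + data_type + dt_desc + " END;"
)
print(sql)
# IF NOT EXISTS (...) BEGIN ALTER TABLE InvoiceNaturalKey ADD [InvoiceNumber] varchar(50) END;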
I recently solved this problem by replacing the data flow task with an Execute Process task that runs a Python script. This way I was able to pass a project parameter through to the Python script and solve the mapping problem. I used the executemany() method from the pyodbc library. Please see the code below.
# -*- coding: utf-8 -*-
"""
Created on Mon Feb 3 13:00:48 2020
@author: MTrinder
"""
import math
import sys

import numpy as np
import pandas as pd
import pyodbc

# dummy arguments; the real sys.argv values replace them when present
args = ['',
        '[natkey1],[natkey2],[natkey3],[natKey4],[natKey5],[natKey6]',
        'old_server', 'old_db', 'new_server', 'new_db']
for i, arg in enumerate(sys.argv):
    args[i] = arg

# map arguments
cols_str = args[1]
old_server = args[2]
old_db = args[3]
new_server = args[4]
new_db = args[5]

# extract the data to be inserted from the old database
sql_conn = pyodbc.connect("DRIVER={SQL Server Native Client 11.0};SERVER=" + old_server
                          + ";DATABASE=" + old_db + ";Trusted_Connection=yes")
query = "SELECT * FROM tblsys_naturalkey_lu"
nk = pd.read_sql_query(query, sql_conn)

# convert all values to strings
nk = nk.astype(str)

# column list with and without the bracket delimiters
cols_str_b = cols_str.replace('[', '').replace(']', '')
cols = cols_str.split(',')
cols_b = cols_str_b.split(',')

# number of natural key columns
n_cols = len(cols)

# parameter placeholders for the INSERT statement, e.g. '?,?,?'
vals = ','.join(['?'] * n_cols)

# new database connection
connStr = pyodbc.connect('DRIVER={SQL Server Native Client 11.0};SERVER=' + new_server
                         + ';DATABASE=' + new_db + ';Trusted_Connection=yes')
cursor = connStr.cursor()

# data frame to a list of row value lists
data = np.array(nk).tolist()

# split the rows into batches (30 batches worked well for ~10M records)
n_batches = 30
batch_size = math.floor(len(nk) / n_batches)

# insert the rows batch by batch
sql = ("set identity_insert [InvoiceNaturalKey] on "
       "INSERT INTO dbo.InvoiceNaturalKey(" + cols_str + ") values (" + vals + ") "
       "set identity_insert [InvoiceNaturalKey] off")
cursor.fast_executemany = True
for i in range(n_batches + 1):
    if i != n_batches:
        cursor.executemany(sql, data[i * batch_size:(i + 1) * batch_size])
    else:
        # remaining rows that did not fill a whole batch
        remainder = data[n_batches * batch_size:]
        if remainder:
            cursor.executemany(sql, remainder)
connStr.commit()
cursor.close()
connStr.close()
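For reference, the Execute Process task passes the column list and the connection values as command-line arguments, in the order the script reads them from sys.argv. A hypothetical invocation (the script name and values are placeholders):

python.exe transfer_nat_key.py "[InvoiceNumber],[AccountNumber]" old_server old_db new_server new_db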