Jami

Reputation: 589

Programmatic creation of a list of Data Flow tasks in SSIS

I am writing some code to generate a customised SSIS package. I've used the following code for generating PipelineTask components:

    /// <summary>
    /// Returns the sorted names of the tables in the Access database referenced by
    /// AccessConnestionString, and also writes that list to C:\TableList.
    /// </summary>
    /// <remarks>
    /// Names starting with "MSys" (Access system tables) are skipped, as are names containing
    /// '~' (presumably Access temporary objects — confirm).
    /// </remarks>
    /// <returns>Table names ordered by name.</returns>
    private static List<string> getAccessTables()
    {
        List<string> tables;

        // The connection is disposed even if GetSchema throws; previously it was never closed.
        using (OleDbConnection con = new OleDbConnection(AccessConnestionString))
        {
            con.Open();

            // Column 2 of the OLE DB "Tables" schema collection is TABLE_NAME.
            tables = con.GetSchema("tables").AsEnumerable()
                .Select(row => row[2].ToString())
                .Where(name => !name.StartsWith("MSys", StringComparison.Ordinal) && !name.Contains('~'))
                .OrderBy(name => name)
                .ToList();
        }

        File.WriteAllLines(@"C:\TableList", tables);
        return tables;
    }




    /// <summary>
    /// Builds the "AccessToFlatFile" SSIS package. The Access tables returned by
    /// getAccessTables() are split into batches of 64. Each batch gets its own Data Flow task,
    /// in which every table is copied by an OLE DB Source into a delimited, Unicode flat file
    /// G:\Dest\{table}.txt. Consecutive Data Flow tasks are chained with precedence
    /// constraints. The package is saved once, to G:\Dest\AccessToFlatFile{tableCount}.dtsx.
    /// </summary>
    /// <param name="Dts">Script Task object model; not used by this method.</param>
    public static void BuildPackage(Microsoft.SqlServer.Dts.Tasks.ScriptTask.ScriptObjectModel Dts)
    {
        var tables = getAccessTables();

        var app = new Application();

        Package package = new Package();
        try
        {
            package.Name = "AccessToFlatFile";

            // Add the OLE DB connection to the Access database (shared by every source)
            ConnectionManager connectionManagerOleDb = package.Connections.Add("OLEDB");
            connectionManagerOleDb.Name = "OLEDB";
            connectionManagerOleDb.ConnectionString = AccessConnestionString;

            // One Data Flow task per batch of `cnt` tables. Integer ceiling division replaces
            // the floating-point Math.Ceiling that was re-evaluated on every iteration.
            int cnt = 64;
            int batchCount = (tables.Count + cnt - 1) / cnt;
            Executable tsk = null, prevTsk = null;

            for (int i = 0; i < batchCount; i++)
            {
                if (i > 0)
                {
                    prevTsk = tsk;
                }

                // Keep the reference returned by Add(). The ordinal re-fetch below is the
                // variant this question asks about and is deliberately left disabled.
                tsk = package.Executables.Add("STOCK:PipelineTask");
                //tsk = package.Executables[i];

                // Get the task host wrapper, and the Data Flow task
                TaskHost taskHost = (TaskHost)tsk;
                MainPipe dataFlowTask = (MainPipe)taskHost.InnerObject;

                if (i > 0)
                {
                    //prevTsk = package.Executables[i-1];
                    // Chain this Data Flow task after the previous one.
                    package.PrecedenceConstraints.Add(prevTsk, tsk);
                }

                foreach (var table in tables.Skip(cnt * i).Take(cnt))
                {
                    AddTableTransfer(app, package, dataFlowTask, connectionManagerOleDb, table);
                }
            }

            // Save once, after every batch has been added. This call and Dispose() used to sit
            // inside the loop, so any batch after the first was added to a disposed package.
            app.SaveToXml(String.Format(@"G:\Dest\{0}.dtsx", package.Name + tables.Count.ToString()), package, null);
        }
        finally
        {
            package.Dispose();
        }
    }

    /// <summary>
    /// Adds an OLE DB Source and a Flat File Destination to <paramref name="dataFlowTask"/>.
    /// The source reads every column of <paramref name="table"/> through
    /// <paramref name="connectionManagerOleDb"/>. The destination writes
    /// G:\Dest\{table}.txt through a new connection manager named FlatFile_{table}.
    /// </summary>
    private static void AddTableTransfer(Application app, Package package, MainPipe dataFlowTask,
        ConnectionManager connectionManagerOleDb, string table)
    {
        // Flat file connection: basic info only; columns are defined once the source metadata is known
        ConnectionManager connectionManagerFlatFile = CreateFlatFileConnection(package, table);

        // Add OLE-DB source component
        IDTSComponentMetaData100 componentSource = dataFlowTask.ComponentMetaDataCollection.New();
        componentSource.ComponentClassID = app.PipelineComponentInfos["OLE DB Source"].CreationName;

        // Get OLE-DB source design-time instance, and initialise component.
        // The name is set after ProvideComponentProperties, which may reset it.
        CManagedComponentWrapper instanceSource = componentSource.Instantiate();
        instanceSource.ProvideComponentProperties();
        componentSource.Name = "OLEDBSource_" + table;

        // Set source connection
        componentSource.RuntimeConnectionCollection[0].ConnectionManagerID = connectionManagerOleDb.ID;
        componentSource.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connectionManagerOleDb);

        // AccessMode 2 = SQL command; select every column of the table
        instanceSource.SetComponentProperty("AccessMode", 2);
        instanceSource.SetComponentProperty("SqlCommand", "SELECT * FROM [" + table + "]");

        // Reinitialize the metadata, refresh columns
        instanceSource.AcquireConnections(null);
        instanceSource.ReinitializeMetaData();
        instanceSource.ReleaseConnections();

        // Add Flat File destination
        IDTSComponentMetaData100 componentDestination = dataFlowTask.ComponentMetaDataCollection.New();
        componentDestination.ComponentClassID = app.PipelineComponentInfos["Flat File Destination"].CreationName;

        // Get Flat File destination design-time instance, and initialise component.
        // As with the source, the name is applied after ProvideComponentProperties.
        CManagedComponentWrapper instanceDestination = componentDestination.Instantiate();
        instanceDestination.ProvideComponentProperties();
        componentDestination.Name = "FlatFileDestination_" + table;

        // Set destination connection
        componentDestination.RuntimeConnectionCollection[0].ConnectionManagerID = connectionManagerFlatFile.ID;
        componentDestination.RuntimeConnectionCollection[0].ConnectionManager =
            DtsConvert.GetExtendedInterface(connectionManagerFlatFile);

        // Connect the source output to the destination input
        IDTSPath100 path = dataFlowTask.PathCollection.New();
        path.AttachPathAndPropagateNotifications(componentSource.OutputCollection[0],
            componentDestination.InputCollection[0]);

        // Get input and virtual input for destination to select and map columns
        IDTSInput100 destinationInput = componentDestination.InputCollection[0];
        IDTSVirtualInput100 destinationVirtualInput = destinationInput.GetVirtualInput();
        IDTSVirtualInputColumnCollection100 destinationVirtualInputColumns =
            destinationVirtualInput.VirtualInputColumnCollection;

        DefineFlatFileColumns(connectionManagerFlatFile, destinationVirtualInputColumns);

        // Reinitialize the metadata, generating external columns from flat file columns
        instanceDestination.AcquireConnections(null);
        instanceDestination.ReinitializeMetaData();
        instanceDestination.ReleaseConnections();

        // Select and map destination columns
        foreach (IDTSVirtualInputColumn100 virtualInputColumn in destinationVirtualInputColumns)
        {
            // Select column, and retain new input column
            IDTSInputColumn100 inputColumn = instanceDestination.SetUsageType(destinationInput.ID,
                destinationVirtualInput, virtualInputColumn.LineageID, DTSUsageType.UT_READONLY);
            // Find external column by name
            IDTSExternalMetadataColumn100 externalColumn =
                destinationInput.ExternalMetadataColumnCollection[inputColumn.Name];
            // Map input column to external column
            instanceDestination.MapInputColumn(destinationInput.ID, inputColumn.ID, externalColumn.ID);
        }
    }

    /// <summary>
    /// Adds a delimited, Unicode flat file connection manager named FlatFile_{table} that
    /// targets G:\Dest\{table}.txt, with column names in the first data row. No columns are
    /// defined yet.
    /// </summary>
    private static ConnectionManager CreateFlatFileConnection(Package package, string table)
    {
        ConnectionManager connectionManagerFlatFile = package.Connections.Add("FLATFILE");
        connectionManagerFlatFile.ConnectionString = @"G:\Dest\" + table + ".txt";
        connectionManagerFlatFile.Name = "FlatFile_" + table;
        connectionManagerFlatFile.Properties["Format"].SetValue(connectionManagerFlatFile, "Delimited");
        connectionManagerFlatFile.Properties["ColumnNamesInFirstDataRow"].SetValue(connectionManagerFlatFile, true);
        connectionManagerFlatFile.Properties["Unicode"].SetValue(connectionManagerFlatFile, true);
        return connectionManagerFlatFile;
    }

    /// <summary>
    /// Creates one flat file column per pipeline column, copying width, precision, scale, data
    /// type and name. Columns are separated by "||". The last column's delimiter is "~~",
    /// which terminates each row.
    /// </summary>
    private static void DefineFlatFileColumns(ConnectionManager connectionManagerFlatFile,
        IDTSVirtualInputColumnCollection100 virtualInputColumns)
    {
        // Native flat file connection; a direct cast fails loudly instead of a later null dereference
        RuntimeWrapper.IDTSConnectionManagerFlatFile100 connectionFlatFile =
            (RuntimeWrapper.IDTSConnectionManagerFlatFile100)connectionManagerFlatFile.InnerObject;

        int indexMax = virtualInputColumns.Count - 1;
        for (int index = 0; index <= indexMax; index++)
        {
            // Get input column to replicate in flat file
            IDTSVirtualInputColumn100 virtualInputColumn = virtualInputColumns[index];

            // Add column to Flat File connection manager
            RuntimeWrapper.IDTSConnectionManagerFlatFileColumn100 flatFileColumn =
                (RuntimeWrapper.IDTSConnectionManagerFlatFileColumn100)connectionFlatFile.Columns.Add();
            flatFileColumn.ColumnType = "Delimited";
            flatFileColumn.ColumnWidth = virtualInputColumn.Length;
            flatFileColumn.DataPrecision = virtualInputColumn.Precision;
            flatFileColumn.DataScale = virtualInputColumn.Scale;
            flatFileColumn.DataType = virtualInputColumn.DataType;
            RuntimeWrapper.IDTSName100 columnName = (RuntimeWrapper.IDTSName100)flatFileColumn;
            columnName.Name = virtualInputColumn.Name;

            flatFileColumn.ColumnDelimiter = index < indexMax ? "||" : "~~";
        }
    }

}

The above code works fine for creating a number of Data Flow tasks. But if we uncomment the two commented lines, the app behaves very strangely, even though I would expect the two versions to be equivalent. Does anybody know why using package.Executables[i] and package.Executables[i-1] does not generate the expected result? Does it have anything to do with COM objects and initialisation?

Thanks in advance.

Upvotes: 1

Views: 2202

Answers (1)

billinkc

Reputation: 61211

Logically, the code is attempting to add a Precedence Constraint from the previously created task to the current one.

The code that works does this by keeping references to the current and previous objects, while the failing code attempts to do it via ordinal positions. Referencing the previous ordinal works fine; the problem appears to be related to creating the task and then immediately re-fetching it by ordinal. The assumption is that package.Executables[i] should return the task that was just added, but based on my observations, it does not if that lookup is the first thing done after the task is created.

This is the troublesome statement: tsk = package.Executables[i];

    /// <summary>
    /// Test stub standing in for the Access schema query. Returns a new list holding the
    /// seven fixed table names "A" through "G".
    /// </summary>
    private static List<string> getAccessTables()
    {
        string[] names = { "A", "B", "C", "D", "E", "F", "G" };
        return new List<string>(names);
    }

    /// <summary>
    /// Repro of the working variant. Adds one Data Flow task per table (cnt == 1), names each
    /// one "DFT {table}", and links it to the previous task using the references returned by
    /// Executables.Add. Prints each link and saves the package as {name}{tableCount}.dtsx,
    /// then runs AltBuildPackage for comparison.
    /// </summary>
    public static void BuildPackage(/*Microsoft.SqlServer.Dts.Tasks.ScriptTask.ScriptObjectModel Dts*/)
    {
        var tables = getAccessTables();
        var app = new Application();

        Package package = new Package();
        try
        {
            package.Name = "AccessToFlatFile";

            // One Data Flow Task per batch of `cnt` tables. Integer ceiling division replaces
            // re-evaluating Math.Ceiling on every iteration.
            int cnt = 1;
            int batchCount = (tables.Count + cnt - 1) / cnt;
            Executable tsk = null, prevTsk = null;

            for (int i = 0; i < batchCount; i++)
            {
                if (i > 0)
                {
                    prevTsk = tsk;
                }

                tsk = package.Executables.Add("STOCK:PipelineTask");
                //tsk = package.Executables[i];

                // Name the task after its table; tables[i] is only correct because cnt == 1.
                TaskHost taskHost = (TaskHost)tsk;
                taskHost.Name = string.Format("DFT {0}", tables[i]);

                if (i > 0)
                {
                    // The former "{0:36}" format items were ignored for string arguments,
                    // so plain placeholders print exactly the same text.
                    Console.WriteLine(string.Format("Linking {0} to {1}", taskHost.Name, ((TaskHost)prevTsk).Name));
                    //prevTsk = package.Executables[i-1];
                    package.PrecedenceConstraints.Add(prevTsk, tsk);
                }
            }

            app.SaveToXml(String.Format(@"C:\Users\bfellows\documents\visual studio 2013\Projects\WTF\WTF\{0}.dtsx", package.Name + tables.Count.ToString()), package, null);
        }
        finally
        {
            // Dispose even if building or saving the package throws.
            package.Dispose();
        }

        Console.WriteLine();
        AltBuildPackage();
    }

    /// <summary>
    /// Repro of the failing variant from the question. Each Data Flow task is created with
    /// Executables.Add and then immediately re-fetched by ordinal (package.Executables[i]);
    /// the hash code is printed before and after that re-fetch. The task is then named
    /// "DFT {table}", re-fetched by ordinal again, and linked to package.Executables[i - 1]
    /// with a precedence constraint. The package is saved as Alt{name}{tableCount}.dtsx.
    /// </summary>
    /// <remarks>
    /// Two things stand out in the sample output shown with this answer. Every "Hash after"
    /// value equals the first task's hash, and the previous task's Name prints as a GUID
    /// instead of "DFT ...". Both suggest the early ordinal lookup did not return the task
    /// just added. The root cause is not established here. Statement order is the point of
    /// this repro, so do not reorder it.
    /// </remarks>
    public static void AltBuildPackage(/*Microsoft.SqlServer.Dts.Tasks.ScriptTask.ScriptObjectModel Dts*/)
    {
        var tables = getAccessTables();
        var app = new Application();

        Package package = new Package();
        package.Name = "AccessToFlatFile";

        // Add one Data Flow Task per batch of `cnt` tables (cnt == 1, so one per table)
        int cnt = 1;
        Executable tsk = null, prevTsk = null;

        for (int i = 0; i < Math.Ceiling(tables.Count * 1.0 / cnt * 1.0); i++)
        {
            tsk = package.Executables.Add("STOCK:PipelineTask");
            // This line is what is causing you pain. I don't know why
            // Theory is that you're losing your reference
            Console.WriteLine(string.Format("\tHash before {0}", tsk.GetHashCode()));
            tsk = package.Executables[i];
            Console.WriteLine(string.Format("\tHash after  {0}", tsk.GetHashCode()));


            // Get the task host wrapper and name the task after its table
            // (tables[i] is only correct because cnt == 1)
            TaskHost taskHost = tsk as TaskHost;
            taskHost.Name = string.Format("DFT {0}", tables[i]);

            // if the above tsk = assignment is delayed to this point
            // the reassignment works fine.
            tsk = package.Executables[i];

            if (i > 0)
            {                    
                // Look up the previous task by ordinal instead of keeping a reference to it
                prevTsk = package.Executables[i - 1];
                Console.WriteLine(string.Format("Linking {0:36} to {1:36}", (tsk as TaskHost).Name, (prevTsk as TaskHost).Name));
                PrecedenceConstraint pcPipelineTask =
                       package.PrecedenceConstraints.Add((Executable)prevTsk, (Executable)tsk);
            }
        }

        app.SaveToXml(String.Format(@"C:\Users\bfellows\documents\visual studio 2013\Projects\WTF\WTF\Alt{0}.dtsx", package.Name + tables.Count.ToString()), package, null);

        // NOTE(review): not in a finally block, so the package is not disposed if anything above throws
        package.Dispose();
    }

Output

Linking DFT B to DFT A
Linking DFT C to DFT B
Linking DFT D to DFT C
Linking DFT E to DFT D
Linking DFT F to DFT E
Linking DFT G to DFT F

        Hash before 94299808
        Hash after  94299808
        Hash before 94320648
        Hash after  94299808
Linking DFT B to {7E0B0C2B-7C69-4EDF-9EDE-8B2382B8221D}
        Hash before 94346952
        Hash after  94299808
Linking DFT C to {7E0B0C2B-7C69-4EDF-9EDE-8B2382B8221D}

Upvotes: 1

Related Questions