David Gerst
David Gerst

Reputation: 73

Data type 0x2A has an invalid data length or metadata length

I am trying to upload a million records from an Oracle database to SQL Server data warehouse. I've used similar code previously to upload/sync data from SQL Server and a Synergex db via ODBC with great success. However, with Oracle I've hit a roadblock. I keep getting the following error.

Microsoft.Data.SqlClient.SqlException: 'The incoming tabular data stream (TDS) remote procedure call (RPC) protocol stream is incorrect. Table-valued parameter 1 (""), row 3, column 2: Data type 0x2A has an invalid data length or metadata length.
The data for table-valued parameter "@UserHistory" doesn't conform to the table type of the parameter. SQL Server error is: 8037, state: 83 The statement has been terminated.'

The code is

/// <summary>
/// Executes the Oracle procedure WBH_DATAWAREHOUSE.WBH_UserHistory and passes the resulting
/// reader to the SQL Server procedure [Synapse].[InsertUserHistory] as the @UserHistory
/// table-valued parameter, then reports the elapsed time in milliseconds.
/// </summary>
/// <param name="config">
/// Configuration that supplies the "SynapseProd" (Oracle) and "powerbi" (SQL Server)
/// connection strings.
/// </param>
/// <remarks>
/// Every exception is logged and swallowed; nothing propagates to the caller.
/// </remarks>
public static async Task StreamUserHistoryToDWAsync(IConfigurationRoot config)
{
    try
    {
        var watch = System.Diagnostics.Stopwatch.StartNew();

        string query = @"WBH_DATAWAREHOUSE.WBH_UserHistory";
        string insertQuery = "[Synapse].[InsertUserHistory]";

        // The using blocks release both connections, both commands and the reader even when
        // one of the awaited calls throws; previously they were only closed on the success path.
        using (OracleConnection OraCn = new OracleConnection(config.GetConnectionString("SynapseProd")))
        using (OracleCommand cmd = new OracleCommand(query, OraCn))
        {
            await OraCn.OpenAsync();

            cmd.CommandType = CommandType.StoredProcedure;
            cmd.Parameters.Add(new OracleParameter("MaxBegtime", GetMaxUserHistory(config)));
            cmd.Parameters.Add(new OracleParameter("rcUserHistory", OracleDbType.RefCursor, ParameterDirection.Output));

            using (OracleDataReader odr = await cmd.ExecuteReaderAsync(CommandBehavior.CloseConnection))
            using (SqlConnection sqlCn = new SqlConnection(config.GetConnectionString("powerbi")))
            using (SqlCommand sqlCmd = new SqlCommand(insertQuery, sqlCn))
            {
                sqlCmd.CommandType = CommandType.StoredProcedure;

                await sqlCn.OpenAsync();

                // The Oracle reader itself is the parameter value, and no TypeName is set:
                // SqlClient has to derive the TVP column metadata from the reader.
                SqlParameter tvp = new SqlParameter("@UserHistory", odr);
                tvp.SqlDbType = SqlDbType.Structured;

                SqlParameter rtn = new SqlParameter("@rtn_result", SqlDbType.Int);
                rtn.Direction = ParameterDirection.Output;

                sqlCmd.Parameters.Add(tvp);
                sqlCmd.Parameters.Add(rtn);

                await sqlCmd.ExecuteNonQueryAsync();
            }
        }

        watch.Stop();
        var elapsed = watch.ElapsedMilliseconds;

        Console.WriteLine(elapsed.ToString());

        logger.Info("User History elapsed time: {time}", elapsed.ToString());
    }
    catch (Exception ex)
    {
        logger.Error(ex, "StreamUserHistoryToDW Error");
    }
}

The parameter @UserHistory is a table-valued parameter. Its column data types exactly match what is coming from Oracle. The stored procedure simply inserts the data. I'm not sure the procedure body is needed for troubleshooting, because I commented it out so that the procedure does "nothing" but return a value. The issue seems to be with the table-valued parameter, not the procedure.

-- Table type used for the @UserHistory table-valued parameter.
-- Only NAMEID, BEGTIME and EVENT are required; every other column accepts NULL.
CREATE TYPE [Synapse].[UserHistoryTableType] AS TABLE
(
    [NAMEID]        varchar(12)    NOT NULL,
    [BEGTIME]       datetime       NOT NULL,
    [EVENT]         varchar(4)     NOT NULL,
    [ENDTIME]       datetime       NULL,
    [FACILITY]      varchar(3)     NULL,
    [CUSTID]        varchar(10)    NULL,
    [EQUIPMENT]     varchar(2)     NULL,
    [UNITS]         int            NULL,
    [ETC]           nvarchar(255)  NULL,
    [ORDERID]       int            NULL,
    [SHIPID]        smallint       NULL,
    [LOCATION]      varchar(10)    NULL,
    [LPID]          varchar(20)    NULL,
    [ITEM]          varchar(50)    NULL,
    [UOM]           varchar(4)     NULL,
    [BASEUOM]       varchar(4)     NULL,
    [BASEUNITS]     int            NULL,
    [CUBE]          decimal(10, 4) NULL,
    [WEIGHT]        decimal(17, 8) NULL,
    [EMPLOYEECOST]  decimal(10, 2) NULL,
    [EQUIPMENTCOST] decimal(10, 2) NULL
)

enter image description here

So, the interesting thing is if I limit the query results to two rows or less, the stored procedure saves the data in SQL. If I try to do three rows, I get the error. Doesn't matter what data I use, so it's not something "wrong" with the data.

I am using Oracle.ManagedDataAccess.Core 23.6.1 with .NET 8.0.

Do I need to find a different way to upload the data to SQL Server? Anything you'd recommend?

Thanks

Upvotes: 1

Views: 63

Answers (1)

David Gerst
David Gerst

Reputation: 73

I got my code to work; it now looks like the code below. This article pointed me in the right direction: "How do I specify to use server defined values in a Table Valued Parameter when my source is a DataReader?"

    /// <summary>
    /// Reads the ref cursor returned by the Oracle procedure WBH_DATAWAREHOUSE.WBH_UserHistory
    /// and hands its rows, converted by <c>SendRowsToProc</c>, to the SQL Server procedure
    /// [Synapse].[InsertUserHistory] as a [Synapse].[UserHistoryTableType] table-valued
    /// parameter. The elapsed milliseconds are written to the console and to the log.
    /// </summary>
    /// <param name="config">
    /// Configuration that supplies the "powerbi" (SQL Server) and "SynapseProd" (Oracle)
    /// connection strings.
    /// </param>
    /// <remarks>Every exception is logged and swallowed.</remarks>
    public static async Task StreamUserHistoryToDWAsync3(IConfigurationRoot config)
    {
        try
        {
            var timer = System.Diagnostics.Stopwatch.StartNew();

            const string sourceProc = @"WBH_DATAWAREHOUSE.WBH_UserHistory";
            const string targetProc = "[Synapse].[InsertUserHistory]";

            using (var warehouseCn = new SqlConnection(config.GetConnectionString("powerbi")))
            using (var oracleCn = new OracleConnection(config.GetConnectionString("SynapseProd")))
            {
                await warehouseCn.OpenAsync();
                await oracleCn.OpenAsync();

                using (var readCmd = new OracleCommand(sourceProc, oracleCn))
                using (var writeCmd = new SqlCommand(targetProc, warehouseCn))
                {
                    readCmd.CommandType = CommandType.StoredProcedure;

                    var maxBegtime = new OracleParameter("MaxBegtime", GetMaxUserHistory(config));
                    var cursor = new OracleParameter("rcUserHistory", OracleDbType.RefCursor, ParameterDirection.Output);
                    readCmd.Parameters.Add(maxBegtime);
                    readCmd.Parameters.Add(cursor);

                    writeCmd.CommandType = CommandType.StoredProcedure;

                    using (OracleDataReader sourceRows = readCmd.ExecuteReader())
                    {
                        // The rows are supplied as an enumerable of SqlDataRecord instead of
                        // passing the Oracle reader itself as the parameter value.
                        var historyParam = new SqlParameter("@UserHistory", SendRowsToProc(sourceRows))
                        {
                            SqlDbType = SqlDbType.Structured,
                            TypeName = "[Synapse].[UserHistoryTableType]"
                        };
                        var resultParam = new SqlParameter("@rtn_result", SqlDbType.Int)
                        {
                            Direction = ParameterDirection.Output
                        };

                        writeCmd.Parameters.Add(historyParam);
                        writeCmd.Parameters.Add(resultParam);

                        await writeCmd.ExecuteNonQueryAsync();
                    }
                }
            }

            // Stopped only after every connection, command and reader has been disposed.
            timer.Stop();
            string elapsedText = timer.ElapsedMilliseconds.ToString();

            Console.WriteLine(elapsedText);

            logger.Info("User History elapsed time: {time}", elapsedText);
        }
        catch (Exception ex)
        {
            logger.Error(ex, "StreamUserHistoryToDW Error");
        }
    }


/// <summary>
/// Lazily converts the rows of <paramref name="reader"/> into <see cref="SqlDataRecord"/>
/// values whose columns follow the order of [Synapse].[UserHistoryTableType].
/// </summary>
/// <param name="reader">Open reader exposing the user-history columns by name.</param>
/// <returns>
/// One record per successfully converted source row. The same <see cref="SqlDataRecord"/>
/// instance is reused for every row, so callers must consume each item before advancing.
/// Yields nothing when the reader has no rows.
/// </returns>
/// <remarks>
/// A row whose conversion throws is logged and skipped. It is not yielded, because the
/// reused record would still carry values from the previous row.
/// </remarks>
private static IEnumerable<SqlDataRecord> SendRowsToProc(OracleDataReader reader)
{
    if (!reader.HasRows)
    {
        yield break;
    }

    SqlDataRecord resultRow = new SqlDataRecord(new SqlMetaData[] {
        new SqlMetaData("NAMEID", SqlDbType.VarChar, 12),
        new SqlMetaData("BEGTIME", SqlDbType.DateTime),
        new SqlMetaData("EVENT", SqlDbType.VarChar, 4),
        new SqlMetaData("ENDTIME", SqlDbType.DateTime),
        new SqlMetaData("FACILITY", SqlDbType.VarChar, 3),
        new SqlMetaData("CUSTID", SqlDbType.VarChar, 10),
        new SqlMetaData("EQUIPMENT", SqlDbType.VarChar, 2),
        new SqlMetaData("UNITS", SqlDbType.Int),
        // The table type declares ETC as nvarchar(255), so send it as NVarChar.
        new SqlMetaData("ETC", SqlDbType.NVarChar, 255),
        new SqlMetaData("ORDERID", SqlDbType.Int),
        new SqlMetaData("SHIPID", SqlDbType.SmallInt),
        new SqlMetaData("LOCATION", SqlDbType.VarChar, 10),
        new SqlMetaData("LPID", SqlDbType.VarChar, 20),
        new SqlMetaData("ITEM", SqlDbType.VarChar, 50),
        new SqlMetaData("UOM", SqlDbType.VarChar, 4),
        new SqlMetaData("BASEUOM", SqlDbType.VarChar, 4),
        new SqlMetaData("BASEUNITS", SqlDbType.Int),
        // NOTE(review): the table type declares CUBE as decimal(10, 4) but it is sent as
        // Float here and read as a double below. Left unchanged because the Oracle-side
        // type is not visible; confirm this mismatch is intended.
        new SqlMetaData("CUBE", SqlDbType.Float),
        new SqlMetaData("WEIGHT", SqlDbType.Decimal, 17, 8),
        new SqlMetaData("EMPLOYEECOST", SqlDbType.Decimal, 10, 2),
        new SqlMetaData("EQUIPMENTCOST", SqlDbType.Decimal, 10, 2)
    });

    while (reader.Read())
    {
        // yield return is not allowed inside a try block that has a catch clause, so the
        // outcome is recorded in a flag and the row is yielded afterwards.
        bool rowIsComplete = false;

        try
        {
            FillUserHistoryRecord(reader, resultRow);
            rowIsComplete = true;
        }
        catch (Exception ex)
        {
            logger.Error(ex, "Setting up resultrow Error");
        }

        if (rowIsComplete)
        {
            yield return resultRow;
        }
    }
}

/// <summary>
/// Copies the current row of <paramref name="reader"/> into <paramref name="record"/>,
/// writing a database NULL for every source value that is null or <see cref="DBNull"/>.
/// </summary>
/// <remarks>
/// NAMEID, BEGTIME and EVENT are declared NOT NULL in the table type; a NULL source value
/// for one of them is still written as NULL here rather than replaced with a placeholder.
/// </remarks>
private static void FillUserHistoryRecord(OracleDataReader reader, SqlDataRecord record)
{
    SetText(0, "NAMEID");
    SetTimestamp(1, "BEGTIME");
    SetText(2, "EVENT");
    SetTimestamp(3, "ENDTIME");
    SetText(4, "FACILITY");
    SetText(5, "CUSTID");
    SetText(6, "EQUIPMENT");
    SetInt(7, "UNITS");
    SetText(8, "ETC");
    SetInt(9, "ORDERID");
    SetShort(10, "SHIPID");
    SetText(11, "LOCATION");
    SetText(12, "LPID");
    SetText(13, "ITEM");
    SetText(14, "UOM");
    SetText(15, "BASEUOM");
    SetInt(16, "BASEUNITS");
    SetFloat(17, "CUBE");
    SetNumber(18, "WEIGHT");
    SetNumber(19, "EMPLOYEECOST");
    // Previously this column was filled from EMPLOYEECOST by mistake.
    SetNumber(20, "EQUIPMENTCOST");

    // Reads the named column; returns false after writing NULL when the source has no value.
    bool TryRead(int ordinal, string column, out object value)
    {
        value = reader.GetValue(column);
        if (value is null || value is DBNull)
        {
            record.SetDBNull(ordinal);
            return false;
        }

        return true;
    }

    void SetText(int ordinal, string column)
    {
        if (TryRead(ordinal, column, out object value))
        {
            record.SetString(ordinal, value.ToString());
        }
    }

    void SetTimestamp(int ordinal, string column)
    {
        if (TryRead(ordinal, column, out object value))
        {
            record.SetDateTime(ordinal, (DateTime)value);
        }
    }

    void SetInt(int ordinal, string column)
    {
        if (TryRead(ordinal, column, out object value))
        {
            record.SetInt32(ordinal, (int)value);
        }
    }

    void SetShort(int ordinal, string column)
    {
        if (TryRead(ordinal, column, out object value))
        {
            record.SetInt16(ordinal, (short)value);
        }
    }

    void SetFloat(int ordinal, string column)
    {
        if (TryRead(ordinal, column, out object value))
        {
            record.SetDouble(ordinal, (double)value);
        }
    }

    void SetNumber(int ordinal, string column)
    {
        if (TryRead(ordinal, column, out object value))
        {
            record.SetSqlDecimal(ordinal, (decimal)value);
        }
    }
}

/// <summary>
/// Casts a value read from a data reader to <typeparamref name="T"/>, mapping a null
/// reference or <see cref="DBNull.Value"/> to the default value of <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Type the boxed value is cast to.</typeparam>
/// <param name="obj">Value to convert; may be null or <see cref="DBNull.Value"/>.</param>
/// <returns>
/// <c>default(T)</c> when <paramref name="obj"/> carries no value; otherwise
/// <paramref name="obj"/> cast to <typeparamref name="T"/>.
/// </returns>
public static T ConvertFromDBVal<T>(object obj)
{
    bool hasValue = !(ReferenceEquals(obj, null) || ReferenceEquals(obj, DBNull.Value));

    // Guard clause: no value means the type's default (null, 0, DateTime.MinValue, ...).
    if (!hasValue)
    {
        return default(T);
    }

    return (T)obj;
}

Upvotes: 0

Related Questions