Cameron Tinker
Cameron Tinker

Reputation: 9789

Is it safe to access SQL Server CE from multiple threads?

I have a large CSV database of about 5MB with ZIP codes, cities, and states that I'm trying to import into a SQL Server CE database.

Using a single thread, the process is estimated to take about 3 hours to complete. While this is fine for getting the job done, I'd like to split the task across multiple threads to reduce the total time of 3 hours. If I create a SqlCeConnection object on each thread, is it safe to run commands on all threads simultaneously?

I have a feeling that there would be issues with concurrency and deadlocks. Here is where I found the CSV database: http://www.unitedstateszipcodes.org/zip-code-database/

Here is my relevant code:

// Rows parsed from the CSV by OpenCSV; iterated by ProcessData on the worker thread that OpenCSV starts.
List<AddressSet> addressList;

/// <summary>
/// Parses the ZIP-code CSV at <paramref name="file"/> into <c>addressList</c> and starts a
/// worker thread that runs <c>ProcessData</c> over the parsed rows.
/// </summary>
/// <param name="file">Path to the CSV file. The first line is treated as a header and skipped.</param>
/// <remarks>
/// Columns 0, 2 and 5 are read as ZIP code, city and state. Lines are split with a
/// quote-aware parser: quoted CSV fields may contain commas, and a plain <c>Split(',')</c>
/// would shift the column indexes for such rows. Blank lines are ignored.
/// </remarks>
public void OpenCSV(string file)
{
    var addresses = from line in File.ReadAllLines(file).Skip(1)
                    where !string.IsNullOrWhiteSpace(line)
                    let columns = SplitCsvLine(line)
                    select new AddressSet
                    {
                        ZipCode = columns[0].Trim(),
                        City = columns[2].Trim(),
                        State = columns[5].Trim()
                    };
    addressList = addresses.ToList();

    Thread worker = new Thread(new ThreadStart(ProcessData));
    worker.Start();
}

/// <summary>
/// Splits one CSV line on commas, honouring double-quoted fields: commas inside quotes do
/// not split, a doubled quote (<c>""</c>) inside a quoted field becomes one literal quote,
/// and the enclosing quotes are removed.
/// </summary>
private static string[] SplitCsvLine(string line)
{
    var fields = new List<string>();
    var current = new System.Text.StringBuilder();
    bool inQuotes = false;

    for (int pos = 0; pos < line.Length; pos++)
    {
        char ch = line[pos];
        if (inQuotes)
        {
            if (ch != '"')
            {
                current.Append(ch);
            }
            else if (pos + 1 < line.Length && line[pos + 1] == '"')
            {
                // Escaped quote inside a quoted field.
                current.Append('"');
                pos++;
            }
            else
            {
                inQuotes = false;
            }
        }
        else if (ch == '"')
        {
            inQuotes = true;
        }
        else if (ch == ',')
        {
            fields.Add(current.ToString());
            current.Length = 0;
        }
        else
        {
            current.Append(ch);
        }
    }

    // The last field has no trailing comma, so add it explicitly.
    fields.Add(current.ToString());
    return fields.ToArray();
}

/// <summary>
/// Worker-thread entry point. For every entry in <c>addressList</c>, inserts (or looks up)
/// the state, ZIP code and city, links the city to the ZIP code, and posts a progress
/// message to <c>richTextBox1</c> on the UI thread.
/// </summary>
/// <remarks>
/// On success shows "Done!" and re-enables <c>btnChooseCSV</c>. Any exception stops the
/// import and is shown in a message box; the button is re-enabled in that case too, so the
/// user is not left without a way to retry (NOTE(review): assumes the caller disabled it
/// before starting — confirm).
/// </remarks>
private void ProcessData()
{
    try
    {
        int i = 1;
        int total = addressList.Count;
        DateTime operationStart = DateTime.Now;
        foreach (AddressSet address in addressList)
        {
            int stateId = InsertState(address.State);
            int zipCodeId = InsertZipCode(address.ZipCode, stateId);
            int cityId = InsertCity(address.City, stateId);

            UpdateRelationships(zipCodeId, cityId);

            float pct = i / (float)total * 100;
            TimeSpan timeSinceStart = DateTime.Now.Subtract(operationStart);
            TimeSpan totalTime = TimeSpan.FromMilliseconds(timeSinceStart.TotalMilliseconds / (pct / 100));
            TimeSpan timeLeft = totalTime - timeSinceStart;

            // Build the text here, on the worker thread. BeginInvoke runs its delegate later,
            // and a lambda reading the shared counter 'i' would see it already incremented.
            string status = pct.ToString("N2") + "% (" + i + " of " + total.ToString() + ") " + address.City + ", " + address.State + " " + address.ZipCode
                + "\nEstimated Total Time: " + FormatDuration(totalTime)
                + "\nTime Left: " + FormatDuration(timeLeft)
                + "\nRunning Time: " + FormatDuration(timeSinceStart);

            // One UI post per row instead of three.
            richTextBox1.BeginInvoke((MethodInvoker)(() =>
            {
                richTextBox1.Text = status;
                richTextBox1.SelectionStart = richTextBox1.Text.Length;
                richTextBox1.ScrollToCaret();
            }));
            i++;
        }
        this.Invoke(new Action(() =>
        {
            MessageBox.Show("Done!");
            btnChooseCSV.Enabled = true;
        }));
    }
    catch (Exception ex)
    {
        this.Invoke(new Action(() =>
        {
            MessageBox.Show(ex.Message);
            btnChooseCSV.Enabled = true;
        }));
    }
}

/// <summary>
/// Formats a duration as "Hh Mm Ss". Whole days are folded into the hour count. The "h"
/// TimeSpan format specifier shows only the 0-23 hour component and would silently drop days.
/// </summary>
private static string FormatDuration(TimeSpan duration)
{
    return ((int)duration.TotalHours).ToString() + "h " + duration.Minutes.ToString() + "m " + duration.Seconds.ToString() + "s";
}

/// <summary>
/// Returns the <c>ZipCodeId</c> for <paramref name="zipCode"/>, inserting a new ZipCode row
/// linked to <paramref name="stateId"/> when none exists yet.
/// </summary>
/// <param name="zipCode">ZIP code text to look up or insert.</param>
/// <param name="stateId">StateId stored on a newly inserted row. It is not used when the ZIP code already exists.</param>
/// <returns>The existing or newly generated ZipCodeId.</returns>
private int InsertZipCode(string zipCode, int stateId)
{
    string connstr = System.Configuration.ConfigurationManager.ConnectionStrings["AddressInformation"].ConnectionString;

    // The using blocks close the connection and commands even if a query throws.
    using (SqlCeConnection connection = new SqlCeConnection(connstr))
    {
        connection.Open();

        // Select the id directly rather than COUNT(*) first. With COUNT, a count above 1
        // fell through both branches and the row count was returned as if it were the id.
        using (SqlCeCommand select = new SqlCeCommand("SELECT ZipCodeId FROM ZipCode WHERE ZipCode = @ZipCode", connection))
        {
            select.Parameters.AddWithValue("ZipCode", zipCode);
            object existing = select.ExecuteScalar();
            if (existing != null && existing != DBNull.Value)
            {
                return Convert.ToInt32(existing);
            }
        }

        using (SqlCeCommand insert = new SqlCeCommand("INSERT INTO ZipCode(ZipCode, StateId) VALUES(@ZipCode, @StateId)", connection))
        {
            insert.Parameters.AddWithValue("ZipCode", zipCode);
            insert.Parameters.AddWithValue("StateId", stateId);
            insert.ExecuteNonQuery();
        }

        // Read back the identity generated by the INSERT above on this same connection.
        using (SqlCeCommand identity = new SqlCeCommand("SELECT @@IDENTITY", connection))
        {
            return Convert.ToInt32(identity.ExecuteScalar());
        }
    }
}

/// <summary>
/// Returns the <c>CityId</c> for <paramref name="city"/> in the state <paramref name="stateId"/>,
/// inserting a new City row when that name/state pair does not exist yet.
/// </summary>
/// <param name="city">City name to look up or insert.</param>
/// <param name="stateId">StateId the city belongs to. It is part of the lookup key.</param>
/// <returns>The existing or newly generated CityId.</returns>
/// <remarks>
/// The lookup matches on CityName AND StateId. Cities with the same name can exist in
/// several states, and matching on the name alone reused the first state's row for all of them.
/// </remarks>
private int InsertCity(string city, int stateId)
{
    string connstr = System.Configuration.ConfigurationManager.ConnectionStrings["AddressInformation"].ConnectionString;

    // The using blocks close the connection and commands even if a query throws.
    using (SqlCeConnection connection = new SqlCeConnection(connstr))
    {
        connection.Open();

        // Select the id directly rather than COUNT(*) first. With COUNT, a count above 1
        // fell through both branches and the row count was returned as if it were the id.
        using (SqlCeCommand select = new SqlCeCommand("SELECT CityId FROM City WHERE CityName = @City AND StateId = @StateId", connection))
        {
            select.Parameters.AddWithValue("City", city);
            select.Parameters.AddWithValue("StateId", stateId);
            object existing = select.ExecuteScalar();
            if (existing != null && existing != DBNull.Value)
            {
                return Convert.ToInt32(existing);
            }
        }

        using (SqlCeCommand insert = new SqlCeCommand("INSERT INTO City(CityName, StateId) VALUES(@City, @StateId)", connection))
        {
            insert.Parameters.AddWithValue("City", city);
            insert.Parameters.AddWithValue("StateId", stateId);
            insert.ExecuteNonQuery();
        }

        // Read back the identity generated by the INSERT above on this same connection.
        using (SqlCeCommand identity = new SqlCeCommand("SELECT @@IDENTITY", connection))
        {
            return Convert.ToInt32(identity.ExecuteScalar());
        }
    }
}

/// <summary>
/// Returns the <c>StateId</c> for <paramref name="state"/>, inserting a new State row when
/// none exists yet.
/// </summary>
/// <param name="state">State value to look up or insert.</param>
/// <returns>The existing or newly generated StateId.</returns>
private int InsertState(string state)
{
    string connstr = System.Configuration.ConfigurationManager.ConnectionStrings["AddressInformation"].ConnectionString;

    // The using blocks close the connection and commands even if a query throws.
    using (SqlCeConnection connection = new SqlCeConnection(connstr))
    {
        connection.Open();

        // Select the id directly rather than COUNT(*) first. With COUNT, a count above 1
        // fell through both branches and the row count was returned as if it were the id.
        using (SqlCeCommand select = new SqlCeCommand("SELECT StateId FROM State WHERE State = @State", connection))
        {
            select.Parameters.AddWithValue("State", state);
            object existing = select.ExecuteScalar();
            if (existing != null && existing != DBNull.Value)
            {
                return Convert.ToInt32(existing);
            }
        }

        using (SqlCeCommand insert = new SqlCeCommand("INSERT INTO State(State) VALUES(@State)", connection))
        {
            insert.Parameters.AddWithValue("State", state);
            insert.ExecuteNonQuery();
        }

        // Read back the identity generated by the INSERT above on this same connection.
        using (SqlCeCommand identity = new SqlCeCommand("SELECT @@IDENTITY", connection))
        {
            return Convert.ToInt32(identity.ExecuteScalar());
        }
    }
}

/// <summary>
/// Inserts one CityZipCode row linking <paramref name="cityId"/> to <paramref name="zipCodeId"/>.
/// </summary>
/// <param name="zipCodeId">ZipCodeId to link.</param>
/// <param name="cityId">CityId to link.</param>
/// <remarks>
/// No existence check is made, so calling this twice with the same pair issues two INSERTs.
/// NOTE(review): whether that fails depends on constraints on CityZipCode that are not visible here.
/// </remarks>
private void UpdateRelationships(int zipCodeId, int cityId)
{
    string connstr = System.Configuration.ConfigurationManager.ConnectionStrings["AddressInformation"].ConnectionString;

    // The using blocks close the connection and command even if the INSERT throws.
    using (SqlCeConnection connection = new SqlCeConnection(connstr))
    using (SqlCeCommand command = new SqlCeCommand("INSERT INTO CityZipCode(CityId, ZipCodeId) VALUES(@CityId, @ZipCodeId)", connection))
    {
        command.Parameters.AddWithValue("CityId", cityId);
        command.Parameters.AddWithValue("ZipCodeId", zipCodeId);
        connection.Open();
        command.ExecuteNonQuery();
    }
}

Edit:

Just to clarify, I'm not just simply inserting each row of information from the CSV file. I'm changing how the data is laid out by inserting each respective item into separate tables and adding relationships between each entity.

For example, a city can have multiple zip codes, and a zip code can sometimes cover multiple cities, so that is represented by a many-to-many relationship. Cities and zip codes each belong to only one state, so that relationship is many-to-one.

I have a table for cities, zip codes, and states. I also have a table relating cities to zip codes. I will need to modify my relationship table schema to take into account that cities with the same name may exist in multiple states. The relationship table should really be a set of city, state and zip code, not just city and zip code.

My end goal is to distribute the SQL Server CE database with password protection with another application for city, state and zip code validation. I don't want to distribute the CSV database as anyone could change that to pass validation.

Upvotes: 0

Views: 5304

Answers (2)

FabianCook
FabianCook

Reputation: 20557

Just a suggestion: I am doing the same kind of thing, and this is what I use. It is extremely fast compared to the simple approach.

/// <summary>
/// Convenience overload that joins a directory and a file name, then loads that CSV file
/// through the single-path overload.
/// </summary>
/// <param name="path">Directory containing the CSV file.</param>
/// <param name="name">File name of the CSV file.</param>
/// <returns>The table produced by the single-path overload.</returns>
public static DataTable CSVToDataTable(string path, string name)
{
    string fullPath = Path.Combine(path, name);
    return CSVToDataTable(fullPath);
}

/// <summary>
/// Reads a CSV file into a <see cref="DataTable"/> whose columns are named after the header line.
/// </summary>
/// <param name="path">Path of the CSV file.</param>
/// <returns>
/// The populated table. It is empty (no columns, no rows) if the file does not exist, is
/// empty, or its first line is blank.
/// </returns>
/// <remarks>
/// Blank data lines are skipped. When a row has more cells than there are headers, the extra
/// cells are ignored; when it has fewer, the missing cells stay <see cref="DBNull"/>. The file
/// is opened with <see cref="FileShare.ReadWrite"/> so that another process may hold it open.
/// </remarks>
public static DataTable CSVToDataTable(string path)
{
    DataTable res = new DataTable();
    if (!File.Exists(path))
    {
        return res;
    }

    using (FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
    using (StreamReader reader = new StreamReader(stream))
    {
        // ReadLine returns null on an empty file, so this one check covers both an empty
        // file and a blank header line.
        string line = reader.ReadLine();
        if (string.IsNullOrWhiteSpace(line))
        {
            return res;
        }

        string[] headers = LineToArray(line);
        foreach (string header in headers)
        {
            res.Columns.Add(header);
        }

        while ((line = reader.ReadLine()) != null)
        {
            if (string.IsNullOrWhiteSpace(line))
            {
                continue;
            }

            string[] cells = LineToArray(line);
            DataRow row = res.NewRow();
            int cellCount = Math.Min(headers.Length, cells.Length);
            for (int i = 0; i < cellCount; i++)
            {
                row[i] = cells[i];
            }
            res.Rows.Add(row);
        }
    }
    return res;
}

/// <summary>
/// Splits a single CSV line into cells on <paramref name="delimiter"/>.
/// </summary>
/// <param name="line">One line of CSV text.</param>
/// <param name="delimiter">Cell separator; a comma by default.</param>
/// <returns>
/// All cells of the line, including the last one and an empty trailing cell after a final
/// delimiter. A cell that starts with <c>"</c> is quoted, and the following rules apply:
/// <list type="bullet">
/// <item>Delimiters inside a quoted cell are kept.</item>
/// <item>The enclosing quotes are removed.</item>
/// <item><c>""</c> inside a quoted cell yields one literal quote.</item>
/// <item>A quote preceded by a backslash inside a quoted cell is kept literally, together
/// with the backslash.</item>
/// <item>A quote in the middle of an unquoted cell is dropped.</item>
/// </list>
/// </returns>
private static string[] LineToArray(string line, char delimiter = ',')
{
    // Without any quotes a plain split is exact, and it is the cheapest path.
    if (!line.Contains("\""))
    {
        return line.Split(new String[] { delimiter.ToString() }, StringSplitOptions.None);
    }

    List<string> cells = new List<string>();
    System.Text.StringBuilder cell = new System.Text.StringBuilder();
    bool inQuotes = false;
    // True when the next character begins a new cell. Only a quote in that position opens a
    // quoted cell, so an empty quoted cell ("") closes instead of re-opening.
    bool atCellStart = true;

    for (int pos = 0; pos < line.Length; pos++)
    {
        char ch = line[pos];
        if (inQuotes)
        {
            if (ch != '"')
            {
                cell.Append(ch);
            }
            else if (line[pos - 1] == '\\')
            {
                // Backslash-escaped quote (pos >= 1 here: an opening quote came first).
                cell.Append(ch);
            }
            else if (pos + 1 < line.Length && line[pos + 1] == '"')
            {
                // RFC 4180 doubled quote.
                cell.Append(ch);
                pos++;
            }
            else
            {
                inQuotes = false;
            }
        }
        else if (ch == '"')
        {
            if (atCellStart)
            {
                inQuotes = true;
            }
        }
        else if (ch == delimiter)
        {
            cells.Add(cell.ToString());
            cell.Length = 0;
            atCellStart = true;
            continue;
        }
        else
        {
            cell.Append(ch);
        }
        atCellStart = false;
    }

    // The last cell is not followed by a delimiter; it was previously dropped here.
    cells.Add(cell.ToString());
    return cells.ToArray();
}

/// <summary>
/// Loads the CSV file <paramref name="name"/> in directory <paramref name="path"/> and
/// bulk-copies its rows into the SQL CE table <paramref name="table"/>.
/// </summary>
/// <param name="path">Directory containing the CSV file.</param>
/// <param name="name">CSV file name.</param>
/// <param name="table">Destination table name.</param>
/// <param name="KeepNulls">When true, adds <c>SqlCeBulkCopyOptions.KeepNulls</c> to the bulk-copy options.</param>
public void insert(string path, string name, string table, bool KeepNulls)
{
    DataTable data = CSVToDataTable(path, name);
    //do data manipulation here

    SqlCeBulkCopyOptions defaults = new SqlCeBulkCopyOptions();
    SqlCeBulkCopyOptions options = KeepNulls ? (defaults | SqlCeBulkCopyOptions.KeepNulls) : defaults;

    string connectionString = Fastway_Remote_Agent.Properties.Settings.Default.DatabaseConnectionString;
    using (SqlCeBulkCopy bulkCopy = new SqlCeBulkCopy(connectionString, options))
    {
        bulkCopy.DestinationTableName = table;
        bulkCopy.WriteToServer(data);
    }
}

Using this library: http://sqlcebulkcopy.codeplex.com/

Also, for per-thread connection pooling (change it to meet your needs):

/// <summary>
/// Hands out one <see cref="DBCon"/> per managed thread and disposes connections whose
/// owning thread has finished.
/// </summary>
/// <remarks>
/// Intended for SQL Server Compact only. Per the original author, Compact is not
/// connection-pooling friendly and unloads itself too often otherwise; with other providers
/// this scheme would exhaust connections. Bookkeeping is done while holding a lock on the
/// dictionary returned by <see cref="ThreadConnectionMap"/>. Because that dictionary is
/// public, outside code touching it should take the same lock.
/// </remarks>
public abstract class SqlCeConnectionPool
{
    private static Dictionary<int, DBCon> threadConnectionMap = new Dictionary<int, DBCon>();

    private static Dictionary<int, Thread> threadMap = new Dictionary<int, Thread>();

    /// <summary>
    /// The map from managed thread id to that thread's connection (shared and mutable).
    /// </summary>
    public static Dictionary<int, DBCon> ThreadConnectionMap
    {
        get { return threadConnectionMap; }
    }

    /// <summary>
    /// The connection string used for new connections (<c>ConnectionString.Default</c>).
    /// </summary>
    public static ConnectionString ConnectionString
    {
        get { return global::ConnectionString.Default; }
    }

    /// <summary>
    /// Gets the calling thread's connection. A new one is created if the thread has none, or
    /// if its previous one is disposed, broken or closed.
    /// </summary>
    /// <remarks>
    /// Each call first disposes and forgets the connections of threads that are no longer
    /// alive. NOTE(review): this getter never calls Open() itself; whether the returned
    /// connection is open depends on the DBCon constructor, which is not visible here — confirm.
    /// </remarks>
    public static DBCon Connection
    {
        get
        {
            lock (threadConnectionMap)
            {
                // Housekeeping: drop entries whose owner thread is gone or was never recorded.
                List<int> staleThreadIds = new List<int>();
                foreach (KeyValuePair<int, DBCon> entry in threadConnectionMap)
                {
                    Thread owner;
                    if (threadMap.TryGetValue(entry.Key, out owner) && owner.IsAlive)
                    {
                        continue;
                    }
                    if (!entry.Value.Disposed)
                    {
                        entry.Value.Dispose();
                    }
                    staleThreadIds.Add(entry.Key);
                }
                foreach (int staleId in staleThreadIds)
                {
                    threadMap.Remove(staleId);
                    threadConnectionMap.Remove(staleId);
                }

                int threadId = Thread.CurrentThread.ManagedThreadId;

                DBCon connection;
                if (threadConnectionMap.TryGetValue(threadId, out connection))
                {
                    bool unusable;
                    if (connection.Disposed)
                    {
                        unusable = true;
                    }
                    else
                    {
                        ConnectionState state = connection.Connection.State;
                        unusable = state == ConnectionState.Broken || state == ConnectionState.Closed;
                        if (unusable)
                        {
                            connection.Dispose();
                        }
                    }

                    if (unusable)
                    {
                        threadConnectionMap.Remove(threadId);
                        threadMap.Remove(threadId);
                        connection = null;
                    }
                }

                if (connection == null)
                {
                    connection = new DBCon(ConnectionString);
                    // The indexer setter adds or replaces, matching the former ContainsKey/Add pairs.
                    threadConnectionMap[threadId] = connection;
                    threadMap[threadId] = Thread.CurrentThread;
                }
                return connection;
            }
        }
    }
}

Upvotes: 1

KF2
KF2

Reputation: 10153

You must create a separate connection object for each thread; SqlCeConnection is not safe for multithreaded use:

SqlCeConnection Class

Edited

SQL CE objects are not thread-safe and are not thread affinitive either. If an instance of SqlCeConnection or SqlCeTransaction is shared across threads without ensuring thread safety, then that may lead to Access Violation exception.

It is recommended that each thread should use a separate connection than sharing across. If there is really a need for sharing SQL CE objects across threads, then the application should serialize access to these objects.

Multithreaded programming with SQL Server Compact

Why don't you use the SQL Server Compact Toolbox? It can generate INSERT statements from a CSV file.

Or use the "Conversion of CSV to SQLCE database" app.

Upvotes: 4

Related Questions