Marc.2377
Marc.2377

Reputation: 8764

LINQ To SQL in a parallel loop: How to prevent duplicate insertions?

I'm running into some trouble in trying to parallelize a computationally expensive API integration.

The integration queries an API in parallel and populates a ConcurrentBag collection. Some processing is done, and then it is passed to Parallel.ForEach() in which it is interfaced with the database by using LINQ To Sql.

There is:

  1. one outer loop which runs in parallel for Courses

  2. an inner loop through Disciplines

  3. inside it, another loop iterating through Lessons.

The problem I'm running into is: as any one lesson may belong to more than one course, looping over courses in parallel means that sometimes a lesson will be inserted more than once.

The code currently looks like this:

(externalCourseList is the collection of type ConcurrentBag<ExternalCourse>.)

Parallel.ForEach(externalCourseList, externalCourse =>
{
    using ( var context = new DataClassesDataContext() )
    {
        var dbCourse = context.Courses.Single(
            x => x.externalCourseId == externalCourse.courseCode.ToString());

        dbCourse.ShortDesc = externalCourse.Summary;
        //dbCourse.LongDesc = externalCourse.MoreInfo;
        //(etc)

        foreach (var externalDiscipline in externalCourse.Disciplines)
        {
            var dbDiscipline = context.Disciplines.Where(
                x => x.ExternalDisciplineID == externalDiscipline.DisciplineCode
                   .ToString())
               .SingleOrDefault();

            if (dbDiscipline == null)
                dbDiscipline = new Linq2SQLEntities.Discipline();

            dbDiscipline.Title = externalDiscipline.Name;
            //(etc)
            dbDiscipline.ExternalDisciplineID = externalDiscipline.DisciplineCode
                .ToString();

            if (!dbDiscipline.IsLoaded)
                context.Disciplines.InsertOnSubmit(dbDiscipline);

            // relational table used as one-to-many relationship for legacy reasons
            var courseDiscipline = dbDiscipline.Course_Disciplines.SingleOrDefault(
                x => x.CourseID == dbCourse.CourseID);

            if (courseDiscipline == null)
            {
                courseDiscipline = new Course_Discipline
                {
                    Course = dbCourse,
                    Discipline = dbDiscipline
                };

                context.Course_Disciplines.InsertOnSubmit(courseDiscipline);
            }

            foreach (var externalLesson in externalDiscipline.Lessons)
            {
                /// The next statement throws an exception
                var dbLesson = context.Lessons.Where(
                    x => x.externalLessonID == externalLesson.LessonCode)
                        .SingleOrDefault();

                if (dbLesson == null)
                    dbLesson = new Linq2SQLEntities.Lesson();

                dbLesson.Title = externalLesson.Title;
                //(etc)
                dbLesson.externalLessonID = externalLesson.LessonCode;

                if (!dbLesson.IsLoaded)
                    context.Lessons.InsertOnSubmit(dbLesson);

                var disciplineLesson = dbLesson.Discipline_Lessons.SingleOrDefault(
                    x => x.DisciplineID == dbDiscipline.DisciplineID
                        && x.LessonID == dbLesson.LessonID);

                if (disciplineLesson == null)
                {
                    disciplineLesson = new Discipline_Lesson
                    {
                        Discipline = dbDiscipline,
                        Lesson = dbLesson
                    };

                    context.Discipline_Lessons.InsertOnSubmit(disciplineLesson);
                }
            }

        }

        context.SubmitChanges();
    }
});

(IsLoaded is implemented as described here.)

An exception is thrown at the line preceded with /// because the same lesson is often inserted multiple times and calling .SingleOrDefault() on context.Lessons.Where(x => x.externalLessonID == externalLesson.LessonCode) fails.

What would be the best way to solve this?

Upvotes: 1

Views: 464

Answers (1)

Theodor Zoulias
Theodor Zoulias

Reputation: 43996

One approach could be to separate the insertion of the lessons in the database from the other work that has to be done in parallel. I haven't studied your code deeply, so I am not sure if this approach is feasible, but I'll give an example anyway. The basic idea is to serialize the insertion of the lessons, in order to avoid the problems caused by the parallelization:

IEnumerable<Lesson[]> query = externalCourseList
    .AsParallel()
    .AsOrdered() // Optional
    .Select(externalCourse =>
    {
        using DataClassesDataContext context = new();
        List<Lesson> results = new();
        // Here do the work that adds lessons in the results list.
        return results.ToArray();
    }
    .AsSequential();

This is a parallel query (PLINQ), that does the parallel work while it is enumerated. So at this point it hasn't started yet. Now let's enumerate it:

using DataClassesDataContext context = new();
foreach (Lesson lesson in query.SelectMany(x => x))
{
    // Here insert the lesson in the DB.
}

The work of inserting the lessons in the DB will be done exclusively on the current thread. This thread will also participate in the work inside the parallel query, along with ThreadPool threads. In case this is a problem, you could offload the enumeration of the query on a ThreadPool thread, freeing the current thread from doing anything else than the lesson-inserting work. I've posted an OffloadEnumeration extension method here, that you could use just before starting the enumeration of the query:

query = OffloadEnumeration(query);

Upvotes: 1

Related Questions