Reputation: 8764
I'm having trouble parallelizing a computationally expensive API integration.
The integration queries an API in parallel and populates a ConcurrentBag collection. Some processing is done, and then the collection is passed to Parallel.ForEach(), where it is written to the database using LINQ to SQL.
There are three nested loops: an outer loop that runs in parallel over Courses, an inner loop over Disciplines and, inside that, another loop over Lessons.
The problem I'm running into is: as any one lesson may belong to more than one course, looping over courses in parallel means that sometimes a lesson will be inserted more than once.
The code currently looks like this (externalCourseList is a collection of type ConcurrentBag<ExternalCourse>):
Parallel.ForEach(externalCourseList, externalCourse =>
{
    using (var context = new DataClassesDataContext())
    {
        var dbCourse = context.Courses.Single(
            x => x.externalCourseId == externalCourse.courseCode.ToString());
        dbCourse.ShortDesc = externalCourse.Summary;
        //dbCourse.LongDesc = externalCourse.MoreInfo;
        //(etc)

        foreach (var externalDiscipline in externalCourse.Disciplines)
        {
            var dbDiscipline = context.Disciplines.Where(
                x => x.ExternalDisciplineID == externalDiscipline.DisciplineCode
                    .ToString())
                .SingleOrDefault();
            if (dbDiscipline == null)
                dbDiscipline = new Linq2SQLEntities.Discipline();
            dbDiscipline.Title = externalDiscipline.Name;
            //(etc)
            dbDiscipline.ExternalDisciplineID = externalDiscipline.DisciplineCode
                .ToString();
            if (!dbDiscipline.IsLoaded)
                context.Disciplines.InsertOnSubmit(dbDiscipline);

            // relational table used as one-to-many relationship for legacy reasons
            var courseDiscipline = dbDiscipline.Course_Disciplines.SingleOrDefault(
                x => x.CourseID == dbCourse.CourseID);
            if (courseDiscipline == null)
            {
                courseDiscipline = new Course_Discipline
                {
                    Course = dbCourse,
                    Discipline = dbDiscipline
                };
                context.Course_Disciplines.InsertOnSubmit(courseDiscipline);
            }

            foreach (var externalLesson in externalDiscipline.Lessons)
            {
                /// The next statement throws an exception
                var dbLesson = context.Lessons.Where(
                    x => x.externalLessonID == externalLesson.LessonCode)
                    .SingleOrDefault();
                if (dbLesson == null)
                    dbLesson = new Linq2SQLEntities.Lesson();
                dbLesson.Title = externalLesson.Title;
                //(etc)
                dbLesson.externalLessonID = externalLesson.LessonCode;
                if (!dbLesson.IsLoaded)
                    context.Lessons.InsertOnSubmit(dbLesson);

                var disciplineLesson = dbLesson.Discipline_Lessons.SingleOrDefault(
                    x => x.DisciplineID == dbDiscipline.DisciplineID
                        && x.LessonID == dbLesson.LessonID);
                if (disciplineLesson == null)
                {
                    disciplineLesson = new Discipline_Lesson
                    {
                        Discipline = dbDiscipline,
                        Lesson = dbLesson
                    };
                    context.Discipline_Lessons.InsertOnSubmit(disciplineLesson);
                }
            }
        }
        context.SubmitChanges();
    }
});
(IsLoaded is implemented as described here.)
An exception is thrown at the line preceded by /// because the same lesson is often inserted multiple times, so calling .SingleOrDefault() on context.Lessons.Where(x => x.externalLessonID == externalLesson.LessonCode) fails once more than one row matches.
What would be the best way to solve this?
Reputation: 43996
One approach could be to separate the insertion of the lessons in the database from the other work that has to be done in parallel. I haven't studied your code deeply, so I am not sure if this approach is feasible, but I'll give an example anyway. The basic idea is to serialize the insertion of the lessons, in order to avoid the problems caused by the parallelization:
IEnumerable<Lesson[]> query = externalCourseList
    .AsParallel()
    .AsOrdered() // Optional
    .Select(externalCourse =>
    {
        using DataClassesDataContext context = new();
        List<Lesson> results = new();
        // Here do the work that adds lessons in the results list.
        return results.ToArray();
    })
    .AsSequential();
This is a parallel query (PLINQ) that does its parallel work while it is being enumerated, so at this point nothing has started yet. Now let's enumerate it:
using DataClassesDataContext context = new();
foreach (Lesson lesson in query.SelectMany(x => x))
{
    // Here insert the lesson in the DB.
}
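Since the same lesson can arrive from several courses, the sequential loop is also a convenient place to skip duplicates. Below is a minimal sketch of the whole two-phase pattern using in-memory stand-ins instead of the database. The ExternalLesson and ExternalCourse records, the LessonImport class and the CollectDistinctLessons method are hypothetical names made up for this illustration; in the real code the "collect" step would be the expensive per-course work and the "insert" step would call InsertOnSubmit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the API types (assumptions, not the real model).
public record ExternalLesson(string LessonCode, string Title);
public record ExternalCourse(string CourseCode, ExternalLesson[] Lessons);

public static class LessonImport
{
    public static List<ExternalLesson> CollectDistinctLessons(
        IEnumerable<ExternalCourse> courses)
    {
        // Phase 1 (parallel): per-course work that only gathers lessons.
        // Nothing runs yet; PLINQ starts when the query is enumerated.
        IEnumerable<ExternalLesson[]> query = courses
            .AsParallel()
            .Select(course => course.Lessons.ToArray()) // placeholder for the real work
            .AsSequential();

        // Phase 2 (sequential): one thread decides which lessons are new,
        // so two courses can never insert the same lesson concurrently.
        var seenCodes = new HashSet<string>();
        var toInsert = new List<ExternalLesson>();
        foreach (ExternalLesson lesson in query.SelectMany(x => x))
        {
            // HashSet<T>.Add returns false if the code was already seen.
            if (seenCodes.Add(lesson.LessonCode))
                toInsert.Add(lesson); // the real code would insert into the DB here
        }
        return toInsert;
    }
}
```

With two courses that share one lesson, the method returns that lesson only once. Without AsOrdered() the order of the returned lessons is not guaranteed.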
The work of inserting the lessons in the DB will be done exclusively on the current thread. This thread will also participate in the work inside the parallel query, along with ThreadPool threads. In case this is a problem, you could offload the enumeration of the query to a ThreadPool thread, freeing the current thread from doing anything other than the lesson-inserting work. I've posted an OffloadEnumeration extension method here, that you could use just before starting the enumeration of the query:
query = OffloadEnumeration(query);
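The linked OffloadEnumeration method is not reproduced here. As a rough idea of how such a helper could work, here is a hypothetical sketch (the name OffloadEnumerationSketch and the boundedCapacity parameter are assumptions of this example, not the linked implementation): a ThreadPool task enumerates the source and hands items to the consumer through a BlockingCollection<T>.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class EnumerationExtensions
{
    // Hypothetical stand-in, NOT the linked OffloadEnumeration implementation.
    // Assumes the caller enumerates the result to the end; abandoning the
    // enumeration early is not handled gracefully in this sketch.
    public static IEnumerable<T> OffloadEnumerationSketch<T>(
        this IEnumerable<T> source, int boundedCapacity = 100)
    {
        using var buffer = new BlockingCollection<T>(boundedCapacity);

        // The source is enumerated on a ThreadPool thread.
        Task producer = Task.Run(() =>
        {
            try
            {
                foreach (T item in source) buffer.Add(item);
            }
            finally
            {
                buffer.CompleteAdding(); // lets the consumer loop finish
            }
        });

        // The calling thread only takes finished items out of the buffer.
        foreach (T item in buffer.GetConsumingEnumerable())
            yield return item;

        // Rethrow any exception that happened while enumerating the source.
        producer.GetAwaiter().GetResult();
    }
}
```

The bounded capacity keeps the producer from running arbitrarily far ahead of the inserting thread; the value 100 is an arbitrary choice for the sketch.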