Eoin Campbell

Reputation: 44298

Concurrency with TPL in .NET. Parallel Generator blocks after a while and gets stuck

I'm running a game over on CodeGolf.stackexchange.com where players submit bots to compete against one another in the game.

At this stage there are 70 bots, and with (N*(N+1))/2 games the tournament is getting quite slow to run, so now I'm looking to parallelize it. One of the game rules is that a bot can write to its own data directory, so I want to ensure I don't have a bot instance playing 2 games at once.

I've written an IEnumerable<T> generator to yield back valid matches (valid being where neither of the players is currently involved in another match), but I've got some sort of concurrency/blocking problem which causes the enumerable to loop infinitely.

At some arbitrary point, the while(any matches) call will just keep looping because the only remaining matches involve players who are in the activePlayers list, so it'll just keep going round and round. This would imply that there are active matches in progress.

But IsCompleted is never called again, so either the matches that are running are somehow blocked, or they've completed and I've got a concurrency bug in the _activePlayers.TryRemove() code.

public interface IMatchGenerator
{
    void CompleteMatch(Match match);
}

public class MatchGenerator : IMatchGenerator
{
    private static object SYNC_LOCK = new object();
    private ConcurrentDictionary<Player, Player> _activePlayers; //tracks players actively playing
    private ConcurrentQueue<Match> _allMatches;

    public MatchGenerator()
    {
        _activePlayers = new ConcurrentDictionary<Player, Player>();
    }

    public IEnumerable<Match> Generate(IList<Match> matches)
    {
        //take the list of matches passed in and stick them in a queue.
        _allMatches = new ConcurrentQueue<Match>(matches);

        //keep looping while there are matches to play
        while (_allMatches.Any())
        {
            Match nextMatch;

            lock (SYNC_LOCK)
            {
                _allMatches.TryDequeue(out nextMatch); //Grab from front of queue

                if (!_activePlayers.ContainsKey(nextMatch.Player1) &&
                     !_activePlayers.ContainsKey(nextMatch.Player2))
                {
                    //If neither player is in the active player list, then this is a
                    //good match so add both players
                    _activePlayers.TryAdd(nextMatch.Player1, nextMatch.Player1);
                    _activePlayers.TryAdd(nextMatch.Player2, nextMatch.Player2);

                }
                else
                {
                    //Otherwise push this match onto the back of the queue...
                    //FIFO should move on to the next one
                    _allMatches.Enqueue(nextMatch);
                    nextMatch = null;
                }
            }
            if (nextMatch != null)
                yield return nextMatch;
        }
    }

    public void CompleteMatch(Match match)
    {
        //Matches in progress have the generator attached to them and will call 
        //home when they are complete to remove players from the active list
        Player junk1, junk2;
        lock (SYNC_LOCK)
        {
            _activePlayers.TryRemove(match.Player1, out junk1);
            _activePlayers.TryRemove(match.Player2, out junk2);
        }
        if (junk1 == null || junk2 == null)
        {
            Debug.WriteLine("Uhoh! a match came in for completion but one of the players who should have been in the active list didn't get removed");
        }
    }
}

And the code which is using this.

var mg = new MatchGenerator();
//Code to generate IList<Match> of all player combinations and attach mg

Parallel.ForEach(mg.Generate(matches),
                    new ParallelOptions() {MaxDegreeOfParallelism = 8},
                    match =>
                    {
                        var localMatch = match;
                        try
                        {
                            PlayMatch(localMatch, gameLogDirectory, results);
                        }
                        finally
                        {
                            localMatch.IsCompleted();
                        }
                    });

From here it gets a little verbose, but there's not an awful lot going on. PlayMatch(...) calls a method called Play and has some StringBuilder code in it. Play calls a couple of external processes depending on the bot that's playing (e.g. Ruby, Python, etc.). It also StreamWrites to a per-player log file, but assuming that only one instance of each player bot is in action at a time, there shouldn't be any clashes here.

The whole Control program is available on GitHub @

https://github.com/eoincampbell/big-bang-game/blob/master/BigBang.Orchestrator/Program.cs

    public static Result Play(Player p1, Player p2, string gameLogDirectory)
    {
        var dir = Player.PlayerDirectory;

        var result = new Result() { P1 = p1, P2 = p2, P1Score = 0, P2Score = 0 };
        string player1ParamList = string.Empty, player2ParamList = string.Empty;
        List<long> p1Times = new List<long>(), p2Times = new List<long>();
        Stopwatch sw1 = new Stopwatch(), sw2 = new Stopwatch(), swGame = new Stopwatch();
        var sb = new StringBuilder();

        var proc = new Process
        {
            StartInfo =
            {
                UseShellExecute = false, RedirectStandardOutput = true, WorkingDirectory = dir
            }
        };

        swGame.Start();
        sb.AppendLine("+--------------------------------------------------------------------------------------------+");
        sb.AppendFormat("| Starting Game between {0} & {1} \n", p1.Name, p2.Name);
        sb.AppendLine("| ");
        for (var i = 0; i < 1; i++)
        {
            sw1.Reset();
            sw1.Start();
            var o1 = ProcessRunner.RunPlayerProcess(ref proc, player1ParamList, player2ParamList, p1, dir);
            sw1.Stop();
            p1Times.Add(sw1.ElapsedMilliseconds);

            //System.Threading.Thread.Sleep(1);

            sw2.Reset();
            sw2.Start();
            var o2 = ProcessRunner.RunPlayerProcess(ref proc, player2ParamList, player1ParamList, p2, dir);
            sw2.Stop();
            p2Times.Add(sw2.ElapsedMilliseconds);


            var whoWon = GetWinner(o1, o2, ref player1ParamList, ref player2ParamList);
            var whoWonMessage = "Draw Match";
            if (whoWon == "P1")
            {
                result.P1Score++;
                whoWonMessage = string.Format("{0} wins", p1.Name);
            }
            else if (whoWon == "P2")
            {
                result.P2Score++;
                whoWonMessage = string.Format("{0} wins", p2.Name);
            }

            sb.AppendFormat("| {0} plays {1} | {2} plays {3} | {4}\n", p1.Name, o1, p2.Name, o2, whoWonMessage);

        }
        swGame.Stop();
        sb.AppendLine("| ");
        sb.AppendFormat("| Game Time: {0}", swGame.Elapsed);

        result.WriteLine(sb.ToString());

        var resultMessage = string.Format("Result: {0} vs {1}: {2} - {3}",
                        result.P1,
                        result.P2,
                        result.P1Score,
                        result.P2Score);

        sb.AppendLine("| ");
        sb.AppendFormat("| {0}", resultMessage);



        using (var p1sw = new StreamWriter(Path.Combine(gameLogDirectory, p1.Name + ".log"), true))
        {
            p1sw.WriteLine(sb.ToString());
        }
        using (var p2sw = new StreamWriter(Path.Combine(gameLogDirectory, p2.Name + ".log"), true))
        {
            p2sw.WriteLine(sb.ToString());
        }

        result.P1AvgTimeMs = p1Times.Average();
        result.P2AvgTimeMs = p2Times.Average();

        return result;
    }

Upvotes: 3

Views: 210

Answers (1)

svick

Reputation: 244918

I believe your problem is caused by the use of Parallel.ForEach() on an IEnumerable<T> that may take indefinitely long to produce the next element. This is basically the same issue as using Parallel.ForEach() on BlockingCollection.GetConsumingEnumerable():

the partitioning algorithm employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.

But since in your case, the next element in the sequence won't be produced until a previous element is processed, this will cause indefinite blocking.

I think the right solution here is to use BlockingCollection<T> instead of yield return: replace yield return nextMatch with blockingCollection.Add(nextMatch), then run Generate() on a separate thread and use blockingCollection.GetConsumingPartitioner() from the blog post mentioned above in the Parallel.ForEach().

I also don't like that your Generate() wastes a whole CPU core basically doing nothing when there are no valid matches, but that's a separate issue.
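
As a rough illustration of the shape this could take, here is a minimal, self-contained sketch. It is not the code from the question: the Match and Tournament types, the string player names and the Thread.Sleep standing in for PlayMatch are all hypothetical. It also swaps in the built-in Partitioner.Create(..., EnumerablePartitionerOptions.NoBuffering) overload (.NET 4.5 and later) in place of the GetConsumingPartitioner() extension, and has the producer sleep on Monitor.Wait until a match completes rather than spinning.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Match type.
public sealed class Match
{
    public Match(string p1, string p2) { Player1 = p1; Player2 = p2; }
    public string Player1 { get; }
    public string Player2 { get; }
}

public static class Tournament
{
    // Plays every pairing once and returns the number of matches played.
    public static int Run(string[] players, int maxParallelism)
    {
        var pending = new List<Match>();
        for (int i = 0; i < players.Length; i++)
            for (int j = i + 1; j < players.Length; j++)
                pending.Add(new Match(players[i], players[j]));

        var gate = new object();                   // guards pending and active
        var active = new HashSet<string>();        // players currently in a match
        var ready = new BlockingCollection<Match>();

        // Producer: hands out only matches whose players are both idle.
        var producer = Task.Run(() =>
        {
            lock (gate)
            {
                while (pending.Count > 0)
                {
                    int idx = pending.FindIndex(m =>
                        !active.Contains(m.Player1) && !active.Contains(m.Player2));
                    if (idx < 0)
                    {
                        // Nothing is playable: release the lock and sleep
                        // until a consumer signals that a match finished.
                        Monitor.Wait(gate);
                        continue;
                    }
                    var next = pending[idx];
                    pending.RemoveAt(idx);
                    active.Add(next.Player1);
                    active.Add(next.Player2);
                    ready.Add(next);
                }
            }
            ready.CompleteAdding();                // lets Parallel.ForEach finish
        });

        int played = 0;
        // NoBuffering makes each worker take one element at a time (no chunking).
        var source = Partitioner.Create(
            ready.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(source,
            new ParallelOptions { MaxDegreeOfParallelism = maxParallelism },
            match =>
            {
                try
                {
                    Thread.Sleep(10);              // stand-in for PlayMatch(...)
                    Interlocked.Increment(ref played);
                }
                finally
                {
                    lock (gate)
                    {
                        active.Remove(match.Player1);
                        active.Remove(match.Player2);
                        Monitor.PulseAll(gate);    // wake the producer
                    }
                }
            });

        producer.Wait();
        return played;
    }

    public static void Main()
    {
        int played = Tournament.Run(new[] { "A", "B", "C", "D", "E" }, 4);
        Console.WriteLine("Played " + played + " matches");
    }
}
```

The key points are that the consumers never pull more than one match ahead of what they are processing, and that the producer only marks the collection complete once every match has been handed out, so Parallel.ForEach() terminates on its own.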

Upvotes: 4
