Paul Taylor

Reputation: 13210

Why is ExecutorService.awaitTermination() succeeding before a submitted task has completed?

In my code I have a number of ExecutorServices that run as a pipeline: an earlier ExecutorService may submit tasks to any later ExecutorService, but never the other way round.

  services.add(songLoaderService);
  services.add(AcoustIdMatcher.getExecutorService());
  services.add(SongPrematcherMatcher.getExecutorService());
  services.add(MusicBrainzMetadataMatcher.getExecutorService());
  //Start Loading Songs
  songLoaderService.submit(loader);

We submit only one task to the first service, and then I can request a shutdown. The shutdown will not complete until this task has completed, by which time it will have put some tasks onto the second service, and so on.

This code has worked for a number of years: shutdown() is never called until all tasks that were going to be submitted have been, and awaitTermination() does not return until all the submitted tasks have completed.

        int count = 0;
        for (ExecutorService service : services)
        {
            MainWindow.logger.severe("Requested Shutdown Task:" + count + ":"+((SongKongThreadFactory)((TimeoutThreadPoolExecutor) service).getThreadFactory()).getName());

            //Request Shutdown
            service.shutdown();

            //Now wait for service to terminate
            service.awaitTermination(10, TimeUnit.DAYS);
            MainWindow.logger.severe("Completed Shutdown Task:" + count);

            if(count==2)
            {
                MainWindow.logger.severe("Report:"+currentReportId+":SongPreMatcher:" + SongPrematcherMatcher.getPipelineQueuedCount()+":"+ SongPrematcherMatcher.getPipelineCallCount()+":"+ SongPrematcherMatcher.getPipelineCompletedCount()+":"+SongPrematcherMatcher.getPipelineFileCount());
            }
            count++;
        }
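The ordering argument above can be checked in isolation. The following is a minimal sketch, not the SongKong code: the class and method names (`OrderedShutdown`, `shutdownInOrder`, `runDemo`) are hypothetical, and plain fixed thread pools stand in for the custom executors. It also checks the boolean returned by `awaitTermination()`, which the loop above ignores, so a timeout would not be mistaken for a clean termination.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OrderedShutdown {

    /** Shuts down each stage in pipeline order; returns false if any stage timed out. */
    static boolean shutdownInOrder(List<ExecutorService> stages, long timeout, TimeUnit unit)
            throws InterruptedException {
        for (ExecutorService stage : stages) {
            // No new tasks are accepted, but already submitted tasks still run
            stage.shutdown();
            // awaitTermination returns false on timeout, so check the result
            if (!stage.awaitTermination(timeout, unit)) {
                return false;
            }
        }
        return true;
    }

    /** Two-stage demo: one task on the first stage forwards n tasks to the second. */
    static int runDemo(int n) throws InterruptedException {
        ExecutorService first = Executors.newFixedThreadPool(2);
        ExecutorService second = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();

        first.submit(() -> {
            for (int i = 0; i < n; i++) {
                second.submit(() -> { completed.incrementAndGet(); });
            }
        });

        boolean ok = shutdownInOrder(Arrays.asList(first, second), 10, TimeUnit.SECONDS);
        // -1 signals that a stage failed to terminate within the timeout
        return ok ? completed.get() : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed=" + runDemo(5));
    }
}
```

As long as every task really is submitted to the stage that is being awaited, the second stage cannot terminate before all forwarded tasks have run.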

But I am now seeing a problem with one ExecutorService not working in this way. The shutdown of the SongPrematcherMatcher service succeeds after all tasks added to it by the previous (AcoustIdMatcher) service have been submitted and started, but before one of them has completed, as can be seen in the following debug line:

Report:353:SongPreMatcher:init:57:started:57:Finished:56

The missing task hasn't failed, since we can see it complete at the end of the log output; the point is that it completes after the service it runs on has apparently terminated.

This has significant consequences: all the tasks that this task tries to submit to the MusicBrainzMetadataMatcher service fail, because a shutdown of that service has already been requested once the previous service (SongPrematcherMatcher) was shut down.

SongPrematcherMatcher was added quite recently, so my assumption is that there is something wrong with it, but I cannot see what it could be.
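One reason such late submissions can go unnoticed: `ThreadPoolExecutor.CallerRunsPolicy`, which `TimeoutThreadPoolExecutor` passes to its superclass, is documented to discard a rejected task when the executor has already been shut down, without throwing. The sketch below illustrates that behaviour with a plain `ThreadPoolExecutor`; the class name `LateSubmitDemo` is hypothetical, and whether the MusicBrainzMetadataMatcher service uses this policy is an assumption.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LateSubmitDemo {

    /** Returns true if a task submitted after shutdown() actually ran. */
    static boolean lateTaskRan() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                new ThreadPoolExecutor.CallerRunsPolicy());

        pool.shutdown();

        AtomicBoolean ran = new AtomicBoolean(false);
        // Rejected because the pool is shut down; CallerRunsPolicy drops it silently
        pool.submit(() -> { ran.set(true); });

        pool.awaitTermination(1, TimeUnit.SECONDS);
        return ran.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("late task ran: " + lateTaskRan());
    }
}
```

With the default `AbortPolicy` the same submission would throw a `RejectedExecutionException`, which makes a late submission much easier to spot in the logs.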

toplevelanalyzer.FixSongsController:start:SEVERE: Requested Shutdown Task:0
analyser.AcoustIdMatcher:<init>:SEVERE: GROUP 115:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
toplevelanalyzer.FixSongsController:start:SEVERE: Completed Shutdown Task:0
toplevelanalyzer.FixSongsController:start:SEVERE: Requested Shutdown Task:1:analyser.AcoustIdMatcher
analyser.SongPrematcherMatcher:<init>:SEVERE: Queue:GROUP 791:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
analyser.SongPrematcherMatcher:call:SEVERE: Start:GROUP 791:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
toplevelanalyzer.FixSongsController:start:SEVERE: Completed Shutdown Task:1
toplevelanalyzer.FixSongsController:start:SEVERE: Requested Shutdown Task:2:analyser.SongPrematcherMatcher
toplevelanalyzer.FixSongsController:start:SEVERE: Completed Shutdown Task:2
toplevelanalyzer.FixSongsController:start:SEVERE: Report:353:SongPreMatcher:init:57:started:57:Finished:56
toplevelanalyzer.FixSongsController:start:SEVERE: Requested Shutdown Task:3:analyser.MusicBrainzMetadataMatcher
analyser.MusicBrainzMetadataMatcher:<init>:SEVERE: Queue:GROUP 795:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
analyser.MusicBrainzMetadataMatcher:<init>:SEVERE: Queue:GROUP 797:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
analyser.MusicBrainzMetadataMatcher:<init>:SEVERE: Queue:GROUP 799:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
analyser.MusicBrainzMetadataMatcher:<init>:SEVERE: Queue:GROUP 821:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
analyser.MusicBrainzMetadataMatcher:<init>:SEVERE: Queue:GROUP 823:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false
analyser.SongPrematcherMatcher:call:SEVERE: Finish:GROUP 791:C:\Users\Paul\Desktop\FlatRandomFolder:true:false:true:false

If I artificially add a delay in the loop then, at least for this particular test case, it works. But this is not a solution, as it introduces delays when there is a series of executor services that could validly be shut down immediately one after another. It's also not clear why this fix works, or whether it would always work.

for (ExecutorService service : services)
{
     Thread.sleep(5000);
     //Request Shutdown
     service.shutdown();
    ......
}

There is one ExecutorService for each type of task; only one type of task can be added to a particular ExecutorService. The ExecutorService does have special handling to allow the user to cancel tasks and to protect against long-running tasks, but this was not the issue here.

I cannot see anything different in the SongPrematcherMatcher code compared with any of the other tasks.

package com.jthink.songkong.analyse.analyser;

import com.jthink.songkong.analyse.general.Errors;
import com.jthink.songkong.cmdline.SongKong;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadFactory;

import java.util.List;
import java.util.concurrent.*;

/**
 * From http://stackoverflow.com/questions/2758612/executorservice-that-interrupts-tasks-after-a-timeout
 * With additional support for caller running task when bounded queue is full
 */
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    private final static int WAIT_BEFORE_STOP = 10000;

    public long getTimeout()
    {
        return timeout;
    }
    public TimeUnit getTimeoutUnit()
    {
        return timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, LinkedBlockingQueue<Runnable> queue,long timeout, TimeUnit timeoutUnit)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }


    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    public <T> FutureCallable<T> newTaskFor(Callable<T> callable) {
        return new FutureCallable<T>(callable);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        MainWindow.logger.warning("beforeExecute:"+t.getName()+":"+r.toString());
        SongKong.checkIn();
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t,r), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        MainWindow.logger.warning("afterExecute:"+r.toString());

        //AfterExecute will be called after the task has completed, either of its own accord or because it
        //took too long and was interrupted by corresponding timeout task
        //Remove mapping and cancel timeout task
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }

    }

    @Override
    protected void terminated()
    {
        //All tasks have completed either naturally or via being cancelled by timeout task so close the timeout task
        MainWindow.logger.warning("---Terminated:"+((SongKongThreadFactory)getThreadFactory()).getName());
        timeoutExecutor.shutdown();
    }

    class TimeoutTask implements Runnable {
        private final       Thread thread;
        private             Callable c;

        public TimeoutTask(Thread thread, Runnable c) {
            this.thread = thread;
            if(c instanceof FutureCallable)
            {
                this.c = ((FutureCallable) c).getCallable();
            }
        }

        @Override
        public void run()
        {

            String msg = "";
            if (c != null)
            {
                if (c instanceof AcoustIdMatcher)
                {
                    msg = c.getClass() + ":" + ((AcoustIdMatcher) c).getSongGroup().getKey();
                }
                else if (c instanceof SongPrematcherMatcher)
                {
                    msg = c.getClass() + ":" + ((SongPrematcherMatcher) c).getSongGroup().getKey();
                }
                else if (c instanceof MusicBrainzSongGroupMatcher)
                {
                    msg = c.getClass() + ":" + ((MusicBrainzSongGroupMatcher) c).getSongGroup().getKey();
                }
                else if (c instanceof MusicBrainzMetadataMatcher)
                {
                    msg = c.getClass() + ":" + ((MusicBrainzMetadataMatcher) c).getSongGroup().getKey();
                }
                else if (c instanceof MusicBrainzUpdateSongOnly)
                {
                    msg = c.getClass() + ":" + ((MusicBrainzUpdateSongOnly) c).getSongGroup().getKey();
                }
                else if (c instanceof DiscogsSongGroupMatcher)
                {
                    msg = c.getClass() + ":" + ((DiscogsSongGroupMatcher) c).getSongGroup().getKey();
                }
                else if (c instanceof MusicBrainzSongMatcher)
                {
                    msg = c.getClass() + ":" + String.valueOf(((MusicBrainzSongMatcher) c).getSongId());
                }
                else if (c instanceof SongSaver)
                {
                    msg = c.getClass() + ":" + String.valueOf(((SongSaver) c).getSongId());
                }
                else
                {
                    msg = c.getClass().getName();
                }
            }

            if (c != null && c instanceof CancelableTask)
            {
                MainWindow.logger.warning("+++Cancelling " + msg + " task because taking too long");
                ((CancelableTask) c).setCancelTask(true);

                StackTraceElement[] stackTrace = thread.getStackTrace();
                Errors.addError("Cancelled " + msg + " because taken too long", stackTrace);
                Counters.getErrors().getCounter().incrementAndGet();

                if(stackTrace.length>0)
                {
                    boolean isKnownProblem = false;
                    for(int i=0;i<stackTrace.length;i++)
                    {
                        if(
                                (stackTrace[i].getClassName().contains("CosineSimilarity")) ||
                                (stackTrace[i].getClassName().contains("com.jthink.songkong.fileloader.FileFilters"))
                        )
                        {
                            isKnownProblem=true;
                            break;
                        }
                    }

                    if(isKnownProblem)
                    {
                        MainWindow.logger.warning("+++Interrupting " + msg + " task because taking too long");
                        thread.interrupt();

                        try
                        {
                            Thread.sleep(WAIT_BEFORE_STOP);
                        }
                        catch (InterruptedException ie)
                        {
                            MainWindow.logger.warning("+++Interrupted TimeoutTask " + msg + " task because taking too long");
                        }

                        if(thread.isAlive())
                        {
                            MainWindow.logger.warning("+++Stopping CosineSimailarity task");
                            thread.stop();
                        }
                    }
                }
            }
        }
    }
}


        public class AnalyserService
        {

            protected static final int BOUNDED_QUEUE_SIZE = 500;
            protected String threadGroup;

            public AnalyserService(String threadGroup)
            {
                this.threadGroup=threadGroup;
            }

            protected  ExecutorService      executorService;

            protected void initExecutorService()
            {
                int workerSize = Runtime.getRuntime().availableProcessors();
                executorService = new PausableExecutor(workerSize, workerSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),new SongKongThreadFactory(threadGroup));
            }

            public ExecutorService getExecutorService()
            {
                if (executorService == null || executorService.isShutdown())
                {
                    initExecutorService();
                }
                return executorService;
            }

            /** Submit and return immediately
             *
             * @param task
             */
            public void submit(Callable<Boolean> task) //throws Exception
            {
                executorService.submit(task);
            }
        }


        public class AnalyserServiceWithTimeout extends AnalyserService
        {
            private static final int TIMEOUT_PER_TASK = 30;

            public AnalyserServiceWithTimeout(String threadGroup)
            {
                super(threadGroup);
            }

            @Override
            protected void initExecutorService()
            {
                int workerSize = Runtime.getRuntime().availableProcessors();
                executorService = new TimeoutThreadPoolExecutor(workerSize,
                        new SongKongThreadFactory(threadGroup),
                        new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),
                        TIMEOUT_PER_TASK,
                        TimeUnit.MINUTES);
            }
        }

    package com.jthink.songkong.analyse.analyser;

    import com.google.common.base.Strings;
    import com.jthink.songkong.analyse.general.Errors;
    import com.jthink.songkong.cmdline.SongKong;
    import com.jthink.songkong.db.SongCache;
    import com.jthink.songkong.match.MetadataGatherer;
    import com.jthink.songkong.preferences.UserPreferences;
    import com.jthink.songkong.ui.MainWindow;
    import com.jthink.songkong.util.SongKongThreadGroup;
    import com.jthink.songlayer.Song;
    import com.jthink.songlayer.hibernate.HibernateUtil;
    import org.hibernate.Session;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.logging.Level;

        /**
         * Try and match songs to acoustid only first as a starting point
         *
         * Use when we have no or little metadata
         */
        public class SongPrematcherMatcher extends CancelableTask implements Callable<Boolean> {
            private static PipelineCount pipelineCount = new PipelineCount();

            public static int getPipelineQueuedCount()
            {
                return pipelineCount.getQueuedCount();
            }

            public static int getPipelineCallCount()
            {
                return pipelineCount.getCallCount();
            }

            public static void resetPipelineCount()
            {
                pipelineCount.resetCounts();
            }

            public static int getPipelineFileCount()
            {
                return pipelineCount.getFileCount();
            }

            public static int getPipelineCompletedCount()
            {
                return pipelineCount.getCompletedCount();
            }

            private static AnalyserService analyserService = new AnalyserServiceWithTimeout(SongKongThreadGroup.THREAD_PREMATCHER_WORKER);

            private Session     session;
            private SongGroup   songGroup;

            public SongGroup getSongGroup()
            {
                return songGroup;
            }

            public SongPrematcherMatcher(SongGroup songGroup)
            {
                SongKong.logger.severe("Queue:"+ songGroup.getKey());
                pipelineCount.incQueuedCount();
                pipelineCount.incFileCount(songGroup.getSongIds().size());
                this.songGroup = songGroup;
            }

            public static ExecutorService getExecutorService()
            {
                return analyserService.getExecutorService();
            }

            public static AnalyserService getService()
            {
                return analyserService;
            }

            public Boolean call()
            {
                try
                {
                    SongKong.logger.severe("Start:" + songGroup.getKey());
                    if (SongKong.isStopTask() || isCancelTask())
                    {
                        return false;
                    }
                    SongKong.checkIn();
                    pipelineCount.incCallCount();
                    session = HibernateUtil.beginTransaction();
                    AnalysisStats stats = new AnalysisStats();

                    List<Song> songs = SongCache.loadSongsFromDatabase(session, songGroup.getSongIds());

                    //Try to match acoustid this should allow more to be grouped and matched by metadata on first pass
                    try
                    {
                        new RecordingOnlyMatcher().matchRecordingsOnlyByAcoustid(session, songGroup, songs, stats);
                    }
                    catch(Exception ex)
                    {
                        MainWindow.logger.log(Level.SEVERE, Strings.nullToEmpty(ex.getMessage()), ex);
                        Errors.addError(Strings.nullToEmpty(ex.getMessage()));
                    }

                    session.getTransaction().commit();
                    HibernateUtil.closeSession(session);

                    processSongsWithNewMetadata(songGroup, songs);
                    pipelineCount.incCompletedCount();
                    SongKong.logger.severe("Finish:" + songGroup.getKey());
                    return true;
                }
                catch (Throwable t)
                {
                    //Exception, Error and any other Throwable were all handled identically, so one catch block suffices
                    SongKong.logger.severe("FinishFail:" + songGroup.getKey());
                    MainWindow.logger.log(Level.SEVERE, "SongPrematcherMatcher:" + t.getMessage(), t);
                    //session is still null if we failed before beginTransaction()
                    if (session != null && session.getTransaction() != null)
                    {
                        session.getTransaction().rollback();
                    }
                    return false;
                }
                finally
                {
                    //session is null when call() returned early because the task was stopped or cancelled
                    if(session != null && session.isOpen())
                    {
                        session.getTransaction().commit();
                        HibernateUtil.closeSession(session);
                    }
                }
            }

            private boolean processSongsWithNewMetadata(SongGroup songGroup,  List<Song> songs)
            {
                MainWindow.logger.info("Prematcher:" + songGroup.getKey() + ":totalcount:" + songs.size());

                int count = 0;
                //Group based on actual metadata only
                MetadataGatherer mg = new MetadataGatherer(songs);


                for (String album : mg.getAlbums().keySet())
                {
                    List<Song> songsInGrouping = mg.getAlbums().get(album);
                    count+=songsInGrouping.size();
                    MainWindow.logger.warning("Prematcher:" + songGroup.getKey() + ":" + album + ":count:" + songsInGrouping.size());
                    SongGroup sg = SongGroup.createSongGroupForSongs(songGroup, songsInGrouping);
                    sg.setRandomFolderNoMetadata(false);
                    sg.setRandomFolder(false);
                    processRandomFolder(sg, songsInGrouping);
                }

                List<Song> songsWithNoInfo = new ArrayList<>(mg.getSongsWithNoRelease());
                if(songsWithNoInfo.size()>0)
                {
                    count+=songsWithNoInfo.size();
                    SongGroup sgWithNoInfo = SongGroup.createSongGroupForSongs(songGroup, songsWithNoInfo);
                    MainWindow.logger.warning("Prematcher:" + songGroup.getKey() + ":NoMetadata:" + ":count:" + songsWithNoInfo.size());
                    processRandomFolderNoMetadata(sgWithNoInfo, songsWithNoInfo);
                }

                if(count<songs.size())
                {
                    MainWindow.logger.warning(songGroup.getKey()+":Not all songs have been processed"+songs.size());
                    Errors.addErrorWithoutStackTrace(songGroup.getKey()+":Not all songs have been processed:"+songs.size());
                }
                return true;
            }

            private boolean processRandomFolder(SongGroup songGroup,  List<Song> songs)
            {
                if(UserPreferences.getInstance().isSearchMusicBrainz())
                {
                    MusicBrainzMetadataMatcher.getService().submit(new MusicBrainzMetadataMatcher(songGroup));
                }
                else if(UserPreferences.getInstance().isSearchDiscogs())
                {
                    if(songGroup.getSubSongGroups().size() > 1)
                    {
                        DiscogsMultiFolderSongGroupMatcher.getService().submit(new DiscogsMultiFolderSongGroupMatcher(songGroup));
                    }
                    else if(songGroup.getSongIds().size()==1)
                    {
                        DiscogsSongMatcher.getService().submit(new DiscogsSongMatcher(songGroup, songGroup.getSongIds().get(0)));
                    }
                    else
                    {
                        DiscogsSongGroupMatcher.getService().submit(new DiscogsSongGroupMatcher(songGroup));
                    }
                }
                else
                {
                    for (Integer songId : songGroup.getSongIds())
                    {
                        SongSaver.getService().submit(new SongSaver(songId));
                    }
                }
                return true;
            }

            /**
             * Process a group of files that are in a Random folder and dont seem to have anything in common so should not be grouped
             * together.
             *
             * @param songGroup
             * @param songs
             * @return
             */
            private boolean processRandomFolderNoMetadata(SongGroup songGroup, List<Song> songs)
            {
                if(UserPreferences.getInstance().isSearchMusicBrainz())
                {
                    for (Song song : songs)
                    {
                        MusicBrainzSongMatcher.getService().submit(new MusicBrainzSongMatcher(songGroup, song.getRecNo()));
                    }
                }
                else if(UserPreferences.getInstance().isSearchDiscogs())
                {
                    for (Song song : songs)
                    {
                        DiscogsSongMatcher.getService().submit(new DiscogsSongMatcher(songGroup, song.getRecNo()));
                    }
                }
                else
                {
                    for (Integer songId : songGroup.getSongIds())
                    {
                        SongSaver.getService().submit(new SongSaver(songId));
                    }
                }
                return true;
            }
        }

Upvotes: 4

Views: 777

Answers (1)

Paul Taylor

Reputation: 13210

I thought it was something silly but couldn't find it. The issue was that I submitted the tasks to the wrong ExecutorService.

In AcoustIdMatcher I had

 private boolean processFolderWithPoorMetadata(SongGroup songGroup)
    {
         MusicBrainzMetadataMatcher.getService().submit(new SongPrematcherMatcher(songGroup));
        return true;
    }

when I should have had

 private boolean processFolderWithPoorMetadata(SongGroup songGroup)
    {
        SongPrematcherMatcher.getService().submit(new SongPrematcherMatcher(songGroup));
        return true;
    }

This means all the SongPrematcherMatcher tasks were submitted to the wrong ExecutorService, so when I requested shutdown of the SongPrematcherMatcher ExecutorService it had no tasks of its own and could terminate immediately!
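The effect is easy to reproduce outside the application. This is a minimal sketch with hypothetical names (`WrongExecutorDemo`, `intended`, `wrong`) and plain single-thread executors rather than the real services: a task meant for one pool is submitted to another, and the intended pool then terminates while that task is still running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class WrongExecutorDemo {

    /** Returns true if the intended pool terminated while "its" task was still running. */
    static boolean intendedPoolTerminatedEarly() throws InterruptedException {
        ExecutorService intended = Executors.newSingleThreadExecutor();
        ExecutorService wrong = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean finished = new AtomicBoolean(false);

        // The mistake: the task is submitted to 'wrong' instead of 'intended'
        wrong.submit(() -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finished.set(true);
        });
        started.await();

        // 'intended' never received a task, so it terminates straight away
        intended.shutdown();
        boolean terminated = intended.awaitTermination(5, TimeUnit.SECONDS);
        boolean stillRunning = !finished.get();

        // Let the task finish and clean up the other pool
        release.countDown();
        wrong.shutdown();
        wrong.awaitTermination(5, TimeUnit.SECONDS);

        return terminated && stillRunning;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("terminated early: " + intendedPoolTerminatedEarly());
    }
}
```

The counters in the question could not reveal this, because they are static fields of the task class and are incremented regardless of which pool runs the task. Logging the executing thread's name alongside them would probably have exposed the mismatch sooner.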

Upvotes: 2
