DaemonScheduler.java
- /*
- * Copyright (c) 2002-2022, City of Paris
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice
- * and the following disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above copyright notice
- * and the following disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * 3. Neither the name of 'Mairie de Paris' nor 'Lutece' nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * License 1.0
- */
- package fr.paris.lutece.portal.service.daemon;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.Map;
- import java.util.Set;
- import java.util.Timer;
- import java.util.TimerTask;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.TimeUnit;
- import fr.paris.lutece.portal.service.util.AppLogService;
- import fr.paris.lutece.portal.service.util.AppPropertiesService;
- /**
- * Daemon scheduler.
- * <p>
- * Responsible for ensuring on demand or timely daemon execution. Starts a thread which monitor the queue for daemons to execute. A {@link Timer} handles
- * repeating daemons runs.
- *
- * <p>
- * Daemon run requests are coalesced. If a daemon is already running when a request comes, a new run is scheduled right after the current run ends.
- */
- class DaemonScheduler implements Runnable, IDaemonScheduler
- {
- private static final String PROPERTY_MAX_AWAIT_TERMINATION_DELAY = "daemon.maxAwaitTerminationDelay";
- private final BlockingQueue<DaemonEntry> _queue;
- private final ExecutorService _executor;
- private final Thread _coordinatorThread;
- private final Timer _scheduledDaemonsTimer;
- private final Map<String, RunnableWrapper> _executingDaemons;
- private final Map<String, DaemonTimerTask> _scheduledDaemons;
- private volatile boolean _bShuttingDown;
- /**
- * Constructor
- *
- * @param queue
- * the queue where daemon execution requests are stored
- * @param executor
- * the executor service handling the execution of daemons
- */
- public DaemonScheduler( BlockingQueue<DaemonEntry> queue, ExecutorService executor )
- {
- _queue = queue;
- _executor = executor;
- _scheduledDaemonsTimer = new Timer( "Lutece-Daemons-Scheduled-Timer-Thread", true );
- _executingDaemons = new HashMap<>( );
- _scheduledDaemons = new HashMap<>( );
- _coordinatorThread = new Thread( this, "Lutece-Daemons-Coordinator" );
- _coordinatorThread.setDaemon( true );
- _coordinatorThread.start( );
- _bShuttingDown = false;
- }
- @Override
- public boolean enqueue( DaemonEntry entry, long nDelay, TimeUnit unit )
- {
- assertNotShuttingDown( );
- if ( nDelay == 0L )
- {
- boolean queued = _queue.offer( entry );
- if ( !queued )
- {
- AppLogService.error( "Failed to enqueue a run of daemon {}", entry.getId( ) );
- }
- return queued;
- }
- try
- {
- _scheduledDaemonsTimer.schedule( new DaemonTimerTask( entry ), unit.toMillis( nDelay ) );
- return true;
- }
- catch( IllegalStateException e )
- {
- return false;
- }
- }
- private void assertNotShuttingDown( )
- {
- if ( _bShuttingDown )
- {
- throw new IllegalStateException( "DaemonScheduler is shutting down. Enqueing tasks or scheduling tasks is not possible anymore." );
- }
- }
- /**
- * Enqueue without delay
- *
- * @param entry
- * the daemon entry
- */
- private void enqueue( DaemonEntry entry )
- {
- enqueue( entry, 0L, TimeUnit.MILLISECONDS );
- }
- @Override
- public void schedule( DaemonEntry entry, long nInitialDelay, TimeUnit unit )
- {
- assertNotShuttingDown( );
- synchronized( _scheduledDaemons )
- {
- if ( _scheduledDaemons.containsKey( entry.getId( ) ) )
- {
- AppLogService.error( "Daemon " + entry.getId( ) + " already scheduled, not scheduling again" );
- }
- else
- {
- DaemonTimerTask daemonTimerTask = new DaemonTimerTask( entry );
- _scheduledDaemonsTimer.scheduleAtFixedRate( daemonTimerTask, unit.toMillis( nInitialDelay ), entry.getInterval( ) * 1000 );
- _scheduledDaemons.put( entry.getId( ), daemonTimerTask );
- }
- }
- }
- @Override
- public void unSchedule( DaemonEntry entry )
- {
- synchronized( _scheduledDaemons )
- {
- DaemonTimerTask daemonTimerTask = _scheduledDaemons.get( entry.getId( ) );
- if ( daemonTimerTask == null )
- {
- AppLogService.error( "Could not unschedule daemon " + entry.getId( ) + " which was not scheduled" );
- }
- else
- {
- daemonTimerTask.cancel( );
- _scheduledDaemonsTimer.purge( );
- _scheduledDaemons.remove( entry.getId( ) );
- }
- }
- boolean bStopScheduled = false;
- synchronized( _executingDaemons )
- {
- RunnableWrapper runnable = _executingDaemons.get( entry.getId( ) );
- if ( runnable != null )
- {
- runnable.stopDaemonAfterExecution( );
- bStopScheduled = true;
- }
- }
- if ( !bStopScheduled )
- {
- try
- {
- entry.getDaemon( ).stop( );
- }
- catch( Throwable t )
- {
- AppLogService.error( "Failed to stop daemon {}", entry.getId( ), t );
- }
- }
- }
- @Override
- public void run( )
- {
- // use a set to coalesce daemon signaling
- Set<DaemonEntry> queued = new HashSet<>( );
- do
- {
- try
- {
- // collect signaled daemons
- queued.add( _queue.take( ) );
- _queue.drainTo( queued );
- }
- catch( InterruptedException e )
- {
- // We were asked to stop
- break;
- }
- // execute them
- for ( DaemonEntry entry : queued )
- {
- RunnableWrapper runnable = null;
- synchronized( _executingDaemons )
- {
- runnable = _executingDaemons.get( entry.getId( ) );
- if ( runnable != null )
- {
- // already executing; schedule a new run after this one
- runnable.shouldEnqueueAgain( );
- runnable = null;
- }
- else
- {
- runnable = new RunnableWrapper( entry );
- _executingDaemons.put( entry.getId( ), runnable );
- }
- }
- if ( runnable != null )
- {
- _executor.execute( runnable );
- }
- }
- // prepare next iteration
- queued.clear( );
- }
- while ( !Thread.interrupted( ) );
- }
- @Override
- public void shutdown( )
- {
- _bShuttingDown = true; // prevent future scheduling of daemons
- int maxAwaitTerminationDelay = AppPropertiesService.getPropertyInt( PROPERTY_MAX_AWAIT_TERMINATION_DELAY, 15 );
- AppLogService
- .info( "Lutece daemons scheduler stop requested : trying to terminate gracefully daemons list (max wait " + maxAwaitTerminationDelay + " s)." );
- _scheduledDaemonsTimer.cancel( );
- _coordinatorThread.interrupt( );
- _executor.shutdown( );
- // make a copy of scheduled daemons so that the list can be modified by
- // #unSchedule
- ArrayList<DaemonTimerTask> scheduled = new ArrayList<>( _scheduledDaemons.values( ) );
- scheduled.forEach( task -> unSchedule( task.getDaemonEntry( ) ) );
- try
- {
- if ( _executor.awaitTermination( maxAwaitTerminationDelay, TimeUnit.SECONDS ) )
- {
- AppLogService.info( "All daemons shutdown successfully." );
- }
- else
- {
- AppLogService.info( "Some daemons are still running, trying to interrupt them..." );
- _executor.shutdownNow( );
- if ( _executor.awaitTermination( 1, TimeUnit.SECONDS ) )
- {
- AppLogService.info( "All running daemons successfully interrupted." );
- }
- else
- {
- AppLogService.error( "Interrupt failed; daemons still running." );
- }
- }
- }
- catch( InterruptedException e )
- {
- AppLogService.error( "Interruped while waiting for daemons termination", e );
- }
- }
- /**
- * Wrapper for the daemon Runnable which can enqueue a new run when execution completes
- */
- private final class RunnableWrapper implements Runnable
- {
- private final DaemonEntry _entry;
- private volatile boolean _bShouldEnqueueAgain;
- private volatile boolean _bstopAfterExecution;
- /**
- * @param entry
- * the wrapped DaemonEntry
- */
- public RunnableWrapper( DaemonEntry entry )
- {
- _entry = entry;
- }
- public void stopDaemonAfterExecution( )
- {
- _bstopAfterExecution = true;
- }
- /**
- * Signals that an execution of the daemon should be enqueued on completion
- */
- public void shouldEnqueueAgain( )
- {
- _bShouldEnqueueAgain = true;
- }
- @Override
- public void run( )
- {
- try
- {
- _entry.getDaemonThread( ).run( );
- }
- finally
- {
- synchronized( _executingDaemons )
- {
- _executingDaemons.remove( _entry.getId( ) );
- }
- if ( _bstopAfterExecution )
- {
- _entry.getDaemon( ).stop( );
- }
- else
- if ( _bShouldEnqueueAgain )
- {
- enqueue( _entry );
- }
- }
- }
- }
- /**
- * Timer task to enqueue daemon runs
- */
- private final class DaemonTimerTask extends TimerTask
- {
- private final DaemonEntry _entry;
- /**
- * @param entry
- * the daemon
- */
- public DaemonTimerTask( DaemonEntry entry )
- {
- _entry = entry;
- }
- @Override
- public void run( )
- {
- enqueue( _entry );
- }
- /**
- * Access the daemon entry
- *
- * @return the daemon
- */
- public DaemonEntry getDaemonEntry( )
- {
- return _entry;
- }
- }
- }