// DaemonScheduler.java
/*
* Copyright (c) 2002-2022, City of Paris
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright notice
* and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice
* and the following disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* 3. Neither the name of 'Mairie de Paris' nor 'Lutece' nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* License 1.0
*/
package fr.paris.lutece.portal.service.daemon;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import fr.paris.lutece.portal.service.util.AppLogService;
import fr.paris.lutece.portal.service.util.AppPropertiesService;
/**
* Daemon scheduler.
* <p>
* Responsible for ensuring on demand or timely daemon execution. Starts a thread which monitor the queue for daemons to execute. A {@link Timer} handles
* repeating daemons runs.
*
* <p>
* Daemon run requests are coalesced. If a daemon is already running when a request comes, a new run is scheduled right after the current run ends.
*/
class DaemonScheduler implements Runnable, IDaemonScheduler
{
private static final String PROPERTY_MAX_AWAIT_TERMINATION_DELAY = "daemon.maxAwaitTerminationDelay";
private final BlockingQueue<DaemonEntry> _queue;
private final ExecutorService _executor;
private final Thread _coordinatorThread;
private final Timer _scheduledDaemonsTimer;
private final Map<String, RunnableWrapper> _executingDaemons;
private final Map<String, DaemonTimerTask> _scheduledDaemons;
private volatile boolean _bShuttingDown;
/**
* Constructor
*
* @param queue
* the queue where daemon execution requests are stored
* @param executor
* the executor service handling the execution of daemons
*/
public DaemonScheduler( BlockingQueue<DaemonEntry> queue, ExecutorService executor )
{
_queue = queue;
_executor = executor;
_scheduledDaemonsTimer = new Timer( "Lutece-Daemons-Scheduled-Timer-Thread", true );
_executingDaemons = new HashMap<>( );
_scheduledDaemons = new HashMap<>( );
_coordinatorThread = new Thread( this, "Lutece-Daemons-Coordinator" );
_coordinatorThread.setDaemon( true );
_coordinatorThread.start( );
_bShuttingDown = false;
}
@Override
public boolean enqueue( DaemonEntry entry, long nDelay, TimeUnit unit )
{
assertNotShuttingDown( );
if ( nDelay == 0L )
{
boolean queued = _queue.offer( entry );
if ( !queued )
{
AppLogService.error( "Failed to enqueue a run of daemon {}", entry.getId( ) );
}
return queued;
}
try
{
_scheduledDaemonsTimer.schedule( new DaemonTimerTask( entry ), unit.toMillis( nDelay ) );
return true;
}
catch( IllegalStateException e )
{
return false;
}
}
private void assertNotShuttingDown( )
{
if ( _bShuttingDown )
{
throw new IllegalStateException( "DaemonScheduler is shutting down. Enqueing tasks or scheduling tasks is not possible anymore." );
}
}
/**
* Enqueue without delay
*
* @param entry
* the daemon entry
*/
private void enqueue( DaemonEntry entry )
{
enqueue( entry, 0L, TimeUnit.MILLISECONDS );
}
@Override
public void schedule( DaemonEntry entry, long nInitialDelay, TimeUnit unit )
{
assertNotShuttingDown( );
synchronized( _scheduledDaemons )
{
if ( _scheduledDaemons.containsKey( entry.getId( ) ) )
{
AppLogService.error( "Daemon " + entry.getId( ) + " already scheduled, not scheduling again" );
}
else
{
DaemonTimerTask daemonTimerTask = new DaemonTimerTask( entry );
_scheduledDaemonsTimer.scheduleAtFixedRate( daemonTimerTask, unit.toMillis( nInitialDelay ), entry.getInterval( ) * 1000 );
_scheduledDaemons.put( entry.getId( ), daemonTimerTask );
}
}
}
@Override
public void unSchedule( DaemonEntry entry )
{
synchronized( _scheduledDaemons )
{
DaemonTimerTask daemonTimerTask = _scheduledDaemons.get( entry.getId( ) );
if ( daemonTimerTask == null )
{
AppLogService.error( "Could not unschedule daemon " + entry.getId( ) + " which was not scheduled" );
}
else
{
daemonTimerTask.cancel( );
_scheduledDaemonsTimer.purge( );
_scheduledDaemons.remove( entry.getId( ) );
}
}
boolean bStopScheduled = false;
synchronized( _executingDaemons )
{
RunnableWrapper runnable = _executingDaemons.get( entry.getId( ) );
if ( runnable != null )
{
runnable.stopDaemonAfterExecution( );
bStopScheduled = true;
}
}
if ( !bStopScheduled )
{
try
{
entry.getDaemon( ).stop( );
}
catch( Throwable t )
{
AppLogService.error( "Failed to stop daemon {}", entry.getId( ), t );
}
}
}
@Override
public void run( )
{
// use a set to coalesce daemon signaling
Set<DaemonEntry> queued = new HashSet<>( );
do
{
try
{
// collect signaled daemons
queued.add( _queue.take( ) );
_queue.drainTo( queued );
}
catch( InterruptedException e )
{
// We were asked to stop
break;
}
// execute them
for ( DaemonEntry entry : queued )
{
RunnableWrapper runnable = null;
synchronized( _executingDaemons )
{
runnable = _executingDaemons.get( entry.getId( ) );
if ( runnable != null )
{
// already executing; schedule a new run after this one
runnable.shouldEnqueueAgain( );
runnable = null;
}
else
{
runnable = new RunnableWrapper( entry );
_executingDaemons.put( entry.getId( ), runnable );
}
}
if ( runnable != null )
{
_executor.execute( runnable );
}
}
// prepare next iteration
queued.clear( );
}
while ( !Thread.interrupted( ) );
}
@Override
public void shutdown( )
{
_bShuttingDown = true; // prevent future scheduling of daemons
int maxAwaitTerminationDelay = AppPropertiesService.getPropertyInt( PROPERTY_MAX_AWAIT_TERMINATION_DELAY, 15 );
AppLogService
.info( "Lutece daemons scheduler stop requested : trying to terminate gracefully daemons list (max wait " + maxAwaitTerminationDelay + " s)." );
_scheduledDaemonsTimer.cancel( );
_coordinatorThread.interrupt( );
_executor.shutdown( );
// make a copy of scheduled daemons so that the list can be modified by
// #unSchedule
ArrayList<DaemonTimerTask> scheduled = new ArrayList<>( _scheduledDaemons.values( ) );
scheduled.forEach( task -> unSchedule( task.getDaemonEntry( ) ) );
try
{
if ( _executor.awaitTermination( maxAwaitTerminationDelay, TimeUnit.SECONDS ) )
{
AppLogService.info( "All daemons shutdown successfully." );
}
else
{
AppLogService.info( "Some daemons are still running, trying to interrupt them..." );
_executor.shutdownNow( );
if ( _executor.awaitTermination( 1, TimeUnit.SECONDS ) )
{
AppLogService.info( "All running daemons successfully interrupted." );
}
else
{
AppLogService.error( "Interrupt failed; daemons still running." );
}
}
}
catch( InterruptedException e )
{
AppLogService.error( "Interruped while waiting for daemons termination", e );
}
}
/**
* Wrapper for the daemon Runnable which can enqueue a new run when execution completes
*/
private final class RunnableWrapper implements Runnable
{
private final DaemonEntry _entry;
private volatile boolean _bShouldEnqueueAgain;
private volatile boolean _bstopAfterExecution;
/**
* @param entry
* the wrapped DaemonEntry
*/
public RunnableWrapper( DaemonEntry entry )
{
_entry = entry;
}
public void stopDaemonAfterExecution( )
{
_bstopAfterExecution = true;
}
/**
* Signals that an execution of the daemon should be enqueued on completion
*/
public void shouldEnqueueAgain( )
{
_bShouldEnqueueAgain = true;
}
@Override
public void run( )
{
try
{
_entry.getDaemonThread( ).run( );
}
finally
{
synchronized( _executingDaemons )
{
_executingDaemons.remove( _entry.getId( ) );
}
if ( _bstopAfterExecution )
{
_entry.getDaemon( ).stop( );
}
else
if ( _bShouldEnqueueAgain )
{
enqueue( _entry );
}
}
}
}
/**
* Timer task to enqueue daemon runs
*/
private final class DaemonTimerTask extends TimerTask
{
private final DaemonEntry _entry;
/**
* @param entry
* the daemon
*/
public DaemonTimerTask( DaemonEntry entry )
{
_entry = entry;
}
@Override
public void run( )
{
enqueue( _entry );
}
/**
* Access the daemon entry
*
* @return the daemon
*/
public DaemonEntry getDaemonEntry( )
{
return _entry;
}
}
}