DaemonScheduler.java

  1. /*
  2.  * Copyright (c) 2002-2022, City of Paris
  3.  * All rights reserved.
  4.  *
  5.  * Redistribution and use in source and binary forms, with or without
  6.  * modification, are permitted provided that the following conditions
  7.  * are met:
  8.  *
  9.  *  1. Redistributions of source code must retain the above copyright notice
  10.  *     and the following disclaimer.
  11.  *
  12.  *  2. Redistributions in binary form must reproduce the above copyright notice
  13.  *     and the following disclaimer in the documentation and/or other materials
  14.  *     provided with the distribution.
  15.  *
  16.  *  3. Neither the name of 'Mairie de Paris' nor 'Lutece' nor the names of its
  17.  *     contributors may be used to endorse or promote products derived from
  18.  *     this software without specific prior written permission.
  19.  *
  20.  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  21.  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  22.  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  23.  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
  24.  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  25.  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  26.  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  27.  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  28.  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  29.  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  30.  * POSSIBILITY OF SUCH DAMAGE.
  31.  *
  32.  * License 1.0
  33.  */
  34. package fr.paris.lutece.portal.service.daemon;

  35. import java.util.ArrayList;
  36. import java.util.HashMap;
  37. import java.util.HashSet;
  38. import java.util.Map;
  39. import java.util.Set;
  40. import java.util.Timer;
  41. import java.util.TimerTask;
  42. import java.util.concurrent.BlockingQueue;
  43. import java.util.concurrent.ExecutorService;
  44. import java.util.concurrent.TimeUnit;

  45. import fr.paris.lutece.portal.service.util.AppLogService;
  46. import fr.paris.lutece.portal.service.util.AppPropertiesService;

  47. /**
  48.  * Daemon scheduler.
  49.  * <p>
  50.  * Responsible for ensuring on demand or timely daemon execution. Starts a thread which monitor the queue for daemons to execute. A {@link Timer} handles
  51.  * repeating daemons runs.
  52.  *
  53.  * <p>
  54.  * Daemon run requests are coalesced. If a daemon is already running when a request comes, a new run is scheduled right after the current run ends.
  55.  */
  56. class DaemonScheduler implements Runnable, IDaemonScheduler
  57. {
  58.     private static final String PROPERTY_MAX_AWAIT_TERMINATION_DELAY = "daemon.maxAwaitTerminationDelay";

  59.     private final BlockingQueue<DaemonEntry> _queue;
  60.     private final ExecutorService _executor;
  61.     private final Thread _coordinatorThread;
  62.     private final Timer _scheduledDaemonsTimer;
  63.     private final Map<String, RunnableWrapper> _executingDaemons;
  64.     private final Map<String, DaemonTimerTask> _scheduledDaemons;
  65.     private volatile boolean _bShuttingDown;

  66.     /**
  67.      * Constructor
  68.      *
  69.      * @param queue
  70.      *            the queue where daemon execution requests are stored
  71.      * @param executor
  72.      *            the executor service handling the execution of daemons
  73.      */
  74.     public DaemonScheduler( BlockingQueue<DaemonEntry> queue, ExecutorService executor )
  75.     {
  76.         _queue = queue;
  77.         _executor = executor;
  78.         _scheduledDaemonsTimer = new Timer( "Lutece-Daemons-Scheduled-Timer-Thread", true );
  79.         _executingDaemons = new HashMap<>( );
  80.         _scheduledDaemons = new HashMap<>( );
  81.         _coordinatorThread = new Thread( this, "Lutece-Daemons-Coordinator" );
  82.         _coordinatorThread.setDaemon( true );
  83.         _coordinatorThread.start( );
  84.         _bShuttingDown = false;
  85.     }

  86.     @Override
  87.     public boolean enqueue( DaemonEntry entry, long nDelay, TimeUnit unit )
  88.     {
  89.         assertNotShuttingDown( );
  90.         if ( nDelay == 0L )
  91.         {
  92.             boolean queued = _queue.offer( entry );
  93.             if ( !queued )
  94.             {
  95.                 AppLogService.error( "Failed to enqueue a run of daemon {}", entry.getId( ) );
  96.             }
  97.             return queued;
  98.         }
  99.         try
  100.         {
  101.             _scheduledDaemonsTimer.schedule( new DaemonTimerTask( entry ), unit.toMillis( nDelay ) );
  102.             return true;
  103.         }
  104.         catch( IllegalStateException e )
  105.         {
  106.             return false;
  107.         }
  108.     }

  109.     private void assertNotShuttingDown( )
  110.     {
  111.         if ( _bShuttingDown )
  112.         {
  113.             throw new IllegalStateException( "DaemonScheduler is shutting down. Enqueing tasks or scheduling tasks is not possible anymore." );
  114.         }
  115.     }

  116.     /**
  117.      * Enqueue without delay
  118.      *
  119.      * @param entry
  120.      *            the daemon entry
  121.      */
  122.     private void enqueue( DaemonEntry entry )
  123.     {
  124.         enqueue( entry, 0L, TimeUnit.MILLISECONDS );
  125.     }

  126.     @Override
  127.     public void schedule( DaemonEntry entry, long nInitialDelay, TimeUnit unit )
  128.     {
  129.         assertNotShuttingDown( );
  130.         synchronized( _scheduledDaemons )
  131.         {
  132.             if ( _scheduledDaemons.containsKey( entry.getId( ) ) )
  133.             {
  134.                 AppLogService.error( "Daemon " + entry.getId( ) + " already scheduled, not scheduling again" );
  135.             }
  136.             else
  137.             {

  138.                 DaemonTimerTask daemonTimerTask = new DaemonTimerTask( entry );
  139.                 _scheduledDaemonsTimer.scheduleAtFixedRate( daemonTimerTask, unit.toMillis( nInitialDelay ), entry.getInterval( ) * 1000 );
  140.                 _scheduledDaemons.put( entry.getId( ), daemonTimerTask );
  141.             }
  142.         }

  143.     }

  144.     @Override
  145.     public void unSchedule( DaemonEntry entry )
  146.     {
  147.         synchronized( _scheduledDaemons )
  148.         {
  149.             DaemonTimerTask daemonTimerTask = _scheduledDaemons.get( entry.getId( ) );
  150.             if ( daemonTimerTask == null )
  151.             {
  152.                 AppLogService.error( "Could not unschedule daemon " + entry.getId( ) + " which was not scheduled" );
  153.             }
  154.             else
  155.             {
  156.                 daemonTimerTask.cancel( );
  157.                 _scheduledDaemonsTimer.purge( );
  158.                 _scheduledDaemons.remove( entry.getId( ) );
  159.             }
  160.         }
  161.         boolean bStopScheduled = false;
  162.         synchronized( _executingDaemons )
  163.         {
  164.             RunnableWrapper runnable = _executingDaemons.get( entry.getId( ) );
  165.             if ( runnable != null )
  166.             {
  167.                 runnable.stopDaemonAfterExecution( );
  168.                 bStopScheduled = true;
  169.             }
  170.         }
  171.         if ( !bStopScheduled )
  172.         {
  173.             try
  174.             {
  175.                 entry.getDaemon( ).stop( );
  176.             }
  177.             catch( Throwable t )
  178.             {
  179.                 AppLogService.error( "Failed to stop daemon {}", entry.getId( ), t );
  180.             }
  181.         }
  182.     }

  183.     @Override
  184.     public void run( )
  185.     {
  186.         // use a set to coalesce daemon signaling
  187.         Set<DaemonEntry> queued = new HashSet<>( );
  188.         do
  189.         {
  190.             try
  191.             {
  192.                 // collect signaled daemons
  193.                 queued.add( _queue.take( ) );
  194.                 _queue.drainTo( queued );
  195.             }
  196.             catch( InterruptedException e )
  197.             {
  198.                 // We were asked to stop
  199.                 break;
  200.             }
  201.             // execute them
  202.             for ( DaemonEntry entry : queued )
  203.             {
  204.                 RunnableWrapper runnable = null;
  205.                 synchronized( _executingDaemons )
  206.                 {
  207.                     runnable = _executingDaemons.get( entry.getId( ) );
  208.                     if ( runnable != null )
  209.                     {
  210.                         // already executing; schedule a new run after this one
  211.                         runnable.shouldEnqueueAgain( );
  212.                         runnable = null;
  213.                     }
  214.                     else
  215.                     {
  216.                         runnable = new RunnableWrapper( entry );
  217.                         _executingDaemons.put( entry.getId( ), runnable );
  218.                     }
  219.                 }
  220.                 if ( runnable != null )
  221.                 {
  222.                     _executor.execute( runnable );
  223.                 }
  224.             }
  225.             // prepare next iteration
  226.             queued.clear( );
  227.         }
  228.         while ( !Thread.interrupted( ) );
  229.     }

  230.     @Override
  231.     public void shutdown( )
  232.     {
  233.         _bShuttingDown = true; // prevent future scheduling of daemons
  234.         int maxAwaitTerminationDelay = AppPropertiesService.getPropertyInt( PROPERTY_MAX_AWAIT_TERMINATION_DELAY, 15 );
  235.         AppLogService
  236.                 .info( "Lutece daemons scheduler stop requested : trying to terminate gracefully daemons list (max wait " + maxAwaitTerminationDelay + " s)." );
  237.         _scheduledDaemonsTimer.cancel( );
  238.         _coordinatorThread.interrupt( );
  239.         _executor.shutdown( );

  240.         // make a copy of scheduled daemons so that the list can be modified by
  241.         // #unSchedule
  242.         ArrayList<DaemonTimerTask> scheduled = new ArrayList<>( _scheduledDaemons.values( ) );
  243.         scheduled.forEach( task -> unSchedule( task.getDaemonEntry( ) ) );

  244.         try
  245.         {
  246.             if ( _executor.awaitTermination( maxAwaitTerminationDelay, TimeUnit.SECONDS ) )
  247.             {
  248.                 AppLogService.info( "All daemons shutdown successfully." );
  249.             }
  250.             else
  251.             {
  252.                 AppLogService.info( "Some daemons are still running, trying to interrupt them..." );
  253.                 _executor.shutdownNow( );

  254.                 if ( _executor.awaitTermination( 1, TimeUnit.SECONDS ) )
  255.                 {
  256.                     AppLogService.info( "All running daemons successfully interrupted." );
  257.                 }
  258.                 else
  259.                 {
  260.                     AppLogService.error( "Interrupt failed; daemons still running." );
  261.                 }
  262.             }
  263.         }
  264.         catch( InterruptedException e )
  265.         {
  266.             AppLogService.error( "Interruped while waiting for daemons termination", e );
  267.         }
  268.     }

  269.     /**
  270.      * Wrapper for the daemon Runnable which can enqueue a new run when execution completes
  271.      */
  272.     private final class RunnableWrapper implements Runnable
  273.     {

  274.         private final DaemonEntry _entry;
  275.         private volatile boolean _bShouldEnqueueAgain;
  276.         private volatile boolean _bstopAfterExecution;

  277.         /**
  278.          * @param entry
  279.          *            the wrapped DaemonEntry
  280.          */
  281.         public RunnableWrapper( DaemonEntry entry )
  282.         {
  283.             _entry = entry;
  284.         }

  285.         public void stopDaemonAfterExecution( )
  286.         {
  287.             _bstopAfterExecution = true;
  288.         }

  289.         /**
  290.          * Signals that an execution of the daemon should be enqueued on completion
  291.          */
  292.         public void shouldEnqueueAgain( )
  293.         {
  294.             _bShouldEnqueueAgain = true;
  295.         }

  296.         @Override
  297.         public void run( )
  298.         {
  299.             try
  300.             {
  301.                 _entry.getDaemonThread( ).run( );
  302.             }
  303.             finally
  304.             {
  305.                 synchronized( _executingDaemons )
  306.                 {
  307.                     _executingDaemons.remove( _entry.getId( ) );
  308.                 }
  309.                 if ( _bstopAfterExecution )
  310.                 {
  311.                     _entry.getDaemon( ).stop( );
  312.                 }
  313.                 else
  314.                     if ( _bShouldEnqueueAgain )
  315.                     {
  316.                         enqueue( _entry );
  317.                     }
  318.             }
  319.         }

  320.     }

  321.     /**
  322.      * Timer task to enqueue daemon runs
  323.      */
  324.     private final class DaemonTimerTask extends TimerTask
  325.     {

  326.         private final DaemonEntry _entry;

  327.         /**
  328.          * @param entry
  329.          *            the daemon
  330.          */
  331.         public DaemonTimerTask( DaemonEntry entry )
  332.         {
  333.             _entry = entry;
  334.         }

  335.         @Override
  336.         public void run( )
  337.         {
  338.             enqueue( _entry );
  339.         }

  340.         /**
  341.          * Access the daemon entry
  342.          *
  343.          * @return the daemon
  344.          */
  345.         public DaemonEntry getDaemonEntry( )
  346.         {
  347.             return _entry;
  348.         }

  349.     }
  350. }