View Javadoc
1   /*
2    * Copyright (c) 2002-2022, City of Paris
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without
6    * modification, are permitted provided that the following conditions
7    * are met:
8    *
9    *  1. Redistributions of source code must retain the above copyright notice
10   *     and the following disclaimer.
11   *
12   *  2. Redistributions in binary form must reproduce the above copyright notice
13   *     and the following disclaimer in the documentation and/or other materials
14   *     provided with the distribution.
15   *
16   *  3. Neither the name of 'Mairie de Paris' nor 'Lutece' nor the names of its
17   *     contributors may be used to endorse or promote products derived from
18   *     this software without specific prior written permission.
19   *
20   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21   * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22   * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
24   * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25   * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26   * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27   * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28   * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30   * POSSIBILITY OF SUCH DAMAGE.
31   *
32   * License 1.0
33   */
34  package fr.paris.lutece.portal.service.daemon;
35  
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import fr.paris.lutece.portal.service.util.AppLogService;
import fr.paris.lutece.portal.service.util.AppPropertiesService;
49  
50  /**
51   * Daemon scheduler.
52   * <p>
53   * Responsible for ensuring on demand or timely daemon execution. Starts a thread which monitor the queue for daemons to execute. A {@link Timer} handles
54   * repeating daemons runs.
55   * 
56   * <p>
57   * Daemon run requests are coalesced. If a daemon is already running when a request comes, a new run is scheduled right after the current run ends.
58   */
59  class DaemonScheduler implements Runnable, IDaemonScheduler
60  {
61      private static final String PROPERTY_MAX_AWAIT_TERMINATION_DELAY = "daemon.maxAwaitTerminationDelay";
62  
63      private final BlockingQueue<DaemonEntry> _queue;
64      private final ExecutorService _executor;
65      private final Thread _coordinatorThread;
66      private final Timer _scheduledDaemonsTimer;
67      private final Map<String, RunnableWrapper> _executingDaemons;
68      private final Map<String, DaemonTimerTask> _scheduledDaemons;
69      private volatile boolean _bShuttingDown;
70  
71      /**
72       * Constructor
73       * 
74       * @param queue
75       *            the queue where daemon execution requests are stored
76       * @param executor
77       *            the executor service handling the execution of daemons
78       */
79      public DaemonScheduler( BlockingQueue<DaemonEntry> queue, ExecutorService executor )
80      {
81          _queue = queue;
82          _executor = executor;
83          _scheduledDaemonsTimer = new Timer( "Lutece-Daemons-Scheduled-Timer-Thread", true );
84          _executingDaemons = new HashMap<>( );
85          _scheduledDaemons = new HashMap<>( );
86          _coordinatorThread = new Thread( this, "Lutece-Daemons-Coordinator" );
87          _coordinatorThread.setDaemon( true );
88          _coordinatorThread.start( );
89          _bShuttingDown = false;
90      }
91  
92      @Override
93      public boolean enqueue( DaemonEntry entry, long nDelay, TimeUnit unit )
94      {
95          assertNotShuttingDown( );
96          if ( nDelay == 0L )
97          {
98              boolean queued = _queue.offer( entry );
99              if ( !queued )
100             {
101                 AppLogService.error( "Failed to enqueue a run of daemon {}", entry.getId( ) );
102             }
103             return queued;
104         }
105         try
106         {
107             _scheduledDaemonsTimer.schedule( new DaemonTimerTask( entry ), unit.toMillis( nDelay ) );
108             return true;
109         }
110         catch( IllegalStateException e )
111         {
112             return false;
113         }
114     }
115 
116     private void assertNotShuttingDown( )
117     {
118         if ( _bShuttingDown )
119         {
120             throw new IllegalStateException( "DaemonScheduler is shutting down. Enqueing tasks or scheduling tasks is not possible anymore." );
121         }
122     }
123 
124     /**
125      * Enqueue without delay
126      * 
127      * @param entry
128      *            the daemon entry
129      */
130     private void enqueue( DaemonEntry entry )
131     {
132         enqueue( entry, 0L, TimeUnit.MILLISECONDS );
133     }
134 
135     @Override
136     public void schedule( DaemonEntry entry, long nInitialDelay, TimeUnit unit )
137     {
138         assertNotShuttingDown( );
139         synchronized( _scheduledDaemons )
140         {
141             if ( _scheduledDaemons.containsKey( entry.getId( ) ) )
142             {
143                 AppLogService.error( "Daemon " + entry.getId( ) + " already scheduled, not scheduling again" );
144             }
145             else
146             {
147 
148                 DaemonTimerTask daemonTimerTask = new DaemonTimerTask( entry );
149                 _scheduledDaemonsTimer.scheduleAtFixedRate( daemonTimerTask, unit.toMillis( nInitialDelay ), entry.getInterval( ) * 1000 );
150                 _scheduledDaemons.put( entry.getId( ), daemonTimerTask );
151             }
152         }
153 
154     }
155 
156     @Override
157     public void unSchedule( DaemonEntry entry )
158     {
159         synchronized( _scheduledDaemons )
160         {
161             DaemonTimerTask daemonTimerTask = _scheduledDaemons.get( entry.getId( ) );
162             if ( daemonTimerTask == null )
163             {
164                 AppLogService.error( "Could not unschedule daemon " + entry.getId( ) + " which was not scheduled" );
165             }
166             else
167             {
168                 daemonTimerTask.cancel( );
169                 _scheduledDaemonsTimer.purge( );
170                 _scheduledDaemons.remove( entry.getId( ) );
171             }
172         }
173         boolean bStopScheduled = false;
174         synchronized( _executingDaemons )
175         {
176             RunnableWrapper runnable = _executingDaemons.get( entry.getId( ) );
177             if ( runnable != null )
178             {
179                 runnable.stopDaemonAfterExecution( );
180                 bStopScheduled = true;
181             }
182         }
183         if ( !bStopScheduled )
184         {
185             try
186             {
187                 entry.getDaemon( ).stop( );
188             }
189             catch( Throwable t )
190             {
191                 AppLogService.error( "Failed to stop daemon {}", entry.getId( ), t );
192             }
193         }
194     }
195 
196     @Override
197     public void run( )
198     {
199         // use a set to coalesce daemon signaling
200         Set<DaemonEntry> queued = new HashSet<>( );
201         do
202         {
203             try
204             {
205                 // collect signaled daemons
206                 queued.add( _queue.take( ) );
207                 _queue.drainTo( queued );
208             }
209             catch( InterruptedException e )
210             {
211                 // We were asked to stop
212                 break;
213             }
214             // execute them
215             for ( DaemonEntry entry : queued )
216             {
217                 RunnableWrapper runnable = null;
218                 synchronized( _executingDaemons )
219                 {
220                     runnable = _executingDaemons.get( entry.getId( ) );
221                     if ( runnable != null )
222                     {
223                         // already executing; schedule a new run after this one
224                         runnable.shouldEnqueueAgain( );
225                         runnable = null;
226                     }
227                     else
228                     {
229                         runnable = new RunnableWrapper( entry );
230                         _executingDaemons.put( entry.getId( ), runnable );
231                     }
232                 }
233                 if ( runnable != null )
234                 {
235                     _executor.execute( runnable );
236                 }
237             }
238             // prepare next iteration
239             queued.clear( );
240         }
241         while ( !Thread.interrupted( ) );
242     }
243 
244     @Override
245     public void shutdown( )
246     {
247         _bShuttingDown = true; // prevent future scheduling of daemons
248         int maxAwaitTerminationDelay = AppPropertiesService.getPropertyInt( PROPERTY_MAX_AWAIT_TERMINATION_DELAY, 15 );
249         AppLogService
250                 .info( "Lutece daemons scheduler stop requested : trying to terminate gracefully daemons list (max wait " + maxAwaitTerminationDelay + " s)." );
251         _scheduledDaemonsTimer.cancel( );
252         _coordinatorThread.interrupt( );
253         _executor.shutdown( );
254 
255         // make a copy of scheduled daemons so that the list can be modified by
256         // #unSchedule
257         ArrayList<DaemonTimerTask> scheduled = new ArrayList<>( _scheduledDaemons.values( ) );
258         scheduled.forEach( task -> unSchedule( task.getDaemonEntry( ) ) );
259 
260         try
261         {
262             if ( _executor.awaitTermination( maxAwaitTerminationDelay, TimeUnit.SECONDS ) )
263             {
264                 AppLogService.info( "All daemons shutdown successfully." );
265             }
266             else
267             {
268                 AppLogService.info( "Some daemons are still running, trying to interrupt them..." );
269                 _executor.shutdownNow( );
270 
271                 if ( _executor.awaitTermination( 1, TimeUnit.SECONDS ) )
272                 {
273                     AppLogService.info( "All running daemons successfully interrupted." );
274                 }
275                 else
276                 {
277                     AppLogService.error( "Interrupt failed; daemons still running." );
278                 }
279             }
280         }
281         catch( InterruptedException e )
282         {
283             AppLogService.error( "Interruped while waiting for daemons termination", e );
284         }
285     }
286 
287     /**
288      * Wrapper for the daemon Runnable which can enqueue a new run when execution completes
289      */
290     private final class RunnableWrapper implements Runnable
291     {
292 
293         private final DaemonEntry _entry;
294         private volatile boolean _bShouldEnqueueAgain;
295         private volatile boolean _bstopAfterExecution;
296 
297         /**
298          * @param entry
299          *            the wrapped DaemonEntry
300          */
301         public RunnableWrapper( DaemonEntry entry )
302         {
303             _entry = entry;
304         }
305 
306         public void stopDaemonAfterExecution( )
307         {
308             _bstopAfterExecution = true;
309         }
310 
311         /**
312          * Signals that an execution of the daemon should be enqueued on completion
313          */
314         public void shouldEnqueueAgain( )
315         {
316             _bShouldEnqueueAgain = true;
317         }
318 
319         @Override
320         public void run( )
321         {
322             try
323             {
324                 _entry.getDaemonThread( ).run( );
325             }
326             finally
327             {
328                 synchronized( _executingDaemons )
329                 {
330                     _executingDaemons.remove( _entry.getId( ) );
331                 }
332                 if ( _bstopAfterExecution )
333                 {
334                     _entry.getDaemon( ).stop( );
335                 }
336                 else
337                     if ( _bShouldEnqueueAgain )
338                     {
339                         enqueue( _entry );
340                     }
341             }
342         }
343 
344     }
345 
346     /**
347      * Timer task to enqueue daemon runs
348      */
349     private final class DaemonTimerTask extends TimerTask
350     {
351 
352         private final DaemonEntry _entry;
353 
354         /**
355          * @param entry
356          *            the daemon
357          */
358         public DaemonTimerTask( DaemonEntry entry )
359         {
360             _entry = entry;
361         }
362 
363         @Override
364         public void run( )
365         {
366             enqueue( _entry );
367         }
368 
369         /**
370          * Access the daemon entry
371          * 
372          * @return the daemon
373          */
374         public DaemonEntry getDaemonEntry( )
375         {
376             return _entry;
377         }
378 
379     }
380 }