1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 package fr.paris.lutece.portal.service.daemon;
35
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
46
47 import fr.paris.lutece.portal.service.util.AppLogService;
48 import fr.paris.lutece.portal.service.util.AppPropertiesService;
49
50
51
52
53
54
55
56
57
58
59 class DaemonScheduler implements Runnable, IDaemonScheduler
60 {
61 private static final String PROPERTY_MAX_AWAIT_TERMINATION_DELAY = "daemon.maxAwaitTerminationDelay";
62
63 private final BlockingQueue<DaemonEntry> _queue;
64 private final ExecutorService _executor;
65 private final Thread _coordinatorThread;
66 private final Timer _scheduledDaemonsTimer;
67 private final Map<String, RunnableWrapper> _executingDaemons;
68 private final Map<String, DaemonTimerTask> _scheduledDaemons;
69 private volatile boolean _bShuttingDown;
70
71
72
73
74
75
76
77
78
79 public DaemonScheduler( BlockingQueue<DaemonEntry> queue, ExecutorService executor )
80 {
81 _queue = queue;
82 _executor = executor;
83 _scheduledDaemonsTimer = new Timer( "Lutece-Daemons-Scheduled-Timer-Thread", true );
84 _executingDaemons = new HashMap<>( );
85 _scheduledDaemons = new HashMap<>( );
86 _coordinatorThread = new Thread( this, "Lutece-Daemons-Coordinator" );
87 _coordinatorThread.setDaemon( true );
88 _coordinatorThread.start( );
89 _bShuttingDown = false;
90 }
91
92 @Override
93 public boolean enqueue( DaemonEntry entry, long nDelay, TimeUnit unit )
94 {
95 assertNotShuttingDown( );
96 if ( nDelay == 0L )
97 {
98 boolean queued = _queue.offer( entry );
99 if ( !queued )
100 {
101 AppLogService.error( "Failed to enqueue a run of daemon {}", entry.getId( ) );
102 }
103 return queued;
104 }
105 try
106 {
107 _scheduledDaemonsTimer.schedule( new DaemonTimerTask( entry ), unit.toMillis( nDelay ) );
108 return true;
109 }
110 catch( IllegalStateException e )
111 {
112 return false;
113 }
114 }
115
116 private void assertNotShuttingDown( )
117 {
118 if ( _bShuttingDown )
119 {
120 throw new IllegalStateException( "DaemonScheduler is shutting down. Enqueing tasks or scheduling tasks is not possible anymore." );
121 }
122 }
123
124
125
126
127
128
129
130 private void enqueue( DaemonEntry entry )
131 {
132 enqueue( entry, 0L, TimeUnit.MILLISECONDS );
133 }
134
135 @Override
136 public void schedule( DaemonEntry entry, long nInitialDelay, TimeUnit unit )
137 {
138 assertNotShuttingDown( );
139 synchronized( _scheduledDaemons )
140 {
141 if ( _scheduledDaemons.containsKey( entry.getId( ) ) )
142 {
143 AppLogService.error( "Daemon " + entry.getId( ) + " already scheduled, not scheduling again" );
144 }
145 else
146 {
147
148 DaemonTimerTask daemonTimerTask = new DaemonTimerTask( entry );
149 _scheduledDaemonsTimer.scheduleAtFixedRate( daemonTimerTask, unit.toMillis( nInitialDelay ), entry.getInterval( ) * 1000 );
150 _scheduledDaemons.put( entry.getId( ), daemonTimerTask );
151 }
152 }
153
154 }
155
156 @Override
157 public void unSchedule( DaemonEntry entry )
158 {
159 synchronized( _scheduledDaemons )
160 {
161 DaemonTimerTask daemonTimerTask = _scheduledDaemons.get( entry.getId( ) );
162 if ( daemonTimerTask == null )
163 {
164 AppLogService.error( "Could not unschedule daemon " + entry.getId( ) + " which was not scheduled" );
165 }
166 else
167 {
168 daemonTimerTask.cancel( );
169 _scheduledDaemonsTimer.purge( );
170 _scheduledDaemons.remove( entry.getId( ) );
171 }
172 }
173 boolean bStopScheduled = false;
174 synchronized( _executingDaemons )
175 {
176 RunnableWrapper runnable = _executingDaemons.get( entry.getId( ) );
177 if ( runnable != null )
178 {
179 runnable.stopDaemonAfterExecution( );
180 bStopScheduled = true;
181 }
182 }
183 if ( !bStopScheduled )
184 {
185 try
186 {
187 entry.getDaemon( ).stop( );
188 }
189 catch( Throwable t )
190 {
191 AppLogService.error( "Failed to stop daemon {}", entry.getId( ), t );
192 }
193 }
194 }
195
196 @Override
197 public void run( )
198 {
199
200 Set<DaemonEntry> queued = new HashSet<>( );
201 do
202 {
203 try
204 {
205
206 queued.add( _queue.take( ) );
207 _queue.drainTo( queued );
208 }
209 catch( InterruptedException e )
210 {
211
212 break;
213 }
214
215 for ( DaemonEntry entry : queued )
216 {
217 RunnableWrapper runnable = null;
218 synchronized( _executingDaemons )
219 {
220 runnable = _executingDaemons.get( entry.getId( ) );
221 if ( runnable != null )
222 {
223
224 runnable.shouldEnqueueAgain( );
225 runnable = null;
226 }
227 else
228 {
229 runnable = new RunnableWrapper( entry );
230 _executingDaemons.put( entry.getId( ), runnable );
231 }
232 }
233 if ( runnable != null )
234 {
235 _executor.execute( runnable );
236 }
237 }
238
239 queued.clear( );
240 }
241 while ( !Thread.interrupted( ) );
242 }
243
244 @Override
245 public void shutdown( )
246 {
247 _bShuttingDown = true;
248 int maxAwaitTerminationDelay = AppPropertiesService.getPropertyInt( PROPERTY_MAX_AWAIT_TERMINATION_DELAY, 15 );
249 AppLogService
250 .info( "Lutece daemons scheduler stop requested : trying to terminate gracefully daemons list (max wait " + maxAwaitTerminationDelay + " s)." );
251 _scheduledDaemonsTimer.cancel( );
252 _coordinatorThread.interrupt( );
253 _executor.shutdown( );
254
255
256
257 ArrayList<DaemonTimerTask> scheduled = new ArrayList<>( _scheduledDaemons.values( ) );
258 scheduled.forEach( task -> unSchedule( task.getDaemonEntry( ) ) );
259
260 try
261 {
262 if ( _executor.awaitTermination( maxAwaitTerminationDelay, TimeUnit.SECONDS ) )
263 {
264 AppLogService.info( "All daemons shutdown successfully." );
265 }
266 else
267 {
268 AppLogService.info( "Some daemons are still running, trying to interrupt them..." );
269 _executor.shutdownNow( );
270
271 if ( _executor.awaitTermination( 1, TimeUnit.SECONDS ) )
272 {
273 AppLogService.info( "All running daemons successfully interrupted." );
274 }
275 else
276 {
277 AppLogService.error( "Interrupt failed; daemons still running." );
278 }
279 }
280 }
281 catch( InterruptedException e )
282 {
283 AppLogService.error( "Interruped while waiting for daemons termination", e );
284 }
285 }
286
287
288
289
290 private final class RunnableWrapper implements Runnable
291 {
292
293 private final DaemonEntry _entry;
294 private volatile boolean _bShouldEnqueueAgain;
295 private volatile boolean _bstopAfterExecution;
296
297
298
299
300
301 public RunnableWrapper( DaemonEntry entry )
302 {
303 _entry = entry;
304 }
305
306 public void stopDaemonAfterExecution( )
307 {
308 _bstopAfterExecution = true;
309 }
310
311
312
313
314 public void shouldEnqueueAgain( )
315 {
316 _bShouldEnqueueAgain = true;
317 }
318
319 @Override
320 public void run( )
321 {
322 try
323 {
324 _entry.getDaemonThread( ).run( );
325 }
326 finally
327 {
328 synchronized( _executingDaemons )
329 {
330 _executingDaemons.remove( _entry.getId( ) );
331 }
332 if ( _bstopAfterExecution )
333 {
334 _entry.getDaemon( ).stop( );
335 }
336 else
337 if ( _bShouldEnqueueAgain )
338 {
339 enqueue( _entry );
340 }
341 }
342 }
343
344 }
345
346
347
348
349 private final class DaemonTimerTask extends TimerTask
350 {
351
352 private final DaemonEntry _entry;
353
354
355
356
357
358 public DaemonTimerTask( DaemonEntry entry )
359 {
360 _entry = entry;
361 }
362
363 @Override
364 public void run( )
365 {
366 enqueue( _entry );
367 }
368
369
370
371
372
373
374 public DaemonEntry getDaemonEntry( )
375 {
376 return _entry;
377 }
378
379 }
380 }