1 /*
2 * Copyright (c) 2002-2025, City of Paris
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * 1. Redistributions of source code must retain the above copyright notice
10 * and the following disclaimer.
11 *
12 * 2. Redistributions in binary form must reproduce the above copyright notice
13 * and the following disclaimer in the documentation and/or other materials
14 * provided with the distribution.
15 *
16 * 3. Neither the name of 'Mairie de Paris' nor 'Lutece' nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 *
32 * License 1.0
33 */
34 package fr.paris.lutece.portal.service.daemon;
35
36 import fr.paris.lutece.portal.service.plugin.Plugin;
37 import fr.paris.lutece.portal.service.util.AppPropertiesService;
38
39 import java.util.ArrayDeque;
40 import java.util.ArrayList;
41 import java.util.Deque;
42 import java.util.HashMap;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Map.Entry;
46
47 import org.apache.commons.collections.CollectionUtils;
48
49 /**
50 * Daemon that manage a pool of threads to launch runnables.
51 */
52 public class ThreadLauncherDaemon extends Daemon
53 {
54 private static final String THREAD_LAUNCHER_DAEMON_ID = "threadLauncherDaemon";
55
56 /**
57 * Runnable wrapper to signal the daemon on processing end
58 */
59 private class RunnableWrapper implements Runnable
60 {
61 private final Runnable _wrapped;
62
63 /**
64 * Constructor
65 *
66 * @param wrapped
67 * the wrapped Runnable
68 */
69 public RunnableWrapper( Runnable wrapped )
70 {
71 _wrapped = wrapped;
72 }
73
74 @Override
75 public void run( )
76 {
77 try
78 {
79 _wrapped.run( );
80 }
81 finally
82 {
83 // will clean up the dead thread
84 // and permit execution of blocked tasks
85 AppDaemonService.signalDaemon( THREAD_LAUNCHER_DAEMON_ID );
86 }
87 }
88
89 }
90
91 private static final String PROPERTY_MAX_NUMBER_THREAD = "daemon.threadLauncherDaemon.maxNumberOfThread";
92 private static Deque<RunnableQueueItem> _stackItems = new ArrayDeque<>( );
93 private Map<String, Thread> _mapThreadByKey = new HashMap<>( );
94 private List<Thread> _listThread = new ArrayList<>( );
95
96 /**
97 * {@inheritDoc}
98 */
99 @Override
100 public void run( )
101 {
102 int nMaxNumberThread = AppPropertiesService.getPropertyInt( PROPERTY_MAX_NUMBER_THREAD, 5 );
103
104 // We remove dead threads from running thread collections
105 String logs = removeDeadThreads( );
106
107 RunnableQueueItem item = null;
108 int nCurrentNumberRunningThreads = _mapThreadByKey.size( ) + _listThread.size( );
109
110 List<RunnableQueueItem> listLockedItems = new ArrayList<>( );
111
112 while ( ( nCurrentNumberRunningThreads < nMaxNumberThread ) && ( ( item = popItemFromQueue( ) ) != null ) )
113 {
114 String key = item.computeKey( );
115 // If the item has a key, then we must make sure that another thread with the same key and plugin is not running
116 if ( key != null )
117 {
118 Thread thread = _mapThreadByKey.get( key );
119
120 if ( thread != null )
121 {
122 if ( thread.isAlive( ) )
123 {
124 // The thread is already running. We declare this item as locked for this run of the daemon.
125 listLockedItems.add( item );
126 }
127 else
128 {
129 // Dead threads are removed from collections at the beginning of the run of the daemon
130 // We still check again that the thread is alive just in case it died during the run
131 thread = getThread( item );
132 _mapThreadByKey.put( key, thread );
133
134 // We do not increase the number of running threads, because we removed and add one
135 }
136 }
137 else
138 {
139 thread = getThread( item );
140 _mapThreadByKey.put( key, thread );
141 nCurrentNumberRunningThreads++;
142 }
143 }
144 else
145 {
146 // If it has no key, or if the plugin has not been set, we create a thread in the keyless collection
147 Thread thread = getThread( item );
148 _mapThreadByKey.put( key, thread );
149 nCurrentNumberRunningThreads++;
150 }
151 }
152
153 // We replace in the queue items that was locked
154 listLockedItems.stream( ).forEach( ThreadLauncherDaemon::addItemToQueue );
155
156 // If the maximum number of running threads has been reached, we end this run
157 if ( nCurrentNumberRunningThreads >= nMaxNumberThread )
158 {
159 setLastRunLogs( logs + "Every threads are running. Daemon execution ending." );
160 return;
161 }
162
163 setLastRunLogs( logs + "There is no more runnable to launch." );
164 }
165
166 private Thread getThread( RunnableQueueItem item )
167 {
168 Thread thread = new Thread( new RunnableWrapper( item.getRunnable( ) ) );
169 thread.start( );
170 return thread;
171 }
172
173 private String removeDeadThreads( )
174 {
175 List<String> listDeadThreadKeys = new ArrayList<>( );
176 for ( Entry<String, Thread> threadEntry : _mapThreadByKey.entrySet( ) )
177 {
178 if ( !threadEntry.getValue( ).isAlive( ) )
179 {
180 listDeadThreadKeys.add( threadEntry.getKey( ) );
181 }
182 }
183
184 for ( String strThreadKey : listDeadThreadKeys )
185 {
186 _mapThreadByKey.remove( strThreadKey );
187 }
188
189 List<Thread> listDeadThreads = new ArrayList<>( );
190
191 for ( Thread thread : _listThread )
192 {
193 if ( !thread.isAlive( ) )
194 {
195 listDeadThreads.add( thread );
196 }
197 }
198
199 String logs = "";
200 if ( !listDeadThreads.isEmpty( ) )
201 {
202 logs = "Releasing " + listDeadThreads.size( ) + " dead threads.\n";
203 }
204 for ( Thread thread : listDeadThreads )
205 {
206 _listThread.remove( thread );
207 }
208 return logs;
209 }
210
211 /**
212 * Add a runnable to the launch queue. It will be launched as soon as a thread is available.
213 *
214 * @param runnable
215 * The runnable to execute
216 * @param strKey
217 * The key of the runnable. Runnables of a given plugin are ensured that they will not be executed at the same time if they have the same key.
218 * @param plugin
219 * The plugin the runnable is associated with
220 */
221 public static void addItemToQueue( Runnable runnable, String strKey, Plugin plugin )
222 {
223 RunnableQueueItemunnableQueueItem.html#RunnableQueueItem">RunnableQueueItem runnableItem = new RunnableQueueItem( runnable, strKey, plugin );
224
225 synchronized( ThreadLauncherDaemon.class )
226 {
227 _stackItems.addLast( runnableItem );
228 }
229
230 AppDaemonService.signalDaemon( THREAD_LAUNCHER_DAEMON_ID );
231 }
232
233 /**
234 * Add a runnable item to the queue
235 *
236 * @param runnableItem
237 * The runnable item to add to the queue
238 */
239 private static synchronized void addItemToQueue( RunnableQueueItem runnableItem )
240 {
241 _stackItems.addLast( runnableItem );
242 }
243
244 /**
245 * Pop the first item of the queue. The item is removed from the queue.
246 *
247 * @return The first item of the queue, or null if the queue is empty
248 */
249 private static synchronized RunnableQueueItem popItemFromQueue( )
250 {
251 if ( CollectionUtils.isEmpty( _stackItems ) )
252 {
253 return null;
254 }
255
256 return _stackItems.pop( );
257 }
258
259 /**
260 * Count the number of items in the queue.
261 *
262 * @return The current number of items in the queue
263 */
264 public static synchronized Integer countItemsInQueue( )
265 {
266 return _stackItems.size( );
267 }
268 }