1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 package fr.paris.lutece.portal.service.daemon;
35
36 import fr.paris.lutece.portal.service.plugin.Plugin;
37 import fr.paris.lutece.portal.service.util.AppPropertiesService;
38
39 import java.util.ArrayDeque;
40 import java.util.ArrayList;
41 import java.util.Deque;
42 import java.util.HashMap;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Map.Entry;
46
47 import org.apache.commons.collections.CollectionUtils;
48
49
50
51
52 public class ThreadLauncherDaemon extends Daemon
53 {
54 private static final String THREAD_LAUNCHER_DAEMON_ID = "threadLauncherDaemon";
55
56
57
58
59 private class RunnableWrapper implements Runnable
60 {
61 private final Runnable _wrapped;
62
63
64
65
66
67
68
69 public RunnableWrapper( Runnable wrapped )
70 {
71 _wrapped = wrapped;
72 }
73
74 @Override
75 public void run( )
76 {
77 try
78 {
79 _wrapped.run( );
80 }
81 finally
82 {
83
84
85 AppDaemonService.signalDaemon( THREAD_LAUNCHER_DAEMON_ID );
86 }
87 }
88
89 }
90
91 private static final String PROPERTY_MAX_NUMBER_THREAD = "daemon.threadLauncherDaemon.maxNumberOfThread";
92 private static Deque<RunnableQueueItem> _stackItems = new ArrayDeque<>( );
93 private Map<String, Thread> _mapThreadByKey = new HashMap<>( );
94 private List<Thread> _listThread = new ArrayList<>( );
95
96
97
98
99 @Override
100 public void run( )
101 {
102 int nMaxNumberThread = AppPropertiesService.getPropertyInt( PROPERTY_MAX_NUMBER_THREAD, 5 );
103
104
105 String logs = removeDeadThreads( );
106
107 RunnableQueueItem item = null;
108 int nCurrentNumberRunningThreads = _mapThreadByKey.size( ) + _listThread.size( );
109
110 List<RunnableQueueItem> listLockedItems = new ArrayList<>( );
111
112 while ( ( nCurrentNumberRunningThreads < nMaxNumberThread ) && ( ( item = popItemFromQueue( ) ) != null ) )
113 {
114 String key = item.computeKey( );
115
116 if ( key != null )
117 {
118 Thread thread = _mapThreadByKey.get( key );
119
120 if ( thread != null )
121 {
122 if ( thread.isAlive( ) )
123 {
124
125 listLockedItems.add( item );
126 }
127 else
128 {
129
130
131 thread = getThread( item );
132 _mapThreadByKey.put( key, thread );
133
134
135 }
136 }
137 else
138 {
139 thread = getThread( item );
140 _mapThreadByKey.put( key, thread );
141 nCurrentNumberRunningThreads++;
142 }
143 }
144 else
145 {
146
147 Thread thread = getThread( item );
148 _mapThreadByKey.put( key, thread );
149 nCurrentNumberRunningThreads++;
150 }
151 }
152
153
154 listLockedItems.stream( ).forEach( ThreadLauncherDaemon::addItemToQueue );
155
156
157 if ( nCurrentNumberRunningThreads >= nMaxNumberThread )
158 {
159 setLastRunLogs( logs + "Every threads are running. Daemon execution ending." );
160 return;
161 }
162
163 setLastRunLogs( logs + "There is no more runnable to launch." );
164 }
165
166 private Thread getThread( RunnableQueueItem item )
167 {
168 Thread thread = new Thread( new RunnableWrapper( item.getRunnable( ) ) );
169 thread.start( );
170 return thread;
171 }
172
173 private String removeDeadThreads( )
174 {
175 List<String> listDeadThreadKeys = new ArrayList<>( );
176 for ( Entry<String, Thread> threadEntry : _mapThreadByKey.entrySet( ) )
177 {
178 if ( !threadEntry.getValue( ).isAlive( ) )
179 {
180 listDeadThreadKeys.add( threadEntry.getKey( ) );
181 }
182 }
183
184 for ( String strThreadKey : listDeadThreadKeys )
185 {
186 _mapThreadByKey.remove( strThreadKey );
187 }
188
189 List<Thread> listDeadThreads = new ArrayList<>( );
190
191 for ( Thread thread : _listThread )
192 {
193 if ( !thread.isAlive( ) )
194 {
195 listDeadThreads.add( thread );
196 }
197 }
198
199 String logs = "";
200 if ( !listDeadThreads.isEmpty( ) )
201 {
202 logs = "Releasing " + listDeadThreads.size( ) + " dead threads.\n";
203 }
204 for ( Thread thread : listDeadThreads )
205 {
206 _listThread.remove( thread );
207 }
208 return logs;
209 }
210
211
212
213
214
215
216
217
218
219
220
221 public static void addItemToQueue( Runnable runnable, String strKey, Plugin plugin )
222 {
223 RunnableQueueItemunnableQueueItem.html#RunnableQueueItem">RunnableQueueItem runnableItem = new RunnableQueueItem( runnable, strKey, plugin );
224
225 synchronized( ThreadLauncherDaemon.class )
226 {
227 _stackItems.addLast( runnableItem );
228 }
229
230 AppDaemonService.signalDaemon( THREAD_LAUNCHER_DAEMON_ID );
231 }
232
233
234
235
236
237
238
239 private static synchronized void addItemToQueue( RunnableQueueItem runnableItem )
240 {
241 _stackItems.addLast( runnableItem );
242 }
243
244
245
246
247
248
249 private static synchronized RunnableQueueItem popItemFromQueue( )
250 {
251 if ( CollectionUtils.isEmpty( _stackItems ) )
252 {
253 return null;
254 }
255
256 return _stackItems.pop( );
257 }
258
259
260
261
262
263
264 public static synchronized Integer countItemsInQueue( )
265 {
266 return _stackItems.size( );
267 }
268 }