1 /*
2 * Copyright (c) 2002-2014, Mairie de Paris
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * 1. Redistributions of source code must retain the above copyright notice
10 * and the following disclaimer.
11 *
12 * 2. Redistributions in binary form must reproduce the above copyright notice
13 * and the following disclaimer in the documentation and/or other materials
14 * provided with the distribution.
15 *
16 * 3. Neither the name of 'Mairie de Paris' nor 'Lutece' nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 *
32 * License 1.0
33 */
34 package fr.paris.lutece.portal.service.daemon;
35
36 import fr.paris.lutece.portal.service.plugin.Plugin;
37 import fr.paris.lutece.portal.service.util.AppPropertiesService;
38
39 import java.util.ArrayDeque;
40 import java.util.ArrayList;
41 import java.util.Deque;
42 import java.util.HashMap;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Map.Entry;
46
47
48 /**
49 * Daemon that manage a pool of threads to launch runnables.
50 */
51 public class ThreadLauncherDaemon extends Daemon
52 {
53 private static final String PROPERTY_MAX_NUMBER_THREAD = "daemon.threadLauncherDaemon.maxNumberOfThread";
54 private static Deque<RunnableQueueItem> _stackItems = new ArrayDeque<RunnableQueueItem>( );
55 private Map<String, Thread> _mapThreadByKey = new HashMap<String, Thread>( );
56 private List<Thread> _listThread = new ArrayList<Thread>( );
57
58 /**
59 * {@inheritDoc}
60 */
61 @Override
62 public void run( )
63 {
64 int nMaxNumberThread = AppPropertiesService.getPropertyInt( PROPERTY_MAX_NUMBER_THREAD, 5 );
65
66 // We remove dead threads from running thread collections
67 RunnableQueueItem item = null;
68 List<String> listDeadThreadKeys = new ArrayList<String>( );
69
70 for ( Entry<String, Thread> threadEntry : _mapThreadByKey.entrySet( ) )
71 {
72 if ( !threadEntry.getValue( ).isAlive( ) )
73 {
74 listDeadThreadKeys.add( threadEntry.getKey( ) );
75 }
76 }
77
78 for ( String strThreadKey : listDeadThreadKeys )
79 {
80 _mapThreadByKey.remove( strThreadKey );
81 }
82
83 List<Thread> listDeadThreads = new ArrayList<Thread>( );
84
85 for ( Thread thread : _listThread )
86 {
87 if ( !thread.isAlive( ) )
88 {
89 listDeadThreads.add( thread );
90 }
91 }
92
93 for ( Thread thread : listDeadThreads )
94 {
95 _listThread.remove( thread );
96 }
97
98 int nCurrentNumberRunningThreads = _mapThreadByKey.size( ) + _listThread.size( );
99
100 List<RunnableQueueItem> listLockedItems = new ArrayList<RunnableQueueItem>( );
101
102 while ( ( nCurrentNumberRunningThreads < nMaxNumberThread ) && ( ( item = popItemFromQueue( ) ) != null ) )
103 {
104 // If the item has a key, then we must make sure that another thread with the same key and plugin is not running
105 if ( ( item.getKey( ) != null ) && ( item.getPlugin( ) != null ) )
106 {
107 Thread thread = _mapThreadByKey.get( item.computeKey( ) );
108
109 if ( thread != null )
110 {
111 if ( thread.isAlive( ) )
112 {
113 // The thread is already running. We declare this item as locked for this run of the daemon.
114 listLockedItems.add( item );
115 }
116 else
117 {
118 // Dead threads are removed from collections at the beginning of the run of the daemon
119 // We still check again that the thread is alive just in case it died during the run
120 thread = new Thread( item.getRunnable( ) );
121 thread.start( );
122 _mapThreadByKey.put( item.computeKey( ), thread );
123
124 // We do not increase the number of running threads, because we removed and add one
125 }
126 }
127 else
128 {
129 // We start a new thread, and increase the current number of running threads
130 thread = new Thread( item.getRunnable( ) );
131 thread.start( );
132 _mapThreadByKey.put( item.computeKey( ), thread );
133 nCurrentNumberRunningThreads++;
134 }
135 }
136 else
137 {
138 // If it has no key, or if the plugin has not been set, we create a thread in the keyless collection
139 Thread thread = new Thread( item.getRunnable( ) );
140 thread.start( );
141 _mapThreadByKey.put( item.computeKey( ), thread );
142 nCurrentNumberRunningThreads++;
143 }
144 }
145
146 // We replace in the queue items that was locked
147 for ( RunnableQueueItem itemQueue : listLockedItems )
148 {
149 addItemToQueue( itemQueue );
150 }
151
152 // If the maximum number of running threads has been reached, we end this run
153 if ( nCurrentNumberRunningThreads >= nMaxNumberThread )
154 {
155 setLastRunLogs( "Every threads are running. Daemon execution ending." );
156
157 return;
158 }
159
160 setLastRunLogs( "There is no more runnable to launch." );
161 }
162
163 /**
164 * Add a runnable to the launch queue. It will be launched as soon as a
165 * thread is available.
166 * @param runnable The runnable to execute
167 * @param strKey The key of the runnable. Runnables of a given plugin are
168 * ensured that they will not be executed at the same time if
169 * they have the same key.
170 * @param plugin The plugin the runnable is associated with
171 */
172 public static void addItemToQueue( Runnable runnable, String strKey, Plugin plugin )
173 {
174 RunnableQueueItem runnableItem = new RunnableQueueItem( runnable, strKey, plugin );
175
176 synchronized ( ThreadLauncherDaemon.class )
177 {
178 _stackItems.addLast( runnableItem );
179 }
180 }
181
182 /**
183 * Add a runnable item to the queue
184 * @param runnableItem The runnable item to add to the queue
185 */
186 private static synchronized void addItemToQueue( RunnableQueueItem runnableItem )
187 {
188 _stackItems.addLast( runnableItem );
189 }
190
191 /**
192 * Pop the first item of the queue. The item is removed from the queue.
193 * @return The first item of the queue, or null if the queue is empty
194 */
195 private static synchronized RunnableQueueItem popItemFromQueue( )
196 {
197 if ( _stackItems.size( ) == 0 )
198 {
199 return null;
200 }
201
202 return _stackItems.pop( );
203 }
204
205 /**
206 * Count the number of items in the queue.
207 * @return The current number of items in the queue
208 */
209 public static synchronized Integer countItemsInQueue( )
210 {
211 return _stackItems.size( );
212 }
213 }