View Javadoc
1   /*
2    * Copyright (c) 2002-2022, City of Paris
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without
6    * modification, are permitted provided that the following conditions
7    * are met:
8    *
9    *  1. Redistributions of source code must retain the above copyright notice
10   *     and the following disclaimer.
11   *
12   *  2. Redistributions in binary form must reproduce the above copyright notice
13   *     and the following disclaimer in the documentation and/or other materials
14   *     provided with the distribution.
15   *
16   *  3. Neither the name of 'Mairie de Paris' nor 'Lutece' nor the names of its
17   *     contributors may be used to endorse or promote products derived from
18   *     this software without specific prior written permission.
19   *
20   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21   * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22   * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
24   * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25   * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26   * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27   * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28   * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30   * POSSIBILITY OF SUCH DAMAGE.
31   *
32   * License 1.0
33   */
34  package fr.paris.lutece.portal.service.daemon;
35  
36  import fr.paris.lutece.portal.service.plugin.Plugin;
37  import fr.paris.lutece.portal.service.util.AppPropertiesService;
38  
39  import java.util.ArrayDeque;
40  import java.util.ArrayList;
41  import java.util.Deque;
42  import java.util.HashMap;
43  import java.util.List;
44  import java.util.Map;
45  import java.util.Map.Entry;
46  
47  import org.apache.commons.collections.CollectionUtils;
48  
49  /**
50   * Daemon that manage a pool of threads to launch runnables.
51   */
52  public class ThreadLauncherDaemon extends Daemon
53  {
54      private static final String THREAD_LAUNCHER_DAEMON_ID = "threadLauncherDaemon";
55  
56      /**
57       * Runnable wrapper to signal the daemon on processing end
58       */
59      private class RunnableWrapper implements Runnable
60      {
61          private final Runnable _wrapped;
62  
63          /**
64           * Constructor
65           * 
66           * @param wrapped
67           *            the wrapped Runnable
68           */
69          public RunnableWrapper( Runnable wrapped )
70          {
71              _wrapped = wrapped;
72          }
73  
74          @Override
75          public void run( )
76          {
77              try
78              {
79                  _wrapped.run( );
80              }
81              finally
82              {
83                  // will clean up the dead thread
84                  // and permit execution of blocked tasks
85                  AppDaemonService.signalDaemon( THREAD_LAUNCHER_DAEMON_ID );
86              }
87          }
88  
89      }
90  
91      private static final String PROPERTY_MAX_NUMBER_THREAD = "daemon.threadLauncherDaemon.maxNumberOfThread";
92      private static Deque<RunnableQueueItem> _stackItems = new ArrayDeque<>( );
93      private Map<String, Thread> _mapThreadByKey = new HashMap<>( );
94      private List<Thread> _listThread = new ArrayList<>( );
95  
96      /**
97       * {@inheritDoc}
98       */
99      @Override
100     public void run( )
101     {
102         int nMaxNumberThread = AppPropertiesService.getPropertyInt( PROPERTY_MAX_NUMBER_THREAD, 5 );
103 
104         // We remove dead threads from running thread collections
105         String logs = removeDeadThreads( );
106 
107         RunnableQueueItem item = null;
108         int nCurrentNumberRunningThreads = _mapThreadByKey.size( ) + _listThread.size( );
109 
110         List<RunnableQueueItem> listLockedItems = new ArrayList<>( );
111 
112         while ( ( nCurrentNumberRunningThreads < nMaxNumberThread ) && ( ( item = popItemFromQueue( ) ) != null ) )
113         {
114             String key = item.computeKey( );
115             // If the item has a key, then we must make sure that another thread with the same key and plugin is not running
116             if ( key != null )
117             {
118                 Thread thread = _mapThreadByKey.get( key );
119 
120                 if ( thread != null )
121                 {
122                     if ( thread.isAlive( ) )
123                     {
124                         // The thread is already running. We declare this item as locked for this run of the daemon.
125                         listLockedItems.add( item );
126                     }
127                     else
128                     {
129                         // Dead threads are removed from collections at the beginning of the run of the daemon
130                         // We still check again that the thread is alive just in case it died during the run
131                         thread = getThread( item );
132                         _mapThreadByKey.put( key, thread );
133 
134                         // We do not increase the number of running threads, because we removed and add one
135                     }
136                 }
137                 else
138                 {
139                     thread = getThread( item );
140                     _mapThreadByKey.put( key, thread );
141                     nCurrentNumberRunningThreads++;
142                 }
143             }
144             else
145             {
146                 // If it has no key, or if the plugin has not been set, we create a thread in the keyless collection
147                 Thread thread = getThread( item );
148                 _mapThreadByKey.put( key, thread );
149                 nCurrentNumberRunningThreads++;
150             }
151         }
152 
153         // We replace in the queue items that was locked
154         listLockedItems.stream( ).forEach( ThreadLauncherDaemon::addItemToQueue );
155 
156         // If the maximum number of running threads has been reached, we end this run
157         if ( nCurrentNumberRunningThreads >= nMaxNumberThread )
158         {
159             setLastRunLogs( logs + "Every threads are running. Daemon execution ending." );
160             return;
161         }
162 
163         setLastRunLogs( logs + "There is no more runnable to launch." );
164     }
165 
166     private Thread getThread( RunnableQueueItem item )
167     {
168         Thread thread = new Thread( new RunnableWrapper( item.getRunnable( ) ) );
169         thread.start( );
170         return thread;
171     }
172 
173     private String removeDeadThreads( )
174     {
175         List<String> listDeadThreadKeys = new ArrayList<>( );
176         for ( Entry<String, Thread> threadEntry : _mapThreadByKey.entrySet( ) )
177         {
178             if ( !threadEntry.getValue( ).isAlive( ) )
179             {
180                 listDeadThreadKeys.add( threadEntry.getKey( ) );
181             }
182         }
183 
184         for ( String strThreadKey : listDeadThreadKeys )
185         {
186             _mapThreadByKey.remove( strThreadKey );
187         }
188 
189         List<Thread> listDeadThreads = new ArrayList<>( );
190 
191         for ( Thread thread : _listThread )
192         {
193             if ( !thread.isAlive( ) )
194             {
195                 listDeadThreads.add( thread );
196             }
197         }
198 
199         String logs = "";
200         if ( !listDeadThreads.isEmpty( ) )
201         {
202             logs = "Releasing " + listDeadThreads.size( ) + " dead threads.\n";
203         }
204         for ( Thread thread : listDeadThreads )
205         {
206             _listThread.remove( thread );
207         }
208         return logs;
209     }
210 
211     /**
212      * Add a runnable to the launch queue. It will be launched as soon as a thread is available.
213      * 
214      * @param runnable
215      *            The runnable to execute
216      * @param strKey
217      *            The key of the runnable. Runnables of a given plugin are ensured that they will not be executed at the same time if they have the same key.
218      * @param plugin
219      *            The plugin the runnable is associated with
220      */
221     public static void addItemToQueue( Runnable runnable, String strKey, Plugin plugin )
222     {
223         RunnableQueueItemunnableQueueItem.html#RunnableQueueItem">RunnableQueueItem runnableItem = new RunnableQueueItem( runnable, strKey, plugin );
224 
225         synchronized( ThreadLauncherDaemon.class )
226         {
227             _stackItems.addLast( runnableItem );
228         }
229 
230         AppDaemonService.signalDaemon( THREAD_LAUNCHER_DAEMON_ID );
231     }
232 
233     /**
234      * Add a runnable item to the queue
235      * 
236      * @param runnableItem
237      *            The runnable item to add to the queue
238      */
239     private static synchronized void addItemToQueue( RunnableQueueItem runnableItem )
240     {
241         _stackItems.addLast( runnableItem );
242     }
243 
244     /**
245      * Pop the first item of the queue. The item is removed from the queue.
246      * 
247      * @return The first item of the queue, or null if the queue is empty
248      */
249     private static synchronized RunnableQueueItem popItemFromQueue( )
250     {
251         if ( CollectionUtils.isEmpty( _stackItems ) )
252         {
253             return null;
254         }
255 
256         return _stackItems.pop( );
257     }
258 
259     /**
260      * Count the number of items in the queue.
261      * 
262      * @return The current number of items in the queue
263      */
264     public static synchronized Integer countItemsInQueue( )
265     {
266         return _stackItems.size( );
267     }
268 }