View Javadoc
1   /*
2    * Copyright (c) 2002-2014, Mairie de Paris
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without
6    * modification, are permitted provided that the following conditions
7    * are met:
8    *
9    *  1. Redistributions of source code must retain the above copyright notice
10   *     and the following disclaimer.
11   *
12   *  2. Redistributions in binary form must reproduce the above copyright notice
13   *     and the following disclaimer in the documentation and/or other materials
14   *     provided with the distribution.
15   *
16   *  3. Neither the name of 'Mairie de Paris' nor 'Lutece' nor the names of its
17   *     contributors may be used to endorse or promote products derived from
18   *     this software without specific prior written permission.
19   *
20   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21   * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22   * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
24   * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25   * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26   * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27   * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28   * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30   * POSSIBILITY OF SUCH DAMAGE.
31   *
32   * License 1.0
33   */
34  package fr.paris.lutece.portal.service.daemon;
35  
36  import fr.paris.lutece.portal.service.plugin.Plugin;
37  import fr.paris.lutece.portal.service.util.AppPropertiesService;
38  
39  import java.util.ArrayDeque;
40  import java.util.ArrayList;
41  import java.util.Deque;
42  import java.util.HashMap;
43  import java.util.List;
44  import java.util.Map;
45  import java.util.Map.Entry;
46  
47  
48  /**
49   * Daemon that manage a pool of threads to launch runnables.
50   */
51  public class ThreadLauncherDaemon extends Daemon
52  {
53      private static final String PROPERTY_MAX_NUMBER_THREAD = "daemon.threadLauncherDaemon.maxNumberOfThread";
54      private static Deque<RunnableQueueItem> _stackItems = new ArrayDeque<RunnableQueueItem>(  );
55      private Map<String, Thread> _mapThreadByKey = new HashMap<String, Thread>(  );
56      private List<Thread> _listThread = new ArrayList<Thread>(  );
57  
58      /**
59       * {@inheritDoc}
60       */
61      @Override
62      public void run(  )
63      {
64          int nMaxNumberThread = AppPropertiesService.getPropertyInt( PROPERTY_MAX_NUMBER_THREAD, 5 );
65  
66          // We remove dead threads from running thread collections
67          RunnableQueueItem item = null;
68          List<String> listDeadThreadKeys = new ArrayList<String>(  );
69  
70          for ( Entry<String, Thread> threadEntry : _mapThreadByKey.entrySet(  ) )
71          {
72              if ( !threadEntry.getValue(  ).isAlive(  ) )
73              {
74                  listDeadThreadKeys.add( threadEntry.getKey(  ) );
75              }
76          }
77  
78          for ( String strThreadKey : listDeadThreadKeys )
79          {
80              _mapThreadByKey.remove( strThreadKey );
81          }
82  
83          List<Thread> listDeadThreads = new ArrayList<Thread>(  );
84  
85          for ( Thread thread : _listThread )
86          {
87              if ( !thread.isAlive(  ) )
88              {
89                  listDeadThreads.add( thread );
90              }
91          }
92  
93          for ( Thread thread : listDeadThreads )
94          {
95              _listThread.remove( thread );
96          }
97  
98          int nCurrentNumberRunningThreads = _mapThreadByKey.size(  ) + _listThread.size(  );
99  
100         List<RunnableQueueItem> listLockedItems = new ArrayList<RunnableQueueItem>(  );
101 
102         while ( ( nCurrentNumberRunningThreads < nMaxNumberThread ) && ( ( item = popItemFromQueue(  ) ) != null ) )
103         {
104             // If the item has a key, then we must make sure that another thread with the same key and plugin is not running
105             if ( ( item.getKey(  ) != null ) && ( item.getPlugin(  ) != null ) )
106             {
107                 Thread thread = _mapThreadByKey.get( item.computeKey(  ) );
108 
109                 if ( thread != null )
110                 {
111                     if ( thread.isAlive(  ) )
112                     {
113                         // The thread is already running. We declare this item as locked for this run of the daemon.
114                         listLockedItems.add( item );
115                     }
116                     else
117                     {
118                         // Dead threads are removed from collections at the beginning of the run of the daemon
119                         // We still check again that the thread is alive just in case it died during the run
120                         thread = new Thread( item.getRunnable(  ) );
121                         thread.start(  );
122                         _mapThreadByKey.put( item.computeKey(  ), thread );
123 
124                         // We do not increase the number of running threads, because we removed and add one
125                     }
126                 }
127                 else
128                 {
129                     // We start a new thread, and increase the current number of running threads
130                     thread = new Thread( item.getRunnable(  ) );
131                     thread.start(  );
132                     _mapThreadByKey.put( item.computeKey(  ), thread );
133                     nCurrentNumberRunningThreads++;
134                 }
135             }
136             else
137             {
138                 // If it has no key, or if the plugin has not been set, we create a thread in the keyless collection
139                 Thread thread = new Thread( item.getRunnable(  ) );
140                 thread.start(  );
141                 _mapThreadByKey.put( item.computeKey(  ), thread );
142                 nCurrentNumberRunningThreads++;
143             }
144         }
145 
146         // We replace in the queue items that was locked
147         for ( RunnableQueueItem itemQueue : listLockedItems )
148         {
149             addItemToQueue( itemQueue );
150         }
151 
152         // If the maximum number of running threads has been reached, we end this run
153         if ( nCurrentNumberRunningThreads >= nMaxNumberThread )
154         {
155             setLastRunLogs( "Every threads are running. Daemon execution ending." );
156 
157             return;
158         }
159 
160         setLastRunLogs( "There is no more runnable to launch." );
161     }
162 
163     /**
164      * Add a runnable to the launch queue. It will be launched as soon as a
165      * thread is available.
166      * @param runnable The runnable to execute
167      * @param strKey The key of the runnable. Runnables of a given plugin are
168      *            ensured that they will not be executed at the same time if
169      *            they have the same key.
170      * @param plugin The plugin the runnable is associated with
171      */
172     public static void addItemToQueue( Runnable runnable, String strKey, Plugin plugin )
173     {
174         RunnableQueueItem runnableItem = new RunnableQueueItem( runnable, strKey, plugin );
175 
176         synchronized ( ThreadLauncherDaemon.class )
177         {
178             _stackItems.addLast( runnableItem );
179         }
180     }
181 
182     /**
183      * Add a runnable item to the queue
184      * @param runnableItem The runnable item to add to the queue
185      */
186     private static synchronized void addItemToQueue( RunnableQueueItem runnableItem )
187     {
188         _stackItems.addLast( runnableItem );
189     }
190 
191     /**
192      * Pop the first item of the queue. The item is removed from the queue.
193      * @return The first item of the queue, or null if the queue is empty
194      */
195     private static synchronized RunnableQueueItem popItemFromQueue(  )
196     {
197         if ( _stackItems.size(  ) == 0 )
198         {
199             return null;
200         }
201 
202         return _stackItems.pop(  );
203     }
204 
205     /**
206      * Count the number of items in the queue.
207      * @return The current number of items in the queue
208      */
209     public static synchronized Integer countItemsInQueue(  )
210     {
211         return _stackItems.size(  );
212     }
213 }