ThreadLauncherDaemon.java

  1. /*
  2.  * Copyright (c) 2002-2022, City of Paris
  3.  * All rights reserved.
  4.  *
  5.  * Redistribution and use in source and binary forms, with or without
  6.  * modification, are permitted provided that the following conditions
  7.  * are met:
  8.  *
  9.  *  1. Redistributions of source code must retain the above copyright notice
  10.  *     and the following disclaimer.
  11.  *
  12.  *  2. Redistributions in binary form must reproduce the above copyright notice
  13.  *     and the following disclaimer in the documentation and/or other materials
  14.  *     provided with the distribution.
  15.  *
  16.  *  3. Neither the name of 'Mairie de Paris' nor 'Lutece' nor the names of its
  17.  *     contributors may be used to endorse or promote products derived from
  18.  *     this software without specific prior written permission.
  19.  *
  20.  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  21.  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  22.  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  23.  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
  24.  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  25.  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  26.  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  27.  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  28.  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  29.  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  30.  * POSSIBILITY OF SUCH DAMAGE.
  31.  *
  32.  * License 1.0
  33.  */
  34. package fr.paris.lutece.portal.service.daemon;

  35. import fr.paris.lutece.portal.service.plugin.Plugin;
  36. import fr.paris.lutece.portal.service.util.AppPropertiesService;

  37. import java.util.ArrayDeque;
  38. import java.util.ArrayList;
  39. import java.util.Deque;
  40. import java.util.HashMap;
  41. import java.util.List;
  42. import java.util.Map;
  43. import java.util.Map.Entry;

  44. import org.apache.commons.collections.CollectionUtils;

  45. /**
  46.  * Daemon that manage a pool of threads to launch runnables.
  47.  */
  48. public class ThreadLauncherDaemon extends Daemon
  49. {
  50.     private static final String THREAD_LAUNCHER_DAEMON_ID = "threadLauncherDaemon";

  51.     /**
  52.      * Runnable wrapper to signal the daemon on processing end
  53.      */
  54.     private class RunnableWrapper implements Runnable
  55.     {
  56.         private final Runnable _wrapped;

  57.         /**
  58.          * Constructor
  59.          *
  60.          * @param wrapped
  61.          *            the wrapped Runnable
  62.          */
  63.         public RunnableWrapper( Runnable wrapped )
  64.         {
  65.             _wrapped = wrapped;
  66.         }

  67.         @Override
  68.         public void run( )
  69.         {
  70.             try
  71.             {
  72.                 _wrapped.run( );
  73.             }
  74.             finally
  75.             {
  76.                 // will clean up the dead thread
  77.                 // and permit execution of blocked tasks
  78.                 AppDaemonService.signalDaemon( THREAD_LAUNCHER_DAEMON_ID );
  79.             }
  80.         }

  81.     }

  82.     private static final String PROPERTY_MAX_NUMBER_THREAD = "daemon.threadLauncherDaemon.maxNumberOfThread";
  83.     private static Deque<RunnableQueueItem> _stackItems = new ArrayDeque<>( );
  84.     private Map<String, Thread> _mapThreadByKey = new HashMap<>( );
  85.     private List<Thread> _listThread = new ArrayList<>( );

  86.     /**
  87.      * {@inheritDoc}
  88.      */
  89.     @Override
  90.     public void run( )
  91.     {
  92.         int nMaxNumberThread = AppPropertiesService.getPropertyInt( PROPERTY_MAX_NUMBER_THREAD, 5 );

  93.         // We remove dead threads from running thread collections
  94.         String logs = removeDeadThreads( );

  95.         RunnableQueueItem item = null;
  96.         int nCurrentNumberRunningThreads = _mapThreadByKey.size( ) + _listThread.size( );

  97.         List<RunnableQueueItem> listLockedItems = new ArrayList<>( );

  98.         while ( ( nCurrentNumberRunningThreads < nMaxNumberThread ) && ( ( item = popItemFromQueue( ) ) != null ) )
  99.         {
  100.             String key = item.computeKey( );
  101.             // If the item has a key, then we must make sure that another thread with the same key and plugin is not running
  102.             if ( key != null )
  103.             {
  104.                 Thread thread = _mapThreadByKey.get( key );

  105.                 if ( thread != null )
  106.                 {
  107.                     if ( thread.isAlive( ) )
  108.                     {
  109.                         // The thread is already running. We declare this item as locked for this run of the daemon.
  110.                         listLockedItems.add( item );
  111.                     }
  112.                     else
  113.                     {
  114.                         // Dead threads are removed from collections at the beginning of the run of the daemon
  115.                         // We still check again that the thread is alive just in case it died during the run
  116.                         thread = getThread( item );
  117.                         _mapThreadByKey.put( key, thread );

  118.                         // We do not increase the number of running threads, because we removed and add one
  119.                     }
  120.                 }
  121.                 else
  122.                 {
  123.                     thread = getThread( item );
  124.                     _mapThreadByKey.put( key, thread );
  125.                     nCurrentNumberRunningThreads++;
  126.                 }
  127.             }
  128.             else
  129.             {
  130.                 // If it has no key, or if the plugin has not been set, we create a thread in the keyless collection
  131.                 Thread thread = getThread( item );
  132.                 _mapThreadByKey.put( key, thread );
  133.                 nCurrentNumberRunningThreads++;
  134.             }
  135.         }

  136.         // We replace in the queue items that was locked
  137.         listLockedItems.stream( ).forEach( ThreadLauncherDaemon::addItemToQueue );

  138.         // If the maximum number of running threads has been reached, we end this run
  139.         if ( nCurrentNumberRunningThreads >= nMaxNumberThread )
  140.         {
  141.             setLastRunLogs( logs + "Every threads are running. Daemon execution ending." );
  142.             return;
  143.         }

  144.         setLastRunLogs( logs + "There is no more runnable to launch." );
  145.     }

  146.     private Thread getThread( RunnableQueueItem item )
  147.     {
  148.         Thread thread = new Thread( new RunnableWrapper( item.getRunnable( ) ) );
  149.         thread.start( );
  150.         return thread;
  151.     }

  152.     private String removeDeadThreads( )
  153.     {
  154.         List<String> listDeadThreadKeys = new ArrayList<>( );
  155.         for ( Entry<String, Thread> threadEntry : _mapThreadByKey.entrySet( ) )
  156.         {
  157.             if ( !threadEntry.getValue( ).isAlive( ) )
  158.             {
  159.                 listDeadThreadKeys.add( threadEntry.getKey( ) );
  160.             }
  161.         }

  162.         for ( String strThreadKey : listDeadThreadKeys )
  163.         {
  164.             _mapThreadByKey.remove( strThreadKey );
  165.         }

  166.         List<Thread> listDeadThreads = new ArrayList<>( );

  167.         for ( Thread thread : _listThread )
  168.         {
  169.             if ( !thread.isAlive( ) )
  170.             {
  171.                 listDeadThreads.add( thread );
  172.             }
  173.         }

  174.         String logs = "";
  175.         if ( !listDeadThreads.isEmpty( ) )
  176.         {
  177.             logs = "Releasing " + listDeadThreads.size( ) + " dead threads.\n";
  178.         }
  179.         for ( Thread thread : listDeadThreads )
  180.         {
  181.             _listThread.remove( thread );
  182.         }
  183.         return logs;
  184.     }

  185.     /**
  186.      * Add a runnable to the launch queue. It will be launched as soon as a thread is available.
  187.      *
  188.      * @param runnable
  189.      *            The runnable to execute
  190.      * @param strKey
  191.      *            The key of the runnable. Runnables of a given plugin are ensured that they will not be executed at the same time if they have the same key.
  192.      * @param plugin
  193.      *            The plugin the runnable is associated with
  194.      */
  195.     public static void addItemToQueue( Runnable runnable, String strKey, Plugin plugin )
  196.     {
  197.         RunnableQueueItem runnableItem = new RunnableQueueItem( runnable, strKey, plugin );

  198.         synchronized( ThreadLauncherDaemon.class )
  199.         {
  200.             _stackItems.addLast( runnableItem );
  201.         }

  202.         AppDaemonService.signalDaemon( THREAD_LAUNCHER_DAEMON_ID );
  203.     }

  204.     /**
  205.      * Add a runnable item to the queue
  206.      *
  207.      * @param runnableItem
  208.      *            The runnable item to add to the queue
  209.      */
  210.     private static synchronized void addItemToQueue( RunnableQueueItem runnableItem )
  211.     {
  212.         _stackItems.addLast( runnableItem );
  213.     }

  214.     /**
  215.      * Pop the first item of the queue. The item is removed from the queue.
  216.      *
  217.      * @return The first item of the queue, or null if the queue is empty
  218.      */
  219.     private static synchronized RunnableQueueItem popItemFromQueue( )
  220.     {
  221.         if ( CollectionUtils.isEmpty( _stackItems ) )
  222.         {
  223.             return null;
  224.         }

  225.         return _stackItems.pop( );
  226.     }

  227.     /**
  228.      * Count the number of items in the queue.
  229.      *
  230.      * @return The current number of items in the queue
  231.      */
  232.     public static synchronized Integer countItemsInQueue( )
  233.     {
  234.         return _stackItems.size( );
  235.     }
  236. }