View Javadoc
1   package fr.paris.lutece.plugins.directory.modules.pdfproducerarchive.service.daemon;
2   
3   import fr.paris.lutece.plugins.directory.modules.pdfproducerarchive.business.zipbasket.queue.ZipItem;
4   import fr.paris.lutece.plugins.directory.modules.pdfproducerarchive.service.DirectoryPDFProducerArchivePlugin;
5   import fr.paris.lutece.portal.service.daemon.Daemon;
6   import fr.paris.lutece.portal.service.plugin.Plugin;
7   import fr.paris.lutece.portal.service.plugin.PluginService;
8   import fr.paris.lutece.portal.service.util.AppPropertiesService;
9   
10  import java.util.ArrayDeque;
11  import java.util.Deque;
12  import java.util.HashMap;
13  import java.util.Map;
14  
15  import org.apache.commons.lang.ArrayUtils;
16  import org.apache.commons.lang.StringUtils;
17  
18  
19  public class AddZipToBasketDaemon extends Daemon
20  {
21      private static final String PROPERTY_MAX_NUMBER_THREAD = "daemon.addZipToBacket.maxNumberOfThread";
22  
23      private static Deque<ZipItem> _stackItems = new ArrayDeque<ZipItem>( );
24      private static Plugin _plugin = PluginService.getPlugin( DirectoryPDFProducerArchivePlugin.PLUGIN_NAME );
25      private static Map<Integer, Thread> _mapThreadByUserId = new HashMap<Integer, Thread>( );
26  
27      /**
28       * {@inheritDoc}
29       */
30      @Override
31      public void run( )
32      {
33          int nMaxNumberThread = AppPropertiesService.getPropertyInt( PROPERTY_MAX_NUMBER_THREAD, 5 );
34          ZipItem item = null;
35          while ( ( item = popItemFromQueue( ) ) != null )
36          {
37              Thread thread = _mapThreadByUserId.get( item.getIdAdminUser( ) );
38              AsyncAddZipBasketService asyncAddZipBasketService = new AsyncAddZipBasketService( item.getLocale( ),
39                      item.getIdAdminUser( ), _plugin, item.getIdDirectory( ), item.getListIdRecord( ) );
40              if ( thread != null )
41              {
42                  if ( thread.isAlive( ) )
43                  {
44                      // The thread is already running. We replace the item in the queue.
45                      addItemToQueue( item );
46                      
47                      // If the item we just used is the only item in the queue
48                      if ( countItemsInQueue( ) <= 1 )
49                      {
50                          setLastRunLogs( "No more zip to add to basket." );
51                          return;
52                      }
53                      
54                      // We now check if the daemon can start another thread
55                      if ( _mapThreadByUserId.size( ) >= nMaxNumberThread )
56                      {
57                          // We check if every threads are running
58                          boolean bStopedThreadFound = false;
59                          for ( Thread otherThread : _mapThreadByUserId.values( ) )
60                          {
61                              if ( !otherThread.isAlive( ) )
62                              {
63                                  _mapThreadByUserId.remove( otherThread );
64                                  bStopedThreadFound = true;
65                              }
66                          }
67                          // If every thread is running, and the maximum number of threads has been reached, we stop the daemon
68                          if ( !bStopedThreadFound )
69                          {
70                              setLastRunLogs( "Every threads are running. Daemon execution ending." );
71                              return;
72                          }
73                      }
74                  }
75                  else
76                  {
77                      _mapThreadByUserId.remove( thread );
78                      thread = new Thread( asyncAddZipBasketService );
79                      thread.start( );
80                      _mapThreadByUserId.put( item.getIdAdminUser( ), thread );
81                  }
82              }
83              else
84              {
85                  // We check if we can launch a new thread
86                  if ( _mapThreadByUserId.size( ) < nMaxNumberThread )
87                  {
88                      thread = new Thread( asyncAddZipBasketService );
89                      thread.start( );
90                      _mapThreadByUserId.put( item.getIdAdminUser( ), thread );
91                  }
92                  else
93                  {
94                      // We check if a thread has stoped
95                      boolean bStopedThreadFound = false;
96                      for ( Thread otherThread : _mapThreadByUserId.values( ) )
97                      {
98                          if ( !otherThread.isAlive( ) )
99                          {
100                             _mapThreadByUserId.remove( otherThread );
101                             bStopedThreadFound = true;
102                             break;
103                         }
104                     }
105                     // If we found a dead thread and removed it from the map, we can create a new thread for this user
106                     if ( bStopedThreadFound )
107                     {
108                         thread = new Thread( asyncAddZipBasketService );
109                         thread.start( );
110                         _mapThreadByUserId.put( item.getIdAdminUser( ), thread );
111                     }
112                     else
113                     {
114                         // We can not create a new thread. The item has to wait until a thread end.
115                         addItemToQueue( item );
116                         // We stop the execution of the Daemon because all threads are up, so no other one can be created. The Daemon can do nothing more until at last one thread stop.
117                         setLastRunLogs( "Every threads are running. Daemon execution ending." );
118                         return;
119                     }
120                 }
121             }
122         }
123         setLastRunLogs( "There is no more zip to add to basket." );
124     }
125 
126     public synchronized static void addItemToQueue( ZipItem zipItem )
127     {
128         for ( ZipItem item : _stackItems )
129         {
130             if ( item.getIdAdminUser( ) == zipItem.getIdAdminUser( )
131                     && item.getIdDirectory( ) == zipItem.getIdDirectory( ) )
132             {
133                 String[] listIdRecord = item.getListIdRecord( );
134                 for ( String strIdRecord : zipItem.getListIdRecord( ) )
135                 {
136                     boolean bContain = false;
137                     for ( String strOtherIdRecord : listIdRecord )
138                     {
139                         if ( StringUtils.equals( strOtherIdRecord, strIdRecord ) )
140                         {
141                             bContain = true;
142                             break;
143                         }
144                     }
145                     if ( !bContain )
146                     {
147                         ArrayUtils.add( listIdRecord, strIdRecord );
148                     }
149                 }
150                 item.setListIdRecord( listIdRecord );
151                 return;
152             }
153         }
154         _stackItems.addLast( zipItem );
155     }
156 
157     public synchronized static ZipItem popItemFromQueue( )
158     {
159         if ( _stackItems.size( ) == 0 )
160         {
161             return null;
162         }
163         else
164         {
165             ZipItem item = _stackItems.pop( );
166             return item;
167         }
168     }
169 
170     public synchronized static Integer countItemsInQueue( )
171     {
172         return _stackItems.size( );
173     }
174 
175 }