FullIndexTask.java
/*
* Copyright (c) 2002-2024, City of Paris
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright notice
* and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice
* and the following disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* 3. Neither the name of 'Mairie de Paris' nor 'Lutece' nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* License 1.0
*/
package fr.paris.lutece.plugins.identitystore.service.indexer.elastic.index.task;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.client.ElasticClientException;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.index.model.IdentityObject;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.index.model.internal.BulkAction;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.index.model.internal.BulkActionType;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.index.service.IIdentityIndexer;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.index.service.IdentityObjectHome;
import fr.paris.lutece.plugins.identitystore.service.network.DelayedNetworkService;
import fr.paris.lutece.plugins.identitystore.utils.Batch;
import fr.paris.lutece.plugins.identitystore.web.exception.IdentityStoreException;
import fr.paris.lutece.portal.service.util.AppLogService;
import fr.paris.lutece.portal.service.util.AppPropertiesService;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.commons.lang3.time.StopWatch;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
/**
 * Task that rebuilds the identities index from scratch.
 * <p>
 * A new index with a random name is created, every eligible identity is bulk-indexed into it batch by batch, then the alias is moved onto the new index
 * and the index previously behind the alias is deleted. While the rebuild runs, the index behind the alias is set read-only.
 * </p>
 * <p>
 * NOTE(review): a batch whose bulk call fails is only logged; the alias is still switched afterwards, so the published index may miss those identities.
 * Confirm that this best-effort behaviour is intended.
 * </p>
 */
public class FullIndexTask extends AbstractIndexTask implements UsingElasticConnection
{
    /** Alias designating the index currently in use (property {@code identitystore.elastic.client.identities.alias}, default {@code identities-alias}). */
    private final String CURRENT_INDEX_ALIAS = AppPropertiesService.getProperty( "identitystore.elastic.client.identities.alias", "identities-alias" );

    /** Number of identity ids per indexing batch (property {@code identitystore.task.reindex.batch.size}, default 1000). */
    private final int BATCH_SIZE = AppPropertiesService.getPropertyInt( "identitystore.task.reindex.batch.size", 1000 );

    /** Whether {@link #run()} actually performs the reindex (property {@code identitystore.task.reindex.active}, default {@code false}). */
    private final boolean ACTIVE = AppPropertiesService.getPropertyBoolean( "identitystore.task.reindex.active", false );

    /**
     * Default constructor.
     */
    public FullIndexTask( )
    {
    }

    /**
     * Runs the full reindex when the task is activated by configuration, otherwise only logs that it is inactive.
     */
    public void run( )
    {
        if ( ACTIVE )
        {
            AppLogService.info( "Full index task is active." );
            this.doJob( );
        }
        else
        {
            AppLogService.info( "Full index task is not active." );
        }
    }

    /**
     * Performs the full reindex regardless of the activation flag.
     * <p>
     * The task is initialised first and is always closed exactly once at the end, including when an unchecked exception escapes the reindex.
     * </p>
     */
    public void doJob( )
    {
        final StopWatch stopWatch = new StopWatch( );
        stopWatch.start( );
        this.init( );
        try
        {
            final List<Integer> identityIdsList = new ArrayList<>( IdentityObjectHome.getEligibleIdListForIndex( ) );
            if ( identityIdsList.isEmpty( ) )
            {
                stopWatch.stop( );
                this.debug( "No index in database" );
                return;
            }

            this.debug( "Starting identities full reindex at " + DateFormatUtils.format( stopWatch.getStartTime( ), "dd-MM-yyyy'T'HH:mm:ss" ) );
            final IIdentityIndexer identityIndexer = this.createIdentityIndexer( );
            if ( identityIndexer.isAlive( ) )
            {
                this.reindex( identityIndexer, identityIdsList );
            }
            else
            {
                this.debug( "[ERROR] ES not available" );
            }
            stopWatch.stop( );
            final String duration = DurationFormatUtils.formatDurationWords( stopWatch.getTime( ), true, true );
            this.debug( "Re-indexed " + this.getStatus( ).getCurrentNbIndexedIdentities( ) + " identities in " + duration );
        }
        finally
        {
            // Single exit point: the status is read above before the task is closed.
            this.close( );
        }
    }

    /**
     * Builds a new index, fills it with the given identities and publishes it behind the alias.
     * <p>
     * On failure before the alias switch, the new index is rolled back. On failure after the switch (clean-up of the old index), the new index is kept
     * because it is already the one designated by the alias. Unchecked exceptions are rethrown once the same handling has been applied.
     * </p>
     *
     * @param identityIndexer
     *            the indexer used to talk to Elasticsearch
     * @param identityIdsList
     *            the ids of all the identities to index, not empty
     */
    private void reindex( final IIdentityIndexer identityIndexer, final List<Integer> identityIdsList )
    {
        final String newIndex = "identities-" + UUID.randomUUID( );
        this.debug( "ES available :: indexing" );
        // True once the alias has been moved onto the new index; from then on a rollback would delete the live index.
        boolean published = false;
        try
        {
            this.debug( "Creating new index : " + newIndex );
            identityIndexer.initIndex( newIndex );
            if ( identityIndexer.indexExists( CURRENT_INDEX_ALIAS ) )
            {
                this.debug( "Set current index READ-ONLY" );
                identityIndexer.makeIndexReadOnly( CURRENT_INDEX_ALIAS );
            }
            else
            {
                // Nothing is published yet: the new index directly receives the alias.
                this.debug( "Create alias" );
                identityIndexer.addAliasOnIndex( newIndex, CURRENT_INDEX_ALIAS );
            }

            this.getStatus( ).setNbTotalIdentities( identityIdsList.size( ) );
            this.debug( "NB identities to be indexed : " + this.getStatus( ).getNbTotalIdentities( ) );
            this.debug( "Size of indexing batches : " + BATCH_SIZE );
            final Batch<Integer> batch = Batch.ofSize( identityIdsList, BATCH_SIZE );
            this.debug( "NB of indexing batches : " + batch.size( ) );
            batch.stream( ).parallel( ).forEach( identityIdList -> this.process( identityIdList, newIndex ) );

            this.debug( "All batches processed, now switch alias to publish new index.." );
            final String oldIndex = identityIndexer.getIndexBehindAlias( CURRENT_INDEX_ALIAS );
            if ( !StringUtils.equals( oldIndex, newIndex ) )
            {
                this.debug( "Old index id: " + oldIndex );
                identityIndexer.addAliasOnIndex( newIndex, CURRENT_INDEX_ALIAS );
                published = true;
                if ( oldIndex != null )
                {
                    this.debug( "Delete old index : " + oldIndex );
                    identityIndexer.removeIndexReadOnly( oldIndex );
                    identityIndexer.deleteIndex( oldIndex );
                }
            }
        }
        catch( final ElasticClientException e )
        {
            this.handleReindexFailure( identityIndexer, newIndex, published, e );
        }
        catch( final RuntimeException e )
        {
            // E.g. a failure raised by one of the parallel batches: clean up, then let the caller see the error as before.
            this.handleReindexFailure( identityIndexer, newIndex, published, e );
            throw e;
        }
    }

    /**
     * Logs a reindex failure and rolls the new index back unless it has already been published.
     *
     * @param identityIndexer
     *            the indexer used to talk to Elasticsearch
     * @param newIndex
     *            the name of the index being built
     * @param published
     *            {@code true} when the alias has already been moved onto the new index
     * @param e
     *            the failure
     */
    private void handleReindexFailure( final IIdentityIndexer identityIndexer, final String newIndex, final boolean published, final Exception e )
    {
        this.debug( "Failed to reindex " + e.getMessage( ) );
        AppLogService.error( "Full reindex failed for index " + newIndex, e );
        if ( published )
        {
            // The alias already designates the new index: deleting it would leave the alias without any index.
            this.debug( "New index is already published, it is kept. Old index clean-up must be done manually." );
            return;
        }
        final String oldIndex = identityIndexer.getIndexBehindAlias( CURRENT_INDEX_ALIAS );
        this.rollback( oldIndex, newIndex, identityIndexer );
    }

    /**
     * Loads one batch of identities and sends them to the new index in a single bulk request. The indexed counter of the task status is increased by the
     * batch size only when the bulk call reports success; a failing bulk call is logged and does not interrupt the other batches.
     *
     * @param identityIdList
     *            the ids of the identities of this batch
     * @param newIndex
     *            the name of the index to fill
     */
    private void process( final List<Integer> identityIdList, final String newIndex )
    {
        final List<IdentityObject> identityObjects = IdentityObjectHome.loadEligibleIdentitiesForIndex( identityIdList );
        final List<BulkAction> actions = identityObjects.stream( )
                .map( identityObject -> new BulkAction( identityObject.getCustomerId( ), identityObject, BulkActionType.INDEX ) )
                .collect( Collectors.toList( ) );
        boolean bulked = false;
        try
        {
            final Boolean result = new DelayedNetworkService<Boolean>( ).call( ( ) -> this.createIdentityIndexer( ).bulk( actions, newIndex ),
                    "Index identities by bulk", this );
            // Null-safe: a missing result is treated as a failed bulk instead of raising a NullPointerException on unboxing.
            bulked = Boolean.TRUE.equals( result );
        }
        catch( final IdentityStoreException e )
        {
            AppLogService.error( "An error occurred while bulking: " + e.getMessage( ), e );
        }
        if ( bulked )
        {
            this.getStatus( ).incrementCurrentNbIndexedIdentities( identityObjects.size( ) );
        }
    }

    /**
     * Undoes the creation of a new index, then closes the task.
     * <p>
     * The new index is deleted and the old one is made writable again. Both steps are attempted even if the first one fails; failures are logged and
     * not propagated.
     * </p>
     *
     * @param oldIndex
     *            the name of the index to make writable again, ignored when {@code null} or equal to {@code newIndex}
     * @param newIndex
     *            the name of the index to delete
     * @param identityIndexer
     *            the indexer used to talk to Elasticsearch
     */
    public void rollbackIndexCreation( String oldIndex, String newIndex, IIdentityIndexer identityIndexer )
    {
        try
        {
            this.rollback( oldIndex, newIndex, identityIndexer );
        }
        finally
        {
            this.close( );
        }
    }

    /**
     * Deletes the new index and removes the read-only flag of the old one, without closing the task.
     *
     * @param oldIndex
     *            the name of the index to make writable again, ignored when {@code null} or equal to {@code newIndex}
     * @param newIndex
     *            the name of the index to delete
     * @param identityIndexer
     *            the indexer used to talk to Elasticsearch
     */
    private void rollback( final String oldIndex, final String newIndex, final IIdentityIndexer identityIndexer )
    {
        try
        {
            try
            {
                identityIndexer.deleteIndex( newIndex );
            }
            finally
            {
                // Runs even when the deletion above failed, so that the old index never stays read-only.
                // Skipped when there is no distinct old index (first indexation, or alias already on the deleted index).
                if ( oldIndex != null && !StringUtils.equals( oldIndex, newIndex ) )
                {
                    identityIndexer.removeIndexReadOnly( oldIndex );
                }
            }
        }
        catch( final ElasticClientException e )
        {
            this.debug( "Failed to rollback " + e.getMessage( ) );
        }
    }
}