
/*
 * Copyright (c) 2002-2021, City of Paris
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *  1. Redistributions of source code must retain the above copyright notice
 *     and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright notice
 *     and the following disclaimer in the documentation and/or other materials
 *     provided with the distribution.
 *
 *  3. Neither the name of 'Mairie de Paris' nor 'Lutece' nor the names of its
 *     contributors may be used to endorse or promote products derived from
 *     this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * License 1.0
 */
package fr.paris.lutece.plugins.elasticdata.service;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.collections.CollectionUtils;
import org.json.JSONArray;

import fr.paris.lutece.plugins.elasticdata.business.DataObject;
import fr.paris.lutece.plugins.elasticdata.business.DataSource;
import fr.paris.lutece.plugins.elasticdata.business.IndexerAction;
import fr.paris.lutece.plugins.elasticdata.business.IndexerActionHome;
import fr.paris.lutece.plugins.libraryelastic.business.bulk.BulkRequest;
import fr.paris.lutece.plugins.libraryelastic.business.bulk.IndexSubRequest;
import fr.paris.lutece.plugins.libraryelastic.util.Elastic;
import fr.paris.lutece.plugins.libraryelastic.util.ElasticClientException;
import fr.paris.lutece.portal.service.util.AppLogService;
import fr.paris.lutece.util.sql.TransactionManager;

public final class DataSourceIncrementalService
{

    private DataSourceIncrementalService( )
    {

    }

    /**
     * Process the incremental indexing of all registered data sources. An error on one data source is logged and does not stop the others.
     * 
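     * <p>
     * A minimal usage sketch (the scheduling context is an assumption; this method is typically invoked from a daemon):
     * </p>
     * 
     * <pre>
     * {@code
     * String strLogs = DataSourceIncrementalService.processIncrementalIndexing( );
     * AppLogService.info( strLogs );
     * }
     * </pre>
     * 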
     * @return The logs of the process
     */
    public static String processIncrementalIndexing( )
    {
        StringBuilder builder = new StringBuilder( );
        for ( DataSource dataSource : DataSourceService.getDataSources( ) )
        {
            try
            {
                processIncrementalIndexing( dataSource );
            }
            catch( ElasticClientException e )
            {

                AppLogService.error( e.getMessage( ), e );
                builder.append( e.getMessage( ) );
            }
            builder.append( dataSource.getIndexingStatus( ).getSbLogs( ).toString( ) ).append( "\n" );

        }
        return builder.toString( );
    }

    /**
     * Process the incremental indexing of a data source asynchronously, in a dedicated thread. Does nothing if an indexing run is already in progress for this data source.
     * 
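     * <p>
     * A minimal usage sketch, iterating over the registered data sources:
     * </p>
     * 
     * <pre>
     * {@code
     * for ( DataSource dataSource : DataSourceService.getDataSources( ) )
     * {
     *     // Returns immediately if an indexing run is already in progress for this data source
     *     DataSourceIncrementalService.processAsynchronouslyIncrementalIndexing( dataSource );
     * }
     * }
     * </pre>
     * 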
     * @param dataSource
     *            the data source
     * 
     */
    public static void processAsynchronouslyIncrementalIndexing( DataSource dataSource )
    {
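        // Atomically claim the running flag: only one incremental run may execute per data source at a time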
        if ( dataSource.getIndexingStatus( ).getIsRunning( ).compareAndSet( false, true ) )
        {
            ( new Thread( )
            {
                @Override
                public void run( )
                {
                    try
                    {
                        dataSource.getIndexingStatus( ).reset( );
                        processIncrementalIndexing( dataSource );

                    }
                    catch( ElasticClientException e )
                    {
                        dataSource.getIndexingStatus( ).getSbLogs( ).append( e.getMessage( ) );
                        AppLogService.error( "Process incremental indexing error: ", e );

                    }
                    finally
                    {
                        dataSource.getIndexingStatus( ).getIsRunning( ).set( false );
                    }

                }
            } ).start( );
        }

    }

    /**
     * Process the incremental indexing of a data source
     * 
     * @param dataSource
     *            the data source
     * 
     * @throws ElasticClientException
     *             If an error occurs accessing Elasticsearch
     * 
     */
    public static void processIncrementalIndexing( DataSource dataSource ) throws ElasticClientException
    {
        dataSource.getIndexingStatus( ).reset( );
        int nCount = 0;
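        // Process pending actions in a fixed order: creations, then modifications, then deletions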
        int [ ] taskList = {
                IndexerAction.TASK_CREATE, IndexerAction.TASK_MODIFY, IndexerAction.TASK_DELETE
        };
        long timeBegin = System.currentTimeMillis( );
        for ( int nTask : taskList )
        {
            nCount += processIncrementalIndexing( dataSource, IndexerActionHome.getIdResourceIndexerActionsList( dataSource.getId( ), nTask ), nTask );

        }
        dataSource.getIndexingStatus( ).getSbLogs( ).append( "Number of documents processed by the incremental service from the Data Source '" )
                .append( dataSource.getName( ) ).append( "' : " ).append( nCount );
        dataSource.getIndexingStatus( ).getSbLogs( ).append( " (duration : " ).append( System.currentTimeMillis( ) - timeBegin ).append( "ms)\n" );
    }

    /**
     * Process incremental indexing of a data source according to the task
     * 
     * @param dataSource
     *            the data source
     * @param listIdResource
     *            the list of resource identifiers to index
     * @param nIdTask
     *            the task id
     * @throws ElasticClientException
     *             If an error occurs accessing Elasticsearch
     * @return the total count of documents processed
     */
    public static int processIncrementalIndexing( DataSource dataSource, List<String> listIdResource, int nIdTask ) throws ElasticClientException
    {

        Elastic elastic = DataSourceService.getElastic( );
        int nCount = 0;

        if ( elastic != null && !CollectionUtils.isEmpty( listIdResource ) )
        {
            switch( nIdTask )
            {
                case IndexerAction.TASK_CREATE:
                    nCount += insertObjects( elastic, dataSource, dataSource.getDataObjectsIterator( listIdResource ) );
                    break;
                case IndexerAction.TASK_MODIFY:
                    nCount += updateObjects( elastic, dataSource, dataSource.getDataObjectsIterator( listIdResource ) );
                    break;
                case IndexerAction.TASK_DELETE:
                    nCount += deleteByQuery( dataSource, listIdResource );
                    break;
                default: // do nothing
            }
        }
        return nCount;
    }

    /**
     * Insert a list of objects in bulk mode
     * 
     * @param elastic
     *            The Elastic Server
     * @param dataSource
     *            The data source
     * @param iterateDataObjects
     *            The iterator of objects
     * @throws ElasticClientException
     *             If a problem occurs connecting to the server
     * @return the number of documents posted
     */
    public static int insertObjects( Elastic elastic, DataSource dataSource, Iterator<DataObject> iterateDataObjects ) throws ElasticClientException
    {
        List<DataObject> listBatch = new ArrayList<>( );
        int nCount = 0;
        BulkRequest br;
        List<String> listIdResource = new ArrayList<>( );
        while ( iterateDataObjects.hasNext( ) )
        {
            DataObject dataObject = iterateDataObjects.next( );
            listIdResource.add( dataObject.getId( ) );
            listBatch.add( dataObject );
            nCount++;
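            // Flush the batch once it reaches the configured size, or when the iterator is exhausted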
            if ( ( listBatch.size( ) == dataSource.getBatchSize( ) ) || !iterateDataObjects.hasNext( ) )
            {
                DataSourceService.completeDataObjectWithFullData( dataSource, listBatch );
                br = new BulkRequest( );
                for ( DataObject batchObject : listBatch )
                {
                    br.addAction( new IndexSubRequest( batchObject.getId( ) ), batchObject );
                }
                if ( elastic == null )
                {
                    elastic = DataSourceService.getElastic( );
                }

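                // Post the bulk request and remove the matching indexer actions in a single transaction,
                // so a failed bulk insert leaves the pending actions in place for the next run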
                try
                {
                    TransactionManager.beginTransaction( DataSourceUtils.getPlugin( ) );
                    String strResponse = elastic.createByBulk( dataSource.getTargetIndexName( ), br );
                    AppLogService.debug( "ElasticData : Response of the posted bulk request : " + strResponse );

                    IndexerActionHome.removeByIdResourceList( listIdResource, dataSource.getId( ) );
                    listIdResource.clear( );
                    listBatch.clear( );

                    TransactionManager.commitTransaction( DataSourceUtils.getPlugin( ) );
                }
                catch( ElasticClientException e )
                {
                    TransactionManager.rollBack( DataSourceUtils.getPlugin( ) );
                    AppLogService.error( e.getMessage( ), e );
                    throw new ElasticClientException( "ElasticData createByBulk error", e );
                }
            }
            DataSourceService.updateIndexingStatus( dataSource, nCount );
        }
        AppLogService.debug( "ElasticData indexing : completed for " + nCount + " documents of DataSource: " + dataSource.getName( ) );

        return nCount;
    }

    /**
     * Update a list of objects with partial updates
     * 
     * @param elastic
     *            The Elastic Server
     * @param dataSource
     *            The data source
     * @param iterateDataObjects
     *            The iterator of objects
     * @throws ElasticClientException
     *             If a problem occurs connecting to the server
     * @return the number of documents posted
     */
    public static int updateObjects( Elastic elastic, DataSource dataSource, Iterator<DataObject> iterateDataObjects ) throws ElasticClientException
    {
        List<DataObject> listBatch = new ArrayList<>( );
        List<String> listIdResource = new ArrayList<>( );
        int nCount = 0;
        if ( elastic == null )
        {
            elastic = DataSourceService.getElastic( );
        }

        while ( iterateDataObjects.hasNext( ) )
        {
            DataObject dataObject = iterateDataObjects.next( );
            listIdResource.add( dataObject.getId( ) );
            listBatch.add( dataObject );
            if ( ( listBatch.size( ) == dataSource.getBatchSize( ) ) || !iterateDataObjects.hasNext( ) )
            {
                DataSourceService.completeDataObjectWithFullData( dataSource, listBatch );

                try
                {
                    TransactionManager.beginTransaction( DataSourceUtils.getPlugin( ) );
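                    // Documents are updated one by one; the whole batch shares a single transaction with the indexer action cleanup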
                    for ( DataObject batchObject : listBatch )
                    {
                        String strResponse = elastic.partialUpdate( dataSource.getTargetIndexName( ), batchObject.getId( ), batchObject );
                        AppLogService.debug( "ElasticData : Response of the partial update : " + strResponse );
                        nCount++;
                    }
                    IndexerActionHome.removeByIdResourceList( listIdResource, dataSource.getId( ) );
                    listIdResource.clear( );
                    listBatch.clear( );
                    TransactionManager.commitTransaction( DataSourceUtils.getPlugin( ) );
                }
                catch( ElasticClientException e )
                {
                    TransactionManager.rollBack( DataSourceUtils.getPlugin( ) );
                    AppLogService.error( e.getMessage( ), e );
                    throw new ElasticClientException( "ElasticData partialUpdate error", e );
                }
            }
            DataSourceService.updateIndexingStatus( dataSource, nCount );
        }
        AppLogService.info( "ElasticData partial update indexing : completed for " + nCount + " documents of DataSource '" + dataSource.getName( ) + "'" );
        return nCount;
    }

    /**
     * Delete documents by query
     * 
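     * <p>
     * The request body sent to Elasticsearch has the following shape (document identifiers are illustrative):
     * </p>
     * 
     * <pre>
     * {"query" : { "terms" : {"_id" : ["doc-1", "doc-2"]}}}
     * </pre>
     * 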
     * @param dataSource
     *            The data source
     * @param listIdResource
     *            The list of resource identifiers
     * @throws ElasticClientException
     *             If an error occurs accessing Elasticsearch
     * @return the number of documents deleted
     */
    public static int deleteByQuery( DataSource dataSource, List<String> listIdResource ) throws ElasticClientException
    {
        try
        {
            TransactionManager.beginTransaction( DataSourceUtils.getPlugin( ) );
            Elastic elastic = DataSourceService.getElastic( );
            List<String> idDocumentList = listIdResource.stream( ).map( idDataObject -> DataSourceService.getIdDocument( dataSource.getId( ), idDataObject ) )
                    .collect( Collectors.toList( ) );
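            // Delete all matching documents in one request, using a terms query on the Elasticsearch _id field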
            elastic.deleteByQuery( dataSource.getTargetIndexName( ), "{\"query\" : { \"terms\" : {\"_id\" : " + new JSONArray( idDocumentList ) + "}}}" );
            IndexerActionHome.removeByIdResourceList( listIdResource, dataSource.getId( ) );
            TransactionManager.commitTransaction( DataSourceUtils.getPlugin( ) );
            int nSize = idDocumentList.size( );
            DataSourceService.updateIndexingStatus( dataSource, nSize );
            return nSize;
        }
        catch( ElasticClientException e )
        {
            TransactionManager.rollBack( DataSourceUtils.getPlugin( ) );
            AppLogService.error( e.getMessage( ), e );
            throw new ElasticClientException( "ElasticData createByBulk error", e );
        }
    }

    /**
     * Create an incremental indexing task for a resource
     * 
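     * <p>
     * A minimal usage sketch (the identifiers are hypothetical):
     * </p>
     * 
     * <pre>
     * {@code
     * // Record that resource "42" of the data source "MyDataSource" was modified
     * DataSourceIncrementalService.addTask( "MyDataSource", "42", IndexerAction.TASK_MODIFY );
     * }
     * </pre>
     * 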
     * @param strIdDataSource
     *            the datasource id
     * @param strIdResource
     *            the resource id
     * @param nIdTask
     *            the task id
     * 
     */
    public static void addTask( String strIdDataSource, String strIdResource, int nIdTask )
    {

        IndexerAction indexerAction = IndexerActionHome.findByIdResource( strIdResource, strIdDataSource );

        if ( indexerAction != null )
        {
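            // Merge the new task with the pending action: a CREATE followed by a DELETE cancels out,
            // while a MODIFY followed by a DELETE becomes a DELETE; any other combination leaves the pending action unchanged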
            if ( indexerAction.getIdTask( ) == IndexerAction.TASK_CREATE && nIdTask == IndexerAction.TASK_DELETE )
            {
                IndexerActionHome.remove( indexerAction.getId( ) );
            }
            else
                if ( indexerAction.getIdTask( ) == IndexerAction.TASK_MODIFY && nIdTask == IndexerAction.TASK_DELETE )
                {
                    indexerAction.setIdTask( nIdTask );
                    IndexerActionHome.update( indexerAction );
                }
            return;
        }

        indexerAction = new IndexerAction( );
        indexerAction.setIdDataSource( strIdDataSource );
        indexerAction.setIdResource( strIdResource );
        indexerAction.setIdTask( nIdTask );
        IndexerActionHome.create( indexerAction );
    }

    /**
     * Load all the indexer actions of a data source and return them as a list
     * 
     * @param strIdDataSource
     *            the identifier of data source
     * @return The list which contains the data of all the indexerAction objects
     */
    public static List<IndexerAction> getIndexerAction( String strIdDataSource )
    {
        return IndexerActionHome.getIndexerActionsList( strIdDataSource );
    }

}