/*
* Copyright (c) 2002-2021, City of Paris
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright notice
* and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice
* and the following disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* 3. Neither the name of 'Mairie de Paris' nor 'Lutece' nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* License 1.0
*/
package fr.paris.lutece.plugins.elasticdata.service;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import fr.paris.lutece.plugins.elasticdata.business.DataObject;
import fr.paris.lutece.plugins.elasticdata.business.DataSource;
import fr.paris.lutece.plugins.elasticdata.business.IDataSourceExternalAttributesProvider;
import fr.paris.lutece.plugins.libraryelastic.business.bulk.BulkRequest;
import fr.paris.lutece.plugins.libraryelastic.business.bulk.IndexSubRequest;
import fr.paris.lutece.plugins.libraryelastic.util.Elastic;
import fr.paris.lutece.plugins.libraryelastic.util.ElasticClientException;
import fr.paris.lutece.portal.business.event.ResourceEvent;
import fr.paris.lutece.portal.service.event.ResourceEventManager;
import fr.paris.lutece.portal.service.spring.SpringContextService;
import fr.paris.lutece.portal.service.util.AppLogService;
import fr.paris.lutece.portal.service.util.AppPropertiesService;
/**
* DataSourceService
*/
public final class DataSourceService
{
private static final String PROPERTY_ELASTIC_SERVER_LOGIN = "elasticdata.elastic_server.login";
private static final String PROPERTY_ELASTIC_SERVER_PWD = "elasticdata.elastic_server.pwd";
private static final String PROPERTY_ELASTIC_SERVER_URL = "elasticdata.elastic_server.url";
private static final String DEFAULT_ELASTIC_SERVER_URL = "http://localhost:9200";
private static final String SERVER_URL = AppPropertiesService.getProperty( PROPERTY_ELASTIC_SERVER_URL, DEFAULT_ELASTIC_SERVER_URL );
private static final String SERVER_LOGIN = AppPropertiesService.getProperty( PROPERTY_ELASTIC_SERVER_LOGIN );
private static final String SERVEUR_PWD = AppPropertiesService.getProperty( PROPERTY_ELASTIC_SERVER_PWD );
private static Map<String, DataSource> _mapDataSources;
/** Package constructor */
DataSourceService( )
{
}
/**
* Gets all data sources found into Spring context files
*
* @return The list
*/
public static Collection<DataSource> getDataSources( )
{
synchronized( DataSourceService.class )
{
if ( _mapDataSources == null )
{
_mapDataSources = new HashMap<>( );
for ( DataSource source : SpringContextService.getBeansOfType( DataSource.class ) )
{
_mapDataSources.put( source.getId( ), source );
}
}
}
return _mapDataSources.values( );
}
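// Usage sketch: list the data sources discovered in the Spring context (ids and index names come from the
// DataSource beans declared by each plugin; the log call below is only illustrative).
//
// for ( DataSource source : DataSourceService.getDataSources( ) )
// {
//     AppLogService.info( source.getId( ) + " -> " + source.getTargetIndexName( ) );
// }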
/**
* Get a Data Source from its ID
*
* @param strId
* The ID
* @return The Data Source
*/
public static DataSource getDataSource( String strId )
{
if ( _mapDataSources == null )
{
getDataSources( ); // lazily initialise the map to avoid a NullPointerException on early calls
}
return _mapDataSources.get( strId );
}
/**
* Insert data from a DataSource into Elastic Search, asynchronously in a dedicated thread
*
* @param dataSource
* The data source
* @param bReset
* if the index should be reset before inserting
*/
public static void processFullIndexing( DataSource dataSource, boolean bReset )
{
if ( dataSource.getIndexingStatus( ).getIsRunning( ).compareAndSet( false, true ) )
{
( new Thread( )
{
@Override
public void run( )
{
process( dataSource, bReset );
}
} ).start( );
}
}
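// Usage sketch: trigger a full reindex and follow its progress through the IndexingStatus. Assumption:
// getIsRunning( ) exposes an AtomicBoolean-style get( ), as suggested by the compareAndSet( )/set( ) calls
// above, and "myDataSource" is a hypothetical registered data source id.
//
// DataSource dataSource = DataSourceService.getDataSource( "myDataSource" );
// DataSourceService.processFullIndexing( dataSource, true ); // returns immediately, indexing runs in a thread
// while ( dataSource.getIndexingStatus( ).getIsRunning( ).get( ) )
// {
//     Thread.sleep( 1000L ); // handle InterruptedException in real code
// }
// AppLogService.info( dataSource.getIndexingStatus( ).getSbLogs( ).toString( ) );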
/**
* Insert data from a DataSource into Elastic Search
*
* @param dataSource
* The data source
* @param bReset
* if the index should be reset before inserting
*/
private static void process( DataSource dataSource, boolean bReset )
{
long timeBegin = System.currentTimeMillis( );
dataSource.getIndexingStatus( ).reset( );
try
{
Elastic elastic = getElastic( );
if ( bReset )
{
if ( elastic.isExists( dataSource.getTargetIndexName( ) ) )
{
elastic.deleteIndex( dataSource.getTargetIndexName( ) );
}
elastic.createMappings( dataSource.getTargetIndexName( ), getMappings( dataSource ) );
}
// Index the objects in bulk mode
int nbDocsInsert = DataSourceIncrementalService.insertObjects( elastic, dataSource, dataSource.getDataObjectsIterator( ) );
long timeEnd = System.currentTimeMillis( );
dataSource.getIndexingStatus( ).getSbLogs( ).append( "Number of objects inserted for Data Source '" ).append( dataSource.getName( ) )
.append( "' : " ).append( nbDocsInsert );
dataSource.getIndexingStatus( ).getSbLogs( ).append( " (duration : " ).append( timeEnd - timeBegin ).append( "ms)\n" );
ResourceEvent dataSourceFullIndexed = new ResourceEvent( );
dataSourceFullIndexed.setIdResource( dataSource.getId( ) );
dataSourceFullIndexed.setTypeResource( DataSourceUtils.RESOURCE_TYPE_INDEXING );
ResourceEventManager.fireAddedResource( dataSourceFullIndexed );
}
catch( ElasticClientException e )
{
dataSource.getIndexingStatus( ).getSbLogs( ).append( e.getMessage( ) ).append( e );
AppLogService.error( "Process full indexing: ", e );
}
finally
{
dataSource.getIndexingStatus( ).getIsRunning( ).set( false );
}
}
/**
* Insert one dataObject from a DataSource into Elastic Search
*
* @param dataSource
* The data source
* @param dataObject
* The data object
* @throws ElasticClientException
* If an error occurs accessing ElasticSearch
*/
public static void processIncrementalIndexing( DataSource dataSource, DataObject dataObject ) throws ElasticClientException
{
completeDataObjectWithFullData( dataSource, dataObject );
Elastic elastic = getElastic( );
elastic.create( dataSource.getTargetIndexName( ), ( dataObject.getId( ) != null ) ? dataObject.getId( ) : StringUtils.EMPTY, dataObject );
}
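// Usage sketch: index a single freshly created business object. MyDataObject is a hypothetical
// implementation of DataObject supplied by the calling plugin; the raw business id is prefixed by
// completeDataObjectWithFullData( ) before the document is created.
//
// DataSource dataSource = DataSourceService.getDataSource( "myDataSource" );
// DataObject dataObject = new MyDataObject( "42" );
// DataSourceService.processIncrementalIndexing( dataSource, dataObject );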
/**
* Insert a collection of dataObjects from a DataSource into Elastic Search
*
* @param sbLogs
* A log buffer
* @param dataSource
* The data source
* @param dataObject
* The collection of data objects
* @throws ElasticClientException
* If an error occurs accessing ElasticSearch
*/
public static void processIncrementalIndexing( StringBuilder sbLogs, DataSource dataSource, Collection<DataObject> dataObject )
throws ElasticClientException
{
long timeBegin = System.currentTimeMillis( );
Elastic elastic = getElastic( );
// Index the objects in bulk mode
int nbDocsInsert = insertObjects( elastic, dataSource, dataObject.iterator( ) );
long timeEnd = System.currentTimeMillis( );
sbLogs.append( "Number of objects inserted for Data Source '" ).append( dataSource.getName( ) ).append( "' : " ).append( nbDocsInsert );
sbLogs.append( " (duration : " ).append( timeEnd - timeBegin ).append( "ms)\n" );
}
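// Usage sketch: index a batch of objects and collect the timing log (the list content is hypothetical).
//
// StringBuilder sbLogs = new StringBuilder( );
// Collection<DataObject> listObjects = buildMyObjects( ); // hypothetical helper of the calling plugin
// DataSourceService.processIncrementalIndexing( sbLogs, DataSourceService.getDataSource( "myDataSource" ), listObjects );
// AppLogService.info( sbLogs.toString( ) );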
/**
* Partial Updates to Documents
*
* @param dataSource
* The data source
* @param strId
* The document id
* @param object
* The object
* @throws ElasticClientException
* If an error occurs accessing ElasticSearch
*/
public static void partialUpdate( DataSource dataSource, String strId, Object object ) throws ElasticClientException
{
Elastic elastic = getElastic( );
elastic.partialUpdate( dataSource.getTargetIndexName( ), getIdDocument( dataSource.getId( ), strId ), object );
}
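// Usage sketch: update a single field of an already indexed document. The object is serialized as the
// partial document body, so a small dedicated POJO (the hypothetical StatusUpdate below) is usually enough;
// strId is the raw business id, the document id prefix is added by getIdDocument( ) inside this method.
//
// public class StatusUpdate { public String status = "CLOSED"; }
// DataSourceService.partialUpdate( dataSource, "42", new StatusUpdate( ) );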
/**
* Delete documents matching a query
*
* @param dataSource
* the data source
* @param strQuery
* the query
* @throws ElasticClientException
* If an error occurs accessing ElasticSearch
*/
public static void deleteByQuery( DataSource dataSource, String strQuery ) throws ElasticClientException
{
Elastic elastic = getElastic( );
elastic.deleteByQuery( dataSource.getTargetIndexName( ), strQuery );
}
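// Usage sketch: remove all documents of the index matching an Elasticsearch query, passed as a raw JSON
// string (assuming the library forwards the body unchanged to the _delete_by_query API; the field name is
// hypothetical).
//
// String strQuery = "{ \"query\": { \"term\": { \"status\": \"CLOSED\" } } }";
// DataSourceService.deleteByQuery( dataSource, strQuery );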
/**
* Delete a document based on its id in the index
*
* @param dataSource
* The data source
* @param strId
* The id
* @throws ElasticClientException
* If an error occurs accessing ElasticSearch
*/
public static void deleteById( DataSource dataSource, String strId ) throws ElasticClientException
{
Elastic elastic = getElastic( );
elastic.deleteDocument( dataSource.getTargetIndexName( ), getIdDocument( dataSource.getId( ), strId ) );
}
/**
* Insert All data sources
*
* @param bReset
* Reset the index before inserting
* @return The logs of the process
*/
public static String insertDataAllDatasources( boolean bReset )
{
return insertDataAllDatasources( bReset, false );
}
/**
* Insert All data sources
*
* @param bReset
* Reset the index before inserting
* @param bDaemon
* If called by a daemon
* @return The logs of the process
*/
public static String insertDataAllDatasources( boolean bReset, boolean bDaemon )
{
StringBuilder builder = new StringBuilder( );
for ( DataSource dataSource : getDataSources( ) )
{
if ( dataSource.usesFullIndexingDaemon( ) || !bDaemon )
{
process( dataSource, bReset );
builder.append( dataSource.getIndexingStatus( ).getSbLogs( ).toString( ) ).append( "\n" );
}
}
return builder.toString( );
}
/**
* Return the Elastic connection, built from the configured server URL and optional credentials
*
* @return the Elastic client
*/
public static Elastic getElastic( )
{
Elastic elastic = null;
if ( StringUtils.isNotEmpty( SERVER_LOGIN ) && StringUtils.isNotEmpty( SERVEUR_PWD ) )
{
elastic = new Elastic( SERVER_URL, SERVER_LOGIN, SERVEUR_PWD );
}
else
{
elastic = new Elastic( SERVER_URL );
}
return elastic;
}
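// Connection settings are read from the properties below. A sketch of the corresponding entries in an
// elasticdata.properties file (the values are examples; login and password are optional and only used when
// both are non-empty):
//
// elasticdata.elastic_server.url=http://localhost:9200
// elasticdata.elastic_server.login=elastic
// elasticdata.elastic_server.pwd=changeme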
/**
* Return the mappings associated with a data source
*
* @param dataSource
* The data source
* @return The mappings string as JSON
*/
private static String getMappings( DataSource dataSource )
{
if ( dataSource.getMappings( ) != null )
{
// Datasource provided mappings
return dataSource.getMappings( );
}
if ( dataSource.isLocalizable( ) )
{
// Timestamp and location mappings
return DataSourceUtils.TIMESTAMP_AND_LOCATION_MAPPINGS;
}
// Default timestamp mappings
return DataSourceUtils.TIMESTAMP_MAPPINGS;
}
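// Sketch of a data-source-provided mapping, returned by DataSource.getMappings( ). The JSON must follow the
// shape expected by Elastic.createMappings( ), mirroring DataSourceUtils.TIMESTAMP_MAPPINGS; the "status"
// field below is hypothetical.
//
// { "mappings": { "properties": {
//     "timestamp": { "type": "date", "format": "epoch_millis" },
//     "status": { "type": "keyword" }
// } } }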
/**
* Complete the data object with its external attributes and set the elastic document id
*
* @param dataSource
* the data source
* @param dataObject
* the data object
*/
public static void completeDataObjectWithFullData( DataSource dataSource, DataObject dataObject )
{
dataObject.setId( getIdDocument( dataSource.getId( ), dataObject.getId( ) ) );
provideExternalAttributes( dataSource, dataObject );
}
/**
* Complete the data objects with their external attributes and set the elastic document ids
*
* @param dataSource
* the data source
* @param dataObjectList
* the list of data objects
*/
public static void completeDataObjectWithFullData( DataSource dataSource, List<DataObject> dataObjectList )
{
for ( DataObject dataObject : dataObjectList )
{
dataObject.setId( getIdDocument( dataSource.getId( ), dataObject.getId( ) ) );
}
provideExternalAttributes( dataSource, dataObjectList );
}
/**
* Return the unique id of the elastic document
*
* @param strIdDataSource
* the data source id
* @param strIdDataObject
* the data object id
* @return the unique document id
*/
public static String getIdDocument( String strIdDataSource, String strIdDataObject )
{
return DataSourceUtils.PREFIX_DATA_OBJECT_ID + strIdDataSource + "_" + strIdDataObject;
}
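// Example: with a data source id "forms" and an object id "42", the document id is
// DataSourceUtils.PREFIX_DATA_OBJECT_ID + "forms" + "_" + "42" (the prefix value itself is defined in DataSourceUtils).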
/**
* Insert a list of object in bulk mode
*
* @param elastic
* The Elastic Server
* @param dataSource
* The data source
* @param iterateDataObjects
* The iterator of objects
* @throws ElasticClientException
* If a problem occurs connecting to the server
* @return the number of documents posted
*/
public static int insertObjects( Elastic elastic, DataSource dataSource, Iterator<DataObject> iterateDataObjects ) throws ElasticClientException
{
List<DataObject> listBatch = new ArrayList<>( );
int nCount = 0;
BulkRequest br;
while ( iterateDataObjects.hasNext( ) )
{
DataObject dataObject = iterateDataObjects.next( );
listBatch.add( dataObject );
nCount++;
if ( ( listBatch.size( ) == dataSource.getBatchSize( ) ) || !iterateDataObjects.hasNext( ) )
{
completeDataObjectWithFullData( dataSource, listBatch );
br = new BulkRequest( );
for ( DataObject batchObject : listBatch )
{
br.addAction( new IndexSubRequest( batchObject.getId( ) ), batchObject );
}
if ( elastic == null )
{
elastic = getElastic( );
}
String strResponse = elastic.createByBulk( dataSource.getTargetIndexName( ), br );
AppLogService.debug( "ElasticData : Response of the posted bulk request : " + strResponse );
listBatch.clear( );
}
updateIndexingStatus( dataSource, nCount );
}
AppLogService.debug( "ElasticData indexing : completed for " + nCount + " documents of DataSource: " + dataSource.getName( ) );
return nCount;
}
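// The iterator is consumed in slices of DataSource.getBatchSize( ) objects: each slice is completed with its
// external attributes, turned into a single bulk request and posted, so memory use stays bounded by the batch
// size. Passing a null Elastic client is allowed; a default client is then built from the configured properties.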
/**
* Provide external attributes for the DataSource
*
* @param dataSource
* the data source
* @param dataObject
* the data object
*/
private static void provideExternalAttributes( DataSource dataSource, DataObject dataObject )
{
for ( IDataSourceExternalAttributesProvider provider : dataSource.getExternalAttributesProvider( ) )
{
provider.provideAttributes( dataObject );
}
}
/**
* Provide external attributes for the DataSource
*
* @param dataSource
* the data source
* @param listDataObject
* list of data objects
*/
private static void provideExternalAttributes( DataSource dataSource, List<DataObject> listDataObject )
{
for ( IDataSourceExternalAttributesProvider provider : dataSource.getExternalAttributesProvider( ) )
{
provider.provideAttributes( listDataObject );
}
}
/**
* Update indexing status of the datasource
*
* @param dataSource
* the data source
* @param nCount
* the count of objects processed
*/
public static void updateIndexingStatus( DataSource dataSource, int nCount )
{
if ( dataSource.getIndexingStatus( ).getNbTotalObj( ) < nCount )
{
dataSource.getIndexingStatus( ).setnNbTotalObj( nCount );
}
dataSource.getIndexingStatus( ).setCurrentNbIndexedObj( nCount );
}
}