// IdentitySearcher.java

/*
 * Copyright (c) 2002-2024, City of Paris
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *  1. Redistributions of source code must retain the above copyright notice
 *     and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright notice
 *     and the following disclaimer in the documentation and/or other materials
 *     provided with the distribution.
 *
 *  3. Neither the name of 'Mairie de Paris' nor 'Lutece' nor the names of its
 *     contributors may be used to endorse or promote products derived from
 *     this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * License 1.0
 */
package fr.paris.lutece.plugins.identitystore.service.indexer.elastic.search.service;

import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.client.ElasticClient;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.client.ElasticClientException;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.search.model.ASearchRequest;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.search.model.ComplexSearchRequest;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.search.model.CustomerIdSearchRequest;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.search.model.inner.request.InnerSearchRequest;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.search.model.inner.request.MultiSearchAction;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.search.model.inner.request.MultiSearchActionType;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.search.model.inner.response.Hit;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.search.model.inner.response.Response;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.search.model.inner.response.Responses;
import fr.paris.lutece.plugins.identitystore.service.indexer.elastic.search.model.inner.response.Result;
import fr.paris.lutece.plugins.identitystore.utils.Combinations;
import fr.paris.lutece.plugins.identitystore.utils.Maps;
import fr.paris.lutece.plugins.identitystore.v3.web.rs.dto.common.AttributeTreatmentType;
import fr.paris.lutece.plugins.identitystore.v3.web.rs.dto.common.IdentityDto;
import fr.paris.lutece.plugins.identitystore.v3.web.rs.dto.search.SearchAttribute;
import fr.paris.lutece.plugins.identitystore.web.exception.IdentityStoreException;
import fr.paris.lutece.portal.service.util.AppPropertiesService;
import org.apache.commons.collections4.CollectionUtils;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * {@link IIdentitySearcher} implementation that queries the {@value #INDEX} Elasticsearch index through an {@link ElasticClient}.<br>
 * Results are fetched page by page: the page size is read from the {@value #IDENTITYSTORE_SEARCH_OFFSET} property, and a <code>max</code> of 0 means that
 * every matching hit is fetched.
 */
public class IdentitySearcher implements IIdentitySearcher
{

    public static final String IDENTITYSTORE_SEARCH_OFFSET = "identitystore.search.offset";
    private static final String INDEX = "identities-alias";
    private static final int DEFAULT_PAGE_SIZE = 10;
    private static final int PAGE_SIZE = resolveConfiguredPageSize( );
    private final ElasticClient _elasticClient;

    public IdentitySearcher( String strServerUrl, String strLogin, String strPassword )
    {
        this._elasticClient = new ElasticClient( strServerUrl, strLogin, strPassword );
    }

    public IdentitySearcher( String strServerUrl )
    {
        this._elasticClient = new ElasticClient( strServerUrl );
    }

    /**
     * Read the page size from the application properties.
     * 
     * @return the configured page size, or {@link #DEFAULT_PAGE_SIZE} when the configured value is not strictly positive (a page size of 0 would make the
     *         paging loops spin forever since the offset would never advance)
     */
    private static int resolveConfiguredPageSize( )
    {
        final int configured = AppPropertiesService.getPropertyInt( IDENTITYSTORE_SEARCH_OFFSET, DEFAULT_PAGE_SIZE );
        return configured > 0 ? configured : DEFAULT_PAGE_SIZE;
    }

    /**
     * Generate one request per combination of equal attributes (combined with each eligible special treatment n-uple and, if requested, with each combination
     * of missing attributes) and execute them all as a single multi search.
     * 
     * @param attributes
     *            the complete list of {@link SearchAttribute} defined in the client request
     * @param specialTreatmentAttributes
     *            n-uples of attributes carrying a specific treatment; a n-uple is only combined with a combination of equal attributes that shares no
     *            attribute key with it
     * @param nbEqualAttributes
     *            the size of the combinations built from <code>attributes</code>
     * @param nbMissingAttributes
     *            the number of attributes that can be absent, no missing attribute is handled when null or 0
     * @param max
     *            the maximum number of hits fetched per request, 0 for no limit
     * @param connected
     *            search for a connected {@link IdentityDto}
     * @param attributesFilter
     *            the attribute filter forwarded to every generated request
     * @return the merged {@link Response}
     * @throws IdentityStoreException
     *             if the search cannot be performed
     */
    @Override
    public Response multiSearch( final List<SearchAttribute> attributes, final List<List<SearchAttribute>> specialTreatmentAttributes,
            final Integer nbEqualAttributes, final Integer nbMissingAttributes, final int max, final boolean connected, final List<String> attributesFilter )
            throws IdentityStoreException
    {
        final List<ASearchRequest> requests = new ArrayList<>( );
        /* Split attribute list into N combinations (list) of nbEqualAttributes */
        final List<List<SearchAttribute>> combinationsOfEqualAttributes = Combinations.combinations( attributes, nbEqualAttributes );
        for ( final List<SearchAttribute> currentCombinationOfEqualAttributes : combinationsOfEqualAttributes )
        {
            /* if Attribute treatments are defined, generate a request for each */
            if ( CollectionUtils.isNotEmpty( specialTreatmentAttributes ) )
            {
                /* A n-uple is eligible only if none of its attribute keys is already part of the current combination */
                final List<List<SearchAttribute>> eligibleNuples = specialTreatmentAttributes
                        .stream( ).filter(
                                nuple -> nuple.stream( )
                                        .noneMatch( nupleAttribute -> currentCombinationOfEqualAttributes.stream( )
                                                .anyMatch( equal -> Objects.equals( equal.getKey( ), nupleAttribute.getKey( ) ) ) ) )
                        .collect( Collectors.toList( ) );
                for ( final List<SearchAttribute> eligibleNuple : eligibleNuples )
                {
                    final List<SearchAttribute> allAttributes = Stream.concat( currentCombinationOfEqualAttributes.stream( ), eligibleNuple.stream( ) )
                            .collect( Collectors.toList( ) );
                    requests.addAll( this.generateRequestsWithOrWithoutMissingAttributes( attributes, nbMissingAttributes, connected, allAttributes,
                            attributesFilter ) );
                }
            }
            else
            {
                requests.addAll( this.generateRequestsWithOrWithoutMissingAttributes( attributes, nbMissingAttributes, connected,
                        currentCombinationOfEqualAttributes, attributesFilter ) );
            }
        }
        return this.getResponse( requests, max );
    }

    /**
     * Handle the possibility of having missing attributes.<br>
     * If <code>nbMissingAttributes</code> is > 0, generate all combined requests, else, generate single request.
     * 
     * @param baseAttributes
     *            the complete list of {@link SearchAttribute} defined in the client request
     * @param nbMissingAttributes
     *            can be >= 0, null is handled like 0
     * @param connected
     *            search for a connected {@link IdentityDto}
     * @param workingAttributes
     *            the current list of attributes considered for the requests
     * @param attributesFilter
     *            the attribute filter forwarded to every generated request
     * @return the generated requests: one per combination of missing attributes, or a single one built from <code>workingAttributes</code> only
     */
    private List<ASearchRequest> generateRequestsWithOrWithoutMissingAttributes( final List<SearchAttribute> baseAttributes, final Integer nbMissingAttributes,
            final boolean connected, final List<SearchAttribute> workingAttributes, final List<String> attributesFilter )
    {
        final List<ASearchRequest> requests = new ArrayList<>( );
        if ( nbMissingAttributes != null && nbMissingAttributes > 0 )
        {
            /* Candidates are the attributes of the client request whose key is not already used by the working attributes */
            final List<SearchAttribute> missingAttributes = baseAttributes.stream( )
                    .filter( attribute -> workingAttributes.stream( ).noneMatch( a -> Objects.equals( a.getKey( ), attribute.getKey( ) ) ) )
                    .collect( Collectors.toList( ) );
            final List<List<SearchAttribute>> combinationsOfMissingAttributes = Combinations.combinations( missingAttributes, nbMissingAttributes );
            for ( final List<SearchAttribute> combinationOfMissingAttributes : combinationsOfMissingAttributes )
            {
                /* Copy each candidate with the ABSENT treatment so that the original attribute is left untouched */
                final List<SearchAttribute> missingSearchAttributes = combinationOfMissingAttributes.stream( )
                        .map( attribute -> new SearchAttribute( attribute.getKey( ), attribute.getValue( ), attribute.getOutputKeys( ),
                                AttributeTreatmentType.ABSENT ) )
                        .collect( Collectors.toList( ) );
                final List<SearchAttribute> complete = Stream.concat( missingSearchAttributes.stream( ), workingAttributes.stream( ) )
                        .collect( Collectors.toList( ) );
                requests.add( new ComplexSearchRequest( complete, connected, attributesFilter ) );
            }
        }
        else
        {
            requests.add( new ComplexSearchRequest( workingAttributes, connected, attributesFilter ) );
        }
        return requests;
    }

    /**
     * Execute a single {@link ComplexSearchRequest} built from the given attributes.
     * 
     * @param attributes
     *            the attributes to search for
     * @param max
     *            the maximum number of hits to fetch, 0 for no limit
     * @param connected
     *            search for a connected {@link IdentityDto}
     * @param attributesFilter
     *            the attribute filter forwarded to the request
     * @return the {@link Response} holding the hits of all fetched pages
     * @throws IdentityStoreException
     *             if the search cannot be performed
     */
    @Override
    public Response search( final List<SearchAttribute> attributes, final int max, final boolean connected, final List<String> attributesFilter )
            throws IdentityStoreException
    {
        final ASearchRequest request = new ComplexSearchRequest( attributes, connected, attributesFilter );
        return this.getResponse( request, max );
    }

    /**
     * Execute a single {@link CustomerIdSearchRequest} for the given customer id.
     * 
     * @param customerId
     *            the customer id to search for
     * @param attributesFilter
     *            the attribute filter forwarded to the request
     * @return the {@link Response} holding the hits of all fetched pages
     * @throws IdentityStoreException
     *             if the search cannot be performed
     */
    @Override
    public Response search( final String customerId, final List<String> attributesFilter ) throws IdentityStoreException
    {
        final ASearchRequest request = new CustomerIdSearchRequest( customerId, attributesFilter );
        return this.getResponse( request, 0 );
    }

    /**
     * Execute one {@link CustomerIdSearchRequest} per customer id as a single multi search.
     * 
     * @param customerIds
     *            the customer ids to search for, a null or empty list produces an empty response
     * @param attributesFilter
     *            the attribute filter forwarded to every request, can be null
     * @return the merged {@link Response}
     * @throws IdentityStoreException
     *             if the search cannot be performed
     */
    @Override
    public Response search( final List<String> customerIds, final List<String> attributesFilter ) throws IdentityStoreException
    {
        final List<ASearchRequest> requests = new ArrayList<>( );
        if ( customerIds != null )
        {
            for ( final String customerId : customerIds )
            {
                final CustomerIdSearchRequest customerIdSearchRequest = new CustomerIdSearchRequest( customerId, attributesFilter );
                if ( attributesFilter != null )
                {
                    /* Each request gets its own SearchAttribute instances */
                    for ( final String filteredAttribute : attributesFilter )
                    {
                        customerIdSearchRequest.getSearchAttributes( ).add( new SearchAttribute( filteredAttribute, null, null ) );
                    }
                }
                requests.add( customerIdSearchRequest );
            }
        }
        return this.getResponse( requests, 0 );
    }

    /**
     * Execute a single request, fetching as many pages as needed.
     * 
     * @param request
     *            the request to execute
     * @param max
     *            the maximum number of hits to fetch, 0 for no limit
     * @return the first page {@link Response}, completed with the hits of the following pages and the highest max score met
     * @throws IdentityStoreException
     *             if the Elasticsearch client fails
     */
    private Response getResponse( final ASearchRequest request, final int max ) throws IdentityStoreException
    {
        try
        {
            final InnerSearchRequest pagedRequest = request.body( );
            final int size = pageSize( max );
            pagedRequest.setFrom( 0 );
            pagedRequest.setSize( size );
            final Response response = this._elasticClient.search( INDEX, pagedRequest );
            final int total = response.getResult( ).getTotal( ).getValue( );
            final int limit = ( max == 0 ) ? total : Math.min( max, total );
            for ( int offset = size; offset < limit; offset += size )
            {
                pagedRequest.setFrom( offset );
                /* Trim the last page so that no more than "limit" hits are fetched */
                pagedRequest.setSize( Math.min( size, limit - offset ) );
                final Response page = this._elasticClient.search( INDEX, pagedRequest );
                response.getResult( ).getHits( ).addAll( page.getResult( ).getHits( ) );
                response.getResult( ).setMaxScore( highest( response.getResult( ).getMaxScore( ), page.getResult( ).getMaxScore( ) ) );
            }
            return response;
        }
        catch( ElasticClientException e )
        {
            throw new IdentityStoreException( e.getMessage( ), e );
        }
    }

    /**
     * Execute several requests as a multi search, fetching as many pages as needed, and merge all the results into a single {@link Response}.<br>
     * Hits are de-duplicated on the customer id (the first hit met for a customer id is kept) and are returned in the order they were met.
     * 
     * @param requests
     *            the requests to execute, a null or empty list produces an empty response
     * @param max
     *            the maximum number of hits fetched per request, 0 for no limit
     * @return the merged {@link Response}
     * @throws IdentityStoreException
     *             if the Elasticsearch client fails, or if no response provides a total of hits
     */
    private Response getResponse( final List<ASearchRequest> requests, final int max ) throws IdentityStoreException
    {
        if ( CollectionUtils.isEmpty( requests ) )
        {
            return emptyResponse( );
        }
        try
        {
            final int size = pageSize( max );
            final List<MultiSearchAction> searchActions = requests.stream( ).map( aSearchRequest -> {
                final InnerSearchRequest innerSearchRequest = aSearchRequest.body( );
                innerSearchRequest.setFrom( 0 );
                innerSearchRequest.setSize( size );
                return new MultiSearchAction( innerSearchRequest, MultiSearchActionType.QUERY, INDEX );
            } ).collect( Collectors.toList( ) );
            final Responses firstPage = this._elasticClient.multiSearch( INDEX, searchActions );
            final Response globalResponse = new Response( );
            globalResponse.setResult( new Result( ) );
            globalResponse.getResult( ).setHits( new ArrayList<>( ) );
            this.computeResponseMetadata( globalResponse, firstPage, searchActions );

            /* The same identity can match several requests: keep one hit per customer id, in encounter order */
            final Map<String, Hit> distinctHits = new LinkedHashMap<>( );
            collectDistinctHits( firstPage, distinctHits );
            BigDecimal maxScore = highestScore( firstPage );

            final int maxTotal = firstPage.getResponses( ).stream( ).filter( r -> r.getResult( ) != null )
                    .mapToInt( r -> r.getResult( ).getTotal( ).getValue( ) ).max( )
                    .orElseThrow( ( ) -> new IdentityStoreException( "Cannot compute total of hits" ) );
            /* Same contract as the single request search: 0 means no limit */
            final int limit = ( max == 0 ) ? maxTotal : Math.min( max, maxTotal );
            for ( int offset = size; offset < limit; offset += size )
            {
                final int from = offset;
                /* Trim the last page so that no more than "limit" hits are fetched per request */
                final int currentPageSize = Math.min( size, limit - offset );
                searchActions.forEach( multiSearchAction -> {
                    multiSearchAction.getQuery( ).setFrom( from );
                    multiSearchAction.getQuery( ).setSize( currentPageSize );
                } );
                final Responses pagedResponses = this._elasticClient.multiSearch( INDEX, searchActions );
                collectDistinctHits( pagedResponses, distinctHits );
                maxScore = highest( maxScore, highestScore( pagedResponses ) );
                this.computeResponseMetadata( globalResponse, pagedResponses, searchActions );
            }
            if ( maxScore != null )
            {
                globalResponse.getResult( ).setMaxScore( maxScore );
            }
            globalResponse.getResult( ).getHits( ).addAll( distinctHits.values( ) );
            return globalResponse;
        }
        catch( final ElasticClientException e )
        {
            throw new IdentityStoreException( e.getMessage( ), e );
        }
    }

    /**
     * Compute the number of hits requested per page.
     * 
     * @param max
     *            the maximum number of hits requested by the caller, 0 for no limit
     * @return the configured page size when <code>max</code> is 0, otherwise the smallest of <code>max</code> and the configured page size
     */
    private static int pageSize( final int max )
    {
        return ( max == 0 ) ? PAGE_SIZE : Math.min( max, PAGE_SIZE );
    }

    /**
     * Null-safe maximum of two scores.
     * 
     * @param first
     *            a score, can be null
     * @param second
     *            another score, can be null
     * @return the highest non-null score, null if both are null
     */
    private static BigDecimal highest( final BigDecimal first, final BigDecimal second )
    {
        if ( first == null )
        {
            return second;
        }
        if ( second == null )
        {
            return first;
        }
        return first.compareTo( second ) >= 0 ? first : second;
    }

    /**
     * Find the highest max score among the responses of a multi search.
     * 
     * @param responses
     *            the multi search {@link Responses}
     * @return the highest max score, null if no response provides one
     */
    private static BigDecimal highestScore( final Responses responses )
    {
        return responses.getResponses( ).stream( ).filter( r -> r.getResult( ) != null ).map( r -> r.getResult( ).getMaxScore( ) )
                .filter( Objects::nonNull ).max( Comparator.naturalOrder( ) ).orElse( null );
    }

    /**
     * Add the hits of a multi search to the given map, keyed by customer id. A hit whose customer id is already in the map is ignored.
     * 
     * @param responses
     *            the multi search {@link Responses}
     * @param distinctHits
     *            the map to complete
     */
    private static void collectDistinctHits( final Responses responses, final Map<String, Hit> distinctHits )
    {
        responses.getResponses( ).stream( ).filter( r -> r.getResult( ) != null && r.getResult( ).getHits( ) != null )
                .flatMap( r -> r.getResult( ).getHits( ).stream( ) ).forEach( hit -> distinctHits.putIfAbsent( hit.getSource( ).getCustomerId( ), hit ) );
    }

    /**
     * Compute metadata of global response to give information about what kind of requests had a match.
     * 
     * @param globalResponse
     *            the global {@link Response} that holds the entire multi search matches
     * @param innerResponses
     *            the multi search {@link Responses} in the same order as {@link MultiSearchAction} (ES gives responses in the same order as bulk requests)
     * @param searchActions
     *            the {@link MultiSearchAction} requests that we want to determine if they have matches or not
     */
    private void computeResponseMetadata( final Response globalResponse, final Responses innerResponses, final List<MultiSearchAction> searchActions )
    {
        final List<Response> responses = innerResponses.getResponses( );
        /* Responses and actions are matched by position: never read past the shortest of the two lists */
        final int count = Math.min( searchActions.size( ), responses.size( ) );
        for ( int index = 0; index < count; index++ )
        {
            final Response response = responses.get( index );
            if ( response.getResult( ) != null && response.getResult( ).getHits( ) != null && !response.getResult( ).getHits( ).isEmpty( ) )
            {
                Maps.mergeStringMap( globalResponse.getMetadata( ), searchActions.get( index ).getQuery( ).getMetadata( ) );
            }
        }
    }

}