IdentityDuplicatesDaemon.java

/*
 * Copyright (c) 2002-2024, City of Paris
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *  1. Redistributions of source code must retain the above copyright notice
 *     and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright notice
 *     and the following disclaimer in the documentation and/or other materials
 *     provided with the distribution.
 *
 *  3. Neither the name of 'Mairie de Paris' nor 'Lutece' nor the names of its
 *     contributors may be used to endorse or promote products derived from
 *     this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * License 1.0
 */
package fr.paris.lutece.plugins.identitystore.modules.quality.service;

import fr.paris.lutece.plugins.identitystore.business.attribute.AttributeKey;
import fr.paris.lutece.plugins.identitystore.business.duplicates.suspicions.SuspiciousIdentity;
import fr.paris.lutece.plugins.identitystore.business.duplicates.suspicions.SuspiciousIdentityHome;
import fr.paris.lutece.plugins.identitystore.business.identity.Identity;
import fr.paris.lutece.plugins.identitystore.business.rules.duplicate.DuplicateRule;
import fr.paris.lutece.plugins.identitystore.business.rules.duplicate.DuplicateRuleHome;
import fr.paris.lutece.plugins.identitystore.modules.quality.business.DuplicatesDaemonLimitationMode;
import fr.paris.lutece.plugins.identitystore.service.daemon.LoggingDaemon;
import fr.paris.lutece.plugins.identitystore.service.duplicate.DuplicateRuleNotFoundException;
import fr.paris.lutece.plugins.identitystore.service.duplicate.DuplicateRuleService;
import fr.paris.lutece.plugins.identitystore.service.identity.IdentityService;
import fr.paris.lutece.plugins.identitystore.service.network.DelayedNetworkService;
import fr.paris.lutece.plugins.identitystore.utils.Batch;
import fr.paris.lutece.plugins.identitystore.v3.web.rs.dto.common.AuthorType;
import fr.paris.lutece.plugins.identitystore.v3.web.rs.dto.common.IdentityDto;
import fr.paris.lutece.plugins.identitystore.v3.web.rs.dto.common.RequestAuthor;
import fr.paris.lutece.plugins.identitystore.v3.web.rs.dto.crud.SuspiciousIdentityChangeRequest;
import fr.paris.lutece.plugins.identitystore.v3.web.rs.dto.crud.SuspiciousIdentityChangeResponse;
import fr.paris.lutece.plugins.identitystore.v3.web.rs.dto.crud.SuspiciousIdentityDto;
import fr.paris.lutece.plugins.identitystore.v3.web.rs.dto.search.DuplicateSearchResponse;
import fr.paris.lutece.plugins.identitystore.web.exception.IdentityStoreException;
import fr.paris.lutece.portal.service.util.AppPropertiesService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.commons.lang3.time.StopWatch;

import java.sql.Timestamp;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * This task identifies {@link Identity} with potential duplicates. The best quality identity is saved in the database to be processed later.<br/>
 * This daemon also deletes expired suspicions.
 */
public class IdentityDuplicatesDaemon extends LoggingDaemon
{
    // Daemon configuration, read from the application properties when the class is loaded.
    private static final String clientCode = AppPropertiesService.getProperty( "daemon.identityDuplicatesDaemon.client.code" );
    private static final Integer batchSize = AppPropertiesService.getPropertyInt( "daemon.identityDuplicatesDaemon.batch.size", 10 );
    private static final Integer purgeSize = AppPropertiesService.getPropertyInt( "daemon.identityDuplicatesDaemon.purge.size", 500 );
    private static final DuplicatesDaemonLimitationMode limitationMode = DuplicatesDaemonLimitationMode.getMode(AppPropertiesService.getProperty( "daemon.identityDuplicatesDaemon.limitation.mode" ));

    // Every duplicate search of this daemon (detection and purge) goes through this service.
    private final DelayedNetworkService<DuplicateSearchResponse> delayedNetworkService = new DelayedNetworkService<>();

    // Author attached to every suspicious identity created by this daemon.
    private static final RequestAuthor author;
    static
    {
        author = new RequestAuthor( );
        author.setType( AuthorType.application );
        author.setName( AppPropertiesService.getProperty( "daemon.identityDuplicatesDaemon.author.name" ) );
    }

    /**
     * Runs one daemon cycle: purges the suspicions that no longer have duplicates, then applies every active duplicate rule flagged for daemon usage, in
     * ascending priority order, and records the execution date on each processed rule.<br/>
     * If the thread is interrupted, the interrupt flag is restored and the remaining work is skipped.
     */
    @Override
    public void doTask( )
    {
        final StopWatch stopWatch = new StopWatch( );
        stopWatch.start( );
        this.info( "Starting IdentityDuplicatesDaemon..." );
        this.info( "daemon.identityDuplicatesDaemon.client.code: " + clientCode );
        this.info( "daemon.identityDuplicatesDaemon.batch.size: " + batchSize );
        this.info( "daemon.identityDuplicatesDaemon.purge.size: " + purgeSize );
        this.info( "daemon.identityDuplicatesDaemon.limitation.mode: " + limitationMode.name( ) );

        try
        {
            this.purgeExpiredSuspicions( );
        }
        catch( final IdentityStoreException e )
        {
            // A failed purge must not prevent the detection from running.
            this.error( "Error occurred while purging expired suspicions : " + e.getMessage( ) );
            this.info( "Continuing..." );
        }
        catch( final InterruptedException e )
        {
            // Restore the interrupt flag so that the code running this daemon can observe the interruption.
            Thread.currentThread( ).interrupt( );
            this.error( "Error occurred while purging expired suspicions : " + e.getMessage( ) );
            this.info( "Stopping daemon." );
            return;
        }

        final List<DuplicateRule> rules;
        try
        {
            rules = DuplicateRuleService.instance( ).findAll( ).stream( ).filter( rule -> rule != null && rule.isDaemon( ) && rule.isActive( ) )
                    .collect( Collectors.toList( ) );
        }
        catch( final DuplicateRuleNotFoundException e )
        {
            this.error( "No duplicate rules found in database: " + e.getMessage( ) );
            this.info( "Stopping daemon." );
            return;
        }
        if ( CollectionUtils.isEmpty( rules ) )
        {
            this.error( "No existing duplicate rules marked to be used in daemon. Stopping daemon." );
            return;
        }

        this.info( rules.size( ) + " applicable detection rules found. Starting process..." );
        rules.sort( Comparator.comparingInt( DuplicateRule::getPriority ) );

        for ( final DuplicateRule rule : rules )
        {
            if ( Thread.currentThread( ).isInterrupted( ) )
            {
                this.info( "Daemon thread has been interrupted. Skipping remaining rules." );
                break;
            }
            this.processRule( rule );
            rule.setDaemonLastExecDate( Timestamp.from( ZonedDateTime.now( ZoneId.systemDefault( ) ).toInstant( ) ) );
            DuplicateRuleHome.update( rule );
        }

        stopWatch.stop( );
        final String duration = DurationFormatUtils.formatDurationWords( stopWatch.getTime( ), true, true );
        this.info( "Execution time " + duration );
    }

    /**
     * Search for potential duplicates according to the provided rule.<br/>
     * In GLOBAL mode the suspicions already stored for the rule count towards its detection limit, and the rule is skipped when that limit is already
     * reached. In INCREMENTAL mode the counter starts at zero on every run. Any exception is logged and swallowed so that the other rules can still be
     * processed.
     * 
     * @param rule
     *            the rule used to search duplicates
     */
    private void processRule( final DuplicateRule rule )
    {
        try
        {
            this.info( "-- Processing Rule id = [" + rule.getId( ) + "] code = [" + rule.getCode( ) + "] priority = [" + rule.getPriority( ) + "] ..." );
            switch ( limitationMode )
            {
                case GLOBAL:
                    final int bddSuspicious = SuspiciousIdentityHome.countSuspiciousIdentity( rule.getId( ) );
                    if ( rule.getDetectionLimit( ) > 0 && bddSuspicious >= rule.getDetectionLimit( ) )
                    {
                        this.info( "Limitation mode is set to GLOBAL. Rule detection limit (" + rule.getDetectionLimit( ) + ") exceeded. Detection count : " + bddSuspicious );
                    }
                    else
                    {
                        this.processRule( rule, bddSuspicious );
                    }
                    break;
                case INCREMENTAL:
                    this.processRule( rule, 0 );
                    break;
                default:
                    break;
            }
        }
        catch( final Exception e )
        {
            this.error( "An error occurred during processing of rule " + rule.getCode( ) + " : " + e.getMessage( ) );
        }
    }

    /**
     * Iterates over the batches of candidate customer ids returned for the rule, searches duplicates for each identity and marks the identity as
     * suspicious when duplicates are found and none of the involved identities is already suspicious.<br/>
     * The detection stops as soon as the rule detection limit (when greater than zero) is reached, or when the thread is interrupted.
     * 
     * @param rule
     *            the rule used to search duplicates
     * @param suspiciousCounterInitializer
     *            the initial value of the suspicion counter that is compared to the rule detection limit
     */
    private void processRule( final DuplicateRule rule, final int suspiciousCounterInitializer )
    {
        final Batch<String> cuidBatches = IdentityService.instance( ).getCUIDsBatchForPotentialDuplicate( rule, batchSize, limitationMode == DuplicatesDaemonLimitationMode.INCREMENTAL );
        if ( cuidBatches == null || cuidBatches.isEmpty( ) )
        {
            this.error( "No identities having required attributes and not already suspicious found." );
            return;
        }

        this.info( cuidBatches.totalSize( ) + " identities found. Searching for potential duplicates on those..." );
        int suspicionsCounter = suspiciousCounterInitializer;
        // Holds cuids that have been detected as duplicates to reduce iteration. A set keeps the per-identity lookup cheap as detections accumulate.
        final Set<String> enhancerFilter = new HashSet<>( );
        final List<String> attributesFilter = rule.getCheckedAttributes( ).stream( ).map( AttributeKey::getKeyName ).collect( Collectors.toList( ) );
        detection_loop: for ( final List<String> cuids : cuidBatches )
        {
            final List<IdentityDto> identities = IdentityService.instance( ).search( cuids, attributesFilter ).stream( ).filter( Objects::nonNull ).collect( Collectors.toList( ) );
            for ( final IdentityDto identity : identities )
            {
                if ( enhancerFilter.contains( identity.getCustomerId( ) ) )
                {
                    // Already part of a previously detected duplicate group.
                    continue;
                }
                try
                {
                    final DuplicateSearchResponse duplicates = this.delayedNetworkService.call( ( ) -> SearchDuplicatesService.instance( ).findDuplicates( identity, Collections.singletonList( rule.getCode( ) ), Collections.singletonList( "customerId" ) ), "Find duplicates", this );
                    final int duplicateCount = duplicates != null ? duplicates.getIdentities( ).size( ) : 0;
                    if ( duplicateCount > 0 )
                    {
                        this.debug( "Identity " + identity.getCustomerId( ) + " has " + duplicateCount + " duplicates." );
                        final List<IdentityDto> processedIdentities = new ArrayList<>( duplicates.getIdentities( ) );
                        processedIdentities.add( identity );
                        final List<String> customerIds = processedIdentities.stream( ).map( IdentityDto::getCustomerId ).collect( Collectors.toList( ) );
                        if ( DuplicatesDaemonLimitationMode.INCREMENTAL == limitationMode )
                        {
                            this.debug( "Incremental mode: remove lower rule suspicious detections if any." );
                            final List<SuspiciousIdentityDto> suspiciousIdentity = SuspiciousIdentityService.instance( ).getSuspiciousIdentity( customerIds );
                            for ( final SuspiciousIdentityDto suspiciousIdentityDto : suspiciousIdentity )
                            {
                                final DuplicateRule existingDuplicateRule = DuplicateRuleService.instance( ).get( suspiciousIdentityDto.getDuplicationRuleCode( ) );
                                // A smaller priority value ranks higher: the existing suspicion comes from a lower-ranked rule and is replaced.
                                if ( rule.getPriority( ) < existingDuplicateRule.getPriority( ) )
                                {
                                    this.info( "Incremental mode: removing suspicion [rule-code: " + suspiciousIdentityDto.getDuplicationRuleCode( ) + "][cuid: " + suspiciousIdentityDto.getCustomerId( ) + "] with lower rule priority [rule-priority: " + existingDuplicateRule.getPriority( ) + "]" );
                                    SuspiciousIdentityHome.remove( suspiciousIdentityDto.getCustomerId( ) );
                                }
                            }
                        }

                        if ( !SuspiciousIdentityService.instance( ).hasSuspicious( customerIds ) )
                        {
                            final SuspiciousIdentityChangeResponse response = new SuspiciousIdentityChangeResponse( );
                            final SuspiciousIdentityChangeRequest request = new SuspiciousIdentityChangeRequest( );
                            request.setSuspiciousIdentity( new SuspiciousIdentityDto( ) );
                            request.getSuspiciousIdentity( ).setCustomerId( identity.getCustomerId( ) );
                            request.getSuspiciousIdentity( ).setDuplicationRuleCode( rule.getCode( ) );
                            request.getSuspiciousIdentity( ).getMetadata( ).putAll( duplicates.getMetadata( ) );
                            SuspiciousIdentityService.instance( ).create( request, clientCode, author, response );
                            this.info( "Identity " + identity.getCustomerId( ) + " has been marked suspicious." );
                            suspicionsCounter++;
                        }
                        enhancerFilter.addAll( customerIds );
                    }

                    if ( rule.getDetectionLimit( ) > 0 && suspicionsCounter >= rule.getDetectionLimit( ) )
                    {
                        this.info( "Rule detection limit (" + rule.getDetectionLimit( ) + ") exceeded. Detection count : " + suspicionsCounter );
                        break detection_loop;
                    }
                }
                catch( final Exception e )
                {
                    if ( e instanceof InterruptedException )
                    {
                        // Do not swallow the interruption: restore the flag and stop searching instead of failing on every remaining identity.
                        Thread.currentThread( ).interrupt( );
                        this.error( "Interrupted during duplicate search for rule " + rule.getCode( ) + ". Stopping detection." );
                        break detection_loop;
                    }
                    this.error( "An error occurred during duplicate search for identity " + identity.getCustomerId( ) + " and rule " + rule.getCode( )
                            + " : " + e.getMessage( ) );
                }
            }
        }
        this.info( suspicionsCounter + " identities have been marked as suspicious." );
    }

    /**
     * Purges the existing suspicious identities by deleting those that don't have duplicates anymore.<br/>
     * At most {@code purgeSize} suspicions are examined per run. A suspicion is kept when its identity cannot be found or when the duplicate search
     * returns no response, since the absence of duplicates is not established in those cases.
     * 
     * @throws IdentityStoreException
     *             declared by this method; propagated to the caller, which logs it and carries on
     * @throws InterruptedException
     *             declared by this method; propagated to the caller, which restores the interrupt flag
     */
    private void purgeExpiredSuspicions( ) throws IdentityStoreException, InterruptedException
    {
        this.info( "Starting purge suspicions process..." );

        final List<SuspiciousIdentity> suspiciousIdentitysList = SuspiciousIdentityHome.getSuspiciousIdentitysList( null, purgeSize, null );
        final int suspiciousIdentitiesTotalCount = SuspiciousIdentityHome.countSuspiciousIdentity( );
        int purgeCount = 0;
        for ( final SuspiciousIdentity suspicious : suspiciousIdentitysList )
        {
            final IdentityDto identity = IdentityService.instance( ).search( suspicious.getCustomerId( ) );
            if ( identity == null )
            {
                continue;
            }

            final DuplicateSearchResponse duplicates = this.delayedNetworkService.call( ( ) -> SearchDuplicatesService.instance( ).findDuplicates( identity, Collections.singletonList( suspicious.getDuplicateRuleCode( ) ), Collections.emptyList( ) ), "Find duplicates", this );
            if ( duplicates == null )
            {
                // The detection code treats this response as nullable; without a response there is no proof that the duplicates are gone.
                this.debug( "No duplicate search response for suspicious identity " + suspicious.getCustomerId( ) + ", suspicion is kept." );
                continue;
            }

            if ( duplicates.getIdentities( ).isEmpty( ) )
            {
                SuspiciousIdentityHome.remove( suspicious.getCustomerId( ) );
                purgeCount++;
            }
        }
        this.info( "Purge process ended with " + purgeCount + " deleted suspicions on " + suspiciousIdentitiesTotalCount + "." );
    }
}