package com.hazelcast.jet.impl;

import com.hazelcast.core.IndeterminateOperationStateException;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.impl.JobExecutionRecord;
import com.hazelcast.jet.impl.exception.ExecutionNotFoundException;
import com.hazelcast.jet.impl.execution.SnapshotFlags;
import com.hazelcast.jet.impl.operation.SnapshotPhase1Operation;
import com.hazelcast.jet.impl.operation.SnapshotPhase2Operation;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:MICRO-INF/runtime/hazelcast.jar:com/hazelcast/jet/impl/MasterSnapshotContext.class */
public class MasterSnapshotContext {
    // Owning master context; used throughout this class for locking, job/execution
    // identity, the job execution record and for invoking operations on participants.
    final MasterContext mc;
    // Logger supplied through the constructor.
    private final ILogger logger;
    // Set to true when tryBeginSnapshot() starts a snapshot; reset to false when
    // phase 2 of that snapshot completes and in onExecutionStarted().
    private boolean snapshotInProgress;

    // Starts out already completed; onExecutionStarted() replaces it with a fresh,
    // incomplete future, which is completed when phase 2 of a terminal snapshot finishes.
    @Nonnull
    private volatile CompletableFuture<Void> terminalSnapshotFuture = CompletableFuture.completedFuture(null);
    // Pending snapshot requests in FIFO order; tryBeginSnapshot() polls one at a time.
    private final Queue<SnapshotRequest> snapshotQueue = new LinkedList();
    // Decompiler-exposed flag backing the assertion checks in this class;
    // assigned once in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:MICRO-INF/runtime/hazelcast.jar:com/hazelcast/jet/impl/MasterSnapshotContext$SnapshotRequest.class */
    /**
     * A single queued request to take a snapshot.
     *
     * <p>A request is an "export" when it carries a snapshot name; otherwise it is a
     * regular snapshot written to the job's own snapshot data map.
     */
    public class SnapshotRequest {
        /** Name of the exported snapshot, or {@code null} for a regular snapshot. */
        final String snapshotName;
        /** Whether this request is for a terminal snapshot. */
        final boolean isTerminal;
        /** Future to notify about the outcome of the request; may be {@code null}. */
        final CompletableFuture<Void> future;

        SnapshotRequest(@Nullable String str, boolean z, @Nullable CompletableFuture<Void> completableFuture) {
            snapshotName = str;
            isTerminal = z;
            future = completableFuture;
        }

        /**
         * @return {@code true} if a snapshot name was given
         */
        public boolean isExport() {
            if (snapshotName == null) {
                return false;
            }
            return true;
        }

        /**
         * @return {@code true} if this is an export that is not terminal
         */
        public boolean isExportOnly() {
            if (!isExport()) {
                return false;
            }
            return !isTerminal;
        }

        /**
         * @return the flags built by {@code SnapshotFlags.create} from the terminal
         *         and export properties of this request
         */
        public int snapshotFlags() {
            final boolean export = isExport();
            return SnapshotFlags.create(isTerminal, export);
        }

        /**
         * @return for an export, the exported-snapshot map name derived from the snapshot
         *         name; otherwise the snapshot data map name derived from the job ID and
         *         the ongoing data map index of the job execution record
         */
        public String mapName() {
            if (isExport()) {
                return JobRepository.exportedSnapshotMapName(snapshotName);
            }
            final long jobId = mc.jobId();
            return JobRepository.snapshotDataMapName(jobId, mc.jobExecutionRecord().ongoingDataMapIndex());
        }

        /**
         * Completes the request's future, if there is one.
         *
         * @param th failure to report, or {@code null} to complete normally
         */
        public void completeFuture(@Nullable Throwable th) {
            if (future == null) {
                // nobody is waiting for the outcome
                return;
            }
            if (th != null) {
                future.completeExceptionally(th);
            } else {
                future.complete(null);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the snapshot context for one master context.
     *
     * @param masterContext the owning master context
     * @param iLogger       the logger to write to
     */
    public MasterSnapshotContext(MasterContext masterContext, ILogger iLogger) {
        logger = iLogger;
        mc = masterContext;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Appends a snapshot request to the queue. It does not start the snapshot;
     * see {@link #tryBeginSnapshot()}.
     *
     * @param str               name of the exported snapshot, or {@code null} for a regular snapshot
     * @param z                 whether the snapshot is terminal
     * @param completableFuture future to complete with the outcome, may be {@code null}
     */
    public void enqueueSnapshot(String str, boolean z, CompletableFuture<Void> completableFuture) {
        final SnapshotRequest request = new SnapshotRequest(str, z, completableFuture);
        snapshotQueue.add(request);
    }

    /**
     * Enqueues a regular snapshot request: no export name, not terminal, no future.
     */
    private void enqueueRegularSnapshot() {
        final String noSnapshotName = null;
        final CompletableFuture<Void> noFuture = null;
        enqueueSnapshot(noSnapshotName, false, noFuture);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Enqueues a regular snapshot and tries to begin it, provided the job is
     * {@code RUNNING} and the given execution ID is the current one. Otherwise
     * only a fine-level message is logged.
     *
     * @param j the execution ID the snapshot was scheduled for
     */
    public void startScheduledSnapshot(long j) {
        mc.lock();
        try {
            if (mc.jobStatus() != JobStatus.RUNNING) {
                final String notRunning = "Not beginning snapshot, " + mc.jobIdString() + " is not RUNNING, but " + mc.jobStatus();
                logger.fine(notRunning);
                return;
            }
            if (mc.executionId() != j) {
                final String wrongExecution = "Not beginning snapshot since unexpected execution ID received for " + mc.jobIdString() + ". Received execution ID: " + Util.idToString(j);
                logger.fine(wrongExecution);
                return;
            }
            enqueueRegularSnapshot();
            tryBeginSnapshot();
        } finally {
            // single release point for every path above
            mc.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Submits a task to the coordinator thread that starts the next queued snapshot,
     * if one can be started.
     *
     * <p>Under the master context lock the task gives up (logging at fine level) when
     * the job is not {@code RUNNING}, when a snapshot is already in progress, or when
     * the terminal snapshot future is already done. It also gives up silently when the
     * queue is empty. Otherwise it marks a snapshot as in progress and registers it in
     * the job execution record.
     *
     * <p>After releasing the lock it persists the job execution record, clears the
     * target map and invokes {@code SnapshotPhase1Operation} on the participants. A
     * failure in that part is logged and reported through the request's future.
     *
     * <p>The lock is acquired once and released exactly once, in the {@code finally}
     * of the locked section. The earlier shape released it explicitly and then again
     * in an enclosing {@code finally}, which unlocks a lock the thread no longer holds.
     */
    public void tryBeginSnapshot() {
        this.mc.coordinationService().submitToCoordinatorThread(() -> {
            final SnapshotRequest request;
            final long executionId;
            this.mc.lock();
            try {
                if (this.mc.jobStatus() != JobStatus.RUNNING) {
                    this.logger.fine("Not beginning snapshot, " + this.mc.jobIdString() + " is not RUNNING, but " + this.mc.jobStatus());
                    return;
                }
                if (this.snapshotInProgress) {
                    this.logger.fine("Not beginning snapshot since one is already in progress " + this.mc.jobIdString());
                    return;
                }
                if (this.terminalSnapshotFuture.isDone()) {
                    this.logger.fine("Not beginning snapshot since terminal snapshot is already completed " + this.mc.jobIdString());
                    return;
                }
                request = this.snapshotQueue.poll();
                if (request == null) {
                    // nothing queued
                    return;
                }
                this.snapshotInProgress = true;
                this.mc.jobExecutionRecord().startNewSnapshot(request.snapshotName);
                executionId = this.mc.executionId();
            } finally {
                // the only unlock: covers the early returns and the normal path alike
                this.mc.unlock();
            }

            // From here on the lock is not held.
            final long snapshotId = this.mc.jobExecutionRecord().ongoingSnapshotId();
            final int snapshotFlags = request.snapshotFlags();
            final String mapName = request.mapName();
            try {
                this.mc.writeJobExecutionRecordSafe(false);
                // start from an empty target map
                this.mc.nodeEngine().getHazelcastInstance().getMap(mapName).clear();
                LoggingUtil.logFine(this.logger, "Starting snapshot %d for %s, flags: %s, writing to: %s", Long.valueOf(snapshotId), com.hazelcast.jet.impl.util.Util.jobNameAndExecutionId(this.mc.jobName(), executionId), SnapshotFlags.toString(snapshotFlags), request.snapshotName);
                this.mc.invokeOnParticipants(executionPlan -> {
                    return new SnapshotPhase1Operation(this.mc.jobId(), executionId, snapshotId, mapName, snapshotFlags);
                }, responses -> {
                    onSnapshotPhase1Complete(responses, executionId, snapshotId, request);
                }, null, true);
            } catch (Exception e) {
                this.logger.warning(String.format("Failed to start snapshot %d for %s", Long.valueOf(snapshotId), com.hazelcast.jet.impl.util.Util.jobNameAndExecutionId(this.mc.jobName(), executionId)), e);
                // NOTE(review): snapshotInProgress stays true on this path, as before - confirm
                // whether it should be reset so that later snapshots can start.
                request.completeFuture(e);
            }
        });
    }

    /**
     * Handles the phase 1 responses from all participants on the coordinator thread.
     *
     * <p>Each response is either a {@code SnapshotPhase1Result} or a {@code Throwable}:
     * <ul>
     *   <li>a result is merged into the combined result;</li>
     *   <li>an {@code ExecutionNotFoundException} is not merged. Instead, the member's
     *       start-operation response future is collected, and the outcome of that future
     *       is evaluated later in {@code onSnapshotPhase1CompleteWithStartResponses};</li>
     *   <li>any other throwable is wrapped into a failed result and merged.</li>
     * </ul>
     * Once all collected futures are complete, processing continues in
     * {@code onSnapshotPhase1CompleteWithStartResponses}.
     *
     * @param collection      per-member responses to the phase 1 operation
     * @param j               execution ID the snapshot was started for
     * @param j2              snapshot ID
     * @param snapshotRequest the request being served
     */
    private void onSnapshotPhase1Complete(Collection<Map.Entry<MemberInfo, Object>> collection, long j, long j2, SnapshotRequest snapshotRequest) {
        this.mc.coordinationService().submitToCoordinatorThread(() -> {
            SnapshotPhase1Operation.SnapshotPhase1Result mergedResult = new SnapshotPhase1Operation.SnapshotPhase1Result();
            List<CompletableFuture<Void>> missingResponses = new ArrayList<>();
            for (Map.Entry<MemberInfo, Object> entry : collection) {
                Object response = entry.getValue();
                if (response instanceof Throwable) {
                    if (response instanceof ExecutionNotFoundException) {
                        missingResponses.add(this.mc.startOperationResponses().get(entry.getKey().getAddress()));
                        // The exception itself is not a result and must not reach merge().
                        continue;
                    }
                    response = new SnapshotPhase1Operation.SnapshotPhase1Result(0L, 0L, 0L, (Throwable) response);
                }
                mergedResult.merge((SnapshotPhase1Operation.SnapshotPhase1Result) response);
            }
            if (!missingResponses.isEmpty()) {
                LoggingUtil.logFine(this.logger, "%s will wait for %d responses to StartExecutionOperation in onSnapshotPhase1Complete()", this.mc.jobIdString(), Integer.valueOf(missingResponses.size()));
            }
            // With no futures to wait for, allOf() yields an already-completed future.
            CompletableFuture.allOf(missingResponses.toArray(new CompletableFuture[0])).whenComplete(ExceptionUtil.withTryCatch(this.logger, (ignoredResult, ignoredError) -> {
                onSnapshotPhase1CompleteWithStartResponses(collection, j, j2, snapshotRequest, mergedResult, missingResponses);
            }));
        });
    }

    /**
     * Finishes phase 1 of a snapshot once the awaited start-operation responses are
     * available, and decides whether to proceed to phase 2. Runs on the coordinator
     * thread while holding the master context lock.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>ignore the responses if the execution ID no longer matches;</li>
     *   <li>merge failures of the awaited start-operation responses into the result;</li>
     *   <li>write the {@code SnapshotValidationRecord} into the snapshot map;</li>
     *   <li>mark the ongoing snapshot as done in the job execution record and persist it.
     *       For a successful snapshot that is not export-only, a failed write means
     *       phase 2 is skipped;</li>
     *   <li>on failure, clear the snapshot map. On success of a non-export snapshot,
     *       clear the data identified by the record's ongoing data map index;</li>
     *   <li>either invoke {@code SnapshotPhase2Operation} on the participants, or, if
     *       phase 2 is skipped, terminate the job with {@code RESTART_FORCEFUL} and fail
     *       the request's future.</li>
     * </ol>
     *
     * <p>The lock is released only in the {@code finally} block. The stale-execution
     * path no longer unlocks explicitly before returning, which used to release the
     * lock twice.
     *
     * @param collection           per-member phase 1 responses (only used for logging here)
     * @param j                    execution ID the snapshot was started for
     * @param j2                   snapshot ID
     * @param snapshotRequest      the request being served
     * @param snapshotPhase1Result phase 1 result merged so far; further errors are merged into it
     * @param list                 start-operation response futures that were awaited
     */
    private void onSnapshotPhase1CompleteWithStartResponses(Collection<Map.Entry<MemberInfo, Object>> collection, long j, long j2, SnapshotRequest snapshotRequest, SnapshotPhase1Operation.SnapshotPhase1Result snapshotPhase1Result, List<CompletableFuture<Void>> list) {
        this.mc.coordinationService().submitToCoordinatorThread(() -> {
            boolean skipPhase2 = false;
            this.mc.lock();
            try {
                if (!list.isEmpty()) {
                    LoggingUtil.logFine(this.logger, "%s all awaited responses to StartExecutionOperation received or were already received", this.mc.jobIdString());
                }
                if (j != this.mc.executionId()) {
                    LoggingUtil.logFine(this.logger, "%s: ignoring responses for snapshot %s phase 1: the responses are from a different execution: %s. Responses: %s", this.mc.jobIdString(), Long.valueOf(j2), Util.idToString(j), collection);
                    // the finally block releases the lock
                    return;
                }
                for (CompletableFuture<Void> startResponse : list) {
                    if (!$assertionsDisabled && !startResponse.isDone()) {
                        throw new AssertionError("response not done");
                    }
                    try {
                        startResponse.get();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (ExecutionException e) {
                        // the member's execution failed: count it as a snapshot failure
                        snapshotPhase1Result.merge(new SnapshotPhase1Operation.SnapshotPhase1Result(0L, 0L, 0L, e.getCause()));
                    }
                }
                String mapName = snapshotRequest.mapName();
                IMap snapshotMap = JobRepository.safeImap(this.mc.nodeEngine().getHazelcastInstance().getMap(mapName));
                try {
                    SnapshotValidationRecord validationRecord = new SnapshotValidationRecord(j2, snapshotPhase1Result.getNumChunks(), snapshotPhase1Result.getNumBytes(), this.mc.jobExecutionRecord().ongoingSnapshotStartTime(), this.mc.jobId(), this.mc.jobName(), this.mc.jobRecord().getDagJson());
                    Object previousRecord = snapshotMap.put(SnapshotValidationRecord.KEY, validationRecord);
                    if (snapshotRequest.isExport()) {
                        if (!$assertionsDisabled && snapshotRequest.snapshotName == null) {
                            throw new AssertionError();
                        }
                        this.mc.jobRepository().cacheValidationRecord(snapshotRequest.snapshotName, validationRecord);
                    }
                    if (previousRecord != null) {
                        this.logger.severe("SnapshotValidationRecord overwritten after writing to '" + mapName + "' for " + this.mc.jobIdString() + ": snapshot data might be corrupted");
                    }
                } catch (Exception e) {
                    // failing to write the validation record fails the snapshot
                    snapshotPhase1Result.merge(new SnapshotPhase1Operation.SnapshotPhase1Result(0L, 0L, 0L, e));
                }
                boolean isSuccess = snapshotPhase1Result.getError() == null;
                JobExecutionRecord.SnapshotStats stats = this.mc.jobExecutionRecord().ongoingSnapshotDone(snapshotPhase1Result.getNumBytes(), snapshotPhase1Result.getNumKeys(), snapshotPhase1Result.getNumChunks(), snapshotPhase1Result.getError(), snapshotRequest.isTerminal);
                if (!isSuccess || snapshotRequest.isExportOnly()) {
                    this.mc.writeJobExecutionRecord(false);
                } else {
                    try {
                        this.mc.writeJobExecutionRecordSafe(false);
                    } catch (IndeterminateOperationStateException e) {
                        skipPhase2 = true;
                        this.logger.warning(this.mc.jobIdString() + " snapshot " + j2 + " update of JobExecutionRecord was indeterminate. Will restart the job forcefully.", e);
                    } catch (Exception e) {
                        skipPhase2 = true;
                        this.logger.warning(this.mc.jobIdString() + " snapshot " + j2 + " update of JobExecutionRecord failed. Will restart the job forcefully.", e);
                    }
                }
                if (this.logger.isFineEnabled()) {
                    Object[] logArgs = new Object[9];
                    logArgs[0] = Long.valueOf(j2);
                    logArgs[1] = this.mc.jobIdString();
                    logArgs[2] = (skipPhase2 ? "INDETERMINATE/" : "") + (isSuccess ? "SUCCESS" : "FAILURE");
                    logArgs[3] = Long.valueOf(stats.duration());
                    logArgs[4] = Long.valueOf(stats.numBytes());
                    logArgs[5] = Long.valueOf(stats.numKeys());
                    logArgs[6] = Long.valueOf(stats.numChunks());
                    logArgs[7] = mapName;
                    logArgs[8] = (skipPhase2 ? ", skipping " : ", proceeding to ") + "phase 2";
                    this.logger.fine(String.format("Snapshot %d phase 1 for %s completed with status %s in %dms, %,d bytes, %,d keys in %,d chunks, stored in '%s'%s", logArgs));
                }
                if (!isSuccess) {
                    this.logger.warning(this.mc.jobIdString() + " snapshot " + j2 + " phase 1 failed on some member(s), one of the failures: " + snapshotPhase1Result.getError());
                    try {
                        // drop whatever partial data was written
                        snapshotMap.clear();
                    } catch (Exception e) {
                        this.logger.warning(this.mc.jobIdString() + ": failed to clear snapshot map '" + mapName + "' after a failure", e);
                    }
                }
                if (isSuccess && !skipPhase2 && !snapshotRequest.isExport()) {
                    this.mc.jobRepository().clearSnapshotData(this.mc.jobId(), this.mc.jobExecutionRecord().ongoingDataMapIndex());
                }
                if (!skipPhase2) {
                    this.mc.invokeOnParticipants(executionPlan -> {
                        return new SnapshotPhase2Operation(this.mc.jobId(), j, j2, isSuccess && !snapshotRequest.isExportOnly());
                    }, phase2Responses -> {
                        onSnapshotPhase2Complete(snapshotPhase1Result.getError(), phase2Responses, j, j2, snapshotRequest, stats.startTime());
                    }, null, true);
                    return;
                }
                // The record update did not go through: skip phase 2 and restart the job forcefully.
                TerminationMode terminationMode = TerminationMode.RESTART_FORCEFUL;
                logger().fine(this.mc.jobIdString() + ": Terminating job without performing snapshot phase 2 with mode " + terminationMode);
                this.mc.jobContext().handleTermination(terminationMode);
                snapshotRequest.completeFuture(new JetException("Snapshot in unknown state"));
            } finally {
                this.mc.unlock();
            }
        });
    }

    /**
     * Handles the phase 2 responses from all participants on the coordinator thread
     * and finishes the snapshot.
     *
     * <p>Responses from a different execution are ignored. Otherwise member failures
     * are logged (fine level for {@code ExecutionNotFoundException}, warning for the
     * rest) and the request's future is completed. Then, under the master context lock
     * and only if the execution is still the same:
     * <ul>
     *   <li>the in-progress flag is cleared;</li>
     *   <li>for a terminal snapshot, the terminal snapshot future is completed, and if
     *       phase 1 failed the execution invocations are cancelled;</li>
     *   <li>for a regular (non-export) snapshot, the next snapshot is scheduled;</li>
     *   <li>the next queued snapshot is attempted.</li>
     * </ul>
     *
     * <p>The lock is released only in the {@code finally} block. The stale-execution
     * path no longer unlocks explicitly before returning, which used to release the
     * lock twice.
     *
     * @param str             phase 1 error, or {@code null} if phase 1 succeeded
     * @param collection      per-member responses to the phase 2 operation
     * @param j               execution ID the snapshot was started for
     * @param j2              snapshot ID
     * @param snapshotRequest the request being served
     * @param j3              snapshot start time, subtracted from the current time in
     *                        milliseconds for the completion log message
     */
    private void onSnapshotPhase2Complete(String str, Collection<Map.Entry<MemberInfo, Object>> collection, long j, long j2, SnapshotRequest snapshotRequest, long j3) {
        this.mc.coordinationService().submitToCoordinatorThread(() -> {
            if (j != this.mc.executionId()) {
                LoggingUtil.logFine(this.logger, "%s: ignoring responses for snapshot %s phase 2: the responses are from a different execution: %s. Responses: %s", this.mc.jobIdString(), Long.valueOf(j2), Util.idToString(j), collection);
                return;
            }
            for (Map.Entry<MemberInfo, Object> entry : collection) {
                Object response = entry.getValue();
                if (response instanceof Throwable) {
                    Level level = response instanceof ExecutionNotFoundException ? Level.FINE : Level.WARNING;
                    this.logger.log(level, SnapshotPhase2Operation.class.getSimpleName() + " for snapshot " + j2 + " in " + this.mc.jobIdString() + " failed on member: " + entry, (Throwable) response);
                }
            }
            // The request's outcome is determined by phase 1 only.
            snapshotRequest.completeFuture(str == null ? null : new JetException(str));
            this.mc.lock();
            try {
                // re-check under the lock: a new execution may have started in the meantime
                if (j != this.mc.executionId()) {
                    this.logger.fine("Not completing terminalSnapshotFuture on " + this.mc.jobIdString() + ", new execution already started, snapshot was for executionId=" + Util.idToString(j));
                    // the finally block releases the lock
                    return;
                }
                if (!$assertionsDisabled && !this.snapshotInProgress) {
                    throw new AssertionError("snapshot not in progress");
                }
                this.snapshotInProgress = false;
                if (snapshotRequest.isTerminal) {
                    boolean completedNow = this.terminalSnapshotFuture.complete(null);
                    if (!$assertionsDisabled && !completedNow) {
                        throw new AssertionError("terminalSnapshotFuture was already completed");
                    }
                    if (str != null) {
                        this.mc.jobContext().cancelExecutionInvocations(this.mc.jobId(), this.mc.executionId(), null, null);
                    }
                } else if (!snapshotRequest.isExport()) {
                    this.mc.coordinationService().scheduleSnapshot(this.mc, j);
                }
                if (this.logger.isFineEnabled()) {
                    this.logger.fine("Snapshot " + j2 + " for " + this.mc.jobIdString() + " completed in " + (System.currentTimeMillis() - j3) + "ms, status=" + (str == null ? "success" : "failure: " + str));
                }
                tryBeginSnapshot();
            } finally {
                this.mc.unlock();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the current terminal snapshot future (a single read of the volatile field)
     */
    public CompletableFuture<Void> terminalSnapshotFuture() {
        final CompletableFuture<Void> current = terminalSnapshotFuture;
        return current;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Resets the per-execution state: clears the in-progress flag and installs a
     * fresh, incomplete terminal snapshot future. With assertions enabled, it also
     * checks that no snapshot request is still queued.
     */
    public void onExecutionStarted() {
        snapshotInProgress = false;
        if (!$assertionsDisabled) {
            if (!snapshotQueue.isEmpty()) {
                throw new AssertionError("snapshotQueue not empty");
            }
        }
        terminalSnapshotFuture = new CompletableFuture<>();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Fails every snapshot request still waiting in the queue, each with its own
     * {@code JetException}, and then empties the queue.
     */
    public void onExecutionTerminated() {
        for (SnapshotRequest pending : snapshotQueue) {
            pending.completeFuture(new JetException("Execution completed before snapshot executed"));
        }
        snapshotQueue.clear();
    }

    /**
     * @return the logger passed to the constructor
     */
    public ILogger logger() {
        return logger;
    }

    static {
        $assertionsDisabled = !MasterSnapshotContext.class.desiredAssertionStatus();
    }
}
