package io.smallrye.mutiny.helpers.test;

import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.subscription.ContextSupport;
import java.time.Duration;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/smallrye/mutiny/helpers/test/AssertSubscriber.class */
public class AssertSubscriber<T> implements Subscriber<T>, ContextSupport {
    private final CountDownLatch terminal;
    private final CountDownLatch subscribed;
    private final AtomicReference<Subscription> subscription;
    private final AtomicLong requested;
    private final List<T> items;
    private final AtomicReference<Throwable> failure;
    private final AtomicBoolean completed;
    private int numberOfSubscription;
    private final boolean upfrontCancellation;
    private boolean cancelled;
    private final Context context;
    private final List<EventListener> eventListeners;
    public static final String DEFAULT_MUTINY_AWAIT_TIMEOUT = "DEFAULT_MUTINY_AWAIT_TIMEOUT";
    public static Duration DEFAULT_TIMEOUT = Duration.ofSeconds(Integer.parseInt(System.getenv().getOrDefault(DEFAULT_MUTINY_AWAIT_TIMEOUT, "10")));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/mutiny/helpers/test/AssertSubscriber$Event.class */
    public static class Event {
        private final Object item;
        private final Throwable failure;
        private final boolean completion;
        private final boolean cancellation;

        private Event(Object obj, Throwable th, boolean z, boolean z2) {
            this.item = obj;
            this.failure = th;
            this.completion = z;
            this.cancellation = z2;
        }

        public boolean isItem() {
            return this.item != null;
        }

        public boolean isCancellation() {
            return this.cancellation;
        }

        public boolean isFailure() {
            return this.failure != null;
        }

        public boolean isCompletion() {
            return this.completion;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/mutiny/helpers/test/AssertSubscriber$EventListener.class */
    public interface EventListener extends Consumer<Event> {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/mutiny/helpers/test/AssertSubscriber$ItemTask.class */
    public static class ItemTask<T> {
        private final int expected;
        private final AssertSubscriber<T> subscriber;
        private final long duration;

        public ItemTask(int i, long j, AssertSubscriber<T> assertSubscriber) {
            this.expected = i;
            this.subscriber = assertSubscriber;
            this.duration = j;
        }

        public CompletableFuture<Void> future() {
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            long currentTimeMillis = System.currentTimeMillis() + this.duration;
            new Thread(() -> {
                while (System.currentTimeMillis() < currentTimeMillis) {
                    if (((AssertSubscriber) this.subscriber).items.size() >= this.expected) {
                        completableFuture.complete(null);
                        return;
                    } else if (this.subscriber.isCancelled() || this.subscriber.getFailure() != null || this.subscriber.hasCompleted()) {
                        completableFuture.completeExceptionally(new NoSuchElementException("Received a terminal event while waiting for items"));
                        return;
                    } else {
                        try {
                            Thread.sleep(100L);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
                completableFuture.completeExceptionally(new TimeoutException());
            }).start();
            return completableFuture;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/mutiny/helpers/test/AssertSubscriber$NextItemTask.class */
    public static class NextItemTask<T> {
        private final int expected;
        private final AssertSubscriber<T> subscriber;

        public NextItemTask(int i, AssertSubscriber<T> assertSubscriber) {
            this.expected = i;
            this.subscriber = assertSubscriber;
        }

        public CompletableFuture<Void> future() {
            CompletableFuture completableFuture = new CompletableFuture();
            AtomicInteger atomicInteger = new AtomicInteger(this.expected);
            EventListener eventListener = event -> {
                if (event.isItem()) {
                    if (atomicInteger.decrementAndGet() == 0) {
                        completableFuture.complete(null);
                    }
                } else if (event.isCancellation() || event.isFailure() || event.isCompletion()) {
                    completableFuture.completeExceptionally(new NoSuchElementException("Received a terminal event while waiting for items"));
                }
            };
            this.subscriber.registerListener(eventListener);
            return completableFuture.whenComplete((BiConsumer) (r5, th) -> {
                this.subscriber.unregisterListener(eventListener);
            });
        }
    }

    public AssertSubscriber(Context context, long j, boolean z) {
        this.terminal = new CountDownLatch(1);
        this.subscribed = new CountDownLatch(1);
        this.subscription = new AtomicReference<>();
        this.requested = new AtomicLong();
        this.items = new CopyOnWriteArrayList();
        this.failure = new AtomicReference<>();
        this.completed = new AtomicBoolean();
        this.numberOfSubscription = 0;
        this.eventListeners = new CopyOnWriteArrayList();
        this.context = context;
        this.requested.set(j);
        this.upfrontCancellation = z;
    }

    public AssertSubscriber(long j, boolean z) {
        this(Context.empty(), j, z);
    }

    public AssertSubscriber() {
        this(Context.empty(), 0L, false);
    }

    public AssertSubscriber(long j) {
        this(Context.empty(), j, false);
    }

    public static <T> AssertSubscriber<T> create() {
        return new AssertSubscriber<>(0L);
    }

    public static <T> AssertSubscriber<T> create(long j) {
        return new AssertSubscriber<>(j);
    }

    public static <T> AssertSubscriber<T> create(Context context) {
        return new AssertSubscriber<>(context, 0L, false);
    }

    public static <T> AssertSubscriber<T> create(Context context, long j) {
        return new AssertSubscriber<>(context, j, false);
    }

    @Override // io.smallrye.mutiny.subscription.ContextSupport
    public Context context() {
        return this.context;
    }

    public AssertSubscriber<T> assertCompleted() {
        AssertionHelper.shouldHaveCompleted(hasCompleted(), getFailure(), getItems());
        return this;
    }

    public AssertSubscriber<T> assertFailedWith(Class<? extends Throwable> cls, String str) {
        AssertionHelper.shouldHaveFailed(hasCompleted(), getFailure(), cls, str);
        return this;
    }

    public AssertSubscriber<T> assertFailedWith(Class<? extends Throwable> cls) {
        AssertionHelper.shouldHaveFailed(hasCompleted(), getFailure(), cls, null);
        return this;
    }

    public AssertSubscriber<T> assertHasNotReceivedAnyItem() {
        AssertionHelper.shouldHaveReceivedNoItems(this.items);
        return this;
    }

    public AssertSubscriber<T> assertSubscribed() {
        AssertionHelper.shouldBeSubscribed(this.numberOfSubscription);
        return this;
    }

    public AssertSubscriber<T> assertNotSubscribed() {
        AssertionHelper.shouldNotBeSubscribed(this.numberOfSubscription);
        return this;
    }

    public AssertSubscriber<T> assertTerminated() {
        AssertionHelper.shouldBeTerminated(hasCompleted(), getFailure());
        return this;
    }

    public AssertSubscriber<T> assertNotTerminated() {
        AssertionHelper.shouldNotBeTerminated(hasCompleted(), getFailure());
        return this;
    }

    @SafeVarargs
    public final AssertSubscriber<T> assertItems(T... tArr) {
        AssertionHelper.shouldHaveReceivedExactly(this.items, tArr);
        return this;
    }

    public T getLastItem() {
        if (this.items.isEmpty()) {
            return null;
        }
        return this.items.get(this.items.size() - 1);
    }

    public AssertSubscriber<T> assertLastItem(T t) {
        AssertionHelper.shouldHaveReceived(getLastItem(), t);
        return this;
    }

    @Deprecated
    public AssertSubscriber<T> await() {
        return await(DEFAULT_TIMEOUT);
    }

    public AssertSubscriber<T> awaitNextItem() {
        return awaitNextItems(1);
    }

    public AssertSubscriber<T> awaitNextItem(Duration duration) {
        return awaitNextItems(1, 1, duration);
    }

    public AssertSubscriber<T> awaitNextItems(int i) {
        return awaitNextItems(i, DEFAULT_TIMEOUT);
    }

    public AssertSubscriber<T> awaitNextItems(int i, int i2) {
        return awaitNextItems(i, i2, DEFAULT_TIMEOUT);
    }

    public AssertSubscriber<T> awaitNextItems(int i, Duration duration) {
        return awaitNextItems(i, i, duration);
    }

    public AssertSubscriber<T> awaitNextItems(int i, int i2, Duration duration) {
        if (!hasCompleted() && getFailure() == null) {
            awaitNextItemEvents(i, i2, duration);
            return this;
        }
        if (hasCompleted()) {
            throw new AssertionError("Expecting a next items, but a completion event has already being received");
        }
        throw new AssertionError("Expecting a next items, but a failure event has already being received: " + getFailure());
    }

    public AssertSubscriber<T> awaitItems(int i) {
        return awaitItems(i, DEFAULT_TIMEOUT);
    }

    public AssertSubscriber<T> awaitItems(int i, Duration duration) {
        if (this.items.size() > i) {
            throw new AssertionError("Expected the number of items to be " + i + ", but it's already " + this.items.size());
        }
        if (!isCancelled() && !hasCompleted() && getFailure() == null) {
            awaitItemEvents(i, duration);
            return this;
        }
        if (this.items.size() != i) {
            throw new AssertionError("Expected the number of items to be " + i + ", but received " + this.items.size() + " and we received a terminal event already");
        }
        return this;
    }

    public AssertSubscriber<T> awaitCompletion() {
        return awaitCompletion(DEFAULT_TIMEOUT);
    }

    public AssertSubscriber<T> awaitCompletion(Duration duration) {
        Throwable th;
        try {
            awaitEvent(this.terminal, duration);
            if (!this.completed.get() && (th = this.failure.get()) != null) {
                throw new AssertionError("Expected a completion event but got a failure: " + th);
            }
            return this;
        } catch (TimeoutException e) {
            throw new AssertionError("No completion (or failure) event received in the last " + duration.toMillis() + " ms");
        }
    }

    public AssertSubscriber<T> awaitFailure() {
        return awaitFailure(th -> {
        });
    }

    public AssertSubscriber<T> awaitFailure(Consumer<Throwable> consumer) {
        return awaitFailure(consumer, DEFAULT_TIMEOUT);
    }

    public AssertSubscriber<T> awaitFailure(Duration duration) {
        return awaitFailure(th -> {
        }, duration);
    }

    public AssertSubscriber<T> awaitFailure(Consumer<Throwable> consumer, Duration duration) {
        try {
            awaitEvent(this.terminal, duration);
            if (this.completed.get()) {
                throw new AssertionError("Expected a failure event but got a completion event.");
            }
            try {
                consumer.accept(this.failure.get());
                return this;
            } catch (AssertionError e) {
                throw new AssertionError("Received a failure event, but that failure did not pass the validation: " + e, e);
            }
        } catch (TimeoutException e2) {
            throw new AssertionError("No completion (or failure) event received in the last " + duration.toMillis() + " ms");
        }
    }

    public AssertSubscriber<T> awaitSubscription() {
        return awaitSubscription(DEFAULT_TIMEOUT);
    }

    public AssertSubscriber<T> awaitSubscription(Duration duration) {
        try {
            awaitEvent(this.subscribed, duration);
            return this;
        } catch (TimeoutException e) {
            throw new AssertionError("Expecting a subscription event in the last " + duration.toMillis() + " ms, but did not get it");
        }
    }

    private void awaitEvent(CountDownLatch countDownLatch, Duration duration) throws TimeoutException {
        if (countDownLatch.getCount() == 0) {
            return;
        }
        try {
            if (countDownLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
            } else {
                throw new TimeoutException();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void awaitNextItemEvents(int i, int i2, Duration duration) {
        CompletableFuture<Void> future = new NextItemTask(i, this).future();
        if (i2 > 0) {
            request(i2);
        }
        int size = this.items.size();
        try {
            future.get(duration.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            int size2 = this.items.size() - size;
            if (isCancelled()) {
                throw new AssertionError("Expected " + i + " items, but received a cancellation event while waiting. Only " + size2 + " item(s) have been received.");
            }
            if (!hasCompleted()) {
                throw new AssertionError("Expected " + i + " items, but received a failure event while waiting: " + getFailure() + ". Only " + size2 + " item(s) have been received.");
            }
            throw new AssertionError("Expected " + i + " items, but received a completion event while waiting. Only " + size2 + " item(s) have been received.");
        } catch (TimeoutException e3) {
            throw new AssertionError("Expected " + i + " items in " + duration.toMillis() + " ms, but only received " + (this.items.size() - size) + " items.");
        }
    }

    private void awaitItemEvents(int i, Duration duration) {
        try {
            new ItemTask(i, duration.toMillis(), this).future().get(duration.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            if (i == this.items.size()) {
                return;
            }
            if (isCancelled()) {
                throw new AssertionError("Expected " + i + " items, but received a cancellation event while waiting. Only " + this.items.size() + " items have been received.");
            }
            if (hasCompleted()) {
                throw new AssertionError("Expected " + i + " items, but received a completion event while waiting. Only " + this.items.size() + " items have been received.");
            }
            if (getFailure() != null) {
                throw new AssertionError("Expected " + i + " items, but received a failure event while waiting: " + getFailure() + ". Only " + this.items.size() + " items have been received.");
            }
            e2.printStackTrace();
            throw new AssertionError("Expected " + i + " items.  Only " + this.items.size() + " items have been received.");
        } catch (TimeoutException e3) {
            if (this.items.size() < i) {
                throw new AssertionError("Expected " + i + " items.  Only " + this.items.size() + " items have been received.");
            }
        }
    }

    @Deprecated
    public AssertSubscriber<T> await(Duration duration) {
        if (this.terminal.getCount() == 0) {
            return this;
        }
        try {
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (this.terminal.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
            return this;
        }
        throw new AssertionError("Expected a terminal event before the timeout.");
    }

    public AssertSubscriber<T> cancel() {
        AssertionHelper.shouldBeSubscribed(this.numberOfSubscription);
        this.subscription.get().cancel();
        this.cancelled = true;
        Event event = new Event(null, null, false, true);
        this.eventListeners.forEach(eventListener -> {
            eventListener.accept(event);
        });
        return this;
    }

    public AssertSubscriber<T> request(long j) {
        this.requested.addAndGet(j);
        if (this.subscription.get() != null) {
            this.subscription.get().request(j);
        }
        return this;
    }

    public void onSubscribe(Subscription subscription) {
        this.numberOfSubscription++;
        this.subscription.set(subscription);
        this.subscribed.countDown();
        if (this.upfrontCancellation) {
            subscription.cancel();
            this.cancelled = true;
        } else if (this.requested.get() > 0) {
            subscription.request(this.requested.get());
        }
    }

    public synchronized void onNext(T t) {
        this.items.add(t);
        Event event = new Event(t, null, false, false);
        this.eventListeners.forEach(eventListener -> {
            eventListener.accept(event);
        });
    }

    public void onError(Throwable th) {
        this.failure.set(th);
        this.terminal.countDown();
        Event event = new Event(null, th, false, false);
        this.eventListeners.forEach(eventListener -> {
            eventListener.accept(event);
        });
    }

    public void onComplete() {
        this.completed.set(true);
        this.terminal.countDown();
        Event event = new Event(null, null, true, false);
        this.eventListeners.forEach(eventListener -> {
            eventListener.accept(event);
        });
    }

    public List<T> getItems() {
        return this.items;
    }

    public Throwable getFailure() {
        return this.failure.get();
    }

    public AssertSubscriber<T> run(Runnable runnable) {
        try {
            runnable.run();
            return this;
        } catch (AssertionError e) {
            throw e;
        } catch (Throwable th) {
            throw new AssertionError(th);
        }
    }

    public boolean isCancelled() {
        return this.cancelled;
    }

    public boolean hasCompleted() {
        return this.completed.get();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void registerListener(EventListener eventListener) {
        this.eventListeners.add(eventListener);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void unregisterListener(EventListener eventListener) {
        this.eventListeners.remove(eventListener);
    }
}
