package com.github.philippheuer.events4j.reactor;

import com.github.philippheuer.events4j.api.domain.IDisposable;
import com.github.philippheuer.events4j.api.service.IEventHandler;
import com.github.philippheuer.events4j.reactor.util.Events4JSubscriber;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.scheduler.forkjoin.ForkJoinPoolScheduler;

/* loaded from: input_file:com/github/philippheuer/events4j/reactor/ReactorEventHandler.class */
public class ReactorEventHandler implements IEventHandler {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ReactorEventHandler.class);
    private final Scheduler scheduler;
    private final FluxProcessor<Object, Object> processor;
    private final FluxSink<Object> eventSink;

    public ReactorEventHandler() {
        this.scheduler = ForkJoinPoolScheduler.create("events4j-events", Math.max(Runtime.getRuntime().availableProcessors(), 4));
        this.processor = EmitterProcessor.create(8192, true);
        this.eventSink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
    }

    public ReactorEventHandler(Scheduler scheduler, FluxProcessor<Object, Object> fluxProcessor, FluxSink.OverflowStrategy overflowStrategy) {
        this.scheduler = scheduler;
        this.processor = fluxProcessor;
        this.eventSink = fluxProcessor.sink(overflowStrategy);
    }

    @Override // com.github.philippheuer.events4j.api.service.IEventHandler
    public void publish(Object obj) {
        this.eventSink.next(obj);
    }

    @Override // com.github.philippheuer.events4j.api.service.IEventHandler
    public <E> IDisposable onEvent(Class<E> cls, Consumer<E> consumer) {
        Publisher ofType = this.processor.publishOn(this.scheduler).ofType(cls);
        Events4JSubscriber events4JSubscriber = new Events4JSubscriber(consumer);
        ofType.subscribe(events4JSubscriber);
        return events4JSubscriber;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.eventSink.complete();
        try {
            Thread.sleep(1000L);
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (this.scheduler.isDisposed()) {
            return;
        }
        this.scheduler.dispose();
    }

    public Scheduler getScheduler() {
        return this.scheduler;
    }

    public FluxProcessor<Object, Object> getProcessor() {
        return this.processor;
    }

    public FluxSink<Object> getEventSink() {
        return this.eventSink;
    }
}
