/*
 * Decompiled with CFR 0.152.
 */
package com.ibm.smarts.core.mq;

import com.ibm.smarts.core.mq.AbstractMessageQueueingSystem;
import com.ibm.smarts.core.mq.ExchangeId;
import com.ibm.smarts.core.mq.MessageConsumer;
import com.ibm.smarts.core.mq.MessageProducer;
import com.ibm.smarts.core.mq.MessageProducerProvider;
import com.ibm.smarts.core.mq.QueueId;
import com.ibm.smarts.core.mq.TypedMessageConsumer;
import com.ibm.smarts.core.mq.impl.AbstractMessageProducer;
import com.ibm.smarts.core.mq.impl.AbstractRawMessageConsumer;
import com.ibm.smarts.core.util.MessageQueueingMetrics;
import com.ibm.smarts.core.util.RequestContext;
import com.ibm.smarts.core.util.RequestContextDecorator;
import com.ibm.smarts.core.util.TypedMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-process message queueing system.
 *
 * <p>Published JSON strings are placed on a bounded queue owned by each consumer
 * registered for the target exchange. A single daemon thread
 * ("SimpleMessageQueueProcessor") drains those queues and hands every item to a
 * fixed-size thread pool, which invokes the consumer's {@code handleJson}.
 *
 * <p>Call {@link #shutdown()} to stop the processor thread and the thread pool.
 */
public class SimpleMessageQueueing extends AbstractMessageQueueingSystem {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleMessageQueueing.class);

    /** How long the processor thread waits for a permit before re-checking the stop flag. */
    private static final long POLL_TIMEOUT_MILLIS = 200L;

    /** Producers release one permit per enqueued item; the processor thread waits on it. */
    private final Semaphore itemsAvailableSema = new Semaphore(0);
    /** Metrics object passed (per exchange) to the producers and consumers created here. */
    public final MessageQueueingMetrics metrics = new MessageQueueingMetrics();
    /** Registered consumers keyed by exchange; every access synchronizes on the map itself. */
    private final TypedMap<ExchangeId, List<WrappedConsumer>> queueConsumers_ = new TypedMap<>();
    /** Size of the handler pool; also used as the capacity of each consumer's queue. */
    private final int threadPoolSize;
    /** Pool on which consumer handlers run. */
    private final ExecutorService executorService;
    /** Set by {@link #shutdown()} to ask the processor thread to exit. */
    private final AtomicBoolean threadTimeToStop = new AtomicBoolean(false);
    /** Counted down by the processor thread as soon as it starts running. */
    private final CountDownLatch threadStartedLatch = new CountDownLatch(1);
    /** Counted down by the processor thread when it exits, for whatever reason. */
    private final CountDownLatch threadStoppedLatch = new CountDownLatch(1);
    /** Optional limiter around handler execution; currently built without a semaphore (no-op). */
    private final WrappedSemaphore handlerSema;

    /**
     * @return the default handler pool size: twice the number of available processors
     */
    private static int getMaximumNumberOfThreads() {
        return Runtime.getRuntime().availableProcessors() * 2;
    }

    /** Creates a queueing system with a new {@link RequestContextDecorator} and the default pool size. */
    public SimpleMessageQueueing() {
        this(new RequestContextDecorator());
    }

    /**
     * Creates a queueing system with the default pool size.
     *
     * @param requestContextDecorator decorator passed to the superclass
     */
    public SimpleMessageQueueing(RequestContextDecorator requestContextDecorator) {
        this(SimpleMessageQueueing.getMaximumNumberOfThreads(), requestContextDecorator);
    }

    /**
     * Creates the handler pool, starts the daemon processor thread and waits until
     * that thread has begun running.
     *
     * @param totalThreads            size of the handler pool and capacity of each consumer queue
     * @param requestContextDecorator decorator passed to the superclass
     */
    public SimpleMessageQueueing(int totalThreads, RequestContextDecorator requestContextDecorator) {
        super(requestContextDecorator);
        this.threadPoolSize = totalThreads;
        this.executorService = Executors.newFixedThreadPool(totalThreads);
        this.handlerSema = new WrappedSemaphore();
        Thread processor = new Thread(this.processRunnable());
        processor.setName("SimpleMessageQueueProcessor");
        processor.setDaemon(true);
        processor.start();
        try {
            this.threadStartedLatch.await();
        } catch (InterruptedException e) {
            LOGGER.error("Interrupted while waiting for the message queue processor to start", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return {@code "[SimpleMessageQueueing @<hex hashCode>]"}
     */
    @Override
    public String toString() {
        return "[SimpleMessageQueueing @" + Integer.toHexString(this.hashCode()) + "]";
    }

    /**
     * @return a snapshot of the exchange ids currently present in the consumer map
     */
    private List<ExchangeId> getExchangeIds() {
        synchronized (this.queueConsumers_) {
            return new ArrayList<>(this.queueConsumers_.keySet());
        }
    }

    /**
     * Returns the live consumer list for an exchange, creating and registering an
     * empty list when none exists yet.
     *
     * @param exchangeId exchange to look up
     * @return the list stored in the map (not a copy)
     */
    private List<WrappedConsumer> getQueueConsumersActual(ExchangeId exchangeId) {
        synchronized (this.queueConsumers_) {
            List<WrappedConsumer> consumers = this.queueConsumers_.get(exchangeId);
            if (consumers == null) {
                consumers = new ArrayList<>();
                this.queueConsumers_.put(exchangeId, consumers);
            }
            return consumers;
        }
    }

    /**
     * @param exchangeId exchange to look up
     * @return a snapshot of the consumers of the exchange, safe to iterate without the map lock
     */
    private List<WrappedConsumer> getQueueConsumersCopy(ExchangeId exchangeId) {
        synchronized (this.queueConsumers_) {
            return new ArrayList<>(this.getQueueConsumersActual(exchangeId));
        }
    }

    /**
     * Appends a consumer to the list of the given exchange.
     *
     * @param exchangeId exchange the consumer listens on
     * @param wc         consumer together with its queue
     */
    private void addQueueConsumer(ExchangeId exchangeId, WrappedConsumer wc) {
        synchronized (this.queueConsumers_) {
            this.getQueueConsumersActual(exchangeId).add(wc);
        }
    }

    /**
     * Registers an additional consumer for the exchange of {@code queueId}.
     * Earlier registrations for the same exchange are kept.
     */
    @Override
    public void setConsumer(QueueId queueId, MessageConsumer consumer) {
        this.addQueueConsumer(queueId.exchangeId, new WrappedConsumer(new SimpleRawMessageConsumer(queueId, consumer), this.threadPoolSize));
    }

    /**
     * Registers an additional typed consumer for the exchange of {@code queueId}.
     * Earlier registrations for the same exchange are kept.
     */
    @Override
    public <T> void setConsumer(QueueId queueId, TypedMessageConsumer<T> typedConsumer) {
        this.addQueueConsumer(queueId.exchangeId, new WrappedConsumer(new SimpleRawMessageConsumer(queueId, typedConsumer), this.threadPoolSize));
    }

    /**
     * @param rc request context used by the provider's no-argument {@code producer()}
     * @return a provider that creates a new producer on every call
     */
    @Override
    public MessageProducerProvider getMessageProducerProvider(final RequestContext rc) {
        return new MessageProducerProvider() {

            @Override
            public MessageProducer producer() {
                return new SimpleMessageProducer(rc);
            }

            @Override
            public MessageProducer producer(RequestContext context) {
                return new SimpleMessageProducer(context);
            }
        };
    }

    /**
     * @return the body of the processor thread
     */
    private Runnable processRunnable() {
        return this::runProcessorLoop;
    }

    /**
     * Processor thread main loop: waits for an "items available" permit (at most
     * {@link #POLL_TIMEOUT_MILLIS} at a time), drains all queues when one arrives
     * and exits once the stop flag is set.
     *
     * <p>An interrupt is treated as a request to stop. Retrying with the interrupt
     * flag set would make every subsequent {@code tryAcquire} throw immediately,
     * so the loop would spin without ever reaching the stop-flag check.
     */
    private void runProcessorLoop() {
        try {
            this.threadStartedLatch.countDown();
            do {
                if (this.itemsAvailableSema.tryAcquire(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    this.processAll();
                }
            } while (!this.threadTimeToStop.get());
        } catch (InterruptedException e) {
            LOGGER.error("Message queue processor interrupted; stopping", e);
            Thread.currentThread().interrupt();
        } finally {
            this.threadStoppedLatch.countDown();
        }
    }

    /** Moves every queued item of every registered consumer onto the handler pool. */
    private void processAll() {
        for (ExchangeId exchangeId : this.getExchangeIds()) {
            for (WrappedConsumer wc : this.getQueueConsumersCopy(exchangeId)) {
                QueueItem next;
                while ((next = wc.queue.poll()) != null) {
                    // Lambdas may only capture effectively final locals, so pin the item per iteration.
                    final QueueItem item = next;
                    this.executorService.execute(() -> this.handle(wc, item));
                }
            }
        }
    }

    /**
     * Runs one consumer handler on the calling (pool) thread, bracketed by the
     * handler semaphore. The permit is released only if it was actually acquired.
     */
    private void handle(WrappedConsumer wc, QueueItem item) {
        boolean permitHeld = this.handlerSema.acquire();
        try {
            wc.consumer.handleJson(item.json);
        } finally {
            if (permitHeld) {
                this.handlerSema.release();
            }
        }
    }

    /**
     * Asks the processor thread to stop, waits for it to exit and then shuts the
     * handler pool down. Tasks already submitted to the pool are not cancelled.
     */
    @Override
    public void shutdown() {
        this.threadTimeToStop.set(true);
        // Wake the processor right away instead of letting it sit out the poll timeout.
        this.itemsAvailableSema.release();
        try {
            this.threadStoppedLatch.await();
        } catch (InterruptedException e) {
            LOGGER.error("Interrupted while waiting for the message queue processor to stop", e);
            Thread.currentThread().interrupt();
        }
        this.executorService.shutdown();
    }

    /** Producer that enqueues messages directly on the queues of the registered consumers. */
    private class SimpleMessageProducer
    extends AbstractMessageProducer {
        public SimpleMessageProducer(RequestContext rc) {
            super(rc);
        }

        /**
         * Puts the message on the queue of every consumer registered for the
         * exchange of {@code queueId} and signals the processor thread once per
         * queued item. Blocks while a consumer queue is full.
         *
         * <p>If the calling thread is interrupted, delivery stops at that consumer
         * and the interrupt flag is restored; remaining consumers are skipped
         * because {@code put} would fail for each of them as well.
         */
        @Override
        public void publishMessage(QueueId queueId, String json) {
            MessageQueueingMetrics.ExchangeMetrics exMetrics = SimpleMessageQueueing.this.metrics.forExchange(queueId);
            QueueItem item = new QueueItem(queueId, json);
            List<WrappedConsumer> consumers = SimpleMessageQueueing.this.getQueueConsumersCopy(queueId.exchangeId);
            for (WrappedConsumer wc : consumers) {
                try {
                    exMetrics.metrics.metrics.queued.durationStart();
                    wc.queue.put(item);
                    SimpleMessageQueueing.this.itemsAvailableSema.release();
                } catch (InterruptedException e) {
                    LOGGER.error("Interrupted while queueing a message", e);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        @Override
        public MessageProducer producer(RequestContext ctx) {
            return new SimpleMessageProducer(ctx);
        }
    }

    /** Raw consumer bound to this queueing system's request context decorator and metrics. */
    private class SimpleRawMessageConsumer
    extends AbstractRawMessageConsumer {
        public SimpleRawMessageConsumer(QueueId queueId, MessageConsumer consumer) {
            super(SimpleMessageQueueing.this.requestContextDecorator, queueId, consumer, SimpleMessageQueueing.this.metrics.forExchange(queueId));
        }

        public <T> SimpleRawMessageConsumer(QueueId queueId, TypedMessageConsumer<T> typedConsumer) {
            super(SimpleMessageQueueing.this.requestContextDecorator, queueId, typedConsumer, SimpleMessageQueueing.this.metrics.forExchange(queueId));
        }

        @Override
        protected MessageProducerProvider getProducerProviderFor(RequestContext rc) {
            return new SimpleMessageProducer(rc);
        }
    }

    /** Semaphore holder whose operations are no-ops when no semaphore is configured. */
    private static class WrappedSemaphore {
        private final Semaphore sema;

        /** Creates a wrapper without a semaphore; acquire and release do nothing. */
        public WrappedSemaphore() {
            this(null);
        }

        public WrappedSemaphore(Semaphore sema) {
            this.sema = sema;
        }

        /**
         * Acquires a permit when a semaphore is configured.
         *
         * @return {@code true} only if a permit was taken and must later be released;
         *         {@code false} when there is no semaphore or the wait was interrupted
         *         (the interrupt flag is restored in that case)
         */
        public boolean acquire() {
            if (this.sema == null) {
                return false;
            }
            try {
                this.sema.acquire();
                return true;
            } catch (InterruptedException e) {
                LOGGER.error("Interrupted while waiting for a handler permit", e);
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /** Releases a permit when a semaphore is configured. */
        public void release() {
            if (this.sema != null) {
                this.sema.release();
            }
        }
    }

    /** A consumer paired with the bounded queue of items waiting for it. */
    private static class WrappedConsumer {
        public final AbstractRawMessageConsumer consumer;
        public final ArrayBlockingQueue<QueueItem> queue;

        /**
         * @param consumer   consumer that handles the queued items
         * @param maxThreads capacity of the queue
         */
        public WrappedConsumer(AbstractRawMessageConsumer consumer, int maxThreads) {
            this.consumer = consumer;
            this.queue = new ArrayBlockingQueue<>(maxThreads);
        }
    }

    /** A published message: the queue id it was published with and its JSON text. */
    private static class QueueItem {
        public final QueueId queueId;
        public final String json;

        public QueueItem(QueueId queueId, String json) {
            this.queueId = queueId;
            this.json = json;
        }
    }
}

