/*
 * Decompiled with CFR 0.152.
 */
package com.ibm.neo.probekit.impl.sinks;

import com.ibm.json.java.JSONObject;
import com.ibm.neo.config.ConfigTree;
import com.ibm.neo.probekit.IEvent;
import com.ibm.neo.probekit.IProbe;
import com.ibm.neo.probekit.ISink;
import com.ibm.neo.probekit.ProbeKit;
import com.ibm.neo.probekit.ProbeSchema;
import com.ibm.neo.probekit.impl.sinks.JsonEventSerializer;
import com.ibm.neo.util.CyclicArray;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonTcpSink
implements ISink {
    public static final String CONFIG_HOST = "host";
    public static final String CONFIG_PORT = "port";
    public static final String CONFIG_CONNECT_TIMEOUT = "connect-timeout";
    public static final String CONFIG_CONNECT_RETRY_INTERVAL = "connect-retry-interval";
    public static final String CONFIG_FLUSH_INTERVAL = "flush-interval";
    public static final String CONFIG_MAX_FLUSH_RATE = "max-flush-rate";
    private static final int DEFAULT_CONNECT_TIMEOUT = 5000;
    private static final long DEFAULT_CONNECT_RETRY_INTERVAL = 5000L;
    private static final long DEFAULT_FLUSH_INTERVAL = 1000L;
    private static final int DEFAULT_MAX_FLUSH_RATE = -1;
    private static Logger LOGGER = LoggerFactory.getLogger(JsonTcpSink.class);
    private final ConcurrentHashMap<String, WriteQueue> mProbeName2Queue = new ConcurrentHashMap();
    private String mName = null;
    private Socket mSock = null;
    private InetSocketAddress mAddr = null;
    private int mConnectTimeout = 0;
    private long mConnectRetryInterval = 0L;
    private long mFlushInterval = 0L;
    private int mMaxFlushRate = 0;
    private ScheduledFuture<?> mFlushFuture = null;
    private int mFlushErrorCount = 0;
    private long mLastConnectAttemptNanos = 0L;
    private volatile long mLastFlushNanos = System.nanoTime();

    @Override
    public void initialize(ConfigTree.ConfigNode config, ScheduledExecutorService scheduler) throws Exception {
        this.mName = config.getName();
        this.mMaxFlushRate = config.getChildIntegerValue(CONFIG_MAX_FLUSH_RATE, Integer.valueOf(-1));
        String host = config.getChildStringValue(CONFIG_HOST, null);
        if (null == host || host.length() == 0) {
            throw new IllegalArgumentException("A host address was not specified");
        }
        int port = config.getChildIntegerValue(CONFIG_PORT, Integer.valueOf(-1));
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("A valid port number was not specified");
        }
        this.mAddr = new InetSocketAddress(host, port);
        this.mConnectTimeout = config.getChildIntegerValue(CONFIG_CONNECT_TIMEOUT, Integer.valueOf(5000));
        if (this.mConnectTimeout < 1) {
            LOGGER.error("[ProbeKit] Illegal value specified for configuration element '{}' - reverting to default ({})", (Object)CONFIG_CONNECT_TIMEOUT, (Object)5000);
            this.mConnectTimeout = 5000;
        }
        this.mConnectRetryInterval = config.getChildLongValue(CONFIG_CONNECT_RETRY_INTERVAL, Long.valueOf(5000L));
        if (this.mConnectRetryInterval < 1L) {
            LOGGER.error("[ProbeKit] Illegal value specified for configuration element '{}' - reverting to default ({})", (Object)CONFIG_CONNECT_RETRY_INTERVAL, (Object)5000L);
            this.mConnectRetryInterval = 5000L;
        }
        this.mFlushInterval = config.getChildLongValue(CONFIG_FLUSH_INTERVAL, Long.valueOf(1000L));
        if (this.mFlushInterval < 1L) {
            LOGGER.error("[ProbeKit] Illegal value specified for configuration element '{}' - reverting to default ({})", (Object)CONFIG_FLUSH_INTERVAL, (Object)1000L);
            this.mFlushInterval = 1000L;
        }
        this.mFlushFuture = scheduler.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                JsonTcpSink.this.flush();
            }
        }, this.mFlushInterval, this.mFlushInterval, TimeUnit.MILLISECONDS);
        LOGGER.info("[ProbeKit] Initialized {} [host: {}, port: {}, connect-timeout: {}, connect-retry-interval: {}, flush-interval: {}]", new Object[]{this.mName, host, port, this.mConnectTimeout, this.mConnectRetryInterval, this.mFlushInterval});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void shutdown() throws Exception {
        this.mFlushFuture.cancel(false);
        try {
            this.flush();
        }
        finally {
            this.ensureSocketClosed();
        }
    }

    @Override
    public void write(IEvent event) throws Exception {
        long millisSinceLastFlush = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.mLastFlushNanos);
        if (millisSinceLastFlush > 3L * this.mFlushInterval) {
            return;
        }
        IProbe probe = event.getProbe();
        ProbeSchema schema = probe.getSchema();
        WriteQueue queue = this.getOrCreateQueue(schema, probe.getConfig());
        if (null != queue) {
            queue.enqueue(event);
        }
    }

    private WriteQueue getOrCreateQueue(ProbeSchema schema, ConfigTree.ConfigNode config) {
        WriteQueue oldQueue;
        String probeName = schema.getName();
        WriteQueue queue = this.mProbeName2Queue.get(probeName);
        if (null == queue && null != (oldQueue = this.mProbeName2Queue.putIfAbsent(probeName, queue = new WriteQueue(this.getMaxFlushRate(config))))) {
            queue = oldQueue;
        }
        return queue;
    }

    protected void flush() {
        for (WriteQueue q : this.mProbeName2Queue.values()) {
            try {
                q.flush();
            }
            catch (Throwable t) {
                LOGGER.error("[ProbeKit] Unexpected error while flushing events", t);
            }
        }
        this.mLastFlushNanos = System.nanoTime();
    }

    private int getMaxFlushRate(ConfigTree.ConfigNode config) {
        return config.getChildIntegerValue(CONFIG_MAX_FLUSH_RATE, Integer.valueOf(this.mMaxFlushRate));
    }

    private boolean ensureSocketConnected() throws IOException {
        if (null == this.mSock || !this.mSock.isConnected() || this.mSock.isClosed() || this.mFlushErrorCount > 0) {
            if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.mLastConnectAttemptNanos) < this.mConnectRetryInterval) {
                return false;
            }
            IOUtils.closeQuietly((Socket)this.mSock);
            LOGGER.info("[ProbeKit] Attempting connection to {}", (Object)this.mAddr);
            this.mLastConnectAttemptNanos = System.nanoTime();
            this.mSock = new Socket();
            this.mSock.setPerformancePreferences(1, 1, 0);
            this.mSock.setKeepAlive(true);
            this.mSock.connect(this.mAddr, this.mConnectTimeout);
            LOGGER.info("[ProbeKit] Connection established to {}", (Object)this.mAddr);
        }
        return true;
    }

    private void ensureSocketClosed() throws IOException {
        if (null != this.mSock && !this.mSock.isClosed()) {
            LOGGER.info("[ProbeKit] Closing connection to {}", (Object)this.mAddr);
            this.mSock.close();
        }
    }

    public String toString() {
        return new ToStringBuilder((Object)this, ToStringStyle.SHORT_PREFIX_STYLE).append("name", (Object)this.mName).append("address", (Object)this.mAddr).toString();
    }

    private final class WriteQueue {
        private final ConcurrentLinkedQueue<IEvent> mQueue = new ConcurrentLinkedQueue();
        private final CyclicArray<Long> mFlushCounts = new CyclicArray(10);
        private final CyclicArray<Long> mFlushTimes = new CyclicArray(10);
        private final int mMaxFlushRate;
        private volatile long mLastFlushTime = System.nanoTime();
        private volatile double mLastFlushRate = 0.0;

        WriteQueue(int maxFlushRate) {
            this.mMaxFlushRate = maxFlushRate;
        }

        private double getFlushRate() {
            long totalCount = 0L;
            long totalNanos = 0L;
            for (int i = 0; i < this.mFlushCounts.capacity(); ++i) {
                Long count = (Long)this.mFlushCounts.get(i);
                Long nanos = (Long)this.mFlushTimes.get(i);
                if (null == count || null == nanos) continue;
                totalCount += count.longValue();
                totalNanos += nanos.longValue();
            }
            long totalSeconds = TimeUnit.NANOSECONDS.toSeconds(totalNanos);
            if (0L == totalSeconds) {
                return 0.0;
            }
            return (double)totalCount / (double)totalSeconds;
        }

        void enqueue(IEvent event) {
            if (this.mMaxFlushRate > 0 && this.mLastFlushRate > (double)this.mMaxFlushRate) {
                ProbeKit.getInstance().recordEventOverflow(event.getProbe());
            } else {
                this.mQueue.add(event);
            }
        }

        synchronized void flush() {
            IEvent event = this.mQueue.poll();
            long count = 0L;
            while (null != event) {
                block5: {
                    try {
                        if (JsonTcpSink.this.ensureSocketConnected()) {
                            this.serializeEvent(JsonTcpSink.this.mSock.getOutputStream(), event);
                            JsonTcpSink.this.mFlushErrorCount = 0;
                        }
                    }
                    catch (Exception ex) {
                        ++JsonTcpSink.this.mFlushErrorCount;
                        if (JsonTcpSink.this.mFlushErrorCount >= 10) break block5;
                        LOGGER.error("[ProbeKit] Unexpected error while flushing events", (Throwable)ex);
                    }
                }
                event = this.mQueue.poll();
                ++count;
            }
            if (this.mMaxFlushRate > 0) {
                long nanoTime = System.nanoTime();
                this.mFlushTimes.add((Object)(nanoTime - this.mLastFlushTime));
                this.mFlushCounts.add((Object)count);
                this.mLastFlushTime = nanoTime;
                this.mLastFlushRate = this.getFlushRate();
            }
        }

        private void serializeEvent(OutputStream out, IEvent event) throws IOException {
            JSONObject eventObj = JsonEventSerializer.serialize(event);
            OutputStreamWriter writer = new OutputStreamWriter(out, "UTF-8");
            eventObj.serialize((Writer)writer, false);
            writer.append('\n');
            writer.flush();
        }
    }
}

