/*
 * Decompiled with CFR 0.152.
 */
package com.ibm.neo.probekit.impl.sinks;

import com.ibm.neo.config.ConfigTree;
import com.ibm.neo.io.CSVWriter;
import com.ibm.neo.probekit.IEvent;
import com.ibm.neo.probekit.ISink;
import com.ibm.neo.probekit.ProbeSchema;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.commons.lang.time.DateFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ISink} implementation that writes probe events to CSV files.
 *
 * <p>One output file is created per probe schema name, the first time an event
 * for that schema is written. {@link #write(IEvent)} only converts the event to
 * a string record and enqueues it; the actual file I/O happens in
 * {@link #flush()}, which is run periodically on the scheduler handed to
 * {@link #initialize(ConfigTree.ConfigNode, ScheduledExecutorService)} and once
 * more from {@link #shutdown()}.
 *
 * <p>Recognized configuration children: {@value #CONFIG_OUTPUT_DIR} (directory
 * for the CSV files, default {@code "./"}) and {@value #CONFIG_FLUSH_INTERVAL}
 * (flush period in milliseconds, default 1000, must be at least 1).
 */
public class CsvFileSink implements ISink {
    public static final String CONFIG_OUTPUT_DIR = "output-dir";
    public static final String CONFIG_FLUSH_INTERVAL = "flush-interval";
    private static final String DEFAULT_OUTPUT_DIR = "./";
    private static final long DEFAULT_FLUSH_INTERVAL = 1000L;
    /** Consecutive flush failures of one queue are logged only while their count stays below this limit. */
    private static final int FLUSH_ERROR_LOG_LIMIT = 10;
    private static final Logger LOGGER = LoggerFactory.getLogger(CsvFileSink.class);

    /** Per-probe write queues, keyed by schema name. Creation is serialized on the map itself. */
    private final ConcurrentHashMap<String, WriteQueue> mProbeName2Queue = new ConcurrentHashMap<String, WriteQueue>();
    private File mOutputDir = null;
    private String mName = null;
    private ScheduledFuture<?> mFlushFuture = null;

    /**
     * Reads the configuration, creates the output directory if it does not
     * exist and schedules the periodic flush task.
     *
     * @param config    configuration node of this sink; its name becomes the sink name
     * @param scheduler executor on which the periodic flush is scheduled
     * @throws IllegalArgumentException if the configured flush interval is smaller than 1
     * @throws Exception                if the output directory cannot be created
     */
    @Override
    public void initialize(ConfigTree.ConfigNode config, ScheduledExecutorService scheduler) throws Exception {
        this.mName = config.getName();
        this.mOutputDir = new File(config.getChildStringValue(CONFIG_OUTPUT_DIR, DEFAULT_OUTPUT_DIR));
        if (!this.mOutputDir.exists()) {
            FileUtils.forceMkdir(this.mOutputDir);
        }
        final long flushInterval = config.getChildLongValue(CONFIG_FLUSH_INTERVAL, Long.valueOf(DEFAULT_FLUSH_INTERVAL)).longValue();
        if (flushInterval < 1L) {
            throw new IllegalArgumentException("Illegal flush interval: " + flushInterval);
        }
        this.mFlushFuture = scheduler.scheduleWithFixedDelay(new Runnable() {

            @Override
            public void run() {
                CsvFileSink.this.flush();
            }
        }, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
        LOGGER.info("[ProbeKit] Initialized {} [output-dir: {}, flush-interval: {}]", new Object[]{this.mName, this.mOutputDir.getAbsolutePath(), flushInterval});
    }

    /**
     * Cancels the periodic flush (if one was scheduled), then drains and closes
     * every queue so that the underlying writers are released.
     *
     * <p>A failure to close one queue is logged and does not prevent the
     * remaining queues from being closed.
     */
    @Override
    public void shutdown() throws Exception {
        ScheduledFuture<?> future = this.mFlushFuture;
        if (null != future) {
            // cancel(false) neither interrupts nor waits for a flush that is
            // already running; WriteQueue serializes flush/close for that case.
            future.cancel(false);
            this.mFlushFuture = null;
        }
        // The key-set view of a ConcurrentHashMap tolerates removal while iterating.
        for (String probeName : this.mProbeName2Queue.keySet()) {
            WriteQueue queue = this.mProbeName2Queue.remove(probeName);
            if (null == queue) {
                continue;
            }
            try {
                queue.close();
            }
            catch (Exception ex) {
                LOGGER.error("[ProbeKit] Unexpected error while closing output for probe " + probeName, ex);
            }
        }
    }

    /**
     * Converts the event into a CSV record and enqueues it for the next flush.
     *
     * @param event event to record; its probe's schema selects the output file
     * @throws Exception if the output file for a new schema cannot be created
     */
    @Override
    public void write(IEvent event) throws Exception {
        ProbeSchema schema = event.getProbe().getSchema();
        this.getOrCreateQueue(schema).enqueue(event);
    }

    @Override
    public String toString() {
        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                .append("name", this.mName)
                .append("outputDir", this.mOutputDir)
                .toString();
    }

    /**
     * Writes all pending records of every queue to its file.
     *
     * <p>Never throws: this method is the body of the scheduled task, and a
     * task that throws is not run again by the scheduler, so anything that
     * escapes a queue is logged here instead.
     */
    protected void flush() {
        for (WriteQueue queue : this.mProbeName2Queue.values()) {
            try {
                queue.flush();
            }
            catch (Throwable t) {
                LOGGER.error("[ProbeKit] Unexpected error while flushing events", t);
            }
        }
    }

    /**
     * Returns the queue for the given schema, creating it (and its output
     * file) on first use.
     *
     * <p>Creation is done under a lock with a second look-up, so that two
     * threads racing on a new schema cannot both create an output file.
     */
    private WriteQueue getOrCreateQueue(ProbeSchema schema) throws IOException {
        String probeName = schema.getName();
        WriteQueue queue = this.mProbeName2Queue.get(probeName);
        if (null != queue) {
            return queue;
        }
        synchronized (this.mProbeName2Queue) {
            queue = this.mProbeName2Queue.get(probeName);
            if (null == queue) {
                queue = new WriteQueue(this.mOutputDir, schema);
                this.mProbeName2Queue.put(probeName, queue);
            }
            return queue;
        }
    }

    /** Builds the output file {@code <baseName>_<timestamp>.csv} in {@code outputDir}, stamped with the current time. */
    private static File createOutputFile(File outputDir, String baseName) {
        long now = System.currentTimeMillis();
        String timestamp = DateFormatUtils.format(now, "yyyy-MM-dd'T'HH.mm.ss.SSS", Locale.US);
        return new File(outputDir, baseName + "_" + timestamp + ".csv");
    }

    /**
     * Converts a field value to its CSV cell text: {@code null} stays
     * {@code null}, strings are used as is, dates are formatted with
     * {@code DateFormatUtils.ISO_DATETIME_FORMAT}, anything else via
     * {@code toString()}.
     */
    private static String asString(Object value) {
        if (null == value) {
            return null;
        }
        if (value instanceof String) {
            return (String) value;
        }
        if (value instanceof Date) {
            return DateFormatUtils.ISO_DATETIME_FORMAT.format((Date) value);
        }
        return value.toString();
    }

    /**
     * Pending records plus the CSV writer for one probe schema.
     *
     * <p>{@link #enqueue(IEvent)} may be called from any thread without
     * blocking; {@link #flush()} and {@link #close()} are mutually exclusive so
     * that the writer is only ever used by one thread at a time.
     */
    private static final class WriteQueue {
        private final ConcurrentLinkedQueue<String[]> mQueue = new ConcurrentLinkedQueue<String[]>();
        private final CSVWriter mWriter;
        /** Consecutive failed flushes, capped at the log limit; guarded by this queue's monitor. */
        private int mFlushErrorCount = 0;
        /** Set once {@link #close()} has run; guarded by this queue's monitor. */
        private boolean mClosed = false;

        /**
         * Opens the output file for the schema and writes the header record
         * made of the schema's field names.
         *
         * @throws IOException if the file cannot be opened or the header cannot be written
         */
        WriteQueue(File outputDir, ProbeSchema schema) throws IOException {
            List<ProbeSchema.Field<?>> fields = schema.getFields();
            String[] header = new String[fields.size()];
            int i = 0;
            for (ProbeSchema.Field<?> field : fields) {
                header[i++] = field.getName();
            }
            CSVWriter writer = new CSVWriter(CsvFileSink.createOutputFile(outputDir, schema.getName()), ',', "\n", Character.valueOf('"'), "");
            boolean headerWritten = false;
            try {
                writer.writeRecord(header);
                headerWritten = true;
            }
            finally {
                if (!headerWritten) {
                    // Do not leak the writer when construction fails.
                    try {
                        writer.close();
                    }
                    catch (Exception ignored) {
                        // The header failure is the error worth reporting; it keeps propagating.
                    }
                }
            }
            this.mWriter = writer;
        }

        /** Converts the event's fields, in schema order, to strings and queues them as one record. */
        void enqueue(IEvent event) {
            List<ProbeSchema.Field<?>> fields = event.getProbe().getSchema().getFields();
            String[] record = new String[fields.size()];
            for (int i = 0; i < record.length; ++i) {
                record[i] = CsvFileSink.asString(event.getField(fields.get(i)));
            }
            this.mQueue.add(record);
        }

        /** Writes all queued records and flushes the writer; does nothing once the queue is closed. */
        synchronized void flush() {
            if (this.mClosed) {
                return;
            }
            this.drain();
        }

        /**
         * Writes any remaining records and closes the writer. Calling it again
         * has no effect.
         *
         * @throws IOException if closing the writer fails
         */
        synchronized void close() throws IOException {
            if (this.mClosed) {
                return;
            }
            this.mClosed = true;
            try {
                this.drain();
            }
            finally {
                this.mWriter.close();
            }
        }

        /**
         * Best-effort write of everything currently queued. Failures are
         * logged, but only for the first few consecutive failures so that a
         * permanently broken file does not flood the log. The record being
         * written when a failure occurs is not re-queued.
         *
         * <p>Caller must hold this queue's monitor.
         */
        private void drain() {
            try {
                String[] record;
                while (null != (record = this.mQueue.poll())) {
                    this.mWriter.writeRecord(record);
                }
                this.mWriter.flush();
                this.mFlushErrorCount = 0;
            }
            catch (Exception ex) {
                if (this.mFlushErrorCount < FLUSH_ERROR_LOG_LIMIT) {
                    ++this.mFlushErrorCount;
                    if (this.mFlushErrorCount < FLUSH_ERROR_LOG_LIMIT) {
                        LOGGER.error("[ProbeKit] Unexpected error while flushing events", ex);
                    }
                }
            }
        }
    }
}

