/*
 * Decompiled with CFR 0.152.
 */
package com.cognos.xqe.runtree.relational;

import com.cognos.xqe.ast.IXQEQueryNode;
import com.cognos.xqe.ast.XQEPersistContext;
import com.cognos.xqe.ast.XQERestoreContext;
import com.cognos.xqe.config.ServiceEnumeration;
import com.cognos.xqe.data.values.DataValueFactory;
import com.cognos.xqe.data.values.IRow;
import com.cognos.xqe.data.values.IValue;
import com.cognos.xqe.data.values.RowValue;
import com.cognos.xqe.exception.XQERuntimeException;
import com.cognos.xqe.query.engine.PlanningEnvironment;
import com.cognos.xqe.resultset.interfaces.IExecutable;
import com.cognos.xqe.resultset.interfaces.IHybridResultSet;
import com.cognos.xqe.resultset.interfaces.IRowsetInfo;
import com.cognos.xqe.resultset.interfaces.ITabularIterator;
import com.cognos.xqe.resultset.interfaces.ITabularResultSet;
import com.cognos.xqe.resultsets.ExchangeIterator;
import com.cognos.xqe.resultsets.tabular.TabularHybridResultSet;
import com.cognos.xqe.runtree.XDataContext;
import com.cognos.xqe.runtree.XNode;
import com.cognos.xqe.runtree.XNodeFactory;
import com.cognos.xqe.runtree.XResultSetBase;
import com.cognos.xqe.runtree.XTabularResultSet;
import com.cognos.xqe.trace.LogLevel;
import com.cognos.xqe.trace.XQEDebugLog;
import com.cognos.xqe.trace.XQELog;
import com.cognos.xqe.trace.XQELogger;
import com.cognos.xqe.trace.XQETrace;
import com.cognos.xqe.util.concurrent.ThreadPool;
import com.cognos.xqe.util.context.ExecutionEnvironmentContext;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.dom4j.Attribute;
import org.dom4j.Element;

public class XDistribute
extends XTabularResultSet {
    private static final long serialVersionUID = 1L;
    private static final String ATTRIBUTE_PACKETSIZE = "packetSize";
    private static final String ATTRIBUTE_DOP = "dop";
    private static final String ATTRIBUTE_TYPE = "type";
    protected static final String ATTRIBUTE_COLUMNNO = "columnNo";
    protected static final String ELEMENT_PARTITIONBY = "PartitionBy";
    private static final int DEFAULT_PACKET_SIZE = 100;
    private int packetSize = 100;
    private int dop = Runtime.getRuntime().availableProcessors();
    private Producer producer;
    private PartitioningType partitioningType;
    private int[] partitionByList;
    private static XQELogger mErrorLogger = XQELog.getLogger(ServiceEnumeration.XQE, "XQE", "Exception", LogLevel.ERROR);

    /**
     * Starts the shared producer on first use and hands back a result set that reads from it.
     *
     * <p>The method is synchronized so that only one producer is created for this node. The
     * producer constructor executes child 0 with the supplied context; the producer is then
     * submitted to the {@link ThreadPool}. Every call, including the first, returns a new
     * {@link TabularHybridResultSet} wrapping a fresh {@code XDistributeResultSet}.
     *
     * <p>A {@code null} partitioning type is treated as {@link PartitioningType#BROADCAST},
     * the same default that {@code capture} applies when the attribute is absent.
     *
     * @param context the data context passed to the child and to the returned result set
     * @return a new hybrid result set bound to this node's id
     * @throws XQERuntimeException if creating the producer or the result set fails
     * @throws IllegalStateException if hash or range partitioning is requested without a
     *         partition-by column list, or the partitioning type is not supported
     */
    @Override
    protected synchronized IValue executeImpl(XDataContext context) throws XQERuntimeException {
        if (this.producer == null) {
            IXQEQueryNode child = this.getChild(0);
            PartitioningType type = this.partitioningType != null ? this.partitioningType : PartitioningType.BROADCAST;
            boolean needsKeyColumns = type == PartitioningType.HASH || type == PartitioningType.RANGE;
            if (needsKeyColumns && this.partitionByList == null) {
                // Fail here rather than with a NullPointerException on the pool thread,
                // where nothing in this class would report it to the caller.
                throw new IllegalStateException(type.getDescriptor() + " partitioning requires a partition-by column list");
            }
            Producer newProducer;
            switch (type) {
                case BROADCAST: {
                    newProducer = new BroadcastProducer(context, child);
                    break;
                }
                case RANGE: {
                    newProducer = new RangeProducer(context, child);
                    break;
                }
                case HASH: {
                    newProducer = new HashProducer(context, child);
                    break;
                }
                case ROUND_ROBIN: {
                    newProducer = new RoundRobinProducer(context, child);
                    break;
                }
                default: {
                    throw new IllegalStateException("Unsupported partitioning type: " + type);
                }
            }
            ThreadPool.getInstance().submit(newProducer);
            // Publish only after a successful submit so a failed submit does not leave
            // behind a producer that never runs.
            this.producer = newProducer;
        }
        return new TabularHybridResultSet(context, new XDistributeResultSet(context), this.getId());
    }

    /**
     * Reports the type code of this node.
     *
     * @return the constant {@code 501046}
     */
    @Override
    public int getType() {
        final int typeCode = 501046;
        return typeCode;
    }

    /**
     * Sets the degree of parallelism.
     *
     * <p>The value is read when a producer is constructed, where it sizes the queue array
     * and the initial reference count.
     *
     * @param theDop the new degree of parallelism; stored without validation
     */
    public void setDop(int theDop) {
        dop = theDop;
    }

    /**
     * Sets the column numbers that make up the partition key.
     *
     * <p>The array reference is stored as given (no copy is made). The hash and range
     * producers read it to build the key for each row.
     *
     * @param thePartitionByList column numbers used as the partition key
     */
    public void setPartitionByList(int[] thePartitionByList) {
        partitionByList = thePartitionByList;
    }

    /**
     * Sets the strategy used to spread rows over the consumer queues.
     *
     * <p>The value is consulted by {@code executeImpl} when it picks the producer to create.
     *
     * @param thePartitioningType the partitioning strategy to use
     */
    public void setPartitioningType(PartitioningType thePartitioningType) {
        partitioningType = thePartitioningType;
    }

    /**
     * Writes this node's degree of parallelism and packet size to the trace as the
     * {@code dop} and {@code packetSize} attributes, in that order.
     *
     * @param trace the trace to write the attributes to
     * @param includeRuntimeSpecifics not consulted by this implementation
     */
    @Override
    public void dumpExtraInfo(XQETrace trace, boolean includeRuntimeSpecifics) {
        final int degreeOfParallelism = this.dop;
        final int rowsPerPacket = this.packetSize;
        trace.attribute(ATTRIBUTE_DOP, degreeOfParallelism);
        trace.attribute(ATTRIBUTE_PACKETSIZE, rowsPerPacket);
    }

    /**
     * Configures this node from its plan element and captures its single child node.
     *
     * <p>Reads the optional {@code packetSize}, {@code dop} and {@code type} attributes
     * (a missing {@code type} means broadcast). If the first child element is
     * {@code PartitionBy}, the {@code columnNo} attribute of each of its children becomes
     * the partition-by column list and the following element is taken as the child query
     * node; otherwise the first element itself is the child query node. The child node is
     * created through the environment's node factory, captured recursively and added to
     * this node.
     *
     * @param env planning environment supplying the node factory
     * @param inputNode the plan element describing this node
     * @throws IllegalArgumentException if the {@code type} attribute is not one of
     *         {@code broadcast}, {@code range}, {@code hash} or {@code round-robin}, if a
     *         numeric attribute cannot be parsed, or if the expected child element is missing
     */
    @Override
    public void capture(PlanningEnvironment env, Element inputNode) {
        XNodeFactory nodeFactory = (XNodeFactory)env.getNodeFactory();
        String aPacketSize = inputNode.attributeValue(ATTRIBUTE_PACKETSIZE);
        String aDop = inputNode.attributeValue(ATTRIBUTE_DOP);
        String aPartitioningType = inputNode.attributeValue(ATTRIBUTE_TYPE);
        if (aPacketSize != null) {
            this.packetSize = Integer.parseInt(aPacketSize);
        }
        if (aDop != null) {
            this.dop = Integer.parseInt(aDop);
        }
        this.partitioningType = XDistribute.parsePartitioningType(aPartitioningType);

        Iterator<?> children = inputNode.elements().iterator();
        Element child = XDistribute.nextChildElement(children, inputNode);
        if (child.getName().equals(ELEMENT_PARTITIONBY)) {
            List<?> columns = child.elements();
            if (columns != null) {
                int[] columnNumbers = new int[columns.size()];
                for (int i = 0; i < columnNumbers.length; ++i) {
                    Element column = (Element)columns.get(i);
                    columnNumbers[i] = Integer.parseInt(column.attributeValue(ATTRIBUTE_COLUMNNO));
                }
                // Assign only once every column number parsed, so a malformed entry
                // does not leave a half-filled list behind.
                this.partitionByList = columnNumbers;
            }
            // The query node follows the PartitionBy element.
            child = XDistribute.nextChildElement(children, inputNode);
        }
        XNode xNode = (XNode)nodeFactory.createNodeByName(child.getName());
        xNode.capture(env, child);
        this.addChild(xNode);
    }

    /**
     * Maps the textual {@code type} attribute to a partitioning type.
     *
     * @param text the attribute value, or {@code null} when the attribute is absent
     * @return the matching type; {@link PartitioningType#BROADCAST} when {@code text} is {@code null}
     * @throws IllegalArgumentException if {@code text} is not a recognised value
     */
    private static PartitioningType parsePartitioningType(String text) {
        if (text == null) {
            return PartitioningType.BROADCAST;
        }
        if (text.equals("broadcast")) {
            return PartitioningType.BROADCAST;
        }
        if (text.equals("range")) {
            return PartitioningType.RANGE;
        }
        if (text.equals("hash")) {
            return PartitioningType.HASH;
        }
        if (text.equals("round-robin")) {
            return PartitioningType.ROUND_ROBIN;
        }
        throw new IllegalArgumentException(text);
    }

    /**
     * Returns the next child element, failing with a descriptive error when there is none.
     *
     * @param children iterator over the child elements of {@code parent}
     * @param parent the element being captured, used only for the error message
     * @return the next child element
     * @throws IllegalArgumentException if the iterator is exhausted
     */
    private static Element nextChildElement(Iterator<?> children, Element parent) {
        if (!children.hasNext()) {
            throw new IllegalArgumentException("Element '" + parent.getName() + "' is missing its child query node element");
        }
        return (Element)children.next();
    }

    /**
     * Persists attribute properties by delegating to the superclass; this override adds
     * nothing of its own.
     *
     * <p>NOTE(review): {@code dop}, {@code packetSize} and {@code partitioningType} are not
     * written here - confirm whether that is intentional.
     *
     * @param ctx the persist context forwarded to the superclass
     */
    @Override
    protected void persistAttributeProperties(XQEPersistContext ctx) {
        super.persistAttributeProperties(ctx);
    }

    /**
     * Persists element properties by delegating to the superclass; this override adds
     * nothing of its own.
     *
     * <p>NOTE(review): {@code partitionByList} is not written here - confirm whether that
     * is intentional.
     *
     * @param ctx the persist context forwarded to the superclass
     */
    @Override
    protected void persistElementProperties(XQEPersistContext ctx) {
        super.persistElementProperties(ctx);
    }

    /**
     * Restores a single attribute property by delegating to the superclass; this override
     * handles no attributes of its own.
     *
     * @param ctx the restore context forwarded to the superclass
     * @param att the attribute being restored
     * @param inputNode the element the attribute belongs to
     */
    @Override
    protected void restoreAttributeProperty(XQERestoreContext ctx, Attribute att, Element inputNode) {
        super.restoreAttributeProperty(ctx, att, inputNode);
    }

    /**
     * Restores a single element property by delegating to the superclass; this override
     * handles no elements of its own.
     *
     * @param ctx the restore context forwarded to the superclass
     * @param node the element being restored
     * @param inputNode the enclosing input element
     */
    @Override
    protected void restoreElementProperty(XQERestoreContext ctx, Element node, Element inputNode) {
        super.restoreElementProperty(ctx, node, inputNode);
    }

    /**
     * Producer that sends each input row to the queue selected by the hash code of its
     * partition key (the columns listed in {@code partitionByList}).
     */
    private final class HashProducer extends Producer {
        private HashProducer(XDataContext context, IXQEQueryNode child) {
            super(context, child);
            this.name = String.format("XDistribute.HashProducer[%d]", this.hashCode());
        }

        /**
         * Drains the child iterator, putting a copy of every row on the queue chosen by
         * its key hash, then sends the end-of-data row to every queue.
         *
         * <p>An {@link XQERuntimeException} is stored in {@code exception}. An interrupt is
         * logged and the thread's interrupt status is restored. In both cases the queues
         * are not terminated.
         * NOTE(review): consumers may then wait indefinitely - confirm how the consuming
         * iterator detects a failed producer.
         */
        @Override
        public void run() {
            ExecutionEnvironmentContext executionEnvironmentContext = ExecutionEnvironmentContext.enter(this.context.getEnvironment());
            try {
                int[] keyColumns = XDistribute.this.partitionByList;
                // Reusable key buffer; overwritten for every row.
                RowValue tmpKey = DataValueFactory.createRowValue(new IValue[keyColumns.length]);
                // Use the real queue count so the index stays in range even if the
                // node's dop is changed after this producer was built.
                int queueCount = this.queues.length;
                IRow row;
                while ((row = (IRow)this.tabIt.next()) != null) {
                    for (int i = 0; i < keyColumns.length; ++i) {
                        tmpKey.setColumn(i, row.getColumn(keyColumns[i]));
                    }
                    // Java's % keeps the sign of the dividend, so a negative hash code
                    // would give a negative index; shift it into [0, queueCount).
                    int bucket = tmpKey.hashCode() % queueCount;
                    if (bucket < 0) {
                        bucket += queueCount;
                    }
                    this.current = bucket;
                    this.queues[bucket].put((IRow)row.copy());
                }
                this.terminate();
            }
            catch (InterruptedException e) {
                mErrorLogger.log(e);
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            catch (XQERuntimeException e) {
                this.exception = e;
            }
            finally {
                executionEnvironmentContext.exit();
            }
        }
    }

    /**
     * Producer that keeps consecutive rows with the same partition key on one queue and
     * advances to the next queue (wrapping around) whenever the key changes from the
     * previous row's key.
     */
    private final class RangeProducer extends Producer {
        /** Copy of the partition key of the most recently routed key group; {@code null} before the first row. */
        private IValue key;

        private RangeProducer(XDataContext context, IXQEQueryNode child) {
            super(context, child);
            this.name = String.format("XDistribute.RangeProducer[%d]", this.hashCode());
        }

        /**
         * Drains the child iterator, putting a copy of every row on the current queue and
         * moving to the next queue each time the partition key differs from the previous
         * one, then sends the end-of-data row to every queue.
         *
         * <p>An {@link XQERuntimeException} is stored in {@code exception}. An interrupt is
         * logged and the thread's interrupt status is restored. In both cases the queues
         * are not terminated.
         * NOTE(review): consumers may then wait indefinitely - confirm how the consuming
         * iterator detects a failed producer.
         */
        @Override
        public void run() {
            ExecutionEnvironmentContext executionEnvironmentContext = ExecutionEnvironmentContext.enter(this.context.getEnvironment());
            try {
                // Starts before the first queue; the first row always counts as a key change.
                this.current = -1;
                int[] keyColumns = XDistribute.this.partitionByList;
                // Reusable key buffer; overwritten for every row.
                RowValue tmpKey = DataValueFactory.createRowValue(new IValue[keyColumns.length]);
                IRow row;
                while ((row = (IRow)this.tabIt.next()) != null) {
                    for (int i = 0; i < keyColumns.length; ++i) {
                        tmpKey.setColumn(i, row.getColumn(keyColumns[i]));
                    }
                    if (this.key == null || this.key.compareTo(tmpKey) != 0) {
                        // Remember a copy because tmpKey is reused for the next row.
                        this.key = (IValue)tmpKey.copy();
                        ++this.current;
                        if (this.current == this.queues.length) {
                            this.current = 0;
                        }
                    }
                    this.queues[this.current].put((IRow)row.copy());
                }
                this.terminate();
            }
            catch (InterruptedException e) {
                mErrorLogger.log(e);
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            catch (XQERuntimeException e) {
                this.exception = e;
            }
            finally {
                executionEnvironmentContext.exit();
            }
        }
    }

    /**
     * Producer that hands rows to the queues in turn: row 0 to queue 0, row 1 to queue 1,
     * and so on, wrapping back to queue 0 after the last queue.
     */
    private final class RoundRobinProducer extends Producer {
        private RoundRobinProducer(XDataContext context, IXQEQueryNode child) {
            super(context, child);
            this.name = String.format("XDistribute.RoundRobinProducer[%d]", this.hashCode());
        }

        /**
         * Drains the child iterator, putting a copy of every row on the next queue in
         * rotation, then sends the end-of-data row to every queue.
         *
         * <p>An {@link XQERuntimeException} is stored in {@code exception}. An interrupt is
         * logged and the thread's interrupt status is restored. In both cases the queues
         * are not terminated.
         * NOTE(review): consumers may then wait indefinitely - confirm how the consuming
         * iterator detects a failed producer.
         */
        @Override
        public void run() {
            ExecutionEnvironmentContext executionEnvironmentContext = ExecutionEnvironmentContext.enter(this.context.getEnvironment());
            try {
                this.current = 0;
                IRow row;
                while ((row = (IRow)this.tabIt.next()) != null) {
                    this.queues[this.current++].put((IRow)row.copy());
                    if (this.current == this.queues.length) {
                        this.current = 0;
                    }
                }
                this.terminate();
            }
            catch (InterruptedException e) {
                mErrorLogger.log(e);
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            catch (XQERuntimeException e) {
                this.exception = e;
            }
            finally {
                executionEnvironmentContext.exit();
            }
        }
    }

    /**
     * Producer that puts a separate copy of every input row on every queue.
     */
    private final class BroadcastProducer extends Producer {
        private BroadcastProducer(XDataContext context, IXQEQueryNode child) {
            super(context, child);
            this.name = String.format("XDistribute.BroadcastProducer[%d]", this.hashCode());
        }

        /**
         * Drains the child iterator, putting one copy of each row per queue, then sends
         * the end-of-data row to every queue.
         *
         * <p>Rows that are {@link RowValue} instances are copied with {@code copy(false)};
         * all others with {@code copy()}.
         * NOTE(review): the meaning of the {@code false} argument is not visible in this
         * file - confirm against {@code RowValue}.
         *
         * <p>An {@link XQERuntimeException} is stored in {@code exception}. An interrupt is
         * logged and the thread's interrupt status is restored. In both cases the queues
         * are not terminated.
         * NOTE(review): consumers may then wait indefinitely - confirm how the consuming
         * iterator detects a failed producer.
         */
        @Override
        public void run() {
            ExecutionEnvironmentContext executionEnvironmentContext = ExecutionEnvironmentContext.enter(this.context.getEnvironment());
            try {
                this.current = 0;
                IRow row;
                while ((row = (IRow)this.tabIt.next()) != null) {
                    for (LinkedBlockingQueue<IRow> queue : this.queues) {
                        // A fresh copy per queue: consumers never share a row instance.
                        if (row instanceof RowValue) {
                            queue.put((IRow)((RowValue)row).copy(false));
                        } else {
                            queue.put((IRow)row.copy());
                        }
                    }
                }
                this.terminate();
            }
            catch (InterruptedException e) {
                mErrorLogger.log(e);
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            catch (XQERuntimeException e) {
                this.exception = e;
            }
            finally {
                executionEnvironmentContext.exit();
            }
        }
    }

    /**
     * Base class for the row producers. The constructor executes the child node and
     * creates one bounded queue per degree of parallelism; subclasses implement
     * {@link #run()} to read rows from {@code tabIt} and place them on the queues.
     */
    private abstract class Producer implements Runnable {
        /** Display name, set by each subclass constructor; used in debug output. */
        protected String name;
        /** Data context the child was executed with. */
        protected XDataContext context;
        /** Index of the queue most recently selected by the subclass's routing logic. */
        protected int current;
        /** Index of the next queue to hand out from {@link #getTabularIterator()}. */
        protected int index;
        /** Result set obtained by executing the child node. */
        protected IHybridResultSet resultSet;
        /** One bounded queue per consumer; capacity is the node's packet size. */
        protected LinkedBlockingQueue<IRow>[] queues;
        /** Iterator over the child's rows. */
        protected ITabularIterator tabIt;
        /** Number of outstanding {@link #release()} calls before resources are freed. */
        private final AtomicInteger refCount = new AtomicInteger(0);
        /** Failure recorded by {@code run()}; volatile because it is written on the producer thread and read through {@link #getException()}. */
        protected volatile XQERuntimeException exception;

        /**
         * Executes the child and prepares the queues.
         *
         * @param theContext context used to execute the child
         * @param child the node whose rows are distributed
         */
        @SuppressWarnings("unchecked") // generic array creation: only LinkedBlockingQueue<IRow> instances are stored
        private Producer(XDataContext theContext, IXQEQueryNode child) {
            this.context = theContext;
            this.resultSet = (IHybridResultSet)((IExecutable)((Object)child)).execute(this.context);
            this.queues = new LinkedBlockingQueue[XDistribute.this.dop];
            for (int i = 0; i < this.queues.length; ++i) {
                this.queues[i] = new LinkedBlockingQueue<IRow>(XDistribute.this.packetSize);
            }
            this.tabIt = this.resultSet.getTabularIterator();
            this.refCount.set(XDistribute.this.dop);
        }

        /**
         * Returns an iterator over the next not-yet-assigned queue.
         *
         * <p>NOTE(review): {@code index} is advanced without synchronization and without a
         * bounds check; calling this more times than there are queues throws
         * {@link ArrayIndexOutOfBoundsException} - confirm callers are serialized.
         *
         * @return a new {@link ExchangeIterator} reading from the next queue
         */
        public ITabularIterator getTabularIterator() {
            return new ExchangeIterator(this.context, this.queues[this.index++]);
        }

        /** @return the display name assigned by the subclass */
        public String getName() {
            return this.name;
        }

        /** @return the exception recorded by {@code run()}, or {@code null} if none was recorded */
        public XQERuntimeException getException() {
            return this.exception;
        }

        /**
         * Puts an end-of-data marker row (a row created from {@code EMPTY_DATATYPES}) on
         * every queue. If the thread is interrupted while putting, the interrupt is logged,
         * the thread's interrupt status is restored and the remaining queues are not
         * signalled.
         */
        protected void terminate() {
            try {
                RowValue row = DataValueFactory.createRowValue(EMPTY_DATATYPES);
                for (int i = 0; i < this.queues.length; ++i) {
                    XQEDebugLog.out.println(this.name + ": sending poison to queue " + this.queues[i].hashCode());
                    this.queues[i].put(row);
                }
            }
            catch (InterruptedException e) {
                mErrorLogger.log(e);
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }

        /** @return the rowset info of the child's result set */
        public IRowsetInfo getTabularRowsetInfo() {
            return this.resultSet.getTabularRowsetInfo();
        }

        /**
         * Drops one reference; when the count reaches zero (or below) the child iterator
         * and result set are released.
         *
         * @return {@code true} if this call released the underlying resources,
         *         {@code false} if other references remain
         */
        public boolean release() {
            // Decide on the value returned by the decrement itself. A separate get()
            // would let two concurrent callers both observe zero and release twice.
            if (this.refCount.decrementAndGet() > 0) {
                return false;
            }
            if (this.tabIt != null) {
                this.tabIt.release();
                this.tabIt = null;
            }
            if (this.resultSet != null) {
                this.resultSet.release();
                this.resultSet = null;
            }
            return true;
        }
    }

    /**
     * Tabular result set handed to a consumer. It exposes the shared producer's rowset
     * info and, on request, an iterator obtained from the producer.
     */
    private final class XDistributeResultSet extends XResultSetBase implements ITabularResultSet {
        /**
         * Binds the result set to the enclosing node's id and copies the rowset info
         * from the current producer.
         *
         * @param theContext the data context passed to the base result set
         */
        XDistributeResultSet(XDataContext theContext) {
            super(theContext, XDistribute.this.getId());
            IRowsetInfo producerInfo = XDistribute.this.producer.getTabularRowsetInfo();
            super.setTabularRowsetInfo(producerInfo);
        }

        /** @return the iterator the producer hands out for its next queue */
        @Override
        public ITabularIterator getTabularIterator() {
            Producer source = XDistribute.this.producer;
            return source.getTabularIterator();
        }

        /**
         * Releases this result set's reference on the producer and clears the enclosing
         * node's producer field once the producer reports that it was fully released.
         */
        @Override
        public void releaseImpl() {
            boolean fullyReleased = XDistribute.this.producer.release();
            if (!fullyReleased) {
                return;
            }
            XDistribute.this.producer = null;
        }
    }

    /**
     * Strategies for spreading rows over the consumer queues. Each constant carries a
     * human-readable descriptor.
     */
    public enum PartitioningType {
        /** Every row goes to every queue. */
        BROADCAST("Broadcast"),
        /** Rows are routed by changes in the partition key. */
        RANGE("Range"),
        /** Rows are routed by the hash of the partition key. */
        HASH("Hash"),
        /** Rows are handed to the queues in rotation. */
        ROUND_ROBIN("Round-robin");

        /** Display text supplied to the constructor. */
        private final String descriptor;

        PartitioningType(String theDescriptor) {
            descriptor = theDescriptor;
        }

        /** @return the display text of this constant */
        public String getDescriptor() {
            return descriptor;
        }
    }
}

