/*
 * Decompiled with CFR 0.152.
 */
package com.spss.ac.acbase;

import com.spss.ac.acbase.ComponentUtils;
import com.spss.ac.accode.ACException;
import com.spss.ac.accode.i18n.ACMessages;
import com.spss.analyticframework.api.ComponentId;
import com.spss.analyticframework.api.ComponentOutput;
import com.spss.analyticframework.api.IdMapping;
import com.spss.analyticframework.api.ParamsValidator;
import com.spss.analyticframework.api.ParamsValidatorService;
import com.spss.analyticframework.api.Traversable;
import com.spss.analyticframework.api.TraversableBuilder;
import com.spss.analyticframework.api.parameters.Parameters;
import com.spss.analyticframework.api.pull.ContainerChannel;
import com.spss.analyticframework.api.pull.KeyedRecord;
import com.spss.analyticframework.api.pull.RecordChannel;
import com.spss.analyticframework.api.pull.RecordIterator;
import com.spss.analyticframework.api.stream.ExecutionStream;
import com.spss.analyticframework.cache.CacheManager;
import com.spss.analyticframework.commons.AFUtils;
import com.spss.analyticframework.componentapi.ComponentContext;
import com.spss.analyticframework.directorapi.ContainerConsumerDirector;
import com.spss.analyticframework.directorapi.ContainerProducerDirector;
import com.spss.analyticframework.directorapi.DataModelConsumer;
import com.spss.analyticframework.directorapi.DirectorContext;
import com.spss.analyticframework.directorapi.DirectorDescriptor;
import com.spss.analyticframework.directorapi.DirectorExecutionResults;
import com.spss.analyticframework.directorapi.ExecutionDirector;
import com.spss.analyticframework.directorapi.IterativeExecutionDirector;
import com.spss.analyticframework.directorapi.plans.DataParallelPlan;
import com.spss.analyticframework.directorapi.plans.DelegateExecutionPlan;
import com.spss.analyticframework.directorapi.plans.ExecutionPlan;
import com.spss.analyticframework.directorapi.plans.MapReduceExecutionPlan;
import com.spss.analyticframework.directorapi.plans.ParallelExecutionPlan;
import com.spss.analyticframework.directorapi.plans.TaskParallelPlan;
import com.spss.analyticframework.directorapi.plans.mixins.IterationDataPointer;
import com.spss.analyticframework.directorapi.plans.mixins.JobAware;
import com.spss.analyticframework.directorapi.plans.mixins.MapperSharedData;
import com.spss.analyticframework.directorapi.plans.mixins.MappersShareDataSource;
import com.spss.analyticframework.directorapi.plans.mixins.ReducerSharedData;
import com.spss.analyticframework.directorapi.plans.mixins.SortSpec;
import com.spss.datamodel.DataModel;
import com.spss.datamodel.Fields;
import com.spss.datarecord.Record;
import com.spss.utilities.functional.Tuple2;
import com.spss.utilities.i18n.LocMsgId;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.log4j.Logger;

public class DirectorLocalRunner {
    private static final Logger log = Logger.getLogger(DirectorLocalRunner.class);
    protected ExecutionDirector executionDirector = null;
    protected DirectorContext directorContext = null;
    protected Parameters parameters = null;
    protected ContainerChannel inputContainer = null;
    protected RecordChannel inputRecordChannel = null;
    protected RecordChannel lastRecordChannel = null;
    protected boolean hasRunned = false;
    protected CacheManager cacheManager = null;
    protected int curJobIndex = 0;

    public DirectorLocalRunner(ExecutionDirector executionDirector) {
        this.executionDirector = executionDirector;
    }

    public void init(ComponentContext context, Parameters parameters) {
        this.init(ComponentUtils.wrapAsDirectorContext(context), parameters, false);
    }

    public void init(DirectorContext context, Parameters parameters, boolean validateParams) {
        this.directorContext = context;
        this.hasRunned = false;
        if (validateParams) {
            ParamsValidator validator = ParamsValidatorService.get().getValidator();
            DirectorDescriptor descriptor = this.executionDirector.getDescriptor();
            if (descriptor != null) {
                ParamsValidator.ValidationResult result = validator.validateParams(descriptor.getModuleId() + ":" + descriptor.getDirectorId(), parameters, descriptor.getSupportedParameters());
                this.parameters = result.getParameters();
            }
        } else {
            this.parameters = parameters;
        }
        this.cacheManager = ComponentUtils.getCacheManager(this.directorContext.getStreamExecutorService());
    }

    public void setInputContainers(TraversableBuilder<ContainerChannel> inputContainers) {
        Traversable it = inputContainers.build();
        if (it.hasNext()) {
            this.inputContainer = (ContainerChannel)it.next();
        }
    }

    public void setInputData(TraversableBuilder<RecordChannel> inputData) {
        Traversable recordChannels = AFUtils.filterOutCustomRecordChannels(inputData);
        RecordChannel recordChannel = null;
        if (recordChannels.hasNext()) {
            recordChannel = (RecordChannel)recordChannels.next();
        }
        if (recordChannel == null) {
            log.error((Object)"No input record channel is found");
            throw new ACException((LocMsgId)ACMessages.RECORD_CHANNEL_ERROR, new Object[0]);
        }
        this.inputRecordChannel = recordChannel;
    }

    public ContainerChannel getContainerChannel() {
        if (!this.hasRunned) {
            this.runDirector();
        }
        ContainerChannel result = null;
        if (this.executionDirector instanceof ContainerProducerDirector) {
            result = ((ContainerProducerDirector)this.executionDirector).getContainerChannel();
        }
        return result;
    }

    public RecordChannel getLastRecordChannel() {
        if (!this.hasRunned) {
            this.runDirector();
        }
        return this.lastRecordChannel;
    }

    protected void runDirector() {
        this.hasRunned = true;
        DataModel dataModel = this.inputRecordChannel.getDataModel();
        if (this.executionDirector instanceof DataModelConsumer) {
            ((DataModelConsumer)this.executionDirector).setDataModel(dataModel);
        }
        if (this.executionDirector instanceof ContainerConsumerDirector) {
            ContainerConsumerDirector cc = (ContainerConsumerDirector)this.executionDirector;
            cc.setContainers(this.inputContainer);
        }
        this.executionDirector.init(this.parameters, this.directorContext);
        this.runDirectorExecution(this.inputRecordChannel);
    }

    protected void runDirectorExecution(RecordChannel recordChannel) {
        if (this.executionDirector instanceof IterativeExecutionDirector) {
            IterativeExecutionDirector itDirector = (IterativeExecutionDirector)this.executionDirector;
            int i = 0;
            do {
                this.curJobIndex = i;
                ExecutionPlan plan = itDirector.getExecutionPlan();
                ComponentOutput output = this.runExecutionPlan(plan, recordChannel);
                if (!(plan instanceof ParallelExecutionPlan)) {
                    this.postAnalyze(output, i);
                }
                ++i;
            } while (!itDirector.finished());
        } else {
            ExecutionPlan plan = this.executionDirector.getExecutionPlan();
            ComponentOutput output = this.runExecutionPlan(plan, recordChannel);
            if (!(plan instanceof ParallelExecutionPlan)) {
                this.postAnalyze(output, 0);
            }
        }
    }

    protected ComponentOutput runExecutionPlan(ExecutionPlan plan, RecordChannel recordChannel) {
        ComponentOutput output = null;
        RecordChannel jobInputChannel = recordChannel;
        if (plan != null) {
            IterationDataPointer.Source[] dataPointers;
            if (plan instanceof IterationDataPointer && (dataPointers = ((IterationDataPointer)plan).readDataFromInputs()) != null && dataPointers.length > 0 && dataPointers[0] instanceof IterationDataPointer.IterationSource) {
                Traversable prevChannels = this.cacheManager.getChannels(Integer.toString(((IterationDataPointer.IterationSource)dataPointers[0]).get()));
                if (prevChannels != null && prevChannels.hasNext()) {
                    jobInputChannel = (RecordChannel)prevChannels.next();
                } else {
                    throw new ACException((LocMsgId)ACMessages.INTERNAL_ERROR, new Object[0]);
                }
            }
            if (plan instanceof ParallelExecutionPlan) {
                ArrayList<ComponentOutput> allOutput = new ArrayList<ComponentOutput>();
                ParallelExecutionPlan parallelPlan = (ParallelExecutionPlan)plan;
                for (ExecutionPlan subPlan : parallelPlan.plans()) {
                    ComponentOutput out = this.runExecutionPlan(subPlan, jobInputChannel);
                    this.postAnalyze(out, -1);
                    allOutput.add(out);
                }
                this.postAnalyze(allOutput, -1);
            } else if (plan instanceof DelegateExecutionPlan) {
                DirectorLocalRunner runner = this.runDelegateExecutionPlan((DelegateExecutionPlan)plan, jobInputChannel);
                Traversable recordChs = null;
                if (runner.getLastRecordChannel() != null) {
                    recordChs = AFUtils.toTraversable((Object[])new RecordChannel[]{runner.getLastRecordChannel()});
                }
                ContainerChannel conCh = runner.getContainerChannel();
                output = new ComponentOutput(recordChs, conCh, null);
            } else {
                output = this.runLocalMapReduceJob(plan, jobInputChannel);
            }
        }
        return output;
    }

    protected void postAnalyze(ComponentOutput output, int curIndex) {
        this.postAnalyze(this.asDirectorResults(output), curIndex);
    }

    protected void postAnalyze(List<ComponentOutput> allOutput, int curIndex) {
        this.postAnalyze(this.asDirectorResults(allOutput), curIndex);
    }

    protected void postAnalyze(DirectorExecutionResults prevResults, int curIndex) {
        if (prevResults != null) {
            DirectorExecutionResults dResults = this.executionDirector.postAnalyze(prevResults);
            if (dResults != null && curIndex >= 0) {
                for (DirectorExecutionResults.Result result : dResults) {
                    if (result.getType() != DirectorExecutionResults.Result.ResultType.DATA) continue;
                    this.lastRecordChannel = result.asRecordChannel();
                    this.cacheManager.writeChannels(Integer.toString(curIndex), AFUtils.toTraversable((Object[])new RecordChannel[]{this.lastRecordChannel}));
                    Traversable it = this.cacheManager.getChannels(Integer.toString(curIndex));
                    while (it.hasNext()) {
                        this.lastRecordChannel = (RecordChannel)it.next();
                    }
                    break;
                }
            }
        } else {
            this.executionDirector.postAnalyze(null);
        }
    }

    protected ComponentOutput runLocalMapReduceJob(ExecutionPlan plan, final RecordChannel inputChannel) {
        ComponentOutput result = null;
        if (plan instanceof JobAware) {
            JobAware jobConf = (JobAware)plan;
            Properties props = new Properties();
            props.put("max.map.parallel.tasks", "1");
            props.put("max.reduce.parallel.tasks", "1");
            props.put("total.blocks", "1");
            jobConf.setProperties(props);
        }
        Tuple2 sharedMapData = null;
        ComponentId mapTargetID = null;
        if (plan instanceof MapperSharedData) {
            MapperSharedData mapSharedData = (MapperSharedData)plan;
            sharedMapData = mapSharedData.getMapperSharedData();
            mapTargetID = mapSharedData.getMapperTargetComponent();
        }
        Tuple2 sharedRedData = null;
        ComponentId redTargetID = null;
        if (plan instanceof ReducerSharedData) {
            ReducerSharedData redSharedData = (ReducerSharedData)plan;
            sharedRedData = redSharedData.getReducerSharedData();
            redTargetID = redSharedData.getReducerTargetComponent();
        }
        if (plan instanceof DataParallelPlan) {
            DataParallelPlan mrPlan = (DataParallelPlan)plan;
            result = this.runMapReduceExecutionPlan((MapReduceExecutionPlan)mrPlan, mapTargetID, (Tuple2<String, InputStream>)sharedMapData, redTargetID, (Tuple2<String, InputStream>)sharedRedData, inputChannel);
        }
        if (plan instanceof TaskParallelPlan) {
            TaskParallelPlan taskPlan = (TaskParallelPlan)plan;
            RecordChannel mapDataSource = null;
            if (plan instanceof MappersShareDataSource) {
                mapDataSource = new RecordChannel(){

                    public DataModel getDataModel() {
                        return inputChannel.getDataModel();
                    }

                    public String getName() {
                        return "SharedDataSource";
                    }

                    public RecordIterator getRecordIterator() {
                        return inputChannel.getRecordIterator();
                    }
                };
            }
            Tuple2 ins = taskPlan.getMapperInstructions();
            for (InputStream inputStream : (Collection)ins._2()) {
                RecordChannel insChannel = ComponentUtils.asRecordChannel((Tuple2<String, InputStream>)new Tuple2(ins._1(), (Object)inputStream));
                RecordChannel[] inputChannels = null;
                inputChannels = mapDataSource != null ? new RecordChannel[]{mapDataSource, insChannel} : new RecordChannel[]{insChannel};
                result = this.runMapReduceExecutionPlan((MapReduceExecutionPlan)taskPlan, mapTargetID, (Tuple2<String, InputStream>)sharedMapData, redTargetID, (Tuple2<String, InputStream>)sharedRedData, inputChannels);
            }
        }
        return result;
    }

    protected DirectorLocalRunner runDelegateExecutionPlan(DelegateExecutionPlan plan, RecordChannel inputChannel) {
        String directorId = plan.getDirectorId();
        String moduleId = plan.getModuleId();
        ContainerChannel containerChannel = plan.getInputContainers();
        Parameters params = plan.getParameters();
        ExecutionDirector delegatDirector = ComponentUtils.getExecutionDirctor(this.directorContext.getStreamExecutorService(), moduleId, directorId);
        return ComponentUtils.runDirector(delegatDirector, this.directorContext, params, containerChannel, inputChannel);
    }

    protected ComponentOutput runMapReduceExecutionPlan(MapReduceExecutionPlan mrPlan, ComponentId mapTargetID, Tuple2<String, InputStream> sharedMapData, ComponentId redTargetID, Tuple2<String, InputStream> sharedRedData, RecordChannel ... inputChannels) {
        Object mapOutChannel;
        ComponentOutput result = null;
        ExecutionStream mapStream = mrPlan.getMapStream();
        IdMapping<ComponentOutput> outputs = ComponentUtils.runComponent(this.directorContext.getStreamExecutorService(), mapStream, mapTargetID, sharedMapData, inputChannels);
        ExecutionStream redStream = mrPlan.getReduceStream();
        if (redStream != null) {
            mapOutChannel = ComponentUtils.getRecordChannel(outputs);
            RecordChannel sortedOuts = this.sortMapOutputChannel(mrPlan, (RecordChannel)mapOutChannel);
            outputs = ComponentUtils.runComponent(this.directorContext.getStreamExecutorService(), redStream, redTargetID, sharedRedData, sortedOuts);
        }
        if ((mapOutChannel = outputs.getItems().iterator()).hasNext()) {
            ComponentOutput comOut;
            result = comOut = (ComponentOutput)mapOutChannel.next();
        }
        Traversable<RecordChannel> recordChs = result.getRecordChannels();
        if (this.cacheManager != null && recordChs != null) {
            ArrayList<RecordChannel> chList = new ArrayList<RecordChannel>();
            while (recordChs.hasNext()) {
                chList.add((RecordChannel)recordChs.next());
            }
            String cacheID = "a";
            this.cacheManager.writeChannels(cacheID, AFUtils.toTraversable(chList.iterator()));
            recordChs = this.filterOutCachedChannels(chList, (Traversable<RecordChannel>)this.cacheManager.getChannels(cacheID));
        }
        return new ComponentOutput((Traversable)recordChs, result.getContainerChannel(), result.getEmptyChannel());
    }

    protected Traversable<RecordChannel> filterOutCachedChannels(List<RecordChannel> orgChs, final Traversable<RecordChannel> cachedChs) {
        final HashSet<String> chNameSet = new HashSet<String>();
        for (RecordChannel ch : orgChs) {
            chNameSet.add(ch.getName());
        }
        final Traversable orgIt = AFUtils.toTraversable(orgChs.iterator());
        return new Traversable<RecordChannel>(){

            public boolean hasNext() {
                return orgIt.hasNext() && cachedChs.hasNext();
            }

            public RecordChannel next() {
                RecordChannel result = null;
                while (cachedChs.hasNext()) {
                    RecordChannel ch = (RecordChannel)cachedChs.next();
                    if (!chNameSet.contains(ch.getName())) continue;
                    result = ch;
                    break;
                }
                return result;
            }
        };
    }

    protected RecordChannel sortMapOutputChannel(MapReduceExecutionPlan mrPlan, final RecordChannel mapOut) {
        RecordIterator it = mapOut.getRecordIterator();
        final ArrayList<Record> recordList = new ArrayList<Record>();
        while (it.hasNext()) {
            Record record = (Record)it.next();
            recordList.add(record);
        }
        Comparator<Record> comparator = null;
        if (mrPlan instanceof SortSpec && mrPlan instanceof DataParallelPlan) {
            SortSpec sortSpec = (SortSpec)mrPlan;
            Tuple2<List<Integer>, List<SortSpec.Direction>> tup = this.exactSortSpec(sortSpec, mapOut.getDataModel());
            final List indexList = (List)tup._1();
            final List dirList = (List)tup._2();
            comparator = new Comparator<Record>(){

                @Override
                public int compare(Record o1, Record o2) {
                    int result = 0;
                    int i = 0;
                    for (Integer index : indexList) {
                        result = o1.getField(index.intValue()).compareTo(o2.getField(index.intValue()));
                        if (dirList.get(i) == SortSpec.Direction.DESC) {
                            result = -result;
                        }
                        if (result != 0) break;
                        ++i;
                    }
                    return result;
                }
            };
        } else {
            comparator = new Comparator<Record>(){

                @Override
                public int compare(Record o1, Record o2) {
                    if (o1 instanceof KeyedRecord && o2 instanceof KeyedRecord) {
                        return ((KeyedRecord)o1).getKey().compareTo(((KeyedRecord)o2).getKey());
                    }
                    return 0;
                }
            };
        }
        Collections.sort(recordList, comparator);
        return new RecordChannel(){

            public DataModel getDataModel() {
                return mapOut.getDataModel();
            }

            public String getName() {
                return mapOut.getName();
            }

            public RecordIterator getRecordIterator() {
                return new RecordIterator(){
                    private final Iterator<Record> recordIt;
                    {
                        this.recordIt = recordList.iterator();
                    }

                    public boolean hasNext() {
                        return this.recordIt.hasNext();
                    }

                    public Record next() {
                        Record record = this.recordIt.next();
                        if (record instanceof KeyedRecord) {
                            return ((KeyedRecord)record).getRecord();
                        }
                        return record;
                    }

                    public float getProgress() {
                        return 1.0f;
                    }
                };
            }
        };
    }

    protected Tuple2<List<Integer>, List<SortSpec.Direction>> exactSortSpec(SortSpec sortSpec, DataModel dataModel) {
        Integer index;
        ArrayList<Integer> indexList = new ArrayList<Integer>();
        ArrayList<Object> dirList = new ArrayList<Object>();
        Fields fields = dataModel.getFields();
        if (sortSpec.getSortingFields() != null) {
            for (Tuple2 val : sortSpec.getSortingFields()) {
                index = fields.getFieldPos((String)val._1());
                if (index == null) {
                    throw new ACException((LocMsgId)ACMessages.INTERNAL_ERROR, new Object[0]);
                }
                indexList.add(index);
                dirList.add(val._2());
            }
        }
        if (sortSpec.getSecondarySortingFields() != null) {
            for (Tuple2 val : sortSpec.getSecondarySortingFields()) {
                index = fields.getFieldPos((String)val._1());
                if (index == null) {
                    throw new ACException((LocMsgId)ACMessages.INTERNAL_ERROR, new Object[0]);
                }
                indexList.add(index);
                dirList.add(val._2());
            }
        }
        return new Tuple2(indexList, dirList);
    }

    protected DirectorExecutionResults asDirectorResults(ComponentOutput comOutput) {
        if (comOutput == null) {
            return null;
        }
        return this.asDirectorResults(comOutput.getContainerChannel(), (Traversable<? extends RecordChannel>)comOutput.getRecordChannels());
    }

    protected DirectorExecutionResults asDirectorResults(ContainerChannel containerChannel, Traversable<? extends RecordChannel> recordChannels) {
        return new DirectorExecutionResults(this.asResultsList(containerChannel, recordChannels));
    }

    protected List<DirectorExecutionResults.Result> asResultsList(final ContainerChannel containerChannel, Traversable<? extends RecordChannel> recordChannels) {
        ArrayList<DirectorExecutionResults.Result> resultList = new ArrayList<DirectorExecutionResults.Result>();
        if (containerChannel != null) {
            DirectorExecutionResults.Result result = new DirectorExecutionResults.Result(){
                {
                    this.resultType = DirectorExecutionResults.Result.ResultType.CONTAINER;
                }

                public void close() throws IOException {
                }

                public RecordChannel asRecordChannel() {
                    return null;
                }

                public ContainerChannel asContainerChannel() {
                    return containerChannel;
                }

                public InputStream asInputStream() {
                    return null;
                }
            };
            resultList.add(result);
        }
        if (recordChannels != null) {
            while (recordChannels.hasNext()) {
                final RecordChannel recordChannel = (RecordChannel)recordChannels.next();
                DirectorExecutionResults.Result result = new DirectorExecutionResults.Result(){
                    {
                        this.resultType = DirectorExecutionResults.Result.ResultType.DATA;
                    }

                    public void close() throws IOException {
                    }

                    public RecordChannel asRecordChannel() {
                        return recordChannel;
                    }

                    public ContainerChannel asContainerChannel() {
                        return null;
                    }

                    public InputStream asInputStream() {
                        return null;
                    }
                };
                resultList.add(result);
            }
        }
        return resultList;
    }

    protected DirectorExecutionResults asDirectorResults(List<ComponentOutput> directorResults) {
        if (directorResults != null) {
            ArrayList<DirectorExecutionResults.Result> resultList = new ArrayList<DirectorExecutionResults.Result>();
            for (ComponentOutput o : directorResults) {
                resultList.addAll(this.asResultsList(o.getContainerChannel(), (Traversable<? extends RecordChannel>)o.getRecordChannels()));
            }
            return new DirectorExecutionResults(resultList);
        }
        return null;
    }
}

