/*
 * Decompiled with CFR 0.152.
 */
package org.jlab.coda.emu.support.transport;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import org.jlab.coda.cMsg.cMsgCallbackAdapter;
import org.jlab.coda.cMsg.cMsgCallbackInterface;
import org.jlab.coda.cMsg.cMsgException;
import org.jlab.coda.cMsg.cMsgMessage;
import org.jlab.coda.cMsg.cMsgSubscriptionHandle;
import org.jlab.coda.emu.Emu;
import org.jlab.coda.emu.EmuException;
import org.jlab.coda.emu.EmuModule;
import org.jlab.coda.emu.EmuUtilities;
import org.jlab.coda.emu.support.codaComponent.CODAClass;
import org.jlab.coda.emu.support.codaComponent.CODAState;
import org.jlab.coda.emu.support.data.ByteBufferItem;
import org.jlab.coda.emu.support.data.ByteBufferSupply;
import org.jlab.coda.emu.support.data.ControlType;
import org.jlab.coda.emu.support.data.EventType;
import org.jlab.coda.emu.support.data.Evio;
import org.jlab.coda.emu.support.data.RingItem;
import org.jlab.coda.emu.support.transport.DataChannelAdapter;
import org.jlab.coda.emu.support.transport.DataTransportException;
import org.jlab.coda.emu.support.transport.DataTransportImplCmsg;
import org.jlab.coda.emu.support.transport.TransportType;
import org.jlab.coda.jevio.BlockHeaderV4;
import org.jlab.coda.jevio.EventWriterUnsync;
import org.jlab.coda.jevio.EvioCompactReaderUnsync;
import org.jlab.coda.jevio.EvioException;
import org.jlab.coda.jevio.EvioNode;

public class DataChannelImplCmsg
extends DataChannelAdapter {
    private final DataTransportImplCmsg dataTransportImplCmsg;
    private String subject;
    private String type;
    private boolean pause;
    private volatile boolean haveOutputEndEvent;
    private boolean isER;
    private cMsgSubscriptionHandle sub;
    private int writeThreadCount;
    private DataOutputHelper dataOutputThread;
    private int outputSizeLimit = 256000;
    private int outputCountLimit = 1000;
    private int recordId;

    private final void messageToBuf(cMsgMessage msg) {
        try {
            byte[] data = msg.getByteArray();
            if (data == null) {
                this.channelState = CODAState.ERROR;
                this.emu.setErrorState("DataChannel cmsg in: cMsg message has no data");
                return;
            }
            ByteBuffer buf = ByteBuffer.wrap(data);
            EvioCompactReaderUnsync compactReader = new EvioCompactReaderUnsync(buf);
            BlockHeaderV4 blockHeader = compactReader.getFirstBlockHeader();
            if (blockHeader.getVersion() < 4) {
                this.channelState = CODAState.ERROR;
                this.emu.setErrorState("DataChannel cmsg in: data NOT evio v4 format");
                return;
            }
            boolean isUser = false;
            boolean hasFirstEvent = blockHeader.hasFirstEvent();
            EventType eventType = EventType.getEventType(blockHeader.getEventType());
            int recordId = blockHeader.getNumber();
            int sourceId = blockHeader.getReserved1();
            int eventCount = compactReader.getEventCount();
            ControlType controlType = null;
            for (int i = 1; i < eventCount + 1; ++i) {
                EvioNode node = this.isER ? compactReader.getEvent(i) : compactReader.getScannedEvent(i);
                EventType bankType = eventType;
                if (eventType == EventType.ROC_RAW) {
                    if (Evio.isUserEvent(node)) {
                        bankType = EventType.USER;
                        isUser = true;
                    }
                } else if (eventType == EventType.CONTROL) {
                    controlType = ControlType.getControlType(node.getTag());
                    if (controlType == null) {
                        this.channelState = CODAState.ERROR;
                        this.emu.setErrorState("DataChannel cmsg in: found unidentified control event");
                        return;
                    }
                } else if (eventType == EventType.USER) {
                    isUser = true;
                }
                long nextRingItem = this.ringBufferIn.next();
                RingItem ringItem = (RingItem)this.ringBufferIn.get(nextRingItem);
                if (bankType.isBuildable()) {
                    ringItem.setAll(null, null, node, bankType, controlType, isUser, hasFirstEvent, this.id, recordId, sourceId, node.getNum(), this.name, null, null);
                } else {
                    ringItem.setAll(null, null, node, bankType, controlType, isUser, hasFirstEvent, this.id, recordId, sourceId, 1, this.name, null, null);
                }
                hasFirstEvent = false;
                isUser = false;
                this.ringBufferIn.publish(nextRingItem);
                if (controlType != ControlType.END) continue;
                this.logger.info("      DataChannel Emu in: " + this.name + " found END event");
                if (this.endCallback != null) {
                    this.endCallback.endWait();
                }
                return;
            }
        }
        catch (Exception e) {
            this.channelState = CODAState.ERROR;
            this.emu.setErrorState("DataChannel cmsg in: " + e.getMessage());
        }
    }

    DataChannelImplCmsg(String name, DataTransportImplCmsg transport, Map<String, String> attributeMap, boolean input, Emu emu, EmuModule module, int outputIndex) throws DataTransportException {
        super(name, transport, attributeMap, input, emu, module, outputIndex);
        this.dataTransportImplCmsg = transport;
        this.subject = attributeMap.get("subject");
        if (this.subject == null) {
            this.subject = name;
        }
        this.type = attributeMap.get("type");
        if (this.type == null) {
            this.type = "data";
        }
        if (input) {
            this.isER = emu.getCodaClass() == CODAClass.ER;
            try {
                ReceiveMsgCallback cb = new ReceiveMsgCallback();
                this.sub = this.dataTransportImplCmsg.getCmsgConnection().subscribe(this.subject, this.type, (cMsgCallbackInterface)cb, null);
            }
            catch (cMsgException e) {
                this.logger.info("      DataChannel cmsg: " + e.getMessage());
                throw new DataTransportException((Exception)((Object)e));
            }
        }
        emu.addOutputDestination("cMsg");
        this.writeThreadCount = 2;
        String attribString = attributeMap.get("wthreads");
        if (attribString != null) {
            try {
                this.writeThreadCount = Integer.parseInt(attribString);
                if (this.writeThreadCount < 1) {
                    this.writeThreadCount = 1;
                }
                if (this.writeThreadCount > 10) {
                    this.writeThreadCount = 10;
                }
            }
            catch (NumberFormatException numberFormatException) {
                // empty catch block
            }
        }
        this.logger.info("      DataChannel cmsg: write threads = " + this.writeThreadCount);
        this.dataOutputThread = new DataOutputHelper(emu.getThreadGroup(), this.name() + " data out");
        this.dataOutputThread.start();
        this.dataOutputThread.waitUntilStarted();
    }

    @Override
    public TransportType getTransportType() {
        return TransportType.CMSG;
    }

    @Override
    public int getInputLevel() {
        return (int)(100L * (this.ringBufferIn.getCursor() - this.ringBufferIn.getMinimumGatingSequence()) / (long)this.ringBufferIn.getBufferSize());
    }

    @Override
    public void go() {
        if (this.input) {
            this.sub.restart();
        } else {
            this.pause = false;
        }
    }

    @Override
    public void pause() {
        if (this.input) {
            this.sub.pause();
        } else {
            this.pause = true;
        }
    }

    private void interruptAndJoinThreads() {
        try {
            if (this.dataOutputThread != null) {
                this.dataOutputThread.interrupt();
                this.dataOutputThread.shutdown();
                this.dataOutputThread.join(1000L);
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    @Override
    public void end() {
        this.logger.warn("      DataChannel cmsg: end() " + this.name);
        this.gotEndCmd = true;
        this.gotResetCmd = false;
        this.interruptAndJoinThreads();
        if (this.sub != null) {
            try {
                this.dataTransportImplCmsg.getCmsgConnection().unsubscribe(this.sub);
            }
            catch (cMsgException cMsgException2) {
                // empty catch block
            }
        }
        this.channelState = CODAState.DOWNLOADED;
        this.logger.debug("      DataChannel cmsg: end() " + this.name + " done");
    }

    @Override
    public void reset() {
        this.logger.debug("      DataChannel cmsg: reset() " + this.name);
        this.gotEndCmd = false;
        this.gotResetCmd = true;
        this.interruptAndJoinThreads();
        if (this.sub != null) {
            try {
                this.dataTransportImplCmsg.getCmsgConnection().unsubscribe(this.sub);
            }
            catch (cMsgException cMsgException2) {
                // empty catch block
            }
        }
        this.errorMsg.set(null);
        this.channelState = CODAState.CONFIGURED;
        this.logger.debug("      DataChannel cmsg: reset() " + this.name + " done");
    }

    private class DataOutputHelper
    extends Thread {
        private Phaser phaser;
        private int pauseCounter;
        private ExecutorService writeThreadPool;
        private CountDownLatch startLatch;
        private volatile DataChannelAdapter.ThreadState threadState;

        DataOutputHelper(ThreadGroup group, String name) {
            super(group, name);
            this.startLatch = new CountDownLatch(1);
            try {
                this.writeThreadPool = Executors.newFixedThreadPool(DataChannelImplCmsg.this.writeThreadCount);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }

        private void waitUntilStarted() {
            try {
                this.startLatch.await();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }

        private void shutdown() {
            this.writeThreadPool.shutdown();
            if (DataChannelImplCmsg.this.gotEndCmd) {
                try {
                    this.writeThreadPool.awaitTermination(1000L, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
        }

        private void writeMessages(cMsgMessage[] msgs, int messages2Write, EvWriter[] writers) throws InterruptedException, cMsgException {
            for (int i = 0; i < messages2Write; ++i) {
                DataChannelImplCmsg.this.dataTransportImplCmsg.getCmsgConnection().send(msgs[i]);
                writers[i].releaseBuffer();
            }
        }

        @Override
        public void run() {
            this.threadState = DataChannelAdapter.ThreadState.RUNNING;
            this.startLatch.countDown();
            try {
                EvWriter[] writers = new EvWriter[DataChannelImplCmsg.this.writeThreadCount];
                RingItem firstBankFromRing = null;
                long timeout = 2000L;
                int[] recordIds = new int[DataChannelImplCmsg.this.writeThreadCount];
                int[] bankListSize = new int[DataChannelImplCmsg.this.writeThreadCount];
                cMsgMessage[] msgs = new cMsgMessage[DataChannelImplCmsg.this.writeThreadCount];
                ArrayList[] bankListArray = new ArrayList[DataChannelImplCmsg.this.writeThreadCount];
                for (int i = 0; i < DataChannelImplCmsg.this.writeThreadCount; ++i) {
                    bankListArray[i] = new ArrayList(200);
                    msgs[i] = new cMsgMessage();
                    msgs[i].setSubject(DataChannelImplCmsg.this.subject);
                    msgs[i].setType(DataChannelImplCmsg.this.type);
                }
                int outputRingIndex = 0;
                boolean gotPrestart = false;
                boolean doneWithFirstUsersAndControls = false;
                this.phaser = new Phaser(1);
                while (DataChannelImplCmsg.this.dataTransportImplCmsg.getCmsgConnection().isConnected()) {
                    int i;
                    ControlType pBankControlType;
                    int pBankSize;
                    EventType pBankType;
                    RingItem ringItem;
                    if (DataChannelImplCmsg.this.pause) {
                        if (this.pauseCounter++ % 400 != 0) continue;
                        Thread.sleep(5L);
                        continue;
                    }
                    if (DataChannelImplCmsg.this.gotResetCmd) {
                        this.shutdown();
                        return;
                    }
                    for (int j = 0; j < DataChannelImplCmsg.this.writeThreadCount; ++j) {
                        bankListArray[j].clear();
                    }
                    int eventCount = 0;
                    int messages2Write = 0;
                    int nextMsgListIndex = 0;
                    int thisMsgListIndex = 0;
                    ArrayList bankList = bankListArray[thisMsgListIndex];
                    bankList.clear();
                    int listTotalSizeMax = 32;
                    EventType previousType = null;
                    long startTime = System.currentTimeMillis();
                    if (!doneWithFirstUsersAndControls) {
                        ringItem = DataChannelImplCmsg.this.getNextOutputRingItem(0);
                        pBankType = ringItem.getEventType();
                        pBankSize = ringItem.getTotalBytes();
                        pBankControlType = ringItem.getControlType();
                        if (pBankType == EventType.CONTROL) {
                            if (pBankControlType == ControlType.PRESTART) {
                                if (gotPrestart) {
                                    throw new EmuException("got 2 prestart events");
                                }
                                DataChannelImplCmsg.this.logger.debug("      DataChannel cmsg out " + DataChannelImplCmsg.this.outputIndex + ": found prestart event");
                                gotPrestart = true;
                            } else {
                                if (!gotPrestart) {
                                    throw new EmuException("prestart, not " + (Object)((Object)pBankControlType) + ", must be first control event");
                                }
                                if (pBankControlType != ControlType.GO && pBankControlType != ControlType.END) {
                                    throw new EmuException("second control event must be go or end");
                                }
                                DataChannelImplCmsg.this.logger.debug("      DataChannel cmsg out " + DataChannelImplCmsg.this.outputIndex + ": found " + (Object)((Object)pBankControlType) + " event");
                                doneWithFirstUsersAndControls = true;
                                outputRingIndex = DataChannelImplCmsg.this.ringIndex;
                            }
                        } else if (pBankType != EventType.USER) {
                            throw new EmuException((Object)((Object)pBankType) + " type of events must come after go event");
                        }
                        bankList.add(ringItem);
                        bankListSize[0] = pBankSize + 64;
                        recordIds[0] = -1;
                        nextMsgListIndex = 1;
                        DataChannelImplCmsg.this.gotoNextRingItem(0);
                    } else {
                        do {
                            if (firstBankFromRing != null) {
                                ringItem = firstBankFromRing;
                                firstBankFromRing = null;
                            } else {
                                try {
                                    ringItem = DataChannelImplCmsg.this.getNextOutputRingItem(outputRingIndex);
                                }
                                catch (InterruptedException e) {
                                    return;
                                }
                            }
                            ++eventCount;
                            pBankType = ringItem.getEventType();
                            pBankSize = ringItem.getTotalBytes();
                            pBankControlType = ringItem.getControlType();
                            listTotalSizeMax += pBankSize + 32;
                            if (previousType == null) {
                                bankList.add(ringItem);
                                recordIds[thisMsgListIndex] = pBankType.isAnyPhysics() || pBankType.isROCRaw() ? DataChannelImplCmsg.this.recordId++ : -1;
                                bankListSize[thisMsgListIndex] = listTotalSizeMax;
                                ++nextMsgListIndex;
                            } else if (DataChannelImplCmsg.this.singleEventOut || previousType != pBankType || listTotalSizeMax >= DataChannelImplCmsg.this.outputSizeLimit || bankList.size() + 1 > DataChannelImplCmsg.this.outputCountLimit) {
                                bankListSize[thisMsgListIndex] = listTotalSizeMax - pBankSize - 32;
                                if (nextMsgListIndex >= DataChannelImplCmsg.this.writeThreadCount) {
                                    firstBankFromRing = ringItem;
                                    break;
                                }
                                bankList = bankListArray[nextMsgListIndex];
                                bankList.clear();
                                bankList.add(ringItem);
                                bankListSize[nextMsgListIndex] = listTotalSizeMax = pBankSize + 64;
                                recordIds[nextMsgListIndex] = pBankType.isAnyPhysics() || pBankType.isROCRaw() ? DataChannelImplCmsg.this.recordId++ : -1;
                                ++thisMsgListIndex;
                                ++nextMsgListIndex;
                            } else {
                                bankList.add(ringItem);
                                bankListSize[thisMsgListIndex] = listTotalSizeMax;
                            }
                            previousType = pBankType;
                            ringItem.setAttachment(Boolean.FALSE);
                            DataChannelImplCmsg.this.gotoNextRingItem(outputRingIndex);
                            if (pBankControlType != null) {
                                if (pBankControlType == ControlType.END) {
                                    ringItem.setAttachment(Boolean.TRUE);
                                    DataChannelImplCmsg.this.haveOutputEndEvent = true;
                                    System.out.println("      DataChannel cmsg out " + DataChannelImplCmsg.this.outputIndex + ": I got END event, quitting 2, byteOrder = " + ringItem.getByteOrder());
                                    if (DataChannelImplCmsg.this.endCallback == null) break;
                                    DataChannelImplCmsg.this.endCallback.endWait();
                                    break;
                                }
                                throw new EmuException("only END control event allowed here");
                            }
                            if (DataChannelImplCmsg.this.outputRingCount <= 1 || pBankType.isUser()) continue;
                            outputRingIndex = DataChannelImplCmsg.this.setNextEventAndRing();
                        } while (eventCount < DataChannelImplCmsg.this.outputRingItemCount * 3 / 4 && DataChannelImplCmsg.this.emu.getTime() - startTime <= timeout && !DataChannelImplCmsg.this.gotResetCmd && nextMsgListIndex < DataChannelImplCmsg.this.writeThreadCount);
                    }
                    if (DataChannelImplCmsg.this.gotResetCmd) {
                        this.shutdown();
                        return;
                    }
                    this.phaser.bulkRegister(nextMsgListIndex);
                    for (i = 0; i < nextMsgListIndex; ++i) {
                        bankList = bankListArray[i];
                        if (bankList.size() < 1) continue;
                        if (writers[i] == null) {
                            writers[i] = new EvWriter(bankList, msgs[i], bankListSize[i], recordIds[i]);
                        } else {
                            writers[i].setupWriter(bankList, msgs[i], bankListSize[i], recordIds[i]);
                        }
                        this.writeThreadPool.execute(writers[i]);
                        ++messages2Write;
                    }
                    this.phaser.arriveAndAwaitAdvance();
                    try {
                        this.writeMessages(msgs, messages2Write, writers);
                    }
                    catch (cMsgException e) {
                        DataChannelImplCmsg.this.errorMsg.compareAndSet(null, "Cannot communicate with cMsg server");
                        throw e;
                    }
                    for (i = 0; i < DataChannelImplCmsg.this.outputRingCount; ++i) {
                        DataChannelImplCmsg.this.releaseOutputRingItem(i);
                    }
                    if (!DataChannelImplCmsg.this.haveOutputEndEvent) continue;
                    System.out.println("      DataChannel cmsg out: " + DataChannelImplCmsg.this.name + " some thd got END event, quitting 4");
                    this.shutdown();
                    this.threadState = DataChannelAdapter.ThreadState.DONE;
                    return;
                }
            }
            catch (InterruptedException e) {
                DataChannelImplCmsg.this.logger.warn("      DataChannel cmsg out: " + DataChannelImplCmsg.this.name + "  interrupted thd, exiting");
            }
            catch (Exception e) {
                DataChannelImplCmsg.this.channelState = CODAState.ERROR;
                DataChannelImplCmsg.this.emu.setErrorState("DataChannel cmsg out: " + e.getMessage());
                DataChannelImplCmsg.this.logger.warn("      DataChannel cmsg out: " + DataChannelImplCmsg.this.name + " exit thd: " + e.getMessage());
            }
            this.threadState = DataChannelAdapter.ThreadState.DONE;
        }

        private class EvWriter
        implements Runnable {
            private ArrayList<RingItem> bankList;
            private ByteBuffer buffer;
            private EventWriterUnsync evWriter;
            private cMsgMessage msg;
            private ByteBufferSupply bbSupply = new ByteBufferSupply(8, 0x200B20);
            private ByteBufferItem bufItem;

            EvWriter(ArrayList<RingItem> bankList, cMsgMessage msg, int bankByteSize, int myRecordId) {
                try {
                    this.setupWriter(bankList, msg, bankByteSize, myRecordId);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }

            void setupWriter(ArrayList<RingItem> bankList, cMsgMessage msg, int bankByteSize, int myRecordId) throws InterruptedException {
                this.msg = msg;
                this.bankList = bankList;
                this.bufItem = this.bbSupply.get();
                this.bufItem.ensureCapacity(bankByteSize);
                this.buffer = this.bufItem.getBuffer();
                this.buffer.order(DataChannelImplCmsg.this.byteOrder);
                BitSet bitInfo = new BitSet(24);
                EmuUtilities.setEventType(bitInfo, bankList.get(0).getEventType());
                if (bankList.get(0).isFirstEvent()) {
                    EmuUtilities.setFirstEvent(bitInfo);
                }
                try {
                    if (this.evWriter == null) {
                        this.evWriter = new EventWriterUnsync(this.buffer, 550000, 200, null, bitInfo, DataChannelImplCmsg.this.emu.getCodaid(), myRecordId);
                    } else {
                        this.evWriter.setBuffer(this.buffer, bitInfo, myRecordId);
                    }
                }
                catch (EvioException evioException) {
                    // empty catch block
                }
            }

            void releaseBuffer() {
                this.bbSupply.release(this.bufItem);
            }

            @Override
            public void run() {
                try {
                    for (RingItem ri : this.bankList) {
                        ByteBuffer buf = ri.getBuffer();
                        EvioNode node = ri.getNode();
                        if (buf != null) {
                            this.evWriter.writeEvent(buf);
                        } else if (node != null) {
                            buf = ri.getNode().getBufferNode().getBuffer();
                            this.evWriter.writeEvent(ri.getNode(), false);
                        }
                        ri.releaseByteBuffer();
                    }
                    this.evWriter.close();
                    this.buffer.flip();
                    this.msg.setByteArrayNoCopy(this.buffer.array(), 0, this.buffer.limit());
                    this.msg.setByteArrayEndian(DataChannelImplCmsg.this.byteOrder == ByteOrder.BIG_ENDIAN ? 0 : 1);
                    DataOutputHelper.this.phaser.arriveAndDeregister();
                }
                catch (Exception e) {
                    e.printStackTrace();
                    DataChannelImplCmsg.this.channelState = CODAState.ERROR;
                    DataChannelImplCmsg.this.emu.setErrorState("DataChannel cmsg out: " + e.getMessage());
                }
            }
        }
    }

    class ReceiveMsgCallback
    extends cMsgCallbackAdapter {
        ReceiveMsgCallback() {
        }

        public void callback(cMsgMessage msg, Object userObject) {
            DataChannelImplCmsg.this.messageToBuf(msg);
        }

        public int getMaximumQueueSize() {
            return 100000;
        }
    }
}

