/*
 * Decompiled with CFR 0.152.
 */
package org.jlab.coda.emu.support.transport;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SpinCountBackoffWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.jlab.coda.cMsg.cMsg;
import org.jlab.coda.cMsg.cMsgException;
import org.jlab.coda.cMsg.cMsgMessage;
import org.jlab.coda.cMsg.cMsgUtilities;
import org.jlab.coda.cMsg.common.cMsgMessageFull;
import org.jlab.coda.emu.Emu;
import org.jlab.coda.emu.EmuException;
import org.jlab.coda.emu.EmuModule;
import org.jlab.coda.emu.EmuUtilities;
import org.jlab.coda.emu.support.codaComponent.CODAClass;
import org.jlab.coda.emu.support.codaComponent.CODAState;
import org.jlab.coda.emu.support.control.CmdExecException;
import org.jlab.coda.emu.support.data.ByteBufferItem;
import org.jlab.coda.emu.support.data.ByteBufferSupply;
import org.jlab.coda.emu.support.data.ControlType;
import org.jlab.coda.emu.support.data.EventType;
import org.jlab.coda.emu.support.data.Evio;
import org.jlab.coda.emu.support.data.RingItem;
import org.jlab.coda.emu.support.data.RingItemFactory;
import org.jlab.coda.emu.support.transport.DataChannel;
import org.jlab.coda.emu.support.transport.DataChannelAdapter;
import org.jlab.coda.emu.support.transport.DataTransportException;
import org.jlab.coda.emu.support.transport.DataTransportImplEmu;
import org.jlab.coda.emu.support.transport.TransportType;
import org.jlab.coda.jevio.BlockHeaderV4;
import org.jlab.coda.jevio.DataType;
import org.jlab.coda.jevio.EventWriterUnsync;
import org.jlab.coda.jevio.EvioCompactReaderUnsync;
import org.jlab.coda.jevio.EvioException;
import org.jlab.coda.jevio.EvioNode;
import org.jlab.coda.jevio.EvioNodePool;
import org.jlab.coda.jevio.EvioNodeSource;
import org.jlab.coda.jevio.Utilities;

public class DataChannelImplEmu
extends DataChannelAdapter {
    // Transport object handed to the constructor.
    private final DataTransportImplEmu dataTransportImplEmu;
    // Set to true by pause() and to false by go().
    private volatile boolean pause;
    // Cleared at the start of prestart().
    private volatile boolean haveInputEndEvent;
    // Input only: set when the "dump" attribute is true/on/yes.
    private boolean dumpData;
    // Output only: thread created and started by startOutputThread().
    private DataOutputHelper dataOutputThread;
    // Output only: "port" attribute; 46100 when absent, unparsable or outside 1024..65535.
    private int sendPort;
    // Output only: "sendBuf" attribute (0 when absent/negative); added to the UDL as tcpSend when > 0.
    private int tcpSendBuf;
    // Output only: "noDelay" attribute; true unless given as false/off/no.
    private boolean noDelay;
    // Output only: "timeout" attribute; -1 when absent, 3 when a negative value is given.
    private int connectTimeout;
    // Output only: "subnet" attribute, kept only when cMsgUtilities.isDottedDecimal() accepts it.
    private String preferredSubnet;
    // Output only: destination addresses as ordered by cMsgUtilities.orderIPAddresses() in prestart().
    private List<String> orderedIpAddrs;
    // Input only: source id recorded by attachToInput().
    private int sourceId;
    // Output only: cMsg connection created by the multicast/direct output channel methods.
    private cMsg emuDomain;
    // Input only: true when emu.getCodaClass() is CODAClass.ER.
    private boolean isER;
    // Input only: one reader thread per socket, indexed by (socketPosition - 1).
    private DataInputHelper[] dataInputThread;
    // Input only: created on the first attachToInput() call, started once all sockets are attached.
    private ParserMerger parserMergerThread;
    // Input only: per-socket buffered data streams, indexed by (socketPosition - 1).
    private DataInputStream[] in;
    // Input only: per-socket channels as passed to attachToInput().
    private SocketChannel[] socketChannel;
    // Input only: per-socket Socket objects obtained from the channels.
    private Socket[] socket;
    // Input only: "recvBuf" attribute (0 when absent/negative); applied to each socket when > 0.
    private int tcpRecvBuf;
    // Input only: node pools, dimensioned [socketCount][numBufs] in attachToInput().
    private EvioNodePool[][] nodePools;
    // Output: "maxBuf" attribute (default 4000000). Input: overwritten by attachToInput().
    private int maxBufferSize;
    // Record/block number handed to the writer in writeEvioData(); incremented there.
    private int recordId = 1;
    // Passed to ByteBufferSupply; the constructor always leaves this false.
    private boolean direct;
    // "sockets" attribute (minimum 1); overwritten by attachToInput() on input channels.
    private int socketCount;
    // Number of attachToInput() calls seen so far.
    private int socketsConnected;
    // Not referenced in the visible part of this file.
    private long nextRingItem;
    // Input only: one buffer supply per socket, created in attachToInput().
    protected ByteBufferSupply[] bbInSupply;

    /**
     * Builds an emu data channel and reads its settings from the attribute map.
     *
     * <p>Attributes read for every channel: {@code direct}, {@code sockets}.
     * Input channels additionally read {@code recvBuf} and {@code dump};
     * output channels read {@code noDelay}, {@code sendBuf}, {@code port},
     * {@code maxBuf}, {@code timeout} and {@code subnet}. A numeric attribute
     * that cannot be parsed is silently ignored and the default is kept.
     * The channel state is left at {@code CODAState.PAUSED}.
     *
     * @param name         channel name
     * @param transport    transport this channel belongs to
     * @param attributeMap channel attributes from the configuration
     * @param input        true for an input channel, false for an output channel
     * @param emu          owning emu
     * @param module       module this channel is attached to
     * @param outputIndex  index forwarded unchanged to the superclass constructor
     * @throws DataTransportException declared for callers; propagated from the superclass constructor if it throws
     */
    DataChannelImplEmu(String name, DataTransportImplEmu transport, Map<String, String> attributeMap, boolean input, Emu emu, EmuModule module, int outputIndex) throws DataTransportException {
        super(name, transport, attributeMap, input, emu, module, outputIndex);
        this.dataTransportImplEmu = transport;
        this.logger.info((input
                ? "      DataChannel Emu: creating input channel "
                : "      DataChannel Emu: creating output channel ") + name);

        // Direct buffers stay off: both the default and the only recognised override are false.
        this.direct = false;
        final String directAttr = attributeMap.get("direct");
        if (directAttr != null) {
            if (directAttr.equalsIgnoreCase("false") || directAttr.equalsIgnoreCase("off") || directAttr.equalsIgnoreCase("no")) {
                this.direct = false;
            }
        }

        // Number of TCP sockets, never less than 1.
        this.socketCount = 1;
        final String socketsAttr = attributeMap.get("sockets");
        if (socketsAttr != null) {
            try {
                this.socketCount = Math.max(1, Integer.parseInt(socketsAttr));
            }
            catch (NumberFormatException ignored) {
                // keep the default of 1
            }
        }
        this.logger.info("      DataChannel Emu: TCP socket count = " + this.socketCount);
        if (this.socketCount > 1) {
            this.logger.info("      DataChannel Emu: ************** FAT PIPE ***************");
        }

        if (input) {
            this.isER = emu.getCodaClass() == CODAClass.ER;
            if (this.isER) {
                // An event recorder replaces the input ring with one using a spin-then-block wait strategy.
                this.ringBufferIn = RingBuffer.createSingleProducer(
                        (EventFactory)new RingItemFactory(),
                        (int)this.inputRingItemCount,
                        (WaitStrategy)new SpinCountBackoffWaitStrategy(30000, (WaitStrategy)new LiteTimeoutBlockingWaitStrategy(10L, TimeUnit.SECONDS)));
            }

            // TCP receive buffer size, 0 when unset or negative.
            this.tcpRecvBuf = 0;
            final String recvAttr = attributeMap.get("recvBuf");
            if (recvAttr != null) {
                try {
                    this.tcpRecvBuf = Math.max(0, Integer.parseInt(recvAttr));
                    this.logger.info("      DataChannel Emu: set recvBuf to " + this.tcpRecvBuf);
                }
                catch (NumberFormatException ignored) {
                    // keep 0
                }
            }

            final String dumpAttr = attributeMap.get("dump");
            if (dumpAttr != null) {
                if (dumpAttr.equalsIgnoreCase("true") || dumpAttr.equalsIgnoreCase("on") || dumpAttr.equalsIgnoreCase("yes")) {
                    this.dumpData = true;
                }
            }
        } else {
            // noDelay is on unless explicitly switched off.
            this.noDelay = true;
            final String noDelayAttr = attributeMap.get("noDelay");
            if (noDelayAttr != null) {
                if (noDelayAttr.equalsIgnoreCase("false") || noDelayAttr.equalsIgnoreCase("off") || noDelayAttr.equalsIgnoreCase("no")) {
                    this.noDelay = false;
                }
            }
            this.logger.info("      DataChannel Emu: noDelay = " + this.noDelay);

            // TCP send buffer size, 0 when unset or negative.
            this.tcpSendBuf = 0;
            final String sendAttr = attributeMap.get("sendBuf");
            if (sendAttr != null) {
                try {
                    this.tcpSendBuf = Math.max(0, Integer.parseInt(sendAttr));
                    this.logger.info("      DataChannel Emu: sendBuf = " + this.tcpSendBuf);
                }
                catch (NumberFormatException ignored) {
                    // keep 0
                }
            }

            // Destination port; anything outside 1024..65535 falls back to 46100.
            this.sendPort = 46100;
            final String portAttr = attributeMap.get("port");
            if (portAttr != null) {
                try {
                    final int parsedPort = Integer.parseInt(portAttr);
                    this.sendPort = (parsedPort < 1024 || parsedPort > 65535) ? 46100 : parsedPort;
                }
                catch (NumberFormatException ignored) {
                    // keep 46100
                }
            }
            this.logger.info("      DataChannel Emu: sending on port " + this.sendPort);

            // Maximum buffer size; negative values fall back to 4000000.
            this.maxBufferSize = 4000000;
            final String maxBufAttr = attributeMap.get("maxBuf");
            if (maxBufAttr != null) {
                try {
                    final int parsedSize = Integer.parseInt(maxBufAttr);
                    this.maxBufferSize = parsedSize < 0 ? 4000000 : parsedSize;
                }
                catch (NumberFormatException ignored) {
                    // keep 4000000
                }
            }
            this.logger.info("      DataChannel Emu: max buf size = " + this.maxBufferSize);

            // Connection timeout; -1 when not given, 3 when a negative value is given.
            this.connectTimeout = -1;
            final String timeoutAttr = attributeMap.get("timeout");
            if (timeoutAttr != null) {
                try {
                    final int parsedTimeout = Integer.parseInt(timeoutAttr);
                    this.connectTimeout = parsedTimeout < 0 ? 3 : parsedTimeout;
                }
                catch (NumberFormatException ignored) {
                    // keep -1
                }
            }
            this.logger.info("      DataChannel Emu: timeout = " + this.connectTimeout);

            // Preferred subnet is dropped when it is not in dotted-decimal form.
            final String subnetAttr = attributeMap.get("subnet");
            if (subnetAttr != null && cMsgUtilities.isDottedDecimal(subnetAttr) == null) {
                this.preferredSubnet = null;
            } else {
                this.preferredSubnet = subnetAttr;
            }
            this.logger.info("      DataChannel Emu: over subnet " + this.preferredSubnet);
        }
        this.channelState = CODAState.PAUSED;
    }

    /**
     * Attaches one incoming TCP connection to this input channel.
     *
     * <p>The first call allocates the per-socket arrays and the parser/merger
     * thread. Every call records the socket, builds its buffered input stream
     * and buffer supply, and starts a reader thread for it. When the number of
     * attached sockets reaches {@code socketCount}, the parser/merger thread is
     * started and this method waits until every reader thread has started.
     *
     * @param channel        connected channel to read from
     * @param sourceId       id of the data source; a mismatch with a previous call is only reported
     * @param maxBufferSize  size given to each buffer of the supply created for this socket
     * @param socketCount    total number of sockets the sender will open
     * @param socketPosition 1-based position of this socket among {@code socketCount}
     * @throws IOException if {@code socketPosition} does not address an existing slot,
     *                     or if the socket cannot be configured or its input stream obtained
     */
    final void attachToInput(SocketChannel channel, int sourceId, int maxBufferSize, int socketCount, int socketPosition) throws IOException {
        // Fixed at 32 buffers per socket. A size-derived count used to be computed here and then
        // discarded; that computation divided by (maxBufferSize * channelCount) and could throw
        // ArithmeticException when the product was 0 or overflowed to 0, so it has been removed.
        final int numBufs = EmuUtilities.powerOfTwo(32, true);

        // Validate the slot before touching any state so a bad position cannot leave
        // socketsConnected or the arrays half-updated.
        final boolean firstSocket = this.socketChannel == null;
        final int slots = firstSocket ? socketCount : this.socket.length;
        if (socketPosition < 1 || socketPosition > slots) {
            throw new IOException("Bad socketPosition: " + socketPosition + ", must be between 1 and " + slots);
        }

        if (firstSocket) {
            this.in = new DataInputStream[socketCount];
            this.socket = new Socket[socketCount];
            this.bbInSupply = new ByteBufferSupply[socketCount];
            this.socketChannel = new SocketChannel[socketCount];
            this.dataInputThread = new DataInputHelper[socketCount];
            this.parserMergerThread = new ParserMerger();
            this.nodePools = new EvioNodePool[socketCount][numBufs];
        } else {
            if (socketCount != this.socketCount) {
                System.out.println("Bad socketCount: " + socketCount + ", != previous " + this.socketCount);
            }
            if (sourceId != this.sourceId) {
                System.out.println("Bad sourceId: " + sourceId + ", != previous " + this.sourceId);
            }
        }
        this.sourceId = sourceId;
        this.socketCount = socketCount;
        this.maxBufferSize = maxBufferSize;
        ++this.socketsConnected;

        final int index = socketPosition - 1;
        final Socket sock = channel.socket();
        this.socket[index] = sock;
        if (this.tcpRecvBuf > 0) {
            sock.setPerformancePreferences(0, 0, 1);
            sock.setReceiveBufferSize(this.tcpRecvBuf);
        }
        this.socketChannel[index] = channel;
        this.in[index] = new DataInputStream(new BufferedInputStream(sock.getInputStream()));

        // Buffers are released in order only when a single thread produces events.
        boolean sequentialRelease = this.module.getEventProducingThreadCount() <= 1;
        if (this.isER) {
            // NOTE(review): the previous expression inspected the emu's output channels but
            // yielded false on every path; confirm that "always false" is intended for an ER.
            sequentialRelease = false;
        }

        for (int i = 0; i < numBufs; ++i) {
            this.nodePools[index][i] = new EvioNodePool(3500);
        }
        this.bbInSupply[index] = new ByteBufferSupply(numBufs, maxBufferSize, ByteOrder.BIG_ENDIAN, this.direct, sequentialRelease, this.nodePools[index]);
        this.logger.info("      DataChannel Emu in: connection made from " + this.name);

        this.dataInputThread[index] = new DataInputHelper(socketPosition);
        this.dataInputThread[index].start();

        if (this.socketsConnected == socketCount) {
            this.parserMergerThread.start();
            for (int i = 0; i < socketCount; ++i) {
                this.dataInputThread[i].waitUntilStarted();
            }
        }
    }

    /**
     * Opens the output connection: directly when an ordered list of destination
     * addresses is available, otherwise through multicast.
     *
     * @throws cMsgException if the chosen connection method fails
     */
    private final void openOutputChannel() throws cMsgException {
        final boolean haveAddresses = this.orderedIpAddrs != null && !this.orderedIpAddrs.isEmpty();
        if (!haveAddresses) {
            this.multicastOutputChannel();
            return;
        }
        this.directOutputChannel();
    }

    /**
     * Builds a multicast emu-domain UDL from this channel's settings, connects
     * with it and starts the output thread.
     *
     * @throws cMsgException if the cMsg object cannot be created or connected
     */
    private final void multicastOutputChannel() throws cMsgException {
        final StringBuilder udlBuilder = new StringBuilder(256);
        udlBuilder.append("emu://multicast:").append(this.sendPort)
                  .append('/').append(this.emu.getExpid())
                  .append('/').append(this.name)
                  .append("?codaId=").append(this.getID());

        // Optional parameters, in the order the server UDL has always been built.
        udlBuilder.append(this.maxBufferSize > 0 ? "&bufSize=" + this.maxBufferSize : "&bufSize=4000000");
        if (this.connectTimeout > -1) {
            udlBuilder.append("&timeout=").append(this.connectTimeout);
        }
        if (this.tcpSendBuf > 0) {
            udlBuilder.append("&tcpSend=").append(this.tcpSendBuf);
        }
        if (this.preferredSubnet != null) {
            udlBuilder.append("&subnet=").append(this.preferredSubnet);
        }
        if (this.socketCount > 1) {
            udlBuilder.append("&sockets=").append(this.socketCount);
        }
        if (this.noDelay) {
            udlBuilder.append("&noDelay");
        }

        final String udl = udlBuilder.toString();
        this.logger.info("      DataChannel Emu out: will connect to server w/ multicast UDL = " + udl);
        this.emuDomain = new cMsg(udl, this.name, "emu domain client");
        this.emuDomain.connect();
        this.startOutputThread();
    }

    /**
     * Tries each ordered destination address in turn: builds a direct emu-domain
     * UDL for it, connects, and starts the output thread on the first success.
     *
     * @throws cMsgException if no address could be connected to
     */
    private final void directOutputChannel() throws cMsgException {
        for (String address : this.orderedIpAddrs) {
            // A fresh UDL is assembled for every candidate address.
            final StringBuilder udlBuilder = new StringBuilder(256);
            udlBuilder.append("emu://").append(address).append(':').append(this.sendPort)
                      .append('/').append(this.emu.getExpid())
                      .append('/').append(this.name)
                      .append("?codaId=").append(this.getID());
            if (this.preferredSubnet != null) {
                udlBuilder.append("&subnet=").append(this.preferredSubnet);
            }
            udlBuilder.append(this.maxBufferSize > 0 ? "&bufSize=" + this.maxBufferSize : "&bufSize=4000000");
            if (this.connectTimeout > -1) {
                udlBuilder.append("&timeout=").append(this.connectTimeout);
            }
            if (this.tcpSendBuf > 0) {
                udlBuilder.append("&tcpSend=").append(this.tcpSendBuf);
            }
            if (this.socketCount > 1) {
                udlBuilder.append("&sockets=").append(this.socketCount);
            }
            if (this.noDelay) {
                udlBuilder.append("&noDelay");
            }
            final String udl = udlBuilder.toString();

            try {
                this.logger.info("      DataChannel Emu out: will directly connect to server w/ UDL = " + udl);
                this.emuDomain = new cMsg(udl, this.name, "emu domain client");
                this.emuDomain.connect();
                this.startOutputThread();
                return;
            }
            catch (cMsgException failed) {
                // Move on to the next address.
                this.logger.info("      DataChannel Emu out: could not connect to server at " + address);
            }
        }
        throw new cMsgException("Cannot connect to any given IP addresses directly");
    }

    /**
     * Disconnects the emu-domain connection of an output channel; does nothing
     * for an input channel.
     *
     * @throws cMsgException if the disconnect fails
     */
    private final void closeOutputChannel() throws cMsgException {
        if (!this.input) {
            this.emuDomain.disconnect();
        }
    }

    /**
     * Closes every input stream and socket of an input channel; does nothing for
     * an output channel or when no socket was ever attached.
     *
     * <p>Each stream and socket is closed independently, so a failure on one
     * (or a slot that was never connected) does not leave the others open.
     */
    private final void closeInputSockets() {
        if (!this.input || this.in == null || this.socket == null) {
            return;
        }
        // Never index past the arrays even if socketCount disagrees with their size.
        final int count = Math.min(this.socketCount, Math.min(this.in.length, this.socket.length));
        for (int i = 0; i < count; ++i) {
            if (this.in[i] != null) {
                try {
                    this.in[i].close();
                }
                catch (IOException ignored) {
                    // best effort: still close the socket and the remaining slots
                }
            }
            if (this.socket[i] != null) {
                try {
                    this.socket[i].close();
                }
                catch (IOException ignored) {
                    // best effort: keep closing the remaining slots
                }
            }
        }
    }

    /**
     * Identifies the transport this channel uses.
     *
     * @return always {@code TransportType.EMU}
     */
    @Override
    public TransportType getTransportType() {
        return TransportType.EMU;
    }

    /**
     * Reports the highest fill level among the per-socket input buffer supplies.
     *
     * @return the largest {@code getFillLevel()} value of the supplies created so far,
     *         or 0 when no supply exists yet (output channel, or no socket attached)
     */
    @Override
    public int getInputLevel() {
        final ByteBufferSupply[] supplies = this.bbInSupply;
        if (supplies == null) {
            // Nothing attached yet; previously this dereferenced a null array.
            return 0;
        }
        int level = 0;
        final int count = Math.min(this.socketCount, supplies.length);
        for (int i = 0; i < count; ++i) {
            final ByteBufferSupply supply = supplies[i];
            if (supply == null) {
                // This socket has not connected yet.
                continue;
            }
            level = Math.max(level, supply.getFillLevel());
        }
        return level;
    }

    /**
     * Handles the prestart transition. An input channel only resets its END flag
     * and goes to PAUSED. An output channel additionally orders the destination
     * addresses, prints them and opens the output connection.
     *
     * @throws CmdExecException if ordering the addresses or opening the output
     *                          connection fails; the channel and emu are put in
     *                          the error state first
     */
    @Override
    public void prestart() throws CmdExecException {
        super.prestart();
        this.haveInputEndEvent = false;

        if (!this.input) {
            try {
                this.orderedIpAddrs = cMsgUtilities.orderIPAddresses(Arrays.asList(this.ipAddrList), Arrays.asList(this.bAddrList), this.preferredSubnet);
                System.out.println("Ordered destination IP list:");
                for (String address : this.orderedIpAddrs) {
                    System.out.println("  " + address);
                }
                this.openOutputChannel();
            }
            catch (Exception failure) {
                this.channelState = CODAState.ERROR;
                this.emu.setErrorState("      DataChannel Emu out: " + failure.getMessage());
                throw new CmdExecException(failure);
            }
        }

        this.channelState = CODAState.PAUSED;
    }

    /**
     * Handles the go transition: clears the pause flag, then marks the channel ACTIVE.
     */
    @Override
    public void go() {
        this.pause = false;
        this.channelState = CODAState.ACTIVE;
    }

    /**
     * Handles the pause transition: sets the pause flag, then marks the channel PAUSED.
     */
    @Override
    public void pause() {
        this.pause = true;
        this.channelState = CODAState.PAUSED;
    }

    /**
     * Interrupts the worker threads of this channel. On the input side the
     * parser/merger is interrupted first, followed after a 10 ms pause by every
     * existing reader thread. On the output side the output thread is
     * interrupted and each socket sender is told to end.
     */
    private void interruptThreads() {
        if (this.dataInputThread != null) {
            this.parserMergerThread.interrupt();
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException ignored) {
                // carry on interrupting the readers
            }
            for (int slot = 0; slot < this.socketCount; ++slot) {
                if (this.dataInputThread[slot] != null) {
                    this.dataInputThread[slot].interrupt();
                }
            }
        }

        if (this.dataOutputThread != null) {
            this.dataOutputThread.interrupt();
            for (int slot = 0; slot < this.socketCount; ++slot) {
                this.dataOutputThread.sender[slot].endThread();
            }
        }
    }

    /**
     * Waits up to one second for each worker thread of this channel to finish:
     * the parser/merger and every existing reader on the input side, the output
     * thread and every socket sender on the output side. An interrupt while
     * waiting on one thread just moves on to the next.
     */
    private void joinThreads() {
        if (this.dataInputThread != null) {
            try {
                this.parserMergerThread.join(1000L);
            }
            catch (InterruptedException ignored) {
                // move on to the readers
            }
            for (int slot = 0; slot < this.socketCount; ++slot) {
                if (this.dataInputThread[slot] != null) {
                    try {
                        this.dataInputThread[slot].join(1000L);
                    }
                    catch (InterruptedException ignored) {
                        // move on to the next reader
                    }
                }
            }
        }

        if (this.dataOutputThread != null) {
            try {
                this.dataOutputThread.join(1000L);
            }
            catch (InterruptedException ignored) {
                // move on to the senders
            }
            for (int slot = 0; slot < this.socketCount; ++slot) {
                try {
                    this.dataOutputThread.sender[slot].join(1000L);
                }
                catch (InterruptedException ignored) {
                    // move on to the next sender
                }
            }
        }
    }

    /**
     * Handles the end transition: flags that END was commanded, interrupts and
     * joins all worker threads, drops the references to them, closes the input
     * sockets or the output connection, and leaves the channel in DOWNLOADED.
     */
    @Override
    public void end() {
        this.gotEndCmd = true;
        this.gotResetCmd = false;

        this.interruptThreads();
        this.joinThreads();

        // Input side: forget the reader and parser threads, then close the sockets.
        if (this.dataInputThread != null) {
            for (int slot = 0; slot < this.socketCount; ++slot) {
                this.dataInputThread[slot] = null;
            }
            this.dataInputThread = null;
            this.parserMergerThread = null;
            this.closeInputSockets();
        }

        // Output side: forget the senders and the output thread, then disconnect.
        if (this.dataOutputThread != null) {
            for (int slot = 0; slot < this.socketCount; ++slot) {
                this.dataOutputThread.sender[slot] = null;
            }
            this.dataOutputThread = null;
            try {
                this.logger.debug("      DataChannel Emu: end(), close output channel " + this.name);
                this.closeOutputChannel();
            }
            catch (cMsgException ignored) {
                // disconnect is best effort
            }
        }

        this.channelState = CODAState.DOWNLOADED;
    }

    /**
     * Handles the reset transition: flags that RESET was commanded, interrupts
     * and joins all worker threads, drops the references to them, closes the
     * input sockets or the output connection, clears the stored error message
     * and leaves the channel in CONFIGURED.
     */
    @Override
    public void reset() {
        int i;
        this.gotEndCmd = false;
        this.gotResetCmd = true;
        this.interruptThreads();
        this.joinThreads();
        if (this.dataInputThread != null) {
            for (i = 0; i < this.socketCount; ++i) {
                this.dataInputThread[i] = null;
            }
            this.dataInputThread = null;
            this.parserMergerThread = null;
            this.closeInputSockets();
        }
        if (this.dataOutputThread != null) {
            for (i = 0; i < this.socketCount; ++i) {
                this.dataOutputThread.sender[i] = null;
            }
            this.dataOutputThread = null;
            try {
                // The message used to say "end()", which mislabelled resets in the log.
                this.logger.debug("      DataChannel Emu: reset(), close output channel " + this.name);
                this.closeOutputChannel();
            }
            catch (cMsgException ignored) {
                // disconnect is best effort during reset
            }
        }
        this.errorMsg.set(null);
        this.channelState = CODAState.CONFIGURED;
    }

    /**
     * Creates the output thread, stores it in {@code dataOutputThread}, starts
     * it and waits until it signals that it is running.
     */
    private final void startOutputThread() {
        final DataOutputHelper helper = new DataOutputHelper();
        this.dataOutputThread = helper;
        helper.start();
        helper.waitUntilStarted();
    }

    private final class DataOutputHelper
    extends Thread {
        // Not referenced in the visible part of this class.
        private int pauseCounter;
        // Counted down at the top of run(); awaited by waitUntilStarted().
        private final CountDownLatch startLatch;
        // Writer that serialises events into currentBuffer.
        private EventWriterUnsync writer;
        // Buffer of currentBBitem that events are currently written into.
        private ByteBuffer currentBuffer;
        // Item obtained from bbOutSupply[currentSenderIndex]; published by flushEvents().
        private ByteBufferItem currentBBitem;
        // Index of the supply/sender in use; advanced round-robin in flushEvents() when socketCount > 1.
        private int currentSenderIndex;
        // 24-bit set passed to writer.setBuffer(); carries the event type and first-event flag.
        private final BitSet bitInfo;
        // Event type of the item most recently handled by writeEvioData().
        private EventType previousEventType;
        // Set to RUNNING at the top of run().
        private volatile DataChannelAdapter.ThreadState threadState;
        // Initialised to System.currentTimeMillis() when run() starts.
        private volatile long lastSendTime;
        // One sender thread per socket, created and started in the constructor.
        private final SocketSender[] sender;
        // One buffer supply per socket, feeding the sender with the same index.
        private final ByteBufferSupply[] bbOutSupply;
        // System.nanoTime() reference used by flushEvents() to pace data buffers.
        private long lastBufSendTime;
        // Events accumulated in the current buffer; reset by flushEvents().
        private int currentEventCount;
        // 2 ms expressed in nanoseconds: above this remaining time flushEvents() sleeps.
        private final long SLEEP_PRECISION;
        // 2 microseconds expressed in nanoseconds: above this remaining time flushEvents() yields.
        private final long SPIN_YIELD_PRECISION;

        /**
         * Creates the output thread in the emu's thread group. For every socket
         * a buffer supply of 16 buffers and a sender thread are created, and the
         * sender is started immediately. The first buffer of supply 0 is then
         * fetched and a writer is created on it and closed right away.
         * An interrupt or evio error during that last step is ignored.
         */
        DataOutputHelper() {
            super(DataChannelImplEmu.this.emu.getThreadGroup(), DataChannelImplEmu.this.name() + "_data_out");
            this.startLatch = new CountDownLatch(1);
            this.bitInfo = new BitSet(24);
            this.SLEEP_PRECISION = TimeUnit.MILLISECONDS.toNanos(2L);
            this.SPIN_YIELD_PRECISION = TimeUnit.MICROSECONDS.toNanos(2L);

            final int pipes = DataChannelImplEmu.this.socketCount;
            this.sender = new SocketSender[pipes];
            this.bbOutSupply = new ByteBufferSupply[pipes];
            for (int pipe = 0; pipe < pipes; ++pipe) {
                // Buffers of each supply are released in order (last argument).
                final ByteBufferSupply supply = new ByteBufferSupply(16, DataChannelImplEmu.this.maxBufferSize, DataChannelImplEmu.this.byteOrder, DataChannelImplEmu.this.direct, true);
                this.bbOutSupply[pipe] = supply;
                final SocketSender worker = new SocketSender(supply, pipe);
                this.sender[pipe] = worker;
                worker.start();
            }

            this.currentSenderIndex = 0;
            try {
                this.currentBBitem = this.bbOutSupply[this.currentSenderIndex].get();
                this.currentBuffer = this.currentBBitem.getBuffer();
                this.writer = new EventWriterUnsync(this.currentBuffer);
                this.writer.close();
            }
            catch (InterruptedException | EvioException ignored) {
                // leave the writer state as far as it got
            }
        }

        /**
         * Blocks until {@code run()} has counted down the start latch. Returns
         * early, without reporting it, if the waiting thread is interrupted.
         */
        private final void waitUntilStarted() {
            try {
                this.startLatch.await();
            }
            catch (InterruptedException ignored) {
                // give up waiting
            }
        }

        /**
         * Closes the writer and, if it wrote at least one event, publishes the
         * current buffer to its sender and fetches the next buffer to fill.
         *
         * <p>When {@code isData} is true and buffer-rate regulation is enabled,
         * publication is delayed so that at least {@code nanoSecPerBuf}
         * nanoseconds separate consecutive data buffers.
         *
         * @param force    value stored on the buffer item with {@code setForce}
         * @param userBool value stored on the buffer item with {@code setUserBoolean}
         * @param isData   true if the buffer is subject to rate regulation
         * @throws InterruptedException if interrupted while obtaining the next buffer
         */
        private final void flushEvents(boolean force, boolean userBool, boolean isData) throws InterruptedException {
            this.writer.close();
            if (this.writer.getEventsWritten() < 1) {
                // Nothing was written, so there is nothing to send.
                return;
            }

            if (isData && DataChannelImplEmu.this.regulateBufferRate) {
                final long startNanos = System.nanoTime();
                final long sinceLastSend = startNanos - this.lastBufSendTime;
                this.lastBufSendTime = startNanos;
                if (sinceLastSend < DataChannelImplEmu.this.nanoSecPerBuf) {
                    long remaining = DataChannelImplEmu.this.nanoSecPerBuf - sinceLastSend;
                    final long deadline = startNanos + remaining;
                    // Sleep while far from the deadline, yield when close, spin for the last stretch.
                    do {
                        if (remaining > this.SLEEP_PRECISION) {
                            try {
                                Thread.sleep(1L);
                            }
                            catch (InterruptedException ignored) {
                                // keep pacing
                            }
                        } else if (remaining > this.SPIN_YIELD_PRECISION) {
                            Thread.yield();
                        }
                        remaining = deadline - System.nanoTime();
                    } while (remaining > 0L);
                    this.lastBufSendTime = System.nanoTime();
                }
            }

            this.currentEventCount = 0;
            this.currentBBitem.setForce(force);
            this.currentBBitem.setUserBoolean(userBool);
            this.currentBuffer.flip();
            this.bbOutSupply[this.currentSenderIndex].publish(this.currentBBitem);

            // With several sockets, buffers are handed to the senders round-robin.
            if (DataChannelImplEmu.this.socketCount > 1) {
                this.currentSenderIndex = (this.currentSenderIndex + 1) % DataChannelImplEmu.this.socketCount;
            }
            this.currentBBitem = this.bbOutSupply[this.currentSenderIndex].get();
            this.currentBuffer = this.currentBBitem.getBuffer();
        }

        /**
         * Forces out whatever events have accumulated in the current buffer.
         * Does nothing when no event is pending. The buffer is treated as data
         * (rate-regulated) only if the previously written event type is buildable.
         *
         * @throws InterruptedException if interrupted while obtaining the next buffer
         */
        private final void flushExistingEvioData() throws InterruptedException {
            if (this.currentEventCount != 0) {
                final boolean pendingIsData = this.previousEventType.isBuildable();
                this.flushEvents(true, false, pendingIsData);
            }
        }

        /**
         * Writes one ring item into the current output buffer, flushing buffers as needed.
         *
         * <p>When {@code singleEventOut} is set or the item is not buildable, the item is
         * sent in a buffer of its own: pending events are flushed first, the item is
         * written, and the buffer is flushed again immediately. Otherwise the item is
         * appended to the current buffer, which is flushed beforehand if the event type
         * changed, the writer has no room, or the per-buffer event limit was reached.
         *
         * @param rItem item to write; its data comes from {@code getBuffer()} or, if that is null, {@code getNode()}
         * @throws IOException          declared for the writer calls
         * @throws EvioException        declared for the writer calls
         * @throws EmuException         if the item has neither a buffer nor a node
         * @throws InterruptedException if interrupted while flushing
         */
        private final void writeEvioData(RingItem rItem) throws IOException, EvioException, EmuException, InterruptedException {
            EventType eType = rItem.getEventType();
            boolean isBuildable = eType.isBuildable();
            // Sampled once, before any flush below; later checks combine it with writer.isClosed().
            int eventsWritten = this.writer.getEventsWritten();
            if (DataChannelImplEmu.this.singleEventOut || !isBuildable) {
                // --- One item per buffer ---
                this.currentEventCount = 0;
                // Send out anything still sitting in the current buffer first.
                if (eventsWritten > 0 && !this.writer.isClosed()) {
                    if (this.previousEventType.isBuildable()) {
                        this.flushEvents(false, false, true);
                    } else {
                        this.flushEvents(true, false, false);
                    }
                }
                // Block number is the record id for buildable events and -1 otherwise;
                // the record id is incremented in both cases.
                int blockNum = isBuildable ? DataChannelImplEmu.this.recordId : -1;
                DataChannelImplEmu.this.recordId++;
                // Grow the buffer if the item does not fit (1024 bytes of headroom are added).
                if (rItem.getTotalBytes() > this.currentBuffer.capacity()) {
                    this.currentBBitem.ensureCapacity(rItem.getTotalBytes() + 1024);
                    this.currentBuffer = this.currentBBitem.getBuffer();
                }
                EmuUtilities.setEventType(this.bitInfo, eType);
                if (rItem.isFirstEvent()) {
                    EmuUtilities.setFirstEvent(this.bitInfo);
                }
                this.writer.setBuffer(this.currentBuffer, this.bitInfo, blockNum);
                // The first-event bit applies to this buffer only.
                EmuUtilities.unsetFirstEvent(this.bitInfo);
                ByteBuffer buf = rItem.getBuffer();
                if (buf != null) {
                    try {
                        this.writer.writeEvent(buf);
                    }
                    catch (Exception e) {
                        // Dump the buffer state for diagnosis, then let the failure propagate.
                        System.out.println("      DataChannel Emu out: single ev buf, pos = " + buf.position() + ", lim = " + buf.limit() + ", cap = " + buf.capacity());
                        Utilities.printBufferBytes((ByteBuffer)buf, (int)0, (int)20, (String)"bad END?");
                        throw e;
                    }
                } else {
                    EvioNode node = rItem.getNode();
                    if (node != null) {
                        this.writer.writeEvent(node, false);
                    } else {
                        throw new EmuException("no data to write");
                    }
                }
                rItem.releaseByteBuffer();
                // Flush right away: buildable events as data, control/other events forced;
                // an END event additionally sets the item's user boolean.
                if (isBuildable) {
                    this.flushEvents(false, false, true);
                } else if (rItem.getControlType() == ControlType.END) {
                    this.flushEvents(true, true, false);
                } else {
                    this.flushEvents(true, false, false);
                }
            } else {
                // --- Accumulate several buildable items per buffer ---
                ByteBuffer buf;
                if (eventsWritten > 0 && !this.writer.isClosed()) {
                    if (this.previousEventType != eType) {
                        // Event type changed: the pending events go out in their own buffer.
                        this.flushEvents(false, false, false);
                    } else if (!this.writer.hasRoom(rItem.getTotalBytes()) || DataChannelImplEmu.this.regulateBufferRate && this.currentEventCount >= DataChannelImplEmu.this.eventsPerBuffer) {
                        // Buffer is full, or the regulated events-per-buffer limit was reached.
                        this.flushEvents(false, false, true);
                    }
                }
                // Start a new record when nothing was written yet or the writer is closed
                // (flushEvents() closes the writer).
                if (eventsWritten < 1 || this.writer.isClosed()) {
                    if (rItem.getTotalBytes() > this.currentBuffer.capacity()) {
                        this.currentBBitem.ensureCapacity(rItem.getTotalBytes() + 1024);
                        this.currentBuffer = this.currentBBitem.getBuffer();
                    }
                    EmuUtilities.setEventType(this.bitInfo, eType);
                    this.writer.setBuffer(this.currentBuffer, this.bitInfo, DataChannelImplEmu.this.recordId++);
                }
                if ((buf = rItem.getBuffer()) != null) {
                    this.writer.writeEvent(buf);
                } else {
                    EvioNode node = rItem.getNode();
                    if (node != null) {
                        this.writer.writeEvent(node, false);
                    } else {
                        throw new EmuException("no data to write");
                    }
                }
                ++this.currentEventCount;
                rItem.releaseByteBuffer();
            }
            // Remembered so the next call can detect a change of event type.
            this.previousEventType = eType;
        }

        /*
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        /**
         * Body of the output thread. Works in two phases:
         * <ol>
         * <li>Start-up phase: reads only output ring 0 and enforces the control-event
         *     order. PRESTART must be the first control event and may appear only once;
         *     the next control event must be GO or END. USER events are passed through;
         *     any other event type raises an {@code EmuException}. END finishes the
         *     thread, GO moves on to phase two.</li>
         * <li>Data phase: repeatedly takes the next item from the ring selected by
         *     {@code ringIndex}, writes it, releases it, and stops on an END event or
         *     when a reset command has been flagged.</li>
         * </ol>
         * NOTE(review): the labeled blocks are decompiler output; the shape of the
         * original source is not visible here.
         */
        @Override
        public void run() {
            ControlType pBankControlType;
            EventType pBankType;
            RingItem ringItem;
            long timeout;
            block21: {
                // Announce that the thread is up before doing any work.
                this.threadState = DataChannelAdapter.ThreadState.RUNNING;
                this.startLatch.countDown();
                try {
                    boolean gotPrestart = false;
                    // Idle limit used by the flush check at the bottom of the data phase
                    // (presumably milliseconds - TODO confirm the unit of emu.getTime()).
                    timeout = 2000L;
                    this.lastSendTime = System.currentTimeMillis();
                    // ---- Phase 1: control/user events on ring 0 until GO or END ----
                    while (true) {
                        block23: {
                            block22: {
                                ringItem = DataChannelImplEmu.this.getNextOutputRingItem(0);
                                pBankType = ringItem.getEventType();
                                pBankControlType = ringItem.getControlType();
                                // Non-control events are handled after block22.
                                if (pBankType != EventType.CONTROL) break block22;
                                if (pBankControlType == ControlType.PRESTART) {
                                    if (gotPrestart) {
                                        throw new EmuException("got 2 prestart events");
                                    }
                                    DataChannelImplEmu.this.logger.info("      DataChannel Emu out " + DataChannelImplEmu.this.outputIndex + ": send prestart event");
                                    gotPrestart = true;
                                    this.writeEvioData(ringItem);
                                    // Jump to the common release at the end of the loop body.
                                    break block23;
                                } else {
                                    if (!gotPrestart) {
                                        throw new EmuException("prestart, not " + (Object)((Object)pBankControlType) + ", must be first control event");
                                    }
                                    if (pBankControlType != ControlType.GO && pBankControlType != ControlType.END) {
                                        throw new EmuException("second control event must be go or end");
                                    }
                                    DataChannelImplEmu.this.logger.info("      DataChannel Emu out " + DataChannelImplEmu.this.outputIndex + ": send " + (Object)((Object)pBankControlType) + " event");
                                    this.writeEvioData(ringItem);
                                    DataChannelImplEmu.this.releaseCurrentAndGoToNextOutputRingItem(0);
                                    if (pBankControlType == ControlType.END) {
                                        // END: leave the phase-1 loop and finish below.
                                        break;
                                    }
                                    // GO: leave block21 and continue with the data phase.
                                    break block21;
                                }
                            }
                            // Only USER events are accepted here besides control events.
                            if (pBankType != EventType.USER) throw new EmuException((Object)((Object)pBankType) + " type of events must come after go event");
                            DataChannelImplEmu.this.logger.debug("      DataChannel Emu out " + DataChannelImplEmu.this.outputIndex + ": send user event");
                            this.writeEvioData(ringItem);
                        }
                        // Common release for PRESTART and USER events.
                        DataChannelImplEmu.this.releaseCurrentAndGoToNextOutputRingItem(0);
                    }
                    // Reached only via the END break above.
                    DataChannelImplEmu.this.logger.info("      DataChannel Emu out: " + DataChannelImplEmu.this.name + " got END event, quitting 1");
                    this.threadState = DataChannelAdapter.ThreadState.DONE;
                    return;
                }
                catch (InterruptedException e) {
                    // Interrupted while in phase 1: exit; threadState is left unchanged.
                    DataChannelImplEmu.this.logger.warn("      DataChannel Emu out: " + DataChannelImplEmu.this.name + "  interrupted thd, quitting");
                    return;
                }
                catch (Exception e) {
                    // Any other phase-1 failure puts the channel and the emu into the error state.
                    e.printStackTrace();
                    DataChannelImplEmu.this.channelState = CODAState.ERROR;
                    System.out.println("      DataChannel Emu out:" + e.getMessage());
                    DataChannelImplEmu.this.emu.setErrorState("DataChannel Emu out: " + e.getMessage());
                }
                return;
            }
            // ---- Phase 2: data events, entered after GO ----
            while (true) {
                if (DataChannelImplEmu.this.pause) {
                    // While paused: sleep 5 ms on every 400th pass, otherwise loop again immediately.
                    if (this.pauseCounter++ % 400 != 0) continue;
                    try {
                        Thread.sleep(5L);
                    }
                    // An interrupt during the pause sleep is ignored.
                    catch (InterruptedException interruptedException) {}
                    continue;
                }
                try {
                    ringItem = DataChannelImplEmu.this.getNextOutputRingItem(DataChannelImplEmu.this.ringIndex);
                }
                catch (InterruptedException e) {
                    // Interrupted while waiting for data: exit; threadState is left unchanged.
                    return;
                }
                pBankType = ringItem.getEventType();
                pBankControlType = ringItem.getControlType();
                try {
                    this.writeEvioData(ringItem);
                }
                catch (Exception e) {
                    e.printStackTrace();
                    // Record the message only if no earlier error message has been stored.
                    DataChannelImplEmu.this.errorMsg.compareAndSet(null, "Cannot write data: " + e.getMessage());
                    // NOTE(review): this rethrow sits outside the phase-1 try, so it is not
                    // handled by the catch blocks above; run() declares no checked exceptions,
                    // so this is presumably a decompilation artifact - confirm against the source.
                    throw e;
                }
                DataChannelImplEmu.this.releaseCurrentAndGoToNextOutputRingItem(DataChannelImplEmu.this.ringIndex);
                // With several output rings, advance the ring selection after each event
                // that is neither a control event nor a user event.
                if (DataChannelImplEmu.this.outputRingCount > 1 && pBankControlType == null && !pBankType.isUser()) {
                    DataChannelImplEmu.this.setNextEventAndRing();
                }
                if (pBankControlType == ControlType.END) {
                    DataChannelImplEmu.this.logger.info("      DataChannel Emu out: " + DataChannelImplEmu.this.name + " got END event, quitting 2");
                    this.threadState = DataChannelAdapter.ThreadState.DONE;
                    return;
                }
                if (DataChannelImplEmu.this.gotResetCmd) {
                    DataChannelImplEmu.this.logger.info("      DataChannel Emu out: " + DataChannelImplEmu.this.name + " got RESET cmd, quitting");
                    this.threadState = DataChannelAdapter.ThreadState.DONE;
                    return;
                }
                // Flush pending data only when buffer-rate regulation is off and more than
                // 'timeout' has elapsed since lastSendTime.
                if (DataChannelImplEmu.this.regulateBufferRate || DataChannelImplEmu.this.emu.getTime() - this.lastSendTime <= timeout) continue;
                this.flushExistingEvioData();
            }
        }

        /**
         * Thread that takes filled buffers from a {@link ByteBufferSupply} and sends each
         * one over the emu domain connection, reusing a single message object.
         * The thread stops when {@link #endThread()} is called, when it is interrupted,
         * or after a send failure (which also puts the channel into the error state).
         */
        private final class SocketSender
        extends Thread {
            /** Set by {@link #endThread()} to make the send loop exit. */
            private volatile boolean killThd;
            /** Supply from which filled buffers are consumed. */
            private final ByteBufferSupply supply;
            /** Message object reused for every send. */
            private final cMsgMessageFull outGoingMsg;

            /**
             * @param supply      supply of filled buffers to send
             * @param socketIndex index used in the thread name and stored as the
             *                    message's system message id
             */
            SocketSender(ByteBufferSupply supply, int socketIndex) {
                super(DataChannelImplEmu.this.emu.getThreadGroup(), DataChannelImplEmu.this.name() + "_sender_" + socketIndex);
                this.supply = supply;
                this.outGoingMsg = new cMsgMessageFull();
                this.outGoingMsg.setUserInt(1);
                this.outGoingMsg.setSysMsgId(socketIndex);
            }

            /** Asks the thread to stop and interrupts it in case it is blocked. */
            final void endThread() {
                this.killThd = true;
                this.interrupt();
            }

            /**
             * Send loop: consume a buffer, attach it to the message, send it, optionally
             * flush, then release the buffer back to the supply.
             */
            @Override
            public void run() {
                while (!this.killThd) {
                    try {
                        ByteBufferItem item = this.supply.consumerGet();
                        ByteBuffer buf = item.getBufferAsIs();
                        if (DataChannelImplEmu.this.direct) {
                            this.outGoingMsg.setByteArray(buf);
                        } else {
                            // Hand over the backing array without copying.
                            this.outGoingMsg.setByteArrayNoCopy(buf.array(), buf.arrayOffset(), buf.remaining());
                        }

                        // The item's user boolean is treated as the end-of-data marker:
                        // such buffers are sent with user int 3, all others with 1.
                        boolean isEnd = item.getUserBoolean();
                        this.outGoingMsg.setUserInt(isEnd ? 3 : 1);

                        DataChannelImplEmu.this.emuDomain.send((cMsgMessage)this.outGoingMsg);
                        if (item.getForce()) {
                            try {
                                DataChannelImplEmu.this.emuDomain.flush(0);
                            }
                            catch (cMsgException ignored) {
                                // Best effort, as before: a failed flush does not stop the sender.
                            }
                        }

                        // Guard against a missing callback, as the input side of this
                        // channel does; without it a null callback turned the END buffer
                        // into a channel error and left the buffer unreleased.
                        if (isEnd && DataChannelImplEmu.this.endCallback != null) {
                            DataChannelImplEmu.this.endCallback.endWait();
                        }
                        this.supply.release(item);
                    }
                    catch (InterruptedException e) {
                        System.out.println("SocketSender thread interrupted");
                        return;
                    }
                    catch (Exception e) {
                        DataChannelImplEmu.this.channelState = CODAState.ERROR;
                        DataChannelImplEmu.this.emu.setErrorState("DataChannel Emu out: " + e.getMessage());
                        return;
                    }
                    // Remember when data last went out; the output helper uses this for
                    // its idle-flush check.
                    DataOutputHelper.this.lastSendTime = DataChannelImplEmu.this.emu.getTime();
                }
            }
        }
    }

    /**
     * Thread that consumes filled buffers from the input buffer supplies, parses each
     * buffer as an evio block and places its events, one per ring item, onto the input
     * ring buffer. With several sockets the supplies are visited in array order, one
     * buffer from each per pass. The thread ends once an END control event is seen.
     */
    private final class ParserMerger
    extends Thread {
        /** Record id the next parsed block is checked against. */
        private int expectedRecordId = 1;
        /** Reader reused for every buffer; created when the first buffer is parsed. */
        private EvioCompactReaderUnsync reader;

        ParserMerger() {
            super(DataChannelImplEmu.this.emu.getThreadGroup(), DataChannelImplEmu.this.name() + "_parser_merger");
        }

        /**
         * Parses buffers until {@link #parseToRing} reports the END event. An interrupt
         * ends the thread silently; an evio error puts the channel into the error state.
         */
        @Override
        public void run() {
            try {
                if (DataChannelImplEmu.this.socketCount == 1) {
                    // Single socket: keep draining the one and only supply.
                    ByteBufferSupply onlySupply = DataChannelImplEmu.this.bbInSupply[0];
                    boolean sawEnd;
                    do {
                        sawEnd = this.parseToRing(onlySupply.consumerGet(), onlySupply);
                    } while (!sawEnd);
                    DataChannelImplEmu.this.logger.info("      DataChannel Emu in: 1 quit parser/merger thread for END event from " + DataChannelImplEmu.this.name);
                    return;
                }

                // Several sockets: take one buffer from each supply in turn, forever.
                while (true) {
                    for (ByteBufferSupply supply : DataChannelImplEmu.this.bbInSupply) {
                        ByteBufferItem filled = supply.consumerGet();
                        if (this.parseToRing(filled, supply)) {
                            DataChannelImplEmu.this.logger.info("      DataChannel Emu in: 2 quit parser/merger thread for END event from " + DataChannelImplEmu.this.name);
                            return;
                        }
                    }
                }
            }
            catch (InterruptedException interrupted) {
                // Interrupted while waiting for data: just let the thread end.
            }
            catch (EvioException e) {
                DataChannelImplEmu.this.channelState = CODAState.ERROR;
                DataChannelImplEmu.this.emu.setErrorState("DataChannel Emu in: " + e.getMessage());
            }
        }

        /**
         * Parses one buffer and publishes its events on the input ring buffer.
         *
         * @param item     buffer item holding the data to parse
         * @param bbSupply supply the item came from; stored with each ring item and, in
         *                 dump mode, used to release the item once per event
         * @return the channel's {@code haveInputEndEvent} flag, i.e. {@code true} once
         *         an END event has been seen
         * @throws EvioException        if the data cannot be parsed, is older than evio
         *                              version 4, or has an unexpected type/content
         * @throws InterruptedException if interrupted while claiming a ring slot
         */
        private final boolean parseToRing(ByteBufferItem item, ByteBufferSupply bbSupply) throws EvioException, InterruptedException {
            ByteBuffer dataBuf = item.getBuffer();
            EvioNodePool nodePool;
            try {
                nodePool = (EvioNodePool)item.getMyObject();
                nodePool.reset();
                if (this.reader != null) {
                    this.reader.setBuffer(dataBuf, (EvioNodeSource)nodePool);
                } else {
                    this.reader = new EvioCompactReaderUnsync(dataBuf, (EvioNodeSource)nodePool);
                }
            }
            catch (EvioException e) {
                System.out.println("      DataChannel Emu in: data NOT evio v4 format 1");
                e.printStackTrace();
                Utilities.printBufferBytes(dataBuf, 0, 80, "BAD BUFFER TO PARSE");
                throw e;
            }

            // Validate the block header before touching any event.
            BlockHeaderV4 header = this.reader.getFirstBlockHeader();
            int evioVersion = header.getVersion();
            if (evioVersion < 4) {
                throw new EvioException("Data not in evio v4 but in version " + evioVersion);
            }
            boolean hasFirstEvent = header.hasFirstEvent();
            EventType blockType = EventType.getEventType(header.getEventType());
            if (blockType == null || !blockType.isEbFriendly()) {
                throw new EvioException("bad evio format or improper event type");
            }

            DataChannelImplEmu.this.recordId = header.getNumber();
            this.expectedRecordId = Evio.checkRecordIdSequence(DataChannelImplEmu.this.recordId, this.expectedRecordId, true, blockType, DataChannelImplEmu.this);

            int eventCount = this.reader.getEventCount();
            item.setUsers(eventCount);

            // These two carry over from one event to the next unless reset below.
            boolean isUser = false;
            ControlType controlType = null;

            // Events are numbered starting at 1.
            for (int evIndex = 1; evIndex <= eventCount; evIndex++) {
                EventType evType = blockType;

                // Claim the next ring slot before reading the event.
                DataChannelImplEmu.this.nextRingItem = DataChannelImplEmu.this.ringBufferIn.nextIntr(1);
                RingItem slot = (RingItem)DataChannelImplEmu.this.ringBufferIn.get(DataChannelImplEmu.this.nextRingItem);

                EvioNode node;
                if (DataChannelImplEmu.this.isER) {
                    node = this.reader.getEvent(evIndex);
                } else {
                    node = this.reader.getScannedEvent(evIndex, (EvioNodeSource)nodePool);
                }
                if (node == null) {
                    System.out.println("      DataChannel Emu in: WARNING, event count = " + eventCount + " but get(Scanned)Event(" + evIndex + ") is null - evio parsing bug");
                    continue;
                }

                // Work out the effective type of this event.
                if (evType == EventType.ROC_RAW) {
                    if (Evio.isUserEvent(node)) {
                        isUser = true;
                        evType = EventType.USER;
                        if (hasFirstEvent) {
                            System.out.println("      DataChannel Emu in: " + DataChannelImplEmu.this.name + "  FIRST event from ROC RAW");
                        } else {
                            System.out.println("      DataChannel Emu in: " + DataChannelImplEmu.this.name + " USER event from ROC RAW");
                        }
                    } else if (!node.getDataTypeObj().isBank()) {
                        DataType foundType = node.getDataTypeObj();
                        throw new EvioException("ROC raw record contains " + foundType + " instead of banks (data corruption?)");
                    }
                } else if (evType == EventType.CONTROL) {
                    controlType = ControlType.getControlType(node.getTag());
                    DataChannelImplEmu.this.logger.info("      DataChannel Emu in: got " + controlType + " event from " + DataChannelImplEmu.this.name);
                    if (controlType == null) {
                        DataChannelImplEmu.this.logger.info("      DataChannel Emu in: found unidentified control event");
                        throw new EvioException("Found unidentified control event");
                    }
                } else if (evType == EventType.USER) {
                    isUser = true;
                    if (hasFirstEvent) {
                        DataChannelImplEmu.this.logger.info("      DataChannel Emu in: " + DataChannelImplEmu.this.name + " got FIRST event");
                    } else {
                        DataChannelImplEmu.this.logger.info("      DataChannel Emu in: " + DataChannelImplEmu.this.name + " got USER event");
                    }
                } else if (evType == EventType.MIXED) {
                    // A mixed block is re-typed per event: num != 0 means ROC raw,
                    // num == 0 means control (if the tag says so) or user.
                    if (node.getNum() != 0) {
                        DataChannelImplEmu.this.logger.info("      DataChannel Emu in: " + DataChannelImplEmu.this.name + " mixed type to ROC RAW");
                        evType = EventType.ROC_RAW;
                        if (!node.getDataTypeObj().isBank()) {
                            DataType foundType = node.getDataTypeObj();
                            throw new EvioException("ROC raw record contains " + foundType + " instead of banks (data corruption?)");
                        }
                    } else {
                        int tag = node.getTag();
                        if (ControlType.isControl(tag)) {
                            controlType = ControlType.getControlType(tag);
                            evType = EventType.CONTROL;
                            DataChannelImplEmu.this.logger.info("      DataChannel Emu in: " + DataChannelImplEmu.this.name + " mixed type to " + controlType.name());
                        } else {
                            isUser = true;
                            evType = EventType.USER;
                            DataChannelImplEmu.this.logger.info("      DataChannel Emu in: " + DataChannelImplEmu.this.name + " mixed type to user type");
                        }
                    }
                } else if (!node.getDataTypeObj().isBank()) {
                    DataType foundType = node.getDataTypeObj();
                    throw new EvioException("physics record contains " + foundType + " instead of banks (data corruption?)");
                }

                if (DataChannelImplEmu.this.dumpData) {
                    // Dump mode: release the buffer for this event right away and pass
                    // on only the non-buildable events.
                    bbSupply.release(item);
                    if (controlType == ControlType.END) {
                        DataChannelImplEmu.this.haveInputEndEvent = true;
                        if (DataChannelImplEmu.this.endCallback != null) {
                            DataChannelImplEmu.this.endCallback.endWait();
                        }
                        break;
                    }
                    if (!evType.isBuildable()) {
                        slot.setAll(null, null, node, evType, controlType, isUser, hasFirstEvent, DataChannelImplEmu.this.id, DataChannelImplEmu.this.recordId, DataChannelImplEmu.this.sourceId, 1, DataChannelImplEmu.this.name, item, bbSupply);
                        DataChannelImplEmu.this.ringBufferIn.publish(DataChannelImplEmu.this.nextRingItem);
                    }
                    continue;
                }

                // Normal mode: buildable events pass the node's num, everything else 1.
                if (evType.isBuildable()) {
                    slot.setAll(null, null, node, evType, controlType, isUser, hasFirstEvent, DataChannelImplEmu.this.id, DataChannelImplEmu.this.recordId, DataChannelImplEmu.this.sourceId, node.getNum(), DataChannelImplEmu.this.name, item, bbSupply);
                } else {
                    slot.setAll(null, null, node, evType, controlType, isUser, hasFirstEvent, DataChannelImplEmu.this.id, DataChannelImplEmu.this.recordId, DataChannelImplEmu.this.sourceId, 1, DataChannelImplEmu.this.name, item, bbSupply);
                }
                // Only the first published event of a block keeps these flags.
                hasFirstEvent = false;
                isUser = false;
                DataChannelImplEmu.this.ringBufferIn.publish(DataChannelImplEmu.this.nextRingItem);

                if (controlType == ControlType.END) {
                    DataChannelImplEmu.this.haveInputEndEvent = true;
                    if (DataChannelImplEmu.this.endCallback != null) {
                        DataChannelImplEmu.this.endCallback.endWait();
                    }
                    break;
                }
            }
            return DataChannelImplEmu.this.haveInputEndEvent;
        }
    }

    /**
     * Thread that reads framed data buffers from one emu socket and publishes each
     * filled buffer to that socket's {@link ByteBufferSupply}.
     * <p>
     * Every frame starts with 8 bytes: a 32-bit command word followed by a 32-bit
     * payload size in bytes. The payload follows. A command value of 3 ends the
     * thread once its buffer has been published.
     * NOTE(review): in direct mode the whole 32-bit command word is compared, while
     * the stream path keeps only its low byte - confirm this difference is intended.
     */
    private final class DataInputHelper
    extends Thread {
        /** Counts passes made while paused so the PAUSED warning is logged only now and then. */
        private int pauseCounter;
        /** Released as soon as {@link #run()} starts. */
        private final CountDownLatch latch;
        /** Stream used when the channel is not in direct mode. */
        private final DataInputStream inStream;
        /** Channel used when the channel is in direct mode. */
        private final SocketChannel sockChannel;
        /** Supply of buffers to fill and publish. */
        private final ByteBufferSupply bbSupply;
        /** 1-based position of the socket served by this thread. */
        private final int socketPosition;

        /**
         * @param socketPosition 1-based socket position; the stream, channel and
         *                       buffer supply are taken from index {@code socketPosition - 1}
         */
        DataInputHelper(int socketPosition) {
            super(DataChannelImplEmu.this.emu.getThreadGroup(), DataChannelImplEmu.this.name() + "_data_in");
            this.pauseCounter = 0;
            this.latch = new CountDownLatch(1);
            int socketIndex = socketPosition - 1;
            this.socketPosition = socketPosition;
            this.inStream = DataChannelImplEmu.this.in[socketIndex];
            this.bbSupply = DataChannelImplEmu.this.bbInSupply[socketIndex];
            this.sockChannel = DataChannelImplEmu.this.socketChannel[socketIndex];
        }

        /** Blocks until {@link #run()} has started, or until the caller is interrupted. */
        private final void waitUntilStarted() {
            try {
                this.latch.await();
            }
            catch (InterruptedException ignored) {
                // Kept best-effort as before: an interrupt simply ends the wait early.
            }
        }

        /**
         * Fills {@code dest} completely from the socket channel.
         *
         * @param dest buffer to fill from its position up to its limit
         * @throws EOFException if the peer closes the connection before the buffer is full
         * @throws IOException  on any other channel error
         */
        private void readFully(ByteBuffer dest) throws IOException {
            while (dest.hasRemaining()) {
                // read() may deliver fewer bytes than requested and returns -1 at
                // end-of-stream; without this check the loop would never terminate.
                if (this.sockChannel.read(dest) < 0) {
                    throw new EOFException("socket closed with " + dest.remaining() + " bytes still expected");
                }
            }
        }

        /**
         * Read loop: take an empty buffer, read one frame into it and publish it.
         * Ends on a reset command, on command 3, on interrupt, or on a socket error.
         */
        @Override
        public void run() {
            this.latch.countDown();
            // Reused for the 8-byte frame header in direct mode.
            ByteBuffer wordCmdBuf = ByteBuffer.allocate(8);
            try {
                while (true) {
                    if (DataChannelImplEmu.this.gotResetCmd) {
                        return;
                    }
                    if (DataChannelImplEmu.this.pause) {
                        if (this.pauseCounter++ % 400 == 0) {
                            DataChannelImplEmu.this.logger.warn("      DataChannel Emu in: " + DataChannelImplEmu.this.name + " - PAUSED");
                        }
                        Thread.sleep(5L);
                        continue;
                    }

                    ByteBufferItem item = this.bbSupply.get();
                    int cmd;
                    int size;
                    if (DataChannelImplEmu.this.direct) {
                        // Read the complete header before decoding it; a single read()
                        // is allowed to return only part of the 8 bytes.
                        wordCmdBuf.clear();
                        this.readFully(wordCmdBuf);
                        cmd = wordCmdBuf.getInt(0);
                        size = wordCmdBuf.getInt(4);

                        item.ensureCapacity(size);
                        ByteBuffer buf = item.getBuffer();
                        buf.limit(size);
                        this.readFully(buf);
                        buf.flip();
                    } else {
                        // High 32 bits carry the command (low byte only), low 32 bits the size.
                        long word = this.inStream.readLong();
                        cmd = (int)(word >>> 32 & 0xFFL);
                        size = (int)word;

                        item.ensureCapacity(size);
                        ByteBuffer buf = item.getBuffer();
                        buf.limit(size);
                        this.inStream.readFully(item.getBuffer().array(), 0, size);
                    }
                    this.bbSupply.publish(item);

                    if (cmd == 3) {
                        System.out.println("      DataChannel Emu in: " + DataChannelImplEmu.this.name + ", got END event on sock " + this.socketPosition + ", exit reading thd");
                        return;
                    }
                }
            }
            catch (InterruptedException e) {
                DataChannelImplEmu.this.logger.warn("      DataChannel Emu in: " + DataChannelImplEmu.this.name + ", interrupted, exit reading thd");
            }
            catch (AsynchronousCloseException e) {
                DataChannelImplEmu.this.logger.warn("      DataChannel Emu in: " + DataChannelImplEmu.this.name + ", socket closed, exit reading thd");
            }
            catch (EOFException e) {
                DataChannelImplEmu.this.logger.warn("      DataChannel Emu in: " + DataChannelImplEmu.this.name + ", other end of socket closed for sock " + this.socketPosition + ", exit reading thd");
            }
            catch (Exception e) {
                // Errors after END has been seen are expected during shutdown.
                if (DataChannelImplEmu.this.haveInputEndEvent) {
                    System.out.println("      DataChannel Emu in: " + DataChannelImplEmu.this.name + ", exception but aleady have END event, so exit reading thd");
                    return;
                }
                DataChannelImplEmu.this.channelState = CODAState.ERROR;
                String errString = "DataChannel Emu in: error reading " + DataChannelImplEmu.this.name;
                if (e.getMessage() != null) {
                    errString = errString + ' ' + e.getMessage();
                }
                DataChannelImplEmu.this.emu.setErrorState(errString);
            }
        }
    }
}

