/*
 * Decompiled with CFR 0.152.
 */
package org.jlab.coda.emu.test;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import org.jlab.coda.emu.support.data.ByteBufferItem;
import org.jlab.coda.emu.support.data.ByteBufferSupply;
import org.jlab.coda.emu.support.data.ControlType;
import org.jlab.coda.emu.support.data.EventType;
import org.jlab.coda.jevio.BlockHeaderV4;
import org.jlab.coda.jevio.EvioCompactReaderUnsync;
import org.jlab.coda.jevio.EvioException;
import org.jlab.coda.jevio.EvioNode;

class InputDataChannelImplEmu {
    private volatile boolean haveInputEndEvent;
    private boolean noDelay;
    private int sourceId;
    private DataInputHelper[] dataInputThread;
    private DataInputStream[] in;
    private int tcpRecvBuf;
    private int maxBufferSize;
    private int socketCount;
    private int socketsConnected;
    protected ByteBufferSupply[] bbSupply;
    StatisticsThread statThread;

    InputDataChannelImplEmu() {
    }

    void attachToInput(SocketChannel channel, int sourceId, int maxBufferSize, int socketCount, int socketPosition) throws IOException {
        if (this.in == null) {
            this.in = new DataInputStream[socketCount];
            this.bbSupply = new ByteBufferSupply[socketCount];
            this.dataInputThread = new DataInputHelper[socketCount];
            this.statThread = new StatisticsThread(socketCount);
        } else {
            if (socketCount != this.socketCount) {
                System.out.println("Bad socketCount: " + socketCount + ", != previous " + this.socketCount);
            }
            if (sourceId != this.sourceId) {
                System.out.println("Bad sourceId: " + sourceId + ", != previous " + this.sourceId);
            }
        }
        this.sourceId = sourceId;
        this.socketCount = socketCount;
        this.maxBufferSize = maxBufferSize;
        Socket socket = channel.socket();
        ++this.socketsConnected;
        System.out.println("attachToInput: socketsConnected = " + this.socketsConnected + ", socketCount = " + socketCount);
        if (this.tcpRecvBuf > 0) {
            socket.setPerformancePreferences(0, 0, 1);
            socket.setReceiveBufferSize(this.tcpRecvBuf);
        }
        this.in[socketPosition - 1] = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
        this.bbSupply[socketPosition - 1] = new ByteBufferSupply(16, maxBufferSize);
        this.dataInputThread[socketPosition - 1] = new DataInputHelper(socketPosition - 1);
        if (this.socketsConnected == socketCount) {
            System.out.println("Trying to start statThread");
            this.statThread.start();
            for (int i = 0; i < socketCount; ++i) {
                this.dataInputThread[i].setStatThread(this.statThread);
                this.dataInputThread[i].waitUntilStarted();
            }
        }
    }

    private class StatisticsThread
    extends Thread {
        private boolean init;
        private long totalBytes;
        private long oldVal;
        private long totalT;
        private long totalCount;
        long localByteCount;
        long t;
        long deltaT;
        long deltaCount;
        private int skip = 2;
        private int timeInterval = 5;
        private int socketCount = 1;
        private volatile int byteCount;

        public StatisticsThread(int socketCount) {
            this.socketCount = socketCount;
        }

        public void clear() {
            this.totalBytes = 0L;
            this.totalT = 0L;
            this.totalCount = 0L;
            this.byteCount = 0;
            this.oldVal = 0L;
            this.skip = 2;
            this.init = true;
            this.t = System.currentTimeMillis();
        }

        public void addBytes(int bytes) {
            this.byteCount += bytes;
        }

        @Override
        public void run() {
            int sleepTime = 1000 * this.timeInterval;
            this.clear();
            while (true) {
                try {
                    Thread.sleep(sleepTime);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                this.localByteCount = this.byteCount;
                this.deltaT = System.currentTimeMillis() - this.t;
                if (this.localByteCount > 0L) {
                    if (!this.init) {
                        this.totalBytes += this.localByteCount;
                    }
                    if (this.skip-- < 1) {
                        this.totalT += this.deltaT;
                        this.deltaCount = this.totalBytes - this.oldVal;
                        this.totalCount += this.deltaCount;
                        System.out.printf("%3.2e bytes/s in %d sec, %3.2e avg,  %d sockets\n", (double)this.deltaCount * 1000.0 / (double)this.deltaT, this.timeInterval, (double)this.totalCount * 1000.0 / (double)this.totalT, this.socketCount);
                    } else {
                        System.out.printf("%3.2e bytes/s in %d sec,  %d sockets\n", (double)this.localByteCount * 1000.0 / (double)this.deltaT, this.timeInterval, this.socketCount);
                    }
                    this.t = System.currentTimeMillis();
                    this.init = false;
                    this.oldVal = this.totalBytes;
                    this.byteCount = 0;
                    continue;
                }
                System.out.println("No bytes read in the last " + this.timeInterval + " seconds.");
                this.clear();
            }
        }
    }

    private class DataInputHelper
    extends Thread {
        private CountDownLatch latch = new CountDownLatch(1);
        private EvioCompactReaderUnsync compactReader;
        private final int socketIndex;
        private DataInputStream inStream;
        private StatisticsThread statThread;

        DataInputHelper(int socketIndex) {
            this.socketIndex = socketIndex;
            this.inStream = InputDataChannelImplEmu.this.in[socketIndex];
            System.out.println("      DataChannel Emu in: start EMU input thread");
            this.start();
        }

        public void setStatThread(StatisticsThread statThread) {
            this.statThread = statThread;
        }

        private void waitUntilStarted() {
            try {
                this.latch.await();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }

        /*
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            this.latch.countDown();
            try {
                boolean delay = false;
                do {
                    int command = this.inStream.readInt();
                    switch (command & 0xFF) {
                        case 1: {
                            this.handleEvioFileToBuf();
                            break;
                        }
                        case 3: {
                            this.handleEvioFileToBuf();
                            return;
                        }
                        case 2: {
                            System.out.println("      DataChannel Emu in: get emuEnd cmd");
                        }
                    }
                } while (!InputDataChannelImplEmu.this.haveInputEndEvent);
                return;
            }
            catch (Exception e) {
                System.out.println("      DataChannel Emu in: exit thd: " + e.getMessage());
            }
        }

        private final void handleEvioFileToBuf() throws IOException, EvioException, InterruptedException {
            ControlType controlType = null;
            ByteBufferItem bbItem = InputDataChannelImplEmu.this.bbSupply[this.socketIndex].get();
            int evioBytes = this.inStream.readInt();
            this.statThread.addBytes(evioBytes + 8);
            bbItem.ensureCapacity(evioBytes);
            ByteBuffer buf = bbItem.getBuffer();
            buf.position(0).limit(evioBytes);
            this.inStream.readFully(buf.array(), 0, evioBytes);
            try {
                if (this.compactReader == null) {
                    this.compactReader = new EvioCompactReaderUnsync(buf);
                } else {
                    this.compactReader.setBuffer(buf);
                }
            }
            catch (EvioException e) {
                e.printStackTrace();
                throw e;
            }
            BlockHeaderV4 blockHeader = this.compactReader.getFirstBlockHeader();
            if (blockHeader.getVersion() < 4) {
                throw new EvioException("Evio data needs to be written in version 4+ format");
            }
            EventType eventType = EventType.getEventType(blockHeader.getEventType());
            int eventCount = this.compactReader.getEventCount();
            bbItem.setUsers(eventCount);
            for (int i = 1; i < eventCount + 1; ++i) {
                EvioNode node = this.compactReader.getScannedEvent(i);
                if (eventType == EventType.CONTROL && (controlType = ControlType.getControlType(node.getTag())) == null) {
                    throw new EvioException("Found unidentified control event");
                }
                InputDataChannelImplEmu.this.bbSupply[this.socketIndex].release(bbItem);
                if (controlType != ControlType.END) continue;
                System.out.println("      DataChannel Emu in: found END event");
                InputDataChannelImplEmu.this.haveInputEndEvent = true;
                break;
            }
        }
    }
}

