/*
 * Decompiled with CFR 0.152.
 */
package org.jlab.coda.cMsg.common;

import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.jlab.coda.cMsg.cMsgCallbackInterface;
import org.jlab.coda.cMsg.cMsgSubscriptionHandle;
import org.jlab.coda.cMsg.common.cMsgMessageContextDefault;
import org.jlab.coda.cMsg.common.cMsgMessageFull;

/**
 * Runs a subscription's callback on messages taken from a bounded queue.
 *
 * <p>An instance owns a {@link LinkedBlockingQueue} of incoming messages, one
 * permanent {@link WorkerThread} that is started by the constructor, and
 * (unless the callback requires serialized delivery) a supervisor thread, which
 * is this object itself, that starts additional temporary workers while the
 * queue is backed up. All threads created here are daemon threads.
 */
public class cMsgCallbackThread
extends Thread
implements cMsgSubscriptionHandle {
    /** Domain reported by {@link #getDomain()} and by the message context. */
    String domain;
    /** Subject reported by {@link #getSubject()} and by the message context. */
    String subject;
    /** Type reported by {@link #getType()} and by the message context. */
    String type;
    /** Bounded queue of messages waiting to be handed to the callback. */
    private LinkedBlockingQueue<cMsgMessageFull> messageQueue;
    /** Scratch list that receives messages drained from a full queue before they are discarded. */
    private ArrayList<cMsgMessageFull> dumpList;
    /** User object passed as the second argument of every callback invocation. */
    private Object arg;
    /** Callback to run on each message; also supplies queue and thread limits. */
    cMsgCallbackInterface callback;
    /** Number of worker threads accounted for (the permanent one plus temporary ones). */
    private AtomicInteger threads = new AtomicInteger();
    /** Not read or written anywhere in this class; kept for package-level users. */
    cMsgMessageFull message;
    /**
     * Number of messages handed to the callback so far. Declared volatile so that
     * {@link #getMsgCount()} performs an atomic, up-to-date read of the 64-bit value;
     * increments are serialized through {@link #msgCountLock}.
     */
    volatile long msgCount;
    /** Guards the read-modify-write of {@link #msgCount} performed by concurrent workers. */
    private final Object msgCountLock = new Object();
    /** Caller-managed counter, initialized to 1; see {@link #getCount()} and {@link #setCount(int)}. */
    private int count;
    /** Set once by {@link #dieNow(boolean)}; every thread of this object exits when it sees it. */
    private volatile boolean dieNow;
    /** Latch that workers wait on while paused; replaced by each effective call to {@link #pause()}. */
    private CountDownLatch latch;
    /** True between an effective {@link #pause()} and the following {@link #restart()}. */
    private volatile boolean pause;
    /** Context object attached to every message copy given to the callback. */
    private myContext context;

    /**
     * Tells all threads of this object to stop, discards queued messages and
     * releases any worker that is waiting because of a pause.
     *
     * @param callInterrupt if true, additionally interrupts this (supervisor) thread
     */
    public void dieNow(boolean callInterrupt) {
        this.dieNow = true;
        this.clearQueue();
        // Release workers blocked on the pause latch so they can observe dieNow.
        this.restart();
        if (callInterrupt) {
            this.interrupt();
        }
    }

    /**
     * Makes workers wait before delivering further messages to the callback.
     * Does nothing if already paused or if {@link #dieNow(boolean)} was called.
     */
    @Override
    public synchronized void pause() {
        if (this.pause || this.dieNow) {
            return;
        }
        // The latch is assigned before the volatile write to pause.
        this.latch = new CountDownLatch(1);
        this.pause = true;
    }

    /**
     * Ends a pause and releases the workers waiting on it.
     * Does nothing if not currently paused.
     */
    @Override
    public synchronized void restart() {
        if (!this.pause) {
            return;
        }
        this.pause = false;
        this.latch.countDown();
    }

    /** @return the number of messages handed to the callback so far */
    @Override
    public long getMsgCount() {
        return this.msgCount;
    }

    /** @return the domain given to the constructor */
    @Override
    public String getDomain() {
        return this.domain;
    }

    /** @return the subject given to the constructor */
    @Override
    public String getSubject() {
        return this.subject;
    }

    /** @return the type given to the constructor */
    @Override
    public String getType() {
        return this.type;
    }

    /** @return the number of messages currently waiting in the queue */
    @Override
    public int getQueueSize() {
        return this.messageQueue.size();
    }

    /** @return true if the queue has no remaining capacity */
    @Override
    public boolean isQueueFull() {
        return this.messageQueue.remainingCapacity() < 1;
    }

    /** Removes all messages currently waiting in the queue. */
    @Override
    public void clearQueue() {
        this.messageQueue.clear();
    }

    /** @return the callback given to the constructor */
    @Override
    public cMsgCallbackInterface getCallback() {
        return this.callback;
    }

    /** @return the user object given to the constructor */
    @Override
    public Object getUserObject() {
        return this.arg;
    }

    /** @return the current value of the caller-managed counter */
    public int getCount() {
        return this.count;
    }

    /** @param count new value of the caller-managed counter */
    public void setCount(int count) {
        this.count = count;
    }

    /**
     * Creates the queue, starts the permanent worker and, unless
     * {@code callback.mustSerializeMessages()} returns true, starts this object
     * as the supervisor thread that adds temporary workers.
     *
     * @param callback callback to run on each message; its
     *                 {@code getMaximumQueueSize()} sets the queue capacity
     * @param arg      user object passed to the callback with every message
     * @param domain   domain reported by this handle and by the message context
     * @param subject  subject reported by this handle and by the message context
     * @param type     type reported by this handle and by the message context
     */
    public cMsgCallbackThread(cMsgCallbackInterface callback, Object arg, String domain, String subject, String type) {
        this.callback = callback;
        this.arg = arg;
        this.domain = domain;
        this.subject = subject;
        this.type = type;
        this.messageQueue = new LinkedBlockingQueue<cMsgMessageFull>(callback.getMaximumQueueSize());
        this.dumpList = new ArrayList<cMsgMessageFull>(callback.getSkipSize());
        this.count = 1;
        this.context = new myContext();
        this.setName("cMsg callback main");
        this.setDaemon(true);
        // The permanent worker starts itself in its constructor.
        new WorkerThread(true);
        this.threads.incrementAndGet();
        if (!callback.mustSerializeMessages()) {
            this.start();
        }
    }

    /**
     * Places a message on the queue for delivery to the callback.
     *
     * <p>If the queue is full and {@link #dieNow(boolean)} has been called, the
     * message is dropped. Otherwise, if the callback does not allow skipping,
     * this method blocks (reporting every 10 seconds) until there is room; if
     * skipping is allowed, up to {@code callback.getSkipSize()} queued messages
     * are discarded and the new message is offered once more.
     *
     * <p>If the calling thread is interrupted while blocked, the message is not
     * queued and the thread's interrupt status is restored before returning.
     *
     * @param message message to queue
     */
    public void sendMessage(cMsgMessageFull message) {
        if (this.messageQueue.offer(message)) {
            return;
        }
        if (this.dieNow) {
            return;
        }
        System.out.println("cMsgCbThd: Q FULL");
        if (!this.callback.maySkipMessages()) {
            try {
                while (!this.messageQueue.offer(message, 10L, TimeUnit.SECONDS)) {
                    System.out.println("cMsgCbThd: can't place msg on full cb Q, wait 10 sec,");
                    System.out.println("Q size = " + this.messageQueue.size() + ", subject = " + this.subject + ", type = " + this.type);
                }
            }
            catch (InterruptedException e) {
                // Give up on this message, but do not hide the interruption from the caller.
                Thread.currentThread().interrupt();
            }
        } else {
            this.messageQueue.drainTo(this.dumpList, this.callback.getSkipSize());
            this.dumpList.clear();
            this.messageQueue.offer(message);
        }
    }

    /**
     * Supervisor loop: while the queue holds more messages than the existing
     * workers are meant to handle, starts temporary workers up to
     * {@code callback.getMaximumThreads()}, then waits (checking every 40 ms)
     * until the queue size changes before evaluating again. Returns once
     * {@link #dieNow(boolean)} has been called.
     */
    @Override
    public void run() {
        while (!this.dieNow) {
            int qSize = this.messageQueue.size();
            int threadsExisting = this.threads.get();
            int maxThreads = this.callback.getMaximumThreads();
            int perThread = this.callback.getMessagesPerThread();

            // A non-positive messages-per-thread value cannot be used as a divisor; skip scaling.
            if (perThread > 0 && threadsExisting < maxThreads && qSize > perThread) {
                int need = qSize / perThread;
                if (need > threadsExisting) {
                    int threadsToAdd = Math.min(maxThreads - threadsExisting, need - threadsExisting);
                    for (int i = 0; i < threadsToAdd; ++i) {
                        new WorkerThread();
                        this.threads.incrementAndGet();
                    }
                }
            }

            // Wait for the queue size to move away from the sampled value.
            while (qSize == this.messageQueue.size()) {
                if (this.dieNow) {
                    return;
                }
                try {
                    Thread.sleep(40L);
                }
                catch (InterruptedException e) {
                    // An interrupt only wakes this loop early; dieNow is re-checked on the next pass.
                }
            }
        }
    }

    /**
     * Thread that takes messages off the queue and runs the callback on a copy
     * of each one. A permanent worker runs until {@code dieNow} is set; a
     * temporary worker also exits when its idle counter reaches ten while it is
     * waiting for a message (each poll waits up to 200 ms).
     */
    class WorkerThread
    extends Thread {
        /** True for the worker that must not exit merely because the queue is idle. */
        boolean permanent;

        /** Creates and starts a temporary worker. */
        WorkerThread() {
            this(false);
        }

        /**
         * Creates and starts a daemon worker.
         *
         * @param permanent true if this worker must not exit when idle
         */
        WorkerThread(boolean permanent) {
            this.permanent = permanent;
            this.setDaemon(true);
            this.setName("cMsg callback worker");
            this.start();
        }

        /** Delivers queued messages to the callback until told to stop or, if temporary, idle. */
        @Override
        public void run() {
            while (true) {
                int empty = 0;
                cMsgMessageFull message = null;
                while (message == null) {
                    if (cMsgCallbackThread.this.dieNow) {
                        return;
                    }
                    if (!this.permanent && ++empty % 10 == 0) {
                        // Temporary worker has been idle long enough; retire it.
                        cMsgCallbackThread.this.threads.decrementAndGet();
                        return;
                    }
                    try {
                        message = cMsgCallbackThread.this.messageQueue.poll(200L, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException e) {
                        // Treated like a timed-out poll; the loop re-checks dieNow.
                    }
                }
                if (cMsgCallbackThread.this.dieNow) {
                    return;
                }
                if (cMsgCallbackThread.this.pause) {
                    try {
                        cMsgCallbackThread.this.latch.await();
                    }
                    catch (InterruptedException e) {
                        // Fall through; dieNow is checked immediately below.
                    }
                    if (cMsgCallbackThread.this.dieNow) {
                        return;
                    }
                }
                // Several workers may reach this point concurrently, so the increment is locked.
                synchronized (cMsgCallbackThread.this.msgCountLock) {
                    ++cMsgCallbackThread.this.msgCount;
                }
                cMsgMessageFull msgCopy = message.copy();
                msgCopy.setContext(cMsgCallbackThread.this.context);
                try {
                    cMsgCallbackThread.this.callback.callback(msgCopy, cMsgCallbackThread.this.arg);
                }
                catch (Exception e) {
                    // An exception thrown by the callback must not kill the worker.
                    System.out.println("Error inside callback (user's code): sub = " + cMsgCallbackThread.this.subject + ",type = " + cMsgCallbackThread.this.type);
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Message context that reports this subscription's domain, subject, type
     * and current queue size.
     */
    private class myContext
    extends cMsgMessageContextDefault {
        private myContext() {
        }

        /** @return the enclosing object's domain */
        @Override
        public String getDomain() {
            return cMsgCallbackThread.this.domain;
        }

        /** @return the enclosing object's subject */
        @Override
        public String getSubject() {
            return cMsgCallbackThread.this.subject;
        }

        /** @return the enclosing object's type */
        @Override
        public String getType() {
            return cMsgCallbackThread.this.type;
        }

        /** @return the number of messages currently waiting in the enclosing object's queue */
        @Override
        public int getQueueSize() {
            return cMsgCallbackThread.this.messageQueue.size();
        }
    }
}

