package org.jlab.coda.cMsg.common;

import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.jlab.coda.cMsg.cMsgCallbackInterface;
import org.jlab.coda.cMsg.cMsgSubscriptionHandle;

/* loaded from: input_file:jars/cMsg-5.2.jar:org/jlab/coda/cMsg/common/cMsgCallbackThread.class */
public class cMsgCallbackThread extends Thread implements cMsgSubscriptionHandle {
    String domain;
    String subject;
    String type;
    private LinkedBlockingQueue<cMsgMessageFull> messageQueue;
    private ArrayList<cMsgMessageFull> dumpList;
    private Object arg;
    cMsgCallbackInterface callback;
    cMsgMessageFull message;
    long msgCount;
    private volatile boolean dieNow;
    private CountDownLatch latch;
    private volatile boolean pause;
    private AtomicInteger threads = new AtomicInteger();
    private int count = 1;
    private myContext context = new myContext();

    /* loaded from: input_file:jars/cMsg-5.2.jar:org/jlab/coda/cMsg/common/cMsgCallbackThread$WorkerThread.class */
    class WorkerThread extends Thread {
        boolean permanent;

        WorkerThread(cMsgCallbackThread cmsgcallbackthread) {
            this(false);
        }

        WorkerThread(boolean z) {
            this.permanent = z;
            setDaemon(true);
            setName("cMsg callback worker");
            start();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                int i = 0;
                cMsgMessageFull cmsgmessagefull = null;
                while (cmsgmessagefull == null) {
                    if (cMsgCallbackThread.this.dieNow) {
                        return;
                    }
                    if (!this.permanent) {
                        i++;
                        if (i % 10 == 0) {
                            cMsgCallbackThread.this.threads.decrementAndGet();
                            return;
                        }
                    }
                    try {
                        cmsgmessagefull = cMsgCallbackThread.this.messageQueue.poll(200L, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                    }
                }
                if (cMsgCallbackThread.this.dieNow) {
                    return;
                }
                if (cMsgCallbackThread.this.pause) {
                    try {
                        cMsgCallbackThread.this.latch.await();
                    } catch (InterruptedException e2) {
                    }
                    if (cMsgCallbackThread.this.dieNow) {
                        return;
                    }
                }
                cMsgCallbackThread.this.msgCount++;
                cMsgMessageFull copy = cmsgmessagefull.copy();
                copy.setContext(cMsgCallbackThread.this.context);
                try {
                    cMsgCallbackThread.this.callback.callback(copy, cMsgCallbackThread.this.arg);
                } catch (Exception e3) {
                    System.out.println("Error inside callback (user's code): sub = " + cMsgCallbackThread.this.subject + ",type = " + cMsgCallbackThread.this.type);
                    e3.printStackTrace();
                }
            }
        }
    }

    /* loaded from: input_file:jars/cMsg-5.2.jar:org/jlab/coda/cMsg/common/cMsgCallbackThread$myContext.class */
    private class myContext extends cMsgMessageContextDefault {
        private myContext() {
        }

        @Override // org.jlab.coda.cMsg.common.cMsgMessageContextDefault, org.jlab.coda.cMsg.common.cMsgMessageContextInterface
        public String getDomain() {
            return cMsgCallbackThread.this.domain;
        }

        @Override // org.jlab.coda.cMsg.common.cMsgMessageContextDefault, org.jlab.coda.cMsg.common.cMsgMessageContextInterface
        public String getSubject() {
            return cMsgCallbackThread.this.subject;
        }

        @Override // org.jlab.coda.cMsg.common.cMsgMessageContextDefault, org.jlab.coda.cMsg.common.cMsgMessageContextInterface
        public String getType() {
            return cMsgCallbackThread.this.type;
        }

        @Override // org.jlab.coda.cMsg.common.cMsgMessageContextDefault, org.jlab.coda.cMsg.common.cMsgMessageContextInterface
        public int getQueueSize() {
            return cMsgCallbackThread.this.messageQueue.size();
        }
    }

    public void dieNow(boolean z) {
        this.dieNow = true;
        clearQueue();
        restart();
        if (z) {
            interrupt();
        }
    }

    @Override // org.jlab.coda.cMsg.cMsgSubscriptionHandle
    public synchronized void pause() {
        if (this.pause || this.dieNow) {
            return;
        }
        this.latch = new CountDownLatch(1);
        this.pause = true;
    }

    @Override // org.jlab.coda.cMsg.cMsgSubscriptionHandle
    public synchronized void restart() {
        if (this.pause) {
            this.pause = false;
            this.latch.countDown();
        }
    }

    @Override // org.jlab.coda.cMsg.cMsgSubscriptionHandle
    public long getMsgCount() {
        return this.msgCount;
    }

    @Override // org.jlab.coda.cMsg.cMsgSubscriptionHandle
    public String getDomain() {
        return this.domain;
    }

    @Override // org.jlab.coda.cMsg.cMsgSubscriptionHandle
    public String getSubject() {
        return this.subject;
    }

    @Override // org.jlab.coda.cMsg.cMsgSubscriptionHandle
    public String getType() {
        return this.type;
    }

    @Override // org.jlab.coda.cMsg.cMsgSubscriptionHandle
    public int getQueueSize() {
        return this.messageQueue.size();
    }

    @Override // org.jlab.coda.cMsg.cMsgSubscriptionHandle
    public boolean isQueueFull() {
        return this.messageQueue.remainingCapacity() < 1;
    }

    @Override // org.jlab.coda.cMsg.cMsgSubscriptionHandle
    public void clearQueue() {
        this.messageQueue.clear();
    }

    @Override // org.jlab.coda.cMsg.cMsgSubscriptionHandle
    public cMsgCallbackInterface getCallback() {
        return this.callback;
    }

    @Override // org.jlab.coda.cMsg.cMsgSubscriptionHandle
    public Object getUserObject() {
        return this.arg;
    }

    public int getCount() {
        return this.count;
    }

    public void setCount(int i) {
        this.count = i;
    }

    public cMsgCallbackThread(cMsgCallbackInterface cmsgcallbackinterface, Object obj, String str, String str2, String str3) {
        this.callback = cmsgcallbackinterface;
        this.arg = obj;
        this.domain = str;
        this.subject = str2;
        this.type = str3;
        this.messageQueue = new LinkedBlockingQueue<>(cmsgcallbackinterface.getMaximumQueueSize());
        this.dumpList = new ArrayList<>(cmsgcallbackinterface.getSkipSize());
        setName("cMsg callback main");
        setDaemon(true);
        new WorkerThread(true);
        this.threads.incrementAndGet();
        if (cmsgcallbackinterface.mustSerializeMessages()) {
            return;
        }
        start();
    }

    public void sendMessage(cMsgMessageFull cmsgmessagefull) {
        if (this.messageQueue.offer(cmsgmessagefull) || this.dieNow) {
            return;
        }
        System.out.println("cMsgCbThd: Q FULL");
        if (this.callback.maySkipMessages()) {
            this.messageQueue.drainTo(this.dumpList, this.callback.getSkipSize());
            this.dumpList.clear();
            this.messageQueue.offer(cmsgmessagefull);
        } else {
            while (!this.messageQueue.offer(cmsgmessagefull, 10L, TimeUnit.SECONDS)) {
                try {
                    System.out.println("cMsgCbThd: can't place msg on full cb Q, wait 10 sec,");
                    System.out.println("Q size = " + this.messageQueue.size() + "subject = " + this.subject + ", type = " + this.type);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        int messagesPerThread;
        while (!this.dieNow) {
            int size = this.messageQueue.size();
            int i = this.threads.get();
            if (i < this.callback.getMaximumThreads() && size > this.callback.getMessagesPerThread() && (messagesPerThread = size / this.callback.getMessagesPerThread()) > i) {
                int maximumThreads = this.callback.getMaximumThreads() - i;
                int i2 = messagesPerThread - i;
                int i3 = maximumThreads > i2 ? i2 : maximumThreads;
                for (int i4 = 0; i4 < i3; i4++) {
                    new WorkerThread(this);
                    this.threads.incrementAndGet();
                }
            }
            boolean z = size - this.messageQueue.size() == 0;
            while (z) {
                if (this.dieNow) {
                    return;
                }
                try {
                    Thread.sleep(40L);
                } catch (InterruptedException e) {
                }
                z = size - this.messageQueue.size() == 0;
            }
        }
    }
}
