/*
 * Decompiled with CFR 0.152.
 */
package org.jlab.coda.cMsg.apps;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import org.jlab.coda.cMsg.cMsg;
import org.jlab.coda.cMsg.cMsgCallbackAdapter;
import org.jlab.coda.cMsg.cMsgException;
import org.jlab.coda.cMsg.cMsgMessage;
import org.jlab.coda.cMsg.cMsgPayloadItem;
import org.jlab.coda.cMsg.common.cMsgMessageFull;

public class cMsgQueue {
    private static String UDL = "cMsg://localhost/cMsg";
    private static cMsg cmsg = null;
    private static String name = null;
    private static String host = null;
    private static String description = null;
    private static String queueName = "default";
    private static String subject = "*";
    private static String type = "*";
    private static String getSubject = null;
    private static String getType = "*";
    private static String dir = null;
    private static String base = null;
    private static String fileBase = null;
    private static String hiSeqFile = null;
    private static String loSeqFile = null;
    private static String url = null;
    private static String table = null;
    private static String driver = "com.mysql.jdbc.Driver";
    private static String account = "";
    private static String password = "";
    private static Connection con = null;
    private static Statement stmt = null;
    private static PreparedStatement pStmt = null;
    private static int recvCount = 0;
    private static int getCount = 0;
    private static boolean done = false;
    private static boolean broadcast = false;
    private static boolean debug = false;

    public static void main(String[] args) {
        cMsgQueue.decode_command_line(args);
        if (dir == null && url == null) {
            System.err.println("?cMsgQueue...must specify either dir OR url");
            System.exit(-1);
        }
        if (dir != null && url != null) {
            System.err.println("?cMsgQueue...cannot specify both dir AND url");
            System.exit(-1);
        }
        try {
            host = InetAddress.getLocalHost().getHostName();
        }
        catch (UnknownHostException e) {
            System.err.println("?unknown host exception");
        }
        if (name == null) {
            name = "cMsgQueue/" + queueName;
        }
        if (description == null) {
            description = url != null ? "cMsgQueue (database): " + queueName : "cMsgQueue (file): " + queueName;
        }
        if (getSubject == null) {
            getSubject = name;
        }
        if (dir != null) {
            if (base == null) {
                base = "cMsgQueue_" + queueName + "_";
            }
            fileBase = dir + "/" + base;
            hiSeqFile = fileBase + "Hi";
            loSeqFile = fileBase + "Lo";
            File hi = new File(hiSeqFile);
            File lo = new File(loSeqFile);
            if (!hi.exists() || !lo.exists()) {
                try {
                    FileWriter h = new FileWriter(hi);
                    h.write("0\n");
                    h.close();
                    FileWriter l = new FileWriter(lo);
                    l.write("0\n");
                    l.close();
                }
                catch (IOException e) {
                    e.printStackTrace();
                    System.exit(-1);
                }
            }
        }
        if (url != null) {
            if (table == null) {
                table = "cMsgQueue_" + queueName;
            }
            try {
                Class.forName(driver);
            }
            catch (Exception e) {
                System.err.println("?unable to load driver: " + driver + "\n" + e);
                System.exit(-1);
            }
            try {
                con = DriverManager.getConnection(url, account, password);
            }
            catch (SQLException e) {
                System.err.println("?unable to connect to database url: " + url + "\n" + e);
                System.exit(-1);
            }
            try {
                DatabaseMetaData dbmeta = con.getMetaData();
                ResultSet dbrs = dbmeta.getTables(null, null, table, new String[]{"TABLE"});
                if (!dbrs.next() || !dbrs.getString(3).equalsIgnoreCase(table)) {
                    String sql = "create table " + table + "(id int not null primary key auto_increment, msgTime datetime, creator varchar(255), subject varchar(255), type varchar(128),  userTime datetime, userInt int, message " + cMsgQueue.getBlobName(con) + ")";
                    con.createStatement().executeUpdate(sql);
                }
            }
            catch (SQLException e) {
                e.printStackTrace();
                System.exit(-1);
            }
            try {
                stmt = con.createStatement();
                String sql = "insert into " + table + " (msgTime,creator,subject,type,userTime,userInt,message) values (?,?,?,?,?,?,?)";
                pStmt = con.prepareStatement(sql);
            }
            catch (SQLException e) {
                System.err.println("?unable to prepare statement\n" + e);
                System.exit(-1);
            }
        }
        try {
            cmsg = new cMsg(UDL, name, description);
            cmsg.connect();
        }
        catch (cMsgException e) {
            e.printStackTrace();
            System.exit(-1);
        }
        try {
            cmsg.subscribe(subject, type, new subscribeCB(), null);
            cmsg.subscribe(getSubject, getType, new getCB(), null);
        }
        catch (cMsgException e) {
            e.printStackTrace();
            System.exit(-1);
        }
        cmsg.start();
        try {
            while (!done && cmsg.isConnected()) {
                Thread.sleep(1L);
            }
        }
        catch (Exception e) {
            System.err.println(e);
        }
        cmsg.stop();
        try {
            if (url != null) {
                con.close();
            }
            cmsg.disconnect();
        }
        catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
        System.exit(0);
    }

    private static void usage() {
        System.out.println("\nUsage:\n\n   java cMsgQueue\n        [-name <name>]             name of this cmsg client\n        [-udl <udl>]               UDL for cmsg connection\n        [-descr <description>]     string describing this cmsg client\n        [-subject <subject>]       subject of messages being queued\n        [-type <type>]             type of messages being queued\n        [-getSubject <subject>]    subject of sendAndGet msgs for retrieving a msg from queue\n        [-getType <type>]          type of sendAndGet msgs for retrieving a msg from queue\n        [-queue <name>]            used to generate name, description, getSubject, base, and table\n                                   if any not given, (default = \"default\"\n        [-dir <dir>]               directory of queue files\n        [-base <name>]             base of queue file names\n        [-broadcast]               when oldest queue msg is sent in response to sendAndGet,\n                                   same message sent to all subscribed to getSubject, getType\n        [-table <table>]           db table storing messages\n        [-url <url>]               database url (for connection to db)\n        [-driver <driver>]         database driver (for connection to db)\n        [-account <account>]       database account (for connection to db)\n        [-pwd <password>]          database password (for connection to db)\n        [-debug]                   enable debug output\n        [-h]                       print this help\n");
    }

    static String getBlobName(Connection conn) {
        try {
            DatabaseMetaData dbmeta = conn.getMetaData();
            String type = dbmeta.getDatabaseProductName();
            if (type.equalsIgnoreCase("mysql")) {
                return "blob";
            }
            if (type.equalsIgnoreCase("oracle")) {
                return "blob";
            }
            if (type.equalsIgnoreCase("postgresql")) {
                return "bytea";
            }
            System.out.println("?getBlobName...unknown database type " + type + ", trying blob");
            return "blob";
        }
        catch (Exception e) {
            System.out.println("?getBlobName...unable to get database product name, trying blob");
            return "blob";
        }
    }

    private static void decode_command_line(String[] args) {
        for (int i = 0; i < args.length; ++i) {
            if (args[i].equalsIgnoreCase("-h")) {
                cMsgQueue.usage();
                System.exit(-1);
                continue;
            }
            if (args[i].equalsIgnoreCase("-name")) {
                name = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-descr")) {
                description = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-udl")) {
                UDL = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-subject")) {
                subject = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-type")) {
                type = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-queue")) {
                queueName = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-getSubject")) {
                getSubject = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-getType")) {
                getType = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-broadcast")) {
                broadcast = true;
                continue;
            }
            if (args[i].equalsIgnoreCase("-dir")) {
                dir = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-base")) {
                base = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-url")) {
                url = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-table")) {
                table = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-driver")) {
                driver = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-account")) {
                account = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-pwd")) {
                password = args[i + 1];
                ++i;
                continue;
            }
            if (args[i].equalsIgnoreCase("-debug")) {
                debug = true;
                continue;
            }
            cMsgQueue.usage();
            System.exit(-1);
        }
    }

    static class getCB
    extends cMsgCallbackAdapter {
        getCB() {
        }

        @Override
        public void callback(cMsgMessage m, Object userObject) {
            cMsgMessageFull msg = (cMsgMessageFull)m;
            if (!msg.isGetRequest()) {
                return;
            }
            getCount++;
            cMsgMessageFull response = null;
            if (dir != null) {
                try {
                    RandomAccessFile rHi = new RandomAccessFile(hiSeqFile, "rw");
                    FileChannel cHi = rHi.getChannel();
                    FileLock lHi = cHi.lock();
                    RandomAccessFile rLo = new RandomAccessFile(loSeqFile, "rw");
                    FileChannel cLo = rLo.getChannel();
                    FileLock lLo = cLo.lock();
                    long hi = Long.parseLong(rHi.readLine());
                    long lo = Long.parseLong(rLo.readLine());
                    if (hi > lo) {
                        rLo.seek(0L);
                        rLo.writeBytes(++lo + "\n");
                        FileInputStream fis = null;
                        ObjectInputStream oin = null;
                        try {
                            fis = new FileInputStream(String.format("%s%08d", fileBase, lo));
                            oin = new ObjectInputStream(fis);
                            response = (cMsgMessageFull)oin.readObject();
                            oin.close();
                            fis.close();
                            new File(String.format("%s%08d", fileBase, lo)).delete();
                            response.expandPayload();
                            response.setNoHistoryAdditions(true);
                            response.makeResponse(msg);
                        }
                        catch (FileNotFoundException e) {
                            System.err.println("?missing message file " + lo + " in queue " + queueName);
                        }
                        catch (ClassNotFoundException e) {
                            e.printStackTrace();
                            System.exit(-1);
                        }
                        catch (IOException e) {
                            e.printStackTrace();
                            System.exit(-1);
                        }
                    }
                    lLo.release();
                    lHi.release();
                    rHi.close();
                    rLo.close();
                }
                catch (IOException e) {
                    e.printStackTrace();
                    System.exit(-1);
                }
            }
            if (url != null) {
                try {
                    byte[] buf;
                    stmt.execute("lock tables " + table + " write");
                    ResultSet rs = stmt.executeQuery("select * from " + table + " order by id limit 1");
                    if (rs.next() && (buf = rs.getBytes("message")) != null) {
                        try {
                            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buf));
                            response = (cMsgMessageFull)ois.readObject();
                            response.expandPayload();
                            response.setNoHistoryAdditions(true);
                            response.makeResponse(msg);
                            stmt.execute("delete from " + table + " where id=" + rs.getInt("id"));
                        }
                        catch (ClassNotFoundException e) {
                            e.printStackTrace();
                            System.exit(-1);
                        }
                        catch (IOException e) {
                            e.printStackTrace();
                            System.exit(-1);
                        }
                    }
                    rs.close();
                    stmt.execute("unlock tables");
                }
                catch (SQLException e) {
                    e.printStackTrace();
                    System.exit(-1);
                }
            }
            if (response == null) {
                response = cMsgMessageFull.createDeliverableMessage();
                response.makeNullResponse(msg);
            }
            try {
                cmsg.send(response);
                if (broadcast) {
                    cmsg.send(response);
                }
                cmsg.flush(0);
            }
            catch (cMsgException e) {
                e.printStackTrace();
                System.exit(-1);
            }
        }
    }

    static class subscribeCB
    extends cMsgCallbackAdapter {
        subscribeCB() {
        }

        @Override
        public void callback(cMsgMessage m, Object userObject) {
            cMsgMessageFull msg = (cMsgMessageFull)m;
            if (msg.isGetRequest()) {
                return;
            }
            if (msg.isGetResponse()) {
                return;
            }
            if (msg.getSender().equals(name)) {
                return;
            }
            recvCount++;
            String creator = null;
            try {
                cMsgPayloadItem creatorItem = msg.getPayloadItem("cMsgCreator");
                if (creatorItem != null) {
                    creator = creatorItem.getString();
                }
            }
            catch (cMsgException e) {
                System.err.println("?cMsgQueue...message has no creator!");
            }
            msg.compressPayload();
            if (dir != null) {
                try {
                    RandomAccessFile r = new RandomAccessFile(hiSeqFile, "rw");
                    FileChannel c = r.getChannel();
                    FileLock l = c.lock();
                    long hi = Long.parseLong(r.readLine());
                    r.seek(0L);
                    r.writeBytes(++hi + "\n");
                    FileOutputStream fos = null;
                    ObjectOutputStream oos = null;
                    fos = new FileOutputStream(String.format("%s%08d", fileBase, hi));
                    oos = new ObjectOutputStream(fos);
                    oos.writeObject(msg);
                    oos.close();
                    l.release();
                    r.close();
                }
                catch (IOException e) {
                    e.printStackTrace();
                    System.exit(-1);
                }
            }
            if (url != null) {
                try {
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    ObjectOutputStream oos = new ObjectOutputStream(baos);
                    oos.writeObject(msg);
                    oos.close();
                    int i = 1;
                    pStmt.setTimestamp(i++, new Timestamp(msg.getReceiverTime().getTime()));
                    pStmt.setString(i++, creator);
                    pStmt.setString(i++, msg.getSubject());
                    pStmt.setString(i++, msg.getType());
                    pStmt.setTimestamp(i++, new Timestamp(msg.getUserTime().getTime()));
                    pStmt.setInt(i++, msg.getUserInt());
                    pStmt.setBytes(i++, baos.toByteArray());
                    pStmt.executeUpdate();
                }
                catch (SQLException e) {
                    e.printStackTrace();
                    System.exit(-1);
                }
                catch (IOException e) {
                    e.printStackTrace();
                    System.exit(-1);
                }
            }
        }
    }
}

