cMsgWrapper.cc

Go to the documentation of this file.
00001 // to do
00002 //   word doc
00003 //   subscribe must lock list until done ?
00004 
00005 
00006 
00007 /*----------------------------------------------------------------------------*
00008 *  Copyright (c) 2005        Southeastern Universities Research Association, *
00009 *                            Thomas Jefferson National Accelerator Facility  *
00010 *                                                                            *
00011 *    This software was developed under a United States Government license    *
00012 *    described in the NOTICE file included as part of this distribution.     *
00013 *                                                                            *
00014 *    E.Wolin, 25-Feb-2005, Jefferson Lab                                     *
00015 *                                                                            *
00016 *    Authors: Elliott Wolin                                                  *
00017 *             wolin@jlab.org                    Jefferson Lab, MS-6B         *
00018 *             Phone: (757) 269-7365             12000 Jefferson Ave.         *
00019 *             Fax:   (757) 269-5519             Newport News, VA 23606       *
00020 *
00021 *----------------------------------------------------------------------------*/
00022 
00023 
00024 #include <cMsg.hxx>
00025 #include <cMsgPrivate.h>
00026 #include <stdio.h>
00027 #include <stdlib.h>
00028 #include <unistd.h>
00029 #include <pthread.h>
00030 #include <vector>
00031 #include <sstream>
00032 
00033 using namespace std;
00034 using namespace cmsg;
00035 
00036 
00037 
00038 //-----------------------------------------------------------------------------
00039 //  local data and static functions
00040 //-----------------------------------------------------------------------------
00041 
00042 
00043 namespace cmsg {
00044 
00045 
00047 typedef struct {
00048   cMsgCallback *cb;      
00049   void *userArg;         
00050 } dispatcherStruct;
00051 
00052 
00054 typedef struct {
00055   void  *domainId;      
00056   void  *handle;        
00057   string subject;       
00058   string type;          
00059   dispatcherStruct *d;  
00060 } subscrStruct;
00061 
00062 
00064 static vector<subscrStruct*> subscrVec;
00065 
00066 
00068 static pthread_mutex_t subscrMutex = PTHREAD_MUTEX_INITIALIZER;
00069 
00070 
00071 //-----------------------------------------------------------------------------
00072 
00073 
00075 static void callbackDispatcher(void *msg, void *userArg) {
00076   dispatcherStruct *ds = (dispatcherStruct*)userArg;
00077   ds->cb->callback(new cMsgMessage(msg),ds->userArg);
00078 }
00079 
00080 
00081 //-----------------------------------------------------------------------------
00082 
00083 
00095 static bool subscriptionExists(void *domainId, const string &subject, const string &type, 
00096                                cMsgCallback *cb, void *userArg) {
00097 
00098   bool itExists = false;
00099 
00100 
00101   // search list for matching subscription
00102   pthread_mutex_lock(&subscrMutex);
00103   for(unsigned int i=0; i<subscrVec.size(); i++) {
00104     if( (subscrVec[i]->domainId   == domainId) &&
00105         (subscrVec[i]->subject    == subject) &&
00106         (subscrVec[i]->type       == type) &&
00107         (subscrVec[i]->d->cb      == cb) &&
00108         (subscrVec[i]->d->userArg == userArg)
00109         ) {
00110       itExists=true;
00111       break;
00112     }
00113   }
00114   pthread_mutex_unlock(&subscrMutex);
00115 
00116 
00117   // done searching
00118   return(itExists);
00119 }
00120 
00121 
00122 //-----------------------------------------------------------------------------
00123 
00124 
00134 static void addSubscription(void *domainId, const string &subject, const string &type,
00135                             dispatcherStruct *d, void *handle) {
00136 
00137   subscrStruct *s = new subscrStruct();
00138 
00139   s->domainId=domainId;
00140   s->subject=subject;
00141   s->type=type;
00142   s->d=d;
00143   s->handle=handle;
00144 
00145   pthread_mutex_lock(&subscrMutex);
00146   subscrVec.push_back(s);
00147   pthread_mutex_unlock(&subscrMutex);
00148 
00149   return;
00150 }
00151 
00152 
00153 //-----------------------------------------------------------------------------
00154 
00155 
00164 static bool deleteSubscription(void *domainId, void *handle) {
00165 
00166   bool deleted = false;
00167   vector<subscrStruct*>::iterator iter;
00168 
00169   pthread_mutex_lock(&subscrMutex);
00170   for(iter=subscrVec.begin(); iter!=subscrVec.end(); iter++) {
00171     if(((*iter)->domainId==domainId)&&((*iter)->handle==handle)) {
00172       delete((*iter)->d);
00173       delete(*iter);
00174       subscrVec.erase(iter);
00175       deleted=true;
00176       break;
00177     }
00178   }
00179   pthread_mutex_unlock(&subscrMutex);
00180 
00181   return(deleted);
00182 }
00183 
00184 
00185 //-----------------------------------------------------------------------------
00186 
00187 } // namespace cmsg
00188 
00189 
00190 
00191 //-----------------------------------------------------------------------------
00192 //  cMsgException methods
00193 //-----------------------------------------------------------------------------
00194 
00195 
00199 cMsgException::cMsgException(void) : descr(""), returnCode(0) {}
00200 
00201 
00202 //-----------------------------------------------------------------------------
00203 
00204 
00210 cMsgException::cMsgException(const string &c) : descr(c), returnCode(0) {}
00211 
00212 
00213 //-----------------------------------------------------------------------------
00214 
00215 
00222 cMsgException::cMsgException(const string &c, int code) : descr(c), returnCode(code) {}
00223 
00224 
00225 //-----------------------------------------------------------------------------
00226 
00227 
00233 cMsgException::cMsgException(const cMsgException &e) : descr(e.descr),  returnCode(e.returnCode) {}
00234 
00235 
00236 //-----------------------------------------------------------------------------
00237 
00238 
00242 cMsgException::~cMsgException(void) throw() {
00243 }
00244 
00245 
00246 //-----------------------------------------------------------------------------
00247 
00248 
00254 string cMsgException::toString(void) const throw() {
00255   stringstream ss;
00256   ss << "?cMsgException returnCode = " << returnCode << "    descr = " << descr << ends;
00257   return(ss.str());
00258 }
00259 
00260 
00261 //-----------------------------------------------------------------------------
00262 
00263 
00269 const char *cMsgException::what(void) const throw() {
00270   return(toString().c_str());
00271 }
00272 
00273 
00274 //-----------------------------------------------------------------------------
00275 //  cMsgMessage methods
00276 //-----------------------------------------------------------------------------
00277 
00278 
00282 cMsgMessage::cMsgMessage(void) throw(cMsgException) {
00283     
00284   myMsgPointer=cMsgCreateMessage();
00285   if(myMsgPointer==NULL) {
00286     throw(cMsgException("?cMsgMessage constructor...unable to create message",CMSG_ERROR));
00287   }
00288 }
00289 
00290 
00291 //-----------------------------------------------------------------------------
00292 
00293 
00299 cMsgMessage::cMsgMessage(const cMsgMessage &msg) throw(cMsgException) {
00300 
00301   myMsgPointer=cMsgCopyMessage(msg.myMsgPointer);
00302   if(myMsgPointer==NULL) {
00303     throw(cMsgException("?cMsgMessage copy constructor...unable to create message",CMSG_ERROR));
00304   }
00305 }
00306 
00307 
00308 //-----------------------------------------------------------------------------
00309 
00310 
00316 cMsgMessage::cMsgMessage(void *msgPointer) throw(cMsgException) {
00317 
00318   myMsgPointer=msgPointer;
00319   if(myMsgPointer==NULL) {
00320     throw(cMsgException("?cMsgMessage pointer constructor...unable to create message",CMSG_ERROR));
00321   }
00322 }
00323 
00324 
00325 //-----------------------------------------------------------------------------
00326 
00327 
00331 cMsgMessage::~cMsgMessage(void) {
00332   if(myMsgPointer!=NULL)cMsgFreeMessage(&myMsgPointer);
00333 }
00334 
00335 
00336 //-----------------------------------------------------------------------------
00337 
00338 
00344 string cMsgMessage::getSubject(void) const throw(cMsgException) {
00345 
00346   const char *s;
00347 
00348   int stat;
00349   if((stat=cMsgGetSubject(myMsgPointer,&s))!=CMSG_OK) {
00350     throw(cMsgException(cMsgPerror(stat),stat));
00351   }
00352 
00353   if(s==NULL) {
00354     return("null");
00355   } else {
00356     return(string(s));
00357   }
00358 }
00359 
00360 
00361 //-----------------------------------------------------------------------------
00362 
00363 
00369 void cMsgMessage::setSubject(const string &subject) throw(cMsgException) {
00370 
00371   int stat;
00372   if((stat=cMsgSetSubject(myMsgPointer,subject.c_str()))!=CMSG_OK) {
00373     throw(cMsgException(cMsgPerror(stat),stat));
00374   }
00375 }
00376 
00377 
00378 //-----------------------------------------------------------------------------
00379 
00380 
00386 string cMsgMessage::getType(void) const throw(cMsgException) {
00387 
00388   const char *s;
00389 
00390   int stat;
00391   if((stat=cMsgGetType(myMsgPointer,&s))!=CMSG_OK) {
00392     throw(cMsgException(cMsgPerror(stat),stat));
00393   }
00394 
00395   if(s==NULL) {
00396     return("null");
00397   } else {
00398     return(string(s));
00399   }
00400 }
00401   
00402 
00403 //-----------------------------------------------------------------------------
00404 
00405 
00411 void cMsgMessage::setType(const string &type) throw(cMsgException) {
00412 
00413   int stat;
00414   if((stat=cMsgSetType(myMsgPointer,type.c_str()))!=CMSG_OK) {
00415     throw(cMsgException(cMsgPerror(stat),stat));
00416   }
00417 }
00418 
00419 
00420 //-----------------------------------------------------------------------------
00421 
00422 
00428 string cMsgMessage::getText(void) const throw(cMsgException) {
00429 
00430   const char *s;
00431 
00432   int stat;
00433   if((stat=cMsgGetText(myMsgPointer,&s))!=CMSG_OK) {
00434     throw(cMsgException(cMsgPerror(stat),stat));
00435   }
00436 
00437   if(s==NULL) {
00438     return("null");
00439   } else {
00440     return(string(s));
00441   }
00442 }
00443 
00444 
00445 //-----------------------------------------------------------------------------
00446 
00447 
00453 void cMsgMessage::setText(const string &text) throw(cMsgException) {
00454 
00455   int stat;
00456   if((stat=cMsgSetText(myMsgPointer,text.c_str()))!=CMSG_OK) {
00457     throw(cMsgException(cMsgPerror(stat),stat));
00458   }
00459 }
00460 
00461 
00462 //-----------------------------------------------------------------------------
00463 
00464 
00470 void cMsgMessage::setByteArrayLength(int length) throw(cMsgException) {
00471 
00472   int stat;
00473   if((stat=cMsgSetByteArrayLength(myMsgPointer,length)!=CMSG_OK)) {
00474     throw(cMsgException(cMsgPerror(stat),stat));
00475   }
00476 }
00477 
00478 
00479 //-----------------------------------------------------------------------------
00480 
00481 
00487 int cMsgMessage::getByteArrayLength(void) const throw(cMsgException) {
00488 
00489   int i;
00490 
00491   int stat;
00492   if((stat=cMsgGetByteArrayLength(myMsgPointer,&i))!=CMSG_OK) {
00493     throw(cMsgException(cMsgPerror(stat),stat));
00494   }
00495   return(i);
00496 }
00497 
00498 
00499 //-----------------------------------------------------------------------------
00500 
00501 
00507 void cMsgMessage::setByteArrayOffset(int offset) throw(cMsgException) {
00508 
00509   int stat;
00510   if((stat=cMsgSetByteArrayOffset(myMsgPointer,offset)!=CMSG_OK)) {
00511     throw(cMsgException(cMsgPerror(stat),stat));
00512   }
00513 }
00514 
00515 
00516 //-----------------------------------------------------------------------------
00517 
00518 
00524 int cMsgMessage::getByteArrayOffset(void) const throw(cMsgException) {
00525 
00526   int i;
00527 
00528   int stat;
00529   if((stat=cMsgGetByteArrayOffset(myMsgPointer,&i))!=CMSG_OK) {
00530     throw(cMsgException(cMsgPerror(stat),stat));
00531   }
00532   return(i);
00533 }
00534 
00535 
00536 //-----------------------------------------------------------------------------
00537 
00538 
00544 void cMsgMessage::setByteArray(char *array) throw(cMsgException) {
00545 
00546   int stat;
00547   if((stat=cMsgSetByteArray(myMsgPointer,array)!=CMSG_OK)) {
00548     throw(cMsgException(cMsgPerror(stat),stat));
00549   }
00550 }
00551 
00552 
00553 //-----------------------------------------------------------------------------
00554 
00555 
00561 char* cMsgMessage::getByteArray(void) const throw(cMsgException) {
00562 
00563   char *p;
00564 
00565   int stat;
00566   if((stat=cMsgGetByteArray(myMsgPointer,&p)!=CMSG_OK)) {
00567     throw(cMsgException(cMsgPerror(stat),stat));
00568   }
00569   return(p);
00570 }
00571 
00572 
00573 //-----------------------------------------------------------------------------
00574 
00575 
00576 
00584 void cMsgMessage::setByteArrayAndLimits(char *array, int offset, int length) throw(cMsgException) {
00585 
00586   int stat;
00587   if((stat=cMsgSetByteArrayAndLimits(myMsgPointer,array,offset,length)!=CMSG_OK)) {
00588     throw(cMsgException(cMsgPerror(stat),stat));
00589   }
00590 }
00591 
00592 
00593 //-----------------------------------------------------------------------------
00594 
00595 
00603 void cMsgMessage::copyByteArray(char* array, int offset, int length) throw(cMsgException) {
00604 
00605   int stat;
00606   if((stat=cMsgCopyByteArray(myMsgPointer,array,offset,length)!=CMSG_OK)) {
00607     throw(cMsgException(cMsgPerror(stat),stat));
00608   }
00609 }
00610 
00611 
00612 //-----------------------------------------------------------------------------
00613 
00614 
00620 int cMsgMessage::getUserInt(void) const throw(cMsgException) {
00621 
00622   int i;
00623 
00624   int stat;
00625   if((stat=cMsgGetUserInt(myMsgPointer,&i))!=CMSG_OK) {
00626     throw(cMsgException(cMsgPerror(stat),stat));
00627   }
00628   return(i);
00629 }
00630 
00631 
00632 //-----------------------------------------------------------------------------
00633 
00634 
00640 void cMsgMessage::setUserInt(int i) throw(cMsgException) {
00641 
00642   int stat;
00643   if((stat=cMsgSetUserInt(myMsgPointer,i))!=CMSG_OK) {
00644     throw(cMsgException(cMsgPerror(stat),stat));
00645   }
00646 }
00647 
00648 
00649 //-----------------------------------------------------------------------------
00650 
00651 
00657 struct timespec cMsgMessage::getUserTime(void) const throw(cMsgException) {
00658 
00659   struct timespec t;
00660 
00661   int stat;
00662   if((stat=cMsgGetUserTime(myMsgPointer,&t))!=CMSG_OK) {
00663     throw(cMsgException(cMsgPerror(stat),stat));
00664   }
00665   return(t);
00666 }
00667 
00668 
00669 //-----------------------------------------------------------------------------
00670 
00671 
00677 void cMsgMessage::setUserTime(const struct timespec &userTime) throw(cMsgException) {
00678 
00679   int stat;
00680   if((stat=cMsgSetUserTime(myMsgPointer, &userTime))!=CMSG_OK) {
00681     throw(cMsgException(cMsgPerror(stat),stat));
00682   }
00683 }
00684 
00685 
00686 //-----------------------------------------------------------------------------
00687 
00688 
00694 int cMsgMessage::getVersion(void) const throw(cMsgException) {
00695 
00696   int version;
00697 
00698   int stat;
00699   if((stat=cMsgGetVersion(myMsgPointer, &version))!=CMSG_OK) {
00700     throw(cMsgException(cMsgPerror(stat),stat));
00701   }
00702   return(version);
00703 }
00704 
00705 
00706 //-----------------------------------------------------------------------------
00707 
00708 
00714 cMsgMessage *cMsgMessage::copy(void) const throw(cMsgException) {
00715 
00716   void *newPointer = cMsgCopyMessage(myMsgPointer);
00717   return(new cMsgMessage(newPointer));
00718 }
00719 
00720 
00721 //-----------------------------------------------------------------------------
00722 
00723 
00729 string cMsgMessage::getDomain(void) const throw(cMsgException) {
00730 
00731   const char *s;
00732 
00733   int stat;
00734   if((stat=cMsgGetDomain(myMsgPointer,&s))!=CMSG_OK) {
00735     throw(cMsgException(cMsgPerror(stat),stat));
00736   };
00737 
00738   if(s==NULL) {
00739     return("null");
00740   } else {
00741     return(string(s));
00742   }
00743 }
00744 
00745 
00746 //-----------------------------------------------------------------------------
00747 
00748 
00754 string cMsgMessage::getReceiver(void) const throw(cMsgException) {
00755 
00756   const char *s;
00757 
00758   int stat;
00759   if((stat=cMsgGetReceiver(myMsgPointer,&s))!=CMSG_OK) {
00760     throw(cMsgException(cMsgPerror(stat),stat));
00761   };
00762 
00763   if(s==NULL) {
00764     return("null");
00765   } else {
00766     return(string(s));
00767   }
00768 }
00769 
00770 
00771 //-----------------------------------------------------------------------------
00772 
00773 
00779 string cMsgMessage::getReceiverHost(void) const throw(cMsgException) {
00780 
00781   const char *s;
00782 
00783   int stat;
00784   if((stat=cMsgGetReceiverHost(myMsgPointer,&s))!=CMSG_OK) {
00785     throw(cMsgException(cMsgPerror(stat),stat));
00786   };
00787 
00788   if(s==NULL) {
00789     return("null");
00790   } else {
00791     return(string(s));
00792   }
00793 }
00794 
00795 
00796 //-----------------------------------------------------------------------------
00797 
00798 
00804 string cMsgMessage::getSender(void) const throw(cMsgException) {
00805 
00806   const char *s;
00807 
00808   int stat;
00809   if((stat=cMsgGetSender(myMsgPointer,&s))!=CMSG_OK) {
00810     throw(cMsgException(cMsgPerror(stat),stat));
00811   };
00812 
00813   if(s==NULL) {
00814     return("null");
00815   } else {
00816     return(string(s));
00817   }
00818 }
00819 
00820 
00821 //-----------------------------------------------------------------------------
00822 
00823 
00829 string cMsgMessage::getSenderHost(void) const throw(cMsgException) {
00830 
00831   const char *s;
00832 
00833   int stat;
00834   if((stat=cMsgGetSenderHost(myMsgPointer,&s))!=CMSG_OK) {
00835     throw(cMsgException(cMsgPerror(stat),stat));
00836   };
00837 
00838   if(s==NULL) {
00839     return("null");
00840   } else {
00841     return(string(s));
00842   }
00843 }
00844 
00845 
00846 //-----------------------------------------------------------------------------
00847 
00848 
00854 struct timespec cMsgMessage::getReceiverTime(void) const throw(cMsgException) {
00855 
00856   struct timespec t;
00857 
00858   int stat;
00859   if((stat=cMsgGetReceiverTime(myMsgPointer,&t))!=CMSG_OK) {
00860     throw(cMsgException(cMsgPerror(stat),stat));
00861   }
00862   return(t);
00863 }
00864 
00865 
00866 //-----------------------------------------------------------------------------
00867 
00868 
00874 struct timespec cMsgMessage::getSenderTime(void) const throw(cMsgException) {
00875 
00876   struct timespec t;
00877 
00878   int stat;
00879   if((stat=cMsgGetSenderTime(myMsgPointer,&t))!=CMSG_OK) {
00880     throw(cMsgException(cMsgPerror(stat),stat));
00881   }
00882   return(t);
00883 }
00884 
00885 
00886 //-----------------------------------------------------------------------------
00887 
00888 
00894 bool cMsgMessage::isGetRequest(void) const throw(cMsgException) {
00895   
00896   int b;
00897 
00898   int stat;
00899   if((stat=cMsgGetGetRequest(myMsgPointer,&b))!=CMSG_OK) {
00900     throw(cMsgException(cMsgPerror(stat),stat));
00901   }
00902   return(b);
00903 }
00904 
00905 
00906 //-----------------------------------------------------------------------------
00907 
00908 
00914 bool cMsgMessage::isGetResponse(void) const throw(cMsgException) {
00915   
00916   int b;
00917 
00918   int stat;
00919   if((stat=cMsgGetGetResponse(myMsgPointer,&b))!=CMSG_OK) {
00920     throw(cMsgException(cMsgPerror(stat),stat));
00921   }
00922   return(b);
00923 }
00924 
00925 
00926 //-----------------------------------------------------------------------------
00927 
00928 
00934 bool cMsgMessage::isNullGetResponse(void) const throw(cMsgException) {
00935   
00936   int b;
00937 
00938   int stat;
00939   if((stat=cMsgGetNullGetResponse(myMsgPointer,&b))!=CMSG_OK) {
00940     throw(cMsgException(cMsgPerror(stat),stat));
00941   }
00942   return(b);
00943 }
00944 
00945 
00946 //-----------------------------------------------------------------------------
00947 
00948 
00954 int cMsgMessage::getByteArrayEndian(void) const throw(cMsgException) {
00955 
00956   int stat,endian;
00957 
00958   if((stat=cMsgGetByteArrayEndian(myMsgPointer,&endian))!=CMSG_OK) {
00959     throw(cMsgException(cMsgPerror(stat),stat));
00960   }
00961 
00962   return(endian);
00963 }
00964 
00965 
00966 //-----------------------------------------------------------------------------
00967 
00968 
00974 void cMsgMessage::setByteArrayEndian(int endian) throw(cMsgException) {
00975   int stat;
00976 
00977   if((stat=cMsgSetByteArrayEndian(myMsgPointer,endian))!=CMSG_OK) {
00978     throw(cMsgException(cMsgPerror(stat),stat));
00979   }
00980 
00981   return;
00982 }
00983 
00984 
00985 //-----------------------------------------------------------------------------
00986 
00987 
00993 bool cMsgMessage::needToSwap(void) const throw(cMsgException) {
00994 
00995   int flag,stat;
00996 
00997   if((stat=cMsgNeedToSwap(myMsgPointer,&flag))!=CMSG_OK) {
00998     throw(cMsgException(cMsgPerror(stat),stat));
00999   }
01000 
01001   return(flag==1);
01002 }
01003 
01004 
01005 //-----------------------------------------------------------------------------
01006 
01007 
01013 void cMsgMessage::makeNullResponse(cMsgMessage &msg) throw(cMsgException) {
01014 
01015   cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
01016   cMsgMessage_t *m = (cMsgMessage_t*)msg.myMsgPointer;
01017 
01018   t->sysMsgId    = m->sysMsgId;
01019   t->senderToken = m->senderToken;
01020   t->info        = CMSG_IS_GET_RESPONSE | CMSG_IS_NULL_GET_RESPONSE;
01021 }
01022 
01023 
01024 //-----------------------------------------------------------------------------
01025 
01026 
01032 void cMsgMessage::makeNullResponse(cMsgMessage *msg) throw(cMsgException) {
01033 
01034   cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
01035   cMsgMessage_t *m = (cMsgMessage_t*)msg->myMsgPointer;
01036 
01037   t->sysMsgId    = m->sysMsgId;
01038   t->senderToken = m->senderToken;
01039   t->info        = CMSG_IS_GET_RESPONSE | CMSG_IS_NULL_GET_RESPONSE;
01040 }
01041 
01042 
01043 //-----------------------------------------------------------------------------
01044 
01045 
01051 void cMsgMessage::makeResponse(cMsgMessage &msg) throw(cMsgException) {
01052 
01053   cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
01054   cMsgMessage_t *m = (cMsgMessage_t*)msg.myMsgPointer;
01055 
01056   t->sysMsgId    = m->sysMsgId;
01057   t->senderToken = m->senderToken;
01058   t->info        = CMSG_IS_GET_RESPONSE;
01059 }
01060 
01061 
01062 //-----------------------------------------------------------------------------
01063 
01064 
01070 void cMsgMessage::makeResponse(cMsgMessage *msg) throw(cMsgException) {
01071 
01072   cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
01073   cMsgMessage_t *m = (cMsgMessage_t*)msg->myMsgPointer;
01074 
01075   t->sysMsgId    = m->sysMsgId;
01076   t->senderToken = m->senderToken;
01077   t->info        = CMSG_IS_GET_RESPONSE;
01078 }
01079 
01080 
01081 //-----------------------------------------------------------------------------
01082 
01083 
01089 cMsgMessage *cMsgMessage::nullResponse(void) const throw(cMsgException) {
01090 
01091   void *newMsgPointer;
01092   if((newMsgPointer=cMsgCreateNullResponseMessage(myMsgPointer))==NULL) {
01093     throw(cMsgException("?cMsgMessage::nullResponse...unable to create message",CMSG_ERROR));
01094   }
01095 
01096   return(new cMsgMessage(newMsgPointer));
01097 }
01098 
01099 
01100 //-----------------------------------------------------------------------------
01101 
01102 
01108 cMsgMessage *cMsgMessage::response(void) const throw(cMsgException) {
01109 
01110   void *newMsgPointer;
01111   if((newMsgPointer=cMsgCreateResponseMessage(myMsgPointer))==NULL) {
01112     throw(cMsgException("?cMsgMessage::response...unable to create message",CMSG_ERROR));
01113   }
01114 
01115   return(new cMsgMessage(newMsgPointer));
01116 }
01117 
01118 
01119 //-----------------------------------------------------------------------------
01120 
01121 
01127 void cMsgMessage::setGetResponse(bool b) throw(cMsgException) {
01128 
01129   int stat;
01130   if((stat=cMsgSetGetResponse(myMsgPointer,b))!=CMSG_OK) {
01131     throw(cMsgException(cMsgPerror(stat),stat));
01132   }
01133 }
01134 
01135 
01136 //-----------------------------------------------------------------------------
01137 
01138 
01144 void cMsgMessage::setNullGetResponse(bool b) throw(cMsgException) {
01145 
01146   int stat;
01147   if((stat=cMsgSetNullGetResponse(myMsgPointer,b))!=CMSG_OK) {
01148     throw(cMsgException(cMsgPerror(stat),stat));
01149   }
01150 }
01151 
01152 
01153 //-----------------------------------------------------------------------------
01154 
01155 
01161 string cMsgMessage::toString(void) const throw(cMsgException) {
01162 
01163   char *cs;
01164 
01165   cMsgToString(myMsgPointer,&cs,1);
01166   string s(cs);
01167   free(cs);
01168   return(s);
01169            
01170 }
01171 
01172 
01173 //-----------------------------------------------------------------------------
01174 //   message context accessor functions
01175 //-----------------------------------------------------------------------------
01176 
01177 
01183 string cMsgMessage::getSubscriptionDomain(void) const throw(cMsgException) {
01184 
01185   const char *s;
01186 
01187   int stat;
01188   if((stat=cMsgGetSubscriptionDomain(myMsgPointer,&s))!=CMSG_OK) {
01189     throw(cMsgException(cMsgPerror(stat),stat));
01190   };
01191 
01192   if(s==NULL) {
01193     return("null");
01194   } else {
01195     return(string(s));
01196   }
01197 }
01198 
01199 
01200 //-----------------------------------------------------------------------------
01201 
01202 
01208 string cMsgMessage::getSubscriptionSubject(void) const throw(cMsgException) {
01209 
01210   const char *s;
01211 
01212   int stat;
01213   if((stat=cMsgGetSubscriptionSubject(myMsgPointer,&s))!=CMSG_OK) {
01214     throw(cMsgException(cMsgPerror(stat),stat));
01215   };
01216 
01217   if(s==NULL) {
01218     return("null");
01219   } else {
01220     return(string(s));
01221   }
01222 }
01223 
01224 
01225 //-----------------------------------------------------------------------------
01226 
01227 
01233 string cMsgMessage::getSubscriptionType(void) const throw(cMsgException) {
01234 
01235   const char *s;
01236 
01237   int stat;
01238   if((stat=cMsgGetSubscriptionType(myMsgPointer,&s))!=CMSG_OK) {
01239     throw(cMsgException(cMsgPerror(stat),stat));
01240   };
01241 
01242   if(s==NULL) {
01243     return("null");
01244   } else {
01245     return(string(s));
01246   }
01247 }
01248 
01249 
01250 //-----------------------------------------------------------------------------
01251 
01252 
01258 string cMsgMessage::getSubscriptionUDL(void) const throw(cMsgException) {
01259 
01260   const char *s;
01261 
01262   int stat;
01263   if((stat=cMsgGetSubscriptionUDL(myMsgPointer,&s))!=CMSG_OK) {
01264     throw(cMsgException(cMsgPerror(stat),stat));
01265   };
01266 
01267   if(s==NULL) {
01268     return("null");
01269   } else {
01270     return(string(s));
01271   }
01272 }
01273 
01274 
01275 //-----------------------------------------------------------------------------
01276 
01277 
01283 int cMsgMessage::getSubscriptionCueSize(void) const throw(cMsgException) {
01284 
01285   int i;
01286 
01287   int stat;
01288   if((stat=cMsgGetSubscriptionCueSize(myMsgPointer,&i))!=CMSG_OK) {
01289     throw(cMsgException(cMsgPerror(stat),stat));
01290   }
01291   return(i);
01292 }
01293 
01294 
01295 //-----------------------------------------------------------------------------
01296 
01297 
01303 bool cMsgMessage::getReliableSend(void) const throw(cMsgException) {
01304 
01305   int i;
01306 
01307   int stat;
01308   if((stat=cMsgGetReliableSend(myMsgPointer,&i))!=CMSG_OK) {
01309     throw(cMsgException(cMsgPerror(stat),stat));
01310   }
01311   return(i == 0 ? false : true);
01312 }
01313 
01314 
01315 //-----------------------------------------------------------------------------
01316 
01317 
01323 void cMsgMessage::setReliableSend(bool b) throw(cMsgException) {
01324 
01325   int i = b ? 1 : 0;
01326   
01327   int stat;
01328   if((stat=cMsgSetReliableSend(myMsgPointer,i))!=CMSG_OK) {
01329     throw(cMsgException(cMsgPerror(stat),stat));
01330   }
01331 }
01332 
01333 
01334 //-----------------------------------------------------------------------------
01335 // cMsgSubscriptionConfig methods
01336 //-----------------------------------------------------------------------------
01337 
01338 
01342 cMsgSubscriptionConfig::cMsgSubscriptionConfig(void) {
01343   config = cMsgSubscribeConfigCreate();
01344 }
01345 
01346 
01347 //-----------------------------------------------------------------------------
01348 
01349 
01353 cMsgSubscriptionConfig::~cMsgSubscriptionConfig(void) {
01354   cMsgSubscribeConfigDestroy(config);
01355 }
01356 
01357 
01358 //-----------------------------------------------------------------------------
01359 
01360 
01366 int cMsgSubscriptionConfig::getMaxCueSize(void) {
01367   int size;
01368   cMsgSubscribeGetMaxCueSize(config,&size);
01369   return(size);
01370 }
01371 
01372 
01373 //-----------------------------------------------------------------------------
01374 
01375 
01381 void cMsgSubscriptionConfig::setMaxCueSize(int size) {
01382   cMsgSubscribeSetMaxCueSize(config,size);
01383 }
01384 
01385 
01386 //-----------------------------------------------------------------------------
01387 
01388 
01394 int cMsgSubscriptionConfig::getSkipSize(void) {
01395   int size;
01396   cMsgSubscribeGetSkipSize(config,&size);
01397   return(size);
01398 }
01399 
01400 
01401 //-----------------------------------------------------------------------------
01402 
01403 
01409 void cMsgSubscriptionConfig::setSkipSize(int size) {
01410   cMsgSubscribeSetSkipSize(config,size);
01411 }
01412 
01413 
01414 //-----------------------------------------------------------------------------
01415 
01416 
01422 bool cMsgSubscriptionConfig::getMaySkip(void) {
01423   int maySkip;
01424   cMsgSubscribeGetMaySkip(config,&maySkip);
01425   return((maySkip==0)?false:true);
01426 }
01427 
01428 
01429 //-----------------------------------------------------------------------------
01430 
01431 
01437 void cMsgSubscriptionConfig::setMaySkip(bool maySkip) {
01438   cMsgSubscribeSetMaySkip(config,(maySkip)?1:0);
01439 }
01440 
01441 
01442 //-----------------------------------------------------------------------------
01443 
01444 
01450 bool cMsgSubscriptionConfig::getMustSerialize(void) {
01451   int maySerialize;
01452   cMsgSubscribeGetMustSerialize(config,&maySerialize);
01453   return((maySerialize==0)?false:true);
01454 }
01455 
01456 
01457 //-----------------------------------------------------------------------------
01458 
01459 
01465 void cMsgSubscriptionConfig::setMustSerialize(bool mustSerialize) {
01466   cMsgSubscribeSetMustSerialize(config,(mustSerialize)?1:0);
01467 }
01468 
01469 
01470 //-----------------------------------------------------------------------------
01471 
01472 
01478 int cMsgSubscriptionConfig::getMaxThreads(void) {
01479   int max;
01480   cMsgSubscribeGetMaxThreads(config,&max);
01481   return(max);
01482 }
01483 
01484 
01485 //-----------------------------------------------------------------------------
01486 
01487 
01493 void cMsgSubscriptionConfig::setMaxThreads(int max) {
01494   cMsgSubscribeSetMaxThreads(config,max);
01495 }
01496 
01497 
01498 //-----------------------------------------------------------------------------
01499 
01500 
01506 int cMsgSubscriptionConfig::getMessagesPerThread(void) {
01507   int mpt;
01508   cMsgSubscribeGetMessagesPerThread(config,&mpt);
01509   return(mpt);
01510 }
01511 
01512 
01513 //-----------------------------------------------------------------------------
01514 
01515 
01521 void cMsgSubscriptionConfig::setMessagesPerThread(int mpt) {
01522   cMsgSubscribeSetMessagesPerThread(config,mpt);
01523 }
01524 
01525 
01526 //-----------------------------------------------------------------------------
01527 
01528 
01534 size_t cMsgSubscriptionConfig::getStackSize(void) {
01535   size_t size;
01536   cMsgSubscribeGetStackSize(config,&size);
01537   return(size);
01538 }
01539 
01540 
01541 //-----------------------------------------------------------------------------
01542 
01543 
01549 void cMsgSubscriptionConfig::setStackSize(size_t size) {
01550   cMsgSubscribeSetStackSize(config,size);
01551 }
01552 
01553 
01554 //-----------------------------------------------------------------------------
01555 //  cMsg methods
01556 //-----------------------------------------------------------------------------
01557 
01558 
01566 cMsg::cMsg(const string &UDL, const string &name, const string &descr)
01567   : myUDL(UDL), myName(name), myDescr(descr), initialized(false) {
01568 }
01569 
01570 
01571 //-----------------------------------------------------------------------------
01572 
01573 
01577 cMsg::~cMsg(void) {
01578   cMsg::disconnect();
01579 }
01580 
01581 
01582 //-----------------------------------------------------------------------------
01583 
01584 
01588 void cMsg::connect(void) throw(cMsgException) {
01589 
01590   if(initialized)throw(cMsgException(cMsgPerror(CMSG_ALREADY_INIT),CMSG_ALREADY_INIT));
01591 
01592 
01593   int stat;
01594   if((stat=cMsgConnect(myUDL.c_str(),myName.c_str(),myDescr.c_str(),&myDomainId))!=CMSG_OK) {
01595     throw(cMsgException(cMsgPerror(stat),stat));
01596   }
01597   initialized=true;
01598 }
01599 
01600 
01601 //-----------------------------------------------------------------------------
01602 
01603 
01607 void cMsg::disconnect(void) throw(cMsgException) {
01608 
01609   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01610 
01611 
01612   cMsgDisconnect(&myDomainId);
01613 }
01614 
01615 
01616 //-----------------------------------------------------------------------------
01617 
01618 
01624 void cMsg::send(cMsgMessage &msg) throw(cMsgException) {
01625     
01626   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01627 
01628 
01629   int stat;
01630   if((stat=cMsgSend(myDomainId,msg.myMsgPointer))!=CMSG_OK) {
01631     throw(cMsgException(cMsgPerror(stat),stat));
01632   }
01633 }
01634 
01635 
01636 //-----------------------------------------------------------------------------
01637 
01638 
01644 void cMsg::send(cMsgMessage *msg) throw(cMsgException) {
01645   cMsg::send(*msg);
01646 }
01647 
01648 
01649 //-----------------------------------------------------------------------------
01650 
01651 
01658 int cMsg::syncSend(cMsgMessage &msg, const struct timespec *timeout) throw(cMsgException) {
01659     
01660   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01661 
01662 
01663   int response;
01664 
01665   int stat;
01666   if((stat=cMsgSyncSend(myDomainId,msg.myMsgPointer,timeout,&response))!=CMSG_OK) {
01667     throw(cMsgException(cMsgPerror(stat),stat));
01668   }
01669   return(response);
01670 }
01671 
01672 
01673 //-----------------------------------------------------------------------------
01674 
01675 
01682 int cMsg::syncSend(cMsgMessage *msg, const struct timespec *timeout) throw(cMsgException) {
01683   return(cMsg::syncSend(*msg, timeout));
01684 }
01685 
01686 
01687 //-----------------------------------------------------------------------------
01688 
01689 
01701 void *cMsg::subscribe(const string &subject, const string &type, cMsgCallback *cb, void *userArg,
01702                       const cMsgSubscriptionConfig *cfg) throw(cMsgException) {
01703     
01704   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01705 
01706 
01707   int stat;
01708   void *handle;
01709 
01710 
01711   // check if this is a duplicate subscription
01712   if(subscriptionExists(myDomainId,subject,type,cb,userArg))
01713     throw(cMsgException(cMsgPerror(CMSG_ALREADY_EXISTS),CMSG_ALREADY_EXISTS));
01714 
01715 
01716   // create and fill dispatcher struct
01717   dispatcherStruct *d = new dispatcherStruct();
01718   d->cb=cb;
01719   d->userArg=userArg;
01720 
01721 
01722   // subscribe and get handle
01723   stat=cMsgSubscribe(myDomainId,
01724                      (subject.size()<=0)?NULL:subject.c_str(),
01725                      (type.size()<=0)?NULL:type.c_str(),
01726                      callbackDispatcher,
01727                      (void*)d,
01728                      (cfg==NULL)?NULL:(cfg->config),
01729                      &handle);
01730 
01731 
01732   // check if subscription accepted
01733   if(stat!=CMSG_OK) throw(cMsgException(cMsgPerror(stat),stat));
01734 
01735 
01736   // add this subscription to internal list
01737   addSubscription(myDomainId,subject,type,d,handle);
01738 
01739 
01740   return(handle);
01741 }
01742 
01743 
01744 //-----------------------------------------------------------------------------
01745 
01746 
01758 void *cMsg::subscribe(const string &subject, const string &type, cMsgCallback &cb, void *userArg,
01759                       const cMsgSubscriptionConfig *cfg) throw(cMsgException) {
01760   return(cMsg::subscribe(subject, type, &cb, userArg, cfg));
01761 }
01762 
01763 
01764 //-----------------------------------------------------------------------------
01765 
01766 
01772 void cMsg::unsubscribe(void *handle) throw(cMsgException) {
01773 
01774   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01775 
01776 
01777   int stat;
01778 
01779 
01780   // remove subscription from internal list
01781   if(!deleteSubscription(myDomainId,handle)) {
01782     throw(cMsgException(cMsgPerror(CMSG_BAD_ARGUMENT),CMSG_BAD_ARGUMENT));
01783   }
01784 
01785 
01786   // unsubscribe
01787   if((stat=cMsgUnSubscribe(myDomainId,handle))!=CMSG_OK) {
01788     throw(cMsgException(cMsgPerror(stat),stat));
01789   }
01790 
01791 }
01792 
01793 
01794 //-----------------------------------------------------------------------------
01795 
01796 
01805 cMsgMessage *cMsg::sendAndGet(cMsgMessage &sendMsg, const struct timespec *timeout) throw(cMsgException) {
01806     
01807   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01808 
01809 
01810   void *replyPtr;
01811 
01812   int stat;
01813   if((stat=cMsgSendAndGet(myDomainId,sendMsg.myMsgPointer,timeout,&replyPtr))!=CMSG_OK) {
01814     throw(cMsgException(cMsgPerror(stat),stat));
01815   }
01816 
01817   return(new cMsgMessage(replyPtr));
01818 }
01819 
01820 
01821 //-----------------------------------------------------------------------------
01822 
01823 
01832 cMsgMessage *cMsg::sendAndGet(cMsgMessage *sendMsg, const struct timespec *timeout) throw(cMsgException) {
01833 
01834   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01835 
01836 
01837   void *replyPtr;
01838 
01839   int stat;
01840   if((stat=cMsgSendAndGet(myDomainId,sendMsg->myMsgPointer,timeout,&replyPtr))!=CMSG_OK) {
01841     throw(cMsgException(cMsgPerror(stat),stat));
01842   }
01843 
01844   return(new cMsgMessage(replyPtr));
01845 }
01846 
01847 //-----------------------------------------------------------------------------
01848 
01849 
01859 cMsgMessage *cMsg::subscribeAndGet(const string &subject, const string &type, const struct timespec *timeout) throw(cMsgException) {
01860 
01861   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01862 
01863 
01864   void *replyPtr;
01865 
01866   int stat;
01867   if((stat=cMsgSubscribeAndGet(myDomainId,subject.c_str(),type.c_str(),timeout,&replyPtr))!=CMSG_OK) {
01868     throw(cMsgException(cMsgPerror(stat),stat));
01869   }
01870 
01871   return(new cMsgMessage(replyPtr));
01872 }
01873 
01874 
01875 //-----------------------------------------------------------------------------
01876 
01877 
01883 void cMsg::flush(const struct timespec *timeout) throw(cMsgException) {
01884     
01885   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01886 
01887 
01888   int stat;
01889   if((stat=cMsgFlush(myDomainId, timeout))!=CMSG_OK) {
01890     throw(cMsgException(cMsgPerror(stat),stat));
01891   }
01892 }
01893 
01894 
01895 //-----------------------------------------------------------------------------
01896 
01897 
01901 void cMsg::start(void) throw(cMsgException) {
01902 
01903   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01904 
01905 
01906   int stat;
01907   if((stat=cMsgReceiveStart(myDomainId))!=CMSG_OK) {
01908     throw(cMsgException(cMsgPerror(stat),stat));
01909   }
01910 }
01911 
01912 
01913 //-----------------------------------------------------------------------------
01914 
01915 
01919 void cMsg::stop(void) throw(cMsgException) {
01920 
01921   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01922 
01923 
01924   int stat;
01925   if((stat=cMsgReceiveStop(myDomainId))!=CMSG_OK) {
01926     throw(cMsgException(cMsgPerror(stat),stat));
01927   }
01928 }
01929 
01930 
01931 //-----------------------------------------------------------------------------
01932 
01933 
01939 string cMsg::getDescription(void) const {
01940   return(myDescr);
01941 }
01942 
01943 
01944 //-----------------------------------------------------------------------------
01945 
01946 
01952 string cMsg::getName(void) const {
01953   return(myName);
01954 }
01955 
01956 
01957 //-----------------------------------------------------------------------------
01958 
01959 
01965 string cMsg::getUDL(void) const{
01966   return(myUDL);
01967 }
01968 
01969 
01970 //-----------------------------------------------------------------------------
01971 
01972 
01978 bool cMsg::isConnected(void) const {
01979 
01980   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01981 
01982 
01983   int stat,connected;
01984   if((stat=cMsgGetConnectState(myDomainId,&connected))!=CMSG_OK) {
01985     throw(cMsgException(cMsgPerror(stat),stat));
01986   }
01987   return(connected==1);
01988 }
01989 
01990 
01991 //-----------------------------------------------------------------------------
01992 
01993 
01999 bool cMsg::isReceiving(void) const {
02000 
02001   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02002 
02003 
02004   int stat,receiving;
02005   if((stat=cMsgGetReceiveState(myDomainId,&receiving))!=CMSG_OK) {
02006     throw(cMsgException(cMsgPerror(stat),stat));
02007   }
02008   return(receiving==1);
02009 }
02010 
02011 
02012 //-----------------------------------------------------------------------------
02013 
02014 
02021 void cMsg::setShutdownHandler(cMsgShutdownHandler *handler, void* userArg) throw(cMsgException) {
02022 
02023   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02024 
02025 
02026   int stat;
02027   if((stat=cMsgSetShutdownHandler(myDomainId,handler,userArg))!=CMSG_OK) {
02028     throw(cMsgException(cMsgPerror(stat),stat));
02029   }
02030 }
02031 
02032 
02033 //-----------------------------------------------------------------------------
02034 
02035 
02042 void cMsg::shutdownClients(const string &client, int flag) throw(cMsgException) {
02043 
02044   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02045 
02046 
02047   int stat;
02048   if((stat=cMsgShutdownClients(myDomainId,client.c_str(),flag))!=CMSG_OK) {
02049     throw(cMsgException(cMsgPerror(stat),stat));
02050   }
02051 }
02052 
02053 
02054 //-----------------------------------------------------------------------------
02055 
02056 
02063 void cMsg::shutdownServers(const string &server, int flag) throw(cMsgException) {
02064 
02065   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02066 
02067 
02068   int stat;
02069   if((stat=cMsgShutdownServers(myDomainId,server.c_str(),flag))!=CMSG_OK) {
02070     throw(cMsgException(cMsgPerror(stat),stat));
02071   }
02072 }
02073 
02074 
02075 //-----------------------------------------------------------------------------
02076 
02077 
02085 cMsgMessage *cMsg::monitor(const string &monString) throw(cMsgException) {
02086 
02087   if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02088 
02089   void *m = cMsgCreateMessage();
02090   if(m==NULL)throw(cMsgException("?cMsgMessage constructor...unable to create message",CMSG_ERROR));
02091 
02092   int stat;
02093   if((stat=cMsgMonitor(myDomainId,monString.c_str(),&m))!=CMSG_OK) {
02094     cMsgFreeMessage(&m);
02095     throw(cMsgException(cMsgPerror(stat),stat));
02096   }
02097 
02098   return(new cMsgMessage(m));
02099 }
02100 
02101 
02102 //-----------------------------------------------------------------------------
02103 //-----------------------------------------------------------------------------

Generated on Wed Feb 6 13:37:50 2008 for cMsg Messaging System by  doxygen 1.3.9.1