00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <cMsg.hxx>
00025 #include <cMsgPrivate.h>
00026 #include <stdio.h>
00027 #include <stdlib.h>
00028 #include <unistd.h>
00029 #include <pthread.h>
00030 #include <vector>
00031 #include <sstream>
00032
00033 using namespace std;
00034 using namespace cmsg;
00035
00036
00037
00038
00039
00040
00041
00042
00043 namespace cmsg {
00044
00045
00047 typedef struct {
00048 cMsgCallback *cb;
00049 void *userArg;
00050 } dispatcherStruct;
00051
00052
00054 typedef struct {
00055 void *domainId;
00056 void *handle;
00057 string subject;
00058 string type;
00059 dispatcherStruct *d;
00060 } subscrStruct;
00061
00062
00064 static vector<subscrStruct*> subscrVec;
00065
00066
00068 static pthread_mutex_t subscrMutex = PTHREAD_MUTEX_INITIALIZER;
00069
00070
00071
00072
00073
00075 static void callbackDispatcher(void *msg, void *userArg) {
00076 dispatcherStruct *ds = (dispatcherStruct*)userArg;
00077 ds->cb->callback(new cMsgMessage(msg),ds->userArg);
00078 }
00079
00080
00081
00082
00083
00095 static bool subscriptionExists(void *domainId, const string &subject, const string &type,
00096 cMsgCallback *cb, void *userArg) {
00097
00098 bool itExists = false;
00099
00100
00101
00102 pthread_mutex_lock(&subscrMutex);
00103 for(unsigned int i=0; i<subscrVec.size(); i++) {
00104 if( (subscrVec[i]->domainId == domainId) &&
00105 (subscrVec[i]->subject == subject) &&
00106 (subscrVec[i]->type == type) &&
00107 (subscrVec[i]->d->cb == cb) &&
00108 (subscrVec[i]->d->userArg == userArg)
00109 ) {
00110 itExists=true;
00111 break;
00112 }
00113 }
00114 pthread_mutex_unlock(&subscrMutex);
00115
00116
00117
00118 return(itExists);
00119 }
00120
00121
00122
00123
00124
00134 static void addSubscription(void *domainId, const string &subject, const string &type,
00135 dispatcherStruct *d, void *handle) {
00136
00137 subscrStruct *s = new subscrStruct();
00138
00139 s->domainId=domainId;
00140 s->subject=subject;
00141 s->type=type;
00142 s->d=d;
00143 s->handle=handle;
00144
00145 pthread_mutex_lock(&subscrMutex);
00146 subscrVec.push_back(s);
00147 pthread_mutex_unlock(&subscrMutex);
00148
00149 return;
00150 }
00151
00152
00153
00154
00155
00164 static bool deleteSubscription(void *domainId, void *handle) {
00165
00166 bool deleted = false;
00167 vector<subscrStruct*>::iterator iter;
00168
00169 pthread_mutex_lock(&subscrMutex);
00170 for(iter=subscrVec.begin(); iter!=subscrVec.end(); iter++) {
00171 if(((*iter)->domainId==domainId)&&((*iter)->handle==handle)) {
00172 delete((*iter)->d);
00173 delete(*iter);
00174 subscrVec.erase(iter);
00175 deleted=true;
00176 break;
00177 }
00178 }
00179 pthread_mutex_unlock(&subscrMutex);
00180
00181 return(deleted);
00182 }
00183
00184
00185
00186
00187 }
00188
00189
00190
00191
00192
00193
00194
00195
00199 cMsgException::cMsgException(void) : descr(""), returnCode(0) {}
00200
00201
00202
00203
00204
00210 cMsgException::cMsgException(const string &c) : descr(c), returnCode(0) {}
00211
00212
00213
00214
00215
00222 cMsgException::cMsgException(const string &c, int code) : descr(c), returnCode(code) {}
00223
00224
00225
00226
00227
00233 cMsgException::cMsgException(const cMsgException &e) : descr(e.descr), returnCode(e.returnCode) {}
00234
00235
00236
00237
00238
00242 cMsgException::~cMsgException(void) throw() {
00243 }
00244
00245
00246
00247
00248
00254 string cMsgException::toString(void) const throw() {
00255 stringstream ss;
00256 ss << "?cMsgException returnCode = " << returnCode << " descr = " << descr << ends;
00257 return(ss.str());
00258 }
00259
00260
00261
00262
00263
00269 const char *cMsgException::what(void) const throw() {
00270 return(toString().c_str());
00271 }
00272
00273
00274
00275
00276
00277
00278
00282 cMsgMessage::cMsgMessage(void) throw(cMsgException) {
00283
00284 myMsgPointer=cMsgCreateMessage();
00285 if(myMsgPointer==NULL) {
00286 throw(cMsgException("?cMsgMessage constructor...unable to create message",CMSG_ERROR));
00287 }
00288 }
00289
00290
00291
00292
00293
00299 cMsgMessage::cMsgMessage(const cMsgMessage &msg) throw(cMsgException) {
00300
00301 myMsgPointer=cMsgCopyMessage(msg.myMsgPointer);
00302 if(myMsgPointer==NULL) {
00303 throw(cMsgException("?cMsgMessage copy constructor...unable to create message",CMSG_ERROR));
00304 }
00305 }
00306
00307
00308
00309
00310
00316 cMsgMessage::cMsgMessage(void *msgPointer) throw(cMsgException) {
00317
00318 myMsgPointer=msgPointer;
00319 if(myMsgPointer==NULL) {
00320 throw(cMsgException("?cMsgMessage pointer constructor...unable to create message",CMSG_ERROR));
00321 }
00322 }
00323
00324
00325
00326
00327
00331 cMsgMessage::~cMsgMessage(void) {
00332 if(myMsgPointer!=NULL)cMsgFreeMessage(&myMsgPointer);
00333 }
00334
00335
00336
00337
00338
00344 string cMsgMessage::getSubject(void) const throw(cMsgException) {
00345
00346 const char *s;
00347
00348 int stat;
00349 if((stat=cMsgGetSubject(myMsgPointer,&s))!=CMSG_OK) {
00350 throw(cMsgException(cMsgPerror(stat),stat));
00351 }
00352
00353 if(s==NULL) {
00354 return("null");
00355 } else {
00356 return(string(s));
00357 }
00358 }
00359
00360
00361
00362
00363
00369 void cMsgMessage::setSubject(const string &subject) throw(cMsgException) {
00370
00371 int stat;
00372 if((stat=cMsgSetSubject(myMsgPointer,subject.c_str()))!=CMSG_OK) {
00373 throw(cMsgException(cMsgPerror(stat),stat));
00374 }
00375 }
00376
00377
00378
00379
00380
00386 string cMsgMessage::getType(void) const throw(cMsgException) {
00387
00388 const char *s;
00389
00390 int stat;
00391 if((stat=cMsgGetType(myMsgPointer,&s))!=CMSG_OK) {
00392 throw(cMsgException(cMsgPerror(stat),stat));
00393 }
00394
00395 if(s==NULL) {
00396 return("null");
00397 } else {
00398 return(string(s));
00399 }
00400 }
00401
00402
00403
00404
00405
00411 void cMsgMessage::setType(const string &type) throw(cMsgException) {
00412
00413 int stat;
00414 if((stat=cMsgSetType(myMsgPointer,type.c_str()))!=CMSG_OK) {
00415 throw(cMsgException(cMsgPerror(stat),stat));
00416 }
00417 }
00418
00419
00420
00421
00422
00428 string cMsgMessage::getText(void) const throw(cMsgException) {
00429
00430 const char *s;
00431
00432 int stat;
00433 if((stat=cMsgGetText(myMsgPointer,&s))!=CMSG_OK) {
00434 throw(cMsgException(cMsgPerror(stat),stat));
00435 }
00436
00437 if(s==NULL) {
00438 return("null");
00439 } else {
00440 return(string(s));
00441 }
00442 }
00443
00444
00445
00446
00447
00453 void cMsgMessage::setText(const string &text) throw(cMsgException) {
00454
00455 int stat;
00456 if((stat=cMsgSetText(myMsgPointer,text.c_str()))!=CMSG_OK) {
00457 throw(cMsgException(cMsgPerror(stat),stat));
00458 }
00459 }
00460
00461
00462
00463
00464
00470 void cMsgMessage::setByteArrayLength(int length) throw(cMsgException) {
00471
00472 int stat;
00473 if((stat=cMsgSetByteArrayLength(myMsgPointer,length)!=CMSG_OK)) {
00474 throw(cMsgException(cMsgPerror(stat),stat));
00475 }
00476 }
00477
00478
00479
00480
00481
00487 int cMsgMessage::getByteArrayLength(void) const throw(cMsgException) {
00488
00489 int i;
00490
00491 int stat;
00492 if((stat=cMsgGetByteArrayLength(myMsgPointer,&i))!=CMSG_OK) {
00493 throw(cMsgException(cMsgPerror(stat),stat));
00494 }
00495 return(i);
00496 }
00497
00498
00499
00500
00501
00507 void cMsgMessage::setByteArrayOffset(int offset) throw(cMsgException) {
00508
00509 int stat;
00510 if((stat=cMsgSetByteArrayOffset(myMsgPointer,offset)!=CMSG_OK)) {
00511 throw(cMsgException(cMsgPerror(stat),stat));
00512 }
00513 }
00514
00515
00516
00517
00518
00524 int cMsgMessage::getByteArrayOffset(void) const throw(cMsgException) {
00525
00526 int i;
00527
00528 int stat;
00529 if((stat=cMsgGetByteArrayOffset(myMsgPointer,&i))!=CMSG_OK) {
00530 throw(cMsgException(cMsgPerror(stat),stat));
00531 }
00532 return(i);
00533 }
00534
00535
00536
00537
00538
00544 void cMsgMessage::setByteArray(char *array) throw(cMsgException) {
00545
00546 int stat;
00547 if((stat=cMsgSetByteArray(myMsgPointer,array)!=CMSG_OK)) {
00548 throw(cMsgException(cMsgPerror(stat),stat));
00549 }
00550 }
00551
00552
00553
00554
00555
00561 char* cMsgMessage::getByteArray(void) const throw(cMsgException) {
00562
00563 char *p;
00564
00565 int stat;
00566 if((stat=cMsgGetByteArray(myMsgPointer,&p)!=CMSG_OK)) {
00567 throw(cMsgException(cMsgPerror(stat),stat));
00568 }
00569 return(p);
00570 }
00571
00572
00573
00574
00575
00576
00584 void cMsgMessage::setByteArrayAndLimits(char *array, int offset, int length) throw(cMsgException) {
00585
00586 int stat;
00587 if((stat=cMsgSetByteArrayAndLimits(myMsgPointer,array,offset,length)!=CMSG_OK)) {
00588 throw(cMsgException(cMsgPerror(stat),stat));
00589 }
00590 }
00591
00592
00593
00594
00595
00603 void cMsgMessage::copyByteArray(char* array, int offset, int length) throw(cMsgException) {
00604
00605 int stat;
00606 if((stat=cMsgCopyByteArray(myMsgPointer,array,offset,length)!=CMSG_OK)) {
00607 throw(cMsgException(cMsgPerror(stat),stat));
00608 }
00609 }
00610
00611
00612
00613
00614
00620 int cMsgMessage::getUserInt(void) const throw(cMsgException) {
00621
00622 int i;
00623
00624 int stat;
00625 if((stat=cMsgGetUserInt(myMsgPointer,&i))!=CMSG_OK) {
00626 throw(cMsgException(cMsgPerror(stat),stat));
00627 }
00628 return(i);
00629 }
00630
00631
00632
00633
00634
00640 void cMsgMessage::setUserInt(int i) throw(cMsgException) {
00641
00642 int stat;
00643 if((stat=cMsgSetUserInt(myMsgPointer,i))!=CMSG_OK) {
00644 throw(cMsgException(cMsgPerror(stat),stat));
00645 }
00646 }
00647
00648
00649
00650
00651
00657 struct timespec cMsgMessage::getUserTime(void) const throw(cMsgException) {
00658
00659 struct timespec t;
00660
00661 int stat;
00662 if((stat=cMsgGetUserTime(myMsgPointer,&t))!=CMSG_OK) {
00663 throw(cMsgException(cMsgPerror(stat),stat));
00664 }
00665 return(t);
00666 }
00667
00668
00669
00670
00671
00677 void cMsgMessage::setUserTime(const struct timespec &userTime) throw(cMsgException) {
00678
00679 int stat;
00680 if((stat=cMsgSetUserTime(myMsgPointer, &userTime))!=CMSG_OK) {
00681 throw(cMsgException(cMsgPerror(stat),stat));
00682 }
00683 }
00684
00685
00686
00687
00688
00694 int cMsgMessage::getVersion(void) const throw(cMsgException) {
00695
00696 int version;
00697
00698 int stat;
00699 if((stat=cMsgGetVersion(myMsgPointer, &version))!=CMSG_OK) {
00700 throw(cMsgException(cMsgPerror(stat),stat));
00701 }
00702 return(version);
00703 }
00704
00705
00706
00707
00708
00714 cMsgMessage *cMsgMessage::copy(void) const throw(cMsgException) {
00715
00716 void *newPointer = cMsgCopyMessage(myMsgPointer);
00717 return(new cMsgMessage(newPointer));
00718 }
00719
00720
00721
00722
00723
00729 string cMsgMessage::getDomain(void) const throw(cMsgException) {
00730
00731 const char *s;
00732
00733 int stat;
00734 if((stat=cMsgGetDomain(myMsgPointer,&s))!=CMSG_OK) {
00735 throw(cMsgException(cMsgPerror(stat),stat));
00736 };
00737
00738 if(s==NULL) {
00739 return("null");
00740 } else {
00741 return(string(s));
00742 }
00743 }
00744
00745
00746
00747
00748
00754 string cMsgMessage::getReceiver(void) const throw(cMsgException) {
00755
00756 const char *s;
00757
00758 int stat;
00759 if((stat=cMsgGetReceiver(myMsgPointer,&s))!=CMSG_OK) {
00760 throw(cMsgException(cMsgPerror(stat),stat));
00761 };
00762
00763 if(s==NULL) {
00764 return("null");
00765 } else {
00766 return(string(s));
00767 }
00768 }
00769
00770
00771
00772
00773
00779 string cMsgMessage::getReceiverHost(void) const throw(cMsgException) {
00780
00781 const char *s;
00782
00783 int stat;
00784 if((stat=cMsgGetReceiverHost(myMsgPointer,&s))!=CMSG_OK) {
00785 throw(cMsgException(cMsgPerror(stat),stat));
00786 };
00787
00788 if(s==NULL) {
00789 return("null");
00790 } else {
00791 return(string(s));
00792 }
00793 }
00794
00795
00796
00797
00798
00804 string cMsgMessage::getSender(void) const throw(cMsgException) {
00805
00806 const char *s;
00807
00808 int stat;
00809 if((stat=cMsgGetSender(myMsgPointer,&s))!=CMSG_OK) {
00810 throw(cMsgException(cMsgPerror(stat),stat));
00811 };
00812
00813 if(s==NULL) {
00814 return("null");
00815 } else {
00816 return(string(s));
00817 }
00818 }
00819
00820
00821
00822
00823
00829 string cMsgMessage::getSenderHost(void) const throw(cMsgException) {
00830
00831 const char *s;
00832
00833 int stat;
00834 if((stat=cMsgGetSenderHost(myMsgPointer,&s))!=CMSG_OK) {
00835 throw(cMsgException(cMsgPerror(stat),stat));
00836 };
00837
00838 if(s==NULL) {
00839 return("null");
00840 } else {
00841 return(string(s));
00842 }
00843 }
00844
00845
00846
00847
00848
00854 struct timespec cMsgMessage::getReceiverTime(void) const throw(cMsgException) {
00855
00856 struct timespec t;
00857
00858 int stat;
00859 if((stat=cMsgGetReceiverTime(myMsgPointer,&t))!=CMSG_OK) {
00860 throw(cMsgException(cMsgPerror(stat),stat));
00861 }
00862 return(t);
00863 }
00864
00865
00866
00867
00868
00874 struct timespec cMsgMessage::getSenderTime(void) const throw(cMsgException) {
00875
00876 struct timespec t;
00877
00878 int stat;
00879 if((stat=cMsgGetSenderTime(myMsgPointer,&t))!=CMSG_OK) {
00880 throw(cMsgException(cMsgPerror(stat),stat));
00881 }
00882 return(t);
00883 }
00884
00885
00886
00887
00888
00894 bool cMsgMessage::isGetRequest(void) const throw(cMsgException) {
00895
00896 int b;
00897
00898 int stat;
00899 if((stat=cMsgGetGetRequest(myMsgPointer,&b))!=CMSG_OK) {
00900 throw(cMsgException(cMsgPerror(stat),stat));
00901 }
00902 return(b);
00903 }
00904
00905
00906
00907
00908
00914 bool cMsgMessage::isGetResponse(void) const throw(cMsgException) {
00915
00916 int b;
00917
00918 int stat;
00919 if((stat=cMsgGetGetResponse(myMsgPointer,&b))!=CMSG_OK) {
00920 throw(cMsgException(cMsgPerror(stat),stat));
00921 }
00922 return(b);
00923 }
00924
00925
00926
00927
00928
00934 bool cMsgMessage::isNullGetResponse(void) const throw(cMsgException) {
00935
00936 int b;
00937
00938 int stat;
00939 if((stat=cMsgGetNullGetResponse(myMsgPointer,&b))!=CMSG_OK) {
00940 throw(cMsgException(cMsgPerror(stat),stat));
00941 }
00942 return(b);
00943 }
00944
00945
00946
00947
00948
00954 int cMsgMessage::getByteArrayEndian(void) const throw(cMsgException) {
00955
00956 int stat,endian;
00957
00958 if((stat=cMsgGetByteArrayEndian(myMsgPointer,&endian))!=CMSG_OK) {
00959 throw(cMsgException(cMsgPerror(stat),stat));
00960 }
00961
00962 return(endian);
00963 }
00964
00965
00966
00967
00968
00974 void cMsgMessage::setByteArrayEndian(int endian) throw(cMsgException) {
00975 int stat;
00976
00977 if((stat=cMsgSetByteArrayEndian(myMsgPointer,endian))!=CMSG_OK) {
00978 throw(cMsgException(cMsgPerror(stat),stat));
00979 }
00980
00981 return;
00982 }
00983
00984
00985
00986
00987
00993 bool cMsgMessage::needToSwap(void) const throw(cMsgException) {
00994
00995 int flag,stat;
00996
00997 if((stat=cMsgNeedToSwap(myMsgPointer,&flag))!=CMSG_OK) {
00998 throw(cMsgException(cMsgPerror(stat),stat));
00999 }
01000
01001 return(flag==1);
01002 }
01003
01004
01005
01006
01007
01013 void cMsgMessage::makeNullResponse(cMsgMessage &msg) throw(cMsgException) {
01014
01015 cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
01016 cMsgMessage_t *m = (cMsgMessage_t*)msg.myMsgPointer;
01017
01018 t->sysMsgId = m->sysMsgId;
01019 t->senderToken = m->senderToken;
01020 t->info = CMSG_IS_GET_RESPONSE | CMSG_IS_NULL_GET_RESPONSE;
01021 }
01022
01023
01024
01025
01026
01032 void cMsgMessage::makeNullResponse(cMsgMessage *msg) throw(cMsgException) {
01033
01034 cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
01035 cMsgMessage_t *m = (cMsgMessage_t*)msg->myMsgPointer;
01036
01037 t->sysMsgId = m->sysMsgId;
01038 t->senderToken = m->senderToken;
01039 t->info = CMSG_IS_GET_RESPONSE | CMSG_IS_NULL_GET_RESPONSE;
01040 }
01041
01042
01043
01044
01045
01051 void cMsgMessage::makeResponse(cMsgMessage &msg) throw(cMsgException) {
01052
01053 cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
01054 cMsgMessage_t *m = (cMsgMessage_t*)msg.myMsgPointer;
01055
01056 t->sysMsgId = m->sysMsgId;
01057 t->senderToken = m->senderToken;
01058 t->info = CMSG_IS_GET_RESPONSE;
01059 }
01060
01061
01062
01063
01064
01070 void cMsgMessage::makeResponse(cMsgMessage *msg) throw(cMsgException) {
01071
01072 cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
01073 cMsgMessage_t *m = (cMsgMessage_t*)msg->myMsgPointer;
01074
01075 t->sysMsgId = m->sysMsgId;
01076 t->senderToken = m->senderToken;
01077 t->info = CMSG_IS_GET_RESPONSE;
01078 }
01079
01080
01081
01082
01083
01089 cMsgMessage *cMsgMessage::nullResponse(void) const throw(cMsgException) {
01090
01091 void *newMsgPointer;
01092 if((newMsgPointer=cMsgCreateNullResponseMessage(myMsgPointer))==NULL) {
01093 throw(cMsgException("?cMsgMessage::nullResponse...unable to create message",CMSG_ERROR));
01094 }
01095
01096 return(new cMsgMessage(newMsgPointer));
01097 }
01098
01099
01100
01101
01102
01108 cMsgMessage *cMsgMessage::response(void) const throw(cMsgException) {
01109
01110 void *newMsgPointer;
01111 if((newMsgPointer=cMsgCreateResponseMessage(myMsgPointer))==NULL) {
01112 throw(cMsgException("?cMsgMessage::response...unable to create message",CMSG_ERROR));
01113 }
01114
01115 return(new cMsgMessage(newMsgPointer));
01116 }
01117
01118
01119
01120
01121
01127 void cMsgMessage::setGetResponse(bool b) throw(cMsgException) {
01128
01129 int stat;
01130 if((stat=cMsgSetGetResponse(myMsgPointer,b))!=CMSG_OK) {
01131 throw(cMsgException(cMsgPerror(stat),stat));
01132 }
01133 }
01134
01135
01136
01137
01138
01144 void cMsgMessage::setNullGetResponse(bool b) throw(cMsgException) {
01145
01146 int stat;
01147 if((stat=cMsgSetNullGetResponse(myMsgPointer,b))!=CMSG_OK) {
01148 throw(cMsgException(cMsgPerror(stat),stat));
01149 }
01150 }
01151
01152
01153
01154
01155
01161 string cMsgMessage::toString(void) const throw(cMsgException) {
01162
01163 char *cs;
01164
01165 cMsgToString(myMsgPointer,&cs,1);
01166 string s(cs);
01167 free(cs);
01168 return(s);
01169
01170 }
01171
01172
01173
01174
01175
01176
01177
01183 string cMsgMessage::getSubscriptionDomain(void) const throw(cMsgException) {
01184
01185 const char *s;
01186
01187 int stat;
01188 if((stat=cMsgGetSubscriptionDomain(myMsgPointer,&s))!=CMSG_OK) {
01189 throw(cMsgException(cMsgPerror(stat),stat));
01190 };
01191
01192 if(s==NULL) {
01193 return("null");
01194 } else {
01195 return(string(s));
01196 }
01197 }
01198
01199
01200
01201
01202
01208 string cMsgMessage::getSubscriptionSubject(void) const throw(cMsgException) {
01209
01210 const char *s;
01211
01212 int stat;
01213 if((stat=cMsgGetSubscriptionSubject(myMsgPointer,&s))!=CMSG_OK) {
01214 throw(cMsgException(cMsgPerror(stat),stat));
01215 };
01216
01217 if(s==NULL) {
01218 return("null");
01219 } else {
01220 return(string(s));
01221 }
01222 }
01223
01224
01225
01226
01227
01233 string cMsgMessage::getSubscriptionType(void) const throw(cMsgException) {
01234
01235 const char *s;
01236
01237 int stat;
01238 if((stat=cMsgGetSubscriptionType(myMsgPointer,&s))!=CMSG_OK) {
01239 throw(cMsgException(cMsgPerror(stat),stat));
01240 };
01241
01242 if(s==NULL) {
01243 return("null");
01244 } else {
01245 return(string(s));
01246 }
01247 }
01248
01249
01250
01251
01252
01258 string cMsgMessage::getSubscriptionUDL(void) const throw(cMsgException) {
01259
01260 const char *s;
01261
01262 int stat;
01263 if((stat=cMsgGetSubscriptionUDL(myMsgPointer,&s))!=CMSG_OK) {
01264 throw(cMsgException(cMsgPerror(stat),stat));
01265 };
01266
01267 if(s==NULL) {
01268 return("null");
01269 } else {
01270 return(string(s));
01271 }
01272 }
01273
01274
01275
01276
01277
01283 int cMsgMessage::getSubscriptionCueSize(void) const throw(cMsgException) {
01284
01285 int i;
01286
01287 int stat;
01288 if((stat=cMsgGetSubscriptionCueSize(myMsgPointer,&i))!=CMSG_OK) {
01289 throw(cMsgException(cMsgPerror(stat),stat));
01290 }
01291 return(i);
01292 }
01293
01294
01295
01296
01297
01303 bool cMsgMessage::getReliableSend(void) const throw(cMsgException) {
01304
01305 int i;
01306
01307 int stat;
01308 if((stat=cMsgGetReliableSend(myMsgPointer,&i))!=CMSG_OK) {
01309 throw(cMsgException(cMsgPerror(stat),stat));
01310 }
01311 return(i == 0 ? false : true);
01312 }
01313
01314
01315
01316
01317
01323 void cMsgMessage::setReliableSend(bool b) throw(cMsgException) {
01324
01325 int i = b ? 1 : 0;
01326
01327 int stat;
01328 if((stat=cMsgSetReliableSend(myMsgPointer,i))!=CMSG_OK) {
01329 throw(cMsgException(cMsgPerror(stat),stat));
01330 }
01331 }
01332
01333
01334
01335
01336
01337
01338
01342 cMsgSubscriptionConfig::cMsgSubscriptionConfig(void) {
01343 config = cMsgSubscribeConfigCreate();
01344 }
01345
01346
01347
01348
01349
01353 cMsgSubscriptionConfig::~cMsgSubscriptionConfig(void) {
01354 cMsgSubscribeConfigDestroy(config);
01355 }
01356
01357
01358
01359
01360
01366 int cMsgSubscriptionConfig::getMaxCueSize(void) {
01367 int size;
01368 cMsgSubscribeGetMaxCueSize(config,&size);
01369 return(size);
01370 }
01371
01372
01373
01374
01375
01381 void cMsgSubscriptionConfig::setMaxCueSize(int size) {
01382 cMsgSubscribeSetMaxCueSize(config,size);
01383 }
01384
01385
01386
01387
01388
01394 int cMsgSubscriptionConfig::getSkipSize(void) {
01395 int size;
01396 cMsgSubscribeGetSkipSize(config,&size);
01397 return(size);
01398 }
01399
01400
01401
01402
01403
01409 void cMsgSubscriptionConfig::setSkipSize(int size) {
01410 cMsgSubscribeSetSkipSize(config,size);
01411 }
01412
01413
01414
01415
01416
01422 bool cMsgSubscriptionConfig::getMaySkip(void) {
01423 int maySkip;
01424 cMsgSubscribeGetMaySkip(config,&maySkip);
01425 return((maySkip==0)?false:true);
01426 }
01427
01428
01429
01430
01431
01437 void cMsgSubscriptionConfig::setMaySkip(bool maySkip) {
01438 cMsgSubscribeSetMaySkip(config,(maySkip)?1:0);
01439 }
01440
01441
01442
01443
01444
01450 bool cMsgSubscriptionConfig::getMustSerialize(void) {
01451 int maySerialize;
01452 cMsgSubscribeGetMustSerialize(config,&maySerialize);
01453 return((maySerialize==0)?false:true);
01454 }
01455
01456
01457
01458
01459
01465 void cMsgSubscriptionConfig::setMustSerialize(bool mustSerialize) {
01466 cMsgSubscribeSetMustSerialize(config,(mustSerialize)?1:0);
01467 }
01468
01469
01470
01471
01472
01478 int cMsgSubscriptionConfig::getMaxThreads(void) {
01479 int max;
01480 cMsgSubscribeGetMaxThreads(config,&max);
01481 return(max);
01482 }
01483
01484
01485
01486
01487
01493 void cMsgSubscriptionConfig::setMaxThreads(int max) {
01494 cMsgSubscribeSetMaxThreads(config,max);
01495 }
01496
01497
01498
01499
01500
01506 int cMsgSubscriptionConfig::getMessagesPerThread(void) {
01507 int mpt;
01508 cMsgSubscribeGetMessagesPerThread(config,&mpt);
01509 return(mpt);
01510 }
01511
01512
01513
01514
01515
01521 void cMsgSubscriptionConfig::setMessagesPerThread(int mpt) {
01522 cMsgSubscribeSetMessagesPerThread(config,mpt);
01523 }
01524
01525
01526
01527
01528
01534 size_t cMsgSubscriptionConfig::getStackSize(void) {
01535 size_t size;
01536 cMsgSubscribeGetStackSize(config,&size);
01537 return(size);
01538 }
01539
01540
01541
01542
01543
01549 void cMsgSubscriptionConfig::setStackSize(size_t size) {
01550 cMsgSubscribeSetStackSize(config,size);
01551 }
01552
01553
01554
01555
01556
01557
01558
01566 cMsg::cMsg(const string &UDL, const string &name, const string &descr)
01567 : myUDL(UDL), myName(name), myDescr(descr), initialized(false) {
01568 }
01569
01570
01571
01572
01573
01577 cMsg::~cMsg(void) {
01578 cMsg::disconnect();
01579 }
01580
01581
01582
01583
01584
01588 void cMsg::connect(void) throw(cMsgException) {
01589
01590 if(initialized)throw(cMsgException(cMsgPerror(CMSG_ALREADY_INIT),CMSG_ALREADY_INIT));
01591
01592
01593 int stat;
01594 if((stat=cMsgConnect(myUDL.c_str(),myName.c_str(),myDescr.c_str(),&myDomainId))!=CMSG_OK) {
01595 throw(cMsgException(cMsgPerror(stat),stat));
01596 }
01597 initialized=true;
01598 }
01599
01600
01601
01602
01603
01607 void cMsg::disconnect(void) throw(cMsgException) {
01608
01609 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01610
01611
01612 cMsgDisconnect(&myDomainId);
01613 }
01614
01615
01616
01617
01618
01624 void cMsg::send(cMsgMessage &msg) throw(cMsgException) {
01625
01626 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01627
01628
01629 int stat;
01630 if((stat=cMsgSend(myDomainId,msg.myMsgPointer))!=CMSG_OK) {
01631 throw(cMsgException(cMsgPerror(stat),stat));
01632 }
01633 }
01634
01635
01636
01637
01638
01644 void cMsg::send(cMsgMessage *msg) throw(cMsgException) {
01645 cMsg::send(*msg);
01646 }
01647
01648
01649
01650
01651
01658 int cMsg::syncSend(cMsgMessage &msg, const struct timespec *timeout) throw(cMsgException) {
01659
01660 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01661
01662
01663 int response;
01664
01665 int stat;
01666 if((stat=cMsgSyncSend(myDomainId,msg.myMsgPointer,timeout,&response))!=CMSG_OK) {
01667 throw(cMsgException(cMsgPerror(stat),stat));
01668 }
01669 return(response);
01670 }
01671
01672
01673
01674
01675
01682 int cMsg::syncSend(cMsgMessage *msg, const struct timespec *timeout) throw(cMsgException) {
01683 return(cMsg::syncSend(*msg, timeout));
01684 }
01685
01686
01687
01688
01689
01701 void *cMsg::subscribe(const string &subject, const string &type, cMsgCallback *cb, void *userArg,
01702 const cMsgSubscriptionConfig *cfg) throw(cMsgException) {
01703
01704 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01705
01706
01707 int stat;
01708 void *handle;
01709
01710
01711
01712 if(subscriptionExists(myDomainId,subject,type,cb,userArg))
01713 throw(cMsgException(cMsgPerror(CMSG_ALREADY_EXISTS),CMSG_ALREADY_EXISTS));
01714
01715
01716
01717 dispatcherStruct *d = new dispatcherStruct();
01718 d->cb=cb;
01719 d->userArg=userArg;
01720
01721
01722
01723 stat=cMsgSubscribe(myDomainId,
01724 (subject.size()<=0)?NULL:subject.c_str(),
01725 (type.size()<=0)?NULL:type.c_str(),
01726 callbackDispatcher,
01727 (void*)d,
01728 (cfg==NULL)?NULL:(cfg->config),
01729 &handle);
01730
01731
01732
01733 if(stat!=CMSG_OK) throw(cMsgException(cMsgPerror(stat),stat));
01734
01735
01736
01737 addSubscription(myDomainId,subject,type,d,handle);
01738
01739
01740 return(handle);
01741 }
01742
01743
01744
01745
01746
01758 void *cMsg::subscribe(const string &subject, const string &type, cMsgCallback &cb, void *userArg,
01759 const cMsgSubscriptionConfig *cfg) throw(cMsgException) {
01760 return(cMsg::subscribe(subject, type, &cb, userArg, cfg));
01761 }
01762
01763
01764
01765
01766
01772 void cMsg::unsubscribe(void *handle) throw(cMsgException) {
01773
01774 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01775
01776
01777 int stat;
01778
01779
01780
01781 if(!deleteSubscription(myDomainId,handle)) {
01782 throw(cMsgException(cMsgPerror(CMSG_BAD_ARGUMENT),CMSG_BAD_ARGUMENT));
01783 }
01784
01785
01786
01787 if((stat=cMsgUnSubscribe(myDomainId,handle))!=CMSG_OK) {
01788 throw(cMsgException(cMsgPerror(stat),stat));
01789 }
01790
01791 }
01792
01793
01794
01795
01796
01805 cMsgMessage *cMsg::sendAndGet(cMsgMessage &sendMsg, const struct timespec *timeout) throw(cMsgException) {
01806
01807 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01808
01809
01810 void *replyPtr;
01811
01812 int stat;
01813 if((stat=cMsgSendAndGet(myDomainId,sendMsg.myMsgPointer,timeout,&replyPtr))!=CMSG_OK) {
01814 throw(cMsgException(cMsgPerror(stat),stat));
01815 }
01816
01817 return(new cMsgMessage(replyPtr));
01818 }
01819
01820
01821
01822
01823
01832 cMsgMessage *cMsg::sendAndGet(cMsgMessage *sendMsg, const struct timespec *timeout) throw(cMsgException) {
01833
01834 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01835
01836
01837 void *replyPtr;
01838
01839 int stat;
01840 if((stat=cMsgSendAndGet(myDomainId,sendMsg->myMsgPointer,timeout,&replyPtr))!=CMSG_OK) {
01841 throw(cMsgException(cMsgPerror(stat),stat));
01842 }
01843
01844 return(new cMsgMessage(replyPtr));
01845 }
01846
01847
01848
01849
01859 cMsgMessage *cMsg::subscribeAndGet(const string &subject, const string &type, const struct timespec *timeout) throw(cMsgException) {
01860
01861 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01862
01863
01864 void *replyPtr;
01865
01866 int stat;
01867 if((stat=cMsgSubscribeAndGet(myDomainId,subject.c_str(),type.c_str(),timeout,&replyPtr))!=CMSG_OK) {
01868 throw(cMsgException(cMsgPerror(stat),stat));
01869 }
01870
01871 return(new cMsgMessage(replyPtr));
01872 }
01873
01874
01875
01876
01877
01883 void cMsg::flush(const struct timespec *timeout) throw(cMsgException) {
01884
01885 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01886
01887
01888 int stat;
01889 if((stat=cMsgFlush(myDomainId, timeout))!=CMSG_OK) {
01890 throw(cMsgException(cMsgPerror(stat),stat));
01891 }
01892 }
01893
01894
01895
01896
01897
01901 void cMsg::start(void) throw(cMsgException) {
01902
01903 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01904
01905
01906 int stat;
01907 if((stat=cMsgReceiveStart(myDomainId))!=CMSG_OK) {
01908 throw(cMsgException(cMsgPerror(stat),stat));
01909 }
01910 }
01911
01912
01913
01914
01915
01919 void cMsg::stop(void) throw(cMsgException) {
01920
01921 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01922
01923
01924 int stat;
01925 if((stat=cMsgReceiveStop(myDomainId))!=CMSG_OK) {
01926 throw(cMsgException(cMsgPerror(stat),stat));
01927 }
01928 }
01929
01930
01931
01932
01933
01939 string cMsg::getDescription(void) const {
01940 return(myDescr);
01941 }
01942
01943
01944
01945
01946
01952 string cMsg::getName(void) const {
01953 return(myName);
01954 }
01955
01956
01957
01958
01959
01965 string cMsg::getUDL(void) const{
01966 return(myUDL);
01967 }
01968
01969
01970
01971
01972
01978 bool cMsg::isConnected(void) const {
01979
01980 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01981
01982
01983 int stat,connected;
01984 if((stat=cMsgGetConnectState(myDomainId,&connected))!=CMSG_OK) {
01985 throw(cMsgException(cMsgPerror(stat),stat));
01986 }
01987 return(connected==1);
01988 }
01989
01990
01991
01992
01993
01999 bool cMsg::isReceiving(void) const {
02000
02001 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02002
02003
02004 int stat,receiving;
02005 if((stat=cMsgGetReceiveState(myDomainId,&receiving))!=CMSG_OK) {
02006 throw(cMsgException(cMsgPerror(stat),stat));
02007 }
02008 return(receiving==1);
02009 }
02010
02011
02012
02013
02014
02021 void cMsg::setShutdownHandler(cMsgShutdownHandler *handler, void* userArg) throw(cMsgException) {
02022
02023 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02024
02025
02026 int stat;
02027 if((stat=cMsgSetShutdownHandler(myDomainId,handler,userArg))!=CMSG_OK) {
02028 throw(cMsgException(cMsgPerror(stat),stat));
02029 }
02030 }
02031
02032
02033
02034
02035
02042 void cMsg::shutdownClients(const string &client, int flag) throw(cMsgException) {
02043
02044 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02045
02046
02047 int stat;
02048 if((stat=cMsgShutdownClients(myDomainId,client.c_str(),flag))!=CMSG_OK) {
02049 throw(cMsgException(cMsgPerror(stat),stat));
02050 }
02051 }
02052
02053
02054
02055
02056
02063 void cMsg::shutdownServers(const string &server, int flag) throw(cMsgException) {
02064
02065 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02066
02067
02068 int stat;
02069 if((stat=cMsgShutdownServers(myDomainId,server.c_str(),flag))!=CMSG_OK) {
02070 throw(cMsgException(cMsgPerror(stat),stat));
02071 }
02072 }
02073
02074
02075
02076
02077
02085 cMsgMessage *cMsg::monitor(const string &monString) throw(cMsgException) {
02086
02087 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02088
02089 void *m = cMsgCreateMessage();
02090 if(m==NULL)throw(cMsgException("?cMsgMessage constructor...unable to create message",CMSG_ERROR));
02091
02092 int stat;
02093 if((stat=cMsgMonitor(myDomainId,monString.c_str(),&m))!=CMSG_OK) {
02094 cMsgFreeMessage(&m);
02095 throw(cMsgException(cMsgPerror(stat),stat));
02096 }
02097
02098 return(new cMsgMessage(m));
02099 }
02100
02101
02102
02103