00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <cMsg.hxx>
00025 #include <cMsgPrivate.h>
00026 #include <stdio.h>
00027 #include <stdlib.h>
00028 #include <unistd.h>
00029 #include <pthread.h>
00030 #include <vector>
00031 #include <sstream>
00032
00033 using namespace std;
00034 using namespace cmsg;
00035
00036
00037
00038
00039
00040
00041
00042
00043 namespace cmsg {
00044
00045
00047 typedef struct {
00048 cMsgCallback *cb;
00049 void *userArg;
00050 } dispatcherStruct;
00051
00052
00054 typedef struct {
00055 void *domainId;
00056 void *handle;
00057 string subject;
00058 string type;
00059 dispatcherStruct *d;
00060 } subscrStruct;
00061
00062
00064 static vector<subscrStruct*> subscrVec;
00065
00066
00068 static pthread_mutex_t subscrMutex = PTHREAD_MUTEX_INITIALIZER;
00069
00070
00071
00072
00073
00075 static void callbackDispatcher(void *msg, void *userArg) {
00076 dispatcherStruct *ds = (dispatcherStruct*)userArg;
00077 ds->cb->callback(new cMsgMessage(msg),ds->userArg);
00078 }
00079
00080
00081
00082
00083
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00135 static void addSubscription(void *domainId, const string &subject, const string &type,
00136 dispatcherStruct *d, void *handle) {
00137
00138 subscrStruct *s = new subscrStruct();
00139
00140 s->domainId=domainId;
00141 s->subject=subject;
00142 s->type=type;
00143 s->d=d;
00144 s->handle=handle;
00145
00146 pthread_mutex_lock(&subscrMutex);
00147 subscrVec.push_back(s);
00148 pthread_mutex_unlock(&subscrMutex);
00149
00150 return;
00151 }
00152
00153
00154
00155
00156
00165 static bool deleteSubscription(void *domainId, void *handle) {
00166
00167 bool deleted = false;
00168
00169 pthread_mutex_lock(&subscrMutex);
00170 vector<subscrStruct*>::iterator iter;
00171 for(iter=subscrVec.begin(); iter!=subscrVec.end(); iter++) {
00172 if(((*iter)->domainId==domainId)&&((*iter)->handle==handle)) {
00173 delete((*iter)->d);
00174 delete(*iter);
00175 subscrVec.erase(iter);
00176 deleted=true;
00177 break;
00178 }
00179 }
00180 pthread_mutex_unlock(&subscrMutex);
00181
00182 return(deleted);
00183 }
00184
00185
00186
00187
00188
00194 static void deleteSubscriptions(void *domainId) {
00195
00196 pthread_mutex_lock(&subscrMutex);
00197 vector<subscrStruct*>::iterator iter;
00198 for(iter=subscrVec.begin(); iter!=subscrVec.end(); iter++) {
00199 if((*iter)->domainId==domainId) {
00200 delete((*iter)->d);
00201 delete(*iter);
00202 subscrVec.erase(iter);
00203 }
00204 }
00205 pthread_mutex_unlock(&subscrMutex);
00206
00207 return;
00208 }
00209
00210
00211
00212
00213 }
00214
00215
00216
00217
00218
00219
00220
00221
00225 cMsgException::cMsgException(void) : descr(""), returnCode(0) {}
00226
00227
00228
00229
00230
00236 cMsgException::cMsgException(const string &c) : descr(c), returnCode(0) {}
00237
00238
00239
00240
00241
00248 cMsgException::cMsgException(const string &c, int code) : descr(c), returnCode(code) {}
00249
00250
00251
00252
00253
00259 cMsgException::cMsgException(const cMsgException &e) : descr(e.descr), returnCode(e.returnCode) {}
00260
00261
00262
00263
00264
00268 cMsgException::~cMsgException(void) throw() {
00269 }
00270
00271
00272
00273
00274
00280 string cMsgException::toString(void) const throw() {
00281 stringstream ss;
00282 ss << "?cMsgException returnCode = " << returnCode << " descr = " << descr << ends;
00283 return(ss.str());
00284 }
00285
00286
00287
00288
00289
00295 const char *cMsgException::what(void) const throw() {
00296 return(toString().c_str());
00297 }
00298
00299
00300
00301
00302
00303
00304
00309 cMsgMessage::cMsgMessage(void) throw(cMsgException) {
00310
00311 myMsgPointer=cMsgCreateMessage();
00312 if(myMsgPointer==NULL) {
00313 throw(cMsgException("?cMsgMessage constructor...unable to create message",CMSG_ERROR));
00314 }
00315 }
00316
00317
00318
00319
00320
00327 cMsgMessage::cMsgMessage(const cMsgMessage &msg) throw(cMsgException) {
00328
00329 myMsgPointer=cMsgCopyMessage(msg.myMsgPointer);
00330 if(myMsgPointer==NULL) {
00331 throw(cMsgException("?cMsgMessage copy constructor...unable to create message",CMSG_ERROR));
00332 }
00333 }
00334
00335
00336
00337
00338
00345 cMsgMessage::cMsgMessage(void *msgPointer) throw(cMsgException) {
00346
00347 myMsgPointer=msgPointer;
00348 if(myMsgPointer==NULL) {
00349 throw(cMsgException("?cMsgMessage pointer constructor...unable to create message",CMSG_ERROR));
00350 }
00351 }
00352
00353
00354
00355
00356
00360 cMsgMessage::~cMsgMessage(void) {
00361 if(myMsgPointer!=NULL)cMsgFreeMessage(&myMsgPointer);
00362 }
00363
00364
00365
00366
00367
00374 string cMsgMessage::getSubject(void) const throw(cMsgException) {
00375
00376 const char *s;
00377
00378 int stat;
00379 if((stat=cMsgGetSubject(myMsgPointer,&s))!=CMSG_OK) {
00380 throw(cMsgException(cMsgPerror(stat),stat));
00381 }
00382
00383 if(s==NULL) {
00384 return("null");
00385 } else {
00386 return(string(s));
00387 }
00388 }
00389
00390
00391
00392
00393
00400 void cMsgMessage::setSubject(const string &subject) throw(cMsgException) {
00401
00402 int stat;
00403 if((stat=cMsgSetSubject(myMsgPointer,subject.c_str()))!=CMSG_OK) {
00404 throw(cMsgException(cMsgPerror(stat),stat));
00405 }
00406 }
00407
00408
00409
00410
00411
00418 string cMsgMessage::getType(void) const throw(cMsgException) {
00419
00420 const char *s;
00421
00422 int stat;
00423 if((stat=cMsgGetType(myMsgPointer,&s))!=CMSG_OK) {
00424 throw(cMsgException(cMsgPerror(stat),stat));
00425 }
00426
00427 if(s==NULL) {
00428 return("null");
00429 } else {
00430 return(string(s));
00431 }
00432 }
00433
00434
00435
00436
00437
00444 void cMsgMessage::setType(const string &type) throw(cMsgException) {
00445
00446 int stat;
00447 if((stat=cMsgSetType(myMsgPointer,type.c_str()))!=CMSG_OK) {
00448 throw(cMsgException(cMsgPerror(stat),stat));
00449 }
00450 }
00451
00452
00453
00454
00455
00462 string cMsgMessage::getText(void) const throw(cMsgException) {
00463
00464 const char *s;
00465
00466 int stat;
00467 if((stat=cMsgGetText(myMsgPointer,&s))!=CMSG_OK) {
00468 throw(cMsgException(cMsgPerror(stat),stat));
00469 }
00470
00471 if(s==NULL) {
00472 return("null");
00473 } else {
00474 return(string(s));
00475 }
00476 }
00477
00478
00479
00480
00481
00488 void cMsgMessage::setText(const string &text) throw(cMsgException) {
00489
00490 int stat;
00491 if((stat=cMsgSetText(myMsgPointer,text.c_str()))!=CMSG_OK) {
00492 throw(cMsgException(cMsgPerror(stat),stat));
00493 }
00494 }
00495
00496
00497
00498
00499
00506 void cMsgMessage::setByteArrayLength(int length) throw(cMsgException) {
00507
00508 int stat;
00509 if((stat=cMsgSetByteArrayLength(myMsgPointer,length)!=CMSG_OK)) {
00510 throw(cMsgException(cMsgPerror(stat),stat));
00511 }
00512 }
00513
00514
00515
00516
00517
00522 void cMsgMessage::resetByteArrayLength() {
00523 cMsgResetByteArrayLength(myMsgPointer);
00524 }
00525
00526
00527
00528
00529
00535 int cMsgMessage::getByteArrayLength(void) {
00536
00537 int i;
00538 cMsgGetByteArrayLength(myMsgPointer,&i);
00539 return(i);
00540 }
00541
00542
00543
00544
00545
00551 int cMsgMessage::getByteArrayLengthFull(void) {
00552 int i;
00553 cMsgGetByteArrayLengthFull(myMsgPointer,&i);
00554 return(i);
00555 }
00556
00557
00558
00559
00560
00567 void cMsgMessage::setByteArrayOffset(int offset) throw(cMsgException) {
00568
00569 int stat;
00570 if((stat=cMsgSetByteArrayOffset(myMsgPointer,offset)!=CMSG_OK)) {
00571 throw(cMsgException(cMsgPerror(stat),stat));
00572 }
00573 }
00574
00575
00576
00577
00578
00584 int cMsgMessage::getByteArrayOffset(void) {
00585
00586 int i;
00587 cMsgGetByteArrayOffset(myMsgPointer,&i);
00588 return(i);
00589 }
00590
00591
00592
00593
00594
00602 void cMsgMessage::setByteArray(char *array, int length) throw(cMsgException) {
00603
00604 int stat;
00605 if((stat=cMsgSetByteArray(myMsgPointer,array, length)!=CMSG_OK)) {
00606 throw(cMsgException(cMsgPerror(stat),stat));
00607 }
00608 }
00609
00610
00611
00612
00613
00621 void cMsgMessage::setByteArrayNoCopy(char* array, int length) throw(cMsgException) {
00622
00623 int stat;
00624 if((stat=cMsgSetByteArrayNoCopy(myMsgPointer,array,length)!=CMSG_OK)) {
00625 throw(cMsgException(cMsgPerror(stat),stat));
00626 }
00627 }
00628
00629
00630
00631
00632
00638 char* cMsgMessage::getByteArray(void) {
00639
00640 char *p;
00641 cMsgGetByteArray(myMsgPointer,&p);
00642 return(p);
00643 }
00644
00645
00646
00647
00648
00659 int cMsgMessage::getByteArrayEndian(void) {
00660
00661 int endian;
00662 cMsgGetByteArrayEndian(myMsgPointer,&endian);
00663 return(endian);
00664 }
00665
00666
00667
00668
00669
00684 void cMsgMessage::setByteArrayEndian(int endian) throw(cMsgException) {
00685 int stat;
00686
00687 if((stat=cMsgSetByteArrayEndian(myMsgPointer,endian))!=CMSG_OK) {
00688 throw(cMsgException(cMsgPerror(stat),stat));
00689 }
00690
00691 return;
00692 }
00693
00694
00695
00696
00697
00704 bool cMsgMessage::needToSwap(void) const throw(cMsgException) {
00705
00706 int flag,stat;
00707
00708 if((stat=cMsgNeedToSwap(myMsgPointer,&flag))!=CMSG_OK) {
00709 throw(cMsgException(cMsgPerror(stat),stat));
00710 }
00711
00712 return(flag==1);
00713 }
00714
00715
00716
00717
00718
00725 int cMsgMessage::getUserInt(void) const throw(cMsgException) {
00726
00727 int i;
00728
00729 int stat;
00730 if((stat=cMsgGetUserInt(myMsgPointer,&i))!=CMSG_OK) {
00731 throw(cMsgException(cMsgPerror(stat),stat));
00732 }
00733 return(i);
00734 }
00735
00736
00737
00738
00739
00746 void cMsgMessage::setUserInt(int i) throw(cMsgException) {
00747
00748 int stat;
00749 if((stat=cMsgSetUserInt(myMsgPointer,i))!=CMSG_OK) {
00750 throw(cMsgException(cMsgPerror(stat),stat));
00751 }
00752 }
00753
00754
00755
00756
00757
00764 struct timespec cMsgMessage::getUserTime(void) const throw(cMsgException) {
00765
00766 struct timespec t;
00767
00768 int stat;
00769 if((stat=cMsgGetUserTime(myMsgPointer,&t))!=CMSG_OK) {
00770 throw(cMsgException(cMsgPerror(stat),stat));
00771 }
00772 return(t);
00773 }
00774
00775
00776
00777
00778
00784 void cMsgMessage::setUserTime(const struct timespec &userTime) throw(cMsgException) {
00785
00786 int stat;
00787 if((stat=cMsgSetUserTime(myMsgPointer, &userTime))!=CMSG_OK) {
00788 throw(cMsgException(cMsgPerror(stat),stat));
00789 }
00790 }
00791
00792
00793
00794
00795
00801 int cMsgMessage::getVersion(void) const throw(cMsgException) {
00802
00803 int version;
00804
00805 int stat;
00806 if((stat=cMsgGetVersion(myMsgPointer, &version))!=CMSG_OK) {
00807 throw(cMsgException(cMsgPerror(stat),stat));
00808 }
00809 return(version);
00810 }
00811
00812
00813
00814
00815
00821 cMsgMessage *cMsgMessage::copy(void) const throw(cMsgException) {
00822
00823 void *newPointer = cMsgCopyMessage(myMsgPointer);
00824 return(new cMsgMessage(newPointer));
00825 }
00826
00827
00828
00829
00830
00837 string cMsgMessage::getDomain(void) const throw(cMsgException) {
00838
00839 const char *s;
00840
00841 int stat;
00842 if((stat=cMsgGetDomain(myMsgPointer,&s))!=CMSG_OK) {
00843 throw(cMsgException(cMsgPerror(stat),stat));
00844 };
00845
00846 if(s==NULL) {
00847 return("null");
00848 } else {
00849 return(string(s));
00850 }
00851 }
00852
00853
00854
00855
00856
00863 string cMsgMessage::getReceiver(void) const throw(cMsgException) {
00864
00865 const char *s;
00866
00867 int stat;
00868 if((stat=cMsgGetReceiver(myMsgPointer,&s))!=CMSG_OK) {
00869 throw(cMsgException(cMsgPerror(stat),stat));
00870 };
00871
00872 if(s==NULL) {
00873 return("null");
00874 } else {
00875 return(string(s));
00876 }
00877 }
00878
00879
00880
00881
00882
00889 string cMsgMessage::getReceiverHost(void) const throw(cMsgException) {
00890
00891 const char *s;
00892
00893 int stat;
00894 if((stat=cMsgGetReceiverHost(myMsgPointer,&s))!=CMSG_OK) {
00895 throw(cMsgException(cMsgPerror(stat),stat));
00896 };
00897
00898 if(s==NULL) {
00899 return("null");
00900 } else {
00901 return(string(s));
00902 }
00903 }
00904
00905
00906
00907
00908
00915 string cMsgMessage::getSender(void) const throw(cMsgException) {
00916
00917 const char *s;
00918
00919 int stat;
00920 if((stat=cMsgGetSender(myMsgPointer,&s))!=CMSG_OK) {
00921 throw(cMsgException(cMsgPerror(stat),stat));
00922 };
00923
00924 if(s==NULL) {
00925 return("null");
00926 } else {
00927 return(string(s));
00928 }
00929 }
00930
00931
00932
00933
00934
00941 string cMsgMessage::getSenderHost(void) const throw(cMsgException) {
00942
00943 const char *s;
00944
00945 int stat;
00946 if((stat=cMsgGetSenderHost(myMsgPointer,&s))!=CMSG_OK) {
00947 throw(cMsgException(cMsgPerror(stat),stat));
00948 };
00949
00950 if(s==NULL) {
00951 return("null");
00952 } else {
00953 return(string(s));
00954 }
00955 }
00956
00957
00958
00959
00960
00967 struct timespec cMsgMessage::getReceiverTime(void) const throw(cMsgException) {
00968
00969 struct timespec t;
00970
00971 int stat;
00972 if((stat=cMsgGetReceiverTime(myMsgPointer,&t))!=CMSG_OK) {
00973 throw(cMsgException(cMsgPerror(stat),stat));
00974 }
00975 return(t);
00976 }
00977
00978
00979
00980
00981
00988 struct timespec cMsgMessage::getSenderTime(void) const throw(cMsgException) {
00989
00990 struct timespec t;
00991
00992 int stat;
00993 if((stat=cMsgGetSenderTime(myMsgPointer,&t))!=CMSG_OK) {
00994 throw(cMsgException(cMsgPerror(stat),stat));
00995 }
00996 return(t);
00997 }
00998
00999
01000
01001
01002
01008 bool cMsgMessage::isGetRequest(void) const throw(cMsgException) {
01009
01010 int b;
01011
01012 int stat;
01013 if((stat=cMsgGetGetRequest(myMsgPointer,&b))!=CMSG_OK) {
01014 throw(cMsgException(cMsgPerror(stat),stat));
01015 }
01016 return(b);
01017 }
01018
01019
01020
01021
01022
01028 bool cMsgMessage::isGetResponse(void) const throw(cMsgException) {
01029
01030 int b;
01031
01032 int stat;
01033 if((stat=cMsgGetGetResponse(myMsgPointer,&b))!=CMSG_OK) {
01034 throw(cMsgException(cMsgPerror(stat),stat));
01035 }
01036 return(b);
01037 }
01038
01039
01040
01041
01042
01049 bool cMsgMessage::isNullGetResponse(void) const throw(cMsgException) {
01050
01051 int b;
01052
01053 int stat;
01054 if((stat=cMsgGetNullGetResponse(myMsgPointer,&b))!=CMSG_OK) {
01055 throw(cMsgException(cMsgPerror(stat),stat));
01056 }
01057 return(b);
01058 }
01059
01060
01061
01062
01063
01070 void cMsgMessage::makeNullResponse(const cMsgMessage &msg) throw(cMsgException) {
01071
01072 cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
01073 cMsgMessage_t *m = (cMsgMessage_t*)msg.myMsgPointer;
01074
01075 t->sysMsgId = m->sysMsgId;
01076 t->senderToken = m->senderToken;
01077 t->info = CMSG_IS_GET_RESPONSE | CMSG_IS_NULL_GET_RESPONSE;
01078 }
01079
01080
01081
01082
01083
01090 void cMsgMessage::makeNullResponse(const cMsgMessage *msg) throw(cMsgException) {
01091
01092 cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
01093 cMsgMessage_t *m = (cMsgMessage_t*)msg->myMsgPointer;
01094
01095 t->sysMsgId = m->sysMsgId;
01096 t->senderToken = m->senderToken;
01097 t->info = CMSG_IS_GET_RESPONSE | CMSG_IS_NULL_GET_RESPONSE;
01098 }
01099
01100
01101
01102
01103
01110 void cMsgMessage::makeResponse(const cMsgMessage &msg) throw(cMsgException) {
01111
01112 cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
01113 cMsgMessage_t *m = (cMsgMessage_t*)msg.myMsgPointer;
01114
01115 t->sysMsgId = m->sysMsgId;
01116 t->senderToken = m->senderToken;
01117 t->info = CMSG_IS_GET_RESPONSE;
01118 }
01119
01120
01121
01122
01123
01130 void cMsgMessage::makeResponse(const cMsgMessage *msg) throw(cMsgException) {
01131
01132 cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
01133 cMsgMessage_t *m = (cMsgMessage_t*)msg->myMsgPointer;
01134
01135 t->sysMsgId = m->sysMsgId;
01136 t->senderToken = m->senderToken;
01137 t->info = CMSG_IS_GET_RESPONSE;
01138 }
01139
01140
01141
01142
01143
01150 cMsgMessage *cMsgMessage::nullResponse(void) const throw(cMsgException) {
01151
01152 void *newMsgPointer;
01153 if((newMsgPointer=cMsgCreateNullResponseMessage(myMsgPointer))==NULL) {
01154 throw(cMsgException("?cMsgMessage::nullResponse...unable to create message",CMSG_ERROR));
01155 }
01156
01157 return(new cMsgMessage(newMsgPointer));
01158 }
01159
01160
01161
01162
01163
01170 cMsgMessage *cMsgMessage::response(void) const throw(cMsgException) {
01171
01172 void *newMsgPointer;
01173 if((newMsgPointer=cMsgCreateResponseMessage(myMsgPointer))==NULL) {
01174 throw(cMsgException("?cMsgMessage::response...unable to create message",CMSG_ERROR));
01175 }
01176
01177 return(new cMsgMessage(newMsgPointer));
01178 }
01179
01180
01181
01182
01183
01190 void cMsgMessage::setGetResponse(bool b) throw(cMsgException) {
01191
01192 int stat;
01193 if((stat=cMsgSetGetResponse(myMsgPointer,b))!=CMSG_OK) {
01194 throw(cMsgException(cMsgPerror(stat),stat));
01195 }
01196 }
01197
01198
01199
01200
01201
01208 void cMsgMessage::setNullGetResponse(bool b) throw(cMsgException) {
01209
01210 int stat;
01211 if((stat=cMsgSetNullGetResponse(myMsgPointer,b))!=CMSG_OK) {
01212 throw(cMsgException(cMsgPerror(stat),stat));
01213 }
01214 }
01215
01216
01217
01218
01219
01226 string cMsgMessage::toString(void) const throw(cMsgException) {
01227
01228 char *cs;
01229
01230 int stat;
01231 if((stat=cMsgToString(myMsgPointer,&cs))!=CMSG_OK) {
01232 throw(cMsgException(cMsgPerror(stat),stat));
01233 }
01234
01235 string s(cs);
01236 free(cs);
01237 return(s);
01238
01239 }
01240
01241
01242
01243
01244
01245
01246
01253 string cMsgMessage::getSubscriptionDomain(void) const throw(cMsgException) {
01254
01255 const char *s;
01256
01257 int stat;
01258 if((stat=cMsgGetSubscriptionDomain(myMsgPointer,&s))!=CMSG_OK) {
01259 throw(cMsgException(cMsgPerror(stat),stat));
01260 };
01261
01262 if(s==NULL) {
01263 return("null");
01264 } else {
01265 return(string(s));
01266 }
01267 }
01268
01269
01270
01271
01272
01279 string cMsgMessage::getSubscriptionSubject(void) const throw(cMsgException) {
01280
01281 const char *s;
01282
01283 int stat;
01284 if((stat=cMsgGetSubscriptionSubject(myMsgPointer,&s))!=CMSG_OK) {
01285 throw(cMsgException(cMsgPerror(stat),stat));
01286 };
01287
01288 if(s==NULL) {
01289 return("null");
01290 } else {
01291 return(string(s));
01292 }
01293 }
01294
01295
01296
01297
01298
01305 string cMsgMessage::getSubscriptionType(void) const throw(cMsgException) {
01306
01307 const char *s;
01308
01309 int stat;
01310 if((stat=cMsgGetSubscriptionType(myMsgPointer,&s))!=CMSG_OK) {
01311 throw(cMsgException(cMsgPerror(stat),stat));
01312 };
01313
01314 if(s==NULL) {
01315 return("null");
01316 } else {
01317 return(string(s));
01318 }
01319 }
01320
01321
01322
01323
01324
01331 string cMsgMessage::getSubscriptionUDL(void) const throw(cMsgException) {
01332
01333 const char *s;
01334
01335 int stat;
01336 if((stat=cMsgGetSubscriptionUDL(myMsgPointer,&s))!=CMSG_OK) {
01337 throw(cMsgException(cMsgPerror(stat),stat));
01338 };
01339
01340 if(s==NULL) {
01341 return("null");
01342 } else {
01343 return(string(s));
01344 }
01345 }
01346
01347
01348
01349
01350
01357 int cMsgMessage::getSubscriptionCueSize(void) const throw(cMsgException) {
01358
01359 int i;
01360
01361 int stat;
01362 if((stat=cMsgGetSubscriptionCueSize(myMsgPointer,&i))!=CMSG_OK) {
01363 throw(cMsgException(cMsgPerror(stat),stat));
01364 }
01365 return(i);
01366 }
01367
01368
01369
01370
01371
01378 bool cMsgMessage::getReliableSend(void) const throw(cMsgException) {
01379
01380 int i;
01381
01382 int stat;
01383 if((stat=cMsgGetReliableSend(myMsgPointer,&i))!=CMSG_OK) {
01384 throw(cMsgException(cMsgPerror(stat),stat));
01385 }
01386 return(i == 0 ? false : true);
01387 }
01388
01389
01390
01391
01392
01399 void cMsgMessage::setReliableSend(bool b) throw(cMsgException) {
01400
01401 int i = b ? 1 : 0;
01402
01403 int stat;
01404 if((stat=cMsgSetReliableSend(myMsgPointer,i))!=CMSG_OK) {
01405 throw(cMsgException(cMsgPerror(stat),stat));
01406 }
01407 }
01408
01409
01410
01411
01412
01413
01414
01418 cMsgSubscriptionConfig::cMsgSubscriptionConfig(void) {
01419 config = cMsgSubscribeConfigCreate();
01420 }
01421
01422
01423
01424
01425
01429 cMsgSubscriptionConfig::~cMsgSubscriptionConfig(void) {
01430 cMsgSubscribeConfigDestroy(config);
01431 }
01432
01433
01434
01435
01436
01442 int cMsgSubscriptionConfig::getMaxCueSize(void) const {
01443 int size;
01444 cMsgSubscribeGetMaxCueSize(config,&size);
01445 return(size);
01446 }
01447
01448
01449
01450
01451
01457 void cMsgSubscriptionConfig::setMaxCueSize(int size) {
01458 cMsgSubscribeSetMaxCueSize(config,size);
01459 }
01460
01461
01462
01463
01464
01470 int cMsgSubscriptionConfig::getSkipSize(void) const {
01471 int size;
01472 cMsgSubscribeGetSkipSize(config,&size);
01473 return(size);
01474 }
01475
01476
01477
01478
01479
01485 void cMsgSubscriptionConfig::setSkipSize(int size) {
01486 cMsgSubscribeSetSkipSize(config,size);
01487 }
01488
01489
01490
01491
01492
01498 bool cMsgSubscriptionConfig::getMaySkip(void) const {
01499 int maySkip;
01500 cMsgSubscribeGetMaySkip(config,&maySkip);
01501 return((maySkip==0)?false:true);
01502 }
01503
01504
01505
01506
01507
01513 void cMsgSubscriptionConfig::setMaySkip(bool maySkip) {
01514 cMsgSubscribeSetMaySkip(config,(maySkip)?1:0);
01515 }
01516
01517
01518
01519
01520
01526 bool cMsgSubscriptionConfig::getMustSerialize(void) const {
01527 int maySerialize;
01528 cMsgSubscribeGetMustSerialize(config,&maySerialize);
01529 return((maySerialize==0)?false:true);
01530 }
01531
01532
01533
01534
01535
01541 void cMsgSubscriptionConfig::setMustSerialize(bool mustSerialize) {
01542 cMsgSubscribeSetMustSerialize(config,(mustSerialize)?1:0);
01543 }
01544
01545
01546
01547
01548
01554 int cMsgSubscriptionConfig::getMaxThreads(void) const {
01555 int max;
01556 cMsgSubscribeGetMaxThreads(config,&max);
01557 return(max);
01558 }
01559
01560
01561
01562
01563
01569 void cMsgSubscriptionConfig::setMaxThreads(int max) {
01570 cMsgSubscribeSetMaxThreads(config,max);
01571 }
01572
01573
01574
01575
01576
01582 int cMsgSubscriptionConfig::getMessagesPerThread(void) const {
01583 int mpt;
01584 cMsgSubscribeGetMessagesPerThread(config,&mpt);
01585 return(mpt);
01586 }
01587
01588
01589
01590
01591
01597 void cMsgSubscriptionConfig::setMessagesPerThread(int mpt) {
01598 cMsgSubscribeSetMessagesPerThread(config,mpt);
01599 }
01600
01601
01602
01603
01604
01610 size_t cMsgSubscriptionConfig::getStackSize(void) const {
01611 size_t size;
01612 cMsgSubscribeGetStackSize(config,&size);
01613 return(size);
01614 }
01615
01616
01617
01618
01619
01625 void cMsgSubscriptionConfig::setStackSize(size_t size) {
01626 cMsgSubscribeSetStackSize(config,size);
01627 }
01628
01629
01630
01631
01632
01633
01634
01642 cMsg::cMsg(const string &UDL, const string &name, const string &descr)
01643 : myUDL(UDL), myName(name), myDescr(descr), initialized(false) {
01644 }
01645
01646
01647
01648
01649
01654 cMsg::~cMsg(void) {
01655 cMsg::disconnect();
01656 }
01657
01658
01659
01660
01661
01673 void cMsg::connect(void) throw(cMsgException) {
01674 int stat;
01675
01676
01677 if(initialized) {
01678 if((stat=cMsgReconnect(myDomainId))!=CMSG_OK) {
01679 throw(cMsgException(cMsgPerror(stat),stat));
01680 }
01681 return;
01682 }
01683
01684
01685 if((stat=cMsgConnect(myUDL.c_str(),myName.c_str(),myDescr.c_str(),&myDomainId))!=CMSG_OK) {
01686 throw(cMsgException(cMsgPerror(stat),stat));
01687 }
01688 initialized=true;
01689 }
01690
01691
01692
01693
01694
01699 void cMsg::disconnect(void) throw(cMsgException) {
01700
01701 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01702
01703 cMsgDisconnect(&myDomainId);
01704
01705 deleteSubscriptions(myDomainId);
01706 }
01707
01708
01709
01710
01711
01718 void cMsg::send(cMsgMessage &msg) throw(cMsgException) {
01719
01720 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01721
01722
01723 int stat;
01724 if((stat=cMsgSend(myDomainId,msg.myMsgPointer))!=CMSG_OK) {
01725 throw(cMsgException(cMsgPerror(stat),stat));
01726 }
01727 }
01728
01729
01730
01731
01732
01739 void cMsg::send(cMsgMessage *msg) throw(cMsgException) {
01740 cMsg::send(*msg);
01741 }
01742
01743
01744
01745
01746
01754 int cMsg::syncSend(cMsgMessage &msg, const struct timespec *timeout) throw(cMsgException) {
01755
01756 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01757
01758
01759 int response;
01760
01761 int stat;
01762 if((stat=cMsgSyncSend(myDomainId,msg.myMsgPointer,timeout,&response))!=CMSG_OK) {
01763 throw(cMsgException(cMsgPerror(stat),stat));
01764 }
01765 return(response);
01766 }
01767
01768
01769
01770
01771
01779 int cMsg::syncSend(cMsgMessage *msg, const struct timespec *timeout) throw(cMsgException) {
01780 return(cMsg::syncSend(*msg, timeout));
01781 }
01782
01783
01784
01785
01786
01800 void *cMsg::subscribe(const string &subject, const string &type, cMsgCallback *cb, void *userArg,
01801 const cMsgSubscriptionConfig *cfg) throw(cMsgException) {
01802
01803 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01804
01805
01806 int stat;
01807 void *handle;
01808
01809
01810
01811 dispatcherStruct *d = new dispatcherStruct();
01812 d->cb=cb;
01813 d->userArg=userArg;
01814
01815
01816
01817 stat=cMsgSubscribe(myDomainId,
01818 (subject.size()<=0)?NULL:subject.c_str(),
01819 (type.size()<=0)?NULL:type.c_str(),
01820 callbackDispatcher,
01821 (void*)d,
01822 (cfg==NULL)?NULL:(cfg->config),
01823 &handle);
01824
01825
01826
01827 if(stat!=CMSG_OK) {
01828 delete(d);
01829 throw(cMsgException(cMsgPerror(stat),stat));
01830 }
01831
01832
01833
01834 addSubscription(myDomainId,subject,type,d,handle);
01835
01836
01837 return(handle);
01838 }
01839
01840
01841
01842
01843
01857 void *cMsg::subscribe(const string &subject, const string &type, cMsgCallback &cb, void *userArg,
01858 const cMsgSubscriptionConfig *cfg) throw(cMsgException) {
01859 return(cMsg::subscribe(subject, type, &cb, userArg, cfg));
01860 }
01861
01862
01863
01864
01865
01872 void cMsg::unsubscribe(void *handle) throw(cMsgException) {
01873
01874 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01875
01876
01877 int stat;
01878
01879
01880
01881 if(!deleteSubscription(myDomainId,handle)) {
01882 throw(cMsgException(cMsgPerror(CMSG_BAD_ARGUMENT),CMSG_BAD_ARGUMENT));
01883 }
01884
01885
01886
01887 if((stat=cMsgUnSubscribe(myDomainId,handle))!=CMSG_OK) {
01888 throw(cMsgException(cMsgPerror(stat),stat));
01889 }
01890
01891 }
01892
01893
01894
01895
01896
01903 void cMsg::subscriptionPause(void *handle) throw(cMsgException) {
01904
01905 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01906
01907 int stat;
01908
01909
01910 if((stat=cMsgSubscriptionPause(myDomainId,handle))!=CMSG_OK) {
01911 throw(cMsgException(cMsgPerror(stat),stat));
01912 }
01913
01914 }
01915
01916
01917
01918
01919
01926 void cMsg::subscriptionResume(void *handle) throw(cMsgException) {
01927
01928 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01929
01930 int stat;
01931
01932
01933 if((stat=cMsgSubscriptionResume(myDomainId,handle))!=CMSG_OK) {
01934 throw(cMsgException(cMsgPerror(stat),stat));
01935 }
01936
01937 }
01938
01939
01940
01941
01942
01949 void cMsg::subscriptionQueueClear(void *handle) throw(cMsgException) {
01950
01951 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01952
01953 int stat;
01954
01955
01956 if((stat=cMsgSubscriptionQueueClear(myDomainId,handle))!=CMSG_OK) {
01957 throw(cMsgException(cMsgPerror(stat),stat));
01958 }
01959
01960 }
01961
01962
01963
01964
01965
01973 int cMsg::subscriptionQueueCount(void *handle) throw(cMsgException) {
01974
01975 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
01976
01977 int stat, count;
01978
01979
01980 if((stat=cMsgSubscriptionQueueCount(myDomainId,handle,&count))!=CMSG_OK) {
01981 throw(cMsgException(cMsgPerror(stat),stat));
01982 }
01983 return count;
01984
01985 }
01986
01987
01988
01989
01990
01998 int cMsg::subscriptionMessagesTotal(void *handle) throw(cMsgException) {
01999
02000 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02001
02002 int stat, count;
02003
02004
02005 if((stat=cMsgSubscriptionMessagesTotal(myDomainId,handle,&count))!=CMSG_OK) {
02006 throw(cMsgException(cMsgPerror(stat),stat));
02007 }
02008 return count;
02009
02010 }
02011
02012
02013
02014
02015
02023 bool cMsg::subscriptionQueueIsFull(void *handle) throw(cMsgException) {
02024
02025 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02026
02027 int stat, val;
02028
02029
02030 if((stat=cMsgSubscriptionQueueIsFull(myDomainId,handle,&val))!=CMSG_OK) {
02031 throw(cMsgException(cMsgPerror(stat),stat));
02032 }
02033 return (val == 0 ? false : true);
02034
02035 }
02036
02037
02038
02039
02040
02050 cMsgMessage *cMsg::sendAndGet(cMsgMessage &sendMsg, const struct timespec *timeout) throw(cMsgException) {
02051
02052 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02053
02054
02055 void *replyPtr;
02056
02057 int stat;
02058 if((stat=cMsgSendAndGet(myDomainId,sendMsg.myMsgPointer,timeout,&replyPtr))!=CMSG_OK) {
02059 throw(cMsgException(cMsgPerror(stat),stat));
02060 }
02061
02062 return(new cMsgMessage(replyPtr));
02063 }
02064
02065
02066
02067
02068
02078 cMsgMessage *cMsg::sendAndGet(cMsgMessage *sendMsg, const struct timespec *timeout) throw(cMsgException) {
02079
02080 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02081
02082
02083 void *replyPtr;
02084
02085 int stat;
02086 if((stat=cMsgSendAndGet(myDomainId,sendMsg->myMsgPointer,timeout,&replyPtr))!=CMSG_OK) {
02087 throw(cMsgException(cMsgPerror(stat),stat));
02088 }
02089
02090 return(new cMsgMessage(replyPtr));
02091 }
02092
02093
02094
02095
02106 cMsgMessage *cMsg::subscribeAndGet(const string &subject, const string &type, const struct timespec *timeout) throw(cMsgException) {
02107
02108 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02109
02110
02111 void *replyPtr;
02112
02113 int stat;
02114 if((stat=cMsgSubscribeAndGet(myDomainId,subject.c_str(),type.c_str(),timeout,&replyPtr))!=CMSG_OK) {
02115 throw(cMsgException(cMsgPerror(stat),stat));
02116 }
02117
02118 return(new cMsgMessage(replyPtr));
02119 }
02120
02121
02122
02123
02124
02131 void cMsg::flush(const struct timespec *timeout) throw(cMsgException) {
02132
02133 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02134
02135
02136 int stat;
02137 if((stat=cMsgFlush(myDomainId, timeout))!=CMSG_OK) {
02138 throw(cMsgException(cMsgPerror(stat),stat));
02139 }
02140 }
02141
02142
02143
02144
02145
02150 void cMsg::start(void) throw(cMsgException) {
02151
02152 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02153
02154
02155 int stat;
02156 if((stat=cMsgReceiveStart(myDomainId))!=CMSG_OK) {
02157 throw(cMsgException(cMsgPerror(stat),stat));
02158 }
02159 }
02160
02161
02162
02163
02164
02169 void cMsg::stop(void) throw(cMsgException) {
02170
02171 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02172
02173
02174 int stat;
02175 if((stat=cMsgReceiveStop(myDomainId))!=CMSG_OK) {
02176 throw(cMsgException(cMsgPerror(stat),stat));
02177 }
02178 }
02179
02180
02181
02182
02183
02189 string cMsg::getDescription(void) const {
02190 return(myDescr);
02191 }
02192
02193
02194
02195
02196
02202 string cMsg::getName(void) const {
02203 return(myName);
02204 }
02205
02206
02207
02208
02209
02215 string cMsg::getUDL(void) const {
02216 return(myUDL);
02217 }
02218
02219
02220
02221
02222
02230 void cMsg::setUDL(const string &udl) throw(cMsgException) {
02231
02232 int stat;
02233 if((stat=cMsgSetUDL(myDomainId,udl.c_str()))!=CMSG_OK) {
02234 throw(cMsgException(cMsgPerror(stat),stat));
02235 }
02236 }
02237
02238
02239
02240
02241
02248 string cMsg::getCurrentUDL(void) const throw(cMsgException) {
02249 const char *s;
02250
02251 int stat;
02252 if((stat=cMsgGetCurrentUDL(myDomainId,&s))!=CMSG_OK) {
02253 throw(cMsgException(cMsgPerror(stat),stat));
02254 };
02255
02256 if(s==NULL) {
02257 return("null");
02258 } else {
02259 return(string(s));
02260 }
02261 }
02262
02263
02264
02265
02266
02273 bool cMsg::isConnected(void) const throw(cMsgException) {
02274
02275 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02276
02277
02278 int stat,connected;
02279 if((stat=cMsgGetConnectState(myDomainId,&connected))!=CMSG_OK) {
02280 throw(cMsgException(cMsgPerror(stat),stat));
02281 }
02282 return(connected==1);
02283 }
02284
02285
02286
02287
02288
02295 bool cMsg::isReceiving(void) const throw(cMsgException) {
02296
02297 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02298
02299
02300 int stat,receiving;
02301 if((stat=cMsgGetReceiveState(myDomainId,&receiving))!=CMSG_OK) {
02302 throw(cMsgException(cMsgPerror(stat),stat));
02303 }
02304 return(receiving==1);
02305 }
02306
02307
02308
02309
02310
02318 void cMsg::setShutdownHandler(cMsgShutdownHandler *handler, void* userArg) throw(cMsgException) {
02319
02320 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02321
02322
02323 int stat;
02324 if((stat=cMsgSetShutdownHandler(myDomainId,handler,userArg))!=CMSG_OK) {
02325 throw(cMsgException(cMsgPerror(stat),stat));
02326 }
02327 }
02328
02329
02330
02331
02332
02340 void cMsg::shutdownClients(const string &client, int flag) throw(cMsgException) {
02341
02342 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02343
02344
02345 int stat;
02346 if((stat=cMsgShutdownClients(myDomainId,client.c_str(),flag))!=CMSG_OK) {
02347 throw(cMsgException(cMsgPerror(stat),stat));
02348 }
02349 }
02350
02351
02352
02353
02354
02362 void cMsg::shutdownServers(const string &server, int flag) throw(cMsgException) {
02363
02364 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02365
02366
02367 int stat;
02368 if((stat=cMsgShutdownServers(myDomainId,server.c_str(),flag))!=CMSG_OK) {
02369 throw(cMsgException(cMsgPerror(stat),stat));
02370 }
02371 }
02372
02373
02374
02375
02376
02384 cMsgMessage *cMsg::monitor(const string &monString) throw(cMsgException) {
02385
02386 if(!initialized)throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
02387
02388 void *m = cMsgCreateMessage();
02389 if(m==NULL)throw(cMsgException("?cMsgMessage constructor...unable to create message",CMSG_ERROR));
02390
02391 int stat;
02392 if((stat=cMsgMonitor(myDomainId,monString.c_str(),&m))!=CMSG_OK) {
02393 cMsgFreeMessage(&m);
02394 throw(cMsgException(cMsgPerror(stat),stat));
02395 }
02396
02397 return(new cMsgMessage(m));
02398 }
02399
02400
02401
02402