25 #include <cMsgPrivate.h>
/* File-scope list of subscription records: addSubscription() appends to it, and deleteSubscription()/deleteSubscriptions() erase from it. */
64 static vector<subscrStruct*> subscrVec;
/* Mutex that the add/delete subscription helpers lock around their accesses to subscrVec. */
68 static pthread_mutex_t subscrMutex = PTHREAD_MUTEX_INITIALIZER;
75 static void callbackDispatcher(
void *msg,
void *userArg) {
76 dispatcherStruct *ds = (dispatcherStruct*)userArg;
135 static void addSubscription(
void *domainId,
const string &subject,
const string &type,
136 dispatcherStruct *d,
void *handle) {
138 subscrStruct *s =
new subscrStruct();
140 s->domainId=domainId;
146 pthread_mutex_lock(&subscrMutex);
147 subscrVec.push_back(s);
148 pthread_mutex_unlock(&subscrMutex);
165 static bool deleteSubscription(
void *domainId,
void *handle) {
167 bool deleted =
false;
169 pthread_mutex_lock(&subscrMutex);
170 vector<subscrStruct*>::iterator iter;
171 for(iter=subscrVec.begin(); iter!=subscrVec.end(); iter++) {
172 if(((*iter)->domainId==domainId)&&((*iter)->handle==handle)) {
175 subscrVec.erase(iter);
180 pthread_mutex_unlock(&subscrMutex);
194 static void deleteSubscriptions(
void *domainId) {
196 pthread_mutex_lock(&subscrMutex);
197 vector<subscrStruct*>::iterator iter;
198 for(iter=subscrVec.begin(); iter!=subscrVec.end(); iter++) {
199 if((*iter)->domainId==domainId) {
202 subscrVec.erase(iter);
205 pthread_mutex_unlock(&subscrMutex);
/* Default constructor: initializes descr to the empty string and returnCode to 0; the body is empty. */
225 cMsgException::cMsgException(
void) : descr(
""), returnCode(0) {}
282 ss <<
"?cMsgException returnCode = " <<
returnCode <<
" descr = " <<
descr << ends;
313 throw(cMsgException(
"?cMsgMessage constructor...unable to create message",CMSG_ERROR));
329 myMsgPointer=cMsgCopyMessage(msg.myMsgPointer);
330 if(myMsgPointer==NULL) {
331 throw(
cMsgException(
"?cMsgMessage copy constructor...unable to create message",CMSG_ERROR));
347 myMsgPointer=msgPointer;
348 if(myMsgPointer==NULL) {
349 throw(
cMsgException(
"?cMsgMessage pointer constructor...unable to create message",CMSG_ERROR));
380 throw(cMsgException(cMsgPerror(stat),stat));
403 if((stat=cMsgSetSubject(myMsgPointer,subject.c_str()))!=CMSG_OK) {
424 throw(cMsgException(cMsgPerror(stat),stat));
447 if((stat=cMsgSetType(myMsgPointer,type.c_str()))!=CMSG_OK) {
468 throw(cMsgException(cMsgPerror(stat),stat));
491 if((stat=cMsgSetText(myMsgPointer,text.c_str()))!=CMSG_OK) {
509 if((stat=cMsgSetByteArrayLength(myMsgPointer,length)!=CMSG_OK)) {
570 if((stat=cMsgSetByteArrayOffset(myMsgPointer,offset)!=CMSG_OK)) {
605 if((stat=cMsgSetByteArray(myMsgPointer,array, length)!=CMSG_OK)) {
624 if((stat=cMsgSetByteArrayNoCopy(myMsgPointer,array,length)!=CMSG_OK)) {
687 if((stat=cMsgSetByteArrayEndian(myMsgPointer,endian))!=CMSG_OK) {
708 if((stat=cMsgNeedToSwap(
myMsgPointer,&flag))!=CMSG_OK) {
709 throw(cMsgException(cMsgPerror(stat),stat));
731 throw(cMsgException(cMsgPerror(stat),stat));
749 if((stat=cMsgSetUserInt(myMsgPointer,i))!=CMSG_OK) {
764 struct timespec
cMsgMessage::getUserTime(void) const throw(cMsgException) {
769 if((stat=cMsgGetUserTime(myMsgPointer,&t))!=CMSG_OK) {
787 if((stat=cMsgSetUserTime(myMsgPointer, &userTime))!=CMSG_OK) {
806 if((stat=cMsgGetVersion(
myMsgPointer, &version))!=CMSG_OK) {
807 throw(cMsgException(cMsgPerror(stat),stat));
843 throw(cMsgException(cMsgPerror(stat),stat));
869 throw(cMsgException(cMsgPerror(stat),stat));
894 if((stat=cMsgGetReceiverHost(
myMsgPointer,&s))!=CMSG_OK) {
895 throw(cMsgException(cMsgPerror(stat),stat));
921 throw(cMsgException(cMsgPerror(stat),stat));
946 if((stat=cMsgGetSenderHost(
myMsgPointer,&s))!=CMSG_OK) {
947 throw(cMsgException(cMsgPerror(stat),stat));
967 struct timespec
cMsgMessage::getReceiverTime(void) const throw(cMsgException) {
972 if((stat=cMsgGetReceiverTime(myMsgPointer,&t))!=CMSG_OK) {
988 struct timespec
cMsgMessage::getSenderTime(void) const throw(cMsgException) {
993 if((stat=cMsgGetSenderTime(myMsgPointer,&t))!=CMSG_OK) {
1013 if((stat=cMsgGetGetRequest(
myMsgPointer,&b))!=CMSG_OK) {
1014 throw(cMsgException(cMsgPerror(stat),stat));
1033 if((stat=cMsgGetGetResponse(
myMsgPointer,&b))!=CMSG_OK) {
1034 throw(cMsgException(cMsgPerror(stat),stat));
1054 if((stat=cMsgGetNullGetResponse(
myMsgPointer,&b))!=CMSG_OK) {
1055 throw(cMsgException(cMsgPerror(stat),stat));
1072 cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
1073 cMsgMessage_t *m = (cMsgMessage_t*)msg.myMsgPointer;
1075 t->sysMsgId = m->sysMsgId;
1076 t->senderToken = m->senderToken;
1077 t->info = CMSG_IS_GET_RESPONSE | CMSG_IS_NULL_GET_RESPONSE;
1092 cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
1093 cMsgMessage_t *m = (cMsgMessage_t*)msg->myMsgPointer;
1095 t->sysMsgId = m->sysMsgId;
1096 t->senderToken = m->senderToken;
1097 t->info = CMSG_IS_GET_RESPONSE | CMSG_IS_NULL_GET_RESPONSE;
1112 cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
1113 cMsgMessage_t *m = (cMsgMessage_t*)msg.myMsgPointer;
1115 t->sysMsgId = m->sysMsgId;
1116 t->senderToken = m->senderToken;
1117 t->info = CMSG_IS_GET_RESPONSE;
1132 cMsgMessage_t *t = (cMsgMessage_t*)myMsgPointer;
1133 cMsgMessage_t *m = (cMsgMessage_t*)msg->myMsgPointer;
1135 t->sysMsgId = m->sysMsgId;
1136 t->senderToken = m->senderToken;
1137 t->info = CMSG_IS_GET_RESPONSE;
1152 void *newMsgPointer;
1153 if((newMsgPointer=cMsgCreateNullResponseMessage(
myMsgPointer))==NULL) {
1154 throw(cMsgException(
"?cMsgMessage::nullResponse...unable to create message",CMSG_ERROR));
1172 void *newMsgPointer;
1173 if((newMsgPointer=cMsgCreateResponseMessage(
myMsgPointer))==NULL) {
1174 throw(cMsgException(
"?cMsgMessage::response...unable to create message",CMSG_ERROR));
1193 if((stat=cMsgSetGetResponse(myMsgPointer,b))!=CMSG_OK) {
1211 if((stat=cMsgSetNullGetResponse(myMsgPointer,b))!=CMSG_OK) {
1232 throw(cMsgException(cMsgPerror(stat),stat));
1258 if((stat=cMsgGetSubscriptionDomain(
myMsgPointer,&s))!=CMSG_OK) {
1259 throw(cMsgException(cMsgPerror(stat),stat));
1284 if((stat=cMsgGetSubscriptionSubject(
myMsgPointer,&s))!=CMSG_OK) {
1285 throw(cMsgException(cMsgPerror(stat),stat));
1310 if((stat=cMsgGetSubscriptionType(
myMsgPointer,&s))!=CMSG_OK) {
1311 throw(cMsgException(cMsgPerror(stat),stat));
1336 if((stat=cMsgGetSubscriptionUDL(
myMsgPointer,&s))!=CMSG_OK) {
1337 throw(cMsgException(cMsgPerror(stat),stat));
1362 if((stat=cMsgGetSubscriptionCueSize(
myMsgPointer,&i))!=CMSG_OK) {
1363 throw(cMsgException(cMsgPerror(stat),stat));
1383 if((stat=cMsgGetReliableSend(
myMsgPointer,&i))!=CMSG_OK) {
1384 throw(cMsgException(cMsgPerror(stat),stat));
1386 return(i == 0 ?
false :
true);
1404 if((stat=cMsgSetReliableSend(myMsgPointer,i))!=CMSG_OK) {
1419 config = cMsgSubscribeConfigCreate();
1430 cMsgSubscribeConfigDestroy(
config);
1444 cMsgSubscribeGetMaxCueSize(
config,&size);
1458 cMsgSubscribeSetMaxCueSize(
config,size);
1472 cMsgSubscribeGetSkipSize(
config,&size);
1486 cMsgSubscribeSetSkipSize(
config,size);
1500 cMsgSubscribeGetMaySkip(
config,&maySkip);
1501 return((maySkip==0)?
false:
true);
1514 cMsgSubscribeSetMaySkip(
config,(maySkip)?1:0);
1528 cMsgSubscribeGetMustSerialize(
config,&maySerialize);
1529 return((maySerialize==0)?
false:
true);
1542 cMsgSubscribeSetMustSerialize(
config,(mustSerialize)?1:0);
1556 cMsgSubscribeGetMaxThreads(
config,&max);
1570 cMsgSubscribeSetMaxThreads(
config,max);
1584 cMsgSubscribeGetMessagesPerThread(
config,&mpt);
1598 cMsgSubscribeSetMessagesPerThread(
config,mpt);
1612 cMsgSubscribeGetStackSize(
config,&size);
1626 cMsgSubscribeSetStackSize(
config,size);
1642 cMsg::cMsg(
const string &UDL,
const string &name,
const string &descr)
1643 : myUDL(UDL), myName(name), myDescr(descr), initialized(false) {
1678 if((stat=cMsgReconnect(myDomainId))!=CMSG_OK) {
1679 throw(cMsgException(cMsgPerror(stat),stat));
1685 if((stat=cMsgConnect(myUDL.c_str(),myName.c_str(),myDescr.c_str(),&myDomainId))!=CMSG_OK) {
1686 throw(cMsgException(cMsgPerror(stat),stat));
1701 if(!initialized)
throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1703 cMsgDisconnect(&myDomainId);
1705 deleteSubscriptions(myDomainId);
1720 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1724 if((stat=cMsgSend(myDomainId,msg.myMsgPointer))!=CMSG_OK) {
1756 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1762 if((stat=cMsgSyncSend(myDomainId,msg.myMsgPointer,timeout,&response))!=CMSG_OK) {
1803 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1811 dispatcherStruct *d =
new dispatcherStruct();
1817 stat=cMsgSubscribe(myDomainId,
1818 (subject.size()<=0)?NULL:subject.c_str(),
1819 (type.size()<=0)?NULL:type.c_str(),
1822 (cfg==NULL)?NULL:(cfg->config),
1834 addSubscription(myDomainId,subject,type,d,handle);
1874 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1881 if(!deleteSubscription(myDomainId,handle)) {
1882 throw(
cMsgException(cMsgPerror(CMSG_BAD_ARGUMENT),CMSG_BAD_ARGUMENT));
1887 if((stat=cMsgUnSubscribe(myDomainId,handle))!=CMSG_OK) {
1905 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1910 if((stat=cMsgSubscriptionPause(myDomainId,handle))!=CMSG_OK) {
1928 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1933 if((stat=cMsgSubscriptionResume(myDomainId,handle))!=CMSG_OK) {
1951 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1956 if((stat=cMsgSubscriptionQueueClear(myDomainId,handle))!=CMSG_OK) {
1975 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
1980 if((stat=cMsgSubscriptionQueueCount(myDomainId,handle,&count))!=CMSG_OK) {
2000 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2005 if((stat=cMsgSubscriptionMessagesTotal(myDomainId,handle,&count))!=CMSG_OK) {
2025 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2030 if((stat=cMsgSubscriptionQueueIsFull(myDomainId,handle,&val))!=CMSG_OK) {
2033 return (val == 0 ?
false :
true);
2052 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2058 if((stat=cMsgSendAndGet(myDomainId,sendMsg.myMsgPointer,timeout,&replyPtr))!=CMSG_OK) {
2080 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2086 if((stat=cMsgSendAndGet(myDomainId,sendMsg->myMsgPointer,timeout,&replyPtr))!=CMSG_OK) {
2108 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2114 if((stat=cMsgSubscribeAndGet(myDomainId,subject.c_str(),type.c_str(),timeout,&replyPtr))!=CMSG_OK) {
2133 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2137 if((stat=cMsgFlush(myDomainId, timeout))!=CMSG_OK) {
2152 if(!initialized)
throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2156 if((stat=cMsgReceiveStart(myDomainId))!=CMSG_OK) {
2157 throw(cMsgException(cMsgPerror(stat),stat));
2171 if(!initialized)
throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2175 if((stat=cMsgReceiveStop(myDomainId))!=CMSG_OK) {
2176 throw(cMsgException(cMsgPerror(stat),stat));
2233 if((stat=cMsgSetUDL(myDomainId,udl.c_str()))!=CMSG_OK) {
2252 if((stat=cMsgGetCurrentUDL(myDomainId,&s))!=CMSG_OK) {
2253 throw(cMsgException(cMsgPerror(stat),stat));
2275 if(!initialized)
throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2279 if((stat=cMsgGetConnectState(myDomainId,&connected))!=CMSG_OK) {
2280 throw(cMsgException(cMsgPerror(stat),stat));
2282 return(connected==1);
2297 if(!initialized)
throw(cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2301 if((stat=cMsgGetReceiveState(myDomainId,&receiving))!=CMSG_OK) {
2302 throw(cMsgException(cMsgPerror(stat),stat));
2304 return(receiving==1);
2320 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2324 if((stat=cMsgSetShutdownHandler(myDomainId,handler,userArg))!=CMSG_OK) {
2342 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2346 if((stat=cMsgShutdownClients(myDomainId,client.c_str(),flag))!=CMSG_OK) {
2364 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2368 if((stat=cMsgShutdownServers(myDomainId,server.c_str(),flag))!=CMSG_OK) {
2386 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2388 void *m = cMsgCreateMessage();
2389 if(m==NULL)
throw(
cMsgException(
"?cMsgMessage constructor...unable to create message",CMSG_ERROR));
2392 if((stat=cMsgMonitor(myDomainId,monString.c_str(),&m))!=CMSG_OK) {
2393 cMsgFreeMessage(&m);
2412 if(!initialized)
throw(
cMsgException(cMsgPerror(CMSG_NOT_INITIALIZED),CMSG_NOT_INITIALIZED));
2415 if((stat=cMsgMonitor(myDomainId,monString.c_str(),NULL))!=CMSG_OK)
virtual int getByteArrayEndian(void)
Gets endian-ness of message byte array.
virtual cMsgMessage * nullResponse(void) const
Creates a null response message.
virtual void * subscribe(const string &subject, const string &type, cMsgCallback *cb, void *userArg, const cMsgSubscriptionConfig *cfg=NULL)
Subscribes to subject,type and specifies callback, userArg.
virtual int syncSend(cMsgMessage &msg, const struct timespec *timeout=NULL)
Synchronously sends message.
virtual const char * what(void) const
Gets char* representation of exception.
virtual string getName(void) const
Gets connection name.
virtual void connect()
Connects to cMsg system.
virtual void setByteArrayNoCopy(char *array, int length)
Specifies the byte array by only copying the pointer to the array.
virtual void setMessagesPerThread(int mpt)
Sets max messages per thread.
virtual string getSubscriptionUDL() const
Gets subscription UDL.
virtual string getSubscriptionSubject() const
Gets subscription subject.
virtual int getByteArrayLength(void)
Gets message region-of-interest byte array length.
virtual void setShutdownHandler(cMsgShutdownHandler *handler, void *userArg)
Sets shutdown handler.
virtual void setMustSerialize(bool mustSerialize)
Sets must serialize flag.
virtual void flush(const struct timespec *timeout=NULL)
Flushes outgoing message queues.
virtual size_t getStackSize(void) const
Gets message stack size.
virtual bool isNullGetResponse(void) const
True if message is a NULL get response.
void * myMsgPointer
Pointer to C message structure.
virtual string getDomain(void) const
Gets message domain.
cMsgSubscriptionConfig(void)
Constructor creates empty subscription config.
virtual bool isReceiving(void) const
True if receiving messages.
virtual string getDescription(void) const
Gets connection description.
virtual int subscriptionMessagesTotal(void *handle)
Return the total number of messages passed to the given subscription's callback.
virtual void setStackSize(size_t size)
Sets message stack size.
virtual void setType(const string &type)
Sets message type.
virtual cMsgMessage * subscribeAndGet(const string &subject, const string &type, const struct timespec *timeout=NULL)
Subscribes to subject/type, returns one matching message, then unsubscribes.
virtual cMsgMessage * response(void) const
Creates a response message.
Class for wrapping cMsg message.
virtual int getMessagesPerThread(void) const
Gets max messages per thread.
virtual void setGetResponse(bool b)
Makes message a get response message.
virtual ~cMsgSubscriptionConfig(void)
Deletes subscription config.
virtual void subscriptionPause(void *handle)
Pause delivery of messages to the given subscription's callback.
virtual string getUDL(void) const
Gets connection UDL.
virtual bool getMaySkip(void) const
True if may skip messages upon overflow.
virtual string getSenderHost(void) const
Gets message sender host.
virtual void setByteArrayOffset(int offset)
Specifies offset in byte array.
virtual int getUserInt(void) const
Gets message user int.
virtual string getSubscriptionType() const
Gets subscription type.
virtual cMsgMessage * monitor(const string &monString)
Returns domain-dependent monitoring information.
virtual void start(void)
Enables delivery of messages to callbacks.
virtual bool isGetResponse(void) const
True if message is a get response.
virtual bool isGetRequest(void) const
True if message is a get request.
virtual int getByteArrayOffset(void)
Gets offset in byte array.
virtual string getCurrentUDL(void) const
Gets UDL of the current connection, "null" if no connection.
virtual void resetByteArrayLength()
Sets message region-of-interest byte array length to the full length of the array.
virtual void setUDL(const string &udl)
Sets the connection UDL.
virtual int getVersion(void) const
Gets cMsg version.
virtual bool getMustSerialize(void) const
Gets must serialize flag.
virtual void makeNullResponse(const cMsgMessage &msg)
Makes a message a null response message.
virtual void send(cMsgMessage &msg)
Sends message.
Interface defines callback method.
virtual string toString(void) const
Gets string representation of exception.
virtual void setMonitoringString(const string &monString)
Sets monitoring string.
virtual void makeResponse(const cMsgMessage &msg)
Makes a message a response message.
virtual string getSender(void) const
Gets message sender.
virtual bool isConnected(void) const
True if connected.
virtual string getReceiver(void) const
Gets message receiver.
cMsgException(void)
Empty constructor.
virtual ~cMsgMessage(void)
Destructor frees C message pointer struct.
cMsg(const string &UDL, const string &name, const string &descr)
Constructor for cMsg system object.
virtual void setByteArray(char *array, int length)
Specifies byte array by copying it.
virtual int getByteArrayLengthFull(void)
Gets message full byte array length.
virtual void setSubject(const string &subject)
Sets message subject.
int returnCode
Return code.
virtual int getMaxThreads(void) const
Gets max callback threads.
virtual cMsgMessage * copy(void) const
Copies a message.
virtual void setMaxCueSize(int size)
Sets max cue size.
virtual void setSkipSize(int size)
Sets skip size.
virtual int getSkipSize(void) const
Gets skip size.
virtual char * getByteArray(void)
Gets byte array.
virtual bool needToSwap(void) const
True if need to swap byte array.
virtual ~cMsgException(void)
Destructor does nothing.
virtual cMsgMessage * sendAndGet(cMsgMessage &sendMsg, const struct timespec *timeout=NULL)
Sends message and gets reply.
virtual void unsubscribe(void *handle)
Unsubscribes.
virtual void subscriptionResume(void *handle)
Resume delivery of messages to the given subscription's callback if paused.
virtual void shutdownServers(const string &server, int flag)
Shuts down a server.
cMsgSubscribeConfig * config
Pointer to subscription config struct.
virtual void setText(const string &text)
Sets message text.
virtual void setByteArrayLength(int length)
Sets message region-of-interest byte array length.
virtual bool subscriptionQueueIsFull(void *handle)
Returns whether the given subscription callback's queue is full (true) or not.
virtual string getText(void) const
Gets message text.
virtual bool getReliableSend(void) const
True if message sent via reliable send.
virtual void setByteArrayEndian(int endian)
Sets endian-ness of message byte array.
virtual void setMaySkip(bool maySkip)
Sets message skip permission.
virtual string getType(void) const
Gets message type.
virtual void stop(void)
Disables delivery of messages to callbacks.
Exception includes description and return code.
virtual ~cMsg(void)
Destructor disconnects from cMsg system.
virtual void subscriptionQueueClear(void *handle)
Clear all messages from the given subscription callback's queue.
virtual void setReliableSend(bool b)
Sets message reliable send flag.
virtual int subscriptionQueueCount(void *handle)
Return the number of messages currently in the given subscription callback's queue.
virtual void disconnect(void)
Disconnects from cMsg system.
virtual void setUserTime(const struct timespec &userTime)
Sets message user time.
Manages subscriptions configurations.
virtual string getSubject(void) const
Gets message subject.
virtual string toString(void) const
Gets xml representation of message.
virtual void setNullGetResponse(bool b)
Makes message a null response message.
virtual int getMaxCueSize(void) const
Gets max cue size.
virtual void setUserInt(int i)
Sets message user int.
virtual int getSubscriptionCueSize(void) const
Gets current subscription cue size.
cMsgMessage(void)
Default constructor creates message.
virtual string getSubscriptionDomain() const
Gets subscription domain.
virtual void shutdownClients(const string &client, int flag)
Shuts down a client.
virtual string getReceiverHost(void) const
Gets message receiver host.
virtual void setMaxThreads(int max)
Sets max callback threads.