Message Routing

posted in Adam Omega
Published January 04, 2012
Advertisement
To continue in the series of our multi-threaded game engine, I feel the next topic of discussion should be messaging. To facilatate communication between modules we have devised a messaging system that runs in its own thread. This system routes packets of data or as we call them Envelopes between different subscribed functions for a given message id.

First our Envelope

#pragma once
/**
* \file file base name
* \author Adam Martin
* \date 2011-07-21
* \brief Template container for message data passed between cores.
*
* Usage:
* Envelope e;
* e.AddData(10);
*
*/
// Standard Includes
#include


// Library Includes
#include
#include
#include

// Local Includes
#include "EventLogger.h"


// Forward Declarations
class Entity;

// Typedefs


class Envelope {
public:
Envelope(void) { this->refCount = 1; }
~Envelope(void) { }
boost::any GetData (unsigned int index = 0) { return this->data.at(index); } // Get the data stored at index i
boost::any GetData (unsigned int index = 0) const { return this->data.at(index); } // Get the data stored at index i
bool GetDataBool (unsigned int index = 0) { return TGetData (index); }
int GetDataInt (unsigned int index = 0) { return TGetData (index); }
long GetDataLong (unsigned int index = 0) { return TGetData (index); }
unsigned int GetDataUInt (unsigned int index = 0) { return TGetData (index); }
float GetDataFloat (unsigned int index = 0) { return TGetData (index); }
std::string GetDataString (unsigned int index = 0) { return TGetData (index); }
Entity* GetDataEntityP(unsigned int index = 0);
void AddData (boost::any data) { this->data.push_back(data); } // Adds more data to the envelope
void AddDataBool (bool data) { this->data.push_back(data); }
void AddDataInt (int data) { this->data.push_back(data); }
void AddDataUInt (unsigned int data) { this->data.push_back(data); }
void AddDataFloat (float data) { this->data.push_back(data); }
void AddDataString (std::string data) { this->data.push_back(data); }
void AddDataColor (D3DXCOLOR data) { this->data.push_back(data); }
void AddDataEntityP(Entity* data) { this->data.push_back(data); }

int msgid; // Used to identify the message type

private:

template
T TGetData(unsigned int index) {
T value = boost::initialized_value;

try {
value = boost::any_cast(GetData(index));
}
catch (boost::bad_any_cast e) {
EventLogger::printToFile(1, "Attempted to get the wrong type from Envelope at index " + boost::lexical_cast(index) + ".");
}
catch (std::out_of_range e) {
EventLogger::printToFile(1, "Index " + boost::lexical_cast(index) + " out of range when accessing Envelope data.");
}

return value;
}

std::vector data;
};


There are some generic getters and setters for various data types that handle the various casting.

Now we must dynamically create envelopes on the stack as our message thread will delete them after it is done.


Here is our messaging system called MessageRouter.


#pragma once

/**
* \file file base name
* \author Adam Martin
* \date 2011-010-28
* \brief Message routing class to allow advanced subscription based messaging.
*
*
*
*/

// Standard Includes
#include
#include
#include

// Library Includes
#include
#include

// Local Includes

// Local Includes

// Forward Declarations
class Envelope;

// Typedefs
typedef boost::function subscriber;

enum CORE_MESSAGE : unsigned int {STARTUP = 0x0000, QUIT = 0x0099, SHUTDOWN = 0x0100, CREATE = 0x0001, LOADLIBRARY = 0x0002, MODULESTARTED = 0x0003};

class MessageRouter {
public:
MessageRouter(void);
~MessageRouter(void);

void Subscribe(int id, std::shared_ptr& s); // Subscribe to message id, with subscriber function s
void Subscribe(std::vector& ids, std::shared_ptr s); // Subscribe to all messages ids, with subscriber function s
void Unsubscribe(int id, std::shared_ptr s);
void Unsubscribe(std::vector& ids, std::shared_ptr s);
void Send(Envelope* e, bool async = true); // Add a new envelope to the backlog, Envelopes must be dynamic memory to allow the dtor to free all unrouted messages safely. If sync is set to true the message is sent synchronously
void Route(void); // Thread to handle the backlog
void Shutdown(Envelope* e); // Used to shutdown this module after quit has been initiated
void Quit(Envelope* e); // Used when the application is quiting.
private:
std::map>> subscriptions; // a mapping of message ids to subscribers
std::queue backlog;
bool routing;
};


We have some defined message ids at the top, the subscribe and unsubscribe functions next, our Send function with an option to send the message in a blocking synchronous way or asynchronous via the backlog, the Route function which is the threaded function, and a few subscribed functions.

The Route function continues to loop through the backlog until routing is false. Routing is set to false when the subscribed Quit function is called via a message sent to the message router with the message id CORE_MESSAGE::QUIT. Route loops through the backlog by popping the front Envelope from the queue, checking to see if anything is subscribed to the given message id, and then looping through all subscribers by calling the given function pointer and passing in the Envelope. After the loop it deletes the Envelope.

The Route function:



void MessageRouter::Route( void ) {
Envelope* e;
while (routing) {
while (!this->backlog.empty()) {
// Get the next Envelope from the queue and pop it from the queue
e = this->backlog.front();
this->backlog.pop();

// See if anyone is subscribed to the message id, get the functor, and call it
if (this->subscriptions.find(e->msgid) != this->subscriptions.end()) {
std::vector> vec = this->subscriptions[e->msgid];
for(std::vector>::iterator itr = vec.begin(); itr != vec.end(); ++itr) {
(*itr->get())(e);
}
}

// Free up memory
delete e;
}
Sleep(1);
}
}
1 likes 0 comments

Comments

Nobody has left a comment. You can be the first!
You must log in to join the conversation.
Don't have a GameDev.net account? Sign up!
Profile
Author
Advertisement
Advertisement