VisiBroker for C++ Developer’s Guide : Using the Event Service

Using the Event Service
This section describes the VisiBroker Event Service.
Note
The OMG Event Service has been superseded by the OMG Notification Service. The VisiBroker Event Service is still supported for backward compatibility and light weight purposes. For mission critical applications, we strongly recommend using VisiBroker VisiNotify. For more information, see the VisiBroker VisiNotify Guide.
Overview
The Event Service package provides a facility that de-couples the communication between objects. It provides a supplier-consumer communication model that allows multiple supplier objects to send data asynchronously to multiple consumer objects through an event channel. The supplier-consumer communication model allows an object to communicate an important change in state, such as a disk running out of free space, to any other objects that might be interested in such an event.
Figure 27
The figure above shows three supplier objects communicating through an event channel with two consumer objects. The flow of data into the event channel is handled by the supplier objects, while the flow of data out of the event channel is handled by the consumer objects. If each of the three suppliers shown in the figure above sends one message every second, then each consumer will receive three messages every second and the event channel will forward a total of six messages per second.
The event channel is both a consumer of events and a supplier of events. The data communicated between suppliers and consumers is represented by the Any class, allowing any CORBA type to be passed in a type safe manner. Supplier and consumer objects communicate through the event channel using standard CORBA requests.
Proxy consumers and suppliers
Consumers and suppliers are completely de-coupled from one another through the use of proxy objects. Instead of interacting with each other directly, they obtain a proxy object from the EventChannel and communicate with it. Supplier objects obtain a consumer proxy and consumer objects obtain a supplier proxy. The EventChannel facilitates the data transfer between consumer and supplier proxy objects. The figure below shows how one supplier can distribute data to multiple consumers.
Figure 28
Note
The event channel is shown above as a separate process, but it may also be implemented as part of the supplier object's process.
OMG Common Object Services specification
The VisiBroker Event Service implementation conforms to the OMG Common Object Services Specification, with the following exceptions:
Communication models
The Event Service provides both a pull and push communication model for suppliers and consumers. In the push model, supplier objects control the flow of data by pushing it to consumers. In the pull model, consumer objects control the flow of data by pulling data from the supplier.
The EventChannel insulates suppliers and consumers from having to know which model is being used by other objects on the channel. This means that a pull supplier can provide data to a push consumer and a push supplier can provide data to a pull consumer.
Figure 29
Note
The EventChannel is shown above as a separate process, but it may also be implemented as part of the supplier object's process.
Push model
The push model is the more common of the two communication models. An example use of the push model is a supplier that monitors available free space on a disk and notifies interested consumers when the disk is filling up. The push supplier sends data to its ProxyPushConsumer in response to events that it is monitoring.
The push consumer spends most of its time in an event loop, waiting for data to arrive from the ProxyPushSupplier. The EventChannel facilitates the transfer of data from the ProxyPushSupplier to the ProxyPushConsumer.
The figure below shows a push supplier and its corresponding ProxyPushConsumer object. It also shows three push consumers and their respective ProxyPushSupplier objects.
Pull model
In the pull model, the event channel regularly pulls data from a supplier object, puts the data in a queue, and makes it available to be pulled by a consumer object. An example of a pull consumer would be one or more network monitors that periodically poll a network router for statistics.
The pull supplier spends most of its time in an event loop waiting for data requests to be received from the ProxyPullConsumer. The pull consumer requests data from the ProxyPullSupplier when it is ready for more data. The EventChannel pulls data from the supplier to a queue and makes it available to the ProxyPullSupplier.
The figure below shows a pull supplier and its corresponding ProxyPullConsumer object. It also shows three pull consumers and their respective ProxyPullSupplier objects.
Figure 30
Note
The event channel is shown above as a separate process, but it may also be implemented as part of the supplier object's process.
Using event channels
To create an EventChannel, connect a supplier or consumer to it and use it:
1
Windows
prompt> start vbj com.inprise.vbroker.CosEvent.EventServer -ior <iorFilename> <channelName>
UNIX
prompt> vbj com.inprise.vbroker.CosEvent.EventServer -ior <iorFilename> <channelName> &
Note
Only one instance of the EventChannel is supported. All binding to the EventChannel is done through the call to orb.resolve_initial_references("EventService"), where EventService is the hardcoded EventChannel name.
2
3
4
5
The methods used for these steps vary, depending on whether the object being connected is a supplier or a consumer, and on the communication model being used. The table below shows the appropriate methods for suppliers.
The table below shows the appropriate methods for consumers.
Examples of push supplier and consumer
This section describes the example of the push supplier and the consumer applications.
Push supplier and consumer example
This section describes the example push supplier and consumer applications. When executed, the supplier application prompts the user to enter data and then pushes the data to the consumer application. The consumer application receives the data and writes it to the screen.
The push supplier application is implemented in the PushModel.C file and the push consumer is implemented in the PushView.C file. These files can be found in the <install_dir>/examples/vbe/events directory.
Deriving a PushSupplier class
The first step in implementing a supplier is to derive our own PushModel class from the PushSupplier interface, shown below.
module CosEventComm {
interface
PushSupplier {
void disconnect_push_supplier();
};
};
The code sample below shows the PushModel class, implemented in C++. The disconnect_push_supplier method is called by the EventChannel to disconnect the supplier when the channel is being destroyed. This implementation simply prints out a message and exits. If the PushModel object were persistent, this method might also call deactivate_obj to deactivate the object.
// PushModel.C
#include "CosEventComm_s.hh"
#include "CosEventChannelAdmin_c.hh"
class PushModel : public POA_CosEventComm::PushSupplier, public VISThread {
public:
void disconnect_push_supplier() {
cout << "Model::disconnect_push_supplier()" << endl;
try {
PortableServer::ObjectId_var objId =
PortableServer::string_to_ObjectId("PushModel");
_myPOA->deactivate_object(objId);
}
catch(const CORBA::Exception& e) {
cout << e << endl;

}
}
};
Implementing the PushSupplier
The first portion of the supplier implementation is fairly routine. After doing some initialization, a local scope is set, resulting in a locally-scoped PushModel object.
int main(int argc, char* const* argv)
{
try {
// Initialize the ORB.
CORBA::ORB_var orb = CORBA::ORB_init(argc, argv);

// get a reference to the root POA
CORBA::Object_var obj = orb->resolve_initial_references("RootPOA");
//Create the POA serverPOA
...

CPushModel* model = NULL;
CosEventChannelAdmin::ProxyPushConsumer_var pushConsumer = NULL;

model = new PushModel(orb, pushConsumer, serverPOA);
CORBA::String_var supplier_name(CORBA::string_dup("PushModel"));
PortableServer::ObjectId_var objId =
PortableServer::string_to_ObjectId(supplier_name);
serverPOA->activate_object_with_id(objId, model);
// Activate the POA Manager
serverPOA->the_POAManager()->activate();
CORBA::Object_var reference = serverPOA->servant_to_reference(model);
cout << "Created model: " << reference << endl;
}
...
}
The example uses command line options to implement the PushSupplier. When the command line option is m, it initializes and instantiates the PushModel object.
If the command line option is p, the example binds to the EventChannel and obtains a SupplierAdmin object from the EventChannel. Note that the application could specify an object name for a specific EventChannel. In a real implementation, the object could be passed as an argument to the application or obtained from the naming service (VisiNaming), if it is available. For more information, go to , “Using the VisiNaming Service.” Next the SupplierAdmin object is used to obtain a proxy for the pushConsumer object from the EventChannel.
If the command line option is c, the pushSupplier object is connected to the EventChannel.
...
if (cmd == 'p') {
if (channel == NULL) {
cout << "Need to locate an [e]vent channel" << endl;
}
else {
pushConsumer = channel->for_suppliers()->obtain_push_consumer();
cout << "Obtained push consumer: " << pushConsumer << endl;
continue;
}
}
else if (cmd == 'c' ) {
if (model == NULL) {
cout << "Need to create a [m]odel" << endl;
}
else if (pushConsumer == NULL) {
cout << "Need to obtain a [p]ush consumer" << endl;
}
else {
cout << "Connecting..." << endl;
pushConsumer->connect_push_supplier(model->_this());
model->start();
continue;
}
}
A different thread of the supplier application prompts the user for a string, waits for a string to be entered and converts the string to an Any object. Lastly, the data is “pushed” to the consumer proxy object.
. . .
while(true) {
VISPortable::vsleep(_delay);
try {
char buf[81];
std::string str;
sprintf(buf, "%s%d", "Hello #", ++_counter);
str = buf;

CORBA::Any_var message = new CORBA::Any();
*message <<= str.c_str();
cout << "Supplier pushing: " << str.c_str() << endl;

_pushConsumer->push(*message);
}
catch(CosEventComm::Disconnected e) {
cout << "Disconnected #" << _counter << endl;
}
catch(CORBA::OBJECT_NOT_EXIST e)
{
cout << "Push Consumer has been disconnected" << endl;
return;
}
catch(const CORBA::Exception& e) {
cout << e << endl;
disconnect_push_supplier();
return;
}
catch(...) {
cout << "Unexpected exception" << endl;
disconnect_push_supplier();
return;
}
}
. . .
Complete implementation for a sample push supplier
#include "corba.h"
#include "CosEventComm_s.hh"
#include "CosEventChannelAdmin_c.hh"
#include "vport.h"
#include <string>

USE_STD_NS
class PushModel : public POA_CosEventComm::PushSupplier, public VISThread{
public:
PushModel(CORBA::ORB_ptr orb,
CosEventComm::PushConsumer_ptr pushConsumer,
PortableServer::POA_ptr myPOA) :
_orb(orb), _pushConsumer(pushConsumer), _myPOA(myPOA), _counter(0),
_delay(1)
{}
void delay(int time) { delay = time; }
void start() {
// start the thread
run();
}
void disconnect_push_supplier() {
cout << "Model::disconnect_push_supplier()" << endl;
try {
PortableServer::ObjectId_var objId =
PortableServer::string_to_ObjectId("PushModel");
_myPOA->deactivate_object(objId);
}
catch(const CORBA::Exception& e) {
cout << e << endl;
}
}
// implement begin() callback
void begin() {
while(true) {
VISPortable::vsleep(_delay);
try {
char buf[81];
std::string str;
sprintf(buf, "%s%d", "Hello #", ++_counter);
str = buf;
CORBA::Any_var message = new CORBA::Any();
*message <<= str.c_str();
cout << "Supplier pushing: " << str.c_str() << endl;
_pushConsumer->push(*message);
}
catch(CosEventComm::Disconnected e) {
cout << "Disconnected #" << _counter << endl;
}


catch(CORBA::OBJECT_NOT_EXIST e)
{
cout << "Push Consumer has been disconnected" << endl;
return;
}
catch(const CORBA::Exception& e) {
cout << e << endl;
disconnect_push_supplier();
return;
}
catch(...) {
cout << "Unexpected exception" << endl;
disconnect_push_supplier();
return;
}
}
}

private :
int _delay;
int _counter;
CORBA::ORB_var _orb;
PortableServer::POA_var _myPOA;
CosEventComm::PushConsumer_var _pushConsumer;
};

int main(int argc, char* const* argv)
{
try {
// Initialize the ORB.
CORBA::ORB_var orb = CORBA::ORB_init(argc, argv);

// get a reference to the root POA
CORBA::Object_var obj = orb->resolve_initial_references("RootPOA");
PortableServer::POA_var rootPOA = PortableServer::POA::_narrow(obj);

// Create policies for our persistent POA
CORBA::PolicyList policies;
policies.length(1);
policies[(CORBA::ULong)0] =
rootPOA->create_lifespan_policy(PortableServer::PERSISTENT);

PortableServer::POAManager_var poa_manager = rootPOA->the_POAManager();

// Create serverPOA with the right policies
PortableServer::POA_var serverPOA =
rootPOA->create_POA("event_service_poa", poa_manager, policies);

CosEventChannelAdmin::EventChannel_var channel = NULL;
PushModel* model = NULL;
CosEventChannelAdmin::ProxyPushConsumer_var pushConsumer = NULL;
 
while(true) {
try {

cout << "-> ";
cout.flush();
char cmd;
if (cin >> cmd ) {
if (cmd == 'e') {
obj = orb->resolve_initial_references("EventService");
channel =
                       CosEventChannelAdmin::EventChannel::_narrow(obj);
cout << "Located event channel: " << channel << endl;
continue;
}
else if (cmd == 'p') {
if (channel == NULL) {
cout << "Need to locate an [e]vent channel" << endl;
}
else {
pushConsumer = channel->
                                  for_suppliers()_>obtain_push_consumer();
cout << "Obtained push consumer: " << pushConsumer
                                  << endl;
continue;
}
}
else if (cmd == 'm') {
if (pushConsumer == NULL) {
cout << "Need to obtain a [p]ush consumer" << endl;
}
else {
model = new PushModel(orb, pushConsumer, serverPOA);
CORBA::String_var
                            supplier_name(CORBA::string_dup("PushModel"));
PortableServer::ObjectId_var objId =
PortableServer::string_to_ObjectId(supplier_name);
serverPOA->activate_object_with_id(objId, model);
// Activate the POA Manager
serverPOA->the_POAManager()->activate();
CORBA::Object_var reference = serverPOA->
servant_to_reference(model);
cout << "Created model: " << reference << endl;
continue;
}
}
else if (cmd == 's') {
if (model == NULL) {
cout << "Need to create a [m]odel" << endl;
}
else {
int delay;
if (cin >> delay ) {
if (delay < 0)
cout << "[s]leep delay must be positive" ;
else
model->delay(delay);
}

else {
cerr << "Invalid argument to [s]leep" << endl;
}
}
}
else if (cmd == 'c' ) {
if (model == NULL) {
cout << "Need to create a [m]odel" << endl;
}
else if (pushConsumer == NULL) {
cout << "Need to obtain a [p]ush consumer" << endl;
}
else {
cout << "Connecting..." << endl;
pushConsumer->connect_push_supplier(model->_this());
model->start();
continue;
}
}
else if (cmd == 'd') {
if (pushConsumer == NULL) {
cout << "Need to obtain a [p]ush consumer" << endl;
}
else {
cout << "Disconnecting..." << endl;
pushConsumer->disconnect_push_consumer();
continue;
}
}
else if (cmd == 'q') {
cout << "Quitting..." << endl;
CORBA::ORB::shutdown();
break;
}
else {
   cout << "Commands: e [e]vent channel" << endl
<< " s <# seconds> set [s]leep delay" << endl
<< " p [p]ush consumer" << endl
<< " m [m]odel" << endl
<< " c [c]onnect" << endl
<< " d [d]isconnect" << endl
<< " q [q]uit" << endl;
}
}
}
catch(const CORBA::SystemException& e) {
cerr << e << endl;
}
}
}
catch(const CORBA::Exception& e) {
cerr << e << endl;
}
return 0;
}
Deriving a PushConsumer class
The code sample below shows the first part of the supplier application, which defines a PushView class that is derived from the PushConsumer interface, shown below.
module CosEventComm {
exception Disconnected();
interface
PushConsumer {
void push(in any data) raises(Disconnected);
void disconnect_push_consumer();
};
};
The push method receives an Any type and attempts to convert it to a string and print it. The disconnect_push_supplier method is called by the EventChannel to disconnect the consumer when the channel is destroying itself.
// PushView.C
#include "CosEventComm_s.hh"
#include "CosEventChannelAdmin_c.hh"
class PushView : public POA_CosEventComm::PushConsumer
{
public:
void push(const CORBA::Any& data) {
cout << "Consumer being pushed: " << data << endl;
}

void disconnect_push_consumer() {
cout << "PushView::disconnect_push_consumer" << endl;
}
};
Implementing the PushConsumer
If the command line is v, then the PushConsumer object is instantiated and activated. Different command line options cause it to bind to the EventChannel, obtain the supplier proxy object and connect to the consumer object and wait to receive push requests.
// PushView.C
#include "CosEventComm_s.hh"
#include "CosEventChannelAdmin_c.hh"
. . .
int main(int argc, char* const* argv)
{
try {
// Initialize the ORB.
CORBA::ORB_var orb = CORBA::ORB_init(argc, argv);

// get a reference to the root POA
CORBA::Object_var obj = orb->resolve_initial_references("RootPOA");
PortableServer::POA_var rootPOA = PortableServer::POA::_narrow(obj);

// Create policies for our persistent POA
CORBA::PolicyList policies;
policies.length(1);
policies[(CORBA::ULong)0] =
rootPOA->create_lifespan_policy(PortableServer::PERSISTENT);

PortableServer::POAManager_var poa_manager =
                                              rootPOA->the_POAManager();
// Create serverPOA with the right policies
PortableServer::POA_var serverPOA =
rootPOA->create_POA("event_service_poa", poa_manager, policies);

CosEventChannelAdmin::EventChannel_var channel = NULL;
PushView* view = NULL;
CosEventChannelAdmin::ProxyPushSupplier_var pushSupplier = NULL;

while(true) {
try {
cout << "-> ";
cout.flush();
char cmd;
if (cin >> cmd) {
if (cmd == 'e') {
                 obj = orb->resolve_initial_references("EventService");
                 channel = CosEventChannelAdmin::EventChannel::_narrow(obj);
                 cout << "Located event channel: " << channel << endl;
continue;
}
else if (cmd == 'v') {
view = new PushView();
CORBA::
                    String_var consumer_name(CORBA::string_dup("PushView"));
PortableServer::ObjectId_var objId =
PortableServer::string_to_ObjectId(consumer_name);
serverPOA->activate_object_with_id(objId, view);
// Activate the POA Manager
serverPOA->the_POAManager()->activate();
CORBA::Object_var reference =
                                     serverPOA ->servant_to_reference(view);
cout << "Created view: " << reference << endl;
continue;
}
else if (cmd == 'p') {
if (channel == NULL) {
cout << "Need to locate an [e]vent channel" << endl;
}
else {
pushSupplier = channel->for_consumers()
                       ->obtain_push_supplier();
cout << "Obtained push consumer: "
                       << pushSupplier << endl;
continue;
}
}
else if (cmd == 'c' ) {
if (view == NULL) {
cout << "Need to create a [v]iew" << endl;
}
else if (pushSupplier == NULL) {
cout << "Need to obtain a [p]ush supplier" << endl;
}
else {
cout << "Connecting..." << endl;
pushSupplier->connect_push_consumer(view->_this());
continue;
}
}
else if (cmd == 'd') {
if (pushSupplier == NULL) {
cout << "Need to obtain a [p]ush supplier" << endl;
}
else {
cout << "Disconnecting..." << endl;
pushSupplier->disconnect_push_supplier();
continue;
}
}
else if (cmd == 'q') {
cout << "Quitting..." << endl;
break;
}
cout << "Commands: e [e]vent channel" << endl
<< " p [p]ush supplier" << endl
<< " v [v]iew" << endl
<< " c [c]onnect" << endl
<< " d [d]isconnect" << endl
<< " q [q]uit" << endl;
}
}
catch(const CORBA::SystemException& e) {
cerr << e << endl;
}
}
}
catch(const CORBA::Exception& e) {
cerr << e << endl;
}
}
Setting the queue length
In some environments, consumer applications may run slower than supplier applications. The maxQueueLength parameter prevents out-of-memory conditions by limiting the number of outstanding messages that will be held for each consumer that cannot keep up with the rate of messages from the supplier.
If a supplier generates 10 messages per second and a consumer can only process one message per second, the queue will quickly fill up. Messages in the queue have a fixed maximum length and if an attempt is made to add a message to a queue that is full, the channel will remove the oldest message in the queue to make room for the new message.
Each consumer has a separate queue, so a slow consumer may miss messages while another, faster consumer may not lose any. The code sample below shows how to limit each consumer to 15 outstanding messages.
CosEventChannel -maxQueueLength=15 MyChannel
Note
If maxQueueLength is not specified or if an invalid number is specified, a default queue length of 100 is used.
Compiling and linking programs
Applications that use the Event Service need to include the following generated files:
#include "CosEventComm_s.hh"
#include "CosEventChannelAdmin_c.hh"
UNIX
UNIX applications need to be linked with one of the libraries:
Windows
Windows applications need to be linked with the cosev_r.lib (cosev_r.dll) library.