VisiBroker for Java Developer’s Guide : Using the Event Service

Using the Event Service
This section describes the VisiBroker Event Service.
Note
The OMG Event Service has been superseded by the OMG Notification Service. The VisiBroker Event Service is still supported for backward compatibility and light weight purposes. For mission critical applications, we strongly recommend using VisiBroker VisiNotify. For more information, see the VisiBroker VisiNotify Guide.
Overview
The Event Service package provides a facility that de‑couples the communication between objects. It provides a supplier-consumer communication model that allows multiple supplier objects to send data asynchronously to multiple consumer objects through an event channel. The supplier-consumer communication model allows an object to communicate an important change in state, such as a disk running out of free space, to any other objects that might be interested in such an event.
Figure 27
The figure above shows three supplier objects communicating through an event channel with two consumer objects. The flow of data into the event channel is handled by the supplier objects, while the flow of data out of the event channel is handled by the consumer objects. If each of the three suppliers shown in the figure above sends one message every second, then each consumer will receive three messages every second and the event channel will forward a total of six messages per second.
The event channel is both a consumer of events and a supplier of events. The data communicated between suppliers and consumers is represented by the Any class, allowing any CORBA type to be passed in a type safe manner. Supplier and consumer objects communicate through the event channel using standard CORBA requests.
Proxy consumers and suppliers
Consumers and suppliers are completely de-coupled from one another through the use of proxy objects. Instead of interacting with each other directly, they obtain a proxy object from the EventChannel and communicate with it. Supplier objects obtain a consumer proxy and consumer objects obtain a supplier proxy. The EventChannel facilitates the data transfer between consumer and supplier proxy objects. The figure below shows how one supplier can distribute data to multiple consumers.
Figure 28
Note
The event channel is shown above as a separate process, but it may also be implemented as part of the supplier object's process. See “Starting the Event Service” for more information.
OMG Common Object Services specification
The VisiBroker Event Service implementation conforms to the OMG Common Object Services Specification, with the following exceptions:
Communication models
The Event Service provides both a pull and push communication model for suppliers and consumers. In the push model, supplier objects control the flow of data by pushing it to consumers. In the pull model, consumer objects control the flow of data by pulling data from the supplier.
The EventChannel insulates suppliers and consumers from having to know which model is being used by other objects on the channel. This means that a pull supplier can provide data to a push consumer and a push supplier can provide data to a pull consumer.
Figure 29
Note
The EventChannel is shown above as a separate process, but it may also be implemented as part of the supplier object's process. See “Starting the Event Service” for more information.
Push model
The push model is the more common of the two communication models. An example use of the push model is a supplier that monitors available free space on a disk and notifies interested consumers when the disk is filling up. The push supplier sends data to its ProxyPushConsumer in response to events that it is monitoring.
The push consumer spends most of its time in an event loop, waiting for data to arrive from the ProxyPushSupplier. The EventChannel facilitates the transfer of data from the ProxyPushSupplier to the ProxyPushConsumer.
The figure below shows a push supplier and its corresponding ProxyPushConsumer object. It also shows three push consumers and their respective ProxyPushSupplier objects.
Pull model
In the pull model, the event channel regularly pulls data from a supplier object, puts the data in a queue, and makes it available to be pulled by a consumer object. An example of a pull consumer would be one or more network monitors that periodically poll a network router for statistics.
The pull supplier spends most of its time in an event loop waiting for data requests to be received from the ProxyPullConsumer. The pull consumer requests data from the ProxyPullSupplier when it is ready for more data. The EventChannel pulls data from the supplier to a queue and makes it available to the ProxyPullSupplier.
The figure below shows a pull supplier and its corresponding ProxyPullConsumer object. It also shows three pull consumers and their respective ProxyPullSupplier objects.
Figure 30
Note
The event channel is shown above as a separate process, but it may also be implemented as part of the supplier object's process.
See “In-process event channel” for more information about how to start the event service in Java.
Using event channels
To create an EventChannel, connect a supplier or consumer to it and use it:
1
Windows
prompt> start vbj com.inprise.vbroker.CosEvent.EventServer -ior <iorFilename> <channelName>
UNIX
prompt> vbj com.inprise.vbroker.CosEvent.EventServer -ior <iorFilename> <channelName> &
where <channelName> is the user-specified object name of the event channel and <iorFilename> is a user-specified filename of the file to which the ior of the object is to be written.
Another way to create the EventChannel is to run PushModelChannel:
prompt> vbj PushModelChannel <iorFilename>
PushModelChannel first creates an EventChannel and publishes its ior to the file <iorFilename> given by the user. Other clients (for example, PushModel) can then bind to the EventChannel by using the initial reference.
To run this:
prompt> vbj -DORBInitRef=EventService=file:<fullpath + iorFilename> PushModel
Regardless of how the event channel is created, make sure that the name specified in <iorFilename> is created in the specified directory.
Note
Only one instance of the EventChannel is supported. All binding to the EventChannel is done through the call to orb.resolve_initial_references("EventService"), where EventService is the hardcoded EventChannel name.
2
3
4
5
The methods used for these steps vary, depending on whether the object being connected is a supplier or a consumer, and on the communication model being used. The table below shows the appropriate methods for suppliers.
The table below shows the appropriate methods for consumers.
Creating event channels
VisiBroker provides a proprietary interface called EventChannelFactory in the CosEventChannelAdmin module to allow Event Service clients to create event channels on demand. To enable this feature, start the event service for your operating system as follows:
Windows
start vbj -Dvbroker.events.factory=true
com.inprise.vbroker.CosEvent.EventServer <factoryName>
UNIX
vbj -Dvbroker.events.factory=true com.inprise.vbroker.CosEvent.EventServer
<factoryName>
The property vbroker.events.factory instructs the service to create a factory object with the name <factoryName> (with a default value of VisiEvent) instead of a channel object. To write the IOR of the factory to a file, use the –ior option to provide the file name. By default, the IOR is written to the console.
The factory object created can then be bound by the client, either using the IOR written to the file (or console) or using the osagent bind mechanism to pass the factory object name. Once the factory object reference is obtained, it can be used to create, look up, or destroy event channel objects. An event channel object obtained from the factory object can be used to connect suppliers and consumers.
Examples of push supplier and consumer
This section describes the example of the push supplier and the consumer applications.
Push supplier and consumer example
This section describes the example push supplier and consumer applications. The files PullSupply.java and PullConsume.java implement the supplier and consumer. These files can be found in the <install_dir>/examples/vbroker/events directory.
To run these examples, you need a supplier-consumer pair. You can pair a consumer of type Push or Pull can be paired with any supplier of type Push or Pull. The order in which you invoke the supplier and consumer does not matter. However, the event channel must be the same object instance.
Before you can start using the Push model example, you need to run this example. The next few sections describe how to run this example.
Running the Push model example
To run the PushModel example, enter:
prompt> vbj -DORBInitRef=EventService=file:<fullpath of iorFilename> PushModel
Select e to bind to an event channel, p to get a proxy to a push consumer from the event channel, m to instantiate a PushModel, and c to connect the event channel.
Continuous sentences indicating the content of the message being pushed to the EventChannel will be displayed. You can continue to make selections regardless of what is displayed on the screen. You can specify the number of seconds between events using the s option. Lastly, select d to disconnect and q to quit.
To run the PushView, enter:
prompt>vbj -DORBInitRef=EventService=file:
<fullpath of iorFilename> PushView
Select e to bind to an event channel, p to get a proxy to a push supplier from the event channel, v to instantiate a PushView, c to connect the event channel, d to disconnect and q to quit. To run this example, a supplier of type Push or Pull must be running on another terminal, continuously sending data to the same event channel in order for PushView to receive the data. The supplier and consumer can be started in any order.
Running the PullModel example
To run the PullModel example, enter:
prompt> vbj -DORBInitRef=EventService=file:
<fullpath of iorFilename> PullModel
Select e to bind to an event channel, p to get a proxy to a push consumer from the event channel, m to instantiate a PullModel, c to connect the event channel, d to disconnect and q to quit.
Running the PullView example
To run the PullView, enter:
prompt>vbj -DORBInitRef=EventService=file:
<fullpath of iorFilename> PullView
Select e to bind to an event channel, p to get a proxy to a push supplier from the event channel, v to instantiate a PushView, c to connect the event channel. Then select a to pull asynchronously or s to pull synchronously. To exit, select d to disconnect and q to quit.
To run this example, a supplier of type Push or Pull must be running on another terminal, continuously sending data to the same event channel in order for PullView to receive the data. The supplier and consumer can be started in any order.
PullSupply
The PullSupply class is derived from the PullSupplierPOA class and provides implementations for the main, pull and try_pull methods. The pull method, shown below, returns a numbered “hello” message. The try_pull method always sets the hasEvent flag to true and calls the pull method to provide the message. Once a PullSupply object is connected to an EventChannel, these methods are used by the channel to pull data from the supplier.
The main method, shown below, performs the usual VisiBroker ORB and POA creation, connects to the specified EventChannel, obtains a ProxyPullConsumer from the EventChannel, instantiates a PullSupply object, activates the PullSupply object on the POA, then connects this pull supplier to proxy pull consumers.
Executing PullSupply
After compiling PullSupply.java and starting the Event Service, described in “In-process event channel”, you can execute the supplier with the following command:
vbj -DORBInitRef = <channel_name> = file:<fullpath of iOrFilename> PullSupply
Implementation of the pull and try_pull methods
// PullSupply.java
import org.omg.CosEventComm.*;
import org.omg.CosEventChannelAdmin.*;
import org.omg.PortableServer.*;
public class PullSupply extends PullSupplierPOA {
private POA _myPOA;
private PullConsumer _pullConsumer;
private int _counter;
PullSupply(PullConsumer pullConsumer, POA myPOA) {
_pullConsumer = pullConsumer;
_myPOA = myPOA;
}
public void disconnect_pull_supplier() {
System.out.println("Model::disconnect_pull_supplier()");
try {
_myPOA.deactivate_object("PullSupply".getBytes());
} catch(Exception e) {
e.printStackTrace();
}
}
public org.omg.CORBA.Any pull() throws Disconnected {
if(_pullConsumer == null) {
throw new Disconnected();
}
try {
Thread.currentThread().sleep(1000);
} catch(Exception e) {
}
//org.omg.CORBA.Any message =
new org.omg.CORBA.Any().from_string("Hello #" + ++_counter);
org.omg.CORBA.Any message = _orb().create_any();
message.insert_string("Hello #" + ++_counter);
System.out.println("Supplier being pulled: " + message);
return message;
}
public org.omg.CORBA.Any try_pull(org.omg.CORBA.BooleanHolder hasEvent) throws
org.omg.CORBA.SystemException, Disconnected {
hasEvent.value = true;
return pull();
}
...
Main method of PullSupply
// PullSupply.java
import org.omg.CosEventComm.*;
import org.omg.CosEventChannelAdmin.*;
import org.omg.PortableServer.*;
public class PullSupply extends PullSupplierPOA {
...
public static void main(String[] args) {
try {
org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(args, null);
// get a reference to the root POA
POA rootPoa =
POAHelper.narrow(orb.resolve_initial_references("RootPOA"));
// Create policies for our persistent POA
org.omg.CORBA.Policy[] policies = {
rootPoa.create_lifespan_policy(LifespanPolicyValue.PERSISTENT)
};
// Create myPOA with the right policies
POA myPOA = rootPoa.create_POA("event_service_poa",
rootPoa.the_POAManager(), policies);
EventChannel channel = null;
PullSupply model = null; ProxyPullConsumer pullConsumer = null;
channel =
EventChannelHelper.narrow(orb.resolve_initial_references("EventService"));
System.out.println("Located event channel: " + channel);
pullConsumer = channel.for_suppliers().obtain_pull_consumer();
System.out.println("Obtained pull consumer: " + pullConsumer);
model = new PullSupply(pullConsumer, myPOA);
myPOA.activate_object_with_id("PullSupply".getBytes(), model);
myPOA.the_POAManager().activate();
System.out.println("Created model: " + model);
System.out.println("Connecting ...");
pullConsumer.connect_pull_supplier(model._this());
} catch(Exception e) {
e.printStackTrace();
}
}
}
PullConsume
The PullConsume class is derived from PullConsumerPOA class and provides a command line interface for pulling data from the PullSupply class. The code sample above shows how the application connects to any available EventChannel, obtains a ProxyPullSupplier, connects to the channel, and displays a command prompt. The table below summarizes the commands that may be entered.
Asynchronously pulls data from the event channel, using the try_pull method. If no data is currently available, the command will return with a no data message.
Synchronously pulls data from the event channel, using the pull method. If there is no data currently available, the command will block until data is available.
Executing PullConsume
After compiling PullConsume.java and starting the Event Service, described in
“In-process event channel”, you can execute the consumer with the following command:
vbj -DORBInitRef = <channel_name> = file:<fullpath of ior_filename> PullConsume
// PullConsume.java
import org.omg.CosEventComm.*;
import org.omg.CosEventChannelAdmin.*;
import org.omg.PortableServer.*;
import java.io.*;
public class PullConsume extends PullConsumerPOA {
public void disconnect_pull_consumer() {
System.out.println("View.disconnect_pull_consumer");
}
public static void main(String[] args) {
try {
org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(args, null);
// get a reference to the root POA
POA rootPoa =
POAHelper.narrow(orb.resolve_initial_references("RootPOA"));
// Create policies for our persistent POA
org.omg.CORBA.Policy[] policies = {
rootPoa.create_lifespan_policy(LifespanPolicyValue.PERSISTENT)
};
// Create myPOA with the right policies
POA myPOA = rootPoa.create_POA("event_service_poa",
rootPoa.the_POAManager(), policies );
EventChannel channel = null;
PullConsume view = null;
ProxyPullSupplier pullSupplier = null;
BufferedReader in = new BufferedReader(new
InputStreamReader(System.in));
channel =EventChannelHelper.narrow(orb.resolve_initial_references
("EventService"));
System.out.println("Located event channel: " + channel);
view = new PullConsume();
myPOA.activate_object_with_id("PullConsume".getBytes(), view);
myPOA.the_POAManager().activate();
System.out.println("Created view: " + view);
pullSupplier = channel.for_consumers().obtain_pull_supplier();
System.out.println("Obtained pull supplier: " + pullSupplier);
System.out.println("Connecting...");
System.out.flush();
pullSupplier.connect_pull_consumer(view._this());
         while(true) {
System.out.print("-> ");
System.out.flush();
if (System.getProperty("VM_THREAD_BUG") != null) {
while(!in.ready()) {
try {
Thread.currentThread().sleep(100);
} catch(InterruptedException e) {
}
}
}
String line = in.readLine();
if(line.startsWith("a")) {
org.omg.CORBA.BooleanHolder hasEvent = new
org.omg.CORBA.BooleanHolder();
org.omg.CORBA.Any result = pullSupplier.try_pull(hasEvent);
System.out.println("try_pull: " +
(hasEvent.value ? result.toString() : "NO DATA"));
continue;
} else if(line.startsWith("s")) {
org.omg.CORBA.Any result = pullSupplier.pull();
System.out.println("pull: " + result);
continue;
} else if(line.startsWith("q")) {
System.out.println("Disconnecting...");
pullSupplier.disconnect_pull_supplier();
System.out.println("Quitting...");
break;
}
System.out.println("Commands: a [a]synchronous pull\n" +
" s [s]ynchronous pull\n" +
" q [q]uit\n");
}
} catch(Exception e) {
e.printStackTrace();
}
}
}
Starting the Event Service
When using VisiBroker for Java, the Event Service can be started by using the following command.
vbj [-Dvbroker.events.debug] [-Dvbroker.events.interactive] [-Dvbroker.events.max_queue_length=<number>] [-Dvbroker.events.debug.factory] \
[-Dvbroker.events.vm_thread_bug] com.inprise.vbroker.CosEvent.EventServer -ior <ior filename> <channel name>
Note
There is a known bug in some implementations of the Java Virtual Machine, including Solaris, that may cause this command to hang. If you experience difficulties, try specifying the -Dvbroker.events.vm_thread_bug parameter when you start the Event Service.
Setting the queue length
In some environments, consumer applications may run slower than supplier applications. The maxQueueLength parameter prevents out-of-memory conditions by limiting the number of outstanding messages that will be held for each consumer that cannot keep up with the rate of messages from the supplier.
If a supplier generates 10 messages per second and a consumer can only process one message per second, the queue will quickly fill up. Messages in the queue have a fixed maximum length and if an attempt is made to add a message to a queue that is full, the channel will remove the oldest message in the queue to make room for the new message.
Each consumer has a separate queue, so a slow consumer may miss messages while another, faster consumer may not lose any. The code sample below shows how to limit each consumer to 15 outstanding messages.
vbj -Dvbroker.events.maxQueueLength=15 CosEvent.EventServer -ior myChannel.ior
MyChannel
Note
If maxQueueLength is not specified or if an invalid number is specified, a default queue length of 100 is used.
In-process event channel
In addition to running an EventChannel as a separate, stand-alone server, the Event Service allows you to create an EventChannel within your server or client application. This frees you from having to start a separate process to provide the EventChannel for your supplier or consumer applications.
For Java applications, an EventLibrary class is provided that provides methods for creating an EventChannel which, in turn, handles the loading of the necessary classes. To create an in-process EventChannel object within a supplier/consumer application, make the following call:
EventLibrary.create_Channel("MyChannel",whetherToDebug,maxQueueLength);
So, to create a channel named MyChannel with debugging off and a maximum queue length of 100, you would write:
EventLibrary.create_Channel("MyChannel",false,100);
After this call completes, the resulting client application can bind to the EventChannel as it would bind to any other CORBA object.
For example, you might have a supplier application creating the channel in-process and want the consumer application to connect to the same channel. To accomplish this, you need to pass the channel object from the supplier application to the consumer application. To do this, convert the EventChannel object to an ior string and write the string to a file:
try {
EventChannel channel = EventLibrary.create_Channel("MyChannel",false,100);
PrintWriter pw = new PrintWriter(new FileWriter(ior_filename));
pw.println(orb.object_to_string(channel));
pw.close();
}
catch(IOException e) {
System.out.println("Error writing the IOR to file " ior_filename);
}
The ior_filename specifies the name of the file to which the ior string of the channel will be written.
To run PushModelChannel:
vbj PushModelChannel <ior_filenamegt;
PushModelChannel is a push supplier. You can connect either a push consumer or pull consumer to the event channel created in PushModelChannel:
vbj -DORBInitRef=EventService=file:<fullpath of ior_filename> PushView
where <fullpath of ior_filename> is the full path of the ior filename passed into PushModelChannel and EventService is the name (or identifier) bound to the ior contained in <ior_filename>. From within PushView, you can bind to the event channel as follows:
EventChannel channel =
EventChannelHelper.narrow(orb.resolve_initial_references("EventService"));
Using the in-process Event Channel
If your application uses the in-process event channel feature, you must add the following import statement:
import com.inprise.vbroker.CosEvent.*;
Java EventLibrary class
The EventLibrary class provides several methods for creating an EventChannel within an application's process.
Java example
The file PushModelChannel.java implements a push supplier that uses an in-process event channel. This application presents a command prompt and allows you to enter one of the commands shown below.
Creates a PushModelChannel and activates it on the POA.
The code sample below contains an excerpt from PushModelChannel.java that shows how you can use the ChannelLib.create_channel method.
public static void main(String[] args) {
...
channel = EventLibrary.create_channel("channel_server", false, 100);
...
Import statements
The following import statements should be used by applications that wish to use the Event Service:
import org.omg.CosEventComm.*;
import org.omg.CosEventChannelAdmin.*;
...