VisiNotify Guide : Developing supplier and consumer applications

Developing supplier and consumer applications
This chapter discusses how you can develop supplier and consumer applications using OMG Notification Service. The following topics are covered:
Using pre-defined Event/Notification Services
The OMG Notification Service specifies three kinds of pre-defined event channels: Untyped, Structured, and Sequence. The advantage of pre-defined channels is that they are easy for user level implementations. Therefore, almost all notification service products on the market support pre-defined channels. The disadvantage of pre-defined channels are:
For these reasons, the pre-defined untyped, structured and sequence channels are not a good choice for new CORBA applications. However, they are supported by VisiNotify for OMG compliance as well as for legacy applications. New applications should consider using the OMG Typed Notification Service. See “Using Typed Event/Notification Service” for detailed information.
Developing push consumer applications
A push consumer is essentially a CORBA callback server application. It provides an push consumer object implementation. The push consumer object implementation supports a pre-defined (untyped, structured or sequence) push consumer interface. The consumer application connects this consumer object to a channel to receive events.
Developing a push consumer application involves two tasks:
To illustrate the development of the push consumer application, the structured push consumer is used.
C++ push consumer example
The following push consumer example is located in
<install_dir>/examples/vbroker/notify/basic_cpp/structPushConsumer.C.
// 1. Implementing the push consumer servant
class StructuredPushConsumerImpl : public POA_CosNotifyComm::StructuredPushConsumer, public virtual PortableServer::RefCountServantBase
{
...
public:
...
void push_structured_event(const CosNotification::StructuredEvent&
event) { ... }
...
};

// The consumer server
int main(int argc, char** argv)
{
// get orb and POA ...
...

// construct a push consumer servant
StructuredPushConsumerImpl* servant = new StructuredPushConsumerImpl;

// 2. Activate the consumer servant on a POA
poa->activate_object(servant);

// 3. Activate the POA
poa->the_POAManager()->activate();

...
// 4. Somehow, we get the channel from somewhere
CosNotifyChannelAdmin::EventChannel_var channel = ...;

// 5. Somehow, we decide to use the default admin
CosNotifyChannelAdmin::ConsumerAdmin_var admin =
channel->default_consumer_admin();

// 6. Obtain a proxy push supplier from the admin
CosNotifyChannelAdmin::ProxyID pxy_id;
CosNotifyChannelAdmin::ProxySupplier_var proxy =
admin->obtain_notification_push_supplier(
CosNotifyChannelAdmin::STRUCTURED_EVENT, pxy_id);

CosNotifyChannelAdmin::StructuredProxyPushSupplier_var supplier =
CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow(proxy);

// 7. Get consumer object reference and connect it to the proxy
CORBA::Object_var obj = poa->servant_to_reference(servant);
CosNotifyComm::StructuredPushConsumer_var consumer
= CosNotifyComm::StructuredPushConsumer::_narrow(obj);
supplier->connect_structured_push_consumer(consumer);

// working loop
orb->run();
}
Java push consumer example
The following push consumer example is located in
<install_dir>/examples/vbroker/notify/basic_java/structPushConsumer.java.
import org.omg.CosNotifyComm.*;
import org.omg.CosNotifyChannelAdmin,*;
import org.omg.CosNotification.*;

class StructPushConsumerImpl extends StructuredPushConsumerPOA
{
...
// 1. Implement the push consumer servant
public void push_structured_event(StructuredEvent event) { ... }
...

public static int main(String[] args) {
// get orb and POA ...
...

// construct a push consumer servant
StructPushConsumerImpl servant = new StructPushConsumerImpl();

// 2. Activate the consumer servant on a POA
poa.activate_object(servant);

// 3. Activate the POA
poa.the_POAManager().activate();

...
// 4. Somehow, we get the channel from somewhere
EventChannel channel = ...;

// 5. Somehow, we decide to use the default admin
ConsumerAdmin admin = channel.default_consumer_admin();

// 6. Obtain a proxy push supplier from the admin
ProxyIDHolder pxy_id = new ProxyIDHolder();
ProxySupplier proxy = admin.obtain_notification_push_supplier(
ClientType.STRUCTURED_EVENT, pxy_id);

StructuredProxyPushSupplier supplier
= StructuredProxyPushSupplierHelper.narrow(proxy);

// 7. Get consumer object reference and connect it to the proxy
org.omg.CORBA::Object obj = poa.servant_to_reference(servant);
StructuredPushConsumer consumer = StructuredPushConsumerHelper.narrow(obj);
supplier.connect_structured_push_consumer(consumer);

// working loop
orb.run();
}
}
Develop pull consumer applications
A pull consumer is essentially a CORBA client. It obtains a proxy pull supplier object in the channel and actively sends request to the proxy to retrieve buffered events.
Developing a pull consumer application involves two tasks:
To illustrate the development of the pull consumer application, the structured pull consumer is used.
C++ pull consumer example
The following pull consumer example is located in
<install_dir>/examples/vbroker/notify/basic_cpp/structPullConsumer.C.
// The consumer client
int main(int argc, char** argv)
{
// get orb ...
...

// 1. Somehow, we get the channel from somewhere
CosNotifyChannelAdmin::EventChannel_var channel = ...;

// 2. Somehow, we decide to use the default admin
CosNotifyChannelAdmin::ConsumerAdmin_var admin =
channel->default_consumer_admin();

// 3. Obtain a proxy pull supplier from the admin
CosNotifyChannelAdmin::ProxyID pxy_id;
CosNotifyChannelAdmin::ProxySupplier_var proxy =
admin->obtain_notification_pull_supplier(
CosNotifyChannelAdmin::STRUCTURED_EVENT, pxy_id);

CosNotifyChannelAdmin::StructuredProxyPullSupplier_var supplier =
CosNotifyChannelAdmin::StructuredProxyPullSupplier::_narrow(proxy);

// 4. Connect to the proxy
supplier->connect_structured_pull_consumer(NULL);

// 5. Pull events from the proxy pull supplier
for(int i=0;i<100;i++) {
CosNotification::StructuredEvent_var event;
event = supplier->pull_structured_event();
...
}

// 6. Gracefully cleanup
supplier->disconnect_structured_pull_supplier();
}
Java pull consumer example
The following pull consumer example is located in
<install_dir>/examples/vbroker/notify/basic_java/structPullConsumer.java.
import org.omg.CosNotifyComm.*;
import org.omg.CosNotifyChannelAdmin,*;
import org.omg.CosNotification.*;

public class structPullConsumer
{
...
public static int main(String[] args) {
// get orb ...
...
// 1. Somehow, we get the channel from somewhere
EventChannel channel = ...;

// 2. Somehow, we decide to use the default admin
ConsumerAdmin admin = channel.default_consumer_admin();

// 3. Obtain a proxy pull supplier from the admin
ProxyIDHolder pxy_id = new ProxyIDHolder();
ProxySupplier proxy = admin.obtain_notification_pull_supplier(
ClientType.STRUCTURED_EVENT, pxy_id);

StructuredProxyPullSupplier supplier =
StructuredProxyPullSupplierHelper.narrow(proxy);

// 4. Connect to the proxy
supplier.connect_structured_pull_consumer(null);

// 5. Pull events from the proxy pull supplier
for(int i=0;i<100;i++) {
StructuredEvent event = supplier.pull_structured_event();
...
}

// 6. Gracefully cleanup
supplier.disconnect_structured_pull_supplier();
}
}
Developing push supplier applications
A push supplier application is a CORBA client. It actively invokes requests on a proxy consumer object to send events to the channel.
Developing a push supplier application involve two tasks:
To illustrate the development of the push supplier application, the structured push supplier is used.
C++ push supplier example
The push supplier example is located in examples/vbroker/notify/basic_cpp/structPushSupplier.C.
// The push supplier client
int main(int argc, char** argv)
{
// get orb ...
...

// 1. Somehow, we get the channel from somewhere
CosNotifyChannelAdmin::EventChannel_var channel = ...;

// 2. Somehow, we decide to use the default admin
CosNotifyChannelAdmin::SupplierAdmin_var admin =
channel->default_supplier_admin();

// 3. Obtain a proxy push consumer from the admin
CosNotifyChannelAdmin::ProxyID pxy_id;
CosNotifyChannelAdmin::ProxyConsumer_var proxy =
admin->obtain_notification_push_consumer(
CosNotifyChannelAdmin::STRUCTURED_EVENT, pxy_id);

CosNotifyChannelAdmin::StructuredProxyPushConsumer_var consumer
= CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow(proxy);

// 4. Connect to the proxy
consumer->connect_structured_push_supplier(NULL);

// 5. Push events to the proxy push consumer
for(int i=0;i<100;i++) {
CosNotification::StructuredEvent_var event = ...;
consumer->push_structured_event(event);
}

// 6. Gracefully cleanup
consumer->disconnect_structured_push_consumer();
}
Java push supplier example
The push supplier example is located in examples/vbroker/notify/basic_java/structPushSupplier.java.
import org.omg.CosNotifyComm.*;
import org.omg.CosNotifyChannelAdmin,*;
import org.omg.CosNotification.*;

public class structPushSupplier
{
...
public static int main(String[] args) {
// get orb ...
...

// 1. Somehow, we get the channel from somewhere
EventChannel channel = ...;

// 2. Somehow, we decide to use the default admin
ConsumerAdmin admin = channel.default_supplier_admin();

// 3. Obtain a proxy consumer from the admin
ProxyIDHolder pxy_id = new ProxyIDHolder();
ProxyConsumer proxy = admin.obtain_notification_push_consumer (
ClientType.STRUCTURED_EVENT, pxy_id);

StructuredProxyPushConsumer consumer =
StructuredProxyPushConsumerHelper.narrow(proxy);

// 4. Connect to the proxy
consumer.connect_structured_push_supplier(null);

// 5. Push events to the proxy push consumer
for(int i=0;i<100;i++) {
StructuredEvent event = ...;
c onsumer.push_structured_event(event);
}

// 6. Gracefully cleanup
consumer.disconnect_structured_push_consumer();
}
}
Developing pull supplier applications
A pull supplier application is a CORBA callback server. It provides an pull supplier object implementation. The pull supplier object implementation supports a pre-defined (untyped, structured or sequence) pull supplier interface. The supplier application needs to connect this supplier object to a channel to supply events.
Developing a pull supplier application involves two tasks:
To illustrate the development of the pull supplier application, the structured pull supplier is used.
C++ pull supplier example
The pull supplier example is located in examples/vbroker/notify/basic_cpp/structPullSupplier.C.
// 1. Implement the pull supplier servant
class StructuredPullSupplierImpl : public
POA_CosNotifyComm::StructuredPullSupplier, public virtual
PortableServer::RefCountServantBase
{
...
public:
...
CosNotification::StructuredEvent* pull_structured_event() { ... }
CosNotification::StructuredEvent* try_pull_structured_event(
CORBA::Boolean& has_event) { ... }
...
};

// The supplier server
int main(int argc, char** argv)
{
// get orb and POA ...
...

// Construct a pull supplier servant
StructuredPullSupplierImpl* servant = new StructuredPullSupplierImpl;

// 2. Activate the consumer servant on a POA
poa->activate_object(servant);

// 3. Activate the POA
poa->the_POAManager()->activate();

...
// 4. Somehow, we get the channel from somewhere
CosNotifyChannelAdmin::EventChannel_var channel = ...;

// 5. Somehow, we decide to use the default admin
CosNotifyChannelAdmin::SupplierAdmin_var admin = channel
->default_supplier_admin();

// 6. Obtain a proxy pull consumer from the admin
CosNotifyChannelAdmin::ProxyID pxy_id;
CosNotifyChannelAdmin::ProxyConsumer_var proxy = admin
->obtain_notification_pull_consumer(
CosNotifyChannelAdmin::STRUCTURED_EVENT, pxy_id);

CosNotifyChannelAdmin::StructuredProxyPullConsumer_var consumer =
CosNotifyChannelAdmin::StructuredProxyPullConsumer::_narrow(proxy);

// 7. Get supplier object reference and connect it to the proxy
CORBA::Object_var obj = poa->servant_to_reference(servant);
CosNotifyComm::StructuredPullSupplier_var supplier
= CosNotifyComm::StructuredPullSupplier::_narrow(obj);
consumer->connect_structured_pull_supplier(supplier);

// working loop
orb->run();
}
Java pull supplier example
The pull supplier example is located in examples/vbroker/notify/basic_java/structPullSupplier.java.
import org.omg.CosNotifyComm.*;
import org.omg.CosNotifyChannelAdmin,*;
import org.omg.CosNotification.*;

class structPullSupplierImpl extends StructuredPullSupplierPOA
{
...
// 1. Implement the push consumer servant
public StructuredEvent pull_structured_event() { ... }
public StructuredEvent try_pull_structured_event(
org.omg.CORBA.BooleanHolder has_event) {...}
...

public static int main(String[] args) {
// Get orb and POA ...
...

// A pull supplier servant
structPullSupplierImpl servant = new structPullSupplierImpl();

// 2. Activate the supplier servant on a POA
poa.activate_object(servant);

// 3. Activate the POA
poa.the_POAManager()Activate();

...
// 4. Somehow, we get the channel from somewhere
EventChannel channel = ...;

// 5. Somehow, we decide to use the default admin
ConsumerAdmin admin = channel.default_supplier_admin();

// 6. Obtain a proxy pull consumer from the admin
ProxyIDHolder pxy_id = new ProxyIDHolder();
ProxyConsumer proxy = admin.obtain_notification_pull_consumer(
ClientType.STRUCTURED_EVENT, pxy_id);

StructuredProxyPullConsumer consumer =
StructuredProxyPullConsumerHelper.narrow(proxy);

// 7. Get supplier object reference and connect it to the proxy
org.omg.CORBA::Object obj = poa.servant_to_reference(servant);
StructuredPullSupplier supplier =
StructuredPullSupplierHelper.narrow(obj);
consumer.connect_structured_pull_supplier(supplier);

// working loop
orb.run();
}
}
Using Typed Event/Notification Service
The pre-defined events (untyped, structured, sequence) in OMG Event/Notification Service present a message-oriented approach. The disadvantages of this approach are:
Therefore, in developing new applications, it is recommended to use a user defined typed event and the OMG Typed Event/Notification Service. By using the OMG Typed Event/Notification Service, event interfaces are not pre-defined by OMG but rather by user applications using OMG IDL language. Using this approach results in the following:
There are minor issues with using the OMG Typed Event/Notification service. They include:
VisiBroker Publish/Subscribe Adapter (PSA) architecture resolves these two issues. See “Using the Publish Subscribe Adapter (PSA)” for additional information. The PSA simplifies and unifies the connection procedure to notification and typed notification services. It also presents an elegant solution for typed pulling.
Note
This chapter only discusses how to develop typed push applications directly using OMG Typed Notification Service. Typed pulling and PSA is discussed in “Using the Publish Subscribe Adapter (PSA)”.
As the user-defined event type, the following IDL interface definition is used throughout these examples:
// TMN.idl: typed event definition

// user defined pragma
# pragma prefix "example.borland.com"

// user defined module
module TMN {

// user defined event interface
interface TypedEvent {
void attributeValueChange(...);
void qosAlarm(...);
...
};
};
Developing typed push consumer applications
A typed push consumer is essentially a CORBA callback server application. It provides an user defined typed consumer object implementation. The typed push consumer object implementation supports the user defined IDL interface. The consumer application connects this consumer object to a typed channel to receive typed events.
Developing a typed push consumer application involves two tasks:
Implementing a handler servant. This handler servant supports the CosTypedNotifyComm::TypedPushConsumer interface and its get_typed_consumer() operation, which returns a reference of the user defined typed consumer object (for example, returns the <I> interface).
C++ typed push consumer example
The typed push consumer example is located in examples/vbroker/notify/basic_cpp/typedPushConsumer.C.
// 1. Implement the user defined typed consumer servant
class TMNTypedEventImpl : public POA_TMN::TypedEvent,
public virtual PortableServer::RefCountServantBase
{
...
public:
...
void attributeValueChange (...) { ... }
void qosAlarm(...) { ... }
...
};

// 2. Implement the handler servant
class HandlerImpl : public POA_CosTypedNotifyComm::TypedPushConsumer,
public virtual PortableServer::RefCountServantBase
{
CORBA::Object_var _the_typed_consumer; // the <I> interface

public:
HandlerImpl(CORBA::Object_ptr ref)
: _the_typed_consumer(CORBA::Object::_duplicate(ref)) {}

CORBA::Object_ptr get_typed_consumer() {
// return the <I> interface
return CORBA::Object::_duplicate(_the_typed_consumer); }
...
};

// The typed consumer server
int main(int argc, char** argv)
{
// Get orb and POA ...
...

// Construct a push consumer servant
TMNTypedEventImpl* servant = new TMNTypedEventImpl;

// 3. Activate the typed consumer on a POA
poa->activate_object(servant);

// 4. Get typed consumer reference
CORBA::Object_var obj = poa->servant_to_reference(servant);

// 5. Construct a handler servant and pass it the typed consumer reference
HandlerImpl* handler = new HandlerImpl(obj);

// 6. Activate the handler object on a POA
poa->activate_object(handler);

// 7. Activate the POA(s)
poa->the_POAManager()->activate();
...
// 8. Somehow, we get a typed channel
CosTypedNotifyChannelAdmin::TypedEventChannel_var channel = ...;

// 9. Somehow, we decide to use the default admin
CosTypedNotifyChannelAdmin::TypedConsumerAdmin_var admin =
channel->default_consumer_admin();

// 10. Obtain a proxy push supplier from the admin using the event
// repository id "IDL:example.borland.com/TMN/TypedEvent:1.0" as the key.
CosNotifyChannelAdmin::ProxyID pxy_id;
CosTypedNotifyChannelAdmin::ProxySupplier_var proxy
= admin->obtain_typed_notification_push_supplier(
"IDL:example.borland.com/TMN/TypedEvent:1.0", pxy_id);

// 11. Get handler object reference and connect it to the proxy
CORBA::Object_var ref = poa->servant_to_reference(handler);
CosTypedNotifyComm::TypedPushConsumer_var consumer
= CosTypedNotifyComm::TypedPushConsumer::_narrow(ref);
proxy->connect_typed_push_consumer(consumer);

// working loop
orb->run();
}
Java typed push consumer example
The typed push consumer example is located in examples/vbroker/notify/basic_java/typedPushConsumer.java.
// 1. Implement the user defined typed consumer servant
class TMNTypedEventImpl extends TMN.TypedEventPOA {
...
public void attributeValueChange (...) { ... }
public void qosAlarm(...) { ... }
...
}

// 2. Implement the handler servant
class TypedPushConsumerImpl
extends org.omg.CosTypedNotifyComm.TypedPushConsumer {
org.omg.CORBA.Object _the_typed_consumer = null; // the <I> interface

TypedPushConsumerImpl(org.omg.CORBA.Object ref) {
_the_typed_consumer = ref;
}
org.omg.CORBA.Object get_typed_consumer() {
// Return the <I> interface
return _the_typed_consumer; }
...

public static void main(String [] args) {
// Get orb and POA ...
...
// Construct a push consumer servant
TMNTypedEventImpl servant = new TMNTypedEventImpl();

// 3. Activate the typed consumer on a POA
poa.activate_object(servant);

// 4. Get typed consumer reference
org.omg.CORBA.Object obj = poa.servant_to_reference(servant);

// 5. Construct a handler servant and pass it the typed consumer reference
TypedPushConsumerImpl handler = new TypedPushConsumerImpl(obj);

// 6. Activate the handler object on a POA
poa.activate_object(handler);

// 7. Activate the POA(s)
poa.the_POAManager()Activate();
...
// 8. Somehow, we get a typed channel from somewhere
org.omg.CosTypedNotifyChannelAdmin.TypedEventChannel channel = ...;

// 9. Somehow, we decide to use the default admin
org.omg.CosTypedNotifyChannelAdmin.TypedConsumerAdmin admin =
channel.default_consumer_admin();

// 10. Obtain a proxy push supplier from the admin using the event
// repository id "IDL:example.borland.com/TMN/TypedEvent:1.0" as the key.
org.omg.CosNotifyChannelAdmin.ProxyIDHolder pxy_id_holder
= new org.omg.CosNotifyChannelAdmin.ProxyIDHolder();
org.omg.CosTypedNotifyChannelAdmin.ProxySupplier proxy
= admin.obtain_typed_notification_push_supplier(
"IDL:example.borland.com/TMN/TypedEvent:1.0", pxy_id_holder);

// 11. Get handler object reference and connect it to the proxy
org.omg.CORBA.Object ref = poa.servant_to_reference(handler);
org.omg.CosTypedNotifyComm.TypedPushConsumer consumer =
org.omg.CosTypedNotifyComm.TypedPushConsumerHelper.narrow(ref);
proxy.connect_typed_push_consumer(consumer);

// working loop
orb.run();
}
}
Developing typed push supplier applications
A typed push supplier application is a CORBA client. It actively invokes requests on a typed consumer proxy object to send typed events to the channel.
Developing a typed push supplier application involves two tasks:
Calling get_typed_consumer() on the typed proxy push consumer to get the <I> interface reference.
The following example compares the procedure of using a pre-defined event interface. Notice that using typed events requires an additional procedure, such as get_typed_consumer().
C++ typed push supplier example
The typed push supplier example is located in examples/vbroker/notify/basic_cpp/typedPushSupplier.C.
// The typed push supplier client
int main(int argc, char** argv)
{
// Get orb ...
...

// 1. Somehow, we get the typed channel from somewhere
CosTypedNotifyChannelAdmin::TypedEventChannel_var channel = ...;

// 2. Somehow, we decide to use the default admin
CosTypedNotifyChannelAdmin::TypedSupplierAdmin_var admin =
channel->default_supplier_admin();

// 3. Obtain a typed proxy push consumer from the admin using the event
// repository id "IDL:example.borland.com/TMN/TypedEvent:1.0"
// as the key.
CosTypedNotifyChannelAdmin::ProxyID pxy_id;
CosTypedNotifyChannelAdmin::TypedProxyPushConsumer_var proxy
= admin->obtain_typed_notification_push_consumer(
"IDL:example.borland.com/TMN/TypedEvent:1.0", pxy_id);

// 4. Connect to the proxy
proxy->connect_typed_push_supplier(NULL);

// 5. Get the <I> interface
CORBA::Object_var obj = proxy->get_typed_consumer();
TMN::TypedEvent_var consumer = TMN::TypedEvent::_narrow(obj);

// 6. Push events to the <I> interface
for(int i=0;i<100;i++) {
consumer->attributeValueChange(...);
consumer->qosAlarm(...);
}

// 7. Flush buffered events
consumer->_non_existent();

// 8. Gracefully cleanup
proxy->disconnect_typed_push_consumer();
}
Java typed push supplier example
The typed push supplier example is located in examples/vbroker/notify/basic_java/typedPushSupplier.java.
import org.omg.CosTypedNotifyComm.*;
import org.omg.CosTypedNotifyChannelAdmin,*;
import org.omg.CosNotification.*;

public class typedPushSupplier
{
...

// The typed push supplier client
public static void main(String[] args) {
// get orb ...
...

// 1. Somehow, we get the typed channel from somewhere
org.omg.CosTypedNotifyChannelAdmin.TypedEventChannel_var channel = ...;

// 2. Somehow, we decide to use the default admin
org.omg.CosTypedNotifyChannelAdmin.TypedSupplierAdmin admin
= channel.default_supplier_admin();

// 3. Obtain a typed proxy push consumer from the admin using the event
// repository id "IDL:example.borland.com/TMN/TypedEvent:1.0"
// as the key.
Org.omg.CosTypedNotifyChannelAdmin.ProxyIDHolder pxy_id = new
org.omg.CosTypedNotifyChannelAdmin.ProxyIDHolder();
CosTypedNotifyChannelAdmin::TypedProxyPushConsumer_var proxy
= admin.obtain_typed_notification_push_consumer(
"IDL:example.borland.com/TMN/TypedEvent:1.0", pxy_id);

// 4. Connect to the proxy
proxy.connect_typed_push_supplier(null);

// 5. Get the <I> interface
org.omg.CORBA.Object obj = proxy.get_typed_consumer();
TMN.TypedEvent consumer = TMN.TypedEventHelper.narrow(obj);

// 6. Push events to the <I> interface
for(int i=0;i<100;i++) {
consumer.attributeValueChange(...);
consumer.qosAlarm(...);
}

// 7. Flush buffered events
consumer._non_existent();

// 8. Gracefully cleanup
proxy.disconnect_typed_push_consumer();
}
}
Developing RMI/EJB applications with VisiNotify
With the introduction of J2EE 1.3, RMI-over-IIOP has been standardized in the J2EE implementations. Therefore, the interoperation and interconnection between CORBA and J2EE environments have become seamless. J2EE is basically a framework for client/server applications. However, J2EE technology does not provide adequate support for publish/subscribe applications. The only solutions defined in J2EE are Java Messaging Service (JMS) and Message Driver Bean (MDB). JMS is purely a message-oriented service that is mainly used for integrating or interconnecting with legacy message middleware. MDB is simply defined following the use of JMS and allows legacy message middleware to send messages to an enterprise bean through JMS. In this regard, the JMS and MDB based solutions usually share the disadvantages of legacy message oriented middlewares. They include:
The OMG Typed Event/Notification Service resolves these issues. A typed notification can be used as a publish/subscribe middleware for RMI/EJB application. In addition, VisiNotify offers support for direct connection between OMG Structured channel and RMI/EJB. VisiNotify also provides direct support of CORBA valuetypes (either in standard marshaling or in customer marshaling) as well as Java serializeable objects. With these standard facilities from OMG Typed Event/Notification Service, J2EE 1.3, and VisiNotify extensions, event driven RMI/EJB applications can be developed as normal object oriented applications rather than mapping OMG Notification Service as a JMS provider and then using JMS/MDB. The advantages of this approach are:
This section describes how OMG Typed Event/Notification Service VisiNotify is used in the RMI/EJB environments.
This user defined Java RMI remote interface is used as either an RMI server interface or an EJB consumer bean remote interface throughout the examples in this section.

package TMN;

import java.rmi.Remote;
import java.rmi.RemoteException;

public interface Notification extends Remote {
void attributeValueChange(...) throws RemoteException;
void qosAlarm(...) throws RemoteException;
...
}
Developing an RMI typed consumer
A RMI typed push consumer is essentially an RMI callback application connected to the OMG Typed Notification service. The typed push consumer RMI object implements user defined RMI interface. An RMI typed consumer is very similar to a CORBA typed consumer with slight differences. They include:
This code example shows an RMI typed push consumer:
// 1. Implement the user defined typed consumer RMI object
class RMINotifyImpl
extends PortableRemoteObject
implements TMN.Notification {
...
public void attributeValueChange (...) { ... }
public void qosAlarm(...) { ... }
...
}

// 2. Implement the handler servant
public class TypedPushConsumerImpl
extends org.omg.CosTypedNotifyComm.TypedPushConsumer {
org.omg.CORBA.Object _the_typed_consumer = null;
// the <I> interface

TypedPushConsumerImpl(org.omg.CORBA.Object ref){
_the_typed_consumer = ref;
}
org.omg.CORBA.Object get_typed_consumer() {
// return the <I> interface
return _the_typed_consumer; }
...

public static void main(String [] args) {
// Get orb and POA ...
...

// 3. Allocate a RMI consumer object
RMINotifyImpl consumer = new RMINotifyImpl();

// 4. Get the CORBA object reference of the RMI consumer
org.omg.CORBA.Object corba_obj = javax.rmi.CORBA.Util.getTie(consumer).thisObject();

// 5. Allocate a handler servant and pass it the typed // consumer reference
TypedPushConsumerImpl handler = new TypedPushConsumerImpl(corba_obj);

// 6. Activate the handler object on a POA
poa.activate_object(handler);

// 7. Activate the POA(s)
poa.the_POAManager()Activate();

...
// 8. Somehow, we get a typed channel from somewhere
org.omg.CosTypedNotifyChannelAdmin.TypedEventChannel channel = ...;

// 9. Somehow, we decide to use the default admin
org.omg.CosTypedNotifyChannelAdmin.TypedConsumerAdmin admin =
channel.default_consumer_admin();

// 10. Obtain a proxy push supplier from the admin
org.omg.CosNotifyChannelAdmin.ProxyIDHolder pxy_id_holder = new
org.omg.CosNotifyChannelAdmin.ProxyIDHolder();
org.omg.CosTypedNotifyChannelAdmin.ProxySupplier proxy =
admin.obtain_typed_notification_push_supplier(
"RMI.Test" ,
pxy_id_holder);

// 10. Get handler object reference and connect it to the proxy
org.omg.CORBA.Object ref = poa.servant_to_reference (handler);
org.omg.CosTypedNotifyComm.TypedPushConsumer consumer =
org.omg.CosTypedNotifyComm.TypedPushConsumerHelper.narrow(ref);
proxy.connect_typed_push_consumer(consumer);

// working loop
orb.run();
}
}
Developing an RMI typed supplier
An RMI typed supplier is very similar to its CORBA counterpart except that the <I> reference, which is returned from get_typed_consumer(), should be narrowed into the correspondent RMI stub (see step 6 in example below).
This code example shows an RMI typed push supplier:
import org.omg.CosTypedNotifyComm.*;
import org.omg.CosTypedNotifyChannelAdmin,*;
import org.omg.CosNotification.*;

public class TypedPushSupplierImpl
{
...

// The typed push supplier client
public static void main(String[] args) {
// get orb ...
...

// 1. Somehow, we get the typed channel from somewhere
org.omg.CosTypedNotifyChannelAdmin.TypedEventChannel_var channel = ...;

// 2. Somehow, we decide to use the default admin
org.omg.CosTypedNotifyChannelAdmin.TypedSupplierAdmin admin
= channel.default_supplier_admin();

// 3. Obtain a typed proxy push consumer from the admin
Org.omg.CosTypedNotifyChannelAdmin.ProxyIDHolder pxy_id
= new org.omg.CosTypedNotifyChannelAdmin.ProxyIDHolder();
CosTypedNotifyChannelAdmin::TypedProxyPushConsumer_var proxy
= admin.obtain_typed_notification_push_consumer("RMI.Test, pxy_id);

// 4. Connect to the proxy
proxy.connect_typed_push_supplier(null);

// 5. Get the <I> interface
org.omg.CORBA.Object obj = proxy.get_typed_consumer();

// 6. Narrowing the CORBA object reference into RMI stub.
TMN.Notification consumer = (TMN.Notification)PortableRemoteObject.
narrow(obj, TMN.Notification.class);

// 7. Push events to the <I> RMI stub
for(int i=0;i<100;i++) {
consumer.attributeValueChange(...);
consumer.qosAlarm(...);
}

// 8. Flush buffered events
com.inprise.vbroker.orb.BufferedEvents.flush();

// 9. Gracefully cleanup
proxy.disconnect_typed_push_consumer();
}
}
Developing an EJB bean as a Typed Notification consumer
An EJB typed event bean can be any type of bean (session or entity, stateless or stateful), except for an MDB. The EJB typed event bean implements event operations as declared in the given associated user-defined remote interface.
This code example shows an EJB bean as a push consumer of a user-defined TMN.Notification remote interface:
import javax.ejb.*;

// 1. The bean implementation
public class TMNNotifyBean implements SessionBean {
...
// implement operations declared in bean's remote interface
public void attributeValueChange (...) { ... }
public void qosAlarm(...) { ... }
...
}
After building and deploying this typed EJB bean implementation, your application can:
In this release, a command line utility, subtool, is provided to subscribe an EJB bean as a typed RMI consumer by knowing its JNDI name. To connect a typed event EJB bean to an OMG Typed Notification channel, use the subtool command:

% subtool [-channel <ior>|-admin <ior>] \
-home <jndi_name> \
-type typed \
-key <proxy_key>
where:
The -channel or -admin option specify the channel or consumer admin object as the subscribe point.
The -home <jndi_name> tells subtool the JNDI name of the bean's Home interface.
The -type typed option tells subtool to connect the bean's remote interface as typed consumer.
The -key <proxy_key> option tells the subtool what should be the key parameter for obtain_typed_notification_push_supplier().
This example shows using subtool to subscribe a typed event bean to a OMG Typed Notification Channel:
% subtool -channel corbaloc::127.0.0.1:14100/default_channel \
-home stock_home -type typed -key Stock
Developing an EJB bean as a Structured Notification consumer
An EJB structured event bean can be any type of EJB bean (session or entity, stateless or stateful), except for an MDB. This EJB structured event bean can connect to an VisiNotify Structured Notification Service and receive structured events originating from non-RMI CORBA applications. An EJB structured event bean implements, among other mandatory operations, a void push_structured_event (org.omg.CosNotification.StructuredEvent) operation. This operation should not be overloaded in this bean and its remote interface.
Unlike typed event beans, support of structured event beans is a VisiNotify extension. VisiNotify does a special translation to convert a StructuredEvent structure that is sent into the channel from a CORBA Structured supplier application into a StructuredEvent valuebox when it detects the connected consumer is a structured event EJB bean.
This example shows an EJB bean as a structured push consumer:
import javax.ejb.*;

// The bean implementation
public class MyStructuredNotifyBean implements SessionBean {
...
public void push_structured_event( org.omg.CosNotification.StructuredEvent event) { ... }
...
}
After building and deploying this structured event bean, connect its remote interface to the given VisiNotify's structured event channel. The remote interface of this bean should declare the push_structured_event() operation. This code example shows the connection as:
import java.rmi.Remote;
import java.rmi.RemoteException;

// The bean's remote interface
public interface MyStructuredInterface extends Remote {
public void push_structured_event(
org.omg.CosNotification.StructuredEvent event)
throws
RemoteException;
...
}
The ORB type system does not permit directly connecting this structure event bean's remote interface as a structured event consumer to OMG Notification Service structured channel. Therefore, to connect a structured event bean to a VisiNotify Structured Notification channel, use the subtool command:
% subtool [-channel <ior>|-admin <ior>] \
-home <jndi_name> \
-type struct
where:
The -channel or -admin option specify the channel or consumer admin object as the subscribe point.
The -home <jndi_name> tells subtool the JNDI name of the bean's Home interface.
The -type struct option tells subtool to connect the bean's remote interface as structured consumer.
This example shows using subtool to subscribe a structured event bean to a VisiNotify Structured Notification Channel:
% subtool -channel corbaloc::127.0.0.1:14100/default_channel \
-home stock_home -type struct
VisiBroker Event Buffering/Batch
Event buffering/batch is a mechanism implemented in VisiBroker (since version 5.1) to optimize VisiNotify event throughput. By default, events are buffered in supplier-side stubs before being flushed to VisiNotify as a larger batch message. Also, if VisiNotify detects that the consumer is working on top of VisiNotify (5.1 or later), it will try to buffer/batch events together.
Disable supplier-side event buffering
Supplier applications can disable supplier-side event buffering by setting vbroker.orb.supplier.eventBatch to false. For example:
% typedPushSupplier ... -Dvbroker.orb.supplier.eventBatch=false
or
% vbj ... StructPushSupplier ... -Dvbroker.orb.supplier.eventBatch=false
Disable consumer-side event buffering
Consumer applications can also disable VisiNotify from sending events in batch by setting vbroker.orb.consumer.eventBatch to false. For example:
% typedPushConsumer ... -Dvbroker.orb.consumer.eventBatch=false
or
% vbj ... StructPushConsumerImpl ... -Dvbroker.orb.consumer.eventBatch=false
Flush buffered events in supplier application
The supplier-side VisiBroker runtime will flush an event when these conditions occur:
Event buffer is full: This is a per-stub level flush. The default size of this stub level event buffer is 32K. A given supplier application can use vbroker.orb.supplier.eventBufferSize to change this size between 8K and 64K. For example:
% typedPushSupplier ... -Dvbroker.orb.supplier.eventBufferSize=48000
Number of buffered events reaches the maximum batch size: This is a per-stub level flush. The default maximum number of events that a stub can hold in its buffer is 128. A supplier application can use vbroker.orb.supplier.maxBatchSize to change this size to any value less than 256. For example:
% vbj ... UntypedPushSupplier ... -Dvbroker.orb.supplier.eventBatchSize=32
Internal buffer flush timeout: This is a global flush. On timeout, all events buffered in all stubs will be flushed out. The default timeout interval is 2,000 milliseconds (2 seconds). A supplier application can use vbroker.orb.supplier.eventBatchTimerInterval to change this time between 100 millisecond (0.1 second) and 10,000 milliseconds (10 seconds). For example:
% typedPushSupplier ... -Dvbroker.orb.supplier.eventBatchTimerInterval=5000
Supplier invoked a non-bufferable operation on the stub: This is a per-stub level flush and includes:
Note
Therefore, invoking disconnect_..._push_consumer() operations or _non_existent() on the proxy stubs (above) will flush out all buffered events.
Therefore, a supplier application can invoke _non_existent() operation on an <I> interface stub to flush its buffered events. Notice that the calling disconnect_typed_push_consumer() on a typed proxy consumer stub will not cause the buffer in a corresponding <I> stub to be flushed. The application should explicitly call _non_existent() on an <I> interface stub before calling disconnect_typed_push_consumer() on proxy stub.
Java application calling BufferedEvent.flush()
A Java supplier application can explicitly call com.inprise.vbroker.orb.BufferedEvents.flush() to flush. This is a global level event flush. It is to support VisiBroker RMI applications because there is no pseudo operation on a java.rmi.Remote interface, which can be used for event flush. Calling this static method will flush out all events in every stub.
Initial Reference of VisiNotify
By default, VisiNotify uses TCP port number 14100 unless -Dvbroker.notify.listener.port=<port> is used in the command line. Therefore, as specified by OMG Notification Service, the URL of the Channel factory and typed channel factory are:
corbaloc::<host>:14100/NotificationService
corbaloc::<host>:14100/TypedNotificationService
where, <host> is the domain name or dotted IP address of the VisiNotify host machine. The VisiNotify server also creates a default channel. The URL of this default channel is:
corbaloc::<host>:14100/default_channel
This URL can be registered to the supplier or consumer application's ORB using these two OMG standardized scenarios:
1
-ORBInitRef NotificationService=corbaloc::127.0.0.1:14100/NotificationService
or
-ORBInitRef TypedNotificationService=corbaloc::127.0.0.1:14100/TypedNotificationService
2
orb.register_initial_reference("TypedNotificationService",
orb.string_to_object(
"corbaloc::127.0.0.1:14100/TypedNotificationService");
After registering them as an initial service, the application can use resolve_initial_reference().