VisiNotify Guide : Using the Publish Subscribe Adapter (PSA)

Using the Publish Subscribe Adapter (PSA)
This chapter introduces VisiBroker Publish/Subscribe Adapter (PSA). The PSA is primarily a programming model and a component that works in conjunction with OMG Event/Notification Service. It is interoperable with applications that use low-level OMG Notification Service interfaces.
Introduction
As “one of the best client/server middleware products,” CORBA provides solid support for traditional client/server applications that are based on OMG object-oriented ORB architecture. However, there are some issues with CORBA in respect to supporting publish/subscribe applications. For many enterprise business applications, the publish/subscribe communication model is as important as the client/server model. Direct support of the publish/subscribe communication model in the CORBA middleware infrastructure substantially reduces the development effort by allowing developers to focus on implementing business logic rather than redesigning system solutions.
Notwithstanding, the ORB level support of the publish/subscribe communication model has been virtually omitted within OMG along with third-party ORB vendors. The publish/subscribe communication model is considered as a “second-class citizen” within the CORBA development sphere. Consequently, application developers have to resort to COS level solutions, such as Event/Notification Services, which are more or less message oriented rather than object oriented. In COS Event/Notification Service, the publish/subscribe is modeled as replicated client/server communications. The disadvantages of this modeling are:
Note
CORBA is not the only distributed object middleware that does not provide support for non-classic communication models at the same object abstraction level. For instance, within the RMI/EJB environment, instead of extending the original Java and RMI object model, a message oriented model (namely, JMS/MDB) is used.
The Publish/Subscribe Adapter (PSA) described in this chapter addresses the problems previously mentioned. PSA is mainly a programming model and a software component working on top of OMG standardized Notification Service. Therefore, PSA can be used along with third party OMG Notification Service implementations and is also interchangeable with applications which are directly built with low-level OMG Notification Service interfaces.
One of the basic functions of the PSA is to hide the details pertaining to channel connections. Typically, when designing a CORBA publish/subscribe application, the main goal is to make the application consumer object receive events from a given channel. The channel is usually specified by its channel reference or consumer admin reference. The consumer object is usually specified by its POA and object id. By using OMG Notification Service directly, the application requires multiple steps in connecting the consumer object to the channel. However, by using PSA, the application only needs a single operation to complete this connection.
To introduce the basic concept of PSA, this example shows how a typed event consumer application is coded. Assume that the typed event is defined by the IDL interface:
// TMN.idl: typed event definition
#pragma prefix "examples.borland.com"
module TMN {
interface TypedEvent {
void attributeValueChange(...);
...
};
};
First, in order for the typed event consumer to be able to receive events, it needs to provide a servant implementation that derives from the user defined event interface skeleton, POA_TMN::TypedEvent:
// 1. Implement typed servant
# include "TMNEvents_s.hh"
class TMNTypedEventImpl : public POA_TMN::TypedEvent,
public PortableServer::RefCountServantBase
{
public:
void attributeValueChange(...) { ... };
...
};
Next, activate this servant on a POA:
int main(int argc, char** argv)
{
...
// 2. Get orb and poa environment
CORBA::ORB_ptr orb = CORBA::ORB_init(argc, argv);
CORBA::Object_var obj =
orb->resolve_initial_references("RootPOA");
PortableServer::POA_var poa = PortableServer::POA::_narrow(obj);
// 3. Construct the typed servant
TMNTypedEventImpl* servant = new TMNTypedEventImpl();
// 4. Activate it on poa
poa->activate_object(servant);
Up to this point, the typed event application is treated as a normal typed consumer application and nothing special has been added. If this was a normal client/server example, then, the application would create an object reference from POA and pass it to clients. In any case, this example is of a publish/subscribe consumer, therefore, the application does not need to pass its reference directly to a client, which is the event publisher. Instead, the consumer needs to connect to a given channel or consumer admin object reference.
With PSA, instead of “connecting” the consumer to the channel, you simply “subscribe” it to the channel:
// 5. Somehow, this consumer is given a channel reference
CORBA::Object_var channel = ... ;
// 6. Get object id of the consumer servant
PortableServer::ObjectId_var oid =
poa-servant_to_id(servant);
// 7. Narrow the POA to PSA
PortableServerExt::PSA_var psa = PortableServerExt::PSA::_narrow(poa);
// 8. Subscribe to the channel
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::TYPED_SUBJECT,
(const char*)"IDL:example.borland.com/TMN/TypedEvent:1.0",
PortableServerExt::PUSH_EVENT };
psa->subscribe(scheme, channel, oid, CORBA::NameValuePairSeq());
// 9. Consumer working loop
poa->the_POAManager()->activate();
orb->run();
}
As shown in the code, the application only needs to create the typed servant implementation with PSA. The application does not need to have the CosTypedNotifyComm::TypedPushConsumer servant to support get_typed_consumer(). Also, notice that the subscribe is a one-step procedure instead of multiple (six steps) operations to make a “connection.”
Here is the Java code example equivalent:
import com.inprise.vbroker.PortableServerExt.*;
// 1. Implement typed servant
public class TMNTypedEventImpl : extend TMN.TypedEventPOA,
{
public void attributeValueChange(...) { ... }
};
public class TypedPushConsumerImpl
{
public static void main(String[] args)
{
...
// 2. Get orb and psa environment
org.omg.CORBA.ORB orb = ORB_init(args, null);
org.omg.PortableServer.POA poa =
org.omg.PortableServer.POA.orb.resolve_initial_references("RootPOA");
// 3. Construct the typed servant
TMNTypedEventImpl servant = new TMNTypedEventImpl();
// 4. Activate it on root psa
poa.activate_object(servant);
// 5. Somehow, this consumer is given a channel reference
org.omg.CORBA.Object channel = ...;
// 6. Get object id of the consumer servant
org.omg.PortableServer.ObjectId oid = psa.servant_to_id(servant);
// 7. Narrow the org.omg.PortableServer.POA to com.inprise.vbroker.PSA
PSA psa = PSA.narrow(poa);
// 8. Subscribe to the channel
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.TYPED_SUBJECT,
"IDL:example.borland.com/TMN/TypedEvent:1.0",
SubjectDeliveryScheme.PUSH_EVENT);
psa.subscribe(scheme, channel, oid, null);

// 9. working loop
poa.the_POAManager().activate();
orb.run();
}
}
This example clearly shows how the PSA works in conjunction with OMG Event/Notification Service or Typed Event/Notification Service. More importantly, it shows how it simplifies the CORBA publish/subscribe application by shielding it from the low level notification service objects such as admins/proxies and operations.
Later in this chapter, you will see how the PSA de-couples connection logic from the event interface and transfer model. Connection logic, such as subscribe, in the PSA is not affected by event interface and transfer model. For instance, changing a structured consumer to a typed consumer or changing a typed consumer from push to pull, requires no change in consumer subscribe logic but only a flag change on subject scheme. These kind of changes would require major code modifications to consumer connection logic if the PSA is not used. Additionally, this chapter provides examples that covers the various application cases and show the power and usage of the PSA.
PSA reference and PSA interface IDL
PSA is an extension of POA and supports all operations defined for POA. Since version 5.1, a VisiBroker POA reference can be narrowed down to a PSA reference and resolve_initial_references() with RootPOA and RootPSA, which actually return the same internal reference.
This code example shows how to get root PSA.
C++
// Getting root PSA in C++
CORBA::Object_var ref =
orb->resolve_initial_references("RootPSA");
PortableServerExt::PSA_var psa = PortableServerExt::_narrow(ref);
Java
// Getting root PSA in Java
// get publisher/subscriber adapter
org.omg.CORBA.Object ref = orb.resolve_initial_references("RootPOA");
PSA psa = PSAHelper.narrow(ref);
PSA is defined in PortableServerExt module and (indirectly) derived from PortableServer::POA.
module PortableServerExt {
interface POA : PortableServer::POA {
readonly attribute CORBA::PolicyList the_policies;
};
enum SubjectAddressScheme {
SUBSCRIBE_ADMIN_ADDR,
PUBLISH_ADMIN_ADDR,
CHANNEL_ADDR,
SUBJECT_ADDR
};

enum SubjectInterfaceScheme {
TYPED_SUBJECT,
UNTYPED_SUBJECT,
STRUCTURED_SUBJECT,
SEQUENCE_SUBJECT
};
enum SubjectDeliveryScheme {
PUSH_EVENT,
PULL_EVENT
};
typedef string SubjectInterfaceId;
struct SubjectScheme {
SubjectAddressScheme address_scheme;
SubjectInterfaceScheme interface_scheme;
SubjectInterfaceId interface_id;
SubjectDeliveryScheme delivery_scheme;
};
typedef Object Subject;
typedef CORBA::OctetSequence PublishSubscribeDesc;
typedef PublishSubscribeDesc SubscribeDesc;
typedef PublishSubscribeDesc PublishDesc;
exception InvalidSubjectScheme { long error; };
exception InvalidSubscribeDesc { long error; };
exception InvalidPublishDesc { long error; };
exception InvalidProperties { CORBA::StringSequence names; };
exception ChannelException { string repository_id; }
// The Publisher/Subscriber Adapter
interface PSA : POA {
// register subject observer
SubscribeDesc subscribe(
in SubjectScheme the_subject_scheme,
in Subject the_subject,
in PortableServer::ObjectId the_observer_id,
in CORBA::NameValuePairSeq the_properties )
raises( InvalidSubjectScheme,
InvalidProperties,
ChannelException );
// Register subject provider
PublishDesc publish(
in SubjectScheme the_subject_scheme,
in Subject the_subject,
in PortableServer::ObjectId the_pullable_publisher_id,
in CORBA::NameValuePairSeq the_properties )
raises( InvalidSubjectScheme,
InvalidProperties,
ChannelException );
// Unregister observer from subject
void unsubscribe(
in SubscribeDesc the_subscribe_desc )
raises( InvalidSubscribeDesc,
ChannelException );
// Unregister (pull mode) provider
void unpublish(
in PublishDesc the_publish_desc)
raises( InvalidPublishDesc,
ChannelException );

// Suspend subject to push into the registered
// observer or suspend subject to pull from the
// registered provider
void suspend(
in PublishSubscribeDesc the_desc)
raises( ChannelException );
// Resume subject to push into the registered
// observer or resume subject to pull from the
// registered provider.
Void resume(
in PublishSubscribeDesc the_desc)
raises( ChannelException );
// Pull (typed) event and dispatch it to a registered
// observer.
unsigned long pull_and_dispatch(
in SubscribeDesc the_subscribe_desc,
in unsigned long max_count,
in boolean block_pulling,
in Boolean async_dispatch)
raises( InvalidSubscribeDesc,
InvalidSubjectScheme,
ChannelException );

// Pull (typed) event and accept a given visitor to
// 'visit' the event.
Unsigned long pull_and_visit(
in SubscribeDesc the_subscribe_desc,
in unsigned long max_count,
in Boolean block_pulling,
in PortableServer::Servant the_visitor)
raises( InvalidSubscribeDesc,
InvalidSubjectScheme,
ChannelException );
Subject the_subject_addr(
in PublishSubscribeDesc the_desc)
raises( InvalidSubjectScheme );
// low level access
Object the_proxy_addr(
in PublishSubscribeDesc the_desc)
raises( InvalidSubjectScheme );
};
...
};
In VisiBroker (version 5.1 and later), all POAs are internally implemented as PSAs. Therefore, any POA reference in VisiBroker can always be narrowed into a PSA reference.
This code example shows the narrowing POA to PSA.
C++
// Narrowing a POA into a PSA in C++
PortableServer::POA_var poa = parent_poa->create_POA(...);
PortableServerExt::PSA_var psa = PortableServerExt::_narrow(poa);
Java
// Narrowing a POA into a PSA in Java
org.omg.PortableServer.POA poa = parent_poa.create_POA(...);
com.inprise.vbroker.PortableServerExt.PSA psa = com.inprise.vbroker.PortableServerExt.PSAHelper.narrow
(poa);
User examples
The following examples compare application code written with the COS notification method and PSA. The examples are:
Structured Push Consumer
The table below compares the same structured push consumer application written in notification connection method (left column) and PSA (right column). The noticeable difference are:
This code example shows the connection/subscribe structured consumer to a channel in C++:
// Connect to channel
// 1. Get default admin
ConsumerAdmin_var admin =
channel->default_consumer_admin();
// Activate it on root psa
psa->activate_object(servant);
// Get consumer object id
PortableServer::ObjectId_var oid =
poa->servant_to_id(servant);
// Subscribe to channel

// Specify the subject scheme
PortableServerExt::SubjectScheme
  scheme = PortableServerExt::CHANNEL_ADDR,
PortableServerExt::STRUCTURED_SUBJECT,
(const char*)"",
PortableServerExt::PUSH_EVENT };
// 2. Create a proxy
ProxyID proxy_id;
ProxySupplier_var proxy = admin->
obtain_notification_push_supplier(
STRUCTURED_EVENT, proxy_id);
// Narrow to the stub
StructuredProxyPushSupplier_var supplier
= StructuredProxyPushSupplier::
_narrow(proxy);
// 3. Connect proxy supplier
supplier->
connect_structured_push_consumer(
consumer);
// working loop
orb->run();
}
// 1. Subscribe
psa->subscribe(scheme, channel, oid,
CORBA::NameValuePairSeq());
// working loop
orb->run();
}
This is the Java equivalent.
// Connect to channel
// 1. Get default admin
ConsumerAdmin admin
= channel.default_consumer_admin();
// 2. Create a proxy
ProxyID proxy_id;
ProxySupplier proxy
  = admin.
// Subscribe to channel

// Specify the subject scheme
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.
      STRUCTURED_SUBJECT,
(const char*)"",
SubjectDeliveryScheme.PUSH_EVENT);
// 1. Subscribe
psa.subscribe(
scheme, channel, oid, null);
// working loop
orb.run();
}
Typed Push Consumer
The table below shows the code written with the notification connection method (left column) and PSA (right column). The noticeable differences are:
This code example show the connection/subscribe typed consumer to a channel in C++:
// Connect to channel
// 1. Construct the proxy consumer
TypedPushConsumerImpl* servant =
new TypedPushConsumerImpl(typed_ref);
// Subscribe to channel
// Specify the subject scheme
PortableServerExt::SubjectScheme scheme =
{ PortableServerExt::CHANNEL_ADDR,
PortableServerExt::TYPED_SUBJECT,
(const char*)"IDL:example.borland.com"
"TMN/TypedEvent:1.0",
PortableServerExt::PUSH_EVENT };
// 1. Subscribe
psa->subscribe(scheme, channel, oid,
CORBA::NameValuePairSeq());
// working loop
orb->run();
}
// 2. Activate it on root poa
poa->activate_object(servant);
// 3. Get consumer object reference/
obj = poa
  ->servant_to_reference(servant);
CosTypedNotifyComm::TypedPushConsumer_
var consumer = CosTypedNotifyComm::
TypedPushConsumer::_narrow(obj);
// 4. Get default admin
TypedConsumerAdmin_var admin =
channel->default_consumer_admin();
// 5. Create a proxy
CosNotifyChannelAdmin::ProxyID proxy_id;
TypedProxySupplier_var proxy = admin-> obtain_notification_push_supplier(
        "IDL:example.borland.com/"
        "TMN/TypedEvent:1.0", proxy_id);
// 6. Connect proxy supplier
proxy->connect_typed_push_consumer(
         consumer);
// working loop
orb->run();
}
The following code examples show the connection/subscribe typed consumer to a channel in Java:
// Connect to channel
// Subscribe to channel
// 1. Allocate the proxy consumer
TypedPushConsumerImpl servant = New TypedPushConsumerImpl(typed_ref);
// 2. Activate it on root poa
poa.activate_object(servant);
// 3. Get consumer object reference
obj = poa->
   servant_to_reference(servant);
TypedPushConsumer Consumer
= TypedPushConsumer.narrow(obj);
// 4. Get default admin
TypedConsumerAdmin admin =
Channel.default_consumer_admin();
// 5. Create a proxy
CosNotifyChannelAdmin::ProxyID proxy_id;
TypedProxySupplier proxy = admin.
Obtain_notification_push_supplier(
        "IDL:example.borland.com/"
        "TMN/TypedEvent:1.0", proxy_id);
// 6. Connect proxy supplier
proxy.connect_typed_push_consumer(
        consumer);
// working loop
orb.run();
}
These two examples clearly illustrate how PSA dramatically simplifies and unifies the procedures of connecting to a notification channel for both structured and typed consumers. It also shows how PSA de-couples the event format selection and connecting logic. The application code between structured and typed channel are substantially different when low level COS Notification Service is directly used. With PSA, the two examples have almost no difference in subscribing logic.
Structured and Typed Push Supplier
In these two examples, a typed push supplier and a structured push supplier applications are written in notification connection method (left column) and PSA (right column). The noticeable difference is that the typed push supplier using PSA is almost identical to the PSA structured push supplier. In both cases, the PSA shields the application from the different makeup of each channel.
Structured Supplier to a Channel
This code example shows the connection/subscribe structured supplier to a channel in C++:
Using namespace CosNotifyChannelAdmin;
int main(int argc, char** argv)
{
// Get orb and poa environment
CORBA::ORB_ptr orb
= CORBA::ORB_init(argc, argv);
CORBA::Object_var obj = orb->
Resolve_initial_references(
"RootPOA");
PortableServer::POA_var poa =
PortableServer::POA::_narrow(obj);
// Get channel reference
EventChannel channel = ... ;
// Connect to channel
// 1. Get default admin
ConsumerAdmin_var admin =
Channel->default_supplier_admin();
// 2. Create a proxy
ProxyID proxy_id;
ProxyConsumer_var proxy = admin->
obtain_notification_push_consumer(
STRUCTURED_EVENT, proxy_id);
// 3. Get the StructuredProxyConsumer
StructuredProxyPushConsumer_var consumer = StructuredProxyPushConsumer::
     _narrow(proxy);
consumer->
connect_structured_push_supplier (NULL);
// Push typed events interface
for(;;) {
consumer->push_structured_event(...);
...
}
...
}
int main(int argc, char** argv)
{
// Get orb and psa environment
CORBA::ORB_ptr orb
= CORBA::ORB_init(argc, argv);
CORBA::Object_var obj = orb->
Resolve_initial_references(
"
RootPSA");
PSA_var psa = PSA::_narrow(obj);

// Get channel reference
CORBA::Object_var channel = ... ;
// Publish to channel
// 1. Publish
PortableServerExt::SubjectScheme scheme = { PortableServerExt::CHANNEL_ADDR,
PortableServerExt::STRUCTURED_SUBJECT,
(const char*)"",
PortableServerExt::PUSH_EVENT };
PortableServerExt::PublishDesc_var desc
= psa->publish(scheme, channel,

PortableServer::ObjectId(),
CORBA::NameValuePairSeq());
// 2. Get the StructuredProxyConsumer
CORBA::Object_var obj
= psa->the_subject_addr(desc);
StructuredProxyPushConsumer_var consumer
= StructuredProxyPushConsumer::
_narrow(proxy);

// Push typed events interface
for(;;) {
consumer->
push_structured_event(...);
...
}
...
}
Typed Supplier to a Channel
This code example shows the connection/subscribe typed supplier to a channel in C++:
using namespace CosTypedNotifyChannelAdmin;
int main(int argc, char** argv)
{
// Get orb and poa environment
CORBA::ORB_ptr orb
= CORBA::ORB_init(argc, argv);
CORBA::Object_var obj = orb->
Resolve_initial_references(
"RootPOA");
PortableServer::POA_var poa =
  PortableServer::POA::_narrow(obj);
// Get channel reference
TypedEventChannel channel = ... ;
// Connect to channel
// 1. Get default admin
TypedConsumerAdmin_var admin =
Channel->default_supplier_admin();
// 2. Create a proxy
CosNotifyChannelAdmin::ProxyID proxy_id;
TypedProxyConsumer_var proxy = admin->
obtain_notification_push_consumer(
        "IDL:example.borland.com/"
        "TMN/TypedEvent:1.0", proxy_id);
// 3. Get the <I> interface
CORBA::Object_var obj
= proxy->get_typed_consumer();
TMN::TypedEvent_var consumer
= TMN::TypedEvent::_narrow(obj);
proxy->connect_typed_push_supplier (NULL)
// Push typed events interface
for(;;) {
    consumer->attributeValueChange(...);
...
}
...
}
int main(int argc, char** argv)
{
// Get orb and psa environment
CORBA::ORB_ptr orb
= CORBA::ORB_init(argc, argv);
CORBA::Object_var obj = orb->
Resolve_initial_references(
"
RootPSA");
PSA_var psa = PSA::_narrow(obj);

// Get channel reference
CORBA::Object_var channel = ... ;
// Publish to channel
// 1. Publish
PortableServerExt::SubjectScheme scheme = { PortableServerExt::CHANNEL_ADDR,
  PortableServerExt::TYPED_SUBJECT,
  (const char*)"IDL:example.borland.com"
"TMN/TypedEvent:1.0",
PortableServerExt::PUSH_EVENT };
PortableServerExt::PublishDesc_var desc
= psa->publish(scheme, channel,
  PortableServer::ObjectId(),
  CORBA::NameValuePairSeq());
// 2. Get the <I> interface
CORBA::Object_var obj
= psa->the_subject_addr(desc);
TMN::TypedEvent_var consumer
= TMN::TypedEvent::_narrow(obj);

// Push typed events interface
for(;;) {
consumer->
attributeValueChange(...);
...
}
...
}
These examples illustrate that there is a noticeable difference between the code and procedure for making a connection for structured and typed supplier applications when using the Notification Service interface. More importantly, while using the PSA, the connection code and procedures are almost identical for both applications.
Note
All examples used in this chapter are condensed from shipped VisiNotify and PSA examples. These examples are located in the directories:
Subscribe a subject using PSA
The subscribe operation in PSA allows a consumer object to attach to a notification/event source for receiving (either push or pull) event messages. This is a very broad concept which can cover all possible publish/subscribe scenarios such as:
PSA supports these scenarios under one single programming model regardless of the low level transport mechanism and type of the channel/message format. In this release, PSA only supports subscribe to OMG notification/event channel (all four channel types). PSA subscribe operation is defined as:
SubscribeDesc subscribe(
in SubjectScheme the_subject_scheme,
in Subject the_subject,
in PortableServer::ObjectId the_observer_id,
in CORBA::NameValuePairSeq the_properties )
raises( InvalidSubjectScheme,
InvalidProperties,
ChannelException );
When PSA is used on top of COS Notification, this operation performs all low-level operations of getting consumer admin, obtaining proxy suppliers and making the connection. For subscribing to a typed subject, the PSA also creates and manages the handler proxy object internally to support the get_typed_consumer() operation and only requires the application to supply the observer servant implementation that supports the application-specified typed <I> interface.
SubjectScheme
The first parameter to the subscribe() is SubjectScheme and is defined as:
struct SubjectScheme {
SubjectAddressScheme address_scheme;
SubjectInterfaceScheme interface_scheme;
SubjectInterfaceId interface_id;
SubjectDeliveryScheme delivery_scheme;
};
The SubjectScheme specifies what is the subject reference's address scheme, interface scheme, interface repository id (for typed channel only), and delivery scheme.
The address_scheme field specifies the subject reference. For example, an address can be specified, which can be used directly for push event or an address to only do subscribe. Currently, there are three values on this field for subscribing; SUBSCRIBE_ADMIN_ADDR, CHANNEL_ADDR, and SUBJECT_ADDR, which indicates that the subject reference to the subscribe() operation is a OMG Notification Consumer Admin, a OMG Notification Channel (or typed channel) or an event direct pushing address, respectively.
The three values for the address_scheme field allow the application to subscribe in the following manner:
SUBSCRIBE_ADMIN_ADDR - The subject reference to subscribe() is an OMG Notification Consumer Admin reference, PSA simply calls obtain_<...>_supplier() on the admin to allocate a proxy on the admin and then calls connect_<...>_consumer() on the proxy. The consumer reference connected to the proxy is either null (for pull mode consumer) or a push consumer object reference created from this PSA with the_observer_id parameter. For typed channels, the get_typed_consumer() and get_typed_supplier() are automatically handled by PSA.
CHANNEL_ADDR - The subject reference to subscribe() is an OMG Notification Channel (or typed channel). PSA simply calls _get_default_consumer_admin() on the channel to get the default admin and then handles it as a connection through this consumer admin reference.
SUBJECT_ADDR - The subject reference to subscribe() is a direct event pushing address. For example, it could be a multicast IOR, or a typed <I> interface. For any other channel than typed, it is a proxy push consumer. PSA calls _get_MyAdmin()/_get_MyChannel()/_get_default_consumer_admin() and then handles it as a connection through consumer admin. For typed channels, this is already a push <I> interface. PSA looks into the reference for a consumer admin component (not currently supported) and handles it as a connection through consumer admin.
Additionally, applications need to specify SubjectInterfaceScheme and SubjectDeliveryScheme.
For SubjectInterfaceScheme the valid values are:
TYPED_SUBJECT - Subject uses either multicast or OMG Typed Notification Channel.
UNTYPED_SUBJECT - Subject uses OMG Untyped Notification Channel.
STRUCTURED_SUBJECT - Subject uses OMG Structured Notification Channel.
SEQUENCE_SUBJECT - Subject uses OMG Sequence Notification Channel.
For SubjectDeliveryScheme the valid values are:
PUSH_EVENT - Subject uses either multicast or OMG Push Notification mode (any of the four OMG event types).
PULL_EVENT - Subject uses OMG Pull Notification mode (any of the four OMG event types).
For connecting to a typed channel, the repository id of <I> interface must also be specified. This repository is used as the implicit event filter.
Subject Reference, Observer ID, and Properties to Subscribe()
The second and third parameters to subscribe() are the reference of the subject and the object id of a passive consumer object. The subject reference's interpretation is specified by the SubjectScheme as the first parameter to subscribe() and has been described above. The passive consumer object id specifies which consumer object, a received event, can be dispatched to.
There are two kind of consumer objects; passive and active. All push consumers are passive consumers and all pull consumers, except for typed consumer using pull_and_dispatch(), are active.
Passive consumers need to be subscribed with a valid object id and the consumer servant should be activated or able to be activated (such as, by servant manager) in the subscribing PSA (that is, the POA). Active consumers, on the other hand, do not need a valid object id to subscribe(). In fact, PSA ignores the actual object id parameter when subscribe() is called to subscribe an active consumer. Also, active consumers do not need to be activated or able to be activated.
Examples of Subscribe()
Example
This example shows how to connect to an untyped service through channel reference as a push consumer.
C++
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::UNTYPED_SUBJECT,
(const char*)"",
PortableServerExt::PUSH_EVENT };
PortableServerExt::SubscribeDesc_var desc psa->
subscribe(
scheme, channel, observer_oid, CORBA::NameValuePairSeq());
Java
// Java code to connect to an untyped service through
//channel reference as a push consumer
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.UNTYPED_SUBJECT,
"",
SubjectDeliveryScheme.PUSH_EVENT);
SubscribeDesc desc = psa.subscribe(scheme, channel, observer_oid, null);
Example
This example shows how to connect to an untyped service through channel reference as a pull consumer.
C++
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::UNTYPED_SUBJECT,
(const char*)"",
PortableServerExt::PULL_EVENT };
PortableServerExt::SubscribeDesc_var desc =
psa->subscribe(
scheme, channel, PortableServer::ObjectId(), CORBA::NameValuePairSeq());
Java
// Java code to connect to an untyped service through
// channel reference as a push consumer
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.UNTYPED_SUBJECT,
"",
SubjectDeliveryScheme.PULL_EVENT);
SubscribeDesc desc = psa.subscribe(scheme, channel,
null, null);
Example
This example shows how to connect to an structured service through channel reference as a push consumer.
C++
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::STRUCTURED_SUBJECT,
(const char*)"",
PortableServerExt::PUSH_EVENT };
PortableServerExt::SubscribeDesc_var desc psa->subscribe(
scheme, channel, observer_oid, CORBA::NameValuePairSeq());
Java
// Java code to connect to a structured service through
// channel reference as a push consumer
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.STRUCTURED_SUBJECT,
"",
SubjectDeliveryScheme.PUSH_EVENT);
SubscribeDesc desc = psa.subscribe(scheme, channel, observer_oid, null);
Example
This example shows how to connect to an structured service through channel reference as a pull consumer.
C++
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::STRUCTURED_SUBJECT,
(const char*)"",
PortableServerExt::PULL_EVENT };
PortableServerExt::SubscribeDesc_var desc = psa->
subscribe(
scheme, channel, PortableServer::ObjectId(), CORBA::NameValuePairSeq());
Java
// Java code to connect to a structured service through
// channel reference as a pull consumer
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.STRUCTURED_SUBJECT,
"",
SubjectDeliveryScheme.PULL_EVENT);
SubscribeDesc desc = psa.subscribe(scheme, channel, null, null);
Example
This example shows how to connect to a typed service through channel reference as a push consumer.
C++
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::TYPED_SUBJECT,
(const char*)"IDL:example.borland.com/TMN/TypedEvent:1.0",
PortableServerExt::PUSH_EVENT };
PortableServerExt::SubscribeDesc_var desc psa->subscribe(
scheme, channel, observer_oid, CORBA::NameValuePairSeq());
Java
// Java code to connect to an typed service through
// channel reference as a push consumer
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.TYPED_SUBJECT,
"IDL:example.borland.com/TMN/TypedEvent:1.0",
SubjectDeliveryScheme.PUSH_EVENT);
SubscribeDesc desc = psa.subscribe(scheme, channel, observer_oid, null);
Example
This example shows how to connect to a typed service through channel reference as a pull consumer.
C++
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::TYPED_SUBJECT,
(const char*)"IDL::example.borland.com/TMN/TypedEvent:1.0",
PortableServerExt::PULL_EVENT };
PortableServerExt::SubscribeDesc_var desc = psa->
subscribe(
scheme, channel, PortableServer::ObjectId(), CORBA::NameValuePairSeq());
Java
// Java code to connect to a typed service through channel
// reference as a pull consumer
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.TYPED_SUBJECT,
"IDL:example.borland.com/TMN/TypedEvent:1.0",
SubjectDeliveryScheme.PULL_EVENT);
SubscribeDesc desc = psa.subscribe(scheme, channel, null, null);
Subscribe Descriptor and the_subject_addr()
Object the_subject_addr(in PublishSubscribeDesc the_desc);
After a successful subscribe() operation, a subscribe descriptor is returned which encapsulates all information and mapping to make other operations on the subscription, such as unsubscribe(), suspend(), resume(). Also, this descriptor can be saved into a persistent repository and can be later loaded into the same consumer process session or a new restarted consumer process session. However, the format of this descriptor is internal to the given ORB which created it. Therefore, like the object key, a subscribe descriptor must be used by the same ORB that created it.
For a subscribed push consumer, the channel will actively push events to the consumer servants activated with the specified observer ids. After a successful subscribe() operation, applications with untyped/structured/sequence pull consumers can get their pull addresses (proxy pull suppliers) from PSA's the_subject_addr() along with the subscribe descriptor. The subscribe descriptor was returned from the PSA subscribe() method, as a parameter.
Example
This example shows how to get a proxy untyped/structured/sequence pull supplier from subscribe descriptor:
C++
CORBA::Object_var proxy_pull_supplier = psa->the_subject_addr(the_desc);
Java
org.omg.CORBA.Object proxy_pull_supplier = psa.the_subject_addr(the_desc);
After narrowing this reference to a specified proxy, the application can pull the event from the supplier with pull()/try_pull()/pull_structured_event() and try_pull_structured_event().
Typed pull consumer is discussed in “Support of Typed Pulling”.
Unsubscribe a Subject
PSA unsubscribe() disconnects the consumer from a connected channel and cleans up any local resource, if necessary (for multicast case, it removes the subject key to observer id mapping). If the consumer is connected to an untyped or a typed channel, the PSA invokes disconnect_push/pull_supplier() to the proxy.
If the consumer is connected to a structured or sequence channel, the PSA invokes disconnect_structured_push/pull_supplier() or disconnect_sequence_push/pull_supplier(), respectively.
This code example shows how to unsubscribe a subject:
void unsubscribe(in SubscribeDesc the_subscribe_desc)
Publish a Subject
Publish in the PSA model is defined as an operation, which attaches a supplier object or source to a notification/event channel that provides (either push or pull) event messages.
This is a very broad concept that covers all possible publish/subscribe scenarios such as:
PSA supports many publish/subscribe scenarios under one single programming model regardless of the low level transport mechanism and the type of the channel/message format. Currently, the PSA only supports connect to OMG notification/event channel (all four channel types). The PSA operation to subscribe a consumer (such as, observer) to a given subject is:
PublishDesc publish(
in SubjectScheme the_subject_scheme,
in Subject the_subject,
in PortableServer::ObjectId the_provider_id,
in CORBA::NameValuePairSeq the_properties )
raises( InvalidSubjectScheme,
InvalidProperties,
ChannelException );
When the publish operation is used on top of the COS Notification, it performs all the operations of getting supplier admin, obtaining proxy consumers, and connecting to them. Additionally, when the publish operation is used with a typed subject, PSA also calls get_typed_consumer() on the proxy consumers to get the <I> reference.
SubjectScheme
SubjectScheme is the first parameter to publish() and is defined as:
struct SubjectScheme {
SubjectAddressScheme address_scheme;
SubjectInterfaceScheme interface_scheme;
SubjectInterfaceId interface_id;
SubjectDeliveryScheme delivery_scheme;
};
The SubjectScheme parameters specify the subject reference's address scheme, interface scheme, interface repository id (for typed channel only) and delivery scheme.
The address_scheme field specifies the subject reference, such as whether it is an address that can directly push events or an address that can only subscribe. Currently, VisiBroker supports three valid values for this field, which indicates that the subject reference to the publish() operation is an OMG notification consumer admin, an OMG notification channel (or typed channel) or an event direct pushing address, respectively.
The three address schemes supported for subscribe are:
PUBLISH_ADMIN_ADDR - The subject reference to publish() is an OMG notification supplier admin reference. PSA simply calls obtain_<...>_consumer() on the admin reference to allocate a proxy on the admin and then calls connect_<...>_supplier() on the proxy. The supplier reference connected to the proxy is either null (for push supplier) or a pull supplier reference created from this PSA with provider_id parameter. For typed channels, get_typed_consumer() operation and get_typed_supplier() implementation are automatically handled by PSA.
CHANNEL_ADDR - The subject reference to publish() is an OMG notification channel (or typed channel). PSA simply calls _get_default_supplier_admin() on the channel to get the default supplier admin. It handles it as connect through this consumer admin reference.
SUBJECT_ADDR - The subject reference to subscribe() is a direct event pushing address. For example, it could be a multicast IOR or a typed <I> interface. This is a trivial case. PSA simply wraps a publisher descriptor and returns.
Your application will also need to specify SubjectInterfaceScheme and SubjectDeliveryScheme.
The valid SubjectInterfaceScheme values are:
TYPED_SUBJECT - Subject uses either multicast or OMG typed notification channel.
UNTYPED_SUBJECT - Subject uses OMG untyped notification channel.
STRUCTURED_SUBJECT - Subject uses OMG structured notification channel.
SEQUENCE_SUBJECT - Subject uses OMG sequence notification channel.
The valid SubjectDeliveryScheme values are:
PUSH_EVENT - Subject uses either multicast or OMG push notification mode (any of the four OMG event types).
PULL_EVENT - Subject uses OMG pull notification mode (any of the four OMG event types).
To connect to a typed channel, you must also specify the repository id of <I> interface. This repository is actually used for narrowing a typed push reference returned from get_typed_consumer() into <I> stub, which is also used as a filtering key for both push and pull events.
Subject Reference, Provider ID, and Properties to Publish()
The subject reference's interpretation is specified by the SubjectScheme as the first parameter to publish() operation. The second and third parameters to publish() are the reference of the subject and the object id of a passive supplier (such as a supplier) object. The passive supplier object id specifies which supplier object should be used by PSA to pull events for publishing.
There are two kind of supplier objects; passive and active. All push suppliers are active while all pull suppliers are passive.
Passive suppliers need to be published with a valid object id and the publish servant should be activated or able to be activated (for example, by the servant manager) in the publishing PSA (such as POA). Active publishes, on the other hand, do not need a valid object id to call publish(). In fact, PSA ignores the actual object id parameter when publish() is called to publish an active supplier. Also, active suppliers do not need to be activated or able to be activated. Active suppliers do not even need servant implementations.
Note
Active suppliers do not even need servant implementations.
Examples of publish()
Example
This example shows how to connect to an untyped service through channel reference as a push supplier using namespace PortableServerExt.
C++
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::UNTYPED_SUBJECT,
PortableServerExt::(const char*)"",
PortableServerExt::PUSH_EVENT };
PortableServerExt::PublishDesc_var desc psa->publish(
Scheme, channel, PortableServer::ObjectId(), CORBA::NameValuePairSeq());
Java
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.UNTYPED_SUBJECT,
"",
SubjectDeliveryScheme.PUSH_EVENT);
Byte[] desc = psa.publish(scheme, channel, null, null);
As specified by the scheme, the given subject reference is actually a COS Notification Service channel reference. PSA internally performs the following operations:
Example
This example shows how to connect to an untyped service through a channel reference as a pull supplier using namespace PortableServerExt.
C++
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::UNTYPED_SUBJECT,
(const char*)"",
PortableServerExt::PULL_EVENT };
PortableServerExt::PublishDesc_var desc = psa->publish(
scheme, channel, provider_id, CORBA::NameValuePairSeq());
Java
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.UNTYPED_SUBJECT,
"",
SubjectDeliveryScheme.PULL_EVENT);
PublishDesc desc = psa.publish(scheme, channel, provider_id, null);
As specified by the scheme, the given subject reference is actually a COS Notification Service channel reference. PSA performs following operations:
Example
This example shows how to connect to a structured service through a channel reference as a push supplier using namespace PortableServerExt.
C++
SubjectScheme scheme = {
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::STRUCTURED_SUBJECT,
(const char*)"",
PortableServerExt::PUSH_EVENT
};
PortableServerExt::PublishDesc_var desc = psa->publish(
scheme, channel, PortableServer::ObjectId(), CORBA::NameValuePairSeq());
Java
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.STRUCTURED_SUBJECT,
"",
SubjectDeliveryScheme.PUSH_EVENT);
PublishDesc desc = psa.publish(scheme, channel, null, null);
As specified by the scheme, the given subject reference is actually a COS Notification Service channel reference. PSA performs the following operations:
Example
This example shows how C++ code connects to a structured service through channel reference as a push supplier using namespace PortableServerExt.
C++
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::STRUCTURED_SUBJECT,
(const char*)"",
PortableServerExt::PULL_EVENT };
PortableServerExt::PublishDesc_var desc = psa->publish(
scheme, channel, provider_id, CORBA::NameValuePairSeq());
Java
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.STRUCTURED_SUBJECT,
"",
SubjectDeliveryScheme.PULL_EVENT);
Byte[] desc = psa.publish(scheme, channel, provider_id, null);
As specified by the scheme, the given subject reference is actually a COS Notification Service channel reference. PSA performs the following operations:
Example
This example shows how to connect to typed services through a channel reference as a push supplier using namespace PortableServerExt.
C++
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::TYPED_SUBJECT,
(const char*)"IDL:example.borland.com/TMN/TypedEvent:1.0",
PortableServerExt::PUSH_EVENT };
PortableServerExt::PublishDesc_var desc psa->publish(
scheme, channel, PortableServer::ObjectId(), CORBA::NameValuePairSeq());
Java
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.TYPED_SUBJECT,
"IDL:example.borland.com/TMN/TypedEvent:1.0",
SubjectDeliveryScheme.PUSH_EVENT);
PublishDesc desc = psa.publish(scheme, channel, null, null);
As specified by the scheme, the given subject reference is actually a COS Notification Service channel reference. PSA performs the following operations:
Calls get_typed_consumer() on the proxy reference to get the <I> interface.
Example
This example shows how to connect to typed services through a channel reference as a pull supplier using namespace PortableServerExt.
C++
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
ortableServerExt::TYPED_SUBJECT,
(const char*)"IDL::example.borland.com/TMN/TypedEvent:1.0",
PortableServerExt::PULL_EVENT };
PortableServerExt::PublishDesc_var desc = psa->publish(
scheme, channel, provider_id, CORBA::NameValuePairSeq());
Java
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.TYPED_SUBJECT,
"IDL:example.borland.com/TMN/TypedEvent:1.0"
SubjectDeliveryScheme.PULL_EVENT);
PublishDesc desc = psa.publish(scheme, channel, provider_id, null);
As specified by the scheme, the given subject reference is actually a COS Notification Service channel reference. PSA performs the following operations:
Publish Descriptor and the_subject_addr()
Object the_subject_addr(in PublishSubscribeDesc the_desc);
After a successful publish() operation, a publish descriptor is returned. It contains information/mapping to implement other publish() operations, such as unpublish(), suspend(), and resume(). This descriptor can be saved into a persistent repository and reloaded into the same supplier process session or a restart of a new supplier session. However, the format of this descriptor is internal to the ORB that creates it. Therefore, like the object key, a subscribe descriptor should only be used by the same ORB.
For a published passive supplier (such as pull), the channel will pull events from the supplier servant that is activated with the specified provider id.
After a successful publish() operation, applications with active (push) suppliers can get push addresses for the proxy push consumers or for typed channel with the <I> interface references from PSA's the_subject_addr() using the publish descriptor. The publish descriptor is returned from the PSA publish() method, as a parameter.
Example
This example shows how to get a proxy untyped/structured/sequence pull supplier from a subscribe descriptor:
C++
CORBA::Object_var proxy_pull_supplier =
psa->the_subject_addr(the_desc);
Java
org.omg.CORBA.Object proxy_pull_supplier =
psa.the_subject_addr(the_desc);
After narrowing this reference to a specified proxy or <I> interface stubs, applications can push events into the connected channels.
Note
Typed pull supplier is discussed in“Support of Typed Pulling”.
Unpublish a subject
void unpublish(in PublishDesc the_publish_desc)
raises( InvalidPublishDesc,
ChannelException );
The PSA unpublish() disconnects the supplier from a connected channel and cleans up any local resource. If the supplier is connected to an untyped and typed channel, the PSA invokes disconnect_push/pull_consumer() to the proxy. If it is connected to a structured or sequence channel, the PSA invokes disconnect_structured_push/pull_consumer() or disconnect_sequence_push/pull_consumer(), respectively.
Support of Typed Pulling
One major problem of the Notification Service is dealing with typed pulling. The programming model defined by OMG makes it difficult to use. It requires a pull consumer to use a mangled Pull<I> interface and a pull supplier to implement a Pull<I> servant. Changing from typed push <I> consumer/supplier to typed Pull<I> consumer/supplier requires a substantial code change and refinement to the application designs. In addition, the Pull<I> interfaces are operation specific. For example, pulling a typed event from a channel requires the pull consumer to be selective on the operation associated with the event. This does not parallel either typed push consumer nor structured pull consumer. In typed push consumer, the pushing consumers do not specify which operation should be associated with the next arrived events. In structured event case, a structured pull consumer is not selective on the type_name (a counterpart of operation in typed event) of next returned event.
The PSA resolves all issues (mentioned above). The PSA has the following unique advantages:
The PSA supports three kinds of typed pulling implementation:
Passive typed pull consumer
At the code level, the passive typed pull consumer is similar to a typed push consumer. Actually, changing a typed push consumer application into a passive typed pull consumer application requires nearly no code change. To create a passive typed consumer, a consumer object still needs to be available on the POA and requires it to be subscribed to the subject with associated object id. The only difference between passive typed pull consumer and typed push consumer is:
Therefore, to subscribe a passive typed pull consumer, a valid object id is needed in PSA subscribe() operation. After the subscribe(), application uses PSA's pull_and_dispatch() method to pull typed event from channel and dispatches into the passive consumer. Passive typed pull consumer is designed for applications that want to use passive consumer along with the control of incoming events from consumer applications.
C++ passive typed pull consumer example
This example is of the passive typed pull consumer.
// (examples/vbroker/notify/psa_cpp/typedPullConsumer2.C)
// Implement a passive observer
# include "TMNEvents_s.hh"
class TMNTypedEventObserver : public POA_TMN::TypedEvent
{
...
public: void attributeValueChange(...) { ... }
...
void qosAlarm(...) { ... }
};
int main(int argc, char** argv)
{
// construct the observer implementation
TMNTypedEventObserver* observer = new TMNTypedEventObserver;

// activate it on PSA
psa->activate_object (observer);
PortableServer::ObjectId_var oid = psa->servant_to_id (observer);

// activate the PSA
PortableServer::POAManager_var poa_manager =
psa->the_POAManager ();
poa_manager->activate ();
// subscribe to the channel as typed pull consumer
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::TYPED_SUBJECT,
(const char*)"IDL::example.borland.com/TMN/TypedEvent:1.0",
PortableServerExt::PULL_EVENT };
PortableServerExt::SubscribeDesc_var desc =
psa->subscribe scheme, channel, oid.in(),
CORBA::NameValuePairSeq());
// pull and visit max 100 events using block mode.
psa->pull_and_dispatch(desc, 100, (CORBA::Boolean)1, (CORBA::Boolean)0);
...
}
Compared to a pushed consumer application, the only difference is how the application gets the typed event. Passive typed consumers require explicit pull (using pull_and_dispatch()) by the application using PSA, instead of blocking on the ORB run() and waiting for the channel to send events asynchronously.
The logic and procedure of a passive typed pull consumer can be summarized as:
Call pull_and_dispatch() on the subscribe PSA with the subscribe descriptor as parameters.
Active typed pull consumer
For active typed pull consumers, the consumer servant is not registered to a POA, nor does a POA need to be activated. The replied typed events are directly visited (think about visitor pattern) by a visitor instance derived from POA_<I> servant skeleton. The visitor instance is directly specified on each call of pull_and_visit() and does not need to associate with or be registered on any POA. An active typed pull consumer is more like conventional typed pulling except it implements POA_<I> to backward visit the event instead of the Pull<I> stub.
C++ active typed pull consumer example
This example is of the active typed pull consumer.
// (examples/vbe/notify/psa_cpp/typedPullConsumer1.C)
// Implement an active visitor
# include "TMNEvents_s.hh"
class TMNTypedEventVisitor : public POA_TMN::TypedEvent
{
...
public: void attributeValueChange(...) { ... }
...
void qosAlarm(...) { ... }
};
int main(int argc, char** argv)
{
...
// subscribe to the channel as typed pull consumer
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::TYPED_SUBJECT,
(const char*)"IDL::example.borland.com/TMN/TypedEvent:1.0",
PortableServerExt::PULL_EVENT };
PortableServerExt::SubscribeDesc_var desc = psa->subscribe(
scheme, channel, PortableServer::ObjectId(), CORBA::NameValuePairSeq());
// create a visitor instance
TMNTypedEventVisitor visitor;
// pull and visit max 100 events using block mode.
psa->pull_and_visit(desc, 100, (CORBA::Boolean)1, &visitor);
...
}
Java active typed pull consumer example
// (examples/vbe/notify/psa_java/TypedPullConsumer1.java)
import com.inprise.vbroker.PortableServerExt.*;
// Implement an active visitor
class TMNTypedEventVisitor extends TMN.TypedEventPOA {
{
public void attributeValueChange(...) { ... }
...
public void qosAlarm(...) { ... }
};
public class TypedPullConsumer1 {
public static void main(String[] args) {
...
// subscribe to the channel as typed pull consumer
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.TYPED_SUBJECT,
"IDL::example.borland.com/TMN/TypedEvent:1.0",
SubjectDeliveryScheme.PULL_EVENT };
SubscribeDesc desc = psa.subscribe(scheme, channel, null, null);
// create a visitor instance
TMNTypedEventVisitor visitor = new TMNTypedEventVisitor();
// pull and visit max 100 events using block mode.
psa.pull_and_visit(desc, 100, true, visitor);
}
}
The logic and procedure of an active typed pull consumer can be summarized as:
Call pull_and_visit() on the subscribe PSA with the subscribe descriptor and a visitor instance as parameters.
Typed pull supplier
The PSA supports typed pull supplier using the “piggybacked reflective callback” technology. The reflective callback allows pull supplier to be pulled and still issue events in the same <I> interface originally defined for push mode.
Simple reflective callback without piggyback works as follows:
Here are some characteristics when a channel pulls the supplier:
The channel callbacks pull supplier's TypedCallback::PullEvent handler's pull_typed_event() operation with the event receiver reference.
The advantage of simple reflective callback is that it does not require special support on pull supplier side ORB. The disadvantage is that each pull operation requires two remote round trips. The first round trip requires the callback from channel to supplier. The second round trip requires the callback from supplier to the event receiver.
The PSA and VisiNotify support and implement piggybacked reflective callback. Piggybacked reflective callback is a variation of a simple reflective callback with the following mechanism:
The channel callback to the agent's pull_typed_event() method has no input parameter.
The agent makes a local call on the handler's pull_typed_event() with this local event receiver reference as input parameter.
The piggybacked reflective callback is transparent to applications. For example, the application code is independent from simple or piggybacked reflective callback. Piggybacked reflective callback needs only one round trip for each pull. However, piggybacked reflective callback requires pull supplier side ORB to support. VisiBroker PSA supports piggybacked reflective callback, and VisiNotify only uses piggybacked reflective callback for reasons of efficiency.
C++ typed pull supplier example
This example is of the typed pull supplier.
// (examples/vbroker/notify/psa_cpp/typedPullSupplier.C)
// implement the TypedCallback::PullEvent handler,
// with piggybacked double callback, this handler is
// called back by local PSA instead of by remote proxy
// pull consumer. Therefore, the event receiver is also
// a local object.
# include <TypedCallback_s.hh>
# include "TMNEvents_c.hh"
class PullEventImpl : public POA_TypedCallback::PullEvent,
public virtual PortableServer::RefCountServantBase
{
public:
// on typed pulling
void pull_typed_event(
CORBA::Object_ptr event_receiver,
CORBA::Boolean block)
{
// narrow to typed stub
TMN::TypedEvent_ptr stub
= TMN::TypedEvent::_narrow(event_receiver);
// reflect the callback to issue an
// attributeValueChange event
stub->attributeValueChange(...);
}
};
...
// create a supplier handler servant to activate it on// the PSA
PullEventImpl* supplier = new PullEventImpl;
psa->activate_object (handler);
PortableServer::ObjectId_var oid = psa->servant_to_id(supplier);
// publish to the channel as typed pull supplier with the
// handler_id but the real <I> interface repository id.
PortableServerExt::SubjectScheme scheme = {
PortableServerExt::CHANNEL_ADDR,
PortableServerExt::TYPED_SUBJECT,
(const char*)"IDL::example.borland.com/TMN/TypedEvent:1.0",
PortableServerExt::PULL_EVENT };
PortableServerExt::SubscribeDesc_var desc = psa->publish(
scheme, channel, oid.in(), CORBA::NameValuePairSeq());
// activate the PSA and wait for pulling.
psa->the_POAManager()->activate();
orb->run();
 
Java typed pull supplier example
// (examples/vbe/notify/psa_java/TypedPullSupplier.java)
import com.inprise.vbroker.PortableServerExt.*;
// Implement the TypedCallback::PullEvent handler,
// with piggybacked double callback, this handler is
// called back by local PSA instead of by remote proxy
// pull consumer. Therefore, the event receiver is also a
//local object.
class TypedPullSupplierImpl
extends com.borland.vbroker.TypedCallback.PullEventPOA {
...
public void pull_typed_event(
org.omg.CORBA.Object event_receiver,
Boolean block) {
// narrow to typed stub
TMN.TypedEvent stub = TMN.TypedEventHelper.narrow(event_receiver);
// reflect the callback to issue an attributeValueChange event
stub.attributeValueChange(...);
}
}
public class typedPullSupplier {
...
public static void main(String[] args) {
...
// create a supplier handler servant to activate it on the PSA
TypedPullSupplierImpl supplier = new TypedPullSupplierImpl ();

psa.activate_object (supplier);

byte [] oid = psa.servant_to_id (supplier);
// publish to the channel as typed pull supplier with the oid
// but the real <I> interface repository id.
SubjectScheme scheme = new SubjectScheme(
SubjectAddressScheme.CHANNEL_ADDR,
SubjectInterfaceScheme.TYPED_SUBJECT,
"IDL::example.borland.com/TMN/TypedEvent:1.0",
SubjectDeliveryScheme.PULL_EVENT);
SubscribeDesc desc = psa.publish(scheme, channel, oid, null);
// activate the PSA and wait for pulling.
psa.the_POAManager().activate();
orb.run();
}
}
The logic and procedure of a typed pull supplier can be summarized as:
Write a TypedCallback::PullEvent supplier servant implementation from POA skeleton. The pull_typed_event() operation of this servant uses reflective callback to generate typed event using the original IDL interface stub.
Publish this callback to the channel with SubjectDeliveryScheme to be PULL_EVENT and the object id as publish() parameters.
Additional topics and summary
This section contains miscellaneous information pertaining to the PSA.
ChannelException
Most PSA operations, with the exception of the the_subject_addr() and the_proxy_addr(), can raise PortableServerExt::ChannelException. This exception has a string member that is the repository of low level CORBA User exception. For example, when calling suspend() twice while using a given push consumer subscribe describer as parameter, you will get a ChannelException with its repository_id member as being IDL:omg.org/CosNotifyChannelAdmin/ConnectionAlreadyInactive.
The intention of not declaring a PSA operation to raise Notification Service exception is to have the PSA framework generic. Although the current implementation of the PSA works in conjunction with OMG Notification Service or Typed Notification Service, it is straightforward and extends the support to cover other publish/subscribe infrastructure such as multicast.
Setting Notification Service QoS in PSA
One approach for setting a QoS policy is to directly get the proxy reference. After using subscribe/publish operation using the _proxy_addr() method, change the policy by using set_qos() on the proxy reference.
Another possible approach for setting a QoS policy on a connected proxy within a PSA application is using property parameters of subscribe() and publish(). This approach is not implemented in VisiBroker.
PSA Summary
This list summarizes PSA's concepts and features:
The PSA automatically handles get_typed_consumer()/get_typed_supplier() and the <I> interface to proxy mapping. This largely simplifies application code of using typed event/notification service. Typed notification applications only need to implement and install the <I> interfaces observers.