/*
 * Simple any push supplier.
 */

#include "AnyPushSupplierImpl.h"
#include "Common/ObjectAdapter.h"
#include <orbsvcs/Shutdown_Utilities.h>

#include <csignal>

// Stop flag shared between the shutdown functor and the send loop: set to a
// non-zero value by Service_Shutdown_Functor::operator() and polled by
// AnyPushSupplierImpl::pushEvents().  The writer runs as a signal callback,
// so the flag uses volatile std::sig_atomic_t, the object type the C++
// standard allows a signal handler to write while other code reads it.
volatile std::sig_atomic_t finish_sending = 0;

class Service_Shutdown_Functor : public Shutdown_Functor
{
public:
  Service_Shutdown_Functor (CORBA::ORB_ptr orb)
    : orb_ (CORBA::ORB::_duplicate (orb))
  {
  }

  void operator() (int which_signal)
  {
    finish_sending = true;
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("shutting down on signal %d\n"),
                which_signal));
    (void) this->orb_->shutdown ();
  }

private:
  CORBA::ORB_var orb_;
};

// Construct the supplier servant.
//
// Duplicates the ORB reference, remembers the channel utility, derives this
// servant's own PushSupplier object reference and finally connects to the
// notification channel.  Any exception thrown by connectToChannel()
// propagates out of the constructor.
AnyPushSupplierImpl::AnyPushSupplierImpl (CORBA::ORB_ptr orb, ChannelUtil &util)
  : orbVar_ (CORBA::ORB::_duplicate (orb))
  , util_ (util)
  , supplierVar_ (0)
  , proxyVar_ (0)
{
  // The root POA reference is fetched but not used any further in this
  // scope.  NOTE(review): presumably the call is needed for its side effects
  // inside ChannelUtil -- confirm before removing it.
  PortableServer::POA_var rootPoa = this->util_.getRootPoa ();

  // Obtain the object reference for this servant and keep it as a
  // PushSupplier so it can be handed to the proxy later on.
  CORBA::Object_var self = this->_this ();
  this->supplierVar_ = CosNotifyComm::PushSupplier::_narrow (self.in ());

  // Hook the supplier up to the event channel.
  this->connectToChannel ();
}

// Disconnect callback of the push supplier interface.  This implementation
// only logs that it was invoked; no state is changed.
void
AnyPushSupplierImpl::disconnect_push_supplier ()
{
  ACE_DEBUG ((LM_INFO,
              ACE_TEXT ("Disconnect callback invoked\n")));
}

// Subscription change callback.  Both event type sequences (added and
// removed) are ignored; the call is merely logged.
void
AnyPushSupplierImpl::subscription_change (const CosNotification::EventTypeSeq & /* added */,
                                          const CosNotification::EventTypeSeq & /* removed */)
{
  ACE_DEBUG ((LM_INFO,
              ACE_TEXT ("Subscription change callback invoked\n")));
}

// Push any-events to the connected proxy until shutdown is requested.
//
// Each iteration creates an event numbered with a running counter, logs its
// printable form, pushes it through proxyVar_ and then sleeps for two
// seconds.  The loop ends once the global finish_sending flag has been set.
// Exceptions raised by the channel utility or by push() are not caught here
// and propagate to the caller.
void
AnyPushSupplierImpl::pushEvents (void)
{
  int sent = 0;

  // Zero-filled so that logging the buffer is well defined even if
  // printEvent() were to leave it untouched.
  // NOTE(review): printEvent() receives no buffer length; 256 bytes is
  // assumed to be enough -- confirm against ChannelUtil.
  char message[256] = { 0 };

  // Pause between two consecutive events, in milliseconds.
  long delay = 2000L;
  ACE_Time_Value atv;
  atv.msec (delay);

  while (!finish_sending)
    {
      CORBA::Any_var eventVar = util_.createAnyEvent (sent++);
      util_.printEvent (message, eventVar.in (), 0);
      // message is a narrow char buffer, so it is logged with %C; %s expects
      // an ACE_TCHAR string and would be wrong in wide-character builds.
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Sending event: %C\n"), message));
      proxyVar_->push (eventVar.in ());
      ACE_OS::sleep (atv);
    }
}

void
AnyPushSupplierImpl::connectToChannel (void)
{
  // Get a reference to the event channel.
  CosNotifyChannelAdmin::EventChannel_var channelVar = util_.getChannel ();

  // Obtain a reference to the default supplier admin.
  CosNotifyChannelAdmin::SupplierAdmin_var adminVar;
  try
    {
      adminVar = channelVar->default_supplier_admin ();
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Obtained default supplier admin\n")));
    }
  catch (const CORBA::Exception &ex)
    {
      ex._tao_print_exception (ACE_TEXT ("ERROR: Failed to obtain default supplier admin\n"));
      throw;
    }

  // Obtain a proxy consumer.
  CosNotifyChannelAdmin::ProxyConsumer_var pcVar;
  try
    {
      CosNotifyChannelAdmin::ProxyID pid;
      CosNotifyChannelAdmin::ClientType type = CosNotifyChannelAdmin::ANY_EVENT;

      pcVar = adminVar->obtain_notification_push_consumer (type, pid);
      if (CORBA::is_nil (pcVar.in ()))
        {
          ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Obtained a null proxy consumer\n")));
          throw TestException ();
        }
    }
  catch (const CORBA::Exception &ex)
    {
      ex._tao_print_exception (ACE_TEXT ("ERROR: Failed to obtain a proxy consumer\n"));
      throw;
    }

  // Narrow to a structured proxy push consumer.
  proxyVar_ =
    CosNotifyChannelAdmin::ProxyPushConsumer::_narrow (pcVar.in ());
  if (CORBA::is_nil (proxyVar_.in ()))
    {
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("ERROR: Narrowed to a null proxy push consumer\n")));
      throw TestException ();
    }

  // Connect.
  try
    {
      proxyVar_->connect_any_push_supplier (supplierVar_.in ());
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Connected supplier to proxy\n")));
    }
  catch (const CORBA::Exception &ex)
    {
      ex._tao_print_exception (ACE_TEXT ("ERROR: Failed to connect supplier to proxy\n"));
      throw;
    }
}

int
main (int argc, char **argv)
{
  try
    {
      // Initialise the ORB reference.
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Creating ORB reference\n")));
      CORBA::ORB_var orbVar = CORBA::ORB_init (argc, argv);
      if (CORBA::is_nil (orbVar.in ()))
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("ERROR: Could not create the ORB\n")),
                            -1);
        }

      // Cleaning guard.
      Service_Shutdown_Functor killer (orbVar.in ());
      Service_Shutdown kill_contractor (killer);

      // Obtain a reference to the channel utility.
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Creating channel utility\n")));
      ChannelUtil util (orbVar.in ());

      // Create the supplier object.
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Creating supplier\n")));
      AnyPushSupplierImpl *pSupplierImpl = 0;
      ACE_NEW_THROW_EX (pSupplierImpl,
                        AnyPushSupplierImpl (orbVar.in (), util),
                        CORBA::NO_MEMORY ());
      PortableServer::ServantBase_var holder (pSupplierImpl);

      // Send events.
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Sending events\n")));
      pSupplierImpl->pushEvents ();

      // Clean up and exit.
      orbVar->destroy ();
    }
  catch (const CORBA::TRANSIENT &)
    {
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Transient\n")));
    }
  catch (const CORBA::COMM_FAILURE &)
    {
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("COMM_FAILURE\n")));
    }
  catch (const CORBA::Exception &ex)
    {
      ex._tao_print_exception (ACE_TEXT ("ERROR: Exception in AnyPushSupplierImpl.cpp:"));
      return 1;
    }

  return 0;
}
