/*
 * Simple sequence push consumer.
 */

#include "SequencePushConsumerImpl.h"
#include "Common/ObjectAdapter.h"
#include <orbsvcs/Shutdown_Utilities.h>

class Service_Shutdown_Functor : public Shutdown_Functor
{
public:
  Service_Shutdown_Functor (CORBA::ORB_ptr orb)
    : orb_ (CORBA::ORB::_duplicate (orb))
  {
  }

  void operator() (int which_signal)
  {
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("shutting down on signal %d\n"),
                which_signal));
    (void) this->orb_->shutdown ();
  }

private:
  CORBA::ORB_var orb_;
};

SequencePushConsumerImpl::SequencePushConsumerImpl (CORBA::ORB_ptr orb, ChannelUtil &util)
  : orbVar_ (CORBA::ORB::_duplicate (orb))
  , util_ (util)
  , consumerVar_ (0)
{
  // Obtain a reference to the root POA and create the CORBA object.
  PortableServer::POA_var poaVar = util_.getRootPoa ();

  CORBA::Object_var objVar = _this ();
  consumerVar_ = CosNotifyComm::SequencePushConsumer::_narrow (objVar.in ());

  // Connect to the channel.
  connectToChannel ();
}

void
SequencePushConsumerImpl::disconnect_sequence_push_consumer (void)
{
  ACE_DEBUG ((LM_INFO, ACE_TEXT ("Disconnect callback invoked\n")));
}

void
SequencePushConsumerImpl::offer_change (const CosNotification::EventTypeSeq &/*added*/,
                                        const CosNotification::EventTypeSeq &/*removed*/)
{
  ACE_DEBUG ((LM_INFO, ACE_TEXT ("Offer change callback invoked\n")));
}

void
SequencePushConsumerImpl::push_structured_events (const CosNotification::EventBatch &events)
{
  char message[1024];
  CORBA::ULong msgCount = events.length ();

  ACE_DEBUG ((LM_INFO, ACE_TEXT ("Received a batch of %i message(s)\n"), msgCount));

  for (CORBA::ULong i = 0; i < msgCount; i++)
    {
      util_.printEvent (message, events[i], 0);
      ACE_DEBUG ((LM_INFO,
                  ACE_TEXT ("Message %i of %i: %s\n"),
                  i+1, msgCount, message));
    }
}

void
SequencePushConsumerImpl::connectToChannel (void)
{
  // Get a reference to the event channel.
  CosNotifyChannelAdmin::EventChannel_var channelVar = util_.getChannel ();

  // Obtain a reference to the default consumer admin.
  CosNotifyChannelAdmin::ConsumerAdmin_var adminVar;
  try
    {
      adminVar = channelVar->default_consumer_admin ();
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Obtained default consumer admin\n")));
    }
  catch (const CORBA::Exception &ex)
    {
      ex._tao_print_exception (ACE_TEXT ("ERROR: Failed to obtain default consumer admin\n"));
      throw;
    }

  // Obtain a proxy supplier.
  CosNotifyChannelAdmin::ProxySupplier_var psVar;
  try
    {
      CosNotifyChannelAdmin::ProxyID pid;
      CosNotifyChannelAdmin::ClientType type = CosNotifyChannelAdmin::SEQUENCE_EVENT;

      psVar = adminVar->obtain_notification_push_supplier (type, pid);
      if (CORBA::is_nil (psVar.in ()))
        {
          ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Obtained a null proxy supplier\n")));
          throw TestException ();
        }
    }
  catch (const CORBA::Exception &ex)
    {
      ex._tao_print_exception (ACE_TEXT ("ERROR: Failed to obtain a proxy supplier\n"));
      throw;
    }

  // Narrow to a sequence proxy push supplier.
  CosNotifyChannelAdmin::SequenceProxyPushSupplier_var sppsVar =
    CosNotifyChannelAdmin::SequenceProxyPushSupplier::_narrow (psVar.in ());
  if (CORBA::is_nil (sppsVar.in ()))
    {
      ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Narrowed to a null sequence proxy push supplier\n")));
      throw TestException ();
    }

  // Set up custom MaxBatchSize and PacingInterval values.  Comment out this code to inherit the
  // values from the consumer admin.
  CosNotification::QoSProperties qos = CosNotification::QoSProperties (2);
  qos.length (2);
  TimeBase::TimeT pacing = 10L * 10000000L;
  qos[0].name = CORBA::string_dup (ACE_TEXT ("PacingInterval"));
  qos[0].value <<= pacing;
  qos[1].name = CORBA::string_dup (ACE_TEXT ("MaximumBatchSize"));
  qos[1].value <<= 10;
  try
    {
      sppsVar->set_qos (qos);
    }
  catch (const CORBA::Exception &ex)
    {
      ex._tao_print_exception (ACE_TEXT ("ERROR: Failed to set QoS properties on the sequence proxy push supplier.\n"));
      throw;
    }

  // Connect.
  try
    {
      sppsVar->connect_sequence_push_consumer (consumerVar_.in ());
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Connected consumer to proxy\n")));
    }
  catch (const CORBA::Exception &ex)
    {
      ex._tao_print_exception (ACE_TEXT ("ERROR: Failed to connect consumer to proxy\n"));
      throw;
    }
}

int
main (int argc, char **argv)
{
  try
    {
      // Initialise the ORB reference.
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Creating ORB reference\n")));
      CORBA::ORB_var orbVar = CORBA::ORB_init (argc, argv);
      if (CORBA::is_nil (orbVar.in ()))
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("ERROR: Could not create the ORB\n")),
                            -1);
        }

      // Cleaning guard.
      Service_Shutdown_Functor killer (orbVar.in ());
      Service_Shutdown kill_contractor (killer);

      // Obtain a reference to the channel utility.
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Creating channel utility\n")));
      ChannelUtil util (orbVar.in ());

      // Create the consumer object.
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Creating consumer\n")));
      SequencePushConsumerImpl *pConsumerImpl = 0;
      ACE_NEW_THROW_EX (pConsumerImpl,
                        SequencePushConsumerImpl (orbVar.in (), util),
                        CORBA::NO_MEMORY ());
      PortableServer::ServantBase_var holder (pConsumerImpl);

      // Enter the orb run loop.
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Entering ORB run loop\n")));
      orbVar->run ();

      // Clean up and exit.
      orbVar->destroy ();
    }
  catch (const CORBA::TRANSIENT &)
    {
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Transient on shutdown\n")));
    }
  catch (const CORBA::Exception &ex)
    {
      ex._tao_print_exception (ACE_TEXT ("ERROR: Exception in AnyPushSupplierImpl.cpp:"));
      return 1;
    }

  return 0;
}
