/*
 * Simple sequence push supplier.
 */

#include "SequencePushSupplierImpl.h"
#include "Common/ObjectAdapter.h"
#include <orbsvcs/Shutdown_Utilities.h>

bool finish_sending = false;

class Service_Shutdown_Functor : public Shutdown_Functor
{
public:
  Service_Shutdown_Functor (CORBA::ORB_ptr orb)
    : orb_ (CORBA::ORB::_duplicate (orb))
  {
  }

  void operator() (int which_signal)
  {
    finish_sending = true;
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("shutting down on signal %d\n"),
                which_signal));
    (void) this->orb_->shutdown ();
  }

private:
  CORBA::ORB_var orb_;
};

SequencePushSupplierImpl::SequencePushSupplierImpl (CORBA::ORB_ptr orb, ChannelUtil &util)
  : orbVar_ (CORBA::ORB::_duplicate (orb))
  , util_ (util)
  , supplierVar_ (0)
  , proxyVar_ (0)
{
  // Obtain a reference to the root POA and create the CORBA object.
  PortableServer::POA_var poaVar = util_.getRootPoa ();

  CORBA::Object_var objVar = _this ();
  supplierVar_ = CosNotifyComm::SequencePushSupplier::_narrow (objVar.in ());

  // Connect to the channel.
  connectToChannel ();
}

void
SequencePushSupplierImpl::disconnect_sequence_push_supplier (void)
{
  ACE_DEBUG ((LM_INFO, ACE_TEXT ("Disconnect callback invoked\n")));
}

void
SequencePushSupplierImpl::subscription_change (const CosNotification::EventTypeSeq &/*added*/,
                                               const CosNotification::EventTypeSeq &/*removed*/)
{
  ACE_DEBUG ((LM_INFO, ACE_TEXT ("Subscription change callback invoked\n")));
}

void
SequencePushSupplierImpl::pushEvents (void)
{
  const CORBA::ULong batchSize = 5;
  CosNotification::EventBatch batch;
  batch.length (batchSize);

  int sent = 0;
  char message[256];
  ACE_Time_Value atv;
  long delay = 2000L;

  atv.msec (delay);

  while (1)
    {
      if (finish_sending)
        break;

      for (CORBA::ULong i = 0; i < batchSize; ++i)
        {
          CosNotification::StructuredEvent_var tmp =
            util_.createStructuredEvent (sent++);
          batch[i] = tmp;
          util_.printEvent (message, batch[i], 0);
          ACE_DEBUG ((LM_INFO,
                      ACE_TEXT ("Creating event %i of %i: %s\n"),
                      i+1, batchSize, message));
        }

      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Sending %i events\n"), batchSize));
      proxyVar_->push_structured_events (batch);
      ACE_OS::sleep (atv);
    }
}

void
SequencePushSupplierImpl::connectToChannel (void)
{
  // Get a reference to the event channel.
  CosNotifyChannelAdmin::EventChannel_var channelVar = util_.getChannel ();

  // Obtain a reference to the default supplier admin.
  CosNotifyChannelAdmin::SupplierAdmin_var adminVar;
  try
    {
      adminVar = channelVar->default_supplier_admin ();
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Obtained default supplier admin\n")));
    }
  catch (const CORBA::Exception &ex)
    {
      ex._tao_print_exception (ACE_TEXT ("ERROR: Failed to obtain default supplier admin\n"));
      throw;
    }

  // Obtain a proxy consumer.
  CosNotifyChannelAdmin::ProxyConsumer_var pcVar;
  try
    {
      CosNotifyChannelAdmin::ProxyID pid;
      CosNotifyChannelAdmin::ClientType type = CosNotifyChannelAdmin::SEQUENCE_EVENT;

      pcVar = adminVar->obtain_notification_push_consumer (type, pid);
      if (CORBA::is_nil (pcVar.in ()))
        {
          ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Obtained a null proxy consumer\n")));
          throw TestException ();
        }
    }
  catch (const CORBA::Exception &ex)
    {
      ex._tao_print_exception (ACE_TEXT ("ERROR: Failed to obtain a proxy consumer\n"));
      throw;
    }

  // Narrow to a sequence proxy push consumer.
  proxyVar_ = CosNotifyChannelAdmin::SequenceProxyPushConsumer::_narrow (pcVar.in ());
  if (CORBA::is_nil (proxyVar_.in ()))
    {
      ACE_ERROR ((LM_ERROR, ACE_TEXT ("ERROR: Narrowed to a null sequence proxy push consumer\n")));
      throw TestException ();
    }

  // Connect.
  try
    {
      proxyVar_->connect_sequence_push_supplier (supplierVar_.in ());
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Connected supplier to proxy\n")));
    }
  catch (const CORBA::Exception &ex)
    {
      ex._tao_print_exception (ACE_TEXT ("ERROR: Failed to connect supplier to proxy\n"));
      throw;
    }
}

int
main (int argc, char **argv)
{
  try
    {
      // Initialise the ORB reference.
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Creating ORB reference\n")));
      CORBA::ORB_var orbVar = CORBA::ORB_init (argc, argv);
      if (CORBA::is_nil (orbVar.in ()))
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             ACE_TEXT ("ERROR: Could not create the ORB\n")),
                            -1);
        }

      // Cleaning guard.
      Service_Shutdown_Functor killer (orbVar.in ());
      Service_Shutdown kill_contractor (killer);

      // Obtain a reference to the channel utility.
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Creating channel utility\n")));
      ChannelUtil util (orbVar.in ());

      // Create the supplier object.
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Creating supplier\n")));
      SequencePushSupplierImpl *pSupplierImpl = 0;
      ACE_NEW_THROW_EX (pSupplierImpl,
                        SequencePushSupplierImpl (orbVar.in (), util),
                        CORBA::NO_MEMORY ());
      PortableServer::ServantBase_var holder (pSupplierImpl);

      // Send events.
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Sending events\n")));
      pSupplierImpl->pushEvents ();

      // Clean up and exit.
      orbVar->destroy ();
    }
  catch (const CORBA::TRANSIENT &)
    {
      ACE_DEBUG ((LM_INFO, ACE_TEXT ("Transient on shutdown\n")));
    }
  catch (const CORBA::Exception &ex)
    {
      ex._tao_print_exception (ACE_TEXT ("ERROR: Exception in AnyPushSupplierImpl.cpp:"));
      return 1;
    }

  return 0;
}
