ark::pipeline::SubscriberBase

Defined in header “ark/pipeline/subscriber_base.hh”.


This is the template-free base class of the subscriber. Because it has no template parameters, subscribers for different message types can be stored together within pipelines and stages.
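
As an illustration of why a template-free base helps with storage, here is a minimal sketch. It does not use the real ark classes: MockSubscriberBase, MockSubscriber, count_consumers, and make_demo_subscribers are hypothetical names invented for this example, and only channel_name(), message_type(), and consumes_type() mirror methods documented on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <typeinfo>
#include <utility>
#include <vector>

// Hypothetical stand-in for a template-free subscriber base (not the ark class).
class MockSubscriberBase {
public:
    explicit MockSubscriberBase(std::string channel_name)
        : channel_name_(std::move(channel_name)) {}
    virtual ~MockSubscriberBase() = default;

    const std::string &channel_name() const { return channel_name_; }

    // Each concrete subscriber reports the message type it handles.
    virtual const std::type_info &message_type() const = 0;

    // True if the given type matches this subscriber's message type.
    bool consumes_type(const std::type_info &type) const {
        return type == message_type();
    }

private:
    std::string channel_name_;
};

// Hypothetical typed subscriber; the template parameter lives only here.
template <typename Message>
class MockSubscriber : public MockSubscriberBase {
public:
    using MockSubscriberBase::MockSubscriberBase;
    const std::type_info &message_type() const override { return typeid(Message); }
};

using SubscriberList = std::vector<std::unique_ptr<MockSubscriberBase>>;

// Counts the stored subscribers that accept the given type.
inline std::size_t count_consumers(const SubscriberList &subs,
                                   const std::type_info &type) {
    std::size_t count = 0;
    for (const auto &sub : subs) {
        if (sub->consumes_type(type)) {
            ++count;
        }
    }
    return count;
}

// Builds one container holding subscribers of different message types.
inline SubscriberList make_demo_subscribers() {
    SubscriberList subs;
    subs.push_back(std::make_unique<MockSubscriber<int>>("counts"));
    subs.push_back(std::make_unique<MockSubscriber<std::string>>("names"));
    subs.push_back(std::make_unique<MockSubscriber<int>>("more_counts"));
    return subs;
}
```

The container only ever sees the base type, so a stage can hold any mix of subscribers and still query each one for its channel name and message type at run time.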

Typedefs

  • using ReadyCallbackFunction = std::function<void()>
    The type of the ready callback: a callable that takes no arguments and returns nothing.
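
A callback of this type might be installed and fired roughly as in the sketch below. ReadyNotifier, notify_ready(), and count_notifications() are hypothetical names used only for illustration; the alias and set_ready_callback() mirror the names on this page. When the real subscriber actually fires the callback is not specified here.

```cpp
#include <cassert>
#include <functional>
#include <utility>

using ReadyCallbackFunction = std::function<void()>;

// Hypothetical holder that stores a ready callback and fires it on demand.
class ReadyNotifier {
public:
    void set_ready_callback(ReadyCallbackFunction function) {
        callback_ = std::move(function);
    }

    // Hypothetical trigger; invokes the callback only if one is installed.
    void notify_ready() const {
        if (callback_) {
            callback_();
        }
    }

private:
    ReadyCallbackFunction callback_;
};

// Fires the notifier `times` times and returns how often the callback ran.
inline int count_notifications(int times) {
    int wakeups = 0;
    ReadyNotifier notifier;
    notifier.notify_ready();  // No callback installed yet, so nothing happens.
    notifier.set_ready_callback([&wakeups] { ++wakeups; });
    for (int i = 0; i < times; ++i) {
        notifier.notify_ready();
    }
    return wakeups;
}
```

Checking that the std::function is non-empty before calling it avoids a std::bad_function_call when no callback has been installed.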

Methods

  • SubscriberBase(std::string channel_name)
    Constructor. Initializes with the given channel name.

  • ~SubscriberBase()
    Virtual destructor, so that derived subscribers can be used and destroyed polymorphically.

  • const std::string & channel_name()
    Returns the channel name of this subscriber.

  • bool consumes_type(const std::type_info & type)
    Returns true if this subscriber can execute messages of the given type.

  • bool consumes_message_type(const std::string & type)
    Returns true if this subscriber can execute messages of the given message type. Assumes the type is serializable.

  • const std::type_info & message_type()
    Returns type information about the messages that flow through this subscriber.

  • bool message_is_serializable()
    Returns true if the message type for this subscriber is serializable.

  • std::string message_schema_registry()
    Returns the schema registry for the object that flows over this subscriber, if available.

  • std::string message_schema_protocol()
    Returns the message schema type name of the object that flows over this subscriber.

  • std::string message_type_name()
    Returns the message type name of the object that flows over this subscriber.

  • size_t queue_size()
    Returns the size of the queue for this subscriber. This value might change frequently; this is just a snapshot.

  • void push(DataEnvelopePtr message)
    Enqueues the given data envelope onto the subscriber. The object will be deserialized into a concrete type, and then queued for execution as if you had called push() on the concrete type.

  • void push(AbstractEnvelopeCPtr message)
    Enqueues the given envelope onto the subscriber. If the queue exceeds its limits, the oldest item on the queue will be dropped.

    If you enqueue an envelope that does not match the subscriber's type, this will throw an error.

  • bool execute()
    Pops the oldest item off the queue and executes the callback against it. Returns false if the queue is empty.

  • bool pop(SubscriberWorkItem & item, std::chrono::nanoseconds * expected_runtime)
    Pops a work item off the subscriber. The execute() function is actually broken into two halves: a phase for popping an item off the queue, and a phase for running the callback. This is the first half, which allows you to deterministically pop an envelope off a subscriber.

    @param[out] item Work item to populate
    @param[out] expected_runtime Pointer to store expected runtime

  • void execute_work_item(SubscriberWorkItem && item)
    Executes the given work item, as received from a call to pop(). Consumes the work item, taking ownership of it.

  • SubscriberStatistics statistics()
    Returns the statistics related to this subscriber. These are running statistics, initialized when the subscriber starts and counting up from there.

  • SubscriberTimings timings()
    Returns timing information related to this subscriber. Resets the timing information after this call (so the maximums are reset). This API is meant to be called periodically to determine the max timing information over some period of time.

  • size_t configured_max_queue_depth()
    Returns the max queue depth configured for this subscriber.

  • std::chrono::nanoseconds configured_expected_runtime()
    Returns the expected execution time configured for this subscriber.

  • void record_publisher_connection()
    Marks that this subscriber has been connected to a new publisher.

  • void shutdown()
    Invoked to shut this subscriber down, clearing state to release all captured resources.

  • void set_ready_callback(ReadyCallbackFunction function)
    Installs a callback that will be notified when the subscriber is ready to execute.
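
The queueing and execution methods above (push(), pop(), execute_work_item(), and execute()) can be sketched with a small mock. This is not the ark implementation: MockQueueSubscriber, MockWorkItem, dropped(), and run_demo() are hypothetical, messages are plain ints rather than envelopes, and the sketch is single-threaded. It only illustrates the drop-oldest behavior and the two-phase split described on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical work item: a message paired with the callback that processes it.
struct MockWorkItem {
    int message = 0;
    std::function<void(int)> callback;
};

// Hypothetical bounded subscriber queue (not the ark class).
class MockQueueSubscriber {
public:
    MockQueueSubscriber(std::size_t max_queue_depth, std::function<void(int)> callback)
        : max_queue_depth_(max_queue_depth), callback_(std::move(callback)) {}

    // Enqueues a message; if the queue exceeds its limit, drops the oldest item.
    void push(int message) {
        queue_.push_back(message);
        if (queue_.size() > max_queue_depth_) {
            queue_.pop_front();
            ++dropped_;
        }
    }

    // First half of execute(): removes the oldest item and fills in `item`.
    bool pop(MockWorkItem &item) {
        if (queue_.empty()) {
            return false;
        }
        item.message = queue_.front();
        item.callback = callback_;
        queue_.pop_front();
        return true;
    }

    // Second half of execute(): takes ownership of the item and runs its callback.
    void execute_work_item(MockWorkItem &&item) {
        MockWorkItem local = std::move(item);
        if (local.callback) {
            local.callback(local.message);
        }
    }

    // Both halves together; returns false if the queue is empty.
    bool execute() {
        MockWorkItem item;
        if (!pop(item)) {
            return false;
        }
        execute_work_item(std::move(item));
        return true;
    }

    std::size_t queue_size() const { return queue_.size(); }
    std::size_t dropped() const { return dropped_; }

private:
    std::size_t max_queue_depth_;
    std::function<void(int)> callback_;
    std::deque<int> queue_;
    std::size_t dropped_ = 0;
};

// Pushes three messages into a depth-2 queue and returns what the callback saw.
inline std::vector<int> run_demo() {
    std::vector<int> seen;
    MockQueueSubscriber subscriber(2, [&seen](int m) { seen.push_back(m); });
    subscriber.push(1);
    subscriber.push(2);
    subscriber.push(3);  // Queue limit exceeded: message 1 is dropped.

    MockWorkItem item;
    if (subscriber.pop(item)) {                        // Phase one: pop.
        subscriber.execute_work_item(std::move(item)); // Phase two: run callback.
    }
    while (subscriber.execute()) {
        // Drain the remaining items with the combined call.
    }
    return seen;
}
```

Splitting the pop from the callback lets a scheduler decide deterministically which item runs next while deferring the actual work, for example to another thread.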