ark::pipeline::StageInterface

Defined in header “ark/pipeline/stage_interface.hh”.


This structure defines the interface points for a particular stage. It is spawned when the stage is being initialized by a host pipeline.

This was mostly intended to separate I/O of a stage into a separate layer, but it could easily be merged back into Stage.

Methods

  • StageInterface(StageInterfaceConfig config)
    Constructor. Initializes a stage interface for a particular stage.

  • ~StageInterface()
    Destructor. Declared privately to help cleanup details.

  • AbstractTimerPtr add_timer(const PeriodicTimerFunction & function, std::chrono::nanoseconds rate)
    Registers a new timer, aimed at executing the callback at the specified rate.

    This corresponds to the PeriodicTimer.

  • AbstractTimerPtr add_timer(PeriodicTimerConfiguration config)
    Registers a new timer, aimed at executing the callback at the specified rate.

    This corresponds to the PeriodicTimer.

  • AbstractTimerPtr add_timer(const std::function< void()> & function, std::chrono::nanoseconds rate)
    Registers a new timer, aimed at executing the callback at the specified rate.

    This corresponds to the PeriodicTimer.

  • AbstractTimerPtr add_timer(const EventTimerFunction & function)
    Registers a new event-based timer. The callback will be invoked as soon as the pipeline begins execution. From there, you must return the time that you want to sleep before executing again.

    This corresponds to the EventTimer.

  • AbstractTimerPtr queue_singleshot_work(const SingleShotWorkFunction & function)
    Queues a single function of work to be executed as soon as the pipeline starts.

    This corresponds to an EventTimer.

  • ShutdownTokenPtr add_shutdown_token(ShutdownTokenType type)
    Registers a new shutdown token. A shutdown token allows you to request that a pipeline exit. See the documentation on the type: if you specify a type of ‘RequiredForShutdown’, you must be prepared to indicate when you shut down.

  • std::shared_ptr< Publisher< Type > > add_publisher(const std::string & channel_name)
    Registers a new publisher of the given type. Returns a reference to it. These publishers will be connected when the host pipeline is done initializing.

  • void add_subscriber(SubscriberConfiguration< Type > config)
    Registers a new subscriber of the given type. These subscribers will be connected when the pipeline is done initializing.

  • void add_subscriber(SubscriberConfiguration< AbstractEnvelope > config, std::string message_type_name, std::string message_schema, std::string message_schema_protocol)
    Registers a new subscriber with the system. The subscriber receives abstract envelopes, but this overload allows you to indicate that the messages are serializable (with the given type name and schema information). This communicates to downstream listeners that this subscriber is only interested in a specific type (and that it is serializable).

  • std::shared_ptr< DataEnvelopePublisher > add_publisher(const std::string & channel_name, std::string type_name, std::string schema_registry, std::string schema_protocol)
    Registers a new data envelope publisher with the system. These publishers allow you to transmit messages that are simply wrapped blobs, rather than concrete types.

  • void restrict_topology_changes()
    Prevents the addition of any new publishers/subscribers. After this is called, any attempt to add a publisher or subscriber throws.

  • void register_topology_change_callback(TopologyChangeCallback callback)
    Registers a topology-changed callback. This is invoked each time a stage registers new publishers/subscribers. This normally occurs during startup, and gives stages a chance to reconfigure themselves.

  • TopologyChangeCallback & topology_change_callback()
    Returns a reference to the topology change callback for this stage.

  • StagePtr & parent_stage()
    Returns the parent stage of this interface.

  • uint64_t stage_instance_id()
    Returns the instance ID of this interface/stage. This is a number assigned by the context that provides a unique ID and method for looking up stages without relying on names.

  • std::vector< AbstractTimerPtr > & timers()
    Returns all of the timers registered in this interface.

  • std::vector< PublisherBasePtr > & publishers()
    Returns all of the publishers registered in this interface.

  • std::vector< SubscriberBasePtr > & subscribers()
    Returns all of the subscribers registered in this interface.

  • std::vector< ShutdownTokenPtr > & shutdown_tokens()
    Returns all of the shutdown tokens registered in this interface.

  • config::ConfigPackageCPtr config_package()
    Returns the configuration package that this stage should be using.

  • core::ArgumentParseResultCPtr parsed_arguments()
    Returns the argument parser results.

  • void set_preferred_start_time(std::chrono::steady_clock::time_point time)
    Sets the preferred start time for the system. With simclock runners, this will initialize the simclock to this time. If multiple stages try to do this, the system will fault out.

  • std::chrono::steady_clock::time_point preferred_start_time()
    Returns the preferred start time (if any).

  • std::string apply_namespace_to_name(const std::string & channel_name)
    Applies the prefix of the parent stage to the given channel name, according to our channel rules.

  • const std::string & comms_namespace()
    Returns the currently configured namespace.

  • TimeProviderPtr pipeline_time_provider()
    Returns the time provider for this interface. Note that relying on this will make your stage non-deterministic. It’s only intended for stages that need to directly work with pipeline time.

  • core::Guid pipeline_identifier()
    Returns the pipeline identifier. This is originally sourced from the.

  • ExecutorStatisticsRetrievalFunction executor_statistics_retrieval_function()
    Retrieves the callback that allows you to fetch executor statistics.

  • ExecutorRuntimeControlFunction executor_runtime_control_function()
    Retrieves the callback that allows you to change the executor’s runtime status.
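
The event-based add_timer overload describes a contract in which the callback runs once at pipeline start and then returns how long to sleep before it runs again. The sketch below illustrates that contract on a simulated clock. It does not use the ark headers: MockEventTimerFunction, simulate_event_timer and demo_backoff are hypothetical names, and the callback signature (no arguments, returns std::chrono::nanoseconds) is an assumption, since the real EventTimerFunction may differ.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <vector>

using namespace std::chrono_literals;

// Hypothetical stand-in for EventTimerFunction (assumed signature, not ark's).
using MockEventTimerFunction = std::function<std::chrono::nanoseconds()>;

// Drives the callback on a simulated clock: the first call happens at t = 0,
// and every later call happens after the delay the previous call returned.
// Returns the simulated time of each invocation.
std::vector<std::chrono::nanoseconds> simulate_event_timer(
    const MockEventTimerFunction & function, std::size_t invocations)
{
    assert(function);  // an empty std::function cannot be invoked

    std::vector<std::chrono::nanoseconds> fire_times;
    std::chrono::nanoseconds now{0};
    for (std::size_t i = 0; i < invocations; ++i) {
        fire_times.push_back(now);
        now += function();  // the callback decides when it runs next
    }
    return fire_times;
}

// Example callback: sleep 10ms after the first call, then double the delay
// after every call (a simple back-off schedule).
std::vector<std::chrono::nanoseconds> demo_backoff()
{
    std::chrono::nanoseconds delay = 10ms;
    auto callback = [&delay]() {
        const auto sleep_for = delay;
        delay *= 2;
        return sleep_for;
    };
    return simulate_event_timer(callback, 4);
}
```

Calling demo_backoff() yields invocation times of 0ms, 10ms, 30ms and 70ms. A periodic timer, by contrast, would fire at a fixed rate regardless of what the callback does.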
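
The interplay between register_topology_change_callback and restrict_topology_changes can be sketched with a small mock. This is not the ark implementation: MockTopology, TopologyDemoResult and demo_topology are hypothetical, the callback is assumed to take no arguments, and std::logic_error is an assumed exception type, since the documentation above only says that additions throw once the topology is restricted.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the topology bookkeeping of a stage interface.
class MockTopology
{
public:
    using ChangeCallback = std::function<void()>;

    void register_topology_change_callback(ChangeCallback callback)
    {
        assert(callback);  // expect a callable, not an empty function
        callback_ = std::move(callback);
    }

    // Adds a channel, or throws if the topology has been restricted.
    void add_publisher(const std::string & channel_name)
    {
        if (restricted_) {
            throw std::logic_error("topology is restricted: " + channel_name);
        }
        channels_.push_back(channel_name);
        if (callback_) {
            callback_();  // notify the stage that the topology changed
        }
    }

    void restrict_topology_changes() { restricted_ = true; }

    const std::vector<std::string> & channels() const { return channels_; }

private:
    bool restricted_ = false;
    ChangeCallback callback_;
    std::vector<std::string> channels_;
};

struct TopologyDemoResult
{
    int callbacks_seen;
    bool late_add_threw;
    std::size_t channel_count;
};

// Registers two publishers, restricts the topology, then tries a third.
TopologyDemoResult demo_topology()
{
    MockTopology topology;
    int callbacks_seen = 0;
    topology.register_topology_change_callback([&callbacks_seen]() { ++callbacks_seen; });

    topology.add_publisher("/imu");
    topology.add_publisher("/camera");
    topology.restrict_topology_changes();

    bool late_add_threw = false;
    try {
        topology.add_publisher("/late");
    } catch (const std::logic_error &) {
        late_add_threw = true;
    }
    return {callbacks_seen, late_add_threw, topology.channels().size()};
}
```

In this mock the callback fires once per successful registration, and the rejected third registration leaves the channel list unchanged.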