ark::pipeline

Enums

  • DataEnvelopeFormat (int8_t)
    Indicates the contents of the binary data stored in this envelope. May be a serialized bitstream or JSON.

  • ShutdownTokenType (int8_t)
    Defines the type of the shutdown token.

    The ‘standard’ token will allow you to request shutdown, but won’t prevent other tokens from requesting a shutdown. The ‘required-for-shutdown’ token will allow you to request shutdown, and the pipeline will not exit until you have requested shutdown. In other words, other stages that request shutdown will not be successful until you have also requested a shutdown.

  • CallbackExecutionType

  • PeriodicProducerTriggerType

  • TopologyChangeEventType (int32_t)
    Indicates the type of topology change event.

Typedefs

Defined in “ark/pipeline/abstract_timer.hh”:

Defined in “ark/pipeline/data_envelope.hh”:

Defined in “ark/pipeline/envelope.hh”:

Defined in “ark/pipeline/envelope_carrier.hh”:

Defined in “ark/pipeline/event_timer.hh”:

  • using EventTimerPtr = std::shared_ptr< EventTimer >

Defined in “ark/pipeline/executor_function_pointers.hh”:

  • using ExecutorStatisticsRetrievalFunction = std::function< ExecutorStatistics()>
    A function pointer that allows you to request statistics from the executor.

  • using ExecutorRuntimeControlFunction = std::function< void(ExecutorRuntimeControlState)>
    A function pointer that allows you to request that the executor freezes execution immediately (or resumes execution). At this point, threads will still be active, but all message delivery, timers, and clock advancement will be frozen until you unpaused the pipeline.

Defined in “ark/pipeline/forward.hh”:

Defined in “ark/pipeline/periodic_timer.hh”:

Defined in “ark/pipeline/sim_mailbox.hh”:

Defined in “ark/pipeline/stages/message_synchronizer_stage.hh”:

  • using ExtractComparisonValueFunction = std::function< int64_t(const AbstractEnvelopeCPtr &)>
    Signature for the difference functions.

Defined in “ark/pipeline/stages/metrics_utility.hh”:

  • using CallbackExecStatMap = std::map< std::pair< std::string, std::string >, ViewedExecutionStats >
    Define a map type.

Defined in “ark/pipeline/timer_callbacks.hh”:

  • using EventTimerFunction = std::function< std::chrono::nanoseconds(std::chrono::steady_clock::time_point)>
    The callback that is executed when an event timer goes off. The time point provided is the activation time of the timer (in other words, the time when the timer officially ‘activated’, not necessarily when your callback was invoked).

    You must return how long you want to wait until the timer is invoked again. If the elapsed time is 5 seconds, and you return 1 second, the next activation will occur at 6 seconds.

  • using PeriodicTimerFunction = std::function< void(std::chrono::steady_clock::time_point)>
    A callback that is executed for periodic timer functions. The time point provided is the activation time of the timer (in other words, the time when the timer officially ‘activated’, not necessarily when your callback was invoked).

  • using SingleShotWorkFunction = std::function< void(std::chrono::steady_clock::time_point)>
    Function callback executed for a single shot of work. The time point provided is the activation of the timer, which is typically the time that the pipeline started for single shot work items.

Defined in “ark/pipeline/topology_callback.hh”:

  • using TopologyChangeCallback = std::function< void(StageInterface &, const TopologyChangeEvent &)>
    A callback that is invoked when topology shifts this includes the stage that caused the shift, along with all of the newly added publishers/subscribers.

Variables

Defined in “ark/pipeline/subscriber.hh”:

  • constexpr size_t PREFERRED_LARGE_SUBSCRIBER_QUEUE_DEPTH= 16384
    A preferred queue depth to use when you want to store “a large number of items”.

  • constexpr size_t MAXIMUM_SUBSCRIBER_QUEUE_DEPTH= 2097152
    The maximum size of any queue depth in the system.

Classes

  • ark::pipeline::AbstractEnvelope
    This is the base envelope class, which allows you to work with envelopes without knowing their type.

  • ark::pipeline::AbstractTimer
    A timer interface is used by the executors to understand when to fire an event next. This allows you to deterministically execute and trigger events.

  • ark::pipeline::Callback
    A dynamic callback, invoking (at compile time) an appropriate callback based on the subscriber’s signature.

  • ark::pipeline::CallbackExecutionManager
    A callback execution manager is responsible for ensuring that a stage’s callbacks are only executing once-at-a-time (ie, not executing in parallel). It also provides mechanisms for tracking timing information.

  • ark::pipeline::CallbackStage
    This is a stage that allows you to wrap a callback, and takes care of publishing/subscribing for you automatically.

    It is considered a convenience routine for testing, as there are many ways to abuse this in production code (in particular, not being able to capture variables in a meaningful way makes this not very useful outside of testing contexts).

  • ark::pipeline::CollectRuntimesStage
    This stage will retrieve all pipeline metrics during execution to tally the expected runtimes for all subscribers in this pipeline. Upon pipeline completion, the final tally for the expected runtimes will be emitted.

  • ark::pipeline::CollectorStage
    A stage that consumes every object it receives, allowing you to iterate through them later. Useful for testing, to ensure you are producing the outputs you expect.

  • ark::pipeline::Context
    A context allows you to register things like callbacks and timers, along with maintain state across a pipeline. It is typically automatically created and managed for you.

    This is an internal implementation detail, and not expected to be used by consumers.

  • ark::pipeline::DataEnvelope
    This envelope type wraps raw serialized data, rather than a type. Calling payload() will deserialize and give you data in the type you requested.

    This is typically used when reading data from a log or over the comms system.

  • ark::pipeline::DataEnvelopePublisher
    This is a specialization of a publisher, which allows you to publish “data envelopes” rather than wrapped types. This allows you to publish data that is simply a wrapped serialized buffer, and allow the downstream subscribers to handle deserialization of that data.

  • ark::pipeline::DistillPipelineMetricsStage
    This stage will consume PipelineMetrics reports, and use that information to generate a distillation report.

  • ark::pipeline::DynamicTimerTestStage
    This is a stage that adds in a event-based timer, both keeping track of how often it executes, and also when.

    This dynamically adjusts when it executes, running in a short sequence.

  • ark::pipeline::Envelope
    An envelope wraps messages that are published, recording a publish time, and supporting an abstraction to make it easier to serialize messages without knowing their type.

    These are typically created automatically by the publisher.

  • ark::pipeline::EnvelopeCarrier
    This represents an object that can ‘carry’ envelopes to some source. For example, it might be a subscriber, which consumes envelopes and stores them on behalf of a stage, or it might be something that simply delivers envelopes along a more complex chain.

  • ark::pipeline::EnvelopeCollectorStage
    A stage to collect envelopes coming in over a particular channel.

  • ark::pipeline::EventTimer
    This timer allows you to register events that will occur at some point in the future essentially allowing you to schedule events at a custom/dynamic frequency.

  • ark::pipeline::ExecutionMonitor
    Simple class that tracks stages that are actively being executed on a thread pool, and warns if they are still executing after some period of time.

  • ark::pipeline::ExecutionTimeoutStage
    This stage provides a mechanism to timeout (via realtime) the overall pipeline. For example, if you want to make sure your simulation never goes beyond five minutes.

  • ark::pipeline::MessageSynchronizer
    Helper class for performing basic channel syncrhonization. The user provides the input channels and the is_sync predicate to determine when messages are synchronized. Whenever synchronized messages are found they are published on the output channel. The user can then create a normal subscriber to receive the synchronized messages.

  • ark::pipeline::MessageSynchronizerStage
    This stage allows you to synchronize messages arriving over multiple channels, by using your provided synchronization function (and other configuration).

    The result will be published as a set containing all of the synchronized messages in one batch. A custom function may be supplied (via C++) to determine the raw value that is being synchronized on; otherwise, the pipeline time in the envelope is used to synchronize the messages.

  • ark::pipeline::MirrorStage
    A stage that “mirrors” inputs any input it receives, it publishes on its own subscriber.

  • ark::pipeline::MissingConnectionsStage
    This stage allows you to detect any publishers/subscribers that might be missing connections. It will throw during startup if that situation is detected.

  • ark::pipeline::PeriodicProducerStage
    A stage that periodically publishes the given object from the specified list at a fixed rate. This can be used to help pump messages through your stage to test it.

  • ark::pipeline::PeriodicTimer
    A timer that is used to execute a function periodically. That function takes in the current time. This allows you to deterministically execute functions at some fixed rate.

  • ark::pipeline::Pipeline
    Defines a ‘pipeline’ to run. A pipeline consists of multiple stages that communicate through well-defined interfaces.

  • ark::pipeline::PipelineMetricsPlugin
    This plugin allows you to visualize metrics coming in from a pipeline.

  • ark::pipeline::PipelineMetricsStage
    This stage will register with the debug logger to retrieve all debug log outputs; these can then be emitted to the real logger (or sent over comms to remote viewers).

  • ark::pipeline::PipelineMetricsTableModel
    This class can take in a batch of pipeline metrics and visualize it in a tree table.

  • ark::pipeline::Publisher
    A publisher is a class that allows you to send an object through the pipeline, to all of the connected subscribers. It is the primary means of communication through a pipeline.

    Different executors will handle message delivery in different ways. There are no guarantees made on if data will be delivered or not. There is no publisher-level buffering of data; each subscriber is responsible for its own buffers/queues.

  • ark::pipeline::PublisherBase
    This is the base class of the publisher, which is used template-free to make it easier to store within pipelines/stages.

  • ark::pipeline::PublisherConnector
    A simple class that will connect a publisher to a subscriber. Allows for over-riding the connection mechanism that is used for topology establishment in the context.

  • ark::pipeline::RealTimeExecutor
    A pipeline executor which operates in real time. This uses the system’s local clock to determine when to fire timers. This is generally used when a pipeline is operating in real time (for example, on a robot).

  • ark::pipeline::RuntimesProvider
    This class will hold any runtimes configured by the user for this pipeline.

  • ark::pipeline::SerializableSubscriber
    A wrapper over a standard subscriber that provides functionality related to communicating message type name and schema. Use this for when you are subscribing to an envelope, but realize that the type is serializable, and can further communicate that type information.

  • ark::pipeline::ShutdownDuringShutdownStage
    This is a stage that requests a failure shutdown during shutdown itself.

  • ark::pipeline::ShutdownStage
    This is a stage that requests a shutdown after a period of time, this just verifies the real time executor exits when someone requests it. This is shared between multiple tests.

  • ark::pipeline::ShutdownToken
    These tokens can be retrieved from a stage interface, and allow customers to request a shutdown of the pipeline.

  • ark::pipeline::ShutdownTokenOnInitializeStage
    This stage requests an exit during initialize. You could throw here (which would be more typical), but its valid so test it.

  • ark::pipeline::SimClockExecutor
    A pipeline executor which operates with a simulated clock. This is the typical executor for all pipelines which expect to be operating in a simulation or faster/slower than real time.

  • ark::pipeline::SimClockThrottle
    This class manages a simulated clock. As you execute stage callbacks/timers, this will monitor the expected runtimes. The clock’s advancement will be throttled based on stages that are executing.

  • ark::pipeline::SimEnvelopeCarrierProxy
    A specialized envelope carrier that delivers messages to a parent sim mailbox.

  • ark::pipeline::SimMailbox
    This class will intercept all messages coming from different sources and organize them onto a timeline. Messages can then be injected into the simulated pipeline at the appropriate times.

  • ark::pipeline::SimMailboxConnector
    A specialized publisher connector used to link publishers to the sim mailbox.

  • ark::pipeline::SimTimeProvider
    A custom time-provider for the sim-clock executor, which allows us to timestamp messages with our simulated clock.

  • ark::pipeline::SimTimeProviderFactory
    A factory that provides SimTimeProvider objects. This provides a unique time provider for each stage, allowing tight control over the clock.

  • ark::pipeline::SimpleSynchronizer

  • ark::pipeline::SingleShotProducerStage
    A stage that publishes the given object a single time, on startup, using the single shot strategy.

  • ark::pipeline::Stage
    A stage is a step within a pipeline. This allows you to receive inputs, write outputs, execute periodically, and more. This makes up a core execution step within your pipeline.

  • ark::pipeline::StageInterface
    This structure defines the interface points for a particular stage. It is spawned when the stage is being initialized by a host pipeline.

    This was mostly intended to separate I/O of a stage into a separate layer, but it could easily be merged back into Stage.

  • ark::pipeline::Subscriber
    This is used to indicate that you are subscribed to a particular piece of data. This subscriber receives data (up to some queue depth) from other publishers in the pipeline, and can invoke a callback when data is ready.

    Callback will not be executed simultaneously; in other words, only one instance of a subscriber will be executed at any particular point in time.

  • ark::pipeline::SubscriberBase
    This is the base class of the subscriber, which is used template-free to make it easier to store within pipelines/stages.

  • ark::pipeline::SubscriberWorkItem
    This structure is part of the execution phase of a subscriber. It contains enough information for the subscriber to safely execute() it and track its metrics.

  • ark::pipeline::TimeProvider
    A time provider provides a timestamp on behalf of a particular execution environment. The main customer of this is publishers, who use this to time-tag envelopes.

  • ark::pipeline::TimeProviderFactory
    This is used internally by the pipeline/context system to give control over how time providers are spawned and sent to various stages.

  • ark::pipeline::TimerStage
    This is a stage that allows you to wrap a callback and will allow you to publish a message on some fixed interval, indefinitely.

    The CallbackFunction signature allows you to return the type that you wish to publish. If you don’t want to publish anything on a cycle, simply return an empty optional. It is considered a convenience routine for testing.

  • ark::pipeline::PeriodicTimerConfiguration
    Configuration for a periodic timer.

  • ark::pipeline::PipelineDotOutputConfig
    Configuration for the ‘dot output’, allowing you to specify filters at runtime.

  • ark::pipeline::PublisherStatistics
    Statistics related to a publisher, allowing you to check on how it’s operating.

  • ark::pipeline::QueuedSimEnvelope
    An envelope that was queued from some particular publisher. This is the base unit that is organized by the SimMailbox.

  • ark::pipeline::StageInterfaceConfig
    Configuration elements for creating StageInterface.

  • ark::pipeline::SubscriberConfiguration
    A configuration for a particular subscriber, based on a message type and channel name.

  • ark::pipeline::SubscriberStatistics
    Statistics and debugging information related to a subscriber.

  • ark::pipeline::SubscriberTimings
    Timing-related information to a subscriber.

  • ark::pipeline::SynchronizedMessageSet
    Publishes from the synchronized message set publisher, this contains all of the messages that were ‘synchronized’ together.

  • ark::pipeline::TimerCallbackTimings
    A structure reporing the timing-related information for the timer’s callback.

  • ark::pipeline::TimerStatistics
    A structure reporting back core timer statistics.

  • ark::pipeline::TimerWorkItem
    A “work item”, which is something that can be executed from a timer. These are produced when popping items off a timer, and is the first phase of timer execution.

  • ark::pipeline::TopologyChangeEvent
    An event message containing a list of publishers/subscribers that were recently added by a particular stage.

  • ark::pipeline::ViewedExecutionStats
    Build the observed stats from the pipeline subscriber/timer metrics.

Functions

Declared in “ark/pipeline/config.hh”:

  • std::shared_ptr< StageType > add_stage_with_config_to_pipeline(pipeline::Pipeline & pipeline, const ConfigType & config, auto… stage_args)
    Adds the given stage/configuration combination to the pipeline. the stage will be added (with stage_args forwarded), and then the configuration structure will be registered with the pipeline’s config package, under the stage’s name.

  • void register_namespaced_stage_config_with_pipeline(pipeline::Pipeline & pipeline, const std::shared_ptr< StageType > & stage, const ConfigType & config, const std::string & comms_namespace)
    Registers configuration and namespace for an already-added stage to a pipeline’s configuration package.

  • std::shared_ptr< StageType > add_namespaced_stage_with_config_to_pipeline(pipeline::Pipeline & pipeline, const ConfigType & config, const std::string & comms_namespace, auto… stage_args)
    Adds the given stage/configuration combination to the pipeline. the stage will be added (with stage_args forwarded), and then the configuration structure will be registered with the pipeline’s config package, under the stage’s name. Further, stage metadata will be registered, allowing you to define the namespace this stage operates in.

  • void register_namespaced_stage_config_with_pipeline(pipeline::Pipeline & pipeline, const std::shared_ptr< StageType > & stage, const std::string & comms_namespace)
    Registers configuration and namespace for an already-added stage to a pipeline’s configuration package.

  • std::shared_ptr< StageType > add_namespaced_stage_to_pipeline(pipeline::Pipeline & pipeline, const std::string & comms_namespace, StageArgs &&… args)
    Adds the given stage to the pipeline, inside the specified namespace. The stage will be added (with stage_args forwarded), and then the metadata will be registered against the configuration.

  • void add_pipeline_metadata_to_pipeline(pipeline::Pipeline & pipeline, const std::string & metadata_name, const MetadataType & metadata)
    Adds pipeline-specific configuration to the given config package, adding it as metadata with the given name.

  • Type get_stage_config(const StageInterface & interface, const std::string & stage_name)
    Returns the given configuration from the stage interface’s config package. This will use the configuration package in the stage interface, but retrieve the configuration for the stage with the given name, deserializing it and returning it as a concrete type.

  • Type get_stage_config(const StageInterface & interface)
    Returns the given configuration from the stage interface’s config package. This will essentially grab the config under the stage’s name from the interface’s package, deserializing it and returning it as a concrete type.

  • Type get_stage_metadata(const StageInterface & interface, const std::string & stage_name, const std::string & metadata_name)
    Returns the given metadata from the stage interface’s config package. This will use the configuration package in the stage interface, but retrieve the metadata for the stage with the given name, deserializing it and returning it as a concrete type.

  • Type get_stage_config_or_default(const StageInterface & interface)
    Returns the given configuration from the stage interface’s config package. If that configuration does not exist, a default instance of the configuration will be returned.

    This will grab the config under the stage’s name, deserialize it, and return it as a concrete type.

  • MetadataType get_pipeline_metadata_or_default(const pipeline::Pipeline & pipeline, const std::string & metadata_name)
    Returns the given pipeline metadata from the pipeline configuration. If not present, returns the default instantiation of that type.

    This reads metadata of the given name from the config package’s “Pipeline” entry, deserializes it, and returns it.

Declared in “ark/pipeline/dot_output.hh”:

  • std::string pipeline_to_dot(const PipelineDotOutputConfig & config, Pipeline && pipeline)
    Takes the given pipeline, and produces a dot-file output suitable for being rendered in xdot. This is useful for looking at the topology of a pipeline graphically.

Declared in “ark/pipeline/envelope_helpers.hh”:

  • std::shared_ptr< const Type > extract_envelope_payload(const AbstractEnvelopeCPtr & envelope)
    Returns the payload of the given envelope as the specified type. The might incur deserialization overhead, depending on the type of the object.

Declared in “ark/pipeline/namespaces.hh”:

  • std::string apply_namespace_to_string(const std::string & ns, const std::string & channel_name)
    Applies the given namespace to the specified string. This follows the basic rules of “if the specified string is fully specified, ignore the namespace, otherwise, prepend the specified string with the namespace.”.

  • std::string get_namespace_from_channel(const std::string & channel_name)
    Returns the namespace from the given channel name. This essentially returns all components of a path except the last component.

Declared in “ark/pipeline/stages/metrics_utility.hh”:

  • void process_pipeline_metrics(CallbackExecStatMap & stats_map, const PipelineMetrics & metrics)
    Process Pipeline Metrics to build out the map.

  • void emit_runtime_metrics(const CallbackExecStatMap & stats_map, std::filesystem::path & desired_output_path, const uint64_t & execution_count_threshold, bool set_1ns_flag)
    Emit the generated runtime values based on the subscriber map to a file. Metrics will only be set for those subscribers that meet the execution_count_threshold.

    stats_map : Map of the values desired_output_path : Full path to the file to emit execution_count_threshold : Runtime metrics will only emitted for subscribers that meet this threshold value set_1ns_flag : Set all of the runtime values to 1ns

Declared in “ark/pipeline/validate.hh”:

  • void validate_realtime_pipeline_config(const pipeline::Pipeline & pipeline)
    Validates the pipeline’s configuration and metadata this will check that the given pipeline has valid configuration that is capable of starting without errors. Throws if there is an error.