Pipeline Overview

A “pipeline” is a system composed of multiple “stages”. Each stage communicates with other stages through a publisher/subscriber interface. Additionally, you can set up timers for your stage so that a callback is invoked periodically.

After you have assembled a pipeline, you execute it in an “executor”.

There are two executors right now:

  • the real time executor (typically used for running onboard)
  • the simulated clock executor (typically used for “offboard” pipelines or simulations)

The real time executor fires timers based on elapsed wall-clock time and executes subscriber callbacks greedily, as data arrives.

The sim clock executor executes things deterministically (in the same order every time). It does so by simulating the clock, which lets your pipeline run deterministically and also run faster or slower than real time.

This is an example of assembling a pipeline and executing it:

#include "ark/example/hello_publisher.hh"
#include "ark/example/hello_subscriber.hh"
#include "ark/pipeline/realtime_executor.hh"

// Then construct the pipeline and add the stages to it.
pipeline::Pipeline pipeline;

pipeline.add_stage<ark::example::HelloPublisher>();
pipeline.add_stage<ark::example::HelloSubscriber>();

// Execute it in real time.
pipeline::RealTimeExecutor executor(std::move(pipeline));
executor.execute();
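
To run the same pipeline under the simulated clock instead, the pattern is identical. Note that the header path and class name below are assumptions; check your tree for the exact spelling of the sim clock executor:

#include "ark/pipeline/sim_clock_executor.hh"  // Assumed header path.

// Assumed class name; usage mirrors RealTimeExecutor above.
pipeline::SimClockExecutor executor(std::move(pipeline));
executor.execute();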

There are a few helper methods available to you in order to add stages to pipelines with configuration. See the section on pipeline configuration for more detail.

Subscribers

When you construct a subscriber, you need a few pieces of information (in the typical case):

  • the “type” of data that you are consuming
  • the “name” of the publisher that you are receiving the data from
  • the number of messages you wish to queue
  • the “expected runtime” of the callback (used for simulation purposes)

Initializing a subscriber looks something like:

pipeline::SubscriberConfiguration<serialization::ExampleMessage> config;

config.channel_name = "/example";
config.maximum_queue_depth = 1;
config.callback = [](std::shared_ptr<const serialization::ExampleMessage> message) {
  // Work on "message" here
  ...
};

interface.add_subscriber(config);

Subscribers can also receive the time at which a message was published. This can be useful if your message doesn’t already contain a capture time or other timing information:

config.callback = [](std::shared_ptr<const serialization::ExampleMessage> message,
                      std::chrono::steady_clock::time_point publish_time) {
};

This callback can be used in place of the one above; its publish_time parameter is populated with the pipeline time at which the message was originally published.

Publishers

An example publisher looks like this:

class Example : public pipeline::Stage
{
public:
    Example() : pipeline::Stage("Example") {}

    /// This is invoked when the system is starting up, giving you a chance to register
    /// your I/O (your publishers and subscribers).
    void initialize(pipeline::StageInterface &interface) override
    {
        /// This tells the system to create a publisher that publishes messages of type
        /// std::string out. The name ("/my/channel/name") is how you reference this
        /// publisher from other places in the pipeline.
        publisher_ = interface.add_publisher<std::string>("/my/channel/name");
    }

private:
    /// A reference to the publisher -- with this, you can send data for
    /// subscribers to consume!
    pipeline::PublisherPtr<std::string> publisher_;
};

In this case, you are publishing a std::string object. You can publish objects that are not serializable (although, if you do so, you cannot transmit them over comms boundaries, spy on them with ark-spy, or log them).

Typically you would publish objects that are rbuf objects (see the serialization guide).
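
To actually send data, you call the publisher from wherever your stage produces it (a timer or subscriber callback, for example). The push() call below is an assumption; consult PublisherPtr for the real send method:

// Hedged sketch: sending a message from within the Example stage (for instance,
// from a timer or subscriber callback). The push() method name is an assumption;
// check PublisherPtr for the actual send API.
publisher_->push(std::string{"hello, subscribers"});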

Publishers do not create or own a message queue of their own. Each subscriber owns its own message queue, which a publisher will deliver messages to.

There is no guarantee that your subscriber callbacks will be invoked for every message you publish. A message can be dropped due to subscriber queue overflows or comms loss.

If you suspect messages are being dropped, please make sure you have the PipelineMetricsStage added to your pipeline, and look at its periodic output.
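
Adding that stage is just another add_stage call. The include path and namespace below are assumptions; adjust them to wherever PipelineMetricsStage lives in your tree:

#include "ark/pipeline/pipeline_metrics_stage.hh"  // Assumed include path.

pipeline.add_stage<ark::pipeline::PipelineMetricsStage>();  // Assumed namespace.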

Periodic Timers

Periodic Timers invoke your callback at a fixed interval. In the real-time executor, these are executed on a best-effort basis to match your interval. In the sim-clock executor, they will be executed at their precise intervals.

In order to make your pipelines deterministic and reproducible, you must make use of timers (rather than trying to create your own timers with chrono or equivalent).

Here is an example use of a timer:

void initialize(pipeline::StageInterface &interface)
{
    pipeline::PeriodicTimerConfig config;

    config.name = "my_timer_name";
    config.rate = std::chrono::seconds{1};
    config.callback = [](std::chrono::steady_clock::time_point activation_time) {
        std::cout << "Hello, the pipeline time is " << activation_time.time_since_epoch().count() << "ns!" << std::endl;
    };

    interface.add_timer(config);
}

The executor will execute that timer at 1Hz, and you will see the message printed to the console once per second.

The timestamp passed in to the callback is the ‘pipeline time’ – the time according to the pipeline. For example, with the real time executor, this will be roughly equivalent to the steady clock, offset such that it starts at zero when the pipeline starts. For the sim clock executor, the time is the simulated clock, which could potentially be the log time or some virtual time.

In either case, the timestamp passed in is the ‘activation time’ (or the time when the timer ‘activated’ or was ‘made ready’). There might be some latency between when a timer is first activated and when the callback is actually invoked.

By default, timers start out at ‘zero’ and will expire after ‘rate’ has gone by. In other words, in the above example, the timer will fire for the first time after one second has gone by. You can set the trigger_immediately_on_startup parameter to true, and the timer will fire as soon as the executor enters normal execution (and then after ‘rate’ from that point on).
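
Continuing the configuration from the example above, firing immediately at startup is a single extra field:

config.trigger_immediately_on_startup = true;  // Fires at startup, then every 'rate' afterwards.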

Event Timers

Event Timers are similar to Periodic Timers – in the real-time executor, these are executed on a best-effort basis, and in the sim-clock executor, they will be executed precisely when you request.

The major difference is that Event Timers allow you to execute at “dynamic” rates, essentially requesting that your callback is triggered after some period of time has passed.

This can be very useful when you want to adjust the rate based on your current data. They are used for the log player, for example, as messages do not come in at an even rate, but we want to play them back with their exact timestamps.

Here is an example:

void initialize(pipeline::StageInterface &interface)
{
    interface.add_timer([](std::chrono::steady_clock::time_point activation_time) {
        std::cout << "Hello, the pipeline time is " << activation_time.time_since_epoch().count() << "ns!" << std::endl;

        // The pipeline clock starts at zero, so time_since_epoch() is the elapsed run time.
        if (activation_time.time_since_epoch() < std::chrono::seconds{1})
        {
            return std::chrono::milliseconds{50};
        }
        else
        {
            return std::chrono::milliseconds{100};
        }
    });
}

This timer will fire right at startup (zero seconds elapsed), then every 50ms for the first second of execution, and every 100ms for the rest of the run.

The activation_time parameter follows the same rules as the one passed to the periodic timer callback: it is the pipeline time at the moment the timer was activated, which will be precisely the time you requested.

Queue Single Shot Work

Sometimes there is work that needs to be done once, when the pipeline first starts. This can be accomplished by queuing a callback during stage initialization:

void initialize(pipeline::StageInterface &interface)
{
    interface.queue_singleshot_work([](std::chrono::steady_clock::time_point activation_time) {
        std::cout << "Start of the pipeline time " << activation_time.time_since_epoch().count() << "ns!" << std::endl;
        
        publish_startup_information();
    });
}

The function will only be executed once at the start of the pipeline (directly after ‘start’ is invoked). The activation time in this case will always be zero.

Namespaces

When configuring a stage, you can specify the namespace that the stage exists in.

If you specify a channel name (in your publisher or subscriber) and do not fully qualify it (i.e., my_name instead of /my_name), then the name will automatically be prefixed with whatever namespace your stage is in.

For example, if you request to publish on statistics and your stage is in the /imu namespace, your channel name becomes /imu/statistics. If you request to publish on /statistics, your namespace doesn’t matter; it will always publish on /statistics.
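
In code, using the add_publisher call from earlier, that looks like this for a stage configured with the /imu namespace:

// For a stage configured with the "/imu" namespace:
auto relative = interface.add_publisher<std::string>("statistics");    // publishes on "/imu/statistics"
auto qualified = interface.add_publisher<std::string>("/statistics");  // fully qualified, always "/statistics"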

The namespace is configured by applying metadata to the configuration of your stage. For example, if you have a stage YAML file, it might look like:

config:
  metadata:
    stage_config:
      comms_namespace: "/imu"

  default:
    my_value_a: 10
    my_value_b: 20

You can also override this in your config package YAML:

package:
  configs:
    MyStage:
      path: "path/to/my/stage.yml"

      metadata:
        stage_config:
          comms_namespace: "/imu"

See the configuration section for more explanation on config packages and metadata.

Finally, if you wish to configure your namespace programmatically, you can set it when adding your stage to a pipeline:

ark::pipeline::Pipeline my_pipeline;

ark::pipeline::add_namespaced_stage_to_pipeline<MyStageType>(my_pipeline, "/my/namespace", "StageName");

This works identically to pipeline::Pipeline::add_stage in terms of adding the stage to the pipeline and returning a pointer to said stage.
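
For example, keeping the returned stage pointer around (the exact pointer type is not shown here, so auto is used):

auto my_stage = ark::pipeline::add_namespaced_stage_to_pipeline<MyStageType>(
    my_pipeline, "/my/namespace", "StageName");

// "my_stage" points at the newly added stage, exactly as add_stage would return it.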

Channel Remapping

You can “remap” a publisher or subscriber by using the ark::pipeline::ChannelMappingConfig metadata, stored in the channel_remapping_config key.

This will allow you to remap a publisher/subscriber channel name to a different name (for example, you could choose to remap the name “laser/scanner” to “lidar/scanner”). As an example:

config:
  metadata:
    channel_remapping_config:
      original_to_remapped_subscribers:
        "laser/scanner": "lidar/scanner"
        "camera_a": "my_camera_a"
      original_to_remapped_publishers:
        "camera_controls": "my_camera_controls"

Channel remapping is applied before namespaces are applied, so if you have a stage in the namespace “test” and used the above remapping, a subscriber created on “laser/scanner” would end up on the channel “/test/lidar/scanner”. Normal rules apply: if you remap to a channel that is fully qualified, namespaces will not be applied.
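
Put concretely, inside a stage that lives in the “/test” namespace with the metadata above applied (the message types are just the placeholders from the earlier examples):

// Subscriber: "laser/scanner" is remapped to "lidar/scanner", then the namespace is
// applied, so this subscription listens on "/test/lidar/scanner".
pipeline::SubscriberConfiguration<serialization::ExampleMessage> scan_config;
scan_config.channel_name = "laser/scanner";
scan_config.callback = [](std::shared_ptr<const serialization::ExampleMessage>) {};
interface.add_subscriber(scan_config);

// Publisher: "camera_controls" is remapped to "my_camera_controls" and then namespaced,
// so it publishes on "/test/my_camera_controls".
auto controls = interface.add_publisher<std::string>("camera_controls");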