Testing Stages

Producing & Collecting Objects

You can use the PeriodicProducerStage and the CollectorStage from the pipeline testing (ark::pipeline::testing) package to push synthetic data into your stage, and then collect the results after.

This enables you to test that your stage responds properly to inputs, and that you receive the outputs that you expected.

Note that the ark::pipeline::testing harness is mostly aimed at deterministic stages right now (stages that only use timers or subscribers, not threads).

Here is a basic example of setting up a periodic producer (at 10Hz) and a collector.

using namespace ark::pipeline::testing;
using namespace std::literals::chrono_literals;

//
// First, construct your pipeline. This pipeline has both a producer
// and a collector, communicating over the "/counts" channel.
//
// The producer will produce four messages, containing 10, 20, 30, and
// then 40. This will occur at a rate of 10Hz (100ms) until there are no 
// more objects to publish.
//

Pipeline pipeline;

pipeline.add_stage<PeriodicProducerStage<uint64_t>>({10, 20, 30, 40}, 100ms, "/counts");

auto collector = pipeline.add_stage<CollectorStage<uint64_t>>("/counts");

SimClockExecutor executor(std::move(pipeline));
executor.execute(500ms);

//
// Now, check that we got all of the objects we expected...
//

const auto &received = collector->objects_received();

EXPECT_EQ(received, std::vector<uint64_t>{10, 20, 30, 40});

All stage testing should be using the simclock executor, so the tests are deterministic and non-flaky.

NOTE: By default, the PeriodicProducerStage will stop triggring its internal timer when the objects have all been published. There is a flag trigger_type that can be set to TriggerConstantRate if you need the PeriodicProducerStage to trigger the callback indefinitely. This can be helpful when needing to test stages that are continually being triggered by the executor.

Simple Callbacks

During testing, it is common that you want to create a lightweight stage that is intended to react to some message. Rather than implementing a complete class for your stage, you can use the CallbackStage.

These allow you to supply a simple callback in the form of:

std::optional<PublishedObjectType> (const SubscribedObjectType &type);

It allows you to receive objects over a pipeline, run some conditional logic, and use that to emit objects into the pipeline. If you don’t want to react to the type, you can publish an empty optional, and nothing will be published to the rest of the pipeline.

This is typically used for cases where you need more advanced logic then simply using PeriodicProducerStages or CollectorStages.

For example:

using namespace ark::pipeline::testing;

Pipeline pipeline;

auto callback = [](const uint32_t &counter) {
    if (counter > 5)
    {
        return "Counter is high!";
    }
    else
    {
        return "Counter is low!";
    }
};

pipeline.add_stage<CallbackStage<uint32_t, std::string>>("/counter", "/message", callback);

This assumes something is producing counter values, and this will emit a message indicating if we consider the counter high or low based on the message provided.

This class is primarily intended for testing and not for production use.

Timer Stages

The TimerStage is a simple callback wrapper that will execute the callback at a defined interval. The callback can choose to emit an object or not (the return type is a std::optional). In the case that you don’t want to publish anything on a particular cycle, just return an empty optional.

For example, you can define a simple TimerStage that emits an incrementing number like so:

using namespace ark::pipeline::testing;

Pipeline pipeline;

size_t counter = 0;
auto callback = [&counter]() -> std::optional<size_t> {
    counter++;
    return counter;
};

pipeline.add_stage<TimerStage<size_t>>("/messages", callback, std::chrono::milliseconds{50});

This will write an incrementing integer value to the /messages channel at 20Hz.

Note that if needed, you can use this to “inject” messages into your test pipeline after more advanced conditions (or timing conditions) are met, without needing to write custom stages. For example, you can set a boolean that causes the timer stage to only emit messages after the boolean is set to true:

using namespace ark::pipeline::testing;

Pipeline pipeline;

std::atomic<bool> publish_messages = false;
size_t counter = 0;
auto callback = [&counter, &publish_messages]() -> std::optional<size_t> {
    counter++;
    return (publish_messages ? counter : {});
};

pipeline.add_stage<TimerStage<size_t>>("/messages", callback, std::chrono::milliseconds{50});

This is a slight change to the first example, where the timer stage will not publish anything until you later set the publish_messages variable to be true. This can be useful to script events.

Mirroring Objects

The MirrorStage will accept messages on a particular input channel, and immediately republish them on an output channel.

This is occasionally useful for testing things like communications – where you want to push a message and measure round trip time (or delivery) across a non-pipeline communications mechanism.

using namespace ark::pipeline::testing;

Pipeline pipeline;

pipeline.add_stage<MirrorStage<std::string>>("/input", "/output");
pipeline.add_stage<ark::comms::HttpServerStage>();

When that pipeline is executing, you can open a websocket to the /output channel, send messages on the /input channel, and receive them again over the websocket.