Testing Stages
Producing & Collecting Objects
You can use the PeriodicProducerStage
and the CollectorStage
from the
pipeline testing (ark::pipeline::testing
) package to push synthetic data
into your stage, and then collect the results after.
This enables you to test that your stage responds properly to inputs, and that you receive the outputs that you expected.
Note that the ark::pipeline::testing
harness is mostly aimed at deterministic
stages right now (stages that only use timers or subscribers, not threads).
Here is a basic example of setting up a periodic producer (at 10Hz) and a collector.
using namespace ark::pipeline::testing;
using namespace std::literals::chrono_literals;
//
// First, construct your pipeline. This pipeline has both a producer
// and a collector, communicating over the "/counts" channel.
//
// The producer will produce four messages, containing 10, 20, 30, and
// then 40. This will occur at a rate of 10Hz (100ms) until there are no
// more objects to publish.
//
Pipeline pipeline;
pipeline.add_stage<PeriodicProducerStage<uint64_t>>({10, 20, 30, 40}, 100ms, "/counts");
auto collector = pipeline.add_stage<CollectorStage<uint64_t>>("/counts");
SimClockExecutor executor(std::move(pipeline));
executor.execute(500ms);
//
// Now, check that we got all of the objects we expected...
//
const auto &received = collector->objects_received();
EXPECT_EQ(received, std::vector<uint64_t>{10, 20, 30, 40});
All stage testing should be using the simclock executor, so the tests are deterministic and non-flaky.
NOTE: By default, the PeriodicProducerStage will stop triggring its internal timer when the objects have all been published. There is a flag trigger_type
that can be
set to TriggerConstantRate
if you need the PeriodicProducerStage to trigger the callback indefinitely. This can be helpful when needing to test stages that are continually being triggered by the executor.
Simple Callbacks
During testing, it is common that you want to create a lightweight stage that is
intended to react to some message. Rather than implementing a complete class for
your stage, you can use the CallbackStage
.
These allow you to supply a simple callback in the form of:
std::optional<PublishedObjectType> (const SubscribedObjectType &type);
It allows you to receive objects over a pipeline, run some conditional logic, and use that to emit objects into the pipeline. If you don’t want to react to the type, you can publish an empty optional, and nothing will be published to the rest of the pipeline.
This is typically used for cases where you need more advanced logic then simply using PeriodicProducerStages or CollectorStages.
For example:
using namespace ark::pipeline::testing;
Pipeline pipeline;
auto callback = [](const uint32_t &counter) {
if (counter > 5)
{
return "Counter is high!";
}
else
{
return "Counter is low!";
}
};
pipeline.add_stage<CallbackStage<uint32_t, std::string>>("/counter", "/message", callback);
This assumes something is producing counter values, and this will emit a message indicating if we consider the counter high or low based on the message provided.
This class is primarily intended for testing and not for production use.
Timer Stages
The TimerStage
is a simple callback wrapper that will execute the callback at a defined
interval. The callback can choose to emit an object or not (the return type is a std::optional
).
In the case that you don’t want to publish anything on a particular cycle, just return an
empty optional.
For example, you can define a simple TimerStage that emits an incrementing number like so:
using namespace ark::pipeline::testing;
Pipeline pipeline;
size_t counter = 0;
auto callback = [&counter]() -> std::optional<size_t> {
counter++;
return counter;
};
pipeline.add_stage<TimerStage<size_t>>("/messages", callback, std::chrono::milliseconds{50});
This will write an incrementing integer value to the /messages
channel at 20Hz.
Note that if needed, you can use this to “inject” messages into your test pipeline after more advanced conditions (or timing conditions) are met, without needing to write custom stages. For example, you can set a boolean that causes the timer stage to only emit messages after the boolean is set to true:
using namespace ark::pipeline::testing;
Pipeline pipeline;
std::atomic<bool> publish_messages = false;
size_t counter = 0;
auto callback = [&counter, &publish_messages]() -> std::optional<size_t> {
counter++;
return (publish_messages ? counter : {});
};
pipeline.add_stage<TimerStage<size_t>>("/messages", callback, std::chrono::milliseconds{50});
This is a slight change to the first example, where the timer stage will not publish
anything until you later set the publish_messages
variable to be true. This can be useful
to script events.
Mirroring Objects
The MirrorStage
will accept messages on a particular input channel, and immediately
republish them on an output channel.
This is occasionally useful for testing things like communications – where you want to push a message and measure round trip time (or delivery) across a non-pipeline communications mechanism.
using namespace ark::pipeline::testing;
Pipeline pipeline;
pipeline.add_stage<MirrorStage<std::string>>("/input", "/output");
pipeline.add_stage<ark::comms::HttpServerStage>();
When that pipeline is executing, you can open a websocket to the /output
channel,
send messages on the /input
channel, and receive them again over the websocket.