Part 7 (Namespacing)
In this section, we’ll add a second instance of SecondStage to demonstrate two instances of the same stage working off the same data, but independently of each other.
Let’s update our ${APP_ROOT}/tutorial/pipeline/tutorial_pipeline.cc:
#include "tutorial/first_stage/first_stage.hh"
#include "tutorial/second_stage/second_stage.hh"
#include "ark/comms/stages/http_server_stage.hh"
#include "ark/pipeline/config.hh"
#include "ark/debuglog/log.hh"
#include "ark/main/main_offboard.hh"

#include <exception>
#include <utility>

int main(int argc, const char **argv)
{
    ark::pipeline::Pipeline pipeline;

    pipeline.add_stage<FirstStage>();
    pipeline.add_stage<ark::comms::HttpServerStage>();

    // Add two instances of SecondStage, each with its own namespace and stage name.
    ark::pipeline::add_namespaced_stage_to_pipeline<SecondStage>(pipeline, "/second01", "SecondStage01");
    ark::pipeline::add_namespaced_stage_to_pipeline<SecondStage>(pipeline, "/second02", "SecondStage02");

    try
    {
        return ark::main::execute_realtime_pipeline(argc, argv, std::move(pipeline));
    }
    catch (const std::exception &exception)
    {
        LERROR("Exiting due to error: {}", exception.what());
        return 1;
    }
}
Notice that we dropped the original pipeline.add_stage<SecondStage>() line and replaced it with two function calls.
Each of these calls adds a stage inside a ’namespace’. Any relative channel name the stage uses is placed under that namespace, so two instances of the same stage no longer share those channels.
For example, you should now have two new channels in your pipeline:
/second01/stats
/second02/stats
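To make the naming rule concrete, here is a minimal sketch of how a relative channel name could be combined with a namespace. The helper resolve_channel_name is hypothetical and is not part of Ark; it also assumes that a leading '/' marks an absolute channel name that is left unchanged, which this tutorial does not state explicitly.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper for illustration only; this is NOT Ark's implementation.
// Assumption: a name starting with '/' is absolute and is returned unchanged,
// while any other name is treated as relative and gains the namespace prefix.
std::string resolve_channel_name(const std::string &stage_namespace, const std::string &channel)
{
    // The sketch expects a namespace written like "/second01".
    assert(!stage_namespace.empty() && stage_namespace.front() == '/');

    if (!channel.empty() && channel.front() == '/')
    {
        return channel;  // absolute: left untouched
    }

    return stage_namespace + "/" + channel;  // relative: placed under the namespace
}
```

Under these assumptions, resolve_channel_name("/second01", "stats") gives /second01/stats and resolve_channel_name("/second02", "stats") gives /second02/stats, matching the two channels listed above, while an absolute name such as /my_strings would be the same for both instances.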
Inspecting
Let’s confirm. Build and re-run your pipeline; you should see:
./build/tutorial_pipeline
(2022-02-02 14:14:24.704) [info] (http_server_stage.cc:874) HTTP Server is listening on "0.0.0.0:8080".
(2022-02-02 14:14:25.704) [info] (second_stage.cc:39) SecondStage01 stage received message: 'Hello world, it's now 1.000475879.'
(2022-02-02 14:14:25.704) [info] (second_stage.cc:39) SecondStage02 stage received message: 'Hello world, it's now 1.000475879.'
(2022-02-02 14:14:26.704) [info] (second_stage.cc:39) SecondStage02 stage received message: 'Hello world, it's now 2.000475879.'
(2022-02-02 14:14:26.704) [info] (second_stage.cc:39) SecondStage01 stage received message: 'Hello world, it's now 2.000475879.'
(2022-02-02 14:14:27.704) [info] (second_stage.cc:39) SecondStage02 stage received message: 'Hello world, it's now 3.000475879.'
(2022-02-02 14:14:27.704) [info] (second_stage.cc:39) SecondStage01 stage received message: 'Hello world, it's now 3.000475879.'
(2022-02-02 14:14:28.705) [info] (second_stage.cc:39) SecondStage01 stage received message: 'Hello world, it's now 4.000475879.'
(2022-02-02 14:14:28.705) [info] (second_stage.cc:39) SecondStage02 stage received message: 'Hello world, it's now 4.000475879.'
(2022-02-02 14:14:29.705) [info] (second_stage.cc:39) SecondStage01 stage received message: 'Hello world, it's now 5.000475879.'
(2022-02-02 14:14:29.705) [info] (second_stage.cc:39) SecondStage02 stage received message: 'Hello world, it's now 5.000475879.'
Great! We can see SecondStage01 and SecondStage02 both writing to the debuglog. Let’s check with ark-spy:
~/ark$ ./build/ark-spy
/debuglog
/my_strings
/scope_timing_reports
/second01/stats
/second02/stats
We can spy on both channels to see their messages:
~/ark$ ./build/ark-spy -c /second01/stats
(2022-02-02 14:17:25.002) [info] (websocket_client.cc:512) Established connection to '127.0.0.1:8080/second01/stats'.
{
"messages_received": 30,
"stage_name": "SecondStage01"
}
^C
~/ark$ ./build/ark-spy -c /second02/stats
(2022-02-02 14:17:28.677) [info] (websocket_client.cc:512) Established connection to '127.0.0.1:8080/second02/stats'.
{
"messages_received": 34,
"stage_name": "SecondStage02"
}
^C
Messages are indeed flowing on both channels, independently.
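As a rough illustration of what "independently" means here, the sketch below models two instances of one stage class that each keep their own message counter and their own stats channel name. CountingStage and its members are hypothetical stand-ins for illustration and do not reflect Ark's actual stage API. In the output above, the two counts (30 and 34) most likely differ only because the channels were spied a few seconds apart.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical stand-in for a stage such as SecondStage; not Ark's API.
class CountingStage
{
public:
    CountingStage(const std::string &stage_namespace, std::string stage_name)
        : stats_channel_(stage_namespace + "/stats"), stage_name_(std::move(stage_name))
    {
        assert(!stage_name_.empty());
    }

    // Called once per incoming message; only this instance's counter changes.
    void on_message(const std::string & /*text*/) { ++messages_received_; }

    const std::string &stats_channel() const { return stats_channel_; }
    const std::string &stage_name() const { return stage_name_; }
    std::uint64_t messages_received() const { return messages_received_; }

private:
    std::string stats_channel_;
    std::string stage_name_;
    std::uint64_t messages_received_ = 0;
};
```

Two objects built as CountingStage("/second01", "SecondStage01") and CountingStage("/second02", "SecondStage02") share no state: delivering a message to one leaves the other's counter untouched, and each reports under its own stats channel name.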
Let’s next look into using configuration for your pipeline, in Part 8.