Part 7 (Namespacing)

In this section, we’ll add a second instance of the SecondStage, to demonstrate two instances of the same stage working off the same data, but working independently.

Let’s update our ${APP_ROOT}/tutorial/pipeline/tutorial_pipeline.cc:

#include "tutorial/first_stage/first_stage.hh"
#include "tutorial/second_stage/second_stage.hh"

#include "ark/comms/stages/http_server_stage.hh"
#include "ark/pipeline/config.hh"
#include "ark/debuglog/log.hh"
#include "ark/main/main_offboard.hh"

int main(int argc, const char **argv)
{
    ark::pipeline::Pipeline pipeline;

    pipeline.add_stage<FirstStage>();
    pipeline.add_stage<ark::comms::HttpServerStage>();

    ark::pipeline::add_namespaced_stage_to_pipeline<SecondStage>(pipeline, "/second01", "SecondStage01");
    ark::pipeline::add_namespaced_stage_to_pipeline<SecondStage>(pipeline, "/second02", "SecondStage02");

    try
    {
        return ark::main::execute_realtime_pipeline(argc, argv, std::move(pipeline));
    }
    catch (const std::exception &exception)
    {
        LERROR("Exiting due to error: {}", exception.what());
        return 1;
    }
}

Notice that we dropped the original pipeline.add_stage<SecondStage>() line and replaced it with two function calls.

These function calls add a stage into a ’namespace’. A namespace means that all relative channel names get placed into the stage’s namespace.

For example, you should now have two new channels in your pipeline:

  • /second01/stats
  • /second02/stats

Inspecting

Let’s confirm. Build and re-run your pipeline, you should see:

./build/tutorial_pipeline
(2022-02-02 14:14:24.704) [info] (http_server_stage.cc:874) HTTP Server is listening on "0.0.0.0:8080".
(2022-02-02 14:14:25.704) [info] (second_stage.cc:39) SecondStage01 stage received message: 'Hello world, it's now 1.000475879.'
(2022-02-02 14:14:25.704) [info] (second_stage.cc:39) SecondStage02 stage received message: 'Hello world, it's now 1.000475879.'
(2022-02-02 14:14:26.704) [info] (second_stage.cc:39) SecondStage02 stage received message: 'Hello world, it's now 2.000475879.'
(2022-02-02 14:14:26.704) [info] (second_stage.cc:39) SecondStage01 stage received message: 'Hello world, it's now 2.000475879.'
(2022-02-02 14:14:27.704) [info] (second_stage.cc:39) SecondStage02 stage received message: 'Hello world, it's now 3.000475879.'
(2022-02-02 14:14:27.704) [info] (second_stage.cc:39) SecondStage01 stage received message: 'Hello world, it's now 3.000475879.'
(2022-02-02 14:14:28.705) [info] (second_stage.cc:39) SecondStage01 stage received message: 'Hello world, it's now 4.000475879.'
(2022-02-02 14:14:28.705) [info] (second_stage.cc:39) SecondStage02 stage received message: 'Hello world, it's now 4.000475879.'
(2022-02-02 14:14:29.705) [info] (second_stage.cc:39) SecondStage01 stage received message: 'Hello world, it's now 5.000475879.'
(2022-02-02 14:14:29.705) [info] (second_stage.cc:39) SecondStage02 stage received message: 'Hello world, it's now 5.000475879.'

Great! We can see SecondStage01 and SecondStage02 both writing to the debuglog. Let’s check with ark-spy:

~/ark$ ./build/ark-spy
/debuglog
/my_strings
/scope_timing_reports
/second01/stats
/second02/stats

We can spy on both channels to see their messages:

~/ark$ ./build/ark-spy -c /second01/stats
(2022-02-02 14:17:25.002) [info] (websocket_client.cc:512) Established connection to '127.0.0.1:8080/second01/stats'.
{
  "messages_received": 30,
  "stage_name": "SecondStage01"
}
^C
~/ark$ ./build/ark-spy -c /second02/stats
(2022-02-02 14:17:28.677) [info] (websocket_client.cc:512) Established connection to '127.0.0.1:8080/second02/stats'.
{
  "messages_received": 34,
  "stage_name": "SecondStage02"
}
^C

Messages are indeed flowing on both channels, independently.

Let’s next look into using configuration for your pipeline, in Part 8.