Part 5 (Subscribers)

In this section of the tutorial, we’ll build a second stage that subscribes to the string messages published by the first stage, prints each one out, and periodically publishes some statistics.

Creating the Second Stage

We’ll create the header file for the second stage at ${APP_ROOT}/tutorial/second_stage/second_stage.hh:

#pragma once

#include "ark/pipeline/stage.hh"

#include "tutorial/messages.hh"

#include <cstdint>
#include <memory>
#include <string>

class SecondStage : public ark::pipeline::Stage
{
public:
    /// Notice that we allow customizing the name here when this stage
    /// is instantiated.
    SecondStage(std::string name) : ark::pipeline::Stage(name)
    {
    }

    void initialize(ark::pipeline::StageInterface &interface) override;

private:
    /// A per-instance counter that tracks how many messages we have received.
    uint64_t messages_received_ = 0;

    /// The channel that we publish our statistics over.
    ark::pipeline::PublisherPtr<SecondStageStats> stats_publisher_;

    /// This method will be invoked when a message arrives.
    void handle_message(const std::shared_ptr<const MyString> &message);

    /// This method is invoked at 1Hz to publish statistics.
    void publish_stats();
};

You’ll notice we have member variables now. These hold the “state” that the stage owns and is responsible for. If you instantiate this stage multiple times in your pipeline, each instance will have its own state and be self-contained. This will be reflected when each instance publishes its own stats.
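
To make the per-instance point concrete, here is a minimal sketch in plain C++ that uses no Ark types. CountingStage and check_independent_counters are hypothetical names invented for this illustration; they only mimic the counter member of SecondStage.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical stand-in for a stage. It uses no Ark types.
class CountingStage
{
public:
    explicit CountingStage(std::string name) : name_(std::move(name)) {}

    // Plays the role of a subscriber callback: bumps this instance's counter.
    void handle_message(const std::string & /*message*/) { ++messages_received_; }

    const std::string &name() const { return name_; }
    std::uint64_t messages_received() const { return messages_received_; }

private:
    std::string name_;
    std::uint64_t messages_received_ = 0;
};

// Usage: two instances of the same class keep separate counts.
inline void check_independent_counters()
{
    CountingStage left("left");
    CountingStage right("right");

    left.handle_message("a");
    left.handle_message("b");
    right.handle_message("c");

    assert(left.messages_received() == 2);
    assert(right.messages_received() == 1);
}
```

Because the counter is a member rather than a global, nothing one instance does can change the count that another instance reports.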

Also, notice that we are publishing SecondStageStats, a message that does not exist yet! It will carry statistics about the stage. It is good practice for your stages to publish health information (both for debugging and for analysis). Let’s define the stats message now by editing ${APP_ROOT}/tutorial/messages.rbuf. Place this at the bottom:

schema SecondStageStats
{
    /// The sending stage's name.
    string stage_name;

    /// The messages received on this stage.
    uint64 messages_received;
}

Next up, we need to implement the body of the second stage, at ${APP_ROOT}/tutorial/second_stage/second_stage.cc:

#include "tutorial/second_stage/second_stage.hh"

#include "ark/debuglog/log.hh"
#include "ark/pipeline/periodic_timer_config.hh"
#include "ark/pipeline/stage_interface.hh"
#include "ark/pipeline/subscriber.hh"

void SecondStage::initialize(ark::pipeline::StageInterface &interface)
{
    // Construct the publisher we will send stats over...
    stats_publisher_ = interface.add_publisher<SecondStageStats>("stats");

    // Next we want to create a subscriber to listen for messages from the publisher...
    {
        ark::pipeline::SubscriberConfiguration<MyString> config;

        config.callback = [this](const auto &message) { this->handle_message(message); };
        config.channel_name = "/my_strings";
        config.maximum_queue_depth = 1;

        interface.add_subscriber(config);
    }

    // Finally, add a 1Hz timer that publishes statistics
    {
        ark::pipeline::PeriodicTimerConfiguration config;

        config.callback = [this](const auto &/*unused*/) { this->publish_stats(); };
        config.name = "stats_publisher";
        config.rate = std::chrono::seconds{1};

        interface.add_timer(config);
    }
}

void SecondStage::handle_message(const std::shared_ptr<const MyString> &message)
{
    // Print a debug message out so we know that messages are being received
    LINFO("{} stage received message: '{}'", this->name(), message->message);

    // Increment our stats counter...
    messages_received_++;
}

void SecondStage::publish_stats()
{
    SecondStageStats stats;

    stats.messages_received = messages_received_;
    stats.stage_name = this->name();

    stats_publisher_->push(stats);
}

In this stage, we have four main things going on:

  • we create a subscriber to receive updates on /my_strings
  • we create a publisher for stats on the ‘stats’ channel
  • we create a timer to emit the number of messages received and stage name at 1Hz
  • we are printing debug information out using the Ark debug logger
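
If the callback wiring feels abstract, the following toy sketch may help. It is not how Ark implements channels; ToyChannel, ToyListener, and check_toy_subscription are hypothetical names, and delivery here is immediate and single-threaded with no queue. It only shows the same pattern as above: a lambda that captures this and forwards a shared, read-only message to a member function.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-process channel. It only imitates the callback wiring.
template <typename T>
class ToyChannel
{
public:
    using Callback = std::function<void(const std::shared_ptr<const T> &)>;

    void subscribe(Callback callback) { callbacks_.push_back(std::move(callback)); }

    // Delivers one shared, read-only copy of the value to every subscriber.
    void publish(const T &value)
    {
        std::shared_ptr<const T> message = std::make_shared<T>(value);
        for (const auto &callback : callbacks_)
        {
            callback(message);
        }
    }

private:
    std::vector<Callback> callbacks_;
};

class ToyListener
{
public:
    // Same shape as the subscriber callback above: a lambda capturing `this`.
    void attach(ToyChannel<std::string> &channel)
    {
        channel.subscribe([this](const auto &message) { this->handle_message(message); });
    }

    std::uint64_t messages_received() const { return messages_received_; }
    const std::string &last_message() const { return last_message_; }

private:
    void handle_message(const std::shared_ptr<const std::string> &message)
    {
        last_message_ = *message;
        ++messages_received_;
    }

    std::string last_message_;
    std::uint64_t messages_received_ = 0;
};

// Usage: publish twice and confirm both messages reached the listener.
inline void check_toy_subscription()
{
    ToyChannel<std::string> channel;
    ToyListener listener;
    listener.attach(channel);

    channel.publish("hello");
    channel.publish("world");

    assert(listener.messages_received() == 2);
    assert(listener.last_message() == "world");
}
```

In this toy version the listener must outlive the channel, because the stored lambda holds a raw this pointer.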

One thing to call out is that our publisher’s channel name is not an absolute path (i.e., it’s just called stats). This allows us to have multiple instances of the same stage without conflicting with each other. We’ll go into this in more detail later.
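
As a rough mental model only, relative names can be thought of as being resolved against a per-stage namespace. The sketch below is an assumption made for illustration, not Ark’s actual resolution logic; resolve_channel, check_channel_resolution, and the /left namespace are hypothetical. With an empty namespace, the relative name stats resolves to /stats, which matches the channel list that ark-spy prints later in this part.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper, not an Ark function: resolves a channel name against
// the namespace of the stage that declared it.
inline std::string resolve_channel(const std::string &stage_namespace, const std::string &channel)
{
    // A leading '/' marks an absolute name, which is used unchanged.
    if (!channel.empty() && channel.front() == '/')
    {
        return channel;
    }

    // A relative name is placed underneath the stage's namespace.
    return stage_namespace + "/" + channel;
}

// Usage: relative names follow the namespace, absolute names do not.
inline void check_channel_resolution()
{
    assert(resolve_channel("", "stats") == "/stats");
    assert(resolve_channel("/left", "stats") == "/left/stats");
    assert(resolve_channel("/left", "/my_strings") == "/my_strings");
}
```

Under this model, two instances placed in different namespaces would publish their stats on different channels while still subscribing to the same absolute /my_strings channel.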

Compiling

Create a file ${APP_ROOT}/tutorial/second_stage/CMakeLists.txt:

add_library(second_stage second_stage.cc)

target_link_libraries(
    second_stage
    PUBLIC ark::pipeline tutorial_messages
    PRIVATE ark::debuglog)

Next, update the top-level CMakeLists.txt file so that your additions, in total, look like:

add_subdirectory(tutorial)
add_subdirectory(tutorial/first_stage)
add_subdirectory(tutorial/second_stage)
add_subdirectory(tutorial/pipeline)

Next, build your pipeline:

./make.sh

The build should complete without errors.

Updating Pipeline

We have the second stage built now, but we need to update our pipeline to actually instantiate it. Let’s do that now.

Update ${APP_ROOT}/tutorial/pipeline/tutorial_pipeline.cc:

#include "tutorial/first_stage/first_stage.hh"
#include "tutorial/second_stage/second_stage.hh"

#include "ark/comms/stages/http_server_stage.hh"
#include "ark/main/main_offboard.hh"

#include <utility>

int main(int argc, const char **argv)
{
    ark::pipeline::Pipeline pipeline;

    pipeline.add_stage<FirstStage>();
    pipeline.add_stage<ark::comms::HttpServerStage>();

    // Add the 'SecondStage' to your pipeline
    pipeline.add_stage<SecondStage>("SecondStage");

    return ark::main::execute_realtime_pipeline(argc, argv, std::move(pipeline));
}

One thing to note here is that we are giving our second stage a name. This will be important in the next tutorial.

For now, update the ${APP_ROOT}/tutorial/pipeline/CMakeLists.txt file to reference the second stage:

add_executable(tutorial_pipeline tutorial_pipeline.cc)

target_link_libraries(
    tutorial_pipeline
    PRIVATE first_stage second_stage ark::main_offboard ark::http_server_stage)

Build and run now. You should see something like this:

~/ark$ ./make.sh
~/ark$ ./build/tutorial_pipeline
(2022-02-02 14:02:08.732) [info] (http_server_stage.cc:874) HTTP Server is listening on "0.0.0.0:8080".
(2022-02-02 14:02:09.732) [info] (second_stage.cc:39) SecondStage stage received message: 'Hello world, it's now 1.000363529.'
(2022-02-02 14:02:10.732) [info] (second_stage.cc:39) SecondStage stage received message: 'Hello world, it's now 2.000363529.'
(2022-02-02 14:02:11.732) [info] (second_stage.cc:39) SecondStage stage received message: 'Hello world, it's now 3.000363529.'
(2022-02-02 14:02:12.732) [info] (second_stage.cc:39) SecondStage stage received message: 'Hello world, it's now 4.000363529.'
(2022-02-02 14:02:13.732) [info] (second_stage.cc:39) SecondStage stage received message: 'Hello world, it's now 5.000363529.'
(2022-02-02 14:02:14.732) [info] (second_stage.cc:39) SecondStage stage received message: 'Hello world, it's now 6.000363529.'

Let’s also inspect things with ark-spy:

~/ark$ ./build/ark-spy
/debuglog
/my_strings
/scope_timing_reports
/stats

We see our /stats channel has shown up! Let’s look at it:

~/ark$ ./build/ark-spy -c /stats
(2022-02-02 14:03:27.751) [info] (websocket_client.cc:512) Established connection to '127.0.0.1:8080/stats'.
{
  "messages_received": 22,
  "stage_name": "SecondStage"
}
{
  "messages_received": 23,
  "stage_name": "SecondStage"
}
{
  "messages_received": 24,
  "stage_name": "SecondStage"
}

We can see that our stage name is populated correctly, and that messages_received grows by one with each update, which arrives at about 1Hz.

In Part 6, we’ll look into better handling of errors.