Part 4 (Serialization)

In this section of the tutorial, we’ll learn about making an rbuf object, and change our publisher to publish those objects, so that we can receive our published messages over comms.

Creating an rbuf

The native serialization format for Ark is rbuf, a high performance serialization format with backwards/forwards compatibility.

Let’s create our first schema. Place this in ${APP_ROOT}/tutorial/messages.rbuf:

schema MyString
{
    steady_time_point now;
    string message;
}

Let’s then make a CMakeLists.txt file to compile it:

add_rbuf(tutorial_messages messages.rbuf)

And update our top-level ${APP_ROOT} cmake to build it. Make sure to place this above your other additions, such that altogether they now look like:

add_subdirectory(tutorial)
add_subdirectory(tutorial/first_stage)
add_subdirectory(tutorial/pipeline)

You can now build with:

./make.sh

At this point, we have a messages library compiled. Let’s update our stage to publish messages.

Publishing Messages

Let’s go back to your ${APP_ROOT}/tutorials/first_stage/first_stage.cc file and update it to send serializable rbuf messages:

#include "tutorial/first_stage/first_stage.hh"
#include "tutorial/messages.hh"

#include "ark/pipeline/publisher.hh"
#include "ark/pipeline/periodic_timer_config.hh"
#include "ark/pipeline/stage_interface.hh"

#include "fmt/format.h"

void FirstStage::initialize(ark::pipeline::StageInterface &interface)
{
    // Note that we change the type from std::string to our message type.
    auto publisher = interface.add_publisher<MyString>("/my_strings");

    {
        ark::pipeline::PeriodicTimerConfiguration config;

        config.name = "emit_string";
        config.rate = std::chrono::seconds{1};
        config.callback = [publisher](const auto &now) {
            // Update the callback body to populate your schema with the current
            // time and a unique message.
            auto time_s = std::chrono::duration<double>(now.time_since_epoch());

            MyString string;

            string.now = now;
            string.message = fmt::format("Hello world, it's now {}.", time_s.count());

            publisher->push(string);
        };

        interface.add_timer(config);
    }
}

You will need to update your ${APP_ROOT}/tutorials/first_stage/CMakeLists.txt file again to tell it to link against your messages:

add_library(first_stage first_stage.cc)

target_link_libraries(first_stage
    PUBLIC ark::pipeline
    PRIVATE fmt::fmt tutorial_messages)

Now build and run:

./make.sh
./build/tutorial_pipeline

Next, switch back to your other terminal, and let’s try to spy on the channel with ark-spy again:

~/ark$ ./build/ark-spy -c /my_strings
(2022-02-02 11:09:36.936) [info] (websocket_client.cc:512) Established connection to '127.0.0.1:8080/my_strings'.
{
  "message": "Hello world, it's now 32.000547229.",
  "now": 32000547229
}
{
  "message": "Hello world, it's now 33.000547229.",
  "now": 33000547229
}
{
  "message": "Hello world, it's now 34.000547229.",
  "now": 34000547229
}
{
  "message": "Hello world, it's now 35.000547229.",
  "now": 35000547229
}

You should see something like the above! It will print out a JSON form of your message, and you should finally be able to see your message and a timestamp.

Next, we’ll move on to adding a subscriber for your message, in Part 5.