Message Synchronizer Stage

This stage gathers messages from a set of channels and publishes a SynchronizedMessageSet containing one message from each channel, all of which share a common pivot value.

A common use is synchronizing messages by timestamp. Parameters control the per-channel queue depth and the threshold within which items are considered ‘synchronized’.

Configuration

A handful of parameters are available:

  • channel_names - The list of channels to synchronize. Each published set includes one message from every channel listed here. If any channel is not publishing, no set will be published.
  • synchronized_threshold - The threshold value for considering items synchronized. The unit depends on the value you are synchronizing (for example, nanoseconds).
  • per_channel_queue_depth - The number of messages to store on each channel when searching for a synchronized set. Larger values consume more CPU time and memory.
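
To make the interaction between these three parameters concrete, here is a minimal standalone sketch of threshold-based matching over bounded per-channel queues. It is not the stage's implementation: the PivotSynchronizer class, its push method, the inclusive threshold comparison, and the policy of discarding entries older than a match are all assumptions made for illustration.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Illustrative stand-in only: names and matching policy are assumptions,
// not the stage's actual implementation.
class PivotSynchronizer
{
public:
    using MatchedSet = std::map<std::string, int64_t>;

    PivotSynchronizer(const std::vector<std::string> &channel_names,
                      int64_t synchronized_threshold,
                      std::size_t per_channel_queue_depth)
        : threshold_(synchronized_threshold), depth_(per_channel_queue_depth)
    {
        for (const auto &name : channel_names)
        {
            queues_[name];  // create an empty queue for every channel
        }
    }

    // Queues one pivot value (for example, a capture time in nanoseconds).
    // Returns one pivot per channel if every channel holds a value within
    // the threshold of the new one; otherwise returns nothing.
    std::optional<MatchedSet> push(const std::string &channel, int64_t pivot)
    {
        auto it = queues_.find(channel);
        if (it == queues_.end())
        {
            return std::nullopt;  // not one of the synchronized channels
        }

        it->second.push_back(pivot);
        if (it->second.size() > depth_)
        {
            it->second.pop_front();  // bounded queue: evict the oldest entry
        }

        // For each channel, find the queued value closest to the new pivot.
        std::map<std::string, std::size_t> matched_index;
        for (const auto &[name, queue] : queues_)
        {
            std::optional<std::size_t> best;
            for (std::size_t i = 0; i < queue.size(); ++i)
            {
                const int64_t gap = distance(queue[i], pivot);
                if (gap <= threshold_ && (!best || gap < distance(queue[*best], pivot)))
                {
                    best = i;
                }
            }
            if (!best)
            {
                return std::nullopt;  // a silent or out-of-range channel blocks the set
            }
            matched_index[name] = *best;
        }

        // Build the set and discard each match plus everything queued before it.
        MatchedSet set;
        for (auto &[name, queue] : queues_)
        {
            const std::size_t index = matched_index[name];
            set[name] = queue[index];
            queue.erase(queue.begin(), queue.begin() + static_cast<std::ptrdiff_t>(index) + 1);
        }
        return set;
    }

private:
    static int64_t distance(int64_t a, int64_t b) { return a > b ? a - b : b - a; }

    int64_t threshold_;
    std::size_t depth_;
    std::map<std::string, std::deque<int64_t>> queues_;
};
```

In this sketch, a synchronizer built with the hypothetical channels "left" and "right", a threshold of 5, and a depth of 4 returns nothing after push("left", 100) and push("right", 120), because the two values are 20 apart. A later push("left", 118) returns a set pairing 118 with 120. The cost of each push grows with the queue depth, which is why larger depths are more expensive.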

By default, this stage synchronizes on publish time, which is almost certainly not what you want. You can override this by passing the constructor a callback that extracts the value to synchronize on:

#pragma once

#include <cstdint>

#include "ark/image/image.hh"
#include "ark/pipeline/envelope.hh"
#include "ark/pipeline/envelope_helpers.hh"
#include "ark/pipeline/stage.hh"
#include "ark/pipeline/stages/synchronize_messages_stage.hh"

// Returns the pivot value for an image envelope: its capture time, in nanoseconds.
int64_t extract_image_time(const ark::pipeline::AbstractEnvelopeCPtr &envelope)
{
    return static_cast<int64_t>(ark::pipeline::extract_envelope_payload<ark::image::Image>(envelope)->capture_time_ns);
}

class ImageSynchronizerStage : public ark::pipeline::MessageSynchronizerStage
{
public:
    ImageSynchronizerStage()
        : ark::pipeline::MessageSynchronizerStage("ImageSynchronizerStage", extract_image_time)
    {
    }
};

You may want to move the implementation into a C++ source file to keep compile times down.

Interacting

When a set is built, it is published as a SynchronizedMessageSet. This contains the raw envelopes of the messages, organized by channel name. You can either access the envelopes directly, or access their payloads with the payload<Type>(channel_name) helper API.

std::shared_ptr<Image> image = sync_set.payload<Image>("/my/camera/image0");

Sets are published on the synchronized_set channel. They are not directly serializable.
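
As a rough illustration of consuming a set, the sketch below computes the capture-time difference between two images in the same set. It relies only on the payload<Type>(channel_name) call and the capture_time_ns field shown above. The capture_skew_ns helper is hypothetical, and FakeImage and FakeSynchronizedSet are stand-ins that exist only so the example compiles without the library; they say nothing about how the real types are implemented.

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Stand-in for an image payload; only the field used below is modeled.
struct FakeImage
{
    std::uint64_t capture_time_ns;
};

// Stand-in for a synchronized set: payloads keyed by channel name.
class FakeSynchronizedSet
{
public:
    template <typename T>
    void add(const std::string &channel, std::shared_ptr<T> payload)
    {
        payloads_[channel] = std::move(payload);
    }

    // Mirrors the shape of the payload helper: look up by channel, cast to T.
    template <typename T>
    std::shared_ptr<T> payload(const std::string &channel) const
    {
        const auto it = payloads_.find(channel);
        if (it == payloads_.end())
        {
            throw std::out_of_range("no payload for channel " + channel);
        }
        return std::static_pointer_cast<T>(it->second);
    }

private:
    std::map<std::string, std::shared_ptr<void>> payloads_;
};

// Hypothetical helper: capture-time difference between two channels in one set.
// Works with any set type exposing a const payload<T>(channel) member (assumed).
template <typename ImageT, typename SetT>
std::int64_t capture_skew_ns(const SetT &set,
                             const std::string &first_channel,
                             const std::string &second_channel)
{
    const auto first = set.template payload<ImageT>(first_channel);
    const auto second = set.template payload<ImageT>(second_channel);
    return static_cast<std::int64_t>(second->capture_time_ns) -
           static_cast<std::int64_t>(first->capture_time_ns);
}
```

A check like this can be useful when tuning synchronized_threshold, since the skew within any published set should never exceed the threshold you configured.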

Metrics

A statistics message is emitted periodically. This contains the number of sets produced, along with statistics for each channel (items dropped, added, matched).