Message Synchronizer Stage
This stage allows you to gather messages over some set of channels and then publish a SynchronizedMessageSet containing a set of messages (one from each channel) that all share some common pivot element.
A common example is synchronizing messages based on timestamp. Parameters let you specify the per-channel queue depth and the threshold used to consider items ‘synchronized’.
Configuration
A handful of parameters are available:
channel_names
- The list of channels to synchronize. One message from each of the channels listed here will be included in the set. If one channel is not publishing, no set will be published.
synchronized_threshold
- The threshold value for considering items synchronized. The unit depends on the value you are synchronizing (for example, nanoseconds).
per_channel_queue_depth
- The number of messages to store on each channel when searching for a synchronized set. Large numbers consume more CPU/resources.
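To make the interaction between these three parameters concrete, here is a minimal standalone sketch of threshold-based matching. It is not the stage's actual implementation: `Sample`, `SimpleSynchronizer`, and `push` are hypothetical names invented for illustration, and the matching policy (nearest pivot within the threshold, oldest message dropped when a queue is full) is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <deque>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a message; only the pivot value matters here.
struct Sample
{
    int64_t pivot_ns;
    std::string label;
};

// Illustrative only: mirrors channel_names, synchronized_threshold, and
// per_channel_queue_depth, but is NOT the real MessageSynchronizerStage.
class SimpleSynchronizer
{
public:
    SimpleSynchronizer(const std::vector<std::string> &channel_names,
                       int64_t synchronized_threshold,
                       std::size_t per_channel_queue_depth)
        : threshold_(synchronized_threshold), depth_(per_channel_queue_depth)
    {
        assert(depth_ > 0);
        for (const auto &name : channel_names)
        {
            queues_[name];  // create an empty queue for every channel
        }
    }

    // Queues a sample. Returns true and fills `out` (one sample per channel)
    // when every channel holds a sample within the threshold of the new one.
    bool push(const std::string &channel, const Sample &sample,
              std::map<std::string, Sample> &out)
    {
        auto it = queues_.find(channel);
        if (it == queues_.end())
        {
            return false;  // not one of the synchronized channels
        }
        it->second.push_back(sample);
        if (it->second.size() > depth_)
        {
            it->second.pop_front();  // queue full: drop the oldest sample
        }

        std::map<std::string, Sample> set;
        for (const auto &entry : queues_)
        {
            const Sample *best = nullptr;
            long long best_delta = 0;
            for (const auto &candidate : entry.second)
            {
                const long long delta = std::llabs(candidate.pivot_ns - sample.pivot_ns);
                if (delta <= threshold_ && (best == nullptr || delta < best_delta))
                {
                    best = &candidate;
                    best_delta = delta;
                }
            }
            if (best == nullptr)
            {
                return false;  // a channel has no match, so no set is produced
            }
            set.emplace(entry.first, *best);
        }

        // Discard each matched sample and anything queued before it.
        for (auto &entry : queues_)
        {
            const int64_t matched = set.at(entry.first).pivot_ns;
            while (entry.second.front().pivot_ns != matched)
            {
                entry.second.pop_front();
            }
            entry.second.pop_front();
        }
        out = set;
        return true;
    }

private:
    int64_t threshold_;
    std::size_t depth_;
    std::map<std::string, std::deque<Sample>> queues_;
};
```

In this sketch, a silent channel keeps `push` returning false indefinitely, which matches the behavior described for channel_names. A deeper queue widens the search window at the cost of more comparisons per message.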
By default, this stage synchronizes on publish time, which is almost certainly not what you want. You can override this by providing a callback to the constructor:
#pragma once

#include <cstdint>

#include "ark/image/image.hh"
#include "ark/pipeline/envelope.hh"
#include "ark/pipeline/envelope_helpers.hh"
#include "ark/pipeline/stage.hh"
#include "ark/pipeline/stages/synchronize_messages_stage.hh"

// Returns the pivot value for an image envelope: its capture time in nanoseconds.
int64_t extract_image_time(const ark::pipeline::AbstractEnvelopeCPtr &envelope)
{
    return static_cast<int64_t>(
        ark::pipeline::extract_envelope_payload<ark::image::Image>(envelope)->capture_time_ns);
}

// Synchronizes images on capture time rather than the default publish time.
class ImageSynchronizerStage : public ark::pipeline::MessageSynchronizerStage
{
public:
    ImageSynchronizerStage()
        : ark::pipeline::MessageSynchronizerStage("ImageSynchronizerStage", extract_image_time)
    {
    }
};
Note that you may want to move the implementation into a C++ source file to avoid longer compilation times.
Interacting
When a set is built, it will be published as a SynchronizedMessageSet. This contains the raw envelopes of the messages. You can either access those (they are organized by channel name), or you can access their payloads with the payload(channel_name) helper API.
std::shared_ptr<Image> image = sync_set.payload<Image>("/my/camera/image0");
These are published on the synchronized_set channel. They are not directly serializable.
Metrics
A statistics message is emitted periodically. It contains the number of sets produced, along with per-channel statistics (items dropped, added, and matched).