Stages & Startup
A Stage is the unit of work that is added to a pipeline. A stage lets you register the following elements:
- Publishers, which allow you to push data onto the network
- Subscribers, which allow you to consume published data
- Periodic Timers, which are invoked periodically
- Event Timers, which are invoked at time intervals of your choosing
- Shutdown Tokens, which allow you to request a shutdown
You register each of these through a StageInterface, which is your interface to the rest of the pipeline and is provided to you during the initialization phase.
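The registration pattern can be sketched with a stand-in interface. This is a minimal sketch, assuming hypothetical names throughout: MockStageInterface, ExampleStage, add_publisher, add_subscriber, and add_periodic_timer are illustrative only and are not the real StageInterface methods. Event timers and shutdown tokens are omitted for brevity.

```cpp
// Minimal sketch, NOT the real Ark API: MockStageInterface, ExampleStage,
// and all method names are hypothetical stand-ins for the registration
// pattern described above.
#include <cassert>
#include <chrono>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical interface handed to a stage during initialization.
// It simply records what the stage registered.
class MockStageInterface {
public:
    void add_publisher(const std::string& channel) { publishers.push_back(channel); }

    void add_subscriber(const std::string& channel,
                        std::function<void(const std::string&)> callback) {
        subscribers.push_back(channel);
        callbacks.push_back(std::move(callback));
    }

    void add_periodic_timer(std::chrono::milliseconds period) { timers.push_back(period); }

    std::vector<std::string> publishers;
    std::vector<std::string> subscribers;
    std::vector<std::function<void(const std::string&)>> callbacks;
    std::vector<std::chrono::milliseconds> timers;
};

// Hypothetical stage that does all of its registration in initialize().
class ExampleStage {
public:
    void initialize(MockStageInterface& iface) {
        iface.add_publisher("processed_data");
        iface.add_subscriber("raw_data",
                             [this](const std::string& msg) { last_message_ = msg; });
        iface.add_periodic_timer(std::chrono::milliseconds(100));
    }

    const std::string& last_message() const { return last_message_; }

private:
    std::string last_message_;
};
```

The stage never talks to the network directly; it only describes what it needs, and the interface owns the wiring.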
Startup
There are two methods you need to know about for pipeline startup: initialize and start.
When a pipeline begins execution, it:
- Invokes initialize on all stages
- Invokes the 'topology changed' callback as the pub/sub topology is established
- Establishes pub/sub connections
- Reports to all topology callbacks that the topology is fully established. Messages can now be published.
- Invokes start on all stages. Messages will now be received.
- Begins and completes execution
- Invokes shutdown on all stages
- Waits for all outstanding thread pool work to complete
- Invokes finalize on all stages
- Exits with a success code
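The ordering of the lifecycle phases can be sketched as a small driver. This sketch assumes hypothetical names (LifecycleStage, run_pipeline) and leaves out topology callbacks, pub/sub setup, and the execution phase. It only shows that each phase runs over every stage before the next phase begins.

```cpp
// Hypothetical sketch of the lifecycle order listed above. LifecycleStage
// and run_pipeline are illustrative names, not Ark types.
#include <cassert>
#include <string>
#include <vector>

struct LifecycleStage {
    std::string name;
    std::vector<std::string>* log;  // shared event log, for illustration only

    void initialize() { log->push_back(name + ":initialize"); }
    void start()      { log->push_back(name + ":start"); }
    void shutdown()   { log->push_back(name + ":shutdown"); }
    void finalize()   { log->push_back(name + ":finalize"); }
};

// Runs each phase over every stage before moving on to the next phase.
int run_pipeline(std::vector<LifecycleStage>& stages) {
    for (auto& s : stages) s.initialize();
    // Topology callbacks and pub/sub connection setup would happen here.
    for (auto& s : stages) s.start();
    // Execution would happen here.
    // Sequential for brevity; the pipeline itself runs shutdown in parallel.
    for (auto& s : stages) s.shutdown();
    // The pipeline would wait for outstanding thread pool work here.
    for (auto& s : stages) s.finalize();
    return 0;  // success code
}
```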
All stages should do all of their registration in the initialize method.
Stages can register a 'topology change' callback that is invoked each time a stage creates a new publisher or subscriber. Stages such as logging and comms use this to adjust their configuration dynamically based on the existing publishers and subscribers, or to create publishers and subscribers of their own.
Once the topology has been established, your 'topology change' callback is informed through an ark::pipeline::TopologyChangeEventType::TopologyFullyEstablished event. This lets you do any additional pre-start work, including publishing messages, knowing that no new connections will be created.
You are allowed to change your pipeline topology (i.e., add new publishers, subscribers, or timers) at any point until the topology is fully established (right before the fully established message is sent out or, more typically, right before start is invoked).
In other words, you can change the topology from within the initialize function and from within 'topology changed' callbacks that receive an event type of ark::pipeline::TopologyChangeEventType::NewConnectionsEstablished.
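The "open until fully established" rule can be sketched as a registry that accepts changes, notifies callbacks, and rejects changes once it is sealed. This is a sketch under assumptions: Topology, EventType, add_publisher, and mark_fully_established are hypothetical. EventType only mirrors the two event names from the text and is not ark::pipeline::TopologyChangeEventType itself.

```cpp
// Hypothetical sketch; none of these names are Ark APIs. EventType mirrors
// the two event names in the text but is a local stand-in.
#include <cassert>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

enum class EventType { NewConnectionsEstablished, TopologyFullyEstablished };

// Accepts topology changes until it is marked fully established,
// then rejects them.
class Topology {
public:
    using Callback = std::function<void(EventType)>;

    void add_callback(Callback cb) { callbacks_.push_back(std::move(cb)); }

    void add_publisher(const std::string& channel) {
        if (fully_established_) {
            throw std::logic_error("topology is fully established: " + channel);
        }
        publishers_.push_back(channel);
        notify(EventType::NewConnectionsEstablished);
    }

    void mark_fully_established() {
        fully_established_ = true;
        notify(EventType::TopologyFullyEstablished);
    }

    std::size_t publisher_count() const { return publishers_.size(); }

private:
    void notify(EventType type) {
        for (auto& cb : callbacks_) cb(type);
    }

    bool fully_established_ = false;
    std::vector<std::string> publishers_;
    std::vector<Callback> callbacks_;
};
```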
Next, start is invoked. This is where stages should start any threads they need, and it is typically where comms stages and other message sources begin sending messages to the rest of the stack. Once start has been invoked, you can publish messages (either from your start function or from a thread that it creates), and other stages will begin receiving them. Note that delivery may be slightly delayed, because the pipeline may still be invoking start on other stages and may not yet be able or willing to service callbacks.
For example, both the logging and comms stages use the topology changed callbacks to create their own publishers and subscribers, and then begin their background threads in start.
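A producer stage that starts its thread in start and stops it in shutdown might look like the following. This is a minimal sketch, assuming a hypothetical ProducerStage class. The `publish` function passed to its constructor stands in for a real publisher and is not an Ark API.

```cpp
// Hypothetical producer stage; `publish` stands in for a real publisher
// and is not an Ark API.
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>
#include <utility>

class ProducerStage {
public:
    explicit ProducerStage(std::function<void(int)> publish)
        : publish_(std::move(publish)) {}

    ~ProducerStage() { shutdown(); }

    // Launches the background thread that produces data.
    void start() {
        running_ = true;
        worker_ = std::thread([this] {
            int counter = 0;
            while (running_) {
                publish_(counter++);
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        });
    }

    // Stops producing and waits for the thread to exit; safe to call twice.
    void shutdown() {
        running_ = false;
        if (worker_.joinable()) worker_.join();
    }

private:
    std::function<void(int)> publish_;
    std::atomic<bool> running_{false};
    std::thread worker_;
};
```

Joining inside shutdown guarantees that nothing is published once the call returns.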
Notice
If any stage's initialize or start throws, the entire pipeline is stopped. shutdown and finalize are invoked on every stage that initialized successfully, but not on any stage whose initialize threw or was never invoked.
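The failure rule can be sketched by tracking how many stages finished initialize. This sketch assumes hypothetical names (FallibleStage, run_with_failure_handling) and is not how Ark is implemented. It shows that only the successfully initialized prefix of stages is shut down and finalized.

```cpp
// Hypothetical sketch of the failure rule; names are illustrative only.
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

struct FallibleStage {
    std::string name;
    bool fail_initialize = false;
    std::vector<std::string>* log = nullptr;

    void initialize() {
        if (fail_initialize) throw std::runtime_error(name + " failed to initialize");
        log->push_back(name + ":initialize");
    }
    void start()    { log->push_back(name + ":start"); }
    void shutdown() { log->push_back(name + ":shutdown"); }
    void finalize() { log->push_back(name + ":finalize"); }
};

// Returns false if an exception stopped the pipeline.
bool run_with_failure_handling(std::vector<FallibleStage>& stages) {
    std::size_t initialized = 0;  // stages [0, initialized) succeeded
    bool ok = true;
    try {
        for (auto& s : stages) {
            s.initialize();
            ++initialized;
        }
        for (auto& s : stages) s.start();
    } catch (const std::exception&) {
        ok = false;
    }
    // Only stages that initialized successfully are shut down and finalized.
    for (std::size_t i = 0; i < initialized; ++i) stages[i].shutdown();
    for (std::size_t i = 0; i < initialized; ++i) stages[i].finalize();
    return ok;
}
```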
The shutdown API is intended to tell data producers, such as sensor interfaces or other stages with threads that produce data, to stop their threads and stop producing data. This gives the rest of the pipeline a chance to complete any outstanding work.
Once shutdown has completed, the pub/sub network delivers all remaining messages and is then halted (i.e., you cannot publish or subscribe anymore). You can publish messages from callbacks during this period, but once all messages have been delivered, the system is brought down.
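The drain-then-halt behavior can be sketched with a single-threaded queue. This sketch assumes a hypothetical DrainingBus class, which is not the Ark pub/sub implementation. It shows that messages published from callbacks during the drain are still delivered, and that publishing fails once the bus has halted.

```cpp
// Hypothetical single-threaded bus illustrating drain-then-halt; not the
// Ark pub/sub implementation.
#include <cassert>
#include <deque>
#include <functional>
#include <stdexcept>
#include <utility>
#include <vector>

class DrainingBus {
public:
    using Callback = std::function<void(int)>;

    void subscribe(Callback cb) { subscribers_.push_back(std::move(cb)); }

    void publish(int msg) {
        if (halted_) throw std::logic_error("bus is halted");
        queue_.push_back(msg);
    }

    // Delivers every queued message, including ones that callbacks publish
    // while draining, and only then halts the bus.
    void drain_and_halt() {
        while (!queue_.empty()) {
            int msg = queue_.front();
            queue_.pop_front();
            for (auto& cb : subscribers_) cb(msg);
        }
        halted_ = true;
    }

    bool halted() const { return halted_; }

private:
    std::deque<int> queue_;
    std::vector<Callback> subscribers_;
    bool halted_ = false;
};
```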
The finalize API is intended to give stages at the "end" of processing, such as log writers or amenders, a chance to finish writing out their log files.
The shutdown and finalize APIs work together so that all outstanding work is completed, in an orderly fashion, before it is written out to disk.
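A log writer at the end of processing might buffer records in its callbacks and write them out in finalize. This is a minimal sketch, assuming a hypothetical LogWriterStage class. Writing to a std::ostream stands in for writing a log file, and the class is not an Ark stage.

```cpp
// Hypothetical log-writer stage; names are illustrative, not Ark APIs.
#include <cassert>
#include <ostream>
#include <sstream>
#include <string>
#include <vector>

class LogWriterStage {
public:
    explicit LogWriterStage(std::ostream& out) : out_(out) {}

    // Would be registered as a subscriber callback; it only buffers.
    void on_message(const std::string& record) { buffer_.push_back(record); }

    // Called after shutdown has let all outstanding messages arrive:
    // writes every buffered record, then flushes the stream.
    void finalize() {
        for (const auto& record : buffer_) out_ << record << '\n';
        buffer_.clear();
        out_.flush();
    }

private:
    std::ostream& out_;
    std::vector<std::string> buffer_;
};
```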
Note that initialize, start, and finalize each run sequentially over all stages, in the order in which the stages were added to the pipeline. shutdown runs in parallel, allowing you to do some blocking work (such as waiting on threads) without impacting the rest of the shutdown process.
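The difference between sequential and parallel phases can be sketched with two helpers. This sketch assumes hypothetical names (run_sequentially, run_in_parallel), and each hook stands in for one stage's lifecycle method. Running each shutdown hook on its own thread is one plausible way to get the behavior described. It is not necessarily how Ark does it.

```cpp
// Hypothetical helpers, not Ark APIs; each hook stands in for one stage's
// initialize, start, shutdown, or finalize method.
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>
#include <vector>

// For initialize, start, and finalize: one stage at a time, in the order
// the stages were added.
void run_sequentially(const std::vector<std::function<void()>>& hooks) {
    for (const auto& hook : hooks) hook();
}

// For shutdown: every stage on its own thread, so a stage that blocks
// (for example, while joining its worker thread) does not hold up the
// others. Returns only after every hook has finished.
void run_in_parallel(const std::vector<std::function<void()>>& hooks) {
    std::vector<std::thread> threads;
    threads.reserve(hooks.size());
    for (const auto& hook : hooks) threads.emplace_back(hook);
    for (auto& t : threads) t.join();
}
```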
Notice
If any stage blocks in its initialize method, the entire pipeline will stall. As a result, avoid long-running blocking work or I/O in your initialize method.
Notice
Messages published during the initialize phase cause the pipeline to fail with an error. You must publish only from threads, timers, or subscriber callbacks. You can begin publishing messages in the topology changed callback once the topology is fully established (or, otherwise, once start has been invoked).
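The rule against publishing during initialize can be sketched as a phase check inside the publisher. This sketch assumes hypothetical names (Phase, GuardedPublisher), which are not Ark types. It illustrates the rule only, not how Ark actually detects the error.

```cpp
// Hypothetical sketch; Phase and GuardedPublisher are not Ark types.
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

enum class Phase { Initializing, TopologyEstablished, Started };

// Rejects any publish attempted while the pipeline is still initializing.
class GuardedPublisher {
public:
    explicit GuardedPublisher(const Phase& phase) : phase_(phase) {}

    void publish(const std::string& msg) {
        if (phase_ == Phase::Initializing) {
            throw std::logic_error("cannot publish during initialize");
        }
        sent_.push_back(msg);
    }

    const std::vector<std::string>& sent() const { return sent_; }

private:
    const Phase& phase_;  // observes the pipeline's current phase
    std::vector<std::string> sent_;
};
```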
Execution
Every subscriber and timer callback within a stage is executed in a mutually exclusive way. That is to say, callbacks within a stage never execute simultaneously. Callbacks in different stages, however, can execute simultaneously.
In other words, if you have three callbacks, StageOne::CallbackOne, StageOne::CallbackTwo, and StageTwo::CallbackThree, then CallbackOne and CallbackTwo will never execute at the same time, but CallbackThree can execute while either CallbackOne or CallbackTwo is active.
This allows you to think of your stage as essentially single-threaded with respect to the pipeline.
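One way to obtain this per-stage guarantee is a mutex per stage, held while any of that stage's callbacks runs. This is a minimal sketch, assuming a hypothetical SerializedStage class. It shows one possible mechanism and is not Ark's scheduler. Two stages would each have their own mutex and could still run in parallel.

```cpp
// Hypothetical sketch of per-stage callback serialization; not Ark's code.
#include <atomic>
#include <cassert>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

class SerializedStage {
public:
    // Every callback of this stage runs under the same mutex, so no two
    // of them overlap. A different stage has a different mutex.
    void run_callback(const std::function<void()>& callback) {
        std::lock_guard<std::mutex> lock(mutex_);
        callback();
    }

private:
    std::mutex mutex_;
};
```

Because of the lock, a callback can touch the stage's plain (non-atomic) members without further synchronization, which is what makes the single-threaded mental model work.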
Configuration
See the section on Pipeline Configuration for information and examples on how to configure stages.