Stages & Startup
A Stage is the unit of work that is added to a pipeline. A stage can
register several kinds of elements:
- Publishers, which allow you to push data onto the network
- Subscribers, which allow you to consume published data
- Periodic Timers, which are invoked periodically
- Event Timers, which are invoked at time intervals of your choosing
- Shutdown Tokens, which allow you to request a shutdown
You register each of these through a
StageInterface, which is your
interface to the rest of the pipeline (and provided to you during the
initialize call).
There are two methods you need to know about for pipeline startup: initialize and start.
When a pipeline begins execution, the process looks like:
- Invokes initialize on all stages
- Invokes the ‘topology changed’ callback as pub/sub topology is established
- Establishes pub/sub connections (messages can now flow)
- Reports that the topology is fully established to all topology callbacks
- Invokes start on all stages
- Begins & completes execution
- Invokes shutdown on all stages
- Waits for all outstanding thread pool work to complete
- Invokes finalize on all stages
- Exits with a success code
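The sequence above can be modelled with a small toy driver. This is only a sketch of the ordering: RecordingStage and run_pipeline are hypothetical names, not part of the pipeline's API, and shutdown is called in a plain loop here for simplicity.

```python
class RecordingStage:
    """Hypothetical stage that records each lifecycle call it receives."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def initialize(self):
        self.log.append((self.name, "initialize"))

    def start(self):
        self.log.append((self.name, "start"))

    def shutdown(self):
        self.log.append((self.name, "shutdown"))

    def finalize(self):
        self.log.append((self.name, "finalize"))


def run_pipeline(stages, log):
    """Toy driver: walks the lifecycle phases in the documented order."""
    for stage in stages:
        stage.initialize()
    log.append(("pipeline", "topology established"))
    for stage in stages:
        stage.start()
    log.append(("pipeline", "execution complete"))
    for stage in stages:
        stage.shutdown()
    log.append(("pipeline", "thread pool drained"))
    for stage in stages:
        stage.finalize()
    return 0  # success code


log = []
stages = [RecordingStage("camera", log), RecordingStage("logger", log)]
exit_code = run_pipeline(stages, log)
phases = [phase for _, phase in log]
```

Every stage finishes a phase before any stage enters the next one, which is why registration done in initialize is visible to all stages by the time start runs.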
All stages should do all of their registration in the initialize method.
Stages can register a ‘topology change’ callback that will be invoked each time a stage makes a new publisher or subscriber. This can be used for things like logging and comms stages, to dynamically adjust configuration based on publishers/subscribers (or create their own publishers/subscribers).
Once the topology has been established, your ‘topology change’ callback will be
informed through an event reporting that the topology is fully established.
This allows you to do any additional pre-start work, knowing that no new connections will
be made.
You are allowed to change your pipeline topology (i.e., add new publishers, subscribers,
or timers) at any point until the topology is fully established (right before the
fully established message is sent out, or more typically, right before
start is invoked).
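As an illustration of this rule, here is a minimal sketch of a registry that accepts changes only until it is established. ToyTopology, its methods, and the event names "publisher_added" and "established" are all hypothetical; the real callback signature and event types are not shown in this section.

```python
class ToyTopology:
    """Hypothetical registry that accepts changes only until established."""

    def __init__(self):
        self.publishers = []
        self.callbacks = []
        self.established = False

    def on_topology_changed(self, callback):
        self.callbacks.append(callback)

    def add_publisher(self, channel):
        if self.established:
            raise RuntimeError("topology is fully established; cannot add " + channel)
        self.publishers.append(channel)
        # Notify every registered callback about the new publisher.
        for callback in list(self.callbacks):
            callback("publisher_added", channel)

    def establish(self):
        # Close the topology first, then tell callbacks it is final.
        self.established = True
        for callback in list(self.callbacks):
            callback("established", None)


events = []
topology = ToyTopology()


def logging_callback(event, channel):
    events.append((event, channel))
    # React to other stages' publishers by creating a matching log publisher.
    if event == "publisher_added" and not channel.startswith("log/"):
        topology.add_publisher("log/" + channel)


topology.on_topology_changed(logging_callback)
topology.add_publisher("camera")
topology.establish()

try:
    topology.add_publisher("late")
    late_error = None
except RuntimeError as error:
    late_error = str(error)
```

The callback adds its own publisher while the topology is still open, which mirrors how a logging stage might react to other stages, and the attempt made after establishment is rejected.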
In other words, you can change the topology from within the
initialize function and from the
topology changed callback, up until the event reporting that the topology is fully established.
Once the topology is fully established, start is invoked. This is where stages should start any threads, if
they need to. This is typically where comms stages and other sources of messages
begin sending out messages to the rest of the stack. Once
start has been invoked,
you can begin publishing messages (either from your
start function, or from the threads your
start function creates).
As an example, both the logging and comms stages use the topology changed callbacks
to create their own publishers/subscribers, and then begin their work in
start.
Notice: If any stage’s
start throws, the entire pipeline is stopped.
shutdown and finalize will be invoked on any stage which successfully initialized (but not on any stage that did not initialize, successfully or otherwise).
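The failure handling described in this notice could be sketched as follows. ToyStage and run are hypothetical, and treating an exception from initialize the same way as one from start is an assumption of this sketch, made only to show a stage that never initialized.

```python
class ToyStage:
    """Hypothetical stage that can be told to fail in one lifecycle method."""

    def __init__(self, name, fail_in=None):
        self.name = name
        self.fail_in = fail_in
        self.calls = []

    def _run(self, method):
        self.calls.append(method)
        if self.fail_in == method:
            raise RuntimeError(self.name + " failed in " + method)

    def initialize(self):
        self._run("initialize")

    def start(self):
        self._run("start")

    def shutdown(self):
        self._run("shutdown")

    def finalize(self):
        self._run("finalize")


def run(stages):
    """Returns True on a clean run, False if startup was aborted."""
    initialized = []  # only stages whose initialize returned normally
    ok = True
    try:
        for stage in stages:
            stage.initialize()
            initialized.append(stage)
        for stage in stages:
            stage.start()
    except RuntimeError:
        ok = False  # stop the whole pipeline on the first failure
    for stage in initialized:
        stage.shutdown()
    for stage in initialized:
        stage.finalize()
    return ok


# Case 1: start throws, so every initialized stage is still cleaned up.
good = ToyStage("good")
bad = ToyStage("bad", fail_in="start")
ok = run([good, bad])

# Case 2 (assumption): initialize throws, so later stages are never touched.
broken = ToyStage("broken", fail_in="initialize")
late = ToyStage("late")
ok_two = run([broken, late])
```

In the first case both stages receive shutdown and finalize. In the second, neither does, because no stage finished initialize.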
The shutdown API is intended to inform data producers, such as sensor interfaces,
or other stages with threads that produce data, that they should stop their threads
and stop producing data. This gives the rest of the pipeline a chance to complete
work on anything that is outstanding.
Once shutdown has completed, the pub/sub network will deliver all remaining messages,
and then be halted (i.e., you cannot publish or subscribe anymore). You can publish messages
from callbacks during this period, but once all messages have been delivered, the system
will be brought down.
The finalize API is intended to allow stages that are at the “end” of processing,
such as log writers or amenders, a chance to finish writing out their log files.
The shutdown and finalize APIs work together to allow all work to be completed
before it is written out to disk, in an orderly fashion.
initialize, start, and finalize all run sequentially over all stages,
in the order in which they were added to the pipeline.
shutdown will run in parallel,
allowing you to do some blocking work (such as waiting on threads) without impacting the
rest of the shutdown process.
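A rough sketch of why parallel shutdown makes blocking acceptable: each stage below joins its own worker thread inside shutdown, and the calls run concurrently on a thread pool. ProducerStage is hypothetical, and using ThreadPoolExecutor to run shutdown is an assumption about one possible implementation, not a description of the pipeline's internals.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ProducerStage:
    """Hypothetical data producer that owns one worker thread."""

    def __init__(self, name):
        self.name = name
        self.stop_requested = threading.Event()
        self.worker = threading.Thread(target=self._produce)
        self.finalized = False

    def _produce(self):
        # Stand-in for a sensor loop: idle until asked to stop.
        self.stop_requested.wait()

    def start(self):
        self.worker.start()

    def shutdown(self):
        # Blocking here is acceptable because shutdown runs in parallel.
        self.stop_requested.set()
        self.worker.join()

    def finalize(self):
        self.finalized = True


stages = [ProducerStage("lidar"), ProducerStage("camera")]

for stage in stages:  # sequential, in insertion order
    stage.start()

with ThreadPoolExecutor(max_workers=len(stages)) as pool:
    # One shutdown call per stage, all running concurrently.
    list(pool.map(lambda stage: stage.shutdown(), stages))

for stage in stages:  # sequential again
    stage.finalize()

all_stopped = all(not stage.worker.is_alive() for stage in stages)
```

By the time finalize runs, every producer thread has been joined, so nothing is still generating data while log files are being closed.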
Notice: If any stage blocks in its
initialize method, the entire pipeline will stall. As a result, it’s not advisable to do long blocking work or I/O in your
initialize method.
Notice: Messages published during the
initialize phase will cause the pipeline to fail with an error. You must only publish from threads, timers, or subscriber callbacks.
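To make the publishing rule concrete, here is a toy model of an interface that rejects messages until start has been invoked. ToyStageInterface, its publish method, and the started flag are hypothetical stand-ins, not the real StageInterface.

```python
class ToyStageInterface:
    """Hypothetical stand-in for the interface handed to a stage."""

    def __init__(self):
        self.started = False
        self.delivered = []

    def publish(self, channel, message):
        if not self.started:
            raise RuntimeError("cannot publish on " + channel + " before start")
        self.delivered.append((channel, message))


interface = ToyStageInterface()

try:
    # Simulates a stage publishing from inside initialize.
    interface.publish("status", "too early")
    early_error = None
except RuntimeError as error:
    early_error = str(error)

interface.started = True  # the pipeline has now invoked start
interface.publish("status", "ready")
```

Only the message sent after start is delivered, and the early attempt surfaces as an error instead of being silently dropped.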
Every subscriber and timer callback within a stage will be executed in a mutually-exclusive way. That is to say, callbacks within a stage will never execute simultaneously. Callbacks in other stages can execute simultaneously, however.
In other words, if you have three callbacks,
StageOne::CallbackOne, StageOne::CallbackTwo, and
StageTwo::CallbackThree, you will find that
CallbackOne and CallbackTwo will never execute
at the same time, but
CallbackThree can execute when either
CallbackOne or CallbackTwo is active.
This allows you to think of your stage as essentially single-threaded with respect to the pipeline.
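One way to picture this guarantee is a lock owned by each stage. This section does not say how the pipeline actually enforces mutual exclusion, so the per-stage lock below is an assumption, and ToyStage and run_callback are hypothetical names.

```python
import threading
import time


class ToyStage:
    """Hypothetical stage whose callbacks share one lock."""

    def __init__(self):
        self.lock = threading.Lock()
        self.active = 0      # callbacks currently running in this stage
        self.max_active = 0  # highest value of `active` ever observed

    def run_callback(self):
        with self.lock:  # serializes callbacks within this stage only
            self.active += 1
            self.max_active = max(self.max_active, self.active)
            time.sleep(0.01)  # simulate work while holding the lock
            self.active -= 1


stage_one = ToyStage()
stage_two = ToyStage()

# Two callbacks belong to stage_one, one belongs to stage_two.
threads = [
    threading.Thread(target=stage.run_callback)
    for stage in (stage_one, stage_one, stage_two)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Each stage never observes more than one active callback, while the callback in stage_two is free to overlap with those in stage_one because the locks are independent.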
See the section on Pipeline Configuration for information and examples on how to configure stages.