Stages & Startup
A Stage is the unit of work that is added to a pipeline. A stage can register
the following elements:
- Publishers, which allow you to push data onto the network
- Subscribers, which allow you to consume published data
- Periodic Timers, which are invoked periodically
- Event Timers, which are invoked at time intervals of your choosing
- Shutdown Tokens, which allow you to request a shutdown
You register each of these through a StageInterface, which is your interface
to the rest of the pipeline and is provided to you during the initialization
phase.
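As a rough illustration of what registration looks like, the following C++ sketch uses a hypothetical FakeStageInterface that only records what a stage asks for. Its method names (add_publisher, add_subscriber, add_periodic_timer, add_event_timer, add_shutdown_token) and ExampleStage are assumptions made for this example. They are not the real StageInterface API.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for a StageInterface; NOT the real ark API.
// It only records what a stage registers so the pattern is visible.
struct FakeStageInterface {
  std::vector<std::string> publishers;
  std::vector<std::string> subscribers;
  std::vector<std::chrono::milliseconds> periodic_timers;
  std::vector<std::string> event_timers;
  int shutdown_tokens = 0;

  void add_publisher(const std::string& channel) { publishers.push_back(channel); }
  void add_subscriber(const std::string& channel,
                      std::function<void(const std::string&)> /*callback*/) {
    subscribers.push_back(channel);
  }
  void add_periodic_timer(std::chrono::milliseconds period,
                          std::function<void()> /*callback*/) {
    periodic_timers.push_back(period);
  }
  void add_event_timer(const std::string& name, std::function<void()> /*callback*/) {
    event_timers.push_back(name);
  }
  void add_shutdown_token() { ++shutdown_tokens; }
};

// A hypothetical stage that performs all of its registration in initialize().
class ExampleStage {
 public:
  void initialize(FakeStageInterface& iface) {
    iface.add_publisher("status");
    iface.add_subscriber("commands",
                         [this](const std::string& msg) { last_command_ = msg; });
    iface.add_periodic_timer(std::chrono::milliseconds(100), [] { /* heartbeat */ });
    iface.add_event_timer("retry", [] { /* scheduled when the stage chooses */ });
    iface.add_shutdown_token();
  }

 private:
  std::string last_command_;
};
```

The point of the shape is that every registration happens inside initialize, through the interface object the pipeline hands the stage.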
Startup
There are two methods you need to know about for pipeline startup: initialize
and start.
When a pipeline runs, it:
- Invokes initialize on all stages
- Invokes the ‘topology changed’ callback as the pub/sub topology is established
- Establishes pub/sub connections
- Reports to all topology callbacks that the topology is fully established. Messages can now be published.
- Invokes start on all stages. Messages will now be received.
- Begins and completes execution
- Invokes shutdown on all stages
- Waits for all outstanding thread pool work to complete
- Invokes finalize on all stages
- Exits with a success code
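The ordering above can be modeled in a few lines. This is a simplified C++ sketch under stated assumptions: LoggingStage and run_pipeline are hypothetical, topology events are passed as plain strings, and shutdown runs sequentially here so the recorded order is deterministic (the real pipeline runs shutdown in parallel).

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical model of the lifecycle order; not the real ark pipeline.
// Each stage appends "<name>:<phase>" to a shared log.
struct LoggingStage {
  std::string name;
  std::vector<std::string>* log;
  void initialize() { log->push_back(name + ":initialize"); }
  void on_topology(const std::string& event) { log->push_back(name + ":" + event); }
  void start() { log->push_back(name + ":start"); }
  void shutdown() { log->push_back(name + ":shutdown"); }
  void finalize() { log->push_back(name + ":finalize"); }
};

// Drives every stage through each phase in turn, in insertion order.
// Shutdown is sequential here only to keep the log deterministic.
int run_pipeline(std::vector<LoggingStage>& stages) {
  for (auto& s : stages) s.initialize();
  for (auto& s : stages) s.on_topology("NewConnectionsEstablished");
  for (auto& s : stages) s.on_topology("TopologyFullyEstablished");
  for (auto& s : stages) s.start();
  // ... execution would happen here ...
  for (auto& s : stages) s.shutdown();
  for (auto& s : stages) s.finalize();
  return 0;  // success code
}
```

In this model every stage finishes a phase before any stage enters the next one, matching the list above.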
All stages should do all of their registration in the initialize method.
Stages can register a ‘topology change’ callback that will be invoked each time a stage creates a new publisher or subscriber. This is useful for stages such as logging and comms, which dynamically adjust their configuration based on the publishers/subscribers present (or create their own publishers/subscribers).
Once the topology has been established, your ‘topology change’ callback will be
informed through an ark::pipeline::TopologyChangeEventType::TopologyFullyEstablished event.
This allows you to do any additional pre-start work, knowing that no new connections will
be created. This includes publishing messages.
You are allowed to change your pipeline topology (i.e., add new publishers, subscribers,
or timers) at any point until the topology is fully established (right before the
fully established message is sent out, or more typically, right before start is invoked).
In other words, you can change the topology from within the initialize function and from
topology changed callbacks with an event type of
ark::pipeline::TopologyChangeEventType::NewConnectionsEstablished.
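To make the registration window concrete, here is a hypothetical C++ model. TopologyModel and the local TopologyEvent enum are assumptions for illustration (the real type is ark::pipeline::TopologyChangeEventType and may have more values). The model accepts new publishers, notifying callbacks with NewConnectionsEstablished, until it is marked fully established; after that it rejects further changes.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Local mirror of the two event types named in the text (hypothetical).
enum class TopologyEvent { NewConnectionsEstablished, TopologyFullyEstablished };

// Hypothetical model: registrations are accepted until the topology is
// marked fully established, after which they are rejected.
class TopologyModel {
 public:
  using Callback = std::function<void(TopologyEvent)>;

  void add_callback(Callback cb) { callbacks_.push_back(std::move(cb)); }

  void add_publisher(const std::string& channel) {
    if (established_) throw std::logic_error("topology already established");
    publishers_.push_back(channel);
    notify(TopologyEvent::NewConnectionsEstablished);
  }

  void mark_fully_established() {
    established_ = true;
    notify(TopologyEvent::TopologyFullyEstablished);
  }

  std::size_t publisher_count() const { return publishers_.size(); }

 private:
  void notify(TopologyEvent e) {
    for (auto& cb : callbacks_) cb(e);
  }

  bool established_ = false;
  std::vector<std::string> publishers_;
  std::vector<Callback> callbacks_;
};
```

In such a model a callback may itself call add_publisher while handling NewConnectionsEstablished (much as a comms-style stage creates its own publishers in response to new connections), but any call after mark_fully_established throws.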
Next, start is invoked. This is where stages should start any threads, if
they need to. This is typically where comms stages and other sources of messages
begin sending out messages to the rest of the stack. When start has been invoked,
you can publish messages (either from your start function, or the thread
that the start function creates), and other stages will begin receiving them. Note that
there may be a slight delay before messages are received, because the pipeline may still
be invoking start and may not yet be able or willing to service callbacks.
As an example, both the logging and comms stages use the topology changed callbacks
to create their own publishers/subscribers, and then in start they begin their
background threads.
Notice
If any stage’s initialize or start throws, the entire pipeline
is stopped. shutdown and finalize will be invoked on every stage that
successfully initialized, but not on any stage whose initialize threw or was
never invoked.
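The failure rule can be sketched as follows. This hypothetical C++ model (Stage and run_stages are illustrative names, and only a throwing initialize is modeled, not a throwing start) tracks which stages initialized successfully and runs shutdown and finalize on exactly those.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stage that can be told to fail during initialize.
struct Stage {
  std::string name;
  bool fail_on_init;
  std::vector<std::string>* log;
  void initialize() {
    if (fail_on_init) throw std::runtime_error(name + " failed to initialize");
    log->push_back(name + ":initialize");
  }
  void shutdown() { log->push_back(name + ":shutdown"); }
  void finalize() { log->push_back(name + ":finalize"); }
};

// Returns false if any initialize threw. Only stages that initialized
// successfully receive shutdown and finalize.
bool run_stages(std::vector<Stage>& stages) {
  std::vector<Stage*> initialized;
  bool ok = true;
  for (auto& s : stages) {
    try {
      s.initialize();
      initialized.push_back(&s);
    } catch (const std::exception&) {
      ok = false;
      break;  // later stages are never initialized
    }
  }
  // (start and execution would run here only if ok)
  for (auto* s : initialized) s->shutdown();
  for (auto* s : initialized) s->finalize();
  return ok;
}
```

With three stages where the second throws, only the first stage sees shutdown and finalize; the third is never initialized at all.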
The shutdown API is intended to inform data producers, such as sensor interfaces,
or other stages with threads that produce data, that they should stop their threads
and stop producing data. This gives the rest of the pipeline a chance to complete
work on anything that is outstanding.
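A typical data producer pairs start and shutdown along these lines. The following C++ sketch is hypothetical: ProducerStage is not part of ark, and publishing is simulated with an atomic counter. start launches a background thread, and shutdown clears a flag and joins that thread, so nothing more is produced once it returns.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical sensor-like producer stage: start() spawns a background
// thread that "publishes" (increments a counter) until shutdown() stops it.
class ProducerStage {
 public:
  ~ProducerStage() { shutdown(); }  // never destroy a joinable thread

  void start() {
    running_ = true;
    worker_ = std::thread([this] {
      while (running_.load()) {
        published_.fetch_add(1);  // stands in for publishing a message
        std::this_thread::yield();
      }
    });
  }

  // Ask the thread to stop, then wait for it to exit.
  void shutdown() {
    running_ = false;
    if (worker_.joinable()) worker_.join();
  }

  long published() const { return published_.load(); }
  bool worker_joined() const { return !worker_.joinable(); }

 private:
  std::atomic<bool> running_{false};
  std::atomic<long> published_{0};
  std::thread worker_;
};
```

Joining inside shutdown is a blocking call; it is reasonable there because shutdown is the phase meant to let producers wind down their threads.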
Once shutdown has completed, the pub/sub network will deliver all remaining messages,
and then be halted (i.e., you can no longer publish or subscribe). You can publish messages
from callbacks during this period, but once all messages have been delivered, the system
will be brought down.
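The drain-then-halt behavior can be illustrated with a small single-threaded queue. MessageBus is a hypothetical stand-in, not the real pub/sub network: drain_and_halt keeps delivering until the queue is empty, including messages a handler publishes while draining, and publish throws once the bus is halted.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical model of the post-shutdown drain.
class MessageBus {
 public:
  // Single subscriber callback; it may publish follow-up messages.
  std::function<void(MessageBus&, const std::string&)> handler;

  void publish(const std::string& msg) {
    if (halted_) throw std::logic_error("bus halted");
    queue_.push_back(msg);
  }

  // Deliver everything still queued (plus anything published during the
  // drain), then refuse further publishing.
  void drain_and_halt() {
    while (!queue_.empty()) {
      std::string msg = queue_.front();
      queue_.pop_front();
      if (handler) handler(*this, msg);
    }
    halted_ = true;
  }

 private:
  std::deque<std::string> queue_;
  bool halted_ = false;
};
```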
The finalize API is intended to allow stages that are at the “end” of processing,
such as log writers or amenders, a chance to finish writing out their log files.
The shutdown and finalize APIs work together to allow all work to be completed
before it is written out to disk, in an orderly fashion.
Note that initialize, start, and finalize each run sequentially over all stages,
in the order in which they were added to the pipeline. shutdown runs in parallel,
allowing you to do some blocking work (such as waiting on threads) without impacting the
rest of the shutdown process.
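Why parallel shutdown matters for blocking work can be shown with a hypothetical C++ sketch. Rendezvous, BlockingStage, and shutdown_all are illustrative names, and std::async merely stands in for however the pipeline actually dispatches shutdown. Each stage's shutdown blocks until every stage has entered shutdown; this completes only because the calls run concurrently, whereas a sequential loop would hang on the first stage.

```cpp
#include <cassert>
#include <condition_variable>
#include <future>
#include <mutex>
#include <vector>

// Blocks each caller until `expected` callers have arrived.
class Rendezvous {
 public:
  explicit Rendezvous(int expected) : expected_(expected) {}
  void arrive_and_wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    ++arrived_;
    cv_.notify_all();
    cv_.wait(lock, [this] { return arrived_ >= expected_; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  int arrived_ = 0;
  const int expected_;
};

// Hypothetical stage whose shutdown blocks (standing in for joining a
// worker thread) until every stage is shutting down at the same time.
struct BlockingStage {
  Rendezvous* all_stopping;
  void shutdown() const { all_stopping->arrive_and_wait(); }
};

// Invokes shutdown on all stages concurrently and waits for all of them.
int shutdown_all(const std::vector<BlockingStage>& stages) {
  std::vector<std::future<void>> pending;
  for (const auto& stage : stages) {
    pending.push_back(std::async(std::launch::async, [&stage] { stage.shutdown(); }));
  }
  int completed = 0;
  for (auto& f : pending) {
    f.get();  // waits for this stage's shutdown to return
    ++completed;
  }
  return completed;
}
```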
Notice
If any stage blocks in its initialize method, the entire pipeline will stall.
As a result, it’s not advisable to do long blocking work or I/O in your initialize
method.
Notice
Messages published during the initialize phase will cause the pipeline to fail
with an error. You must only publish from threads, timers, or subscriber callbacks. You
can begin publishing messages in the topology changed callback once the topology is
fully established (or, failing that, after start has been invoked).
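The publishing rule can be pictured as a gate on the publisher. In this hypothetical C++ sketch, GatedPublisher and the Phase enum are assumptions, and "fail with an error" is modeled as an exception; the real pipeline's error mechanism may differ.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical pipeline phases relevant to publishing.
enum class Phase { Initializing, TopologyEstablished, Started };

// Hypothetical publisher gate: publishing during initialize is an error;
// once the topology is fully established (or start has run) it succeeds.
class GatedPublisher {
 public:
  void set_phase(Phase p) { phase_ = p; }

  void publish(const std::string& msg) {
    if (phase_ == Phase::Initializing)
      throw std::logic_error("cannot publish during initialize");
    sent_.push_back(msg);
  }

  const std::vector<std::string>& sent() const { return sent_; }

 private:
  Phase phase_ = Phase::Initializing;
  std::vector<std::string> sent_;
};
```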
Execution
Every subscriber and timer callback within a stage will be executed in a mutually-exclusive way. That is to say, callbacks within a stage will never execute simultaneously. Callbacks in other stages can execute simultaneously, however.
In other words, if you have three callbacks, StageOne::CallbackOne, StageOne::CallbackTwo, and
StageTwo::CallbackThree, you will find that CallbackOne and CallbackTwo will never execute
at the same time, but CallbackThree can execute when either CallbackOne or CallbackTwo is active.
This allows you to think of your stage as essentially single-threaded with respect to the pipeline.
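One way to get this guarantee is a mutex per stage that is held while any of that stage's callbacks runs. The following C++ sketch assumes exactly that; StageExecutor and ConcurrencyProbe are hypothetical, and the real pipeline may schedule callbacks differently (for example, with a per-stage queue on a thread pool). Callbacks submitted to the same StageExecutor from several threads never overlap.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical per-stage executor: every callback of a stage runs while
// holding that stage's mutex, so two callbacks of one stage never overlap.
class StageExecutor {
 public:
  template <typename F>
  void run_callback(F&& callback) {
    std::lock_guard<std::mutex> lock(mutex_);  // one callback at a time
    std::forward<F>(callback)();
  }

 private:
  std::mutex mutex_;
};

// Records the highest number of callbacks observed running at once.
struct ConcurrencyProbe {
  std::atomic<int> in_flight{0};
  std::atomic<int> max_seen{0};

  void enter() {
    int now = ++in_flight;
    int prev = max_seen.load();
    while (now > prev && !max_seen.compare_exchange_weak(prev, now)) {
    }
  }
  void leave() { --in_flight; }
};
```

A second StageExecutor, standing in for StageTwo, would own a separate mutex, so its callbacks could run at the same time as those of the first stage.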
Configuration
See the section on Pipeline Configuration for information and examples on how to configure stages.