Stages & Startup

A Stage is the unit of work that is added to a pipeline. A stage can register several kinds of elements:

  • Publishers, which allow you to push data onto the network
  • Subscribers, which allow you to consume published data
  • Periodic Timers, which are invoked periodically
  • Event Timers, which are invoked at time intervals of your choosing
  • Shutdown Tokens, which allow you to request a shutdown

You register each of these elements through a StageInterface, which is your interface to the rest of the pipeline and is provided to you during the initialization phase.

Startup

There are two methods you need to know about for pipeline startup: initialize and start.

When a pipeline executes, it goes through the following steps, in order:

  • Invokes initialize on all stages
  • Invokes the 'topology changed' callback as pub/sub topology is established
  • Establishes pub/sub connections
  • Reports that the topology is fully established to all topology callbacks. Messages can now be published.
  • Invokes start on all stages. Messages will now be received.
  • Begins & completes execution
  • Invokes shutdown on all stages
  • Waits for all outstanding thread pool work to complete
  • Invokes finalize on all stages
  • Exits with a success code
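
The ordering above can be sketched with a toy model. This is not the Ark implementation: ToyStage and run_toy_lifecycle are hypothetical names invented for illustration, and the sketch only records the order in which each phase would be reached.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a stage; NOT the real Ark stage class.
struct ToyStage
{
    std::string name;
};

// Records the order in which a toy pipeline would walk through its phases.
std::vector<std::string> run_toy_lifecycle(const std::vector<ToyStage>& stages)
{
    std::vector<std::string> log;

    // Visits every stage in the order it was added and logs the phase.
    auto for_each_stage = [&](const std::string& phase) {
        for (const auto& stage : stages) {
            log.push_back(phase + ":" + stage.name);
        }
    };

    for_each_stage("initialize");
    log.push_back("topology_fully_established");  // publishing is allowed from here
    for_each_stage("start");                      // messages are received from here
    log.push_back("execute");
    for_each_stage("shutdown");                   // parallel in the real pipeline
    log.push_back("drain_thread_pool");
    for_each_stage("finalize");
    return log;
}
```

Calling run_toy_lifecycle with two stages yields eleven log entries, starting with the initialize entries and ending with the finalize entries.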

All stages should do all of their registration in the initialize method.

Stages can register a 'topology change' callback that will be invoked each time a stage makes a new publisher or subscriber. This is useful for stages such as logging and comms, which can dynamically adjust their configuration based on the publishers/subscribers that exist (or create their own publishers/subscribers).

Once the topology has been established, your 'topology change' callback will be informed through an ark::pipeline::TopologyChangeEventType::TopologyFullyEstablished event. This allows you to do any additional pre-start work, knowing that no new connections will be created. This includes publishing messages.

You are allowed to change your pipeline topology (i.e., add new publishers, subscribers, or timers) at any point until the topology is fully established (right before the fully established message is sent out, or more typically, right before start is invoked). In other words, you can change the topology from within the initialize function and from within 'topology changed' callbacks that receive an event type of ark::pipeline::TopologyChangeEventType::NewConnectionsEstablished.
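
As a rough sketch of these rules, the toy class below accepts new publishers until the topology is marked as fully established, and notifies callbacks along the way. ToyTopology, ToyTopologyEvent, and every method name are assumptions made for illustration. Only the two event names come from the text, and the real enum lives in ark::pipeline.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Mirrors the two event names mentioned in the text (hypothetical copy).
enum class ToyTopologyEvent { NewConnectionsEstablished, TopologyFullyEstablished };

// Hypothetical topology tracker; NOT the real Ark API.
class ToyTopology
{
public:
    using Callback = std::function<void(ToyTopologyEvent)>;

    void on_topology_changed(Callback callback)
    {
        callbacks_.push_back(std::move(callback));
    }

    // Allowed from initialize and from NewConnectionsEstablished callbacks.
    void add_publisher(const std::string& channel)
    {
        if (frozen_) {
            throw std::logic_error("topology is already fully established");
        }
        publishers_.push_back(channel);
        notify(ToyTopologyEvent::NewConnectionsEstablished);
    }

    // Freezes the topology first, then tells every callback about it.
    void mark_fully_established()
    {
        frozen_ = true;
        notify(ToyTopologyEvent::TopologyFullyEstablished);
    }

    std::size_t publisher_count() const { return publishers_.size(); }

private:
    void notify(ToyTopologyEvent event)
    {
        // Index-based loop: callbacks may re-enter add_publisher.
        for (std::size_t i = 0; i < callbacks_.size(); ++i) {
            callbacks_[i](event);
        }
    }

    bool frozen_ = false;
    std::vector<std::string> publishers_;
    std::vector<Callback> callbacks_;
};
```

A logging-style stage could use such a callback to add its own publisher when it sees a NewConnectionsEstablished event. Once mark_fully_established has run, any further add_publisher call throws in this sketch.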

Next, start is invoked. This is where stages should start any threads, if they need to. This is typically where comms stages and other sources of messages begin sending out messages to the rest of the stack. When start has been invoked, you can publish messages (either from your start function, or the thread that the start function creates), and other stages will begin receiving them. Note that there may be some slight delay in receiving the messages, as the pipeline may still be invoking start and not able or not willing to service callbacks.

As an example, both the logging and comms stages use the topology changed callbacks to create their own publishers/subscribers, and then in start they begin their background threads.

The shutdown API is intended to inform data producers, such as sensor interfaces, or other stages with threads that produce data, that they should stop their threads and stop producing data. This gives the rest of the pipeline a chance to complete work on anything that is outstanding.
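
The start/shutdown pairing for a data producer might look like the sketch below. ToyProducerStage is a hypothetical, self-contained class and does not derive from any Ark type. A counter stands in for publishing a message, since the real publisher API is not shown in this section.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical producer stage; NOT the real Ark stage class.
class ToyProducerStage
{
public:
    ~ToyProducerStage() { shutdown(); }

    // Launches the background thread that produces data.
    void start()
    {
        running_ = true;
        worker_ = std::thread([this] {
            while (running_) {
                ++produced_;  // stand-in for publishing one message
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        });
    }

    // Stops producing and blocks until the background thread has exited.
    void shutdown()
    {
        running_ = false;
        if (worker_.joinable()) {
            worker_.join();
        }
    }

    int produced() const { return produced_; }

private:
    std::atomic<bool> running_{false};
    std::atomic<int> produced_{0};
    std::thread worker_;
};
```

After shutdown returns, the worker thread has been joined, so no further data is produced and the rest of the pipeline can finish its outstanding work.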

Once shutdown has completed, the pub/sub network will deliver all remaining messages and then be halted (i.e., you cannot publish or subscribe anymore). You can publish messages from callbacks during this period, but once all messages have been delivered, the system will be brought down.

The finalize API is intended to allow stages that are at the “end” of processing, such as log writers or amenders, a chance to finish writing out their log files. The shutdown and finalize APIs work together to allow all work to be completed before it is written out to disk, in an orderly fashion.

Note that initialize, start, and finalize all run sequentially over all stages, in the order in which they were added to the pipeline. shutdown runs in parallel across stages, allowing you to do some blocking work (such as waiting on threads) without impacting the rest of the shutdown process.
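
The difference between the sequential phases and the parallel shutdown phase can be illustrated as follows. This is a sketch under assumptions: ToyStage, run_sequential, and run_parallel_shutdown are hypothetical names, and std::async stands in for whatever mechanism the pipeline actually uses to parallelize shutdown.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <string>
#include <vector>

// Hypothetical stand-in for a stage; NOT the real Ark stage class.
struct ToyStage
{
    std::string name;
};

// Sequential phase: one stage at a time, in the order stages were added.
std::vector<std::string> run_sequential(const std::vector<ToyStage>& stages,
                                        const std::string& phase)
{
    std::vector<std::string> order;
    for (const auto& stage : stages) {
        order.push_back(phase + ":" + stage.name);
    }
    return order;
}

// Parallel phase: every stage's shutdown runs as its own task, so one stage
// blocking (for example, joining a thread) does not hold up the others.
std::size_t run_parallel_shutdown(const std::vector<ToyStage>& stages)
{
    std::vector<std::future<std::string>> pending;
    for (const auto& stage : stages) {
        pending.push_back(std::async(std::launch::async, [&stage] {
            return "shutdown:" + stage.name;
        }));
    }

    // Wait for every shutdown task before moving on to the next phase.
    std::size_t completed = 0;
    for (auto& task : pending) {
        task.get();
        ++completed;
    }
    return completed;
}
```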

Execution

Every subscriber and timer callback within a stage will be executed in a mutually-exclusive way. That is to say, callbacks within a stage will never execute simultaneously. Callbacks in other stages can execute simultaneously, however.

In other words, if you have three callbacks, StageOne::CallbackOne, StageOne::CallbackTwo, and StageTwo::CallbackThree, you will find that CallbackOne and CallbackTwo will never execute at the same time, but CallbackThree can execute when either CallbackOne or CallbackTwo is active.

This allows you to think of your stage as essentially single-threaded with respect to the pipeline.
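
One way to picture this guarantee is a lock owned by each stage that is held while any of its callbacks runs. The sketch below assumes that model purely for illustration. ToySerializedStage and max_overlap_within_stage are hypothetical, and the real pipeline may enforce exclusivity differently.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stage wrapper: one mutex serializes all of its callbacks.
class ToySerializedStage
{
public:
    // Runs a callback while holding the stage lock, so callbacks that belong
    // to this stage never overlap. Other stages own separate locks.
    void invoke(const std::function<void()>& callback)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        callback();
    }

private:
    std::mutex mutex_;
};

// Drives one stage from several threads and returns the largest number of
// its callbacks that were ever observed running at the same time.
int max_overlap_within_stage(int thread_count, int calls_per_thread)
{
    ToySerializedStage stage;
    std::atomic<int> active{0};
    std::atomic<int> max_active{0};

    auto callback = [&] {
        const int now = ++active;
        int seen = max_active.load();
        while (now > seen && !max_active.compare_exchange_weak(seen, now)) {
        }
        --active;
    };

    std::vector<std::thread> threads;
    for (int t = 0; t < thread_count; ++t) {
        threads.emplace_back([&] {
            for (int i = 0; i < calls_per_thread; ++i) {
                stage.invoke(callback);
            }
        });
    }
    for (auto& thread : threads) {
        thread.join();
    }
    return max_active.load();
}
```

Because every call goes through the same stage lock, the observed overlap stays at one no matter how many threads deliver callbacks.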

Configuration

See the section on Pipeline Configuration for information and examples on how to configure stages.