Pipeline Stages

You can use the ark::python::PythonStage class to spawn a stage in a C++ pipeline that will in turn launch your Python stage as a separate process. The stage follows all of the same rules as a normal C++ Stage – it works properly within the constraints of the determinism framework, and obeys the real time scheduler and priorities when onboard the vehicle.

Each Python stage gets its own process, allowing you to use different virtual environments for each stage, and allowing you to avoid GIL issues. Note that all communications to and from this stage must go through serialization, which can incur a latency/performance penalty.

Constructing a Python stage is fairly simple:

import pypipeline
import pyserialize

from ark_example_messages import *

class ExamplePythonStage(pypipeline.Stage):
    def __init__(self):
        pass

    def initialize(self, interface):
        # Initialize member variables in your stage...
        self.my_member_variable = 0

        # Construct publishers...
        self.publisher = interface.add_publisher(ExampleMessage, "output")

        # Add a single subscriber that will invoke a callback when it gets an ExampleMessage
        interface.add_subscriber(ExampleMessage, "input", self.handle_message)

        # Add a single timer to run at 1Hz and publish
        interface.add_periodic_timer("output_timer", self.handle_output_timer, 1.0)

    def start(self, interface):
        pass

    def interrupt(self):
        pass

    def shutdown(self):
        pass

    def finalize(self):
        pass

    def handle_message(self, message):
        # Print that we got a message. Note that stdout is redirected to the
        # pipeline's stdout.
        print("Received message on /test/1: {}".format(message))

    def handle_output_timer(self, pipeline_now):
        # Publish a message
        message = ExampleMessage()

        self.publisher.push(response)

# Register the stage with the pipeline.
pypipeline.connect_and_execute_stage(ExamplePythonStage())

On the C++ side, you need to add the stage to your pipeline (the normal way):

ark::pipeline::Pipeline pipeline;

pipeline.add_stage<ark::python::PythonStage>("MyNamedPythonStage");

And you likely want a configuration in your config package YAML:

configs:
  MyNamedPythonStage:
    overrides:
      module_name: "my_py_file_name"
      virtualenv_name: "name of ark_python_venv target"
      python_path: "path_to_module:additional_paths:etc..."

When building the pipeline, make sure you add your virtual environment as a dependency!

Note that while start is invoked as part of the initialization procedure, it is not yet possible to publish messages from threads reliably. You will instead need to publish only from a subscriber or timer callback.

Stage Interface

The stage interface right now is relatively bare-boned compared to the C++ equivalent. It supports the following routines:

  • add_publisher(MessageType, channel_name)

    This adds a publisher of the given message type and channel name. Follows normal stage rules for namespacing.

  • add_subscriber(MessageType, channel_name, callback)

    Adds a subscriber for the given message type on the channel, following normal namespacing rules. When each message is received, a callback will be invoked. Only supports callbacks that take the message as a parameter, not the message and pipeline time.

  • add_timer(timer_name, callback, rate_in_seconds)

    Creates a periodic timer that runs at the given rate, invoking the callback each time the rate expires. Follows normal PeriodicTimer rules.

  • stage_name()

    Returns the name of your stage, as configured in your pipeline.

  • pipeline_identifier()

    Returns the identifier of the running pipeline.

  • get_stage_config()

    Returns the deserialized configuration for a particular stage. This is similar to the C++ equivalent of ark::pipeline::get_stage_config. This can only be invoked during the initialize phase.

  • add_shutdown_token()

    Adds a shutdown token (mimics the C++ API). The token that is retrieved can be used to request that the pipeline exits. Can only be invoked during the initialize phase.

We expect this interface to build out to match the C++ interface over time.

Other Ark Bindings

There are presently no other Python bindings for Ark. Please feel free to request them as necessary.