Orbitalis is a distributed, event-based micro-kernel library for Python designed to simplify the construction of loosely coupled, scalable, and modular systems.
At its core, Orbitalis provides a lightweight yet powerful foundation for building event-driven applications using an event bus mechanism. It integrates natively with Busline, a flexible and efficient event bus library that handles message distribution across system components, whether local or distributed.
If you don't want to read the user guide, you can skip to Practical Example.
flowchart LR
C1 --- EB1
C1 --- EB2
EB1 --- P3
EB1 --- P1
EB2 --- P2
EB2 --- P4
C1@{ shape: circle, label: "Core" }
P1@{ shape: stadium, label: "Plugin" }
P2@{ shape: stadium, label: "Plugin" }
P3@{ shape: stadium, label: "Plugin" }
P4@{ shape: stadium, label: "Plugin" }
EB1@{ shape: das, label: "MQTT Broker" }
EB2@{ shape: das, label: "Local EventBus" }
In this section we will explain how to use Orbitalis. If you want to know more details about this library, please read the Advanced Guide.
All time-based values are expressed in seconds, because asyncio.sleep is used.
pip install orbitalis
If you know what Busline is, you can skip this section.
Busline is an agnostic asynchronous pub/sub library, which represents a good starting point for implementing a wide set of applications thanks to its abstract pub/sub layer.
It natively supports MQTT and a local event bus; you should know how to create pub/sub clients so that your components can exchange messages.
We advise reading the Busline documentation before continuing.
Orbitalis follows the micro-kernel (a.k.a. Plugin) architectural pattern, where a minimal Core (the kernel) handles only the essential system functions—like loading and managing modules, and routing events—while all additional features are implemented as independent Plugins that interact via the event bus.
This design brings several advantages:
- Separation of Concerns: Each module is self-contained and focused on a specific task or domain
- Extensibility: New features can be added without modifying the core, reducing the risk of breaking existing functionality
- Flexibility: Modules can be enabled, disabled, or replaced at runtime
- Maintainability: Smaller code units are easier to test, debug, and reuse
- Scalability: Modules can be distributed across processes or machines and communicate via events
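The advantages above follow from keeping the kernel minimal. A toy sketch of the micro-kernel pattern in plain Python may make this concrete (illustrative only; `ToyEventBus`, `ToyCore`, and `UppercasePlugin` are hypothetical stand-ins, not Orbitalis's actual classes):

```python
from typing import Callable

class ToyEventBus:
    """Minimal in-memory pub/sub bus: topic -> list of handlers."""
    def __init__(self):
        self._subscribers: dict[str, list[Callable[[str, object], None]]] = {}

    def subscribe(self, topic: str, handler: Callable[[str, object], None]) -> None:
        self._subscribers.setdefault(topic, []).append(handler)

    def publish(self, topic: str, payload: object) -> None:
        for handler in self._subscribers.get(topic, []):
            handler(topic, payload)

class ToyCore:
    """The 'kernel': it only loads plugins and owns the bus, nothing else."""
    def __init__(self, bus: ToyEventBus):
        self.bus = bus
        self.plugins: list[object] = []

    def load(self, plugin) -> None:
        plugin.attach(self.bus)   # the plugin registers its own handlers
        self.plugins.append(plugin)

class UppercasePlugin:
    """A feature implemented entirely outside the core."""
    def __init__(self):
        self.results: list[str] = []

    def attach(self, bus: ToyEventBus) -> None:
        bus.subscribe("text.in", lambda t, payload: self.results.append(str(payload).upper()))

bus = ToyEventBus()
core = ToyCore(bus)
plugin = UppercasePlugin()
core.load(plugin)
bus.publish("text.in", "hello")  # the core never sees the feature logic
```

Note how adding a second plugin would require no change to `ToyCore`: this is the extensibility property described above.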
flowchart LR
C1 --> P1
C1 <--> P3
C2 <--> P2
P3 <--> C2
P4 --> C2
C1@{ shape: circle, label: "Core 1" }
C2@{ shape: circle, label: "Core 2" }
P1@{ shape: stadium, label: "Plugin 1" }
P2@{ shape: stadium, label: "Plugin 2" }
P3@{ shape: stadium, label: "Plugin 3" }
P4@{ shape: stadium, label: "Plugin 4" }
Orbitalis allows multiple Cores and multiple Plugins, even in different domains thanks to Busline's capabilities, and connections can be instantiated at runtime thanks to a powerful handshake mechanism. For example, some plugins can be connected over MQTT while others use a local event bus at the same time. This gives you powerful management of your components.
In other words, Orbitalis allows you to start cores and plugins, connect them together and execute plugin operations. Cores and plugins can be started at any time and connections are created based on pre-defined policies.
Messages used by Orbitalis are Avro messages because we need input and output schemas.
flowchart LR
C1 <--> EB1
C1 <--> EB2
C2 <--> EB1
C2 <--> EB2
EB1 <--> P3
EB1 <--> P1
EB2 <--> P2
EB2 <--> P4
C1@{ shape: circle, label: "Core 1" }
C2@{ shape: circle, label: "Core 2" }
P1@{ shape: stadium, label: "Plugin 1" }
P2@{ shape: stadium, label: "Plugin 2" }
P3@{ shape: stadium, label: "Plugin 3" }
P4@{ shape: stadium, label: "Plugin 4" }
EB1@{ shape: das, label: "MQTT Broker" }
EB2@{ shape: das, label: "Local EventBus" }
Orbiter is the base class which provides common capabilities to components.
Therefore, you can use the provided methods both in Cores and Plugins.
It manages pending requests, connections, keepalive and the connection close procedure. In addition, it provides useful shared methods and the main loop.
classDiagram
Orbiter <|-- Core
Orbiter <|-- Plugin
Main public attributes:
- `identifier`: the unique identifier
- `eventbus_client`: a Busline client, used to send events
- `discover_topic`: specifies the topic used to send discover messages
- `raise_exceptions`: if `True`, exceptions are raised; otherwise they are managed by try/catch
- `loop_interval`: specifies how often loop iterations are called (it is a minimum value, because the maximum depends on the weight of the operations in the loop)
- `with_loop`: set to `False` if you don't want the loop (be aware of what the loop does)
- `close_connection_if_unused_after`: if not None, it specifies how many seconds a connection can remain unused before it is closed
- `pending_requests_expire_after`: if not None, it specifies how many seconds can pass before a pending request is discarded
- `consider_others_dead_after`: states how many seconds can pass without a keepalive before a remote orbiter is considered dead
- `send_keepalive_before_timelimit`: states how many seconds before the remote orbiter would consider this orbiter dead a keepalive message is sent
- `graceful_close_timeout`: states how many seconds a graceful close connection can be pending
- `new_connection_added_event`: notifies you when a new connection is created
- `remote_identifiers`: returns all remote connected orbiter identifiers
Main hooks:
- `_get_on_close_data`: used to obtain data to send on close connection; by default None is returned
- `_on_starting`: called before starting
- `_internal_start`: actual implementation to start the orbiter
- `_on_started`: called after starting
- `_on_stopping`: called before stopping
- `_internal_stop`: actual implementation to stop the orbiter
- `_on_stopped`: called after stopping
- `_on_promote_pending_request_to_connection`: called before promotion
- `_on_keepalive_request`: called on keepalive request, before the response
- `_on_keepalive`: called on inbound keepalive
- `_on_graceless_close_connection`: called before a graceless close connection request is sent
- `_on_close_connection`: called when a connection is closed
- `_on_graceful_close_connection`: called before sending a graceful close connection request
- `_on_loop_start`: called on loop start
- `_on_new_loop_iteration`: called before every loop iteration
- `_on_loop_iteration_end`: called at the end of every loop iteration
- `_on_loop_iteration`: called during every loop iteration
Main methods:
- `retrieve_connections`: retrieve all connections which satisfy a query
- `discard_expired_pending_requests`: remove expired pending requests and return the total amount of discarded requests
- `close_unused_connections`: send a graceful close request to all remote orbiters whose connection was unused, based on `close_connection_if_unused_after`
- `force_close_connection_for_out_to_timeout_pending_graceful_close_connection`: send a graceless close connection, based on `graceful_close_timeout`, if a connection is pending close due to a graceful close connection
- `update_acquaintances`: update knowledge about keepalive request topics, keepalive topics and dead time
- `have_seen`: update last seen for a remote orbiter
- `send_keepalive`
- `send_keepalive_request`
- `send_all_keepalive_based_on_connections`: send keepalive messages to all remote orbiters which have a connection with this orbiter
- `send_keepalive_based_on_connections_and_threshold`: send keepalive messages to all remote orbiters which have a connection with this orbiter, only if this orbiter is `send_keepalive_before_timelimit` seconds away from being considered dead
- `send_graceless_close_connection`: send a graceless close connection request to the specified remote orbiter; the self-side connection is closed immediately
- `send_graceful_close_connection`: send a graceful close connection request to the specified remote orbiter; the self-side connection is not closed immediately, an ACK is awaited
- `_close_self_side_connection`: close the local connection with a remote orbiter, so only this orbiter will no longer be able to use the connection. Generally, a close connection request was sent before this method call.
flowchart TD
n2["_on_starting()"] --> n3["_internal_start()"]
n3 --> n4["_on_started()"]
n13["Orbiter Activity"] -- stop() --> n16["_on_stopping()"]
n14["Start Orbiter"] -- start() --> n2
n16 --> n17["_internal_stop()"]
n17 --> n18["_on_stopped()"]
n18 --> n15["Orbiter Stopped"]
s1["Orbiter Loop"] --> n15
n4 -- start_loop() --> s1
n3 --> n13
n14@{ shape: rect}
style n13 fill:#BBDEFB
style n14 fill:#BBDEFB
style n15 fill:#BBDEFB
style s1 fill:#BBDEFB
Every orbiter has an internal loop which periodically performs operations. Automatic loop initialization can be avoided by setting with_loop=False.
The loop is stopped when the stop method is called, but you can stop it prematurely using the stop_loop method.
If you want to pause the loop, you can use pause_loop and resume_loop.
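A pause/resume mechanism for a periodic asyncio loop can be sketched with a plain `asyncio.Event` (an illustrative mechanism under stated assumptions; `PausableLoop` is a hypothetical stand-in, not Orbitalis's actual implementation):

```python
import asyncio

class PausableLoop:
    """Illustrative pausable periodic loop: an Event gates each iteration."""
    def __init__(self, interval: float = 0.005):
        self.interval = interval
        self.iterations = 0
        self._running = asyncio.Event()   # set => the loop may iterate
        self._stopped = False

    def pause_loop(self) -> None:
        self._running.clear()

    def resume_loop(self) -> None:
        self._running.set()

    def stop_loop(self) -> None:
        self._stopped = True
        self._running.set()  # wake the loop so it observes the stop flag

    async def run(self) -> None:
        self._running.set()
        while not self._stopped:
            await self._running.wait()    # blocks here while paused
            if self._stopped:
                break
            self.iterations += 1          # stand-in for _on_loop_iteration()
            await asyncio.sleep(self.interval)

async def demo() -> int:
    loop = PausableLoop()
    task = asyncio.create_task(loop.run())
    await asyncio.sleep(0.05)             # let it iterate a few times
    loop.pause_loop()
    paused_at = loop.iterations
    await asyncio.sleep(0.05)             # while paused, no new iterations start
    assert loop.iterations == paused_at
    loop.resume_loop()
    await asyncio.sleep(0.05)
    loop.stop_loop()
    await task
    return loop.iterations

total = asyncio.run(demo())
```

The key design point is that pausing only blocks *between* iterations: an in-flight iteration always completes before the loop parks on the event.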
In addition to the hooks, the following operations (already discussed above) are executed in parallel:
- `_on_loop_iteration`
- `close_unused_connections`
- `discard_expired_pending_requests`
- `force_close_connection_for_out_to_timeout_pending_graceful_close_connection`
- `send_keepalive_based_on_connections_and_threshold`
Hooks:
- `_on_loop_start`: called on loop start
- `_on_new_loop_iteration`: called before every loop iteration
- `_on_loop_iteration_end`: called at the end of every loop iteration
- `_on_loop_iteration`: called during every loop iteration
flowchart TD
subgraph s1["Orbiter Loop"]
n6["_on_loop_start()"]
n7["_on_new_loop_iteration()"]
n8["_on_loop_iteration()"]
n9["close_unused_connections()"]
n10["discard_expired_pending_requests()"]
n11["force_close_connection_for_out_to_timeout_pending_graceful_close_connection()"]
n12["send_keepalive_based_on_connections_and_threshold()"]
end
n6 --> n7
n7 --> n8
n8 --> n9
n9 --> n10
n10 --> n11
n11 --> n12
n12 --> n7
n2["_on_starting()"] --> n3["_internal_start()"]
n3 --> n4["_on_started()"]
n4 --> n13["Orbiter Activity"]
n13 -- stop() --> n16["_on_stopping()"]
n14["Start Orbiter"] -- start() --> n2
n16 --> n17["_internal_stop()"]
n17 --> n18["_on_stopped()"]
n18 --> n15["Orbiter Stopped"]
s1 --> n15
n3 -- start_loop() --> s1
n14@{ shape: rect}
style n13 fill:#BBDEFB
style n14 fill:#BBDEFB
style n15 fill:#BBDEFB
style s1 fill:#BBDEFB
Orbitalis implements different communication protocols which ensure that orbiters can share information and connect to each other.
If you simply use this library, you should not need to care about their implementation details. If you want to know more or want to contribute, please read the Advanced Guide.
To allow cores and plugins to create connections, a handshake mechanism inspired by how DHCP works was implemented.
There are 4 steps:
- Discover: message used by cores to notify plugins of their presence and to ask for operation connections; the core provides the full set of pluggable operations with related information
- Offer: message used by plugins to respond to a discover message, providing their base information and a list of offered operations. The list of offered operations can be smaller than the full set provided by the discover
- Reply
  - Request: message used by the core to formally request an operation. Every operation has its own request. The core provides additional information to finalize the connection
  - Reject: message used by the core to formally reject an operation (e.g., not needed anymore). Every operation has its own reject
- Response
  - Confirm: message used by plugins to confirm the connection creation
  - OperationNoLongerAvailable: message used by plugins to notify the core that the operation is no longer available
When a Pending Request is confirmed, a Connection is generated (and the related pending request is removed).
sequenceDiagram
Core->>Plugin: Discover
Plugin->>Core: Offer
par Reply for each offered operation
alt is still needed
Core->>Plugin: Request
else not needed anymore
Core->>Plugin: Reject
end
end
par Response for each requested operation
alt is still available
Plugin->>Core: Confirm
else operation no longer available
Plugin->>Core: OperationNoLongerAvailable
end
end
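The decision logic of the four steps can be simulated with a toy in-memory exchange (illustrative only; the real messages are Avro payloads exchanged over Busline, and the function names below are hypothetical):

```python
# Toy simulation of the DHCP-like handshake: Discover -> Offer -> Reply -> Response.

def plugin_offer(discovered_ops: set[str], available_ops: set[str]) -> set[str]:
    """Offer: the plugin answers with the subset of discovered operations it provides."""
    return discovered_ops & available_ops

def core_reply(offered: set[str], still_needed: set[str]) -> dict[str, str]:
    """Reply: Request each offered operation still needed, Reject the rest."""
    return {op: ("Request" if op in still_needed else "Reject") for op in offered}

def plugin_response(replies: dict[str, str], still_available: set[str]) -> dict[str, str]:
    """Response: Confirm requested operations still available, else OperationNoLongerAvailable."""
    return {
        op: ("Confirm" if op in still_available else "OperationNoLongerAvailable")
        for op, verb in replies.items()
        if verb == "Request"
    }

# Discover: the core announces the full set of pluggable operations
discovered = {"lowercase", "uppercase", "reverse"}
offered = plugin_offer(discovered, available_ops={"lowercase", "reverse"})
replies = core_reply(offered, still_needed={"lowercase"})
responses = plugin_response(replies, still_available={"lowercase"})
# Confirmed operations become Connections; the pending requests are removed
connections = [op for op, verb in responses.items() if verb == "Confirm"]
```

Note that, as in the sequence diagram above, each offered operation gets its own Reply and each requested operation its own Response.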
Orbiters which are related to the same context must use the same discover topic (by default $handshake.discover).
It can be set using discover_topic attribute.
Other topics are automatically generated, but the generation can be modified by overriding the _build_* methods.
Note that discover and offer messages are also used to share information about the presence of cores/plugins. Theoretically, this means that a plugin may send an offer without an explicit discover, for example if a connection is closed and a slot for an operation becomes available. However, this is not performed in the current implementation.
A Connection is a link between a core and a plugin related to a single operation; therefore multiple connections can exist between the same core and plugin.
Normally, connections are created after a handshake procedure, by promoting a Pending Request. Generally you don't need to know how to create a connection manually, but if you want to, you can read the Advanced Guide.
flowchart TD
P["Pending Request"] --> C["Connection"]
The Connection class stores all information about a connection:
- `operation_name`
- `remote_identifier`
- `incoming_close_connection_topic`: topic on which close connection requests arrive from the remote orbiter
- `close_connection_to_remote_topic`: topic which the orbiter must use to close the connection with the remote orbiter
- `lock`: an `asyncio.Lock`, used to synchronize connection uses
- `input`: acceptable `Input`
- `output`: sendable `Output`
- `input_topic`
- `output_topic`
- `soft_closed_at`: used during graceful close connection
- `created_at`: connection creation datetime
- `last_use`: datetime of last use
last_use must be updated manually, if you want to keep it current, using the touch method on each connection (remember to lock the connection).
For example, when a new event arrives:
# Following, a custom plugin (MyPlugin) with an operation "my_operation"
@dataclass
class MyPlugin(Plugin):

    @operation(
        # operation name
        name="my_operation",
        # operation is fed with Int64Message messages (integer)
        input=Input.from_message(Int64Message),
        # operation doesn't send any output
        output=Output.no_output()
    )
    async def its_event_handler(self, topic: str, event: Event[...]):
        # Retrieve connections related to the input topic and the operation name
        connections = self.retrieve_connections(
            input_topic=topic,
            operation_name="my_operation"
        )

        # Touch each connection to update `last_use`
        for connection in connections:
            async with connection.lock:  # lock connection to be async-safe
                connection.touch()

Fortunately, there is a useful method called retrieve_and_touch_connections which encapsulates exactly that code:
# Same plugin as the previous example,
# but in which retrieve_and_touch_connections is used
@dataclass
class MyPlugin(Plugin):

    @operation(
        name="my_operation", # operation's name, i.e. "my_operation"
        input=Input.from_message(Int64Message), # operation's input, i.e. an integer number
        output=Output.no_output() # operation's output, i.e. no output is produced
    )
    async def its_event_handler(self, topic: str, event: Event[...]):
        # Retrieves connections related to this operation and touches them
        connections = await self.retrieve_and_touch_connections(
            input_topic=topic,
            operation_name="my_operation"
        )

        # ...operation's logic

Orbiter connections are stored in the _connections attribute.
You can manage them using following methods:
- `_add_connection`
- `_remove_connection` (does not automatically lock the connection)
- `_connections_by_remote_identifier`: retrieves connections based on remote identifier
- `retrieve_connections`: to query connections
- `_find_connection_or_fail`: find the connection based on `operation_name` and `input_topic`
- `close_unused_connections`: based on `close_connection_if_unused_after` and `last_use`
Important
Remember that you can potentially have multiple connections associated with the same input topic and operation name pair, depending on how you have defined the builder methods for input/output topics. This is why you don't receive a single connection by default as an input parameter. However, if you are sure that only one connection is present, you can use _find_connection_or_fail as specified above.
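Since several connections can match the same input topic and operation name, querying returns a list. A hypothetical stand-in for this kind of filtering (not Orbitalis's real `Connection` class or `retrieve_connections` implementation) may clarify why:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ToyConnection:
    """Illustrative connection record with the fields used for querying."""
    operation_name: str
    remote_identifier: str
    input_topic: str

def retrieve_toy_connections(connections, *, input_topic: Optional[str] = None,
                             operation_name: Optional[str] = None):
    """Return every connection matching all the given criteria (possibly several)."""
    return [
        c for c in connections
        if (input_topic is None or c.input_topic == input_topic)
        and (operation_name is None or c.operation_name == operation_name)
    ]

pool = [
    ToyConnection("my_operation", "core_a", "ops/my_operation/in"),
    ToyConnection("my_operation", "core_b", "ops/my_operation/in"),  # same pair, other core
    ToyConnection("other_op", "core_a", "ops/other_op/in"),
]
matches = retrieve_toy_connections(pool, input_topic="ops/my_operation/in",
                                   operation_name="my_operation")
```

Here two cores share the same input topic for the same operation, so the query yields two connections, which is exactly the situation the note above warns about.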
An orbiter (Core or Plugin) can close a connection at any moment. There are two ways to close a connection: Graceless or Graceful.
In the following example we suppose an orbiter "my_orbiterA" closes the connection with another orbiter "my_orbiterB" related to operation "my_operation".
In the Graceless procedure the orbiter sends a GracelessCloseConnectionMessage to the remote one and closes the connection immediately:
sequenceDiagram
Orbiter A->>Orbiter A: Close connection
Orbiter A->>Orbiter B: Graceless close connection
Orbiter B->>Orbiter B: Close connection
# `orbiterA` closes the connection immediately
# with "my_orbiterB" related to operation "my_operation"
orbiterA.send_graceless_close_connection(
    remote_identifier="my_orbiterB",
    operation_name="my_operation"
)

In the Graceful procedure the orbiter sends a GracefulCloseConnectionMessage to the remote one. The remote orbiter is able to perform some operations before the connection is actually closed. The remote orbiter must send a CloseConnectionAckMessage, after which the connection is closed. If graceful_close_timeout is not None, it is used to send a graceless close connection when the ACK does not arrive in time.
You can force the timeout check using the force_close_connection_for_out_to_timeout_pending_graceful_close_connection method.
sequenceDiagram
Orbiter A->>Orbiter B: Graceful close connection
activate Orbiter A
Orbiter B->>Orbiter B: Some operations
Orbiter B->>Orbiter B: Close connection
Orbiter B->>Orbiter A: Close connection ACK
deactivate Orbiter A
Orbiter A->>Orbiter A: Close connection
# `orbiterA` closes the connection gracefully
# with "my_orbiterB" related to operation "my_operation"
orbiterA.send_graceful_close_connection(
    remote_identifier="my_orbiterB",
    operation_name="my_operation"
)

When all connections with a remote orbiter are closed, the orbiter unsubscribes itself from the topics in the _unsubscribe_on_full_close_bucket field.
During both graceful and graceless method calls, you can provide data (bytes) which will be sent when the connection is actually closed.
For example, considering graceful procedure:
orbiterA.send_graceful_close_connection(
    remote_identifier="my_orbiterB", # identifier of the orbiter related to the connection to close
    operation_name="my_operation",
    data=bytes(...) # serialize your data
)

Hooks:
- `_on_graceless_close_connection`: called before a graceless close connection request is sent
- `_on_close_connection`: called when a connection is closed
- `_on_graceful_close_connection`: called before sending a graceful close connection request
Note
These methods are also called when a connection is closed by the close_unused_connections method (used to close unused connections based on close_connection_if_unused_after and last_use).
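The graceful-close fallback described above (wait for the remote ACK up to a timeout, then close gracelessly) can be sketched with plain asyncio (illustrative only; `graceful_close` and its return values are hypothetical, not Orbitalis's real API):

```python
import asyncio

async def graceful_close(ack: asyncio.Event, graceful_close_timeout: float) -> str:
    """Wait for the remote ACK; on timeout, fall back to a graceless close."""
    try:
        await asyncio.wait_for(ack.wait(), timeout=graceful_close_timeout)
        return "closed-gracefully"
    except asyncio.TimeoutError:
        # No ACK in time: force the close, like
        # force_close_connection_for_out_to_timeout_pending_graceful_close_connection
        return "closed-gracelessly"

async def demo() -> tuple[str, str]:
    # Case 1: the remote orbiter ACKs in time
    ack = asyncio.Event()
    asyncio.get_running_loop().call_later(0.01, ack.set)
    first = await graceful_close(ack, graceful_close_timeout=0.5)

    # Case 2: the ACK never arrives, so the timeout wins
    second = await graceful_close(asyncio.Event(), graceful_close_timeout=0.05)
    return first, second

results = asyncio.run(demo())
```

The design choice mirrored here is that graceful close is an optimistic protocol with a bounded wait: the connection never stays half-closed longer than `graceful_close_timeout`.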
The keepalive mechanism allows orbiters to keep connections alive over time. Every orbiter must send a keepalive to all of its linked orbiters.
An orbiter can request a keepalive using the send_keepalive_request method, which sends a KeepaliveRequestMessage.
sequenceDiagram
Orbiter A->>Orbiter B: Keepalive request
Orbiter B->>Orbiter A: Keepalive
Keepalives are sent using KeepaliveMessage messages. You can manually send a keepalive using the send_keepalive method.
In addition, send_all_keepalive_based_on_connections and send_keepalive_based_on_connections_and_threshold are provided.
send_all_keepalive_based_on_connections sends a keepalive to all remote orbiters which have an open connection,
while send_keepalive_based_on_connections_and_threshold sends a keepalive to those orbiters
only if this orbiter is within send_keepalive_before_timelimit seconds of being considered dead by them.
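The threshold check can be expressed as a small pure function (an illustrative sketch; `should_send_keepalive` is a hypothetical name, not Orbitalis's actual method):

```python
from datetime import datetime, timedelta

def should_send_keepalive(last_keepalive_sent: datetime, now: datetime,
                          considers_me_dead_after: float,
                          send_keepalive_before_timelimit: float) -> bool:
    """Send a keepalive only when we are within `send_keepalive_before_timelimit`
    seconds of the moment the remote orbiter would consider us dead."""
    deadline = last_keepalive_sent + timedelta(seconds=considers_me_dead_after)
    seconds_left = (deadline - now).total_seconds()
    return seconds_left <= send_keepalive_before_timelimit

now = datetime(2024, 1, 1, 12, 0, 30)
last_sent = datetime(2024, 1, 1, 12, 0, 0)
# The remote considers us dead 60s after the last keepalive (30s remain here)
urgent = should_send_keepalive(last_sent, now, 60, 35)   # 30s left <= 35 -> send
relaxed = should_send_keepalive(last_sent, now, 60, 20)  # 30s left > 20 -> wait
```

This is why the threshold variant generates far less traffic than sending a keepalive on every loop iteration: messages go out only near each remote's deadline.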
You can find out which remote orbiters are dead thanks to the dead_remote_identifiers property.
Main related fields:
- `_others_considers_me_dead_after`: dictionary containing `remote_identifier => time`, used to know when a keepalive message must be sent
- `_remote_keepalive_request_topics`: dictionary containing `remote_identifier => keepalive_request_topic`, used to send keepalive requests to remote orbiters
- `_remote_keepalive_topics`: dictionary containing `remote_identifier => keepalive_topic`, used to send keepalives to remote orbiters
- `_last_seen`: dictionary containing `remote_identifier => last_seen`, used to know if a remote orbiter must be considered dead
- `_last_keepalive_sent`: dictionary containing `remote_identifier => last_keepalive_sent`, used to know if a keepalive message must be sent
Hooks:
- `_on_keepalive_request`: called on keepalive request, before the response
- `_on_keepalive`: called on inbound keepalive
Plugin is an Orbiter and it is basically an operations provider. In a certain sense, plugins lend the ability to execute their operations.
In particular, every plugin has a set of operations which are exposed to other components (i.e., cores).
Only connected Cores should execute operations, but this is not strictly ensured: you should check that a valid connection exists while processing an operation.
You can check this using retrieve_connections or _find_connection_or_fail.
A plugin is a state machine which goes through these states:
stateDiagram-v2
[*] --> CREATED
CREATED --> RUNNING: start()
RUNNING --> STOPPED: stop()
STOPPED --> RUNNING: start()
STOPPED --> [*]
You can easily create a new plugin by inheriting the Plugin abstract class. Remember to use @dataclass. For example:
@dataclass
class MyPlugin(Plugin):
    ... # your plugin's operations and logic

Main hooks:
- `_on_new_discover`: called when a new discover message arrives
- `_on_reject`: called when a reject message arrives
- `_setup_operation`: called to set up an operation when a connection is created
- `_on_request`: called when a new request message arrives
- `_on_reply`: called when a new reply message arrives
Main methods:
- `send_offer`: send a new offer message on the given topic to the given core identifier (it should be used only if you want to send an offer message manually)
- `with_operation`: generally used during creation; allows you to specify additional operations (but generally the decorator is used)
- `with_custom_policy`: generally used during creation; allows you to specify a custom operation policy
- `send_result_to_all`: send an operation result to all connections which have an output topic
An operation (Operation) represents a feature of a plugin, which is exposed and can be executed remotely.
Operations are managed by OperationsProviderMixin which also provides builder-like methods.
Every operation has the following attributes:
- `name`: unique name which identifies the operation
- `handler`: Busline handler, which will be used to handle inbound events
- `policy`: specifies default operation lending rules; you can override this using the `with_custom_policy` method or by modifying the `operations` attribute directly
- `input`: specifies which is the acceptable input
- `output`: specifies which is the sendable output
Even if an output is specified, it should not be sent if a Core doesn't need it. Obviously, you decide which messages are sent on which topics, so you must ensure compliance.
You can manually add or modify a plugin's operations via the operations attribute, or you can use the @operation decorator.
Important
Obviously, if the operation's input is Input.no_input(), you must add the operation manually, because there isn't an event handler. You can see an example of this in the periodic operation section.
With @operation, if you don't provide an input or an output, they are considered "no input/output" (see input/output).
If you don't specify default_policy, Policy.no_constraints() is assigned.
@operation automatically adds the generated Operation object to the operations attribute, under the (operation) name key.
For example, if you want to create a plugin having a "lowercase" operation which accepts strings as input and produces strings (without lending constraints, i.e. no policy):
@dataclass
class LowercaseTextProcessorPlugin(Plugin):

    @operation(
        name="lowercase", # operation's name
        input=Input.from_message(StringMessage), # operation's input
        output=Output.from_message(StringMessage) # operation's output
        # no policy is specified => Policy.no_constraints()
    )
    async def lowercase_event_handler(self, topic: str, event: Event[StringMessage]):
        # NOTE: the input message specified in @operation should be the same as
        # the type hint of the event parameter

        # Retrieve the input string value; remember that it is wrapped into StringMessage
        input_str = event.payload.value

        lowercase_text = input_str.lower() # process the string

        # Retrieve and touch related connections
        connections = await self.retrieve_and_touch_connections(
            input_topic=topic,
            operation_name="lowercase"
        )

        tasks = []
        for connection in connections:
            # Only if the connection expects an output
            # is it published on the related topic
            # specified by `connection.output_topic`
            if connection.has_output:
                tasks.append(
                    asyncio.create_task(
                        self.eventbus_client.publish(
                            connection.output_topic,
                            lowercase_text # will be wrapped into StringMessage
                        )
                    )
                )

        await asyncio.gather(*tasks) # wait for publishes

Note
The method name is not related to the operation's name.
If you want to elaborate something periodically, you can use the provided orbiter loop. This allows you to avoid custom loops, even though you can create them.
We advise using an auxiliary field which contains the last elaboration datetime.
For example, suppose we have a plugin with a "randint" operation which periodically sends a random number to connected cores.
@dataclass
class RandomNumberPlugin(Plugin):
    """
    This plugin has only one operation, which periodically sends a random number
    """

    last_sent: Optional[datetime] = field(default=None) # will be used to check if a new value must be sent

    def __post_init__(self):
        super().__post_init__()

        # Manually define the "randint" operation
        self.operations["randint"] = Operation(
            name="randint", # operation's name
            input=Input.no_input(), # no inputs are expected
            handler=None, # handler is not needed, given that no inputs must be processed
            output=Output.from_message(Int64Message), # integers will be sent to cores
            policy=Policy.no_constraints() # no constraint during handshake
        )

    @override
    async def _on_loop_iteration(self): # we use the loop iteration hook to provide periodic operations
        now = datetime.now()

        # The operation is executed only if enough time has elapsed
        if self.last_sent is None or (now - self.last_sent).seconds > 2: # send a new random int every 2 seconds
            self.last_sent = now # update timer for next iteration
            await self.__send_randint()

    async def __send_randint(self):
        random_number = random.randint(0, 100) # generate a new random number, it will be sent

        connections = await self.retrieve_and_touch_connections(operation_name="randint") # retrieve current core connections

        tasks = []
        for connection in connections:
            if connection.has_output: # check if output is expected (it should be...)
                tasks.append(
                    asyncio.create_task(
                        self.eventbus_client.publish( # send random int
                            connection.output_topic,
                            random_number # message (output)
                        )
                    )
                )

        await asyncio.gather(*tasks)

Important
Remember to enable loop (avoid with_loop=False) during plugin instantiation.
Policy allows you to specify for an operation:
- `maximum` amount of connections
- `allowlist` of remote orbiters
- `blocklist` of remote orbiters
Obviously, you can specify allowlist or blocklist, not both.
If you don't want constraints: Policy.no_constraints().
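A toy version of such a policy may clarify the semantics (illustrative only; `ToyPolicy` and its `permits` method are hypothetical stand-ins, not Orbitalis's real `Policy` class):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ToyPolicy:
    """Illustrative operation lending policy with the three constraint kinds."""
    maximum: Optional[int] = None            # max amount of connections
    allowlist: Optional[list[str]] = None    # only these remote orbiters may connect
    blocklist: Optional[list[str]] = None    # these remote orbiters may not connect

    def __post_init__(self):
        # Allowlist and blocklist are mutually exclusive
        if self.allowlist is not None and self.blocklist is not None:
            raise ValueError("specify allowlist or blocklist, not both")

    def permits(self, remote_identifier: str, current_connections: int) -> bool:
        if self.maximum is not None and current_connections >= self.maximum:
            return False
        if self.allowlist is not None:
            return remote_identifier in self.allowlist
        if self.blocklist is not None:
            return remote_identifier not in self.blocklist
        return True

unconstrained = ToyPolicy()                             # like Policy.no_constraints()
restricted = ToyPolicy(maximum=1, allowlist=["my_core"])
ok = restricted.permits("my_core", current_connections=0)
full = restricted.permits("my_core", current_connections=1)      # maximum reached
unknown = restricted.permits("other_core", current_connections=0) # not in allowlist
```

The mutual exclusion of allowlist and blocklist is enforced at construction time here, which is the natural place to catch a misconfigured policy early.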
If you use @operation, you can specify a default policy, which is used if you don't override it during plugin initialization.
You could also manage operations' policies manually, but we advise against it.
@dataclass
class LowercaseTextProcessorPlugin(Plugin):

    @operation(
        name="lowercase",
        input=Input.from_message(StringMessage),
        output=Output.from_message(StringMessage),
        default_policy=Policy(
            ... # insert here your constraints (e.g. allowlist or maximum)
        )
    )
    async def lowercase_event_handler(self, topic: str, event: Event[StringMessage]):
        ... # operation's logic

As already mentioned, if you want to override the default policy for a plugin's operation you can use with_custom_policy.
For example, if you want to add an allowlist (with core identifier "my_core") for the above plugin, related to operation "lowercase":
plugin = LowercaseTextProcessorPlugin(
    ...
).with_custom_policy( # the default policy specified with @operation will be overridden
    operation_name="lowercase", # target operation
    policy=Policy(allowlist=["my_core"]) # new custom policy
)

Input and Output are both SchemaSpec, i.e. the way to specify a schema set.
In a SchemaSpec we can specify support_empty_schema if we want to support payload-empty events,
support_undefined_schema if we want to accept every schema and/or a schema list (schemas). A schema is a string.
We do have an input or an output only if:
support_undefined_schema or support_empty_schema or has_some_explicit_schemas

In other words, we must always specify an Input/Output even if it is "no input/output".
We can easily generate a "no input/output" thanks to:
input = Input.no_input() # create new "no input"

# `has_input` property is used to check if input is expected
assert not input.has_input

output = Output.no_output() # create new "no output"

# `has_output` property is used to check if output is expected
assert not output.has_output

If we want to generate a filled Input/Output:
# Input related with MyMessage
Input.from_message(MyMessage)

# Input related with MyMessage, same as above
Input.from_schema(MyMessage.avro_schema())

# Input which accepts empty events
Input.empty()

# Input which accepts any payload
Input.undefined()

# Input which accepts the related primitive data
Input.int32()
Input.int64()
Input.float32()
Input.float64()
Input.string()

# Manual initialization of an Input object
Input(schemas=[...], support_empty_schema=True, support_undefined_schema=False)

By default, given that we use Avro JSON schemas, two schemas are compatible if the dictionary version of both is equal or if the string version of both is equal.
For this reason, you must avoid time-based default values in your classes, because Avro sets a time-variant value as the default. Otherwise, two schemas of the same class would differ, even though they relate to the same class.
created_at: datetime = field(default_factory=lambda: datetime.now()) # AVOID !!!

In this example, we will show you how to implement a hierarchy of plugins.
We will define the abstract LampPlugin class, which provides a common "get_status" operation; it will then be inherited by LampXPlugin and LampYPlugin, which add more operations.
class LampStatus(StrEnum):
"""
Utility enum to define possible plugin statuses
"""
ON = "on"
OFF = "off"
@dataclass
class StatusMessage(AvroMessageMixin):
"""
Custom message defined to share status of lamps
"""
lamp_identifier: str
status: str # "on" or "off"
created_at: datetime = None # it is an Avro message, avoid default of time-variant fields
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.now() # default created_at value
@dataclass
class LampPlugin(Plugin, ABC):
"""
Plugin to control a smart plugin which has an energy-meter
"""
# Custom plugin attributes
kw: float # plugin energy consumption
status: LampStatus = field(default=LampStatus.OFF)
on_at: Optional[datetime] = field(default=None) # datetime of on request
total_kwh: float = field(default=0.0) # total consumption history
@property
def is_on(self) -> bool:
return self.status == LampStatus.ON
@property
def is_off(self) -> bool:
return self.status == LampStatus.OFF
def turn_on(self):
self.status = LampStatus.ON
if self.on_at is None:
self.on_at = datetime.now()
def turn_off(self):
"""
Turn off this plugin and update consumption history
"""
self.status = LampStatus.OFF
if self.on_at is not None:
# Update total consumption:
self.total_kwh += self.kw * (datetime.now() - self.on_at).total_seconds() / 3600
self.on_at = None
@operation(
name="get_status",
input=Input.empty(),
output=Output.from_message(StatusMessage)
)
async def get_status_event_handler(self, topic: str, event: Event):
connections = await self.retrieve_and_touch_connections(input_topic=topic, operation_name="get_status")
# Only one connection should be present on inbound topic
assert len(connections) == 1
connection = connections[0]
assert connection.output_topic is not None
assert connection.output.has_output
# Manually touch the connection
async with connection.lock:
connection.touch()
# Send output to core
await self.eventbus_client.publish(
connection.output_topic,
StatusMessage(self.identifier, str(self.status))
)
@abstractmethod
async def turn_on_event_handler(self, topic: str, event: Event):
raise NotImplementedError()
@abstractmethod
async def turn_off_event_handler(self, topic: str, event: Event):
raise NotImplementedError()
@dataclass
class LampXPlugin(LampPlugin):
"""
Specific plugin related to brand X of smart lamps.
This type of lamps doesn't have additional features
"""
@operation( # add new operation with name: "turn_on"
name="turn_on",
input=Input.empty() # accepts empty events
)
async def turn_on_event_handler(self, topic: str, event: Event):
self.turn_on()
@operation( # add new operation with name: "turn_off"
name="turn_off",
input=Input.empty() # accepts empty events
)
async def turn_off_event_handler(self, topic: str, event: Event):
self.turn_off()
# Create a new LampXPlugin
lamp_x_plugin = LampXPlugin(
identifier="lamp_x_plugin",
eventbus_client=..., # provide Busline client
raise_exceptions=True,
with_loop=False,
kw=24 # LampPlugin-specific attribute
).with_custom_policy( # override custom policy related to operation "turn_on"
operation_name="turn_on",
policy=Policy(allowlist=["smart_home"])
)
@dataclass(frozen=True)
class TurnOnLampYMessage(AvroMessageMixin):
"""
Custom message to turn on a lamp of brand Y.
You can provide a "power" value which will be used to
control brightness (and energy consumption)
"""
power: float = field(default=1)
def __post_init__(self):
assert 0 < self.power <= 1
@dataclass(frozen=True)
class TurnOffLampYMessage(AvroMessageMixin):
"""
Custom message to turn off plugin of brand Y.
You can reset the energy-meter by setting the flag to True
"""
reset_consumption: bool = field(default=False)
@dataclass(kw_only=True)
class LampYPlugin(LampPlugin):
"""
Specific plugin related to brand Y of smart lamps.
These lamps are able to manage brightness level
thanks to "power" attribute
"""
power: float = field(default=1)
@override
def turn_off(self):
"""
Overridden version to turn off the plugin and compute energy consumption
also based on power field
"""
self.status = LampStatus.OFF
if self.on_at is not None:
self.total_kwh += self.power * self.kw * (datetime.now() - self.on_at).total_seconds() / 3600
self.on_at = None
@operation( # add new operation with name: "turn_on"
name="turn_on",
input=Input.from_message(TurnOnLampYMessage) # accepts TurnOnLampYMessage messages (checking its Avro schema)
)
async def turn_on_event_handler(self, topic: str, event: Event[TurnOnLampYMessage]):
self.turn_on()
self.power = event.payload.power
@operation( # add new operation with name: "turn_off"
name="turn_off",
input=Input.from_schema(TurnOffLampYMessage.avro_schema()) # accepts TurnOffLampYMessage messages
)
async def turn_off_event_handler(self, topic: str, event: Event[TurnOffLampYMessage]):
self.turn_off()
# Reset energy-meter based on operation input
if event.payload.reset_consumption:
self.total_kwh = 0
# Create a new LampYPlugin
lamp_y_plugin = LampYPlugin(
identifier="lamp_y_plugin",
eventbus_client=..., # provide a Busline client
raise_exceptions=True,
with_loop=False,
kw=42 # custom field
)
Core is an Orbiter: it is the component which connects itself to plugins in order to execute their operations.
Note that Orbitalis Cores are able to manage operations having different inputs/outputs (but the same name), thanks to (Avro) schemas.
We can use a core to execute operations and collect outputs (if they are present).
sequenceDiagram
Core->>Plugin: Execute Operation (with Input)
Plugin-->>Core: Output (gathered by sink)
A core can also receive messages without an explicit execute call, e.g. a plugin can send information periodically, such as a report.
sequenceDiagram
Plugin->>Core: Message (gathered by sink)
Plugin->>Core: Message (gathered by sink)
Plugin->>Core: Message (gathered by sink)
We can specify the operations a core needs in order to be compliant. In fact, a Core follows these state changes:
stateDiagram-v2
[*] --> CREATED
CREATED --> COMPLIANT: start()
CREATED --> NOT_COMPLIANT: start()
COMPLIANT --> NOT_COMPLIANT
NOT_COMPLIANT --> COMPLIANT
COMPLIANT --> STOPPED: stop()
NOT_COMPLIANT --> STOPPED: stop()
STOPPED --> [*]
The core is COMPLIANT when all needs are satisfied, otherwise NOT_COMPLIANT.
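The compliance switching can be mirrored with plain asyncio.Event objects. The following is an illustrative sketch of the pattern only (the ComplianceTracker class is hypothetical, not Orbitalis internals, which manage this automatically):

```python
import asyncio

class ComplianceTracker:
    """Sketch of COMPLIANT/NOT_COMPLIANT switching with asyncio.Event.
    Illustrative only -- Orbitalis's Core manages this internally."""

    def __init__(self, needed: set):
        self.needed = needed                    # operation names required for compliance
        self.connected = set()                  # operations currently satisfied
        self.compliant_event = asyncio.Event()
        self.not_compliant_event = asyncio.Event()
        self.not_compliant_event.set()          # a fresh core starts NOT_COMPLIANT

    def _update(self):
        # COMPLIANT only when every needed operation is connected
        if self.needed <= self.connected:
            self.not_compliant_event.clear()
            self.compliant_event.set()
        else:
            self.compliant_event.clear()
            self.not_compliant_event.set()

    def add_connection(self, operation_name: str):
        self.connected.add(operation_name)
        self._update()

    def remove_connection(self, operation_name: str):
        self.connected.discard(operation_name)
        self._update()
```

A waiter can then `await tracker.compliant_event.wait()` to block until all needs are satisfied, which is the same usage pattern as the core's `compliant_event` attribute described below.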
Main public attributes:
- discovering_interval: interval between two discover messages (only when loop is enabled)
- operation_requirements: specifies which operations are needed to be compliant, with their constraints and optionally the default setup data or the sink
- operation_sinks (see sinks)
- compliant_event and not_compliant_event: notify you of the related state switch
Main hooks:
- _on_compliant: called when the core becomes compliant
- _on_not_compliant: called when the core becomes not compliant
- _on_send_discover: called before a discover message is sent
- _get_setup_data: called to obtain setup data which generally will be sent to plugins. By default, default_setup_data is used
- _on_new_offer: called when a new offer arrives
- _on_confirm_connection: called when a connection confirmation arrives
- _on_operation_no_longer_available: called when an operation-no-longer-available message arrives
- _on_response: called when a response message arrives
Main methods:
- current_constraint_for_operation: returns the current constraint for an operation based on current connections
- is_compliant_for_operation: evaluates at run-time if the core is compliant for the given operation based on its configuration. It may be a time-consuming operation
- is_compliant: evaluates at run-time if the core is globally compliant based on its configuration. It may be a very time-consuming operation
- update_compliant: uses is_compliant to update the core's state
- _operation_to_discover: returns a dictionary operation_name => not_satisfied_need, based on current connections. These operations should be discovered
# Create a custom core
@dataclass
class MyCore(Core):
... # core's sinks and logic
The operation_requirements attribute is a dictionary where keys are operation names and values are OperationRequirement objects.
We can specify:
- constraint: set of rules to manage connection requests
- override_sink: specifies a different sink with respect to the default provided by the core
- default_setup_data: bytes which are sent by default to the plugin on connection establishment
Constraint class allows you to be very granular in rule specifications:
- mandatory: list of needed plugin identifiers
- minimum: number of additional plugins (plugins in the mandatory list are excluded)
- maximum: number of additional plugins (plugins in the mandatory list are excluded)
- allowlist/blocklist
- inputs: the list of supported Input
- outputs: the list of supported Output
In other words, you can specify a list of possible and different inputs and outputs which are supported by your core.
Note that inputs and outputs are not related to each other, therefore all possible combinations are evaluated.
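Since inputs and outputs pair freely, checking whether an offered (input, output) pair satisfies a constraint amounts to a membership test over the Cartesian product. A hedged sketch with hypothetical names (not the Orbitalis API):

```python
from itertools import product

def satisfies_constraint(offered_input: str, offered_output: str,
                         inputs: list, outputs: list) -> bool:
    """A pair matches if it appears in the Cartesian product
    of the constraint's supported inputs and outputs."""
    return (offered_input, offered_output) in product(inputs, outputs)
```

With 3 supported inputs and 2 supported outputs, 6 combinations are acceptable, which is what "all possible combinations are evaluated" means in practice.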
For example, if your core needs these operations:
- operation1: "plugin1" is mandatory, at least 2 additional plugins, maximum 5 additional plugins. Borrowable operation1 can be fed with "no input", Operation1InputV1Message or Operation1InputV2Message, and produces nothing as output
- operation2: "plugin1" and "plugin2" are allowed (pluggable), there is no minimum or maximum number of additional plugins. Borrowable operation2 must be feedable with empty events (no message) and must produce Operation2OutputMessage messages
- operation3: no mandatory plugins required, no additional plugins required (any number of connections), but "plugin2" can not be plugged (due to blocklist). operation3 has no input (therefore the core doesn't interact with the plugin, but the plugin sends data arbitrarily). Operation3Output is expected to be sent to the core
YourCore(
..., # other attributes such as eventbus_client, identifier, ... (see next example)
operation_requirements={
# required operation name: "operation1"
"operation1": OperationRequirement(Constraint(
minimum=2, # minimum number of non-mandatory plugins
maximum=5, # maximum number of non-mandatory plugins
mandatory=["plugin1"], # list of mandatory plugins
inputs=[Input.no_input(), Input.from_message(Operation1InputV1Message), Input.from_message(Operation1InputV2Message)], # list of supported inputs for this operation
outputs=[Output.no_output()] # list of supported outputs for this operation
)),
# required operation name: "operation2"
"operation2": OperationRequirement(Constraint(
minimum=0,
allowlist=["plugin1", "plugin2"], # list of allowed plugins
inputs=[Input.empty()],
outputs=[Output.from_message(Operation2OutputMessage)]
)),
# required operation name: "operation3"
"operation3": OperationRequirement(Constraint(
blocklist=["plugin2"], # list of blocked plugins
inputs=[Input.no_input()],
outputs=[Output.from_message(Operation3Output)]
)),
}
)
In order to handle operation outputs, cores must be equipped with sinks (Sink), which are basically Busline EventHandlers associated to an operation.
Operation's sinks are stored in operation_sinks. You can manage them in four ways:
- Manually add them directly using the operation_sinks attribute
@dataclass
class MyCore(Core):
# Add in post init a new sink (for operation's name "my_operation")
def __post_init__(self):
super().__post_init__() # mandatory! Otherwise Core logic will not be executed
# Add a new sink for operation "my_operation", it is a simple in-place lambda
self.operation_sinks["my_operation"] = CallbackEventHandler(lambda t, e: print(t))
- Override the pre-defined sink using the override_sink field in OperationRequirement during core instantiation. You must provide a Busline EventHandler
core = MyCore(
eventbus_client=..., # build a Busline's client
# ...other attributes
operation_requirements={
"lowercase": OperationRequirement(Constraint(
inputs=[Input.from_message(StringMessage)],
outputs=[Output.from_message(StringMessage)],
# provide a new sink:
), override_sink=CallbackEventHandler(lambda t, e: print(t)))
# in-place lambda will be used to handle new events
}
)
- Use the with_operation_sink method during core instantiation to add more sinks to the core instance (this has the same effect as direct management in __post_init__)
core = MyCore(...) # fill with core attributes
.with_operation_sink(
operation_name="my_operation", # sink's related operation name
handler=CallbackEventHandler(lambda t, e: print(t)) # event handler for operation's outputs
)
- Use the @sink decorator to decorate your methods and functions, providing the operation name
@dataclass
class MyCore(Core): # Inherit Core class to create your custom core
@sink("plugin_operation_name") # new sink with related operation name
async def my_operation_event_handler(self, topic: str, event: Event[MyMessage]):
... # sink logic
Sinks in operation_sinks are linked automatically to the related operation during handshake. The sink related to an operation in operation_sinks is ignored if override_sink in OperationRequirement is set for that operation.
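Conceptually, the sink mechanism is a per-operation handler registry: each operation name maps to one async handler which receives the operation's outputs. The sketch below illustrates the idea with plain Python (hypothetical SinkRegistry, not the Busline EventHandler API):

```python
import asyncio

class SinkRegistry:
    """Sketch: route operation outputs to per-operation async handlers."""

    def __init__(self):
        self.operation_sinks = {}   # operation_name => async handler
        self.received = []          # collected (topic, event) pairs, for demonstration

    def register(self, operation_name, handler):
        self.operation_sinks[operation_name] = handler

    async def dispatch(self, operation_name, topic, event):
        # Events for operations without a registered sink are silently dropped
        handler = self.operation_sinks.get(operation_name)
        if handler is not None:
            await handler(topic, event)
```

The `override_sink` mechanism described above corresponds to replacing the registered handler for a single operation name before the connection is established.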
Consider the plugins defined in the previous example.
@dataclass
class SmartHomeCore(Core):
# Dictionary to store lamps statues
lamp_status: Dict[str, str] = field(default_factory=dict)
@sink( # declared sink related to operation "get_status"
operation_name="get_status"
)
async def get_status_sink(self, topic: str, event: Event[StatusMessage]):
self.lamp_status[event.payload.lamp_identifier] = event.payload.status # store plugin status
smart_home = SmartHomeCore(
identifier="smart_home", # core's identifier
eventbus_client=..., # Busline's client
operation_requirements={
# required operation name: "turn_on"
"turn_on": OperationRequirement(Constraint(
minimum=1, # required amount of generic plugins
mandatory=[lamp_x_plugin.identifier], # mandatory plugin (identifier)
inputs=[Input.empty()], # list of supported inputs (in this case empty events)
outputs=[Output.no_output()] # list of supported outputs (in this case no outputs are expected)
)),
# required operation name: "turn_off"
"turn_off": OperationRequirement(Constraint(
minimum=1,
allowlist=[lamp_x_plugin.identifier], # list of pluggable plugins
inputs=[Input.empty()],
outputs=[Output.no_output()]
)),
}
)
Tip
Check Busline documentation to know how to create an eventbus client.
The main capability of a Core is to execute plugin operations. As already mentioned, there are two methods to execute operations:
- execute
- sudo_execute
execute is the regular method to execute an operation; it uses connections to choose the right plugins.
To execute an operation you must provide:
- operation_name
- data (optional, input of the operation)
- Modality: one among the following parameters: all, any or plugin_identifier
In fact, execute retrieves the current connections related to the provided operation_name, evaluating compatibility with the data input type.
Warning
If modality is not provided, ValueError is raised.
Then, given the set of all potential connections, the core sends data to topics chosen based on modality:
- all sends data to all plugins related to retrieved connections
- any sends data to a random plugin related to retrieved connections
- plugin_identifier sends data to the specified plugin
- distributed splits execution over the pool of compatible plugins
For example, suppose you want to execute "plugin_operation" of plugin "plugin_identifier", sending an empty event (i.e., no data):
await self.my_core.execute("plugin_operation", plugin_identifier="plugin_identifier")
# ^^^^^^^^^^^^^^^^ operation's name
If you know a priori how to execute an operation, you can exploit the direct methods:
- execute_using_plugin
- execute_sending_any
- execute_sending_all
- execute_distributed
sudo_execute allows you to bypass connections and send an execution request directly to a plugin.
my_core.sudo_execute(
topic="operation_topic", # topic on which message will be published
data=YourMessageData(...) # message data which will be sent
)
Important
We provide the sudo_execute method because the Orbitalis framework works in a secure and managed environment, therefore we think a developer may execute arbitrary operations; nevertheless, we advise against using sudo_execute.
Let's consider following core and plugin in a scenario in which we want to turn on lamps.
@dataclass
class LampXPlugin(LampPlugin):
"""
Specific plugin related to brand X of smart lamps.
This type of lamps doesn't have additional features
"""
@operation( # add new operation with name: "turn_on"
name="turn_on",
input=Input.empty() # accepts empty events
)
async def turn_on_event_handler(self, topic: str, event: Event):
self.turn_on()
@operation( # add new operation with name: "turn_off"
name="turn_off",
input=Input.empty() # accepts empty events
)
async def turn_off_event_handler(self, topic: str, event: Event):
self.turn_off()
@dataclass
class SmartHomeCore(Core):
lamp_status: Dict[str, str] = field(default_factory=dict)
@sink(
operation_name="get_status"
)
async def get_status_sink(self, topic: str, event: Event[StatusMessage]):
self.lamp_status[event.payload.lamp_identifier] = event.payload.status
You should follow these steps:
- Initialize orbiters
- Start components
- Await discovery process
- Execute operations
async def main():
# Eventually setup logging if you need
# import logging
# logging.basicConfig(level=logging.INFO)
# Initialize core
smart_home = SmartHomeCore(
identifier="smart_home",
eventbus_client=build_new_local_client(),
raise_exceptions=True,
with_loop=False,
operation_requirements={
"turn_on": OperationRequirement(Constraint(
minimum=1,
inputs=[Input.empty()],
outputs=[Output.no_output()]
)),
"turn_off": OperationRequirement(
Constraint(
minimum=1,
inputs=[Input.empty()],
outputs=[Output.no_output()]
)
),
"get_status": OperationRequirement(
Constraint(
minimum=1,
inputs=[Input.empty()],
outputs=[Output.from_schema(StatusMessage.avro_schema())]
)
)
}
)
# Initialize plugins
plugin1 = LampXPlugin(
identifier="plugin1",
eventbus_client=build_new_local_client(),
raise_exceptions=True,
with_loop=False,
kw=24 # LampPlugin-specific attribute
)
plugin2 = LampXPlugin(
identifier="plugin2",
eventbus_client=build_new_local_client(),
raise_exceptions=True,
with_loop=False,
kw=42 # LampPlugin-specific attribute
)
# 2. Start components
# 3. Await discovery process
# 4. Execute operations
Note
We set with_loop=False on the core, therefore plugins must be started first.
async def main():
# 1. Initialize orbiters
# Start plugins (before core due to `with_loop=False`)
await plugin1.start()
await plugin2.start()
# ...now plugins are ready to accept new connections (based on their configuration)
# Start core, this starts discovery process
await smart_home.start()
# 3. Await discovery process
# 4. Execute operations
There are several ways to await the discovery process:
- await asyncio.sleep(t): simplest, but may increase waiting time
- await my_core.compliant_event.wait(): the best if you are interested in the core's compliance
- await my_core.new_connection_added_event.wait(): the best if you know the number of plugins a priori and you want to await all of them
In this example we know a priori how many plugins we have, i.e. 2, therefore we await exactly 2 connection creations.
async def main():
# 1. Initialize orbiters
# 2. Start components
await smart_home.new_connection_added_event.wait()
await smart_home.new_connection_added_event.wait()
# 4. Execute operations
Now we can execute operations. We have several modes:
- Execute operation turn_on on plugin plugin1
await smart_home.execute_using_plugin(
operation_name="turn_on",
plugin_identifier="plugin1"
)
assert plugin1.is_on
assert plugin2.is_off
- Execute operation turn_on using one random plugin among the available and compatible ones
plugin_identifier = await smart_home.execute_sending_any(operation_name="turn_on")
assert plugin_identifier == plugin1.identifier or plugin_identifier == plugin2.identifier
assert plugin1.is_on or plugin2.is_on
assert not (plugin1.is_on and plugin2.is_on)
- Execute operation turn_on on all available and compatible plugins
plugin_identifiers = await smart_home.execute_sending_all(operation_name="turn_on")
assert plugin1.is_on and plugin2.is_on
- Execute operation turn_on splitting execution among all available and compatible plugins; if the total amount of plugins is less than the needed one, a plugin will execute the operation more than once
plugin_identifiers = await smart_home.execute_distributed(
operation_name="turn_on",
data=[None, None, None, None]
)
assert plugin1.is_on and plugin2.is_on
# in this case both plugins execute the operation two times
Note
"all" and "distributed" modes can be confused. "all" executes a given operation with a given message on all (available and compatible) plugins. Instead, in "distributed" mode, every message of a given operation is sent only once.
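The difference can be sketched with plain Python (hypothetical helper functions, not the Orbitalis API): "all" delivers the same payload to every plugin, while "distributed" partitions a list of payloads across the pool, cycling when there are more payloads than plugins:

```python
from itertools import cycle

def send_all(plugin_ids, payload):
    """'all' mode: the same payload is delivered to every plugin."""
    return {p: [payload] for p in plugin_ids}

def send_distributed(plugin_ids, payloads):
    """'distributed' mode: each payload is delivered exactly once,
    cycling over the pool when there are more payloads than plugins."""
    deliveries = {p: [] for p in plugin_ids}
    for target, payload in zip(cycle(plugin_ids), payloads):
        deliveries[target].append(payload)
    return deliveries
```

With 2 plugins and 4 payloads, "distributed" gives each plugin 2 executions, matching the earlier example where both lamps execute the operation twice.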
In this section we insert more details about the library implementation, which you can use to modify the regular behavior.
We assume the User Guide has been read before this section, because here we only add more details.
Handshake is the most complex protocol in Orbitalis: it has many possible branches. The entire process is automatically managed by the Orbitalis framework; in fact, the event handlers are private methods, and you should use the provided hooks (listed in the User Guide) to change the handshake behavior.
Below is the handshake sequence diagram:
sequenceDiagram
Core->>Plugin: Discover
Plugin->>Plugin: Update acquaintances
Plugin->>Plugin: New pending requests
Plugin->>Core: Offer
Core->>Core: Update acquaintances
par Reply for each offered operation
alt is still needed
Core->>Core: New pending request
Core->>Plugin: Request
else not needed anymore
Core->>Plugin: Reject
Plugin->>Plugin: Remove pending request
end
end
par Response for each requested operation
alt is still available
Plugin->>Plugin: Promote pending request to connection
Plugin->>Core: Confirm
Core->>Core: Promote pending request to connection
else operation no longer available
Plugin->>Plugin: Remove pending request
Plugin->>Core: OperationNoLongerAvailable
Core->>Core: Remove pending request
end
end
In the following chapters we will discuss all steps in more depth.
Below we report the same diagram as before, with hooks added.
sequenceDiagram
Core->>Core: send_discover_based_on_requirements()
Core->>Core: _on_send_discover()
Core->>Plugin: Discover
Plugin->>Plugin: _on_new_discover()
Plugin->>Plugin: Update acquaintances
Plugin->>Plugin: New pending requests
Plugin->>Plugin: send_offer()
Plugin->>Plugin: _on_send_offer()
Plugin->>Core: Offer
Core->>Core: _on_new_offer()
Core->>Core: Update acquaintances
par Reply for each offered operation
alt is still needed
Core->>Core: New pending request
Core->>Plugin: Request
Plugin->>Plugin: _on_request()
else not needed anymore
Core->>Plugin: Reject
Plugin->>Plugin: _on_reject()
Plugin->>Plugin: Remove pending request
end
end
par Response for each requested operation
alt is still available
Plugin->>Plugin: Promote pending request to connection
Plugin->>Plugin: _on_promote_pending_request_to_connection()
Plugin->>Core: Confirm
Core->>Core: _on_response()
Core->>Core: _on_confirm_connection()
Core->>Core: Promote pending request to connection
Core->>Core: _on_promote_pending_request_to_connection()
else operation no longer available
Plugin->>Plugin: Remove pending request
Plugin->>Core: OperationNoLongerAvailable
Core->>Core: _on_response()
Core->>Core: _on_operation_no_longer_available()
Core->>Core: Remove pending request
end
end
Discover message is the first message of the protocol which is sent by cores. It has two main goals:
- Notify its existence to all plugins, providing information such as core_identifier, offer_topic, core_keepalive_topic (topic which plugins must use to send keepalive to the core), core_keepalive_request_topic (topic which plugins must use to send a keepalive request), considered_dead_after (time after which plugins are considered dead if keepalive is not sent)
- Retrieve operations thanks to the queries field, which is a dictionary operation_name => DiscoverQuery
DiscoverQuery is a dataclass used to store information about operation requirements. Basically, it provides the constraint information and the operation's name.
As already mentioned in the User Guide, you must ensure that the discover_topic field is equal for all your orbiters, otherwise discover messages will be lost.
There is more than one method to send a discover message, but all wrap a send_discover_for_operations call. In this method:
- The discover message is published
- _last_discover_sent_at is updated
Note
The offer topic is fixed and pre-defined by the offer_topic property, in order to allow a single subscription and receive future offers.
When a plugin receives a discover, it processes it. In particular, based on the plugin's policy, if there are available slots, they are proposed to the core through an offer message. __allow_offer is used to check compatibility.
First of all, when a new discover event arrives, the plugin updates its acquaintances, i.e. its knowledge about keepalive request topics, keepalive topics and dead time (thanks to the update_acquaintances method).
Offer logic is mainly present in the discover event handler and in the send_offer method.
Actually, send_offer can be called at any moment; this allows plugins (and you) to modify the handshake logic a little. For example, plugins could send offer messages at a later time. This feature will be added in a future version.
Similarly to discover, offer messages are also used to share information about plugins with cores. In fact, plugins share plugin_identifier, reply_topic, plugin_keepalive_topic (topic which cores must use to send keepalive to the plugin), plugin_keepalive_request_topic (topic which cores must use to send a keepalive request), considered_dead_after (time after which cores are considered dead if keepalive is not sent).
In addition, the offered_operations list is provided. To reduce information sharing, only essential information about the plugin's operations is sent, i.e. name, input and output (used to check compatibility).
In fact, remember that cores firstly search operations by name and then check input/output compatibility.
You must know that when a plugin offers a slot, a new pending request is created, in order to reserve that slot for a period of time while waiting for core response events.
PendingRequest contains information about its future connection. It is built during handshake process, so generally you should not use this class.
Anyway, it has into_connection method to build a Connection and, similarly to connections, has a lock attribute to synchronize elaborations.
created_at is the field used to check if a pending request must be discarded.
You can manage pending requests thanks to:
- _pending_requests_by_remote_identifier: retrieves pending requests based on remote identifier
- _add_pending_request
- _remove_pending_request (does not lock the pending request automatically)
- _is_pending: to know if there is a pending request related to a remote identifier and an operation name
- _promote_pending_request_to_connection: (does not lock the pending request automatically) transforms a pending request into a connection
- discard_expired_pending_requests: based on pending_requests_expire_after and created_at
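The expiry rule behind discard_expired_pending_requests can be illustrated in isolation. This is a hedged sketch with hypothetical names (PendingRequestSketch stands in for PendingRequest, keeping only the created_at field):

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta

@dataclass
class PendingRequestSketch:
    """Hypothetical stand-in for PendingRequest, keeping only created_at."""
    created_at: datetime = field(default_factory=datetime.now)

def discard_expired(pending: list, expire_after: float) -> list:
    """Keep only pending requests younger than expire_after seconds
    (mirrors the rule: discard when now - created_at exceeds the expiry)."""
    now = datetime.now()
    return [p for p in pending
            if now - p.created_at < timedelta(seconds=expire_after)]
```

As elsewhere in Orbitalis, the expiry value is expressed in seconds.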
When an offer event arrives, the core, similarly to plugins, updates its acquaintances. Then, the core evaluates the offer and replies for each offered operation.
In particular, there are two different possible reply messages:
- Request the operation (actually)
- Reject the operation if it is not needed anymore (e.g., another plugin offered the same operation before)
Note
The core replies for each operation instead of using a batch, to reduce message size and to allow it to request operations even after some time.
If the offered operation is rejected, a simple RejectOperationMessage is sent, in which the core's identifier and the operation's name are specified.
Instead, if the offered operation is still required, then a RequestOperationMessage is sent. That message provides more details about the future connection:
- output_topic: topic on which outputs of the operation must be sent (only present if the output is acquirable by the core)
- core_side_close_operation_connection_topic: topic which will be used to send close connection messages
- setup_data (in bytes): data which will be used to set up the connection
In addition, cores also generate new pending requests when a request is sent. This allows cores to request more connections at the same time, avoiding the risk of too many connections.
Note
Independently of the request message type, cores update the last-seen time for plugins when an offer message is received. This is a side effect which may reduce the number of keepalive messages.
Finally, the last protocol stage involves a response to cores if needed.
In particular, if a RejectOperationMessage arrives, the plugin removes the related pending request in order to make the operation slot available again.
Otherwise, if a RequestOperationMessage arrives, it means that the core really wants to create a connection. In this case there are two possibilities:
- Operation's slot is still available
- Operation's slot is no longer available
Availability is checked thanks to __can_lend_to_core method.
Note
Similarly to cores, plugins update the last-seen time for cores. Again, this is a side effect which may reduce the number of keepalive messages.
If the slot is still available, then the related pending request (plugin-side) is promoted to an actual connection and the _plug_operation_into_core method is called.
This starts the plug procedure: the last missing required topics (i.e., incoming_close_connection_topic and operation_input_topic_for_core) are built, the operation is set up (thanks to _setup_operation)
and finally a ConfirmConnectionMessage is sent to the core, in order to notify it.
When the core receives the ConfirmConnectionMessage, it checks whether the confirmation was expected (otherwise it ignores it), promotes the related pending request core-side, links the sink of the operation
and subscribes itself to the connection's topics.
Warning
Remember that if you change the custom sink for an operation or the override_sink of an OperationRequirement, already linked sinks will not be modified. You must close and re-create the connection.
Caution
If you use an unlucky set of parameters for cores and plugins, a confirmation may arrive to the core without a related pending request. This may occur when the core's pending request expires before the ConfirmConnectionMessage arrives. This is the main reason behind the need to close unused connections: if a connection is opened plugin-side but not core-side, the plugin-side connection expires naturally, preventing operation-slot starvation. For this reason you should always provide a timeout for unused connections, avoiding a None value for the close_connection_if_unused_after field.
Instead, if the operation's slot is no longer available, the plugin sends an OperationNoLongerAvailableMessage to the core in order to notify it. In this case, the core simply removes the related pending request.
close_unused_connections is the Orbiter method used to close connections which have been untouched for the last close_connection_if_unused_after seconds (attribute of Orbiter); it returns the number of sent close requests.
This method is called periodically in the Orbiter's main loop. As already mentioned, if a connection is unused, then graceful close procedure is started.
To prevent this behavior, you can set close_connection_if_unused_after=None during orbiter instantiation. In this case, 0 is always returned.
Loop is managed in __loop (private) method. The method is private because the entire logic is fully managed and hooks are provided.
Basically, loop is controlled by two asyncio.Event:
- __stop_loop_controller: used in the while condition; if it is set, the loop is stopped
- __pause_loop_controller: if it is set, loop iterations are skipped
As already mentioned in the User Guide, the set and clear methods of asyncio.Event are wrapped in the stop_loop, pause_loop and resume_loop methods.
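The control pattern can be reproduced with plain asyncio. This is an illustrative sketch of the two-event mechanism (hypothetical LoopController, not Orbitalis internals):

```python
import asyncio

class LoopController:
    """Sketch of the stop/pause/resume pattern built on two asyncio.Event."""

    def __init__(self):
        self._stop = asyncio.Event()    # mirrors __stop_loop_controller
        self._pause = asyncio.Event()   # mirrors __pause_loop_controller
        self.iterations = 0

    def stop_loop(self):
        self._stop.set()

    def pause_loop(self):
        self._pause.set()

    def resume_loop(self):
        self._pause.clear()

    async def run(self, period: float = 0.01):
        while not self._stop.is_set():
            if self._pause.is_set():
                await asyncio.sleep(period)  # paused: skip the iteration body
                continue
            self.iterations += 1             # the real loop would do work here
            await asyncio.sleep(period)
```

Stopping checks the event in the while condition, so the loop exits gracefully at the next iteration rather than being cancelled mid-work.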
If you don't want to use the regular loop in an orbiter, you can create your own custom asyncio loop without any special knowledge of this framework.
Anyway, we advise against using custom loops, because they reduce performance and must be manually managed by you (whereas the provided loop is already fully managed by us).
Below we provide an example showing how to create a custom loop. Compared with the timer-based version explained in the User Guide
(the only real difference is StringMessage instead of Int64Message), notice that there is a lot more code, you must know how to use asyncio.Event, and there are fewer features (a pause_loop-like method is missing).
@dataclass
class HelloSenderPlugin(Plugin):
"""
This plugin has only one operation which sends a "hello" message periodically
"""
__stop_custom_loop: asyncio.Event = field(default_factory=lambda: asyncio.Event())
def __post_init__(self):
super().__post_init__()
# Manually define "hello" operation
self.operations["hello"] = Operation(
name="hello", # operation's name
input=Input.no_input(), # no inputs are expected
handler=None, # handler is not needed, given that no inputs must be processed
output=Output.from_message(StringMessage), # "hello" string will be sent to cores
policy=Policy.no_constraints() # no constraint during handshake
)
@override
async def _internal_start(self, *args, **kwargs):
await super()._internal_start(*args, **kwargs)
self.__stop_custom_loop.clear() # allow to start custom loop
# Start custom loop task
asyncio.create_task( # created task is ignored, because __stop_custom_loop is used
self.__custom_loop()
)
@override
async def _internal_stop(self, *args, **kwargs):
await super()._internal_stop(*args, **kwargs)
# During plugin stop, custom loop must be stopped too
self.__stop_custom_loop.set()
async def __custom_loop(self):
while not self.__stop_custom_loop.is_set():
await asyncio.sleep(2) # to prevent spamming, custom loop is paused for 2 seconds
await self.__send_hello()
async def __send_hello(self):
connections = await self.retrieve_and_touch_connections(operation_name="hello") # retrieve current core connections
tasks = []
for connection in connections:
if connection.has_output: # check if output is expected (it should be...)
tasks.append(
asyncio.create_task(
self.eventbus_client.publish( # publish the "hello" message
connection.output_topic,
"hello" # hello message
)
)
)
await asyncio.gather(*tasks)
We have already shown some examples of managing operations manually; nevertheless, we cover it again here.
If you really want to manage operations manually, you can use the operations field,
a dictionary operation_name => Operation that holds all operation information of a plugin.
At any moment, anywhere in your plugin code, you can:
- Add an operation:
# Manually define "hello" operation
self.operations["hello"] = Operation(
name="hello", # operation's name
input=Input.no_input(), # no inputs are expected
handler=None, # handler is not needed, given that no inputs must be processed
output=Output.from_message(StringMessage), # "hello" string will be sent to cores
policy=Policy.no_constraints() # no constraint during handshake
)
- Modify an operation:
# e.g., change the policy constraints
self.operations["hello"].policy.allowlist = ["your_core_identifier"]
- Remove an operation:
# Remove entry from dictionary
del self.operations["hello"]
To modify operation requirements manually, you can perform the same actions explained for manual operation management,
considering the core field operation_requirements, a dictionary operation_name => OperationRequirement.
To modify sinks manually, likewise consider the core field operation_sinks, a dictionary operation_name => EventHandler.
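Since both fields are plain dictionaries, the moves are identical to the operation examples above. The sketch below uses stand-in dictionaries and a hypothetical handler instead of a real Core (so it runs standalone); the names and the requirement value are illustrative only:

```python
# Stand-in sketch: operation_sinks maps operation_name => EventHandler and
# operation_requirements maps operation_name => OperationRequirement.
# Plain dicts are used here in place of a real Core's fields.

async def save_sink(topic, event):
    # hypothetical sink: react to results of the "save" operation
    print(f"'save' result received on {topic}")

operation_sinks = {}                    # stands in for self.operation_sinks
operation_sinks["save"] = save_sink     # add (or replace) a sink

operation_requirements = {}             # stands in for self.operation_requirements
operation_requirements["save"] = {"mandatory": ["int_plugin"]}  # illustrative value only

del operation_sinks["save"]             # remove a sink
```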
Consider the following scenario: you have a core plugged into several plugins for the same operation (e.g., "save"), and you want to send a different value depending on each plugin's input.
In this case you must manually check each connection input's compatibility with your message's schema.
The following example shows two different plugins which perform the same operation but on different input data, i.e. each stores the last received value in its internal vault.
MyCore, instead, has a method execute_dynamically which sends 42 if the connection input is Int64Message, and "hello" if the input is StringMessage.
@dataclass
class SaveIntegerPlugin(Plugin):
"""
Save inbound integer
"""
vault: Optional[int] = None
@operation(
name="save", # operation's name
input=Input.int64(), # operation's input, i.e. an int number
output=Output.no_output() # operation's output, i.e. no outputs
# no policy is specified => Policy.no_constraints()
)
async def save_int_event_handler(self, topic: str, event: Event[Int64Message]):
self.vault = event.payload.value
@dataclass
class SaveStringPlugin(Plugin):
"""
Save inbound string
"""
vault: Optional[str] = None
@operation(
name="save", # operation's name
input=Input.string(), # operation's input, i.e. a string
output=Output.no_output() # operation's output, i.e. no outputs
# no policy is specified => Policy.no_constraints()
)
async def save_string_event_handler(self, topic: str, event: Event[StringMessage]):
self.vault = event.payload.value
@dataclass
class MyCore(Core):
async def execute_dynamically(self):
"""
Send right data type based on plugin operations
"""
# First retrieve all connections related to operation
connections = self.retrieve_connections(operation_name="save")
for connection in connections:
# Ignore connection without an input
if not connection.has_input:
continue
# If connection input has a schema compatible with Int64Message send 42
if connection.input.is_compatible_with_schema(Int64Message.avro_schema()):
await self.eventbus_client.publish(
connection.input_topic,
42
)
# If connection input has a schema compatible with StringMessage send "hello"
if connection.input.is_compatible_with_schema(StringMessage.avro_schema()):
await self.eventbus_client.publish(
connection.input_topic,
"hello"
)
int_plugin = SaveIntegerPlugin(
identifier="int_plugin",
eventbus_client=build_new_local_client(),
raise_exceptions=True,
with_loop=False,
)
str_plugin = SaveStringPlugin(
identifier="str_plugin",
eventbus_client=build_new_local_client(),
raise_exceptions=True,
with_loop=False,
)
core = MyCore(
eventbus_client=build_new_local_client(),
with_loop=False,
raise_exceptions=True,
operation_requirements={
"save": OperationRequirement(Constraint(
inputs=[Input.int64(), Input.string()],
outputs=[Output.no_output()],
mandatory=["int_plugin", "str_plugin"]
))
}
)
await int_plugin.start()
await str_plugin.start()
await core.start()
await asyncio.sleep(2) # time for handshake
assert core.state == CoreState.COMPLIANT
await core.execute_dynamically()
await asyncio.sleep(2) # time to execute
assert int_plugin.vault == 42
assert str_plugin.vault == "hello"
await int_plugin.stop()
await str_plugin.stop()
await core.stop()
await asyncio.sleep(1) # time for connections to close
We plan to allow Plugins to send an offer at a later moment, as explained in the handshake section. To do this, we are going to extend the Discover message with a boolean flag that lets Cores decide whether this new feature should be considered.
In addition, we would like to improve how an already linked sink or operation event handler is changed for a connection. Currently, the connection must be closed and then re-opened.
In order to coordinate the work, please open an issue or a pull request.
Thank you for your contributions!
Creator of the DHCP-like handshake protocol used by Orbitalis. Designer of Orbitalis' protocols and components. Developer of Orbitalis' code base. Documentation referent.
University of Modena and Reggio Emilia.
Co-Designer of Orbitalis' main communication protocols and components. Documentation reviewer.
Associate Professor at University of Modena and Reggio Emilia.
