run.py
Entrypoint of the automation
- Load environment variables from ../.env and config/.env (the latter has priority)
- Lock the automation with a file lock so that only one instance can run at a time
- Run the automation if the lock could be acquired
src
Package of the sensor system automation software.
Import hierarchy (a -> b means that a cannot import from b):
constants -> types -> utils -> procedures -> main
src.constants.py
Variables
PROJECT_DIR: str - The root directory of the project (the parent of src/)
DATA_DIR: str - The directory where the data files are stored: PROJECT_DIR/data, or the value of the environment variable IVY_DATA_DIR (if set)
VERSION: tum_esm_utils.validators.Version - The current version of the project
NAME: str - The name of the project
DEFAULT_ROOT_DIR: str - The default root directory on a production system: ~/Documents/{NAME}
ROOT_DIR: str - The default root directory on a production system, or the value of the environment variable IVY_ROOT_DIR (if set)
LOGGING_LEVEL_PRIORITIES: dict[typing.Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'EXCEPTION'], int] - Order of the logging levels from lowest to highest; a higher number means a higher priority
SECONDS_PER_GRACEFUL_PROCEDURE_TEARDOWN: int - Number of seconds to wait for a procedure process to tear down gracefully before killing it
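The environment-variable overrides and the level-priority mapping described above could look roughly like this sketch; the concrete name and numeric values are assumptions, not the project's actual definitions:

```python
import os

NAME = "ivy"  # hypothetical project name
DEFAULT_ROOT_DIR = os.path.join(os.path.expanduser("~"), "Documents", NAME)

# the environment variable takes precedence over the default
ROOT_DIR = os.environ.get("IVY_ROOT_DIR", DEFAULT_ROOT_DIR)

# higher number = higher priority
LOGGING_LEVEL_PRIORITIES = {
    "DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3, "EXCEPTION": 4,
}
```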
src.main.py
Functions
run
def run() -> None:
Run the automation.
- Load the configuration
- Initialize the logger
- Initialize the messaging agent
- Initialize the updater
- Remove old virtual environments
- Initialize the lifecycle managers
- Establish graceful shutdown logic (run teardown for every lifecycle manager)
- Start the main loop
Inside the main loop, the following steps are performed until termination:
- Start each procedure/backend if it is not running
- Check the status of each procedure/backend
- Perform any pending updates
If an exception is raised in the main loop, it is logged and the main loop sleeps for an exponentially increasing amount of time before it tries to continue.
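The exception handling described above can be sketched as follows; the initial and maximum backoff values, and the StopMainLoop helper used to make the loop finite for illustration, are assumptions rather than the software's actual constants:

```python
import time


class StopMainLoop(Exception):
    """Raised by an iteration to terminate the loop (illustration aid)."""


def run_main_loop(run_iteration, initial_backoff=30, max_backoff=1800, sleep=time.sleep):
    backoff = initial_backoff
    while True:
        try:
            run_iteration()
            backoff = initial_backoff  # reset the backoff after a successful iteration
        except StopMainLoop:
            return
        except Exception:
            # a real implementation would log the exception here
            sleep(backoff)
            backoff = min(backoff * 2, max_backoff)  # exponentially increasing sleep
```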
src.backend
This module provides all backends that the automation software can communicate with. Every submodule should provide a single function that runs infinitely.
All of the procedures in this module should have the signature:
def run(
config: src.types.Config,
name: str,
teardown_indicator: multiprocessing.synchronize.Event,
) -> None:
...
These are managed by a src.utils.LifecycleManager but, as opposed to "procedures", they do not receive a SIGTERM signal; instead, the teardown_indicator event is set. This gives more freedom in how to handle the shutdown of the backend.
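A backend entrypoint following this contract might be structured like the sketch below; the message-sending step is a hypothetical placeholder, not the actual implementation:

```python
import multiprocessing.synchronize
import typing


def run(
    config: typing.Any,  # stands in for src.types.Config
    name: str,
    teardown_indicator: multiprocessing.synchronize.Event,
) -> None:
    while not teardown_indicator.is_set():
        # ... publish queued messages to the backend (placeholder) ...
        teardown_indicator.wait(timeout=5)  # idle until the next cycle or teardown
    # the event is set: drain remaining messages and close connections here
```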
src.backend.tenta_backend.py
Functions
run
def run(
config: src.types.config.Config,
name: str,
teardown_indicator: multiprocessing.synchronize.Event,
) -> None:
The main procedure for the Tenta backend.
Arguments:
config: The configuration object.
name: The name of the backend procedure.
teardown_indicator: The event that is set when the procedure should terminate.
src.backend.thingsboard_backend.py
Functions
run
def run(
config: src.types.config.Config,
name: str,
teardown_indicator: multiprocessing.synchronize.Event,
) -> None:
The main procedure for the ThingsBoard backend.
Arguments:
config: The configuration object.
name: The name of the backend procedure.
teardown_indicator: The event that is set when the procedure should terminate.
src.procedures
This module provides all procedures that the automation software should run. They should all be run in parallel processes, and each file provides a single function that runs infinitely. Functions may implement graceful teardown steps upon receiving SIGTERM/SIGINT.
All of the procedures in this module should have the signature:
def run(
config: src.types.Config,
name: str,
) -> None:
...
src.procedures.dummy_procedure.py
Functions
run
def run(
config: src.types.config.Config,
name: str,
) -> None:
Performs a random walk and sends out the current position.
You can use this as an example for your own procedures and remove it in your own project.
Arguments:
config: The configuration object.
name: The name of the procedure.
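A procedure with a graceful SIGTERM teardown, as the module description permits, could be structured like this sketch; the work step is a hypothetical placeholder, not the dummy procedure's actual body:

```python
import signal
import time
import typing


def run(config: typing.Any, name: str) -> None:
    stop_requested = False

    def _on_sigterm(signum, frame):
        nonlocal stop_requested
        stop_requested = True

    # register the graceful-teardown handler before entering the infinite loop
    signal.signal(signal.SIGTERM, _on_sigterm)
    while not stop_requested:
        time.sleep(0.01)  # placeholder for one unit of work
    # perform teardown steps here (flush buffers, close connections, ...)
```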
src.procedures.system_checks.py
Functions
run
def run(
config: src.types.config.Config,
name: str,
) -> None:
Logs the system load and last boot time.
Arguments:
config: The configuration object.
name: The name of the procedure.
src.types
This module contains all type definitions of the codebase and
may implement loading and dumping functionality like Config.load.
src.types.config.py
Class BackendConfig
class BackendConfig(pydantic.BaseModel):
Class Config
class Config(pydantic.BaseModel):Schema of the config file for this version of the software.
A rendered API reference can be found in the documentation.
dump
def dump(
self,
) -> None:
Dump the config file to the path <ivy_root>/<version>/config/config.json
load
@staticmethod
def load() -> src.types.config.Config:
Load the config file from the path project_dir/config/config.json
load_from_string
@staticmethod
def load_from_string(
c: str,
) -> src.types.config.Config:
Load the object from a string.
Arguments:
c: The string to load the object from.
load_template
@staticmethod
def load_template() -> src.types.config.Config:
Load the config file from the path project_dir/config/config.template.json
to_foreign_config
def to_foreign_config(
self,
) -> src.types.config.ForeignConfig:
Convert the config to a src.types.ForeignConfig object.
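The load/dump behaviour of these models can be illustrated with a simplified stand-in; this uses a plain dataclass instead of the actual pydantic model, invented field names, and an explicit path argument:

```python
import dataclasses
import json


@dataclasses.dataclass
class Config:  # simplified stand-in for src.types.config.Config
    version: str
    general: dict

    @staticmethod
    def load_from_string(c: str) -> "Config":
        # parse the JSON string and validate it against the schema's fields
        return Config(**json.loads(c))

    def dump(self, path: str) -> None:
        # write the current config out as pretty-printed JSON
        with open(path, "w") as f:
            json.dump(dataclasses.asdict(self), f, indent=4)
```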
Class ForeignConfig
class ForeignConfig(pydantic.BaseModel):
Schema of a foreign config file for any other version of the software
to update to. It probably has more fields than listed in the schema. This schema only includes the fields that are required in any new config to be accepted by the updater in this version of the software.
A rendered API reference can be found in the documentation.
dump
def dump(
self,
) -> None:
Dump the config file to the path <ivy_root>/<version>/config/config.json
load_from_string
@staticmethod
def load_from_string(
c: str,
) -> src.types.config.ForeignConfig:
Load the object from a string.
Arguments:
c: The string to load the object from.
Class ForeignGeneralConfig
class ForeignGeneralConfig(pydantic.BaseModel):
Class MQTTBrokerConfig
class MQTTBrokerConfig(pydantic.BaseModel):
Class UpdaterConfig
class UpdaterConfig(pydantic.BaseModel):
src.types.messages.py
Class ConfigMessageBody
class ConfigMessageBody(pydantic.BaseModel):
The body of a config message, defined by body.variant == "config".
Class DataMessageBody
class DataMessageBody(pydantic.BaseModel):
The body of a data message, defined by body.variant == "data".
Class LogMessageBody
class LogMessageBody(pydantic.BaseModel):
The body of a log message, defined by body.variant == "log".
Class MessageArchiveItem
class MessageArchiveItem(pydantic.BaseModel):
Class MessageQueueItem
class MessageQueueItem(MessageArchiveItem):
src.types.state.py
Class State
class State(pydantic.BaseModel):
Central state used to communicate between procedures and with the main loop.
Class SystemState
class SystemState(pydantic.BaseModel):
State values determined in the system checks procedure.
src.utils
This module contains all utility functionality of the codebase.
Some of these functions have been adapted from https://github.com/tum-esm/utils, but this library has not been added as a dependency to reduce the number of third-party libraries this software depends on.
src.utils.functions.py
Functions
get_time_to_next_datapoint
def get_time_to_next_datapoint(
seconds_between_datapoints: int,
) -> float:
Calculates the time until the next measurement should be taken. If the seconds between datapoints is 10 and the current time is 12:00:03, the next measurement should be taken at 12:00:10. The counting starts at 00:00:00 system time, so the datapoint times align with 00:00:00, 00:00:10, 00:00:20, 00:00:30, and so on.
Arguments:
seconds_between_datapoints: The time between two datapoints in seconds.
Returns: The time until the next measurement should be taken in seconds.
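The described behaviour reduces to a modulo calculation; this sketch takes the current time as an argument for testability, whereas the actual function presumably reads the system clock itself:

```python
import datetime


def seconds_to_next_datapoint(seconds_between_datapoints: int, now: datetime.datetime) -> float:
    # count seconds from 00:00:00 system time, as described above
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    seconds_since_midnight = (now - midnight).total_seconds()
    # remaining time until the next multiple of the interval
    return seconds_between_datapoints - (seconds_since_midnight % seconds_between_datapoints)
```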
log_level_is_visible
def log_level_is_visible(
min_log_level: typing.Optional[typing.Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'EXCEPTION']],
log_level: typing.Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'EXCEPTION'],
) -> bool:
Checks if a log level is forwarded to the user.
Arguments:
min_log_level: The minimum log level to forward; if None, no log levels are forwarded.
log_level: The log level to check.
Returns: Whether log_level is at least as important as min_log_level
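Given the LOGGING_LEVEL_PRIORITIES mapping from src.constants, this check presumably reduces to a priority comparison; a sketch:

```python
import typing

LogLevel = typing.Literal["DEBUG", "INFO", "WARNING", "ERROR", "EXCEPTION"]

LOGGING_LEVEL_PRIORITIES: dict[LogLevel, int] = {
    "DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3, "EXCEPTION": 4,
}


def log_level_is_visible(min_log_level: typing.Optional[LogLevel], log_level: LogLevel) -> bool:
    if min_log_level is None:
        return False  # no log levels are forwarded
    # visible if the message is at least as important as the configured minimum
    return LOGGING_LEVEL_PRIORITIES[log_level] >= LOGGING_LEVEL_PRIORITIES[min_log_level]
```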
publish_mqtt_message
def publish_mqtt_message(
topic: str,
message: dict[typing.Any, typing.Any],
host: str,
port: int,
username: str,
password: str,
sleep: float,
) -> None:
with_automation_lock
@contextlib.contextmanager
def with_automation_lock() -> typing.Generator[None, None, None]:
This function will lock the automation with a file lock so that only one instance can run at a time.
Usage:
with with_automation_lock():
run_automation()
# or
run_tests()
Returns: A context manager that locks the automation.
Raises:
TimeoutError: If the automation is already running.
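A minimal file-lock sketch using only the standard library; the real implementation may use a dedicated file-locking package, a fixed lock-file path, and a timeout instead of failing immediately:

```python
import contextlib
import os


@contextlib.contextmanager
def with_automation_lock(lock_file: str = "automation.lock"):
    try:
        # O_EXCL makes the creation atomic: it fails if the file already exists
        fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        raise TimeoutError("The automation is already running") from None
    try:
        yield
    finally:
        os.close(fd)
        os.remove(lock_file)  # release the lock for the next run
```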
src.utils.lifecycle_manager.py
Class LifecycleManager
class LifecycleManager:
Manages the lifecycle of a procedure or a backend process.
Both procedures and backends run an infinite loop to perform their respective tasks. The lifecycle manager is responsible for starting, stopping, and checking the status of the procedure.
Each procedure/backend is wrapped in one instance of the lifecycle manager.
__init__
def __init__(
self,
config: src.types.config.Config,
entrypoint: typing.Union[typing.Callable[[src.types.config.Config, str], None], typing.Callable[[src.types.config.Config, str, multiprocessing.synchronize.Event], None]],
name: str,
variant: typing.Literal['procedure', 'backend'],
) -> None:
Initializes a new lifecycle manager.
Arguments:
config: The configuration object.
entrypoint: The entrypoint of the procedure or backend.
name: The name of the procedure or backend. Used to name the spawned process.
variant: Whether the entrypoint is a procedure or a backend. The difference is only in the teardown logic.
Raises:
ValueError: If the given variant does not match the entrypoint signature.
check_procedure_status
def check_procedure_status(
self,
) -> None:
Checks if the procedure is still running. Logs an error if the procedure has died unexpectedly.
Raises:
RuntimeError: If the procedure has not been started yet. This is a wrong usage of the procedure manager.
procedure_is_running
def procedure_is_running(
self,
) -> bool:
Returns True if the procedure has been started. Does not check whether the process is still alive.
start_procedure
def start_procedure(
self,
) -> None:
Starts the procedure in a separate process.
Raises:
RuntimeError: If the procedure is already running. This is a wrong usage of the procedure manager.
teardown
def teardown(
self,
) -> None:
Tears down the procedure or backend process.
For procedures, it sends a SIGTERM to the process. For backends, it sets a multiprocessing.Event to signal the backend to shut down. This gives the backend processes more freedom to manage a shutdown.
The lifecycle manager waits for the process to shut down gracefully for a certain amount of time. If the process does not shut down in time, it kills the process forcefully by sending a SIGKILL.
For procedures, the SIGKILL is sent after
src.constants.SECONDS_PER_GRACEFUL_PROCEDURE_TEARDOWN seconds. For
backends, the SIGKILL is sent after config.backend.max_drain_time + 15
seconds.
src.utils.logger.py
Class Logger
class Logger:
A custom logger class that optionally sends out the logs via MQTT and writes them to a file. One can add details to log messages of all levels, which will be logged on a separate line for better readability.
You can give each instance a custom origin to distinguish between the sources of the log messages.
A simple log message will look like this:
2021-08-22 16:00:00.000 UTC+2 - origin - INFO - test message
A log message with details will look like this:
2021-08-22 16:00:00.000 UTC+2 - origin - INFO - test message
--- details ----------------------------
test details
----------------------------------------
An exception will be formatted like this:
2021-08-22 16:00:00.000 UTC+2 - origin - EXCEPTION - ZeroDivisionError: division by zero
--- exception details ------------------
test details
--- traceback --------------------------
Traceback (most recent call last):
File "src/utils/logger.py", line 123, in _write_log_line
raise Exception("test exception")
Exception: test exception
----------------------------------------
__init__
def __init__(
self,
config: src.types.config.Config,
origin: str,
) -> None:
Initializes a new Logger instance.
Arguments:
config: The config object.
origin: The origin of the log messages; will be displayed in the log lines.
_write_log_line
def _write_log_line(
self,
level: typing.Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'EXCEPTION'],
subject: str,
details: list[tuple[str, typing.Optional[str]]],
) -> None:
Formats the log line string and writes it out to the selected output channels.
The output channels are configured using config.logging_verbosity.
You can set the level of detail you want to see in the console, the
file archive and the MQTT message stream.
Arguments:
level: The log level of the message.
subject: The subject of the message.
details: Additional details to log, useful for verbose output.
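The line layout shown in the Logger class description could be produced by a formatting helper along these lines; the helper name and the explicit timestamp argument are assumptions for illustration:

```python
import typing


def format_log_line(
    timestamp: str, origin: str, level: str, subject: str,
    details: typing.Optional[str] = None,
) -> str:
    # base layout: "<timestamp> - <origin> - <LEVEL> - <subject>"
    line = f"{timestamp} - {origin} - {level} - {subject}"
    if details is not None:
        # details go on separate lines, framed by dashed rulers
        line += (
            "\n--- details ----------------------------\n"
            f"{details}\n"
            "----------------------------------------"
        )
    return line
```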
debug
def debug(
self,
message: str,
details: typing.Optional[str],
) -> None:
Writes a DEBUG log line.
Arguments:
message: The message to log.
details: Additional details to log, useful for verbose output.
error
def error(
self,
message: str,
details: typing.Optional[str],
) -> None:
Writes an ERROR log line.
Arguments:
message: The message to log.
details: Additional details to log, useful for verbose output.
exception
def exception(
self,
e: Exception,
label: typing.Optional[str],
details: typing.Optional[str],
) -> None:
Logs the traceback of an exception and sends the message via MQTT when a config is passed (required for the revision number).
The subject will be formatted like this:
(label, )ZeroDivisionError: division by zero
Arguments:
e: The exception to log.
label: A label to prepend to the exception name.
details: Additional details to log, useful for verbose output, e.g. the full log of a failed pytest on a new config.
horizontal_line
def horizontal_line(
self,
fill_char: typing.Literal['-', '=', '.', '_'],
) -> None:
Writes a horizontal line.
Arguments:
fill_char: The character to fill the line with.
info
def info(
self,
message: str,
details: typing.Optional[str],
) -> None:
Writes an INFO log line.
Arguments:
message: The message to log.
details: Additional details to log, useful for verbose output.
read_current_log_file
@staticmethod
def read_current_log_file() -> typing.Optional[str]:
Reads the current log file and returns its content.
Returns: The content of the current log file.
warning
def warning(
self,
message: str,
details: typing.Optional[str],
) -> None:
Writes a WARNING log line.
Arguments:
message: The message to log.
details: Additional details to log, useful for verbose output.
src.utils.mainloop_toggle.py
Functions to start and terminate background processes.
Variables
SCRIPT_PATH: str - Absolute path of the run.py file that starts an infinite mainloop
Class MainloopToggle
class MainloopToggle:
Used to start and stop the mainloop process in the background.
All functionality is borrowed from the tum-esm-utils package.
get_mainloop_pids
@staticmethod
def get_mainloop_pids() -> list[int]:
Get the process ID(s) of the mainloop process(es). Should be used to check if the mainloop process is running.
Returns: A list of process IDs. Might have more than one element if the mainloop process has spawned child process(es).
start_mainloop
@staticmethod
def start_mainloop() -> None:
Start the mainloop process in the background and print the process ID(s) of the new process(es).
stop_mainloop
@staticmethod
def stop_mainloop() -> None:
Terminate the mainloop process in the background and print the process ID(s) of the terminated process(es).
src.utils.messaging_agent.py
Variables
ACTIVE_QUEUE_FILE: str - The absolute path of the SQLite database that stores the active message queue (data/active-message-queue.sqlite3)
MESSAGE_ARCHIVE_DIR: str - The absolute path of the directory that stores the message archive (data/messages/)
MESSAGE_ARCHIVE_DIR_LOCK: str - The absolute path of the lock file that is used to lock the message archive directory (data/messages.lock). This is used to make sure that only one process can write to the message archive at a time.
Class MessagingAgent
class MessagingAgent:
__init__
def __init__(
self,
) -> None:
Create a new messaging agent.
Sets up a connection to the SQLite database that stores the active message queue. Creates the SQL tables if they don't exist yet.
add_message
def add_message(
self,
message_body: typing.Union[src.types.messages.DataMessageBody, src.types.messages.LogMessageBody, src.types.messages.ConfigMessageBody],
) -> None:
Add a message to the active message queue and the message archive.
Messages are written to the archive right away so they don't get lost if the backend process fails to send them out.
Arguments:
message_body: The message body.
get_message_archive_file
@staticmethod
def get_message_archive_file() -> str:
Get the file path of the message archive file for the current date.
get_n_latest_messages
def get_n_latest_messages(
self,
n: int,
excluded_message_ids: set[int] | list[int],
) -> list[src.types.messages.MessageQueueItem]:
Get the n latest messages from the active message queue.
Arguments:
n: The number of messages to get.
excluded_message_ids: The message IDs to exclude from the result. Can be used to exclude messages that are already being processed but are still in the active message queue.
Returns: A list of messages from the active queue.
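The active-queue behaviour can be sketched with a few lines of sqlite3; this stand-in stores message bodies as JSON strings, invents the table layout, and omits the archive files entirely:

```python
import json
import sqlite3
import time


class MessageQueue:  # simplified stand-in for the queue portion of MessagingAgent
    def __init__(self, db_path: str = ":memory:") -> None:
        self.con = sqlite3.connect(db_path)
        self.con.execute(
            "CREATE TABLE IF NOT EXISTS queue (id INTEGER PRIMARY KEY, ts REAL, body TEXT)"
        )

    def add_message(self, body: dict) -> None:
        self.con.execute(
            "INSERT INTO queue (ts, body) VALUES (?, ?)", (time.time(), json.dumps(body))
        )
        self.con.commit()

    def get_n_latest_messages(self, n: int, excluded_message_ids: tuple[int, ...] = ()) -> list:
        # SQLite accepts an empty IN () list, so no special case is needed
        placeholders = ",".join("?" * len(excluded_message_ids))
        rows = self.con.execute(
            f"SELECT id, ts, body FROM queue WHERE id NOT IN ({placeholders}) "
            "ORDER BY id DESC LIMIT ?",
            (*excluded_message_ids, n),
        )
        return rows.fetchall()

    def remove_messages(self, message_ids: list[int]) -> None:
        self.con.executemany("DELETE FROM queue WHERE id = ?", [(i,) for i in message_ids])
        self.con.commit()

    def teardown(self) -> None:
        self.con.close()
```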
load_message_archive
@staticmethod
def load_message_archive(
date: datetime.date,
) -> list[src.types.messages.MessageArchiveItem]:
Load the message archive for a specific date.
Arguments:
date: The date for which to load the message archive.
Returns: A list of messages from the message archive.
remove_messages
def remove_messages(
self,
message_ids: set[int] | list[int],
) -> None:
Remove messages from the active message queue.
Arguments:
message_ids: The message IDs to be removed.
teardown
def teardown(
self,
) -> None:
Close the connection to the active message queue database.
src.utils.state_interface.py
Variables
STATE_FILE: str - Points to data/state.json, where the state is communicated with all threads
STATE_FILE_LOCK: str - Points to data/state.lock, which is used to ensure that only one thread can access the state at a time
Class StateInterface
class StateInterface:
load
@tum_esm_utils.decorators.with_filelock(STATE_FILE_LOCK, timeout=6)
@staticmethod
def load() -> src.types.state.State:
Load the state file from the path project_dir/data/state.json
update
@tum_esm_utils.decorators.with_filelock(STATE_FILE_LOCK, timeout=6)
@staticmethod
@contextlib.contextmanager
def update() -> typing.Generator[src.types.state.State, None, None]:
Load the state file and update it within a semaphore. This makes sure that only one process can access this section at a time. If you were to load, modify, and save in three separate calls, you might overwrite changes made by another process in between.
Usage:
with State.update() as state:
state.system.last_boot_time = datetime.datetime.now()
Returns: A generator that yields the state object.
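A simplified version of this read-modify-write pattern, using a process-local threading.Lock instead of the actual cross-process file lock and a plain dict instead of the State model:

```python
import contextlib
import json
import threading

_STATE_LOCK = threading.Lock()  # stand-in for the cross-process file lock


@contextlib.contextmanager
def update_state(state_file: str):
    with _STATE_LOCK:
        # load, let the caller modify, then save: all within one locked section
        with open(state_file) as f:
            state = json.load(f)
        yield state  # the caller mutates the state in place
        with open(state_file, "w") as f:
            json.dump(state, f, indent=4)
```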
src.utils.updater.py
Class Updater
class Updater:
Implementation of the update capabilities of Ivy: checks whether a new config is in a valid format, downloads new source code, creates virtual environments, installs dependencies, runs pytests, removes old virtual environments, and updates the CLI pointer to the currently used version of the automation software.
__init__
def __init__(
self,
config: src.types.config.Config,
) -> None:
Initialize an Updater instance.
Arguments:
config: The current config object
download_source_code
@staticmethod
def download_source_code(
updater_config: src.types.config.UpdaterConfig,
version: tum_esm_utils.validators.Version,
log_progress: typing.Callable[[str], None],
) -> None:
Download the source code of the new version to the version directory. This is currently only implemented for GitHub and GitLab, for both private and public repositories. Feel free to request other providers in the issue tracker.
This is a static method, so it can be tested independently.
Arguments:
updater_config: The updater config object.
version: The version of the source code to download.
log_progress: A function to log progress messages.
install_dependencies
@staticmethod
def install_dependencies(
version: tum_esm_utils.validators.Version,
log_progress: typing.Callable[[str], None],
installation_command: str,
) -> None:
Create a virtual environment and install the dependencies in the version directory using PDM. It uses the pdm sync command to create exactly the desired environment.
Since the pyproject.toml file generated by PDM complies with PEP standards, one could also use "pip install ." instead. However, we recommend using PDM due to its caching and dependency locking.
This is a static method, so it can be tested independently.
Arguments:
version: The version of the source code to download.
log_progress: A function to log progress messages.
perform_update
def perform_update(
self,
foreign_config: src.types.config.ForeignConfig,
) -> None:
Perform an update for a received config file.
See the documentation for a detailed explanation of the update process.
Arguments:
foreign_config: The received foreign config object
remove_old_venvs
@staticmethod
def remove_old_venvs(
current_version: tum_esm_utils.validators.Version,
log_progress: typing.Callable[[str], None],
) -> None:
Remove all old virtual environments, except the current one.
Arguments:
current_version: The current version of the software.
log_progress: A function to log progress messages.
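The cleanup step could look roughly like this; the directory layout (one <version>/.venv per installed version under a common root) and the string-typed version are assumptions based on the surrounding descriptions:

```python
import os
import shutil
import typing


def remove_old_venvs(
    root_dir: str, current_version: str, log_progress: typing.Callable[[str], None]
) -> None:
    for entry in os.listdir(root_dir):
        venv_path = os.path.join(root_dir, entry, ".venv")
        # keep the venv of the currently running version, delete all others
        if entry != current_version and os.path.isdir(venv_path):
            shutil.rmtree(venv_path)
            log_progress(f"removed {venv_path}")
```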
run_pytests
@staticmethod
def run_pytests(
version: tum_esm_utils.validators.Version,
pytest_marker: str,
) -> None:
Run all pytests with the mark "version_change" in the version directory.
Arguments:
version: The version of the source code to be tested.
pytest_marker: The pytest marker to run.
update_cli_pointer
@staticmethod
def update_cli_pointer(
to_version: tum_esm_utils.validators.Version,
) -> None:
Update the CLI pointer to a new version.
Arguments:
to_version: The version to update the cli pointer to