Servers

ropt.workflow.servers.Server

Bases: ABC

Abstract base class for server components within an optimization workflow.

Subclasses must implement the following abstract methods and properties:

  • start: Starts the server.
  • cancel: Stops the server.
  • task_queue: Retrieves the server's task queue.
  • loop: Retrieves the currently running asyncio loop.
  • task_group: The asyncio.TaskGroup used by this server.
  • is_running: Checks if the server is running.

task_queue abstractmethod property

task_queue: Queue[Any]

Get the task queue.

loop abstractmethod property

loop: AbstractEventLoop | None

Get the asyncio loop used by this server.

task_group abstractmethod property

task_group: TaskGroup | None

Get the task group used by this server.

start abstractmethod async

start(task_group: TaskGroup) -> None

Start the evaluation server.

Parameters:

Name Type Description Default
task_group TaskGroup

The task group to use.

required

Raises:

Type Description
RuntimeError

If the evaluator is already running or using an external queue.

cancel abstractmethod

cancel() -> None

Stop the evaluation server.

is_running abstractmethod

is_running() -> bool

Check if the server is running.

Returns:

Type Description
bool

True if the server is running.

ropt.workflow.servers.Task dataclass

Bases: ABC

A task to be executed by a worker.

Attributes:

Name Type Description
function Callable[..., Any]

The function to execute.

args tuple[Any, ...]

The arguments to pass to the function.

kwargs dict[str, Any]

The keyword arguments to pass to the function.

results_queue ResultsQueue

The queue to put the result in.

result Any | None

The result of the function, or None if no result is available.

name str | None

Optional unique name of the task.

put_result

put_result(result: Any) -> None

Put the result in the result queue.

cancel_all

cancel_all() -> None

Stop putting results in the result queue.

ropt.workflow.servers.AsyncServer

Bases: ServerBase

An evaluator server that employs asynchronous workers.

__init__

__init__(*, workers: int = 1, queue_size: int = 0) -> None

Initialize the server.

Parameters:

Name Type Description Default
workers int

The number of workers to use.

1
queue_size int

Maximum size of the task queue.

0

start async

start(task_group: TaskGroup) -> None

Start the server.

Parameters:

Name Type Description Default
task_group TaskGroup

The task group to use.

required

cleanup

cleanup() -> None

Clean up the server.

ropt.workflow.servers.MultiprocessingServer

Bases: ServerBase

An evaluator server that employs a pool of multiprocessing workers.

__init__

__init__(
    *,
    workers: int = 1,
    queue_size: int = 0,
    max_tasks_per_child: int | None = None,
) -> None

Initialize the server.

max_tasks_per_child

The max_tasks_per_child value is passed to the process executor pool. It forces worker processes to be restarted after the set number of tasks has been executed. This adds overhead, which can be significant, so it is set to None by default. Adjust it if memory or resource leaks are suspected.

Parameters:

Name Type Description Default
workers int

The number of workers to use.

1
queue_size int

Maximum size of the task queue.

0
max_tasks_per_child int | None

Number of tasks to execute before refreshing workers.

None

start async

start(task_group: TaskGroup) -> None

Start the server.

Parameters:

Name Type Description Default
task_group TaskGroup

The task group to use.

required

cleanup

cleanup() -> None

Clean up the server.
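The effect of restarting workers after a fixed number of tasks can be demonstrated with the standard library directly. The sketch below uses `multiprocessing.Pool` and its analogous `maxtasksperchild` parameter (not the ropt server itself); the "fork" start method keeps the example self-contained on Unix:

```python
import multiprocessing as mp


def square(x: int) -> int:
    return x * x


def run_pool() -> list[int]:
    # maxtasksperchild replaces each worker process after it has executed
    # 10 tasks, bounding leaked memory or resources at the cost of extra
    # process-restart overhead -- the trade-off described above.
    ctx = mp.get_context("fork")
    with ctx.Pool(processes=2, maxtasksperchild=10) as pool:
        return pool.map(square, range(5))


if __name__ == "__main__":
    print(run_pool())  # [0, 1, 4, 9, 16]
```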

ropt.workflow.servers.HPCServer

Bases: ServerBase

A server for submitting tasks to a High-Performance Computing (HPC) cluster.

This server interfaces with an HPC queueing system (like Slurm) via the pysqa library. It manages the entire lifecycle of a remote task, including:

  • Serializing the task (function and arguments) and writing it to a shared filesystem.
  • Submitting the task as a job to the HPC queue.
  • Polling the queue for the job's status.
  • Retrieving the results (or any exceptions) once the job is complete.

Configuration of the cluster connection is handled either through a submission script template or a pysqa configuration directory.
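The serialize/execute/retrieve round-trip on a shared filesystem can be sketched as follows. This is a hypothetical illustration using only `pickle` and `pathlib`: the actual job submission and status polling are performed through pysqa and are omitted here, and all function names are inventions of this sketch.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Callable


def write_task(workdir: Path, name: str,
               function: Callable[..., Any], *args: Any, **kwargs: Any) -> Path:
    """Serialize a task so a job script on an HPC node can execute it."""
    path = workdir / f"{name}_task.pkl"
    path.write_bytes(pickle.dumps((function, args, kwargs)))
    return path


def run_task(task_path: Path) -> Path:
    """What the remote job would do: load, execute, write the result."""
    function, args, kwargs = pickle.loads(task_path.read_bytes())
    result_path = task_path.with_name(task_path.name.replace("_task", "_result"))
    result_path.write_bytes(pickle.dumps(function(*args, **kwargs)))
    return result_path


def read_result(result_path: Path) -> Any:
    """What the client does after polling shows the job has finished."""
    return pickle.loads(result_path.read_bytes())


# The workdir stands in for a filesystem shared by client and HPC nodes.
workdir = Path(tempfile.mkdtemp())
result = read_result(run_task(write_task(workdir, "job0", pow, 2, 8)))
print(result)  # 256
```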

__init__

__init__(
    *,
    workdir: Path | str = "./",
    workers: int = 1,
    queue_size: int = 0,
    interval: float = 1,
    queue_type: str = "slurm",
    template: str | None = None,
    config_path: Path | str | None = None,
    cluster: str | None = None,
    queue: str | None = None,
    cores: int = 1,
) -> None

Initialize the HPC server.

This sets up the server for communication with an HPC cluster. The connection can be configured in two ways:

  1. By providing a template string for the job submission script.
  2. By providing a config_path to a directory containing pysqa cluster configurations.

If config_path is not given, the server will look for a default configuration at <sysconfig_data>/share/ropt/pysqa. One of these configuration methods is required.

Parameters:

Name Type Description Default
workdir Path | str

Working directory on a shared filesystem accessible by both the client and the HPC nodes. Used for temporary input/output files.

'./'
workers int

The maximum number of concurrent jobs to run on the HPC cluster.

1
queue_size int

The maximum number of tasks to hold in the internal queue before submission. A value of 0 means an unlimited size.

0
interval float

The interval in seconds at which to poll the HPC queue for job status updates.

1
queue_type str

The type of the queueing system (e.g., "slurm"). This is passed to pysqa and is also used to find the correct subdirectory within config_path.

'slurm'
template str | None

An optional submission script template. If provided, it will be used by pysqa to generate the job submission script.

None
config_path Path | str | None

An optional path to a directory containing pysqa cluster configuration files. This is used if template is not provided.

None
cluster str | None

Optional name of the cluster to use. If supported by the installation, this makes it possible to switch between clusters.

None
queue str | None

Optional queue to use on the cluster.

None
cores int

The number of CPUs per task.

1

Raises:

Type Description
RuntimeError

If neither a template is provided nor a valid config_path can be found.

start async

start(task_group: TaskGroup) -> None

Start the server.

Parameters:

Name Type Description Default
task_group TaskGroup

The task group to use.

required

cleanup

cleanup() -> None

Clean up the server resources.

This method cancels the background worker task that polls the HPC queue and ensures that any clients waiting for results are notified of the shutdown.