Skip to content

Topology Optimizer#

toop_engine_topology_optimizer.interfaces.messages.commands #

Defines the interaction between the optimizer and a backend.

The backend will send commands in the form of messages to the optimizer, which will trigger a certain behaviour. The Optimizer will respond with results, for this see results.py

StartOptimizationCommand #

Bases: BaseModel

Command with parameters for starting an optimization run.

message_type class-attribute instance-attribute #

message_type = 'start_optimization'

The command type for deserialization, don't change this

dc_params class-attribute instance-attribute #

dc_params = DCOptimizerParameters()

The parameters for the DC optimizer

ac_params class-attribute instance-attribute #

ac_params = ACOptimizerParameters()

The parameters for the AC optimizer

grid_files instance-attribute #

grid_files

The grid files to load, where each gridfile represents one timestep. The grid files also include coupling information for the timesteps.

optimization_id instance-attribute #

optimization_id

The id of the optimization run, used to identify the optimization run in the results. Should stay the same for the whole optimization run and should be equal to the kafka event key

ShutdownCommand #

Bases: BaseModel

Command to shutdown the worker.

message_type class-attribute instance-attribute #

message_type = 'shutdown'

The command type for deserialization, don't change this

exit_code class-attribute instance-attribute #

exit_code = 0

The exit code to exit with

Command #

Bases: BaseModel

Base class for all commands to the optimizer.

command class-attribute instance-attribute #

command = Field(discriminator='message_type')

The actual command

uuid class-attribute instance-attribute #

uuid = Field(default_factory=lambda: str(uuid4()))

A unique identifier for the command message, used to avoid duplicate processing on optimizer side

timestamp class-attribute instance-attribute #

timestamp = Field(default_factory=lambda: str(now()))

When the command was sent

validate_first_gridfile_uncoupled #

validate_first_gridfile_uncoupled(val)

Check that the first gridfile is uncoupled

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/interfaces/messages/commands.py
def validate_first_gridfile_uncoupled(val: list[GridFile]) -> list[GridFile]:
    """Check that the first gridfile is uncoupled"""
    if val:
        if val[0].coupling != "none":
            raise ValueError("The first gridfile must be uncoupled")
    return val

toop_engine_topology_optimizer.interfaces.messages.dc_params #

The DC optimizer is the GPU stage where massive amounts of topologies are being checked.

This holds the parameters to start the optimization. Some parameters can not be changed (mainly the names of the kafka streams) and are included in the command line start parameters instead.

BatchedMEParameters #

Bases: BaseModel

Parameters for starting the batched genetic algorithm (In this case Map-Elites)

substation_split_prob class-attribute instance-attribute #

substation_split_prob = 0.1

The probability to split an unsplit substation. If not split, a reconfiguration is applied

substation_unsplit_prob class-attribute instance-attribute #

substation_unsplit_prob = 0.01

The probability to reset a split substation to the unsplit state

disconnect_prob class-attribute instance-attribute #

disconnect_prob = 0.1

The probability to disconnect a new branch

reconnect_prob class-attribute instance-attribute #

reconnect_prob = 0.1

The probability to reconnect a disconnected branch, will overwrite a possible disconnect

pst_mutation_sigma class-attribute instance-attribute #

pst_mutation_sigma = 0.0

The sigma to use for the normal distribution that mutates the PST taps. The mutation is applied by adding a random value drawn from this distribution to the current tap position. A value of 0.0 means no PST mutation.

n_subs_mutated_lambda class-attribute instance-attribute #

n_subs_mutated_lambda = 2.0

The number of substations to mutate in a single iteration is drawn from a poisson with this lambda

proportion_crossover class-attribute instance-attribute #

proportion_crossover = 0.1

The proportion of the first topology to take in the crossover

crossover_mutation_ratio class-attribute instance-attribute #

crossover_mutation_ratio = 0.5

The ratio of crossovers to mutations

target_metrics class-attribute instance-attribute #

target_metrics = (('overload_energy_n_1', 1.0),)

The list of metrics to optimize for with their weights

observed_metrics class-attribute instance-attribute #

observed_metrics = (
    "max_flow_n_0",
    "overload_energy_n_0",
    "overload_energy_limited_n_0",
    "max_flow_n_1",
    "overload_energy_n_1",
    "overload_energy_limited_n_1",
    "split_subs",
    "switching_distance",
)

The observed metrics, i.e. which metrics are to be computed for logging purposes. The target_metrics and me_descriptors must be included in the observed metrics and will be added automatically by the validator if they are missing

me_descriptors class-attribute instance-attribute #

me_descriptors = (
    DescriptorDef(metric="split_subs", num_cells=5),
    DescriptorDef(
        metric="switching_distance", num_cells=45
    ),
)

The descriptors to use for MAP-Elites. This includes a metric that determines the cell index and a number of cells. If the metric exceeds the number of cells, it will be clipped to the largest cell index. Currently, this must be integer metrics

runtime_seconds class-attribute instance-attribute #

runtime_seconds = 60

The runtime in seconds

iterations_per_epoch class-attribute instance-attribute #

iterations_per_epoch = 100

The number of iterations per epoch

random_seed class-attribute instance-attribute #

random_seed = 42

The random seed to use for reproducibility

cell_depth class-attribute instance-attribute #

cell_depth = 1

When applicable, each cell contains cell_depth unique topologies. Use 1 to retain the original map-elites behaviour

plot class-attribute instance-attribute #

plot = False

Whether to plot the repertoire

mutation_repetition class-attribute instance-attribute #

mutation_repetition = 1

More chance to get unique mutations by mutating multiple copies of the repertoire

n_worst_contingencies class-attribute instance-attribute #

n_worst_contingencies = 20

The number of worst contingencies to consider in the scoring function. This is used to determine the worst cases for overloads.

infer_missing_observed_metrics #

infer_missing_observed_metrics()

Add potentially missing target and descriptor metrics to the observed metrics.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/interfaces/messages/dc_params.py
@model_validator(mode="after")
def infer_missing_observed_metrics(self) -> BatchedMEParameters:
    """Add potentially missing target and descriptor metrics to the observed metrics."""
    for metric, _ in self.target_metrics:
        if metric not in self.observed_metrics:
            self.observed_metrics += (metric,)
    for descriptor in self.me_descriptors:
        if descriptor.metric not in self.observed_metrics:
            self.observed_metrics += (descriptor.metric,)
    return self

LoadflowSolverParameters #

Bases: BaseModel

Parameters for the loadflow solver.

max_num_splits class-attribute instance-attribute #

max_num_splits = 4

The maximum number of splits per topology

max_num_disconnections class-attribute instance-attribute #

max_num_disconnections = 0

The maximum number of disconnections to apply per topology

batch_size class-attribute instance-attribute #

batch_size = 8

The batch size for the genetic algorithm

distributed class-attribute instance-attribute #

distributed = False

Whether to run the genetic algorithm distributed over multiple devices

cross_coupler_flow class-attribute instance-attribute #

cross_coupler_flow = False

Whether to compute cross-coupler flows

DCOptimizerParameters #

Bases: BaseModel

The set of parameters that are used in the DC optimizer only

ga_config class-attribute instance-attribute #

ga_config = BatchedMEParameters()

The configuration options for the genetic algorithm

loadflow_solver_config class-attribute instance-attribute #

loadflow_solver_config = LoadflowSolverParameters()

The configuration options for the loadflow solver

double_limits class-attribute instance-attribute #

double_limits = None

The double limits for the optimization, if they should be updated

summary_frequency class-attribute instance-attribute #

summary_frequency = 10

The frequency to push back results, based on number of iterations. Default is after every 10 iterations.

check_command_frequency class-attribute instance-attribute #

check_command_frequency = 10

The frequency to check for new commands, based on number of iterations. Should be a multiple of summary_frequency

toop_engine_topology_optimizer.interfaces.messages.ac_params #

The parameters for the AC optimizer.

On AC, some subtelties are different to the DC optimization such as that the optimization is not batched, and the parameters are slightly different.

ACGAParameters #

Bases: BaseModel

Parameters for the AC genetic algorithm

runtime_seconds class-attribute instance-attribute #

runtime_seconds = 30

The maximum runtime of the AC optimization in seconds

pull_prob class-attribute instance-attribute #

pull_prob = 0.9

The probability of pulling a strategy from the DC repertoire

me_descriptors class-attribute instance-attribute #

me_descriptors = (
    DescriptorDef(
        metric="split_subs", num_cells=2, range=(0, 5)
    ),
    DescriptorDef(
        metric="switching_distance",
        num_cells=5,
        range=(0, 50),
    ),
    DescriptorDef(
        metric="disconnected_branches", num_cells=2
    ),
)

The descriptors for the aggregated map elites repertoire.

reconnect_prob class-attribute instance-attribute #

reconnect_prob = 0.05

The probability of reconnecting a disconnected branch in a strategy

close_coupler_prob class-attribute instance-attribute #

close_coupler_prob = 0.05

The probability of closing an opened coupler in a strategy

n_worst_contingencies class-attribute instance-attribute #

n_worst_contingencies = 20

How many worst contingencies to consider for the initial metrics, i.e. the top k contingencies that are used to compute the initial metrics. This is used to compute the top_k_overloads_n_1

seed class-attribute instance-attribute #

seed = 42

The seed for the random number generator

timestep_processes class-attribute instance-attribute #

timestep_processes = 1

How many processes to spawn for computing the timesteps in parallel

runner_processes class-attribute instance-attribute #

runner_processes = 1

How many processes to spawn for computing the N-1 cases in each timestep in parallel. Note that this multiplies with timestep_processes and you might run out of memory if you set both too high

runner_batchsize class-attribute instance-attribute #

runner_batchsize = None

Whether to batch the N-1 definition into smaller chunks, might conserve memory

filter_strategy class-attribute instance-attribute #

filter_strategy = None

The filter strategy to use for the optimization, used to filter out strategies based on the discriminator, median or dominator filter.

enable_ac_rejection class-attribute instance-attribute #

enable_ac_rejection = True

Whether to enable the AC rejection, i.e. no messages will be sent to the results topic in case of non-acceptance.

reject_convergence_threshold class-attribute instance-attribute #

reject_convergence_threshold = 1.0

The rejection threshold for the convergence rate, i.e. the split case must have at most the same amount of non converging loadflows as the unsplit case or it will be rejected.

reject_overload_threshold class-attribute instance-attribute #

reject_overload_threshold = 0.95

The rejection threshold for the overload energy improvement, i.e. the split case must have at least 5% lower overload energy than the unsplit case or it will be rejected.

reject_critical_branch_threshold class-attribute instance-attribute #

reject_critical_branch_threshold = 1.1

The rejection threshold for the critical branches increase, i.e. the split case must have less than 10% more critical branches than the unsplit case or it will be rejected.

early_stop_validation class-attribute instance-attribute #

early_stop_validation = True

Whether to enable early stopping during the optimization process.

early_stopping_non_convergence_percentage_threshold class-attribute instance-attribute #

early_stopping_non_convergence_percentage_threshold = 0.1

The threshold for the early stopping criterion, i.e. if the percentage of non-converging cases is greater than this value, the ac validation will be stopped early.

max_initial_wait_seconds class-attribute instance-attribute #

max_initial_wait_seconds = 60

The maximum amount of seconds to wait for the initial DC results. If no results have arrived within this time, we assume the DC optimizer had some problem and abort the optimization run.

probabilities_sum_to_one #

probabilities_sum_to_one()

Ensure that the probabilities sum to one

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/interfaces/messages/ac_params.py
@model_validator(mode="after")
def probabilities_sum_to_one(self) -> ACOptimizerParameters:
    """Ensure that the probabilities sum to one"""
    total_probability = self.pull_prob + self.reconnect_prob + self.close_coupler_prob
    if not math.isclose(total_probability, 1.0):
        raise ValueError("The probabilities must sum to one")
    return self

ACOptimizerParameters #

Bases: BaseModel

The set of parameters that are used in the AC optimizer only

initial_loadflow class-attribute instance-attribute #

initial_loadflow = None

If an initial AC loadflow was computed before the start of the optimization run, this can be passed and will be used e.g. to compute double limits. It will be sent back through the initial topology push.

ga_config class-attribute instance-attribute #

ga_config = ACGAParameters()

The genetic algorithm configuration

AC Validation#

toop_engine_topology_optimizer.ac.evolution_functions #

Implements an adjusted AC evolution

Instead of the original GA evolution which only knows mutate and crossover, we introduce the following operations: - The pull operator will take a promising topology from the DC repertoire and re-evaluate it on AC. The notion of promising is defined through a interest-scoring function which tries to balance the explore/exploit trade-off. - The reconnect operator will take a promising topology from the DC repertoire and reconnect a single branch in all timesteps. The idea is that the DC part might have disconnected too many branches, so we try to simplify the topology by reconnecting a single branch. - The close_coupler operator will take a promising topology from the DC repertoire and close a coupler in all timesteps. The idea is that the DC part might have too many open couplers, so we try to simplify the topology by closing a single coupler.

logger module-attribute #

logger = Logger(__name__)

select_repertoire #

select_repertoire(optimization_id, optimizer_type, session)

Select the topologies that are suitable for mutation and crossover

In this case, all topologies that satisfy the filter criteria are suitable, however a check after the mutate is necessary to ensure that the topology is not already in the database.

The unsplit strategy is never selected for mutation or crossover.

PARAMETER DESCRIPTION
optimization_id

The optimization ID to filter for

TYPE: str

optimizer_type

The optimizer types to filter for (whitelist)

TYPE: list[OptimizerType]

session

The database session to use

TYPE: Session

RETURNS DESCRIPTION
list[ACOptimTopology]

The topologies that are suitable for mutation and crossover

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/evolution_functions.py
def select_repertoire(optimization_id: str, optimizer_type: list[OptimizerType], session: Session) -> list[ACOptimTopology]:
    """Select the topologies that are suitable for mutation and crossover

    In this case, all topologies that satisfy the filter criteria are suitable, however a check after
    the mutate is necessary to ensure that the topology is not already in the database.

    The unsplit strategy is never selected for mutation or crossover.

    Parameters
    ----------
    optimization_id : str
        The optimization ID to filter for
    optimizer_type : list[OptimizerType]
        The optimizer types to filter for (whitelist)
    session : Session
        The database session to use

    Returns
    -------
    list[ACOptimTopology]
        The topologies that are suitable for mutation and crossover
    """
    # Query to select topologies suitable for mutation and crossover
    mutate_query = select(ACOptimTopology).where(
        ACOptimTopology.optimization_id == optimization_id,
        ACOptimTopology.optimizer_type.in_(optimizer_type),
        ACOptimTopology.unsplit == False,  # noqa: E712
    )

    # Execute the query and get the results
    suitable_topologies = session.exec(mutate_query).all()

    return suitable_topologies

get_unsplit_ac_topology #

get_unsplit_ac_topology(optimization_id, session)

Get the unsplit AC topology for the given optimization ID

PARAMETER DESCRIPTION
optimization_id

The optimization ID to filter for

TYPE: str

session

The database session to use

TYPE: Session

RETURNS DESCRIPTION
ACOptimTopology

The unsplit AC topology for the given optimization ID, or None if not found

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/evolution_functions.py
def get_unsplit_ac_topology(
    optimization_id: str,
    session: Session,
) -> ACOptimTopology:
    """Get the unsplit AC topology for the given optimization ID

    Parameters
    ----------
    optimization_id : str
        The optimization ID to filter for
    session : Session
        The database session to use

    Returns
    -------
    ACOptimTopology
        The unsplit AC topology for the given optimization ID, or None if not found
    """
    query = select(ACOptimTopology).where(
        ACOptimTopology.optimization_id == optimization_id,
        ACOptimTopology.unsplit == True,  # noqa: E712
        ACOptimTopology.optimizer_type == OptimizerType.AC,
    )
    return session.exec(query).first()

default_scorer #

default_scorer(metrics)

Score the topologies based on their fitness only (greedy selection)

PARAMETER DESCRIPTION
metrics

The metrics DataFrame to score

TYPE: DataFrame

RETURNS DESCRIPTION
Series

The fitness scores

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/evolution_functions.py
def default_scorer(metrics: pd.DataFrame) -> pd.Series:
    """Score the topologies based on their fitness only (greedy selection)

    Parameters
    ----------
    metrics : pd.DataFrame
        The metrics DataFrame to score

    Returns
    -------
    pd.Series
        The fitness scores
    """
    return metrics["fitness"] + metrics["fitness"].min()

get_contingency_indices_from_ids #

get_contingency_indices_from_ids(
    case_ids_all_t, n_minus1_definitions
)

Map contingency ids to their indices in the N-1 definition for each timestep.

This is a helper method used in update_initial_metrics_with_worst_k_contingencies method.

PARAMETER DESCRIPTION
case_ids_all_t

A list of lists, where each inner list contains the contingency ids for a specific timestep.

TYPE: list[list[str]]

n_minus1_definitions

A list of N-1 definitions, one for each timestep, containing the contingencies.

TYPE: list[Nminus1Definition]

RETURNS DESCRIPTION
list[list[int]]

A list of lists, where each inner list contains the indices of the contingencies in the N-1 definition for the corresponding timestep. If a contingency id is not found, it will be skipped.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/evolution_functions.py
def get_contingency_indices_from_ids(
    case_ids_all_t: list[list[str]], n_minus1_definitions: list[Nminus1Definition]
) -> list[list[int]]:
    """Map contingency ids to their indices in the N-1 definition for each timestep.

    This is a helper method used in update_initial_metrics_with_worst_k_contingencies
    method.

    Parameters
    ----------
    case_ids_all_t : list[list[str]]
        A list of lists, where each inner list contains the contingency ids for a specific timestep.
    n_minus1_definitions : list[Nminus1Definition]
        A list of N-1 definitions, one for each timestep, containing the contingencies.

    Returns
    -------
    list[list[int]]
        A list of lists, where each inner list contains the indices of the contingencies in the
        N-1 definition for the corresponding timestep. If a contingency id is not found, it
        will be skipped.
    """
    case_indices_all_t = []
    for case_ids, n_minus1_def in zip(case_ids_all_t, n_minus1_definitions, strict=True):
        id_to_index = {cont.id: idx for idx, cont in enumerate(n_minus1_def.contingencies)}
        case_indices = [id_to_index[case_id] for case_id in case_ids if case_id in id_to_index]
        case_indices_all_t.append(case_indices)
    return case_indices_all_t

pull #

pull(
    selected_strategy,
    session=None,
    n_minus1_definitions=None,
)

Pull a promising topology from the DC repertoire to AC

This only copies the topology without any changes other than setting the optimizer type to AC. This function takes a list of selected DC topologies and creates corresponding AC topologies by copying relevant attributes, setting the optimizer type to AC, and merging contingency case indices from both the DC and unsplit AC topologies.

PARAMETER DESCRIPTION
selected_strategy

The selected strategy to pull

TYPE: list[ACOptimTopology]

session

The database session to use, by default None. The session object is used to fetch the unsplit AC topology which is then used to add the critical contingency cases to the pulled strategy. These critical cases can then be used for early stopping of AC N-1 contingency analysis.

TYPE: Session DEFAULT: None

n_minus1_definitions

The N-1 definitions to use for the pulled strategy. If not provided, the pulled strategy will not include any N-1 contingency cases while calculating the top critical contingencies for early stopping.

TYPE: Optional[list[Nminus1Definition]] DEFAULT: None

RETURNS DESCRIPTION
list[ACOptimTopology]

The a copy of the input topologies with the optimizer type set to AC

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/evolution_functions.py
def pull(
    selected_strategy: list[ACOptimTopology],
    session: Session = None,
    n_minus1_definitions: Optional[list[Nminus1Definition]] = None,
) -> list[ACOptimTopology]:
    """Pull a promising topology from the DC repertoire to AC

    This only copies the topology without any changes other than setting the optimizer type to AC.
    This function takes a list of selected DC topologies and creates corresponding AC topologies by copying
    relevant attributes, setting the optimizer type to AC, and merging contingency case indices from both
    the DC and unsplit AC topologies.

    Parameters
    ----------
    selected_strategy : list[ACOptimTopology]
        The selected strategy to pull
    session : Session, optional
        The database session to use, by default None. The session object is used to fetch the unsplit AC topology
        which is then used to add the critical contingency cases to the pulled strategy. These critical cases
        can then be used for early stopping of AC N-1 contingency analysis.
    n_minus1_definitions : Optional[list[Nminus1Definition]]
        The N-1 definitions to use for the pulled strategy. If not provided, the pulled strategy will not
        include any N-1 contingency cases while calculating the top critical contingencies for early stopping.

    Returns
    -------
    list[ACOptimTopology]
        The a copy of the input topologies with the optimizer type set to AC
    """
    if not selected_strategy:
        return []

    optimization_id = selected_strategy[0].optimization_id

    # Unsplit AC topology will be used to merge critical contingency cases. This topology is pushed in the repertoire
    # in the initialization phase of the AC optimizer.
    unsplit_ac_topo = get_unsplit_ac_topology(optimization_id=optimization_id, session=session) if session else None
    worst_k_cont_ids_unsplit = set(unsplit_ac_topo.worst_k_contingency_cases) if unsplit_ac_topo else set()
    if n_minus1_definitions is None:
        logger.warning(
            "N-1 definition is not provided to pull method."
            " Early stopping in AC validation will not consider worst_k DC indices"
        )
    pulled_strategy = []
    for topo in selected_strategy:
        # Merge case_ids from DC and unsplit AC
        merged_ids = list(set(topo.worst_k_contingency_cases) | worst_k_cont_ids_unsplit)

        data = topo.model_dump(
            include=[
                "actions",
                "disconnections",
                "pst_setpoints",
                "unsplit",
                "timestep",
                "optimization_id",
                "strategy_hash",
            ]
        )
        metrics = topo.metrics
        metrics["fitness_dc"] = topo.fitness

        new_topo = ACOptimTopology(
            **data,
            optimizer_type=OptimizerType.AC,
            fitness=-9999999,
            parent_id=topo.id,
            metrics=metrics,
            worst_k_contingency_cases=merged_ids,
        )
        new_topo.metrics["top_k_overloads_n_1"] = (
            unsplit_ac_topo.metrics.get("top_k_overloads_n_1", None) if unsplit_ac_topo else None
        )
        pulled_strategy.append(new_topo)

    return pulled_strategy

reconnect #

reconnect(rng, selected_strategy)

Reconnect a disconnected branch

The idea behind this mutation operation is that the DC part might have disconnected too many branches, so we try to simplify the topology by reconnecting a single branch in all timesteps

This might accidentally create the unsplit topology and does not check for that case. A check for this happens upon insertion into the database where the unique constraint will prevent duplicates.

PARAMETER DESCRIPTION
rng

The random number generator to use

TYPE: Generator

selected_strategy

The selected DC or AC strategy to reconnect

TYPE: list[ACOptimTopology]

RETURNS DESCRIPTION
list[ACOptimTopology]

The a copy of the input topologies with the optimizer type set to AC and a branch reconnected or the empty list if no disconnections were present in the input topologies

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/evolution_functions.py
def reconnect(rng: Rng, selected_strategy: list[ACOptimTopology]) -> list[ACOptimTopology]:
    """Reconnect a disconnected branch

    The idea behind this mutation operation is that the DC part might have disconnected too many
    branches, so we try to simplify the topology by reconnecting a single branch in all timesteps

    This might accidentally create the unsplit topology and does not check for that case. A check
    for this happens upon insertion into the database where the unique constraint will prevent
    duplicates.

    Parameters
    ----------
    rng : Rng
        The random number generator to use
    selected_strategy : list[ACOptimTopology]
        The selected DC or AC strategy to reconnect

    Returns
    -------
    list[ACOptimTopology]
        The a copy of the input topologies with the optimizer type set to AC and a branch reconnected
        or the empty list if no disconnections were present in the input topologies
    """
    branch_set = set([disc for topo in selected_strategy for disc in topo.disconnections])
    if len(branch_set) == 0:
        return []

    chosen = rng.choice(list(branch_set))
    topos = [
        ACOptimTopology(
            **topo.model_dump(
                include=[
                    "actions",
                    "pst_setpoints",
                    "timestep",
                    "optimization_id",
                    "strategy_hash",
                ]
            ),
            optimizer_type=OptimizerType.AC,
            fitness=-9999999,
            disconnections=[disc for disc in topo.disconnections if disc != chosen],
            unsplit=False,
            parent_id=topo.id,
        )
        for topo in selected_strategy
    ]

    updated_hash = hash_topologies(topos)
    unsplit = is_unsplit_topologies(topos)
    for topo in topos:
        topo.strategy_hash = updated_hash
        topo.unsplit = unsplit

    return topos

close_coupler #

close_coupler(rng, selected_strategy)

Close a coupler in the selected strategy.

A station that has an open coupler in any of the timesteps is selected and the coupler is closed for all timesteps in the topology.

This might accidentally create the unsplit topology and does not prohibit that case. A check for this happens upon insertion into the database where the unique constraint will prevent duplicates. The unsplit flag will be set correctly though, so in case the unsplit topology was not yet in the database for some reason, this will add it.

PARAMETER DESCRIPTION
rng

The random number generator to use

TYPE: Generator

selected_strategy

The selected DC or AC strategy to close a coupler. The number of timesteps must be equal to the number of timesteps in the branches_per_sub and injections_per_sub arrays. If the empty list is passed, the function will return an empty list.

TYPE: list[ACOptimTopology]

RETURNS DESCRIPTION
list[ACOptimTopology]

The a copy of the input topologies with the optimizer type set to AC and a coupler closed or the empty list if no open couplers were present in the input topologies

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/evolution_functions.py
def close_coupler(
    rng: Rng,
    selected_strategy: list[ACOptimTopology],
) -> list[ACOptimTopology]:
    """Close a coupler in the selected strategy.

    A station that has an open coupler in any of the timesteps is selected and the coupler is closed
    for all timesteps in the topology.

    This might accidentally create the unsplit topology and does not prohibit that case. A check
    for this happens upon insertion into the database where the unique constraint will prevent
    duplicates. The unsplit flag will be set correctly though, so in case the unsplit topology was
    not yet in the database for some reason, this will add it.

    Parameters
    ----------
    rng : Rng
        The random number generator to use
    selected_strategy : list[ACOptimTopology]
        The selected DC or AC strategy to close a coupler. The number of timesteps must be equal to
        the number of timesteps in the branches_per_sub and injections_per_sub arrays. If the empty
        list is passed, the function will return an empty list.

    Returns
    -------
    list[ACOptimTopology]
        The a copy of the input topologies with the optimizer type set to AC and a coupler closed
        or the empty list if no open couplers were present in the input topologies
    """
    if selected_strategy == []:
        return []

    if not any(len(topo.actions) for topo in selected_strategy):
        return []

    # Select an action to delete (will close the coupler) and delete it in all timesteps
    selected_action = rng.choice(([action for topo in selected_strategy for action in topo.actions]))

    new_actions = [[action for action in topo.actions if action != selected_action] for topo in selected_strategy]

    # Copy the input strategy and replace the action in all timesteps
    topos = [
        ACOptimTopology(
            **topo.model_dump(
                include=[
                    "disconnections",
                    "pst_setpoints",
                    "timestep",
                    "optimization_id",
                    "strategy_hash",  # Will be updated below
                ]
            ),
            optimizer_type=OptimizerType.AC,
            fitness=-9999999,
            actions=new_actions_timestep,
            unsplit=False,  # Will be updated below
            parent_id=topo.id,
        )
        for topo, new_actions_timestep in zip(selected_strategy, new_actions, strict=True)
    ]

    updated_hash = hash_topologies(topos)
    unsplit = is_unsplit_topologies(topos)
    for topo in topos:
        topo.strategy_hash = updated_hash
        topo.unsplit = unsplit
    return topos

evolution #

evolution(
    rng,
    session,
    optimization_id,
    close_coupler_prob,
    reconnect_prob,
    pull_prob,
    max_retries,
    n_minus1_definitions=None,
    filter_strategy=None,
)

Perform the AC evolution.

PARAMETER DESCRIPTION
rng

The random number generator to use

TYPE: Generator

session

The database session to use, will write the new topologies to the database

TYPE: Session

optimization_id

The optimization ID to filter for

TYPE: str

close_coupler_prob

The probability of closing a coupler

TYPE: float

reconnect_prob

The probability of reconnecting a branch

TYPE: float

pull_prob

The probability of pulling a strategy

TYPE: float

max_retries

The maximum number of retries to perform if a strategy is already in the database

TYPE: int

n_minus1_definitions

A list of N-1 definitions, one for each timestep, containing the contingencies.

TYPE: Optional[list[Nminus1Definition]] DEFAULT: None

filter_strategy

The filter strategy to use for the optimization, used to filter out strategies that are too far away from the original topology.

TYPE: Optional[FilterStrategy] DEFAULT: None

RETURNS DESCRIPTION
list[ACOptimTopology]

The strategy that was created during the evolution or an empty list if something went wrong at all retries

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/evolution_functions.py
def evolution(
    rng: Rng,
    session: Session,
    optimization_id: str,
    close_coupler_prob: float,
    reconnect_prob: float,
    pull_prob: float,
    max_retries: int,
    n_minus1_definitions: Optional[list[Nminus1Definition]] = None,
    filter_strategy: Optional[FilterStrategy] = None,
) -> list[ACOptimTopology]:
    """Perform the AC evolution.

    Parameters
    ----------
    rng : Rng
        The random number generator to use
    session : Session
        The database session to use, will write the new topologies to the database
    optimization_id : str
        The optimization ID to filter for
    close_coupler_prob : float
        The probability of closing a coupler
    reconnect_prob : float
        The probability of reconnecting a branch
    pull_prob : float
        The probability of pulling a strategy
    max_retries : int
        The maximum number of retries to perform if a strategy is already in the database
    n_minus1_definitions : Optional[list[Nminus1Definition]]
        A list of N-1 definitions, one for each timestep, containing the contingencies.
    filter_strategy : Optional[FilterStrategy]
        The filter strategy to use for the optimization,
        used to filter out strategies that are too far away from the original topology.

    Returns
    -------
    list[ACOptimTopology]
        The strategy that was created during the evolution or an empty list if something went
        wrong at all retries
    """
    for _try in range(max_retries):
        new_strategy = evolution_try(
            rng=rng,
            session=session,
            optimization_id=optimization_id,
            close_coupler_prob=close_coupler_prob,
            reconnect_prob=reconnect_prob,
            pull_prob=pull_prob,
            n_minus1_definition=n_minus1_definitions,
            filter_strategy=filter_strategy,
        )
        if len(new_strategy):
            return new_strategy
    return []

evolution_try #

evolution_try(
    rng,
    session,
    optimization_id,
    close_coupler_prob,
    reconnect_prob,
    pull_prob,
    n_minus1_definition=None,
    filter_strategy=None,
)

Perform a single try of the AC evolution.

PARAMETER DESCRIPTION
rng

The random number generator to use

TYPE: Generator

session

The database session to use, will write the new topologies to the database

TYPE: Session

optimization_id

The optimization ID to filter for

TYPE: str

close_coupler_prob

The probability of closing a coupler

TYPE: float

reconnect_prob

The probability of reconnecting a branch

TYPE: float

pull_prob

The probability of pulling a strategy

TYPE: float

n_minus1_definition

A list of N-1 definitions, one for each timestep, containing the contingencies.

TYPE: Optional[list[Nminus1Definition]] DEFAULT: None

filter_strategy

The filter strategy to use for the optimization, used to filter out strategies that are too far away from the original topology.

TYPE: Optional[FilterStrategy] DEFAULT: None

RETURNS DESCRIPTION
list[ACOptimTopology]

The strategy that was created during the evolution or an empty list if something went wrong.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/evolution_functions.py
def evolution_try(
    rng: Rng,
    session: Session,
    optimization_id: str,
    close_coupler_prob: float,
    reconnect_prob: float,
    pull_prob: float,
    n_minus1_definition: Optional[list[Nminus1Definition]] = None,
    filter_strategy: Optional[FilterStrategy] = None,
) -> list[ACOptimTopology]:
    """Perform a single try of the AC evolution.

    Parameters
    ----------
    rng : Rng
        The random number generator to use
    session : Session
        The database session to use, will write the new topologies to the database
    optimization_id : str
        The optimization ID to filter for
    close_coupler_prob : float
        The probability of closing a coupler
    reconnect_prob : float
        The probability of reconnecting a branch
    pull_prob : float
        The probability of pulling a strategy
    n_minus1_definition : Optional[list[Nminus1Definition]]
        A list of N-1 definitions, one for each timestep, containing the contingencies.
    filter_strategy : Optional[FilterStrategy]
        The filter strategy to use for the optimization,
        used to filter out strategies that are too far away from the original topology.

    Returns
    -------
    list[ACOptimTopology]
        The strategy that was created during the evolution or an empty list if something went
        wrong.
    """
    action_choice = rng.choice(["pull", "reconnect", "close_coupler"], p=[pull_prob, reconnect_prob, close_coupler_prob])
    if action_choice == "pull":
        old_strategy = select_strategy(
            rng,
            select_repertoire(optimization_id, [OptimizerType.DC, OptimizerType.AC], session),
            default_scorer,
            filter_strategy=filter_strategy,
        )
        new_strategy = pull(old_strategy, session=session, n_minus1_definitions=n_minus1_definition)
    elif action_choice == "reconnect":
        old_strategy = select_strategy(
            rng,
            select_repertoire(optimization_id, [OptimizerType.DC, OptimizerType.AC], session),
            default_scorer,
            filter_strategy=None,
        )

        new_strategy = reconnect(
            rng,
            old_strategy,
        )
    elif action_choice == "close_coupler":
        old_strategy = select_strategy(
            rng,
            select_repertoire(optimization_id, [OptimizerType.DC, OptimizerType.AC], session),
            default_scorer,
            filter_strategy=None,
        )
        new_strategy = close_coupler(
            rng,
            old_strategy,
        )
    else:
        raise RuntimeError("np.random.choice returned an unexpected value")

    # Something went wrong during the mutation
    if new_strategy == []:
        return []

    # Insert the new strategies into the database
    for topo in new_strategy:
        session.add(topo)

    try:
        session.commit()
        for topo in new_strategy:
            session.refresh(topo)
    except IntegrityError:
        session.rollback()
        return []
    return new_strategy

toop_engine_topology_optimizer.ac.listener #

A listener that listens for new topologies on a kafka result stream and saves them to db

This is intended for usage both in the AC optimizer and the backend, as they both listen for topologies on the result kafka stream. The backend has some further logic as it watches all optimizations and might change the state of the optimization to "running" if it receives a start optimization result and "stopped" if a stop optimization result is received.

One of the requirements is that the AC listener will only want to listen to a single optimization_id and not to messages by itself. Hence a filtering mechanic is added.

logger module-attribute #

logger = Logger(__name__)

poll_results_topic #

poll_results_topic(db, consumer, first_poll=True)

Poll the results topic for new topologies to store in the DB

We store topologies from all optimization jobs, as it could be that we will later optimize the same job in this worker.

PARAMETER DESCRIPTION
db

The database session to use for saving the topologies

TYPE: Session

consumer

The kafka consumer to poll messages from. It should already be subscribed to the result topic.

TYPE: LongRunningKafkaConsumer

first_poll

If True, we assume the optimizatin has just started and we can afford to wait for the first DC results for a longer time (30 seconds). If False, we assume the optimization is already running and we don't want to block the consumer for too long (100 ms), by default True

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
list[ACOptimTopology]

A list of topologies that were added to the database

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/listener.py
def poll_results_topic(
    db: Session,
    consumer: LongRunningKafkaConsumer,
    first_poll: bool = True,
) -> list[ACOptimTopology]:
    """Poll the results topic for new topologies to store in the DB

    We store topologies from all optimization jobs, as it could be that we will later optimize the same job in this worker.

    Parameters
    ----------
    db : Session
        The database session to use for saving the topologies
    consumer : LongRunningKafkaConsumer
        The kafka consumer to poll messages from. It should already be subscribed to the result
        topic.
    first_poll : bool, optional
        If True, we assume the optimizatin has just started and we can afford to wait for the first DC results for
        a longer time (30 seconds). If False, we assume the optimization is already running and we don't want to block
        the consumer for too long (100 ms), by default True

    Returns
    -------
    list[ACOptimTopology]
        A list of topologies that were added to the database
    """
    added_topos = []
    messages = consumer.consume(timeout=30.0 if first_poll else 0.1, num_messages=10000)
    for message in messages:
        result = Result.model_validate_json(deserialize_message(message.value()))

        strategies = None
        if isinstance(result.result, TopologyPushResult):
            strategies = result.result.strategies
        elif isinstance(result.result, OptimizationStartedResult):
            strategies = [result.result.initial_topology]
        else:
            continue

        topologies = convert_message_topo_to_db_topo(strategies, result.optimization_id, result.optimizer_type)

        # Push the topologies to the database, ignoring duplicates
        # Duplicates will trigger an IntegrityError
        added_topos = []
        for topo in topologies:
            try:
                db.add(topo)
                db.commit()
                added_topos.append(topo)
            except IntegrityError:
                db.rollback()
                pass

    return added_topos

toop_engine_topology_optimizer.ac.optimizer #

Implements initialize and run_epoch functions for the AC optimizer

logger module-attribute #

logger = Logger(__name__)

AcNotConvergedError #

Bases: Exception

An exception that is raised when the AC optimization did not converge in the base grid

OptimizerData dataclass #

OptimizerData(
    params,
    session,
    evolution_fn,
    scoring_fn,
    store_loadflow_fn,
    load_loadflow_fn,
    acceptance_fn,
    rng,
    action_sets,
    framework,
    runners,
)

The epoch-to-epoch storage for the AC optimizer

params instance-attribute #

params

The parameters this optimizer was initialized with

session instance-attribute #

session

A in-memory sqlite session for storing the repertoire

evolution_fn instance-attribute #

evolution_fn

The curried evolution function

scoring_fn instance-attribute #

scoring_fn

The curried scoring function

store_loadflow_fn instance-attribute #

store_loadflow_fn

The function to store loadflow results

load_loadflow_fn instance-attribute #

load_loadflow_fn

The function to load loadflow results

acceptance_fn instance-attribute #

acceptance_fn

The acceptance function to decide whether a topology is accepted or not. Takes the loadflow results and the metrics of the split topology and returns True/False.

rng instance-attribute #

rng

The random number generator for the optimizer

action_sets instance-attribute #

action_sets

The action sets for the grid files (one for each timestep)

framework instance-attribute #

framework

The framework of the grid files

runners instance-attribute #

runners

The initialized loadflow runners, one for each grid file

update_initial_metrics_with_worst_k_contingencies #

update_initial_metrics_with_worst_k_contingencies(
    initial_loadflow, initial_metrics, worst_k
)

Update the initial metrics with the worst k contingencies.

This function computes the worst k contingencies for each timestep in the initial loadflow results and updates the initial metrics with the case ids and the top k overloads.

PARAMETER DESCRIPTION
initial_loadflow

The initial loadflow results containing the branch results.

TYPE: LoadflowResultsPolars

initial_metrics

The initial metrics for each timestep.

TYPE: list[Metrics]

worst_k

The number of worst contingencies to consider for the initial metrics.

TYPE: int

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/optimizer.py
def update_initial_metrics_with_worst_k_contingencies(
    initial_loadflow: LoadflowResultsPolars,
    initial_metrics: list[Metrics],
    worst_k: int,
) -> None:
    """Update the initial metrics with the worst k contingencies.

    This function computes the worst k contingencies for each timestep in the initial loadflow results
    and updates the initial metrics with the case ids and the top k overloads.

    Parameters
    ----------
    initial_loadflow : LoadflowResultsPolars
        The initial loadflow results containing the branch results.
    initial_metrics : list[Metrics]
        The initial metrics for each timestep.
    worst_k : int
        The number of worst contingencies to consider for the initial metrics.
    """
    case_ids, top_k_overloads_n_1 = get_worst_k_contingencies_ac(
        initial_loadflow.branch_results,
        k=worst_k,
    )

    # case_ids is an empty list if the loadflow didn't converge -> the initial_loadflow.branch_results is full of NaNs
    if len(case_ids) == 0:
        logger.warning("No worst case ids found as the loadflow didn't converge")
        top_k_overloads_n_1 = [0] * len(initial_metrics)
        case_ids = [[]] * len(initial_metrics)

    # Update extra_scores of initial_metrics with case_ids and top_k_overloads_n_1
    for timestep, metric in enumerate(initial_metrics):
        metric.extra_scores.update(
            {
                "top_k_overloads_n_1": top_k_overloads_n_1[timestep],
            }
        )
        metric.worst_k_contingency_cases = case_ids[timestep]

make_runner #

make_runner(
    action_set,
    nminus1_definition,
    grid_file,
    n_processes,
    batch_size,
    processed_gridfile_fs,
)

Initialize a runner for a gridfile, action set and n-1 def

PARAMETER DESCRIPTION
action_set

The action set to use

TYPE: ActionSet

nminus1_definition

The N-1 definition to use

TYPE: Nminus1Definition

grid_file

The grid file to use

TYPE: GridFile

n_processes

The number of processes to use, from the ACGAParameters

TYPE: int

batch_size

The batch size to use, if any, from the ACGAParameters

TYPE: Optional[int]

processed_gridfile_fs

The target filesystem for the preprocessing worker. This contains all processed grid files. During the import job, a new folder import_results.data_folder was created which will be completed with the preprocess call to this function. Internally, only the data folder is passed around as a dirfs. Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the unprocessed gridfiles were already done.

TYPE: AbstractFileSystem

RETURNS DESCRIPTION
AbstractLoadflowRunner

The initialized loadflow runner, either Pandapower or Powsybl

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/optimizer.py
def make_runner(
    action_set: ActionSet,
    nminus1_definition: Nminus1Definition,
    grid_file: GridFile,
    n_processes: int,
    batch_size: Optional[int],
    processed_gridfile_fs: AbstractFileSystem,
) -> AbstractLoadflowRunner:
    """Initialize a runner for a gridfile, action set and n-1 def

    Parameters
    ----------
    action_set : ActionSet
        The action set to use
    nminus1_definition : Nminus1Definition
        The N-1 definition to use
    grid_file : GridFile
        The grid file to use
    n_processes : int
        The number of processes to use, from the ACGAParameters
    batch_size : Optional[int]
        The batch size to use, if any, from the ACGAParameters
    processed_gridfile_fs: AbstractFileSystem
        The target filesystem for the preprocessing worker. This contains all processed grid files.
        During the import job,  a new folder import_results.data_folder was created
        which will be completed with the preprocess call to this function.
        Internally, only the data folder is passed around as a dirfs.
        Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the
        unprocessed gridfiles were already done.

    Returns
    -------
    AbstractLoadflowRunner
        The initialized loadflow runner, either Pandapower or Powsybl
    """
    if grid_file.framework == Framework.PANDAPOWER:
        runner = PandapowerRunner(n_processes=n_processes, batch_size=batch_size)
        grid_file_path = Path(grid_file.grid_folder) / PREPROCESSING_PATHS["grid_file_path_pandapower"]
    elif grid_file.framework == Framework.PYPOWSYBL:
        runner = PowsyblRunner(n_processes=n_processes, batch_size=batch_size)
        grid_file_path = Path(grid_file.grid_folder) / PREPROCESSING_PATHS["grid_file_path_powsybl"]
    else:
        raise ValueError(f"Unknown framework {grid_file.framework}")
    runner.load_base_grid_fs(filesystem=processed_gridfile_fs, grid_path=grid_file_path)
    runner.store_action_set(action_set)
    runner.store_nminus1_definition(nminus1_definition)
    return runner

initialize_optimization #

initialize_optimization(
    params,
    session,
    optimization_id,
    grid_files,
    loadflow_result_fs,
    processed_gridfile_fs,
)

Initialize an optimization run for the AC optimizer

PARAMETER DESCRIPTION
params

The parameters for the AC optimizer

TYPE: ACOptimizerParameters

session

The database session to use for storing topologies

TYPE: Session

optimization_id

The ID of the optimization run

TYPE: str

grid_files

The grid files to optimize on, must contain at least one file

TYPE: list[GridFile]

loadflow_result_fs

A filesystem where the loadflow results are stored. Loadflows will be stored here using the uuid generation process and passed as a StoredLoadflowReference which contains the subfolder in this filesystem.

TYPE: AbstractFileSystem

processed_gridfile_fs

The target filesystem for the preprocessing worker. This contains all processed grid files. During the import job, a new folder import_results.data_folder was created which will be completed with the preprocess call to this function. Internally, only the data folder is passed around as a dirfs. Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the unprocessed gridfiles were already done.

TYPE: AbstractFileSystem

RETURNS DESCRIPTION
OptimizerData

The initial optimizer data

Strategy

The initial strategy

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/optimizer.py
def initialize_optimization(
    params: ACOptimizerParameters,
    session: Session,
    optimization_id: str,
    grid_files: list[GridFile],
    loadflow_result_fs: AbstractFileSystem,
    processed_gridfile_fs: AbstractFileSystem,
) -> tuple[OptimizerData, Strategy]:
    """Initialize an optimization run for the AC optimizer

    Parameters
    ----------
    params : ACOptimizerParameters
        The parameters for the AC optimizer
    session : Session
        The database session to use for storing topologies
    optimization_id : str
        The ID of the optimization run
    grid_files : list[GridFile]
        The grid files to optimize on, must contain at least one file
    loadflow_result_fs: AbstractFileSystem
        A filesystem where the loadflow results are stored. Loadflows will be stored here using the uuid generation process
        and passed as a StoredLoadflowReference which contains the subfolder in this filesystem.
    processed_gridfile_fs: AbstractFileSystem
        The target filesystem for the preprocessing worker. This contains all processed grid files.
        During the import job,  a new folder import_results.data_folder was created
        which will be completed with the preprocess call to this function.
        Internally, only the data folder is passed around as a dirfs.
        Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the
        unprocessed gridfiles were already done.


    Returns
    -------
    OptimizerData
        The initial optimizer data
    Strategy
        The initial strategy
    """
    if not len(grid_files):
        raise ValueError("At least one grid file must be provided")

    ga_config = params.ga_config

    # Load the network datas
    action_sets = [
        load_action_set_fs(filesystem=processed_gridfile_fs, file_path=grid_file.action_set_file) for grid_file in grid_files
    ]
    nminus1_definitions = [
        load_pydantic_model_fs(
            filesystem=processed_gridfile_fs, file_path=grid_file.nminus1_definition_file, model_class=Nminus1Definition
        )
        for grid_file in grid_files
    ]
    base_case_ids = [(n1def.base_case.id if n1def.base_case is not None else None) for n1def in nminus1_definitions]

    num_psts = [len(action_set.pst_ranges) for action_set in action_sets]

    # Prepare the loadflow runners
    runners = [
        make_runner(
            action_set,
            nminus1_definition,
            grid_file,
            n_processes=ga_config.runner_processes,
            batch_size=ga_config.runner_batchsize,
            processed_gridfile_fs=processed_gridfile_fs,
        )
        for action_set, nminus1_definition, grid_file in zip(action_sets, nminus1_definitions, grid_files, strict=True)
    ]

    # Prepare the evolution function
    rng = np.random.default_rng(ga_config.seed)
    evolution_fn = partial(
        evolution,
        rng=rng,
        session=session,
        optimization_id=optimization_id,
        close_coupler_prob=ga_config.close_coupler_prob,
        reconnect_prob=ga_config.reconnect_prob,
        pull_prob=ga_config.pull_prob,
        max_retries=10,
        n_minus1_definitions=nminus1_definitions,
        filter_strategy=ga_config.filter_strategy,
    )
    scoring_fn = partial(
        scoring_function,
        runners=runners,
        base_case_ids=base_case_ids,
        n_timestep_processes=ga_config.timestep_processes,
        early_stop_validation=ga_config.early_stop_validation,
        early_stop_non_converging_threshold=ga_config.early_stopping_non_convergence_percentage_threshold,
    )

    # Prepare the initial strategy
    initial_strategy = [
        ACOptimTopology(
            actions=[],
            disconnections=[],
            pst_setpoints=[0] * n_pst,
            timestep=i,
            fitness=0,
            unsplit=True,
            strategy_hash=b"willbeupdated",
            optimization_id=optimization_id,
            optimizer_type=OptimizerType.AC,
        )
        for i, n_pst in enumerate(num_psts)
    ]
    initial_hash = hash_topologies(initial_strategy)
    for topo in initial_strategy:
        topo.strategy_hash = initial_hash

    def store_loadflow(loadflow: LoadflowResultsPolars) -> StoredLoadflowReference:
        return save_loadflow_results_polars(
            loadflow_result_fs, f"{optimization_id}-{loadflow.job_id}-{datetime.now()}", loadflow
        )

    def loadflow_ref(loadflow: StoredLoadflowReference) -> LoadflowResultsPolars:
        return load_loadflow_results_polars(loadflow_result_fs, reference=loadflow)

    # This requires a full loadflow computation if the loadflow results are not passed in
    initial_loadflow_reference = params.initial_loadflow
    if initial_loadflow_reference is None:
        initial_loadflow, initial_metrics = scoring_function(
            strategy=initial_strategy,
            runners=runners,
            base_case_ids=base_case_ids,
            n_timestep_processes=ga_config.timestep_processes,
        )
        initial_loadflow_reference = store_loadflow(initial_loadflow)
    else:
        # If the initial loadflow is passed in, we load it from the database
        initial_loadflow = loadflow_ref(initial_loadflow_reference)
        # Compute the metrics for the initial loadflow
        initial_metrics = compute_metrics(
            strategy=initial_strategy,
            lfs=initial_loadflow,
            base_case_ids=base_case_ids,
            additional_info=[None] * len(initial_strategy),
        )

    # Update the initial metrics with the worst k contingencies
    update_initial_metrics_with_worst_k_contingencies(
        initial_loadflow, initial_metrics, params.ga_config.n_worst_contingencies
    )

    for timestep_metrics, n1_def in zip(initial_metrics, nminus1_definitions, strict=True):
        if timestep_metrics.extra_scores["non_converging_loadflows"] > len(n1_def.contingencies) / 2:
            raise AcNotConvergedError(
                "Too many non-converging loadflows in initial loadflow: "
                f"{timestep_metrics.extra_scores['non_converging_loadflows']} > {len(n1_def.contingencies) / 2}"
            )

    # Store the initial strategy in the database
    for topo, metric in zip(initial_strategy, initial_metrics, strict=True):
        topo.fitness = metric.fitness
        topo.metrics = metric.extra_scores
        topo.worst_k_contingency_cases = metric.worst_k_contingency_cases
        topo.set_loadflow_reference(initial_loadflow_reference)
        session.add(topo)
    session.commit()

    # As we have the initial loadflows, we can now define an acceptance function
    acceptance_fn = partial(
        evaluate_acceptance,
        metrics_unsplit=initial_metrics,
        loadflow_results_unsplit=initial_loadflow,
        reject_convergence_threshold=ga_config.reject_convergence_threshold,
        reject_overload_threshold=ga_config.reject_overload_threshold,
        reject_critical_branch_threshold=ga_config.reject_critical_branch_threshold,
    )

    # Convert the initial strategy to a message strategy
    initial_strategy_message = convert_db_topo_to_message_topo(initial_strategy)
    assert len(initial_strategy_message) == 1

    logger.info(
        f"Initialization completed, metrics: {initial_metrics[0].extra_scores}, fitness: {initial_metrics[0].fitness}, "
        f"worst_k_contingency_cases: {initial_metrics[0].worst_k_contingency_cases}. Waiting for DC results..."
    )

    return (
        OptimizerData(
            params=params,
            session=session,
            evolution_fn=evolution_fn,
            scoring_fn=scoring_fn,
            store_loadflow_fn=store_loadflow,
            load_loadflow_fn=loadflow_ref,
            acceptance_fn=acceptance_fn,
            rng=rng,
            framework=grid_files[0].framework,
            runners=runners,
            action_sets=action_sets,
        ),
        initial_strategy_message[0],
    )

wait_for_first_dc_results #

wait_for_first_dc_results(
    results_consumer,
    session,
    max_wait_time,
    optimization_id,
)

Wait an initial period for DC results to arrive before proceeding with the optimization.

Call this after initialize optimization and before run epoch to ensure that the DC optimizer has started, and avoid the AC optimizer idling while waiting for the first DC results to arrive.

PARAMETER DESCRIPTION
results_consumer

The consumer where to listen for DC results

TYPE: LongRunningKafkaConsumer

session

The database session to use for storing topologies

TYPE: Session

max_wait_time

The maximum time to wait for DC results, in seconds

TYPE: float

optimization_id

The ID of the optimization run, used to filter the incoming topologies and only proceed when DC results from the correct optimization run arrive. Note that other DC runs could be active.

TYPE: str

RAISES DESCRIPTION
TimeoutError

If no DC results arrive within the maximum wait time

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/optimizer.py
def wait_for_first_dc_results(
    results_consumer: LongRunningKafkaConsumer, session: Session, max_wait_time: float, optimization_id: str
) -> None:
    """Wait an initial period for DC results to arrive before proceeding with the optimization.

    Call this after initialize optimization and before run epoch to ensure that the DC optimizer has started, and avoid the
    AC optimizer idling while waiting for the first DC results to arrive.

    Parameters
    ----------
    results_consumer : LongRunningKafkaConsumer
        The consumer where to listen for DC results
    session : Session
        The database session to use for storing topologies
    max_wait_time : float
        The maximum time to wait for DC results, in seconds
    optimization_id : str
        The ID of the optimization run, used to filter the incoming topologies and only proceed when DC results from
        the correct optimization run arrive. Note that other DC runs could be active.

    Raises
    ------
    TimeoutError
        If no DC results arrive within the maximum wait time
    """
    start_wait = datetime.now()
    while datetime.now() - start_wait < timedelta(seconds=max_wait_time):
        added_topos = poll_results_topic(db=session, consumer=results_consumer, first_poll=True)
        if len([x for x in added_topos if x.optimization_id == optimization_id]) > 0:
            logger.info(f"Received {len(added_topos)} topologies from DC results, proceeding with optimization")
            return
    raise TimeoutError(f"Did not receive DC results within {max_wait_time} seconds, cannot proceed with optimization")

run_epoch #

run_epoch(
    optimizer_data, results_consumer, send_result_fn, epoch
)

Run a single epoch of the AC optimizer

This shall send the investigated topology to the result topic upon completion.

PARAMETER DESCRIPTION
optimizer_data

The optimizer data, will be updated in-place

TYPE: OptimizerData

results_consumer

The consumer where to listen for DC results

TYPE: LongRunningKafkaConsumer

send_result_fn

The function to send results

TYPE: Callable[[ResultUnion], None]

epoch

The current epoch number, used for logging and heartbeat purposes. Also, on epoch 1 the wait time for the consumer is longer to allow for dc optim startup.

TYPE: int

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/optimizer.py
def run_epoch(
    optimizer_data: OptimizerData,
    results_consumer: LongRunningKafkaConsumer,
    send_result_fn: Callable[[ResultUnion], None],
    epoch: int,
) -> None:
    """Run a single epoch of the AC optimizer

    This shall send the investigated topology to the result topic upon completion.

    Parameters
    ----------
    optimizer_data : OptimizerData
        The optimizer data, will be updated in-place
    results_consumer : LongRunningKafkaConsumer
        The consumer where to listen for DC results
    send_result_fn : Callable[[ResultUnion], None]
        The function to send results
    epoch : int
        The current epoch number, used for logging and heartbeat purposes. Also, on epoch 1 the wait time for
        the consumer is longer to allow for dc optim startup.
    """
    poll_results_topic(db=optimizer_data.session, consumer=results_consumer, first_poll=epoch == 1)
    new_strategy = optimizer_data.evolution_fn()

    # It is possible that no new strategy was generated
    if not new_strategy:
        return

    loadflow_results, metrics = optimizer_data.scoring_fn(new_strategy)
    acceptance = optimizer_data.acceptance_fn(loadflow_results, metrics)
    loadflow_result_reference = optimizer_data.store_loadflow_fn(loadflow_results)

    # Update the strategy with the new loadflow results
    message_topos = []
    for topology, metric in zip(new_strategy, metrics, strict=True):
        # TODO: FIXME: remove fitness_dc when "Topology" is refactored and accepts different stages like "dc", "dc+" and "ac"
        # topology should store a dict of metrics instead of a single fitness value
        if "fitness_dc" in topology.metrics:
            metric.extra_scores["fitness_dc"] = topology.metrics["fitness_dc"]
        topology.metrics = metric.extra_scores
        topology.fitness = metric.fitness
        topology.acceptance = acceptance
        topology.set_loadflow_reference(loadflow_result_reference)

        optimizer_data.session.add(topology)

        message_topos.append(
            Topology(
                actions=topology.actions,
                pst_setpoints=topology.pst_setpoints,
                disconnections=topology.disconnections,
                loadflow_results=loadflow_result_reference,
                metrics=metric,
            )
        )

    optimizer_data.session.commit()

    # Send the new strategy to the result topic
    if not optimizer_data.params.ga_config.enable_ac_rejection or acceptance:
        send_result_fn(TopologyPushResult(strategies=[Strategy(timesteps=message_topos)], epoch=epoch))
    logger.info(
        f"Epoch {epoch} completed, accept: {acceptance}, metrics: {metrics[0].extra_scores}, fitness: {metrics[0].fitness}"
    )

toop_engine_topology_optimizer.ac.scoring_functions #

Scoring functions for the AC optimizer - in this case this runs an N-1 and computes metrics for it

logger module-attribute #

logger = Logger(__name__)

get_threshold_n_minus1_overload #

get_threshold_n_minus1_overload(strategy)

Extract the 'top_k_overloads_n_1' thresholds and corresponding case indices from a list of ACOptimTopology strategies.

overload_threshold is defined as the maximum allowed overload energy for the worst k AC N-1 contingency analysis of the split topologies. This threshold is computed using the worst k AC contingencies of the unsplit grid and the worst k DC contingencies of the split grid. Refer to the "pull" method in evolution_functions.py for more details.

PARAMETER DESCRIPTION
strategy

A list of ACOptimTopology objects, each containing a 'metrics' dictionary with overload thresholds and case indices.

TYPE: list of ACOptimTopology

RETURNS DESCRIPTION
tuple of (Optional[list of float], Optional[list of list of int])

A tuple containing: - A list of overload thresholds for each topology, or None if any required metric is missing. - A list of lists of case indices for each topology, or None if any required metric is missing.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/scoring_functions.py
def get_threshold_n_minus1_overload(
    strategy: list[ACOptimTopology],
) -> tuple[Optional[list[float]], Optional[list[list[int]]]]:
    """Extract the 'top_k_overloads_n_1' thresholds and corresponding case indices from a list of ACOptimTopology strategies.

    overload_threshold is defined as the maximum allowed overload energy for the worst k AC N-1 contingency analysis
    of the split topologies. This threshold is computed using the worst k AC contingencies of the unsplit grid and the
    worst k DC contingencies of the split grid. Refer to the "pull" method in evolution_functions.py for more details.

    Parameters
    ----------
    strategy : list of ACOptimTopology
        A list of ACOptimTopology objects, each containing a 'metrics' dictionary with overload thresholds and case indices.

    Returns
    -------
    tuple of (Optional[list of float], Optional[list of list of int])
        A tuple containing:
        - A list of overload thresholds for each topology, or None if any required metric is missing.
        - A list of lists of case indices for each topology, or None if any required metric is missing.

    """
    overload_threshold_all_t = []
    case_indices_all_t = []
    for topo in strategy:
        threshold_overload = topo.metrics.get("top_k_overloads_n_1", None)
        threshold_case_indices = topo.worst_k_contingency_cases
        if threshold_overload is None or len(threshold_case_indices) == 0:
            logger.warning(
                f"No overload threshold or case indices found in the strategy"
                f"threshold_overload: {threshold_overload}, threshold_case_indices: {threshold_case_indices}"
            )
            return None, None

        overload_threshold_all_t.append(threshold_overload)
        case_indices_all_t.append(threshold_case_indices)

    return overload_threshold_all_t, case_indices_all_t

update_runner_nminus1 #

update_runner_nminus1(
    runners, nminus1_defs, case_ids_all_t
)

Update the N-1 definitions in the runners to only include the worst k contingencies.

This modifies the N-1 definitions in the runners to only include the contingencies at the given indices.

PARAMETER DESCRIPTION
runners

The loadflow runners to update. The length of the list equals the number of timesteps.

TYPE: list[AbstractLoadflowRunner]

nminus1_defs

The original N-1 definitions to use as a template. The length of the list equals the number of timesteps.

TYPE: list[Nminus1Definition]

case_ids_all_t

A list of contingency ids for each runner, indicating which contingencies to keep in the N-1 definition. Each element should be an index to the contingencies in the original N-1 definition.

TYPE: list[list[str]]

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/scoring_functions.py
def update_runner_nminus1(
    runners: list[AbstractLoadflowRunner], nminus1_defs: list[Nminus1Definition], case_ids_all_t: list[list[str]]
) -> None:
    """Update the N-1 definitions in the runners to only include the worst k contingencies.

    This modifies the N-1 definitions in the runners to only include the contingencies at the given indices.

    Parameters
    ----------
    runners : list[AbstractLoadflowRunner]
        The loadflow runners to update. The length of the list equals the number of timesteps.
    nminus1_defs : list[Nminus1Definition]
        The original N-1 definitions to use as a template. The length of the list equals the number of timesteps.
    case_ids_all_t : list[list[str]]
        A list of contingency ids for each runner, indicating which contingencies to keep in the N-1 definition.
        Each element should be an index to the contingencies in the original N-1 definition.
    """
    case_indices_all_t = get_contingency_indices_from_ids(case_ids_all_t, n_minus1_definitions=nminus1_defs)
    for runner, case_indices, n1_def in zip(runners, case_indices_all_t, nminus1_defs, strict=True):
        contingencies = np.array(n1_def.contingencies)[list(case_indices)]
        n1_def_copy = n1_def.model_copy()
        n1_def_copy.contingencies = contingencies.tolist()
        runner.store_nminus1_definition(n1_def_copy)

compute_loadflow_and_metrics #

compute_loadflow_and_metrics(
    runners, strategy, base_case_ids, n_timestep_processes=1
)

Compute loadflow results and associated metrics for a given set of strategies.

This function runs loadflow simulations for each provided strategy using the specified runners, then computes additional metrics based on the simulation results.

PARAMETER DESCRIPTION
runners

List of loadflow runner instances to use for simulations.

TYPE: list of AbstractLoadflowRunner

strategy

List of topology strategies to evaluate.

TYPE: list of ACOptimTopology

base_case_ids

List of base case identifiers corresponding to each strategy. Can be None.

TYPE: list of Optional[str]

n_timestep_processes

Number of parallel processes to use for timestep simulations (default is 1).

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
lfs

The results of the loadflow simulations.

TYPE: LoadflowResultsPolars

additional_info

Additional information for each action taken in the strategies.

TYPE: list of Optional[AdditionalActionInfo]

metrics

Computed metrics for each strategy.

TYPE: list of Metrics

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/scoring_functions.py
def compute_loadflow_and_metrics(
    runners: list[AbstractLoadflowRunner],
    strategy: list[ACOptimTopology],
    base_case_ids: list[Optional[str]],
    n_timestep_processes: int = 1,
) -> tuple[LoadflowResultsPolars, list[Optional[AdditionalActionInfo]], list[Metrics]]:
    """Compute loadflow results and associated metrics for a given set of strategies.

    This function runs loadflow simulations for each provided strategy using the specified runners,
    then computes additional metrics based on the simulation results.

    Parameters
    ----------
    runners : list of AbstractLoadflowRunner
        List of loadflow runner instances to use for simulations.
    strategy : list of ACOptimTopology
        List of topology strategies to evaluate.
    base_case_ids : list of Optional[str]
        List of base case identifiers corresponding to each strategy. Can be None.
    n_timestep_processes : int, optional
        Number of parallel processes to use for timestep simulations (default is 1).

    Returns
    -------
    lfs : LoadflowResultsPolars
        The results of the loadflow simulations.
    additional_info : list of Optional[AdditionalActionInfo]
        Additional information for each action taken in the strategies.
    metrics : list of Metrics
        Computed metrics for each strategy.
    """
    lfs, additional_info = compute_loadflow(
        actions=[topo.actions for topo in strategy],
        disconnections=[topo.disconnections for topo in strategy],
        pst_setpoints=[topo.pst_setpoints for topo in strategy],
        runners=runners,
        n_timestep_processes=n_timestep_processes,
    )
    metrics = compute_metrics(
        strategy=strategy,
        lfs=lfs,
        additional_info=additional_info,
        base_case_ids=base_case_ids,
    )
    return lfs, additional_info, metrics

compute_loadflow_and_metrics_with_early_stopping #

compute_loadflow_and_metrics_with_early_stopping(
    runners,
    strategy,
    base_case_ids,
    threshold_overload_all_t,
    threshold_case_ids_all_t,
    n_timestep_processes=1,
    early_stop_non_converging_threshold=0.1,
)

Run N-1 loadflow analysis with early stopping based on overload thresholds.

This function first runs loadflow analysis for the worst k contingencies, checking if overload energy exceeds the specified thresholds. If so, it stops further analysis and returns the results. If overload energy is within thresholds, it continues to run loadflow for non-critical contingencies.

This optimizes loadflow analysis by focusing on critical contingencies first, defined by the provided thresholds. Early stopping avoids unnecessary computations if overload energy exceeds the specified thresholds.

PARAMETER DESCRIPTION
runners

Loadflow runner instances for each timestep.

TYPE: list of AbstractLoadflowRunner

strategy

AC optimization topologies for each timestep.

TYPE: list of ACOptimTopology

base_case_ids

Base case identifiers for each timestep. If None, N-0 analysis is skipped.

TYPE: list of Optional[str]

threshold_overload_all_t

Overload energy thresholds for early stopping at each timestep.

TYPE: list of float

threshold_case_ids_all_t

Case IDs of critical contingencies for each timestep.

TYPE: list of list of str

n_timestep_processes

Number of parallel processes for loadflow computation (default is 1).

TYPE: int DEFAULT: 1

early_stop_non_converging_threshold

The threshold for the early stopping criterion, i.e. if the percentage of non-converging cases is greater than this value, the ac validation will be stopped early.

TYPE: float DEFAULT: 0.1

RETURNS DESCRIPTION
lfs

Concatenated loadflow results for critical and non-critical contingencies.

TYPE: LoadflowResultsPolars

metrics

Computed metrics for the strategy and loadflow results.

TYPE: list of Metrics

Notes
  • Early stopping is triggered if overload energy for any timestep exceeds the threshold.
  • Critical contingencies are processed first; if early stopping is triggered, non-critical contingencies are skipped.
  • Runner definitions are updated to focus on critical or non-critical contingencies as needed.
Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/scoring_functions.py
def compute_loadflow_and_metrics_with_early_stopping(
    runners: list[AbstractLoadflowRunner],
    strategy: list[ACOptimTopology],
    base_case_ids: list[Optional[str]],
    threshold_overload_all_t: list[float],
    threshold_case_ids_all_t: list[list[str]],
    n_timestep_processes: int = 1,
    early_stop_non_converging_threshold: float = 0.1,
) -> tuple[LoadflowResultsPolars, list[Metrics]]:
    """Run N-1 loadflow analysis with early stopping based on overload thresholds.

    This function first runs loadflow analysis for the worst k contingencies, checking if overload energy
    exceeds the specified thresholds. If so, it stops further analysis and returns the results.
    If overload energy is within thresholds, it continues to run loadflow for non-critical contingencies.

    This optimizes loadflow analysis by focusing on critical contingencies first, defined by the provided thresholds.
    Early stopping avoids unnecessary computations if overload energy exceeds the specified thresholds.

    Parameters
    ----------
    runners : list of AbstractLoadflowRunner
        Loadflow runner instances for each timestep.
    strategy : list of ACOptimTopology
        AC optimization topologies for each timestep.
    base_case_ids : list of Optional[str]
        Base case identifiers for each timestep. If None, N-0 analysis is skipped.
    threshold_overload_all_t : list of float
        Overload energy thresholds for early stopping at each timestep.
    threshold_case_ids_all_t : list of list of str
        Case IDs of critical contingencies for each timestep.
    n_timestep_processes : int, optional
        Number of parallel processes for loadflow computation (default is 1).
    early_stop_non_converging_threshold : float, optional
        The threshold for the early stopping criterion, i.e. if the percentage of non-converging cases is greater than
        this value, the ac validation will be stopped early.

    Returns
    -------
    lfs : LoadflowResultsPolars
        Concatenated loadflow results for critical and non-critical contingencies.
    metrics : list of Metrics
        Computed metrics for the strategy and loadflow results.

    Notes
    -----
    - Early stopping is triggered if overload energy for any timestep exceeds the threshold.
    - Critical contingencies are processed first; if early stopping is triggered, non-critical contingencies are skipped.
    - Runner definitions are updated to focus on critical or non-critical contingencies as needed.

    """
    logger.info("Running N-1 analysis with early stopping.")

    contingency_ids_all_t = []
    original_n_minus1_defs = []
    for runner in runners:
        n_1_def = runner.get_nminus1_definition()
        contingency_ids_all_t.append([contingency.id for contingency in n_1_def.contingencies])
        original_n_minus1_defs.append(n_1_def)

    # Update the N-1 definitions in the runners to only include the critical contingencies
    n_critical_contingencies = len(threshold_case_ids_all_t[0])
    update_runner_nminus1(runners, original_n_minus1_defs, threshold_case_ids_all_t)
    logger.info(f"Running N-1 analysis with {n_critical_contingencies} critical contingencies per timestep.")

    # We pass the base case IDs to None to prevent N-0 analysis in the runners
    # Compute the loadflow and metrics with only critical contingencies included in the N-1 analysis. Critical contingencies
    # are defined by the threshold_case_ids_all_t.
    lfs_critical, _, metrics_critical = compute_loadflow_and_metrics(
        runners, strategy, [None] * len(threshold_case_ids_all_t), n_timestep_processes
    )

    # Early stopping: check if overload_energy_n_1 exceed thresholds
    stop_early = False
    for metric, overload_th in zip(metrics_critical, threshold_overload_all_t, strict=True):
        overload = metric.extra_scores.get("overload_energy_n_1", 0)
        logger.info(f"Checking overload for N-1 analysis: overload = {overload}, (overload_worst_k_unsplit)={overload_th}")

        n_nonconverging_cases = metric.extra_scores.get("non_converging_loadflows", 0)
        logger.info(f"Number of non converging cases = {n_nonconverging_cases}")

        # if overload is greater than overload_th or n_nonconverging_cases is greater than
        # early_stop_non_converging_threshold(10%) of all critical cases, we stop
        if overload > overload_th or n_nonconverging_cases > int(
            early_stop_non_converging_threshold * n_critical_contingencies
        ):
            logger.info(
                f"Early stopping N-1 analysis "
                f" overload: {overload}, threshold_overload: {overload_th}, "
                f"n_nonconverging_cases: {n_nonconverging_cases}, "
                f"threshold_n_non_converging_cases: {int(early_stop_non_converging_threshold * n_critical_contingencies)}"
            )
            stop_early = True
            metric.fitness = -99999999  # Set fitness to a very low value to indicate failure
            metric.extra_scores["overload_energy_n_1"] = 9999999
            break

    if not stop_early:
        # Determine non-critical contingencies by excluding critical ones from all contingencies
        non_critical_contingencies_all_t = [
            set(all_ids) - set(critical_ids)
            for all_ids, critical_ids in zip(contingency_ids_all_t, threshold_case_ids_all_t, strict=True)
        ]

        # Update the N-1 definitions in the runners to now include only the non-critical contingencies.
        logger.info(
            f"Running N-1 analysis with {len(non_critical_contingencies_all_t[0])} non-critical contingencies per timestep."
        )
        update_runner_nminus1(runners, original_n_minus1_defs, non_critical_contingencies_all_t)

        lfs_non_critical, additional_info_non_critical = compute_loadflow(
            actions=[topo.actions for topo in strategy],
            disconnections=[topo.disconnections for topo in strategy],
            pst_setpoints=[topo.pst_setpoints for topo in strategy],
            runners=runners,
            n_timestep_processes=n_timestep_processes,
        )

        lfs = concatenate_loadflow_results_polars([lfs_critical, lfs_non_critical])

        # We can pass the additional info from either critical or non critical contingencies as they are the same
        metrics = compute_metrics(
            strategy=strategy,
            lfs=lfs,
            additional_info=additional_info_non_critical,
            base_case_ids=base_case_ids,
        )
    else:
        lfs = lfs_critical
        metrics = metrics_critical

    # Restore the original N-1 definitions in the runners
    for runner, original_n1_def in zip(runners, original_n_minus1_defs, strict=True):
        runner.store_nminus1_definition(original_n1_def)

    return lfs, metrics

scoring_function #

scoring_function(
    strategy,
    runners,
    base_case_ids,
    n_timestep_processes=1,
    early_stop_validation=True,
    early_stop_non_converging_threshold=0.1,
)

Compute loadflows and metrics for a given strategy

PARAMETER DESCRIPTION
strategy

The strategy to score, length n_timesteps

TYPE: list[ACOptimTopology]

runners

The loadflow runners to use, length n_timesteps.

TYPE: list[AbstractLoadflowRunner]

base_case_ids

The base case ids for the loadflow runners, length n_timesteps (used to separately compute the N-0 flows)

TYPE: list[Optional[str]]

n_timestep_processes

The number of processes to use for computing timesteps in parallel, by default 1

TYPE: int DEFAULT: 1

early_stop_validation

Whether to enable early stopping during the optimization process, by default True

TYPE: bool DEFAULT: True

early_stop_non_converging_threshold

The threshold for the early stopping criterion, i.e. if the percentage of non-converging cases is greater than this value, the ac validation will be stopped early.

TYPE: float = 0.1 DEFAULT: 0.1

RETURNS DESCRIPTION
LoadflowResultsPolars

The loadflow results for the strategy

list[Metrics]

The metrics for the strategy

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/scoring_functions.py
def scoring_function(
    strategy: list[ACOptimTopology],
    runners: list[AbstractLoadflowRunner],
    base_case_ids: list[Optional[str]],
    n_timestep_processes: int = 1,
    early_stop_validation: bool = True,
    early_stop_non_converging_threshold: float = 0.1,
) -> tuple[LoadflowResultsPolars, list[Metrics]]:
    """Compute loadflows and metrics for a given strategy

    Parameters
    ----------
    strategy : list[ACOptimTopology]
        The strategy to score, length n_timesteps
    runners : list[AbstractLoadflowRunner]
        The loadflow runners to use, length n_timesteps.
    base_case_ids : list[Optional[str]]
        The base case ids for the loadflow runners, length n_timesteps (used to separately compute the N-0 flows)
    n_timestep_processes : int, optional
        The number of processes to use for computing timesteps in parallel, by default 1
    early_stop_validation : bool, optional
        Whether to enable early stopping during the optimization process, by default True
    early_stop_non_converging_threshold : float = 0.1
        The threshold for the early stopping criterion, i.e. if the percentage of non-converging cases is greater than
        this value, the ac validation will be stopped early.

    Returns
    -------
    LoadflowResultsPolars
        The loadflow results for the strategy
    list[Metrics]
        The metrics for the strategy
    """
    # overload_threshold is defined as the maximum allowed overload energy for the worst k N-1 contingencies
    # of split topologies. This threshold is computed using the worst k contingencies of the unsplit grid.
    # The thresholds are available only when the strategy has been pulled from the repertoire using the
    # pull method. This means that early stopping can be used only for AC validation and not for AC optimization.
    threshold_overload_all_t, threshold_case_ids_all_t = get_threshold_n_minus1_overload(strategy)
    overload_threshold_available = threshold_overload_all_t is not None and threshold_case_ids_all_t is not None

    if overload_threshold_available and early_stop_validation:
        lfs, metrics = compute_loadflow_and_metrics_with_early_stopping(
            runners,
            strategy,
            base_case_ids,
            threshold_overload_all_t=threshold_overload_all_t,
            threshold_case_ids_all_t=threshold_case_ids_all_t,
            n_timestep_processes=n_timestep_processes,
            early_stop_non_converging_threshold=early_stop_non_converging_threshold,
        )
    else:
        logger.info("No overload thresholds available, running full N-1 analysis.")
        lfs, _additional_info, metrics = compute_loadflow_and_metrics(runners, strategy, base_case_ids, n_timestep_processes)
    return lfs, metrics

compute_metrics #

compute_metrics(
    strategy, lfs, additional_info, base_case_ids
)

Compute the metrics for a given strategy. Just calls compute_metrics_single_timestep for each timestep

PARAMETER DESCRIPTION
strategy

The strategy to score, length n_timesteps

TYPE: list[ACOptimTopology]

lfs

The loadflow results for the strategy, length n_timesteps

TYPE: LoadflowResults

additional_info

Additional information about the actions taken, such as switching distance or other metrics. The length of the list is n_timesteps.

TYPE: list[Optional[AdditionalActionInfo]]

base_case_ids

The base case ids for the loadflow runners, length n_timesteps (used to separately compute the N-0 flows)

TYPE: list[Optional[str]]

RETURNS DESCRIPTION
list[Metrics]

The metrics for the strategy, length n_timesteps

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/scoring_functions.py
def compute_metrics(
    strategy: list[ACOptimTopology],
    lfs: LoadflowResultsPolars,
    additional_info: list[Optional[AdditionalActionInfo]],
    base_case_ids: list[Optional[str]],
) -> list[Metrics]:
    """Compute the metrics for a given strategy. Just calls compute_metrics_single_timestep for each timestep

    Parameters
    ----------
    strategy : list[ACOptimTopology]
        The strategy to score, length n_timesteps
    lfs : LoadflowResults
        The loadflow results for the strategy, length n_timesteps
    additional_info : list[Optional[AdditionalActionInfo]]
        Additional information about the actions taken, such as switching distance or other metrics. The length of
        the list is n_timesteps.
    base_case_ids : list[Optional[str]]
        The base case ids for the loadflow runners, length n_timesteps (used to separately
        compute the N-0 flows)

    Returns
    -------
    list[Metrics]
        The metrics for the strategy, length n_timesteps
    """
    return [
        compute_metrics_single_timestep(
            actions=topo.actions,
            disconnections=topo.disconnections,
            loadflow=select_timestep_polars(lfs, timestep=timestep),
            additional_info=info,
            base_case_id=base_case_id,
        )
        for timestep, (topo, info, base_case_id) in enumerate(zip(strategy, additional_info, base_case_ids, strict=True))
    ]

extract_switching_distance #

extract_switching_distance(additional_info)

Extract the switching distance from the additional action info

PARAMETER DESCRIPTION
additional_info

The additional action info containing the switching distance

TYPE: AdditionalActionInfo

RETURNS DESCRIPTION
int

The switching distance, or 0 if not available

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/scoring_functions.py
def extract_switching_distance(additional_info: AdditionalActionInfo) -> int:
    """Extract the switching distance from the additional action info

    Parameters
    ----------
    additional_info : AdditionalActionInfo
        The additional action info containing the switching distance

    Returns
    -------
    int
        The switching distance, or 0 if not available
    """
    if isinstance(additional_info, RealizedTopology):
        return len(additional_info.reassignment_diff)
    if isinstance(additional_info, pd.DataFrame):
        return len(additional_info)
    raise ValueError("Unknown format for additional info")

compute_metrics_single_timestep #

compute_metrics_single_timestep(
    actions,
    disconnections,
    loadflow,
    additional_info,
    base_case_id=None,
)

Compute the metrics for a single timestep

PARAMETER DESCRIPTION
actions

The reconfiguration assignment for the timestep

TYPE: list[int]

disconnections

The disconnections for the timestep

TYPE: list[int]

loadflow

The loadflow results for the timestep, use select_timestep to get the results for a specific timestep

TYPE: LoadflowResults

additional_info

Additional information about the actions taken, such as switching distance or other metrics.

TYPE: Optional[AdditionalActionInfo]

base_case_id

The base case id from the nminus1 definition, to separate N-0 flows from N-1

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
Metrics

The metrics for the timestep

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/scoring_functions.py
def compute_metrics_single_timestep(
    actions: list[int],
    disconnections: list[int],
    loadflow: LoadflowResultsPolars,
    additional_info: Optional[AdditionalActionInfo],
    base_case_id: Optional[str] = None,
) -> Metrics:
    """Compute the metrics for a single timestep

    Parameters
    ----------
    actions : list[int]
        The reconfiguration assignment for the timestep
    disconnections : list[int]
        The disconnections for the timestep
    loadflow : LoadflowResults
        The loadflow results for the timestep, use select_timestep to get the results for a specific timestep
    additional_info : Optional[AdditionalActionInfo]
        Additional information about the actions taken, such as switching distance or other metrics.
    base_case_id: Optional[str]
        The base case id from the nminus1 definition, to separate N-0 flows from N-1

    Returns
    -------
    Metrics
        The metrics for the timestep
    """
    metrics = compute_metrics_lfs(loadflow_results=loadflow, base_case_id=base_case_id)
    metrics = {key: np.nan_to_num(value, nan=0, posinf=99999999, neginf=-99999999).item() for key, value in metrics.items()}
    non_successful_states = [
        ConvergenceStatus.FAILED.value,
        ConvergenceStatus.MAX_ITERATION_REACHED.value,
        ConvergenceStatus.NO_CALCULATION.value,
    ]
    metrics.update(
        {
            "split_subs": len(actions),
            "disconnected_branches": len(disconnections),
            "non_converging_loadflows": loadflow.converged.filter(pl.col("status").is_in(non_successful_states))
            .select(pl.len())
            .collect()
            .item(),
        }
    )
    if additional_info is not None:
        metrics["switching_distance"] = extract_switching_distance(additional_info)
    worst_k_contingent_cases = metrics.pop("worst_k_contingent_cases", None)
    return Metrics(
        fitness=metrics["overload_energy_n_1"], extra_scores=metrics, worst_k_contingent_cases=worst_k_contingent_cases
    )

compute_loadflow #

compute_loadflow(
    actions,
    disconnections,
    pst_setpoints,
    runners,
    n_timestep_processes=1,
)

Compute the loadflow for a given strategy

PARAMETER DESCRIPTION
actions

The reconfiguration actions for each timestep, where the outer list is the timestep dimension and the inner list the split substation identified through an index into the action set.

TYPE: list[list[int]]

disconnections

The disconnections for each timestep, where the outer list is the timestep dimension and the inner list the disconnection indices

TYPE: list[list[int]]

pst_setpoints

The PST setpoints for each timestep, where the outer list is the timestep dimension and the inner list the PST taps if computed.

TYPE: list[Optional[list[int]]]

runners

The loadflow runners to use

TYPE: list[AbstractLoadflowRunner]

n_timestep_processes

The number of processes to use for each timestep

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
LoadflowResultsPolars

The loadflow results for all timesteps in the strategy

list[AdditionalActionInfo]

Additional information about the actions taken, such as switching distance or other metrics. The length of the list is n_timesteps.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/scoring_functions.py
def compute_loadflow(
    actions: list[list[int]],
    disconnections: list[list[int]],
    pst_setpoints: list[Optional[list[int]]],
    runners: list[AbstractLoadflowRunner],
    n_timestep_processes: int = 1,  # noqa: ARG001
) -> tuple[LoadflowResultsPolars, list[AdditionalActionInfo]]:
    """Compute the loadflow for a given strategy

    Parameters
    ----------
    actions : list[list[int]]
        The reconfiguration actions for each timestep, where the outer list is the timestep dimension and
        the inner list the split substation identified through an index into the action set.
    disconnections : list[list[int]]
        The disconnections for each timestep, where the outer list is the timestep dimension and
        the inner list the disconnection indices
    pst_setpoints : list[Optional[list[int]]]
        The PST setpoints for each timestep, where the outer list is the timestep dimension and the inner list the PST taps
        if computed.
    runners : list[AbstractLoadflowRunner]
        The loadflow runners to use
    n_timestep_processes : int
        The number of processes to use for each timestep

    Returns
    -------
    LoadflowResultsPolars
        The loadflow results for all timesteps in the strategy
    list[AdditionalActionInfo]
        Additional information about the actions taken, such as switching distance or other metrics. The length of
        the list is n_timesteps.
    """
    lf_results = []
    additional_information = []
    for action, disconnection, pst_setpoint, runner in zip(actions, disconnections, pst_setpoints, runners, strict=True):
        loadflow = runner.run_ac_loadflow(action, disconnection, pst_setpoint)
        lf_results.append(loadflow)
        additional_information.append(runner.get_last_action_info())

    return concatenate_loadflow_results_polars(lf_results), additional_information

evaluate_acceptance #

evaluate_acceptance(
    loadflow_results_split,
    metrics_split,
    loadflow_results_unsplit,
    metrics_unsplit,
    reject_convergence_threshold=1.0,
    reject_overload_threshold=0.95,
    reject_critical_branch_threshold=1.1,
)

Evaluate if the split loadflow results are acceptable compared to the unsplit results.

Compares the unsplit metrics * the thresholds to the split metrics. If all split metrics are better than the unsplit metrics * thresholds, the split results are accepted.

Checked metrics are: non_converging_loadflows: the number of non-converging loadflows should be less than or equal to reject_convergence_threshold * unsplit[non_converging_loadflows] overload_energy_n_1: the overload energy should be less than or equal to reject_overload_threshold * unsplit[overload_energy_n_1] critical_branch_count_n_1: the number of critical branches should be less than or equal to reject_critical_branch_threshold * unsplit[critical_branch_count_n_1] TODO: Check Voltage Jumps between N0 and N1

PARAMETER DESCRIPTION
loadflow_results_split

The loadflow results for the split case.

TYPE: LoadflowResultsPolars

metrics_split

The metrics for the split case.

TYPE: list[Metrics]

loadflow_results_unsplit

The loadflow results for the unsplit case.

TYPE: LoadflowResultsPolars

metrics_unsplit

The metrics for the unsplit case.

TYPE: list[Metrics]

reject_convergence_threshold

The threshold for the convergence rate, by default 1. (i.e. the split case must have at most the same amount of nonconverging loadflows as the unsplit case.)

TYPE: float DEFAULT: 1.0

reject_overload_threshold

The threshold for the overload energy improvement, by default 0.95 (i.e. the split case must have at least 5% lower overload energy than the unsplit case).

TYPE: float DEFAULT: 0.95

reject_critical_branch_threshold

The threshold for the critical branches increase, by default 1.1 (i.e. the split case must not have more than 110 % of the critical branches in the unsplit case).

TYPE: float DEFAULT: 1.1

RETURNS DESCRIPTION
bool

True if the split results are acceptable, False if rejected.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/scoring_functions.py
def evaluate_acceptance(
    loadflow_results_split: LoadflowResultsPolars,  # noqa: ARG001
    metrics_split: list[Metrics],
    loadflow_results_unsplit: LoadflowResultsPolars,  # noqa: ARG001
    metrics_unsplit: list[Metrics],
    reject_convergence_threshold: float = 1.0,
    reject_overload_threshold: float = 0.95,
    reject_critical_branch_threshold: float = 1.1,
) -> bool:
    """Evaluate if the split loadflow results are acceptable compared to the unsplit results.

    Compares the unsplit metrics * the thresholds to the split metrics. If all split metrics are better than
    the unsplit metrics * thresholds, the split results are accepted.

    Checked metrics are:
        non_converging_loadflows: the number of non-converging loadflows should be less than or equal to
            reject_convergence_threshold * unsplit[non_converging_loadflows]
        overload_energy_n_1: the overload energy should be less than or equal to
            reject_overload_threshold * unsplit[overload_energy_n_1]
        critical_branch_count_n_1: the number of critical branches should be less than or equal
            to reject_critical_branch_threshold * unsplit[critical_branch_count_n_1]
        TODO: Check Voltage Jumps between N0 and N1

    Parameters
    ----------
    loadflow_results_split : LoadflowResultsPolars
        The loadflow results for the split case.
    metrics_split : list[Metrics]
        The metrics for the split case.
    loadflow_results_unsplit : LoadflowResultsPolars
        The loadflow results for the unsplit case.
    metrics_unsplit : list[Metrics]
        The metrics for the unsplit case.
    reject_convergence_threshold : float, optional
        The threshold for the convergence rate, by default 1.
        (i.e. the split case must have at most the same amount of nonconverging loadflows as the unsplit case.)
    reject_overload_threshold : float, optional
        The threshold for the overload energy improvement, by default 0.95
        (i.e. the split case must have at least 5% lower overload energy than the unsplit case).
    reject_critical_branch_threshold : float, optional
        The threshold for the critical branches increase, by default 1.1
        (i.e. the split case must not have more than 110 % of the critical branches in the unsplit case).

    Returns
    -------
    bool
        True if the split results are acceptable, False if rejected.
    """
    n_non_converged_unsplit = np.array(
        [unsplit.extra_scores.get("non_converging_loadflows", 0) for unsplit in metrics_unsplit]
    )
    n_non_converged_split = np.array(
        [
            split.extra_scores.get("non_converging_loadflows", 0) - split.extra_scores.get("disconnected_branches", 0)
            for split in metrics_split
        ]
    )
    convergence_acceptable = np.all(n_non_converged_split <= n_non_converged_unsplit * reject_convergence_threshold)
    if not convergence_acceptable:
        logger.info(
            "Rejecting topology due to insufficient convergence: "
            f"{n_non_converged_split} vs {n_non_converged_unsplit} before"
        )

    unsplit_overload = np.array([unsplit.extra_scores.get("overload_energy_n_1", 0) for unsplit in metrics_unsplit])
    split_overload = np.array([split.extra_scores.get("overload_energy_n_1", 99999) for split in metrics_split])
    overload_improvement = np.all(split_overload <= unsplit_overload * reject_overload_threshold)
    if not overload_improvement:
        logger.info(
            f"Rejecting topology due to overload energy not improving: {split_overload} vs {unsplit_overload} before"
        )

    unsplit_critical_branches = np.array(
        [unsplit.extra_scores.get("critical_branch_count_n_1", 999) for unsplit in metrics_unsplit], dtype=float
    )
    split_critical_branches = np.array(
        [split.extra_scores.get("critical_branch_count_n_1", 0) for split in metrics_split], dtype=float
    )

    critical_branches_acceptable = np.all(
        split_critical_branches <= unsplit_critical_branches * reject_critical_branch_threshold
    )
    if not critical_branches_acceptable:
        logger.info(
            "Rejecting topology due to critical branches increasing too much: "
            f"{split_critical_branches} vs {unsplit_critical_branches} before"
        )

    return convergence_acceptable and overload_improvement and critical_branches_acceptable

toop_engine_topology_optimizer.ac.select_strategy #

Selection strategy for AC optimization topologies.

logger module-attribute #

logger = Logger(__name__)

select_strategy #

select_strategy(
    rng, repertoire, interest_scorer, filter_strategy=None
)

Select a promising strategy from the repertoire

Make sure the repertoire only contains topologies with the right optimizer type and optimization id as select_topology will not filter for this.

PARAMETER DESCRIPTION
rng

The random number generator to use

TYPE: Generator

repertoire

The filtered repertoire to select from

TYPE: list[BaseDBTopology]

interest_scorer

The function to score the topologies in the repertoire. The higher the score, the more interesting the topology is. Eventually, the topology will be selected with a probability proportional to its score.

TYPE: Callable[[DataFrame], Series]

filter_strategy

Whether to filter the repertoire based on discriminator, median or dominator filter.

TYPE: Optional[FilterStrategy] DEFAULT: None

RETURNS DESCRIPTION
Union[list[ACOptimTopology], Tuple[list[ACOptimTopology], list[ACOptimTopology]]]

The selected strategy which is represented as a list of topologies with similar strategy_hash and optimizer type.. If two is True, a tuple of two lists is returned with two different strategy_hashes. If no strategy could be selected because the repertoire wasn't containing enough strategies, return an empty list

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/select_strategy.py
def select_strategy(
    rng: Rng,
    repertoire: list[BaseDBTopology],
    interest_scorer: Callable[[pd.DataFrame], pd.Series],
    filter_strategy: Optional[FilterStrategy] = None,
) -> Union[list[ACOptimTopology], Tuple[list[ACOptimTopology], list[ACOptimTopology]]]:
    """Select a promising strategy from the repertoire

    Make sure the repertoire only contains topologies with the right optimizer type and optimization id
    as select_topology will not filter for this.

    Parameters
    ----------
    rng : Rng
        The random number generator to use
    repertoire : list[BaseDBTopology]
        The filtered repertoire to select from
    interest_scorer : Callable[[pd.DataFrame], pd.Series]
        The function to score the topologies in the repertoire. The higher the score, the more
        interesting the topology is. Eventually, the topology will be selected with a probability
        proportional to its score.
    filter_strategy : Optional[FilterStrategy], optional
        Whether to filter the repertoire based on discriminator, median or dominator filter.

    Returns
    -------
    Union[list[ACOptimTopology], Tuple[list[ACOptimTopology], list[ACOptimTopology]]]
        The selected strategy which is represented as a list of topologies with similar strategy_hash and
        optimizer type..
        If two is True, a tuple of two lists is returned with two different strategy_hashes.
        If no strategy could be selected because the repertoire wasn't containing enough strategies,
        return an empty list
    """
    if len(repertoire) == 0:
        return []
    # Extract only the metrics in a nice format
    metrics = metrics_dataframe(repertoire)

    if filter_strategy is not None:
        if filter_strategy.filter_dominator_metrics_target is not None:
            discriminator_df = get_discriminator_df(
                metrics[metrics["optimizer_type"] == OptimizerType.AC.value], filter_strategy.filter_dominator_metrics_target
            )
        else:
            discriminator_df = pd.DataFrame()
        metrics = filter_metrics_df(
            metrics_df=metrics[metrics["optimizer_type"] == OptimizerType.DC.value],
            discriminator_df=discriminator_df,
            filter_strategy=filter_strategy,
        )

    # Score them according to some function
    metrics["score"] = interest_scorer(metrics)
    group = metrics.groupby(["strategy_hash", "optimizer_type"])
    if len(group.size()) == 0:
        return []

    strategies = group.sum("score")
    sum_scores = strategies.score.sum()
    if not np.isclose(sum_scores, 0):
        strategies.score /= sum_scores
    else:
        strategies.score = 1 / len(strategies)

    # Select a strategy with probability proportional to its score
    idx = rng.choice(len(strategies), p=strategies.score)
    hash_, optim_type_ = strategies.index[idx]
    return [t for t in repertoire if t.strategy_hash == hash_ and t.optimizer_type.value == optim_type_]

filter_metrics_df #

filter_metrics_df(
    metrics_df, discriminator_df, filter_strategy
)

Get a mask for the metrics DataFrame that filters out rows based on discriminator and median masks.

This function applies a discriminator, median and dominator mask.

PARAMETER DESCRIPTION
metrics_df

The DataFrame containing the metrics to filter. This is typically the DC repertoire from which new results shall be pulled.

TYPE: DataFrame

discriminator_df

The DataFrame containing the discriminator metrics. These are topologies that have previously been AC validated

TYPE: DataFrame

filter_strategy

The filter strategy to use for the optimization, used to filter out strategies based on the discriminator, median or dominator filter.

TYPE: FilterStrategy

RETURNS DESCRIPTION
DataFrame

The filtered metrics DataFrame with similar topologies removed. If all topologies are filtered out, the original metrics DataFrame is returned.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/select_strategy.py
def filter_metrics_df(
    metrics_df: pd.DataFrame,
    discriminator_df: pd.DataFrame,
    filter_strategy: FilterStrategy,
) -> np.ndarray:
    """Get a mask for the metrics DataFrame that filters out rows based on discriminator and median masks.

    This function applies a discriminator, median and dominator mask.

    Parameters
    ----------
    metrics_df : pd.DataFrame
        The DataFrame containing the metrics to filter. This is typically the DC repertoire from which new results shall
        be pulled.
    discriminator_df : pd.DataFrame
        The DataFrame containing the discriminator metrics. These are topologies that have previously been AC validated
    filter_strategy : FilterStrategy
        The filter strategy to use for the optimization,
        used to filter out strategies based on the discriminator, median or dominator filter.

    Returns
    -------
    pd.DataFrame
        The filtered metrics DataFrame with similar topologies removed.
        If all topologies are filtered out, the original metrics DataFrame is returned.
    """
    repertoire_mask = get_repertoire_filter_mask(
        metrics_df=metrics_df,
        discriminator_df=discriminator_df,
        filter_strategy=filter_strategy,
    )
    # make sure that the metrics_df is not empty after filtering
    if len(metrics_df[repertoire_mask]) != 0:
        metrics_df = metrics_df[repertoire_mask]
    return metrics_df

get_repertoire_filter_mask #

get_repertoire_filter_mask(
    metrics_df, discriminator_df, filter_strategy
)

Get a mask for the metrics DataFrame that filters out rows based on discriminator and median masks.

This function applies a discriminator, median and dominator mask.

PARAMETER DESCRIPTION
metrics_df

The DataFrame containing the metrics to filter.

TYPE: DataFrame

discriminator_df

The DataFrame containing the discriminator metrics.

TYPE: DataFrame

filter_strategy

The filter strategy to use for the optimization, used to filter out strategies based on the discriminator, median or dominator filter.

TYPE: FilterStrategy

RETURNS DESCRIPTION
Bool[ndarray, ' metrics_df.shape[0]']

A boolean mask where True indicates the row is not filtered out.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/select_strategy.py
def get_repertoire_filter_mask(
    metrics_df: pd.DataFrame,
    discriminator_df: pd.DataFrame,
    filter_strategy: FilterStrategy,
) -> np.ndarray:
    """Get a mask for the metrics DataFrame that filters out rows based on discriminator and median masks.

    This function applies a discriminator, median and dominator mask.

    Parameters
    ----------
    metrics_df : pd.DataFrame
        The DataFrame containing the metrics to filter.
    discriminator_df : pd.DataFrame
        The DataFrame containing the discriminator metrics.
    filter_strategy : FilterStrategy
        The filter strategy to use for the optimization,
        used to filter out strategies based on the discriminator, median or dominator filter.

    Returns
    -------
    Bool[np.ndarray, " metrics_df.shape[0]"]
        A boolean mask where True indicates the row is not filtered out.
    """
    # set the dominator metrics observed if not provided
    if (
        filter_strategy.filter_dominator_metrics_observed is None
        and filter_strategy.filter_dominator_metrics_target is not None
    ):
        # set a to target
        filter_strategy.filter_dominator_metrics_observed = filter_strategy.filter_dominator_metrics_target

    # get the discriminator filter mask if the config is provided
    if (
        not discriminator_df.empty
        and filter_strategy.filter_discriminator_metric_distances is not None
        and filter_strategy.filter_discriminator_metric_multiplier is not None
    ):
        discriminator_filter_mask = get_discriminator_mask(
            metrics_df=metrics_df,
            discriminator_df=discriminator_df,
            metric_distances=filter_strategy.filter_discriminator_metric_distances,
            metric_multiplier=filter_strategy.filter_discriminator_metric_multiplier,
        )
    else:
        discriminator_filter_mask = np.ones(len(metrics_df), dtype=bool)

    # get the median filter mask if the config is provided
    if filter_strategy.filter_median_metric is not None:
        median_filter_mask = get_median_mask(
            metrics_df=metrics_df,
            target_metrics=filter_strategy.filter_median_metric,
        )
    else:
        median_filter_mask = np.ones(len(metrics_df), dtype=bool)

    # apply the discriminator and median filter masks
    metrics_df_filtered = metrics_df[discriminator_filter_mask & median_filter_mask]

    # get the dominator filter mask if the config is provided
    if (
        filter_strategy.filter_dominator_metrics_target is not None
        and filter_strategy.filter_dominator_metrics_observed is not None
    ):
        dominator_filter_mask = get_dominator_mask(
            metrics_df=metrics_df_filtered,
            target_metrics=filter_strategy.filter_dominator_metrics_target,
            observed_metrics=filter_strategy.filter_dominator_metrics_observed,
        )
    else:
        dominator_filter_mask = np.ones(len(metrics_df_filtered), dtype=bool)

    # apply the dominator filter mask
    filtered_index = metrics_df_filtered[dominator_filter_mask].index

    return np.isin(metrics_df.index, filtered_index)

get_median_mask #

get_median_mask(
    metrics_df, target_metrics, fitness_col="fitness"
)

Get a mask for fitness values below the median for each discrete value of the target metrics.

Note: expects the target metrics to be discrete values, not continuous.

PARAMETER DESCRIPTION
metrics_df

The DataFrame containing the metrics to filter.

TYPE: DataFrame

target_metrics

A list of metrics with discrete values to consider for filtering. example: ["split_subs"].

TYPE: list[MetricType]

fitness_col

The column name that contains the fitness values. Defaults to "fitness".

TYPE: Optional[str] DEFAULT: 'fitness'

RETURNS DESCRIPTION
filter_mask

A boolean mask where True indicates the row is not below the median for any of the target metrics.

TYPE: Bool[ndarray, ' metrics_df.shape[0]']

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/select_strategy.py
def get_median_mask(
    metrics_df: pd.DataFrame, target_metrics: list[MetricType], fitness_col: Optional[Fitness] = "fitness"
) -> np.ndarray:
    """Get a mask for fitness values below the median for each discrete value of the target metrics.

    Note: expects the target metrics to be discrete values, not continuous.

    Parameters
    ----------
    metrics_df : pd.DataFrame
        The DataFrame containing the metrics to filter.
    target_metrics : list[MetricType]
        A list of metrics with discrete values to consider for filtering.
        example: ["split_subs"].
    fitness_col : Optional[str], optional
        The column name that contains the fitness values. Defaults to "fitness".

    Returns
    -------
    filter_mask : Bool[np.ndarray, " metrics_df.shape[0]"]
        A boolean mask where True indicates the row is not below the median for any of the target metrics.
    """
    filter_mask = np.zeros(len(metrics_df), dtype=bool)
    for target_metric in target_metrics:
        for discrete_value in metrics_df[target_metric].unique():
            col_mask = (metrics_df[target_metric] == discrete_value).values
            metric_df = metrics_df[col_mask][fitness_col]
            # remove median
            median_fitness = metric_df.median()
            filter_mask |= col_mask & (metrics_df[fitness_col] < median_fitness).values
    # return the filter mask, that removes rows below the median
    filter_mask = ~filter_mask
    return filter_mask

get_dominator_mask #

get_dominator_mask(
    metrics_df,
    target_metrics,
    observed_metrics,
    fitness_col="fitness",
)

Get a mask for rows from a DataFrame that are dominated by other rows based on specified metrics.

A metric entry if there is any other metric entry with a better fitness, in respect to the distance to the original topology. The distance in measured by the metric, assuming that lower values are better.

The target metric is used to fix the discrete value for which the dominance is checked. The fitness column is used to determine the fitness of the rows. Each observed metric is checked against the minimum fitness of all discrete target values.

Intended use is target_metrics = ["switching_distance", "split_subs"] and observed_metrics = Any additional metric to the target metrics, e.g. "disconnections"

PARAMETER DESCRIPTION
metrics_df

The DataFrame to filter.

TYPE: DataFrame

target_metrics

A list of metrics to consider for dominance. A target metric is expected to have discrete values (e.g. not fitness, overload_energy, or max_flow) If None, defaults to ["switching_distance", "split_subs"].

TYPE: list[MetricType]

observed_metrics

A list of metrics to observe for dominance. If None, defaults to ["switching_distance", "split_subs"].

TYPE: list[MetricType]

fitness_col

The column name that contains the fitness values. Defaults to "fitness". Note: the values are expected to be negative, best fitness converges to zero.

TYPE: Optional[str] DEFAULT: 'fitness'

RETURNS DESCRIPTION
filter_mask

A boolean mask where True indicates the row is not dominated by another row.

TYPE: Bool[ndarray, ' metrics_df.shape[0]']

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/select_strategy.py
def get_dominator_mask(
    metrics_df: pd.DataFrame,
    target_metrics: list[MetricType],
    observed_metrics: list[MetricType],
    fitness_col: Optional[Fitness] = "fitness",
) -> np.ndarray:
    """Get a mask for rows from a DataFrame that are dominated by other rows based on specified metrics.

    A metric entry if there is any other metric entry with a better fitness,
    in respect to the distance to the original topology.
    The distance in measured by the metric, assuming that lower values are better.

    The target metric is used to fix the discrete value for which the dominance is checked.
    The fitness column is used to determine the fitness of the rows.
    Each observed metric is checked against the minimum fitness of all discrete target values.

    Intended use is target_metrics = ["switching_distance", "split_subs"]
    and observed_metrics = Any additional metric to the target metrics, e.g. "disconnections"


    Parameters
    ----------
    metrics_df : pd.DataFrame
        The DataFrame to filter.
    target_metrics : list[MetricType]
        A list of metrics to consider for dominance.
        A target metric is expected to have discrete values (e.g. not fitness, overload_energy, or max_flow)
        If None, defaults to ["switching_distance", "split_subs"].
    observed_metrics : list[MetricType]
        A list of metrics to observe for dominance.
        If None, defaults to ["switching_distance", "split_subs"].
    fitness_col : Optional[str], optional
        The column name that contains the fitness values. Defaults to "fitness".
        Note: the values are expected to be negative, best fitness converges to zero.

    Returns
    -------
    filter_mask : Bool[np.ndarray, " metrics_df.shape[0]"]
        A boolean mask where True indicates the row is not dominated by another row.

    """
    filter_mask = np.zeros(len(metrics_df), dtype=bool)
    for target_metric in target_metrics:
        for discrete_value in metrics_df[target_metric].unique():
            # get columns mask
            col_mask = (metrics_df[target_metric] == discrete_value).values
            if (col_mask & ~filter_mask).sum() == 0:
                # all the elements have been filtered out already
                # -> no element is dominated by an already eliminated element
                continue
            max_idx = metrics_df[col_mask & ~filter_mask][fitness_col].idxmax()

            # get fitness mask
            fitness_mask = (metrics_df[fitness_col] < metrics_df[fitness_col].loc[max_idx]).values

            # apply dominator condition
            for col in observed_metrics:
                if col == fitness_col:
                    continue
                filter_mask |= (metrics_df[col] > metrics_df[col].loc[max_idx]).values & fitness_mask & col_mask

    # retrun the filter mask, that removes dominated rows
    filter_mask = ~filter_mask

    return filter_mask

get_discriminator_mask #

get_discriminator_mask(
    metrics_df,
    discriminator_df,
    metric_distances,
    metric_multiplier=None,
)

Get a mask for rows in metrics_df that are within a certain distance from the discriminator_df.

The distance is defined by the metric_distances dictionary, which contains the metrics and their respective distances. Use the use_split_sub_multiplier flag to apply a multiplier in respect to the split_subs_col. e.g. use_split_sub_multiplier=False, the metric_distances is applied directly to the metrics_df. If use_split_sub_multiplier=True, the metric_distances are multiplied by the split_subs, leading to a larger distance for larger split_subs values.

PARAMETER DESCRIPTION
metrics_df

The DataFrame containing the metrics to filter.

TYPE: DataFrame

discriminator_df

The DataFrame containing the discriminator metrics.

TYPE: DataFrame

metric_distances

A dictionary defining the metric distances for filtering. The keys are metric names and the values are sets of distances. example: metric_distances = { "split_subs": {0}, "switching_distance": {-0.9, 0.9}, "fitness": {-0.1, 0.1}, } Note: the fitness is treated as a percentage.

TYPE: dict[str, set[float]]

metric_multiplier

A dictionary defining multiplier for the metric distances. The keys are metric names and the values are sets of distances. If None, defaults to an empty dictionary. Multiple values are added by: distance_multiplier = ( metric_multiplier[metric1] * discriminator_df[metric1] + metric_multiplier[metric2] * discriminator_df[metric2] + ... ) example: {"split_subs": 0.5} The discriminator_df will be multiplied by the split_subs value. In the case of the metric_distances values will be multiplied by the split_subs value and the metric_multiplier. -> metric_distances["switching_distance"] * 0.5 * split_subs_col (e.g. 4 splits) -> the metric distance is increased by this metric multiplier.

TYPE: Optional[dict[str, set[float]]] DEFAULT: None

RETURNS DESCRIPTION
filter_mask

A boolean mask where True indicates the row is not within the distance defined by the discriminator_df and metric_distances.

TYPE: Bool[ndarray, ' metrics_df.shape[0]']

RAISES DESCRIPTION
ValueError

If not all metric distances are present in the discriminator DataFrame columns. If the metric_distances is None, it will use default values based on the use_split_sub_multiplier flag.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/select_strategy.py
def get_discriminator_mask(
    metrics_df: pd.DataFrame,
    discriminator_df: pd.DataFrame,
    metric_distances: dict[str, set[float]],
    metric_multiplier: Optional[dict[str, set[float]]] = None,
) -> np.ndarray:
    """Get a mask for rows in metrics_df that are within a certain distance from the discriminator_df.

    The distance is defined by the metric_distances dictionary, which contains the metrics and their respective distances.
    Use the `use_split_sub_multiplier` flag to apply a multiplier in respect to the split_subs_col.
    e.g. use_split_sub_multiplier=False, the metric_distances is applied directly to the metrics_df.
    If use_split_sub_multiplier=True, the metric_distances are multiplied by the split_subs, leading to a
    larger distance for larger split_subs values.

    Parameters
    ----------
    metrics_df : pd.DataFrame
        The DataFrame containing the metrics to filter.
    discriminator_df : pd.DataFrame
        The DataFrame containing the discriminator metrics.
    metric_distances : dict[str, set[float]]
        A dictionary defining the metric distances for filtering.
        The keys are metric names and the values are sets of distances.
        example:
        metric_distances = {
            "split_subs": {0},
            "switching_distance": {-0.9, 0.9},
            "fitness": {-0.1, 0.1},
        }
        Note: the fitness is treated as a percentage.
    metric_multiplier : Optional[dict[str, set[float]]], optional
        A dictionary defining multiplier for the metric distances.
        The keys are metric names and the values are sets of distances.
        If None, defaults to an empty dictionary.
        Multiple values are added by:
        distance_multiplier = (
           `metric_multiplier[metric1]` * `discriminator_df[metric1]` +
           `metric_multiplier[metric2]` * `discriminator_df[metric2]` + ...
        )
        example: {"split_subs": 0.5}
                 The discriminator_df will be multiplied by the split_subs value.
                 In the case of the metric_distances values will be multiplied by the
                 split_subs value and the metric_multiplier.
                 -> metric_distances["switching_distance"] * 0.5 * split_subs_col (e.g. 4 splits)
                 -> the metric distance is increased by this metric multiplier.


    Returns
    -------
    filter_mask : Bool[np.ndarray, " metrics_df.shape[0]"]
        A boolean mask where True indicates the row is not within the distance defined by the discriminator_df
        and metric_distances.

    Raises
    ------
    ValueError
        If not all metric distances are present in the discriminator DataFrame columns.
        If the metric_distances is None, it will use default values based on the use_split_sub_multiplier flag.
    """
    if metric_multiplier is None:
        metric_multiplier = {}

    if not set(metric_distances.keys()).issubset(discriminator_df.columns):
        raise ValueError(
            f"Not all metric distances {metric_distances.keys()} are "
            f"present in the discriminator DataFrame {discriminator_df.columns}. "
        )
    if not set(metric_multiplier.keys()).issubset(discriminator_df.columns):
        raise ValueError(
            f"Not all metric multipliers {metric_multiplier.keys()} are "
            f"present in the discriminator DataFrame {discriminator_df.columns}. "
        )

    discriminator_df["fitness_save"] = discriminator_df["fitness"].copy()  # save original fitness for later use
    discriminator_df["fitness"] = discriminator_df["fitness"].abs()  # ensure fitness is
    metrics_df["fitness_save"] = metrics_df["fitness"].copy()  # save original fitness for later use
    metrics_df["fitness"] = metrics_df["fitness"].abs()  # ensure fitness is positive for the discriminator

    # filter mask for discriminator
    filter_mask = np.zeros(len(metrics_df), dtype=bool)
    for _idx, row in discriminator_df.iterrows():
        mask_metrics = np.ones(len(metrics_df), dtype=bool)
        for metric, distance in metric_distances.items():
            # get distance multiplier
            distance_multiplier = 0
            for metric_multiplier_key, metric_multiplier_value in metric_multiplier.items():
                distance_multiplier += metric_multiplier_value * row[metric_multiplier_key]
            if distance_multiplier == 0:
                # if no multiplier is defined, fall back to 1
                distance_multiplier = 1

            # apply the distance to the metrics_df
            if metric != "fitness":
                min_condition = row[metric] + min(distance) * distance_multiplier
                max_condition = row[metric] + max(distance) * distance_multiplier

            else:
                min_condition = row[metric] * (1 + min(distance) * distance_multiplier)
                max_condition = row[metric] * (1 + max(distance) * distance_multiplier)

            mask_metrics = mask_metrics & (metrics_df[metric] >= min_condition) & (metrics_df[metric] <= max_condition)

        filter_mask += mask_metrics

    # restore original fitness
    metrics_df["fitness"] = metrics_df["fitness_save"]
    metrics_df.drop(columns=["fitness_save"], inplace=True)

    # return the filter mask, that removes discriminated rows
    filter_mask = ~filter_mask

    return filter_mask

get_discriminator_df #

get_discriminator_df(metrics_df, target_metrics)

Get a discriminator DataFrame from the metrics DataFrame.

The discriminator DataFrame is a subset of the metrics DataFrame that contains only the target metrics. It is used to filter out similar topologies from the metrics DataFrame.

PARAMETER DESCRIPTION
metrics_df

The DataFrame containing the metrics to filter. Note: expects the metrics_df to contain only AC topologies that have as a metric the "fitness_dc" column.

TYPE: DataFrame

target_metrics

A list of metrics to consider for filtering.

TYPE: list[str]

RETURNS DESCRIPTION
DataFrame

A DataFrame containing only the target metrics.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/select_strategy.py
def get_discriminator_df(metrics_df: pd.DataFrame, target_metrics: list[str]) -> pd.DataFrame:
    """Get a discriminator DataFrame from the metrics DataFrame.

    The discriminator DataFrame is a subset of the metrics DataFrame that contains only the target metrics.
    It is used to filter out similar topologies from the metrics DataFrame.

    Parameters
    ----------
    metrics_df : pd.DataFrame
        The DataFrame containing the metrics to filter.
        Note: expects the metrics_df to contain only AC topologies
        that have as a metric the "fitness_dc" column.
    target_metrics : list[str]
        A list of metrics to consider for filtering.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing only the target metrics.
    """
    if metrics_df.empty:
        return pd.DataFrame()
    discriminator_df = metrics_df[[*target_metrics, "fitness_dc"]]
    discriminator_df.rename(columns={"fitness_dc": "fitness"}, inplace=True)
    return discriminator_df

toop_engine_topology_optimizer.ac.storage #

The database models to store topologies in the AC optimizer

ACOptimTopology #

Bases: BaseDBTopology

Inherits from the base topology to make a database table for AC optimizer topologies

This can include both AC and DC topologies, with the specific needs of the AC optimizer

__tablename__ class-attribute instance-attribute #

__tablename__ = 'ac_optim_topology'

parent_id class-attribute instance-attribute #

parent_id = Field(
    foreign_key="ac_optim_topology.id",
    nullable=True,
    default=None,
)

The mutation parent id, i.e. the topology that this topology was mutated from. This is mostly for debugging purposes to see where a topology came from and understand the mutation process.

parent class-attribute instance-attribute #

parent = Relationship()

The mutation parent

acceptance class-attribute instance-attribute #

acceptance = Field(default=None)

Whether the strategy was accepted or not.

id class-attribute instance-attribute #

id = Field(default=None, primary_key=True)

The table primary key

actions class-attribute instance-attribute #

actions = Field(sa_type=JSON)

The branch/injection reconfiguration actions as indices into the action set.

disconnections class-attribute instance-attribute #

disconnections = Field(sa_type=JSON)

A list of disconnections, indexing into the disconnectable branches set in the action set.

pst_setpoints class-attribute instance-attribute #

pst_setpoints = Field(sa_type=JSON, default=None)

The setpoints for the PSTs if they have been computed. This is an index into the range of pst taps, i.e. the smallest tap is 0 and the neutral tap somewhere in the middle of the range. The tap range is defined in the action set. The list always has the same length, i.e. the number of controllable PSTs in the system, and each entry corresponds to the PST at the same position in the action set.

unsplit instance-attribute #

unsplit

Whether all topologies in the strategy including this one have no branch assignments, disconnections or injections.

timestep instance-attribute #

timestep

The timestep of this topology, starting at 0

strategy_hash instance-attribute #

strategy_hash

The hash of the strategy - this hashes actions, disconnections and pst_setpoints for all timesteps in the strategy, making it possible to form a unique constraint on the strategy. This value will be set to the same for all topologies in the same strategy, furthermore making it possible to group timesteps.

strategy_hash_str property #

strategy_hash_str

The strategy hash as a string, for human readability

optimization_id instance-attribute #

optimization_id

The optimization ID this topology belongs to

optimizer_type instance-attribute #

optimizer_type

Which optimizer created this topology

fitness instance-attribute #

fitness

The fitness of this topology

metrics class-attribute instance-attribute #

metrics = Field(default_factory=lambda: {}, sa_type=JSON)

The metrics of this topology

worst_k_contingency_cases class-attribute instance-attribute #

worst_k_contingency_cases = Field(
    default_factory=lambda: [], sa_type=JSON
)

The worst k contingency case IDs for the topology.

created_at class-attribute instance-attribute #

created_at = Field(default=now(), nullable=False)

The time the topology was recorded in the database

stored_loadflow_reference class-attribute instance-attribute #

stored_loadflow_reference = None

The file reference for the loadflow results of this topology/strategy, if they were computed. Multiple topologies belonging to the same strategy will have the same serialized loadflow results object as there is a timestep notion in the loadflow results. To obtain the correct loadflow results, use the timestep attribute. This is stored as a json serialized StoredLoadflowReference object

__table_args__ class-attribute instance-attribute #

__table_args__ = (
    UniqueConstraint(
        "optimization_id",
        "optimizer_type",
        "strategy_hash",
        "timestep",
        name="topo_unique",
    ),
)

get_loadflow_reference #

get_loadflow_reference()

Get the loadflow reference as a StoredLoadflowReference object

RETURNS DESCRIPTION
Optional[StoredLoadflowReference]

The loadflow reference, or None if it is not set

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/interfaces/models/base_storage.py
def get_loadflow_reference(self) -> Optional[StoredLoadflowReference]:
    """Get the loadflow reference as a StoredLoadflowReference object

    Returns
    -------
    Optional[StoredLoadflowReference]
        The loadflow reference, or None if it is not set
    """
    if self.stored_loadflow_reference is None:
        return None
    return StoredLoadflowReference.model_validate_json(self.stored_loadflow_reference)

set_loadflow_reference #

set_loadflow_reference(loadflow_reference)

Set the loadflow reference from a StoredLoadflowReference object

PARAMETER DESCRIPTION
loadflow_reference

The loadflow reference to set, or None to unset it

TYPE: Optional[StoredLoadflowReference]

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/interfaces/models/base_storage.py
def set_loadflow_reference(self, loadflow_reference: Optional[StoredLoadflowReference]) -> BaseDBTopology:
    """Set the loadflow reference from a StoredLoadflowReference object

    Parameters
    ----------
    loadflow_reference : Optional[StoredLoadflowReference]
        The loadflow reference to set, or None to unset it
    """
    if loadflow_reference is None:
        self.stored_loadflow_reference = None
    else:
        self.stored_loadflow_reference = loadflow_reference.model_dump_json()
    return self

convert_single_topology #

convert_single_topology(
    topology,
    optimization_id,
    optimizer_type,
    timestep,
    strategy_hash,
    unsplit,
)

Convert a single Topology to a ACOptimTopology

PARAMETER DESCRIPTION
topology

The topology to convert

TYPE: Topology

optimization_id

The optimization ID to assign to the db topology

TYPE: str

optimizer_type

The optimizer type to assign to the db topology

TYPE: OptimizerType

timestep

The timestep of the topology

TYPE: int

strategy_hash

The hash of the strategy computed through hash_strategy

TYPE: bytes

unsplit

Whether the strategy is unsplit

TYPE: bool

RETURNS DESCRIPTION
ACOptimTopology

The converted topology

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/storage.py
def convert_single_topology(
    topology: Topology,
    optimization_id: str,
    optimizer_type: OptimizerType,
    timestep: int,
    strategy_hash: bytes,
    unsplit: bool,
) -> ACOptimTopology:
    """Convert a single Topology to a ACOptimTopology

    Parameters
    ----------
    topology : Topology
        The topology to convert
    optimization_id : str
        The optimization ID to assign to the db topology
    optimizer_type : OptimizerType
        The optimizer type to assign to the db topology
    timestep : int
        The timestep of the topology
    strategy_hash : bytes
        The hash of the strategy computed through hash_strategy
    unsplit : bool
        Whether the strategy is unsplit

    Returns
    -------
    ACOptimTopology
        The converted topology
    """
    return ACOptimTopology(
        actions=topology.actions,
        disconnections=topology.disconnections,
        pst_setpoints=topology.pst_setpoints,
        unsplit=unsplit,
        timestep=timestep,
        strategy_hash=strategy_hash,
        optimization_id=optimization_id,
        optimizer_type=optimizer_type,
        fitness=topology.metrics.fitness,
        metrics=topology.metrics.extra_scores,
        worst_k_contingency_cases=topology.metrics.worst_k_contingency_cases,
    ).set_loadflow_reference(topology.loadflow_results)

convert_message_topo_to_db_topo #

convert_message_topo_to_db_topo(
    message_strategies, optimization_id, optimizer_type
)

Convert a TopologyPushResult to a list of ACOptimTopology

PARAMETER DESCRIPTION
message_strategies

The strategies to convert, usually from a TopologyPushResult or OptimizationStartedResult. Strategies are flattened into the list of topologies that are being returned.

TYPE: list[Strategy]

optimization_id

The optimization ID to assign to the topologies. This was sent through with the parent result message and is not part of TopologyPushResult

TYPE: str

optimizer_type

The optimizer type to assign. This was sent through with the parent result message and is not part of TopologyPushResult

TYPE: OptimizerType

RETURNS DESCRIPTION
list[ACOptimTopology]

A list of converted topologies where for each topology and for each timestep a new ACOptimTopology instance is created.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/storage.py
def convert_message_topo_to_db_topo(
    message_strategies: list[Strategy], optimization_id: str, optimizer_type: OptimizerType
) -> list[ACOptimTopology]:
    """Convert a TopologyPushResult to a list of ACOptimTopology

    Parameters
    ----------
    message_strategies : list[Strategy]
        The strategies to convert, usually from a TopologyPushResult or OptimizationStartedResult. Strategies
        are flattened into the list of topologies that are being returned.
    optimization_id : str
        The optimization ID to assign to the topologies. This was sent through with the parent
        result message and is not part of TopologyPushResult
    optimizer_type : OptimizerType
        The optimizer type to assign. This was sent through with the parent result message and is not
        part of TopologyPushResult

    Returns
    -------
    list[ACOptimTopology]
        A list of converted topologies where for each topology and for each timestep a new
        ACOptimTopology instance is created.
    """
    converted = []
    for strategy in message_strategies:
        # Hash the strategy to get a unique global identifier
        strategy_hash = hash_strategy(strategy)
        unsplit = is_unsplit_strategy(strategy)
        for time_id, timestep_topo in enumerate(strategy.timesteps):
            converted.append(
                convert_single_topology(
                    topology=timestep_topo,
                    optimization_id=optimization_id,
                    optimizer_type=optimizer_type,
                    timestep=time_id,
                    strategy_hash=strategy_hash,
                    unsplit=unsplit,
                )
            )
    return converted

create_session #

create_session()

Create an in-memory SQLite session with the ACOptimTopology table created.

RETURNS DESCRIPTION
Session

The created session with the in-memory SQLite database

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/storage.py
def create_session() -> Session:
    """Create an in-memory SQLite session with the ACOptimTopology table created.

    Returns
    -------
    Session
        The created session with the in-memory SQLite database
    """
    engine = create_engine("sqlite:///:memory:")
    SQLModel.metadata.create_all(engine, tables=[ACOptimTopology.__table__])
    session = Session(engine)
    return session

scrub_db #

scrub_db(session, max_age_seconds=86400)

Scrub the database of the worker to prevent memory issues.

The AC worker has to store topologies from all runs, not only the current one, as it might have to pick up an optimization later. Meaning, the AC worker busily collects all topologies into memory. To prevent memory leak issues, we scrub the database of topologies that are older than a day as we do not expect to ever have to go back to those.

PARAMETER DESCRIPTION
session

The database session. The session object will be modified in-place

TYPE: Session

max_age_seconds

The maximum age of topologies to keep in the database, in seconds. Topologies older than this will be removed.

TYPE: int DEFAULT: 86400

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/storage.py
def scrub_db(session: Session, max_age_seconds: int = 86400) -> None:
    """Scrub the database of the worker to prevent memory issues.

    The AC worker has to store topologies from all runs, not only the current one, as it might have to pick up an
    optimization later. Meaning, the AC worker busily collects all topologies into memory. To prevent memory leak issues,
    we scrub the database of topologies that are older than a day as we do not expect to ever have to go back to those.

    Parameters
    ----------
    session : Session
        The database session. The session object will be modified in-place
    max_age_seconds : int
        The maximum age of topologies to keep in the database, in seconds. Topologies older than this will be removed.
    """
    cutoff_time = datetime.now() - timedelta(seconds=max_age_seconds)
    session.exec(delete(ACOptimTopology).where(ACOptimTopology.created_at < cutoff_time))
    session.commit()

toop_engine_topology_optimizer.ac.worker #

The AC worker that listens to the kafka topics, organizes optimization runs, etc.

logger module-attribute #

logger = Logger(__name__)

Args #

Bases: Args

Command line arguments for the AC worker.

Mostly the same as the DC worker except for an additional loadflow results folder

kafka_broker class-attribute instance-attribute #

kafka_broker = 'localhost:9092'

The Kafka broker to connect to.

optimizer_command_topic class-attribute instance-attribute #

optimizer_command_topic = 'commands'

The Kafka topic to listen for commands on.

optimizer_results_topic class-attribute instance-attribute #

optimizer_results_topic = 'results'

The topic to push results to.

optimizer_heartbeat_topic class-attribute instance-attribute #

optimizer_heartbeat_topic = 'heartbeat'

The topic to push heartbeats to.

heartbeat_interval_ms class-attribute instance-attribute #

heartbeat_interval_ms = 1000

The interval in milliseconds to send heartbeats.

WorkerData dataclass #

WorkerData(command_consumer, result_consumer, producer, db)

Data that is stored across optimization runs

command_consumer instance-attribute #

command_consumer

A kafka consumer listening in for optimization commands

result_consumer instance-attribute #

result_consumer

A kafka consumer listening on the results topic, constantly writing results to the database. This is polled both during the optimization and idle loop to keep the database up to date.

producer instance-attribute #

producer

A kafka producer to send heartbeats and results

db instance-attribute #

db

An initialized database session to an in-memory sqlite database.

optimization_loop #

optimization_loop(
    ac_params,
    grid_files,
    worker_data,
    send_result_fn,
    send_heartbeat_fn,
    optimization_id,
    loadflow_result_fs,
    processed_gridfile_fs,
)

Run the main loop for the AC optimizer.

This function will run the AC optimizer on the given grid files with the given parameters.

PARAMETER DESCRIPTION
ac_params

The parameters for the AC optimizer

TYPE: ACOptimizerParameters

grid_files

The grid files to optimize on

TYPE: list[GridFile]

worker_data

The dataclass with the results consumer and database

TYPE: WorkerData

send_result_fn

The function to send results

TYPE: Callable[[ResultUnion], None]

send_heartbeat_fn

The function to send heartbeats

TYPE: Callable[[HeartbeatUnion], None]

optimization_id

The ID of the optimization run

TYPE: str

loadflow_result_fs

A filesystem where the loadflow results are stored. Loadflows will be stored here using the uuid generation process and passed as a StoredLoadflowReference which contains the subfolder in this filesystem.

TYPE: AbstractFileSystem

processed_gridfile_fs

The target filesystem for the preprocessing worker. This contains all processed grid files. During the import job, a new folder import_results.data_folder was created which will be completed with the preprocess call to this function. Internally, only the data folder is passed around as a dirfs. Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the unprocessed gridfiles were already done.

TYPE: AbstractFileSystem

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/worker.py
def optimization_loop(
    ac_params: ACOptimizerParameters,
    grid_files: list[GridFile],
    worker_data: WorkerData,
    send_result_fn: Callable[[ResultUnion], None],
    send_heartbeat_fn: Callable[[HeartbeatUnion], None],
    optimization_id: str,
    loadflow_result_fs: AbstractFileSystem,
    processed_gridfile_fs: AbstractFileSystem,
) -> None:
    """Run the main loop for the AC optimizer.

    This function will run the AC optimizer on the given grid files with the given parameters.

    Parameters
    ----------
    ac_params : ACOptimizerParameters
        The parameters for the AC optimizer
    grid_files : list[GridFile]
        The grid files to optimize on
    worker_data : WorkerData
        The dataclass with the results consumer and database
    send_result_fn : Callable[[ResultUnion], None]
        The function to send results
    send_heartbeat_fn : Callable[[HeartbeatUnion], None]
        The function to send heartbeats
    optimization_id : str
        The ID of the optimization run
    loadflow_result_fs: AbstractFileSystem
        A filesystem where the loadflow results are stored. Loadflows will be stored here using the uuid generation process
        and passed as a StoredLoadflowReference which contains the subfolder in this filesystem.
    processed_gridfile_fs: AbstractFileSystem
        The target filesystem for the preprocessing worker. This contains all processed grid files.
        During the import job,  a new folder import_results.data_folder was created
        which will be completed with the preprocess call to this function.
        Internally, only the data folder is passed around as a dirfs.
        Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the
        unprocessed gridfiles were already done.
    """
    logger.info(f"Initializing optimization {optimization_id}")
    try:
        send_heartbeat_fn(
            OptimizationStartedHeartbeat(
                optimization_id=optimization_id,
            )
        )
        optimizer_data, initial_topology = initialize_optimization(
            session=worker_data.db,
            params=ac_params,
            optimization_id=optimization_id,
            grid_files=grid_files,
            loadflow_result_fs=loadflow_result_fs,
            processed_gridfile_fs=processed_gridfile_fs,
        )
        wait_for_first_dc_results(
            results_consumer=worker_data.result_consumer,
            session=worker_data.db,
            max_wait_time=ac_params.ga_config.max_initial_wait_seconds,
            optimization_id=optimization_id,
        )
        send_result_fn(
            OptimizationStartedResult(
                initial_topology=initial_topology,
            )
        )
    except AcNotConvergedError as e:
        # If the AC optimization did not converge in the base grid, we send a special message
        # to indicate that the optimization cannot be run.
        send_result_fn(OptimizationStoppedResult(reason="ac-not-converged", message=str(e)))
        logger.error(f"AC optimization {optimization_id} did not converge in the base grid: {e}")
        return
    except TimeoutError as e:
        # If the DC results did not arrive in time, we assume a failure on DC side and abandon the optimization
        send_result_fn(OptimizationStoppedResult(reason="dc-not-started", message=str(e)))
        logger.error(f"DC results for optimization {optimization_id} did not arrive in time: {e}")
        return
    except Exception as e:
        send_result_fn(OptimizationStoppedResult(reason="error", message=str(e)))
        logger.error(f"Error during initialization of optimization {optimization_id}: {e}")
        return

    logger.info(f"Starting optimization {optimization_id}")
    epoch = 1  # Start at epoch 1 so the initial topology will be epoch 0
    running = True
    start_time = time.time()
    while running:
        try:
            run_epoch(optimizer_data, worker_data.result_consumer, send_result_fn, epoch=epoch)
        except Exception as e:
            # Send a stop message to the results
            send_result_fn(OptimizationStoppedResult(reason="error", message=str(e)))
            logger.error(f"Error during optimization {optimization_id}, epoch {epoch}: {e}")
            logger.error(f"Stack trace: {traceback.format_exc()}")
            return
        epoch += 1

        if time.time() - start_time > ac_params.ga_config.runtime_seconds:
            logger.info(f"Stopping optimization {optimization_id} at epoch {epoch} due to runtime limit")
            send_result_fn(OptimizationStoppedResult(epoch=epoch, reason="converged", message="runtime limit"))
            running = False
            break

        send_heartbeat_fn(
            OptimizationStatsHeartbeat(
                optimization_id=optimization_id,
                wall_time=time.time() - start_time,
                iteration=epoch,
                num_branch_topologies_tried=0,
                num_injection_topologies_tried=0,
            )
        )

idle_loop #

idle_loop(
    worker_data, send_heartbeat_fn, heartbeat_interval_ms
)

Run idle loop of the AC optimizer worker.

This will be running when the worker is currently not optimizing This will wait until a StartOptimizationCommand is received and return it. In case a ShutdownCommand is received, the worker will exit with the exit code provided in the command.

PARAMETER DESCRIPTION
worker_data

The dataclass with the command consumer, results consumer and database

TYPE: WorkerData

send_heartbeat_fn

A function to call when there were no messages received for a while.

TYPE: Callable[[HeartbeatUnion], None]

heartbeat_interval_ms

The time to wait for a new command in milliseconds. If no command has been received, a heartbeat will be sent and then the receiver will wait for commands again.

TYPE: int

RETURNS DESCRIPTION
StartOptimizationCommand

The start optimization command to start the optimization run with

RAISES DESCRIPTION
SystemExit

If a ShutdownCommand is received

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/worker.py
def idle_loop(
    worker_data: WorkerData,
    send_heartbeat_fn: Callable[[HeartbeatUnion], None],
    heartbeat_interval_ms: int,
) -> StartOptimizationCommand:
    """Run idle loop of the AC optimizer worker.

    This will be running when the worker is currently not optimizing
    This will wait until a StartOptimizationCommand is received and return it. In case a
    ShutdownCommand is received, the worker will exit with the exit code provided in the command.

    Parameters
    ----------
    worker_data : WorkerData
        The dataclass with the command consumer, results consumer and database
    send_heartbeat_fn : Callable[[HeartbeatUnion], None]
        A function to call when there were no messages received for a while.
    heartbeat_interval_ms : int
        The time to wait for a new command in milliseconds. If no command has been received, a
        heartbeat will be sent and then the receiver will wait for commands again.


    Returns
    -------
    StartOptimizationCommand
        The start optimization command to start the optimization run with

    Raises
    ------
    SystemExit
        If a ShutdownCommand is received
    """
    send_heartbeat_fn(IdleHeartbeat())
    logger.info("Entering idle loop")
    while True:
        message = worker_data.command_consumer.poll(heartbeat_interval_ms / 1000)

        # Wait timeout exceeded - send a heartbeat and poll the results topic
        if not message:
            send_heartbeat_fn(IdleHeartbeat())
            poll_results_topic(
                db=worker_data.db,
                consumer=worker_data.result_consumer,
                first_poll=False,
            )
            continue

        command = Command.model_validate_json(deserialize_message(message.value()))

        if isinstance(command.command, StartOptimizationCommand):
            return command.command

        if isinstance(command.command, ShutdownCommand):
            logger.info("Shutting down due to ShutdownCommand")
            worker_data.command_consumer.close()
            worker_data.result_consumer.close()
            raise SystemExit(command.command.exit_code)

        # If we are here, we received a command that we do not know
        logger.warning(f"Received unknown command, dropping: {command} / {message.value}")
        worker_data.command_consumer.commit()

main #

main(
    args,
    loadflow_result_fs,
    processed_gridfile_fs,
    producer,
    command_consumer,
    result_consumer,
)

Run the main AC worker loop.

PARAMETER DESCRIPTION
args

The command line arguments

TYPE: Args

loadflow_result_fs

A filesystem where the loadflow results are stored. Loadflows will be stored here using the uuid generation process and passed as a StoredLoadflowReference which contains the subfolder in this filesystem.

TYPE: AbstractFileSystem

processed_gridfile_fs

The target filesystem for the preprocessing worker. This contains all processed grid files. During the import job, a new folder import_results.data_folder was created which will be completed with the preprocess call to this function. Internally, only the data folder is passed around as a dirfs. Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the unprocessed gridfiles were already done.

TYPE: AbstractFileSystem

producer

A kafka producer to send heartbeats and results

TYPE: Producer

command_consumer

A kafka consumer listening in for optimization commands

TYPE: LongRunningKafkaConsumer

result_consumer

A kafka consumer listening in for results

TYPE: LongRunningKafkaConsumer

RAISES DESCRIPTION
SystemExit

If the worker receives a ShutdownCommand

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/ac/worker.py
def main(
    args: Args,
    loadflow_result_fs: AbstractFileSystem,
    processed_gridfile_fs: AbstractFileSystem,
    producer: Producer,
    command_consumer: LongRunningKafkaConsumer,
    result_consumer: LongRunningKafkaConsumer,
) -> None:
    """Run the main AC worker loop.

    Parameters
    ----------
    args : Args
        The command line arguments
    loadflow_result_fs: AbstractFileSystem
        A filesystem where the loadflow results are stored. Loadflows will be stored here using the uuid generation process
        and passed as a StoredLoadflowReference which contains the subfolder in this filesystem.
    processed_gridfile_fs: AbstractFileSystem
        The target filesystem for the preprocessing worker. This contains all processed grid files.
        During the import job,  a new folder import_results.data_folder was created
        which will be completed with the preprocess call to this function.
        Internally, only the data folder is passed around as a dirfs.
        Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the
        unprocessed gridfiles were already done.
    producer : Producer
        A kafka producer to send heartbeats and results
    command_consumer : LongRunningKafkaConsumer
        A kafka consumer listening in for optimization commands
    result_consumer : LongRunningKafkaConsumer
        A kafka consumer listening in for results

    Raises
    ------
    SystemExit
        If the worker receives a ShutdownCommand
    """
    instance_id = str(uuid4())
    logger.info(f"Starting AC worker {instance_id} with config {args}")

    # We create two separate consumers for the command and result topics as we don't want to
    # catch results during the idle loop.
    worker_data = WorkerData(
        command_consumer=command_consumer,
        # Create a results consumer that will listen to results from any DC optimizers
        # Make sure to use a unique group.id for each instance to avoid conflicts
        result_consumer=result_consumer,
        producer=producer,
        db=create_session(),
    )

    def send_heartbeat(message: HeartbeatUnion, ping_commands: bool) -> None:
        heartbeat = Heartbeat(
            optimizer_type=OptimizerType.AC,
            instance_id=instance_id,
            message=message,
        )
        Heartbeat.model_validate(heartbeat)  # Validate the heartbeat message
        worker_data.producer.produce(
            args.optimizer_heartbeat_topic,
            value=serialize_message(heartbeat.model_dump_json()),
            key=heartbeat.instance_id.encode(),
        )
        worker_data.producer.flush()
        if ping_commands:
            worker_data.command_consumer.heartbeat()

    def send_result(message: ResultUnion, optimization_id: str) -> None:
        result = Result(
            result=message,
            optimization_id=optimization_id,
            optimizer_type=OptimizerType.AC,
            instance_id=instance_id,
        )
        Result.model_validate(result)  # Validate the result message
        worker_data.producer.produce(
            args.optimizer_results_topic,
            value=serialize_message(result.model_dump_json()),
            key=result.optimization_id.encode(),
        )
        worker_data.producer.flush()

    while True:
        # During the idle loop, the result consumer is paused and only the command consumer is active
        command = idle_loop(
            worker_data=worker_data,
            send_heartbeat_fn=partial(send_heartbeat, ping_commands=False),
            heartbeat_interval_ms=args.heartbeat_interval_ms,
        )

        # During the optimization loop, the command consumer is paused and the result consumer is active
        worker_data.command_consumer.start_processing()
        optimization_loop(
            ac_params=command.ac_params,
            grid_files=command.grid_files,
            worker_data=worker_data,
            send_result_fn=partial(send_result, optimization_id=command.optimization_id),
            send_heartbeat_fn=partial(send_heartbeat, ping_commands=True),
            optimization_id=command.optimization_id,
            loadflow_result_fs=loadflow_result_fs,
            processed_gridfile_fs=processed_gridfile_fs,
        )
        worker_data.command_consumer.stop_processing()
        scrub_db(worker_data.db)

Benchmarks#

toop_engine_topology_optimizer.benchmark.benchmark #

Run benchmark.

This module provides functionality to run benchmark tasks based on a provided configuration. It supports grid search for sweeping through the values of number of CUDA devices or a single parameter not defined inside lf_config and ga_config. Multiple parameter sweeps are not supported yet.

FUNCTION DESCRIPTION
main

logger module-attribute #

logger = Logger(__name__)

main #

main(cfg)

Run benchmark tasks based on the provided configuration.

PARAMETER DESCRIPTION
cfg

Configuration object containing benchmark tasks and output settings.

TYPE: DictConfig

RETURNS DESCRIPTION
None
Notes

The function performs the following steps: 1. Iterates over the benchmark tasks specified in the configuration. 2. Checks for grid search configurations and generates new configurations for each value in the grid. 3. Sets environment variables for each benchmark task. 4. Runs each benchmark task in a separate process to avoid side-effects. 5. Collects the results and logs them into a JSON file specified in the configuration.

Grid search currently supports sweeping through the values of number of CUDA devices or a single parameter not defined inside lf_config and ga_config. Multiple parameter sweeps are not supported yet.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/benchmark/benchmark.py
@hydra.main(config_path="configs", config_name="ms_benchmark.yaml", version_base="1.2")
def main(cfg: DictConfig) -> None:  # FIXME: Does not work for ToOp engine
    """Run benchmark tasks based on the provided configuration.

    Parameters
    ----------
    cfg : DictConfig
        Configuration object containing benchmark tasks and output settings.

    Returns
    -------
    None

    Notes
    -----
    The function performs the following steps:
    1. Iterates over the benchmark tasks specified in the configuration.
    2. Checks for grid search configurations and generates new configurations for each value in the grid.
    3. Sets environment variables for each benchmark task.
    4. Runs each benchmark task in a separate process to avoid side-effects.
    5. Collects the results and logs them into a JSON file specified in the configuration.

    Grid search currently supports sweeping through the values of number of CUDA devices or
    a single parameter not defined inside `lf_config` and `ga_config`. Multiple parameter sweeps are not supported yet.
    """
    benchmarks = []
    for task_cfg in cfg.benchmark_tasks:
        task_cfg_composed = compose(config_name=task_cfg.task_config)["task"]

        # check for grid searches
        if "grid_search" in task_cfg_composed:
            # Note: the grid search only works for sweeping through the values of
            # number of cuda devices or a single parameter not defined inside lf_config and ga_config.
            # The support to sweep multiple paramters are not there yet.
            key = next(iter(task_cfg_composed["grid_search"].keys()))
            values = next(iter(task_cfg_composed["grid_search"].values()))
            for val in values:
                new_config = deepcopy(task_cfg_composed)
                new_config[key] = val

                #  edit task name
                new_config["task_name"] = new_config["task_name"] + "_" + key + "_" + str(val)
                benchmarks.append(new_config)
        else:
            benchmarks.append(task_cfg_composed)

    results = []
    for benchmark in tqdm(benchmarks):
        set_environment_variables(benchmark)

        # Run the benchmark in a clean process to avoid side-effects from previous benchmarks
        ctx = mp.get_context("spawn")
        parent_conn, child_conn = ctx.Pipe()
        process = Process(target=run_task_process, args=(benchmark, child_conn))
        process.start()
        res = parent_conn.recv()
        process.join()
        results.append(res)

    #  Log the results as a json file
    with open(cfg.output_json, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)

DC Optimizer#

toop_engine_topology_optimizer.dc.ga_helpers #

Helper functions for genetic algorithms.

TrackingMixingEmitter #

Bases: MixingEmitter

A MixingEmitter that tracks the number of branch and injection combinations and splits.

init #

init(random_key, init_genotypes)

Overwrite the Emitter.init function to seed an EmitterState.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/ga_helpers.py
def init(
    self,
    random_key: jax.random.PRNGKey,
    init_genotypes: Optional[Genotype],  # noqa: ARG002
) -> tuple[EmitterState, jax.random.PRNGKey]:
    """Overwrite the Emitter.init function to seed an EmitterState."""
    return {
        "total_branch_combis": jnp.array(0, dtype=int),
        "total_inj_combis": jnp.array(0, dtype=int),
        "total_num_splits": jnp.array(0, dtype=int),
    }, random_key

state_update #

state_update(
    emitter_state,
    repertoire,
    genotypes,
    fitnesses,
    descriptors,
    extra_scores,
)

Overwrite the state update to store information for the running means.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/ga_helpers.py
def state_update(
    self,
    emitter_state: Optional[EmitterState],
    repertoire: Optional[Repertoire],  # noqa: ARG002
    genotypes: Optional[Genotype],  # noqa: ARG002
    fitnesses: Optional[Fitness],  # noqa: ARG002
    descriptors: Optional[Descriptor],  # noqa: ARG002
    extra_scores: Optional[ExtraScores],
) -> EmitterState:
    """Overwrite the state update to store information for the running means."""
    assert emitter_state is not None
    assert extra_scores is not None
    return {
        "total_branch_combis": emitter_state["total_branch_combis"] + extra_scores["n_branch_combis"].astype(int),
        "total_inj_combis": emitter_state["total_inj_combis"] + extra_scores["n_inj_combis"].astype(int),
        "total_num_splits": emitter_state["total_num_splits"] + extra_scores["n_split_grids"].astype(int),
    }

RunningMeans dataclass #

RunningMeans(
    start_time,
    last_time,
    time_step,
    total_branch_combis,
    total_inj_combis,
    br_per_sec,
    inj_per_sec,
    split_per_iter,
    last_emitter_state,
    n_outages,
    n_devices,
)

A dataclass to hold the running means estimations for the TQDM progress bar.

start_time instance-attribute #

start_time

last_time instance-attribute #

last_time

time_step instance-attribute #

time_step

total_branch_combis instance-attribute #

total_branch_combis

total_inj_combis instance-attribute #

total_inj_combis

br_per_sec instance-attribute #

br_per_sec

inj_per_sec instance-attribute #

inj_per_sec

split_per_iter instance-attribute #

split_per_iter

last_emitter_state instance-attribute #

last_emitter_state

n_outages instance-attribute #

n_outages

n_devices instance-attribute #

n_devices

init_running_means #

init_running_means(n_outages, n_devices)

Initialize an empty RunningMeans object.

PARAMETER DESCRIPTION
n_outages

The number of outages

TYPE: int

n_devices

The number of devices

TYPE: int

RETURNS DESCRIPTION
RunningMeans

The initialized running means

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/ga_helpers.py
def init_running_means(n_outages: int, n_devices: int) -> RunningMeans:
    """Initialize an empty RunningMeans object.

    Parameters
    ----------
    n_outages : int
        The number of outages
    n_devices : int
        The number of devices

    Returns
    -------
    RunningMeans
        The initialized running means
    """
    now = time.time()
    return RunningMeans(
        start_time=now,
        last_time=now,
        time_step=0.0,
        total_branch_combis=0,
        total_inj_combis=0,
        br_per_sec=EWMA(),
        inj_per_sec=EWMA(),
        split_per_iter=EWMA(),
        last_emitter_state=None,
        n_outages=n_outages,
        n_devices=n_devices,
    )

update_running_means #

update_running_means(running_means, emitter_state)

Aggregate the emitter state statistics into the running means.

PARAMETER DESCRIPTION
running_means

The running means to be updated

TYPE: RunningMeans

emitter_state

The emitter state of the current iteration

TYPE: EmitterState

RETURNS DESCRIPTION
RunningMeans

The updated running means

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/ga_helpers.py
def update_running_means(running_means: RunningMeans, emitter_state: EmitterState) -> RunningMeans:
    """Aggregate the emitter state statistics into the running means.

    Parameters
    ----------
    running_means : RunningMeans
        The running means to be updated
    emitter_state : EmitterState
        The emitter state of the current iteration

    Returns
    -------
    RunningMeans
        The updated running means
    """
    now = time.time()
    running_means = deepcopy(running_means)
    emitter_state = jax.tree_util.tree_map(lambda x: x * running_means.n_devices, emitter_state)
    last_emitter_state = (
        running_means.last_emitter_state
        if running_means.last_emitter_state is not None
        else {
            "total_branch_combis": jnp.array(0, dtype=int),
            "total_inj_combis": jnp.array(0, dtype=int),
            "total_num_splits": jnp.array(0, dtype=int),
        }
    )

    branch_diff = emitter_state["total_branch_combis"].item() - last_emitter_state["total_branch_combis"].item()
    inj_diff = emitter_state["total_inj_combis"].item() - last_emitter_state["total_inj_combis"].item()
    split_diff = emitter_state["total_num_splits"].item() - last_emitter_state["total_num_splits"].item()

    # Clip diffs due to sporadic integer overflows
    branch_diff = max(branch_diff, 0)
    inj_diff = max(inj_diff, 0)
    split_diff = max(split_diff, 0)

    running_means.total_branch_combis += branch_diff
    running_means.total_inj_combis += inj_diff

    running_means.time_step = now - running_means.last_time

    running_means.br_per_sec.update(branch_diff / running_means.time_step)
    running_means.inj_per_sec.update(inj_diff / running_means.time_step)
    running_means.split_per_iter.update(split_diff)

    running_means.last_time = now
    running_means.last_emitter_state = emitter_state

    return running_means

make_description_string #

make_description_string(running_means, runtime_seconds)

Create a string representation of the running means for the command line tqdm output.

PARAMETER DESCRIPTION
running_means

The running means to be displayed

TYPE: RunningMeans

runtime_seconds

The total runtime of the optimization in seconds

TYPE: float

RETURNS DESCRIPTION
str

The string representation of the running means

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/ga_helpers.py
def make_description_string(running_means: RunningMeans, runtime_seconds: float) -> str:
    """Create a string representation of the running means for the command line tqdm output.

    Parameters
    ----------
    running_means : RunningMeans
        The running means to be displayed
    runtime_seconds : float
        The total runtime of the optimization in seconds

    Returns
    -------
    str
        The string representation of the running means
    """
    now = time.time()

    loadflows = running_means.total_inj_combis * running_means.n_outages
    return (
        f"br/s: {running_means.br_per_sec.get():.2f}, "
        + f"inj/s: {running_means.inj_per_sec.get():.2f}, "
        + f"lfs: {loadflows:.2e}, "
        + f"time: {(now - running_means.start_time):.0f}/{runtime_seconds:.0f}s"
    )

toop_engine_topology_optimizer.dc.main #

Launcher for the Map-Elites optimizer.

example args:

--fixed_files /workspaces/AICoE_HPC_RL_Optimizer/data/static_information.hdf5 --stats_dir /workspaces/AICoE_HPC_RL_Optimizer/stats/ --tensorboard_dir /workspaces/AICoE_HPC_RL_Optimizer/stats/tensorboard/ --ga_config.target_metrics overload_energy_n_1 1.0 # The metrics to optimize with their weight --ga_config.me_descriptors.0.metric split_subs --ga_config.me_descriptors.0.num_cells 5 --ga_config.me_descriptors.1.metric switching_distance --ga_config.me_descriptors.1.num_cells 45 # The metrics to use as descriptors along with their maximum value --ga_config.observed_metrics overload_energy_n_1 split_subs switching_distance # All the relevant metrics, including target, descriptors and extra metrics you want to see in the report --ga_config.substation_unsplit_prob 0.5 # Instinctively better suited for mapelites --ga_config.substation_split_prob 0.5 # Instinctively better suited for mapelites --ga_config.plot # Enable plot generation and saving in stats_dir/plots/ --ga_config.iterations_per_epoch 50 # Basically how often you wanna get a report. Suggested range : 50 - 1000 --ga_config.runtime_seconds 300 # How many seconds to run the optimization for The first descriptor metric is the vertical axis, the second the horizontal axis.

logger module-attribute #

logger = Logger(__name__)

args module-attribute #

args = cli(CLIArgs)

file_system module-attribute #

file_system = LocalFileSystem()

CLIArgs #

Bases: BaseModel

The arguments for a CLI invocation, mostly equal to OptimizationStartCommand.

ga_config class-attribute instance-attribute #

ga_config = Field(default_factory=BatchedMEParameters)

The configuration for the genetic algorithm

lf_config class-attribute instance-attribute #

lf_config = Field(default_factory=LoadflowSolverParameters)

The configuration for the loadflow solver

fixed_files class-attribute instance-attribute #

fixed_files = ()

The file containing the static information. You can pass multiple files here

tensorboard_dir class-attribute instance-attribute #

tensorboard_dir = 'tensorboard'

The directory to store the tensorboard logs

stats_dir class-attribute instance-attribute #

stats_dir = 'stats'

The directory to store the json summaries

summary_frequency class-attribute instance-attribute #

summary_frequency = 10

How often to write tensorboard summaries

checkpoint_frequency class-attribute instance-attribute #

checkpoint_frequency = 100

How often to write json summaries

double_limits class-attribute instance-attribute #

double_limits = (1.0, 1.0)

The double limits to use in the form (lower_limit, upper_limit). lower_limit: float The relative lower limit to set, for branches whose n-1 flows are below the lower limit upper_limit: float The relative upper_limit determining at what relative load a branch is considered overloaded. Branches in the band between lower and upper limit are considered overloaded if more load is added.

log_tensorboard #

log_tensorboard(fitness, metrics, iteration, writer)

Log fitness and metrics to tensorboard.

PARAMETER DESCRIPTION
fitness

The fitness value

TYPE: float

metrics

The metrics to log

TYPE: dict[MetricType, float]

iteration

The current iteration number

TYPE: int

writer

The tensorboard writer

TYPE: SummaryWriter

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/main.py
def log_tensorboard(
    fitness: float,
    metrics: dict[MetricType, float],
    iteration: int,
    writer: SummaryWriter,
) -> None:
    """Log fitness and metrics to tensorboard.

    Parameters
    ----------
    fitness : float
        The fitness value
    metrics : dict[MetricType, float]
        The metrics to log
    iteration : int
        The current iteration number
    writer : SummaryWriter
        The tensorboard writer
    """
    writer.add_scalar("fitness", fitness, iteration)
    for key, value in metrics.items():
        writer.add_scalar(key, value, iteration)

write_summary #

write_summary(
    optimizer_data,
    repertoire,
    emitter_state,
    iteration,
    folder,
    cli_args,
    processed_gridfile_fs,
    final_results=False,
)

Write a summary to a json file.

Watch out : here, the optimizer data is out of sync with the jax_data

PARAMETER DESCRIPTION
optimizer_data

The optimizer data of the optimization. This includes a jax_data, however as the jax_data is updated per-iteration and the optimizer_data only per-epoch, we need to pass the jax_data separately

TYPE: OptimizerData

repertoire

The current repertoire for this iteration, will be used instead of the one in the optimizer_data

TYPE: DiscreteMapElitesRepertoire

emitter_state

The emitter state for this iteration, will be used instead of the one in the optimizer_data

TYPE: EmitterState

iteration

The iteration number

TYPE: int

folder

The folder to write the summary to, relative to the processed_gridfile_fs

TYPE: str

cli_args

The arguments used for invocation, will be added to the summary for documentation purposes

TYPE: CLIArgs

processed_gridfile_fs

The target filesystem for the preprocessing worker. This contains all processed grid files. During the import job, a new folder import_results.data_folder was created which will be completed with the preprocess call to this function. Internally, only the data folder is passed around as a dirfs. Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the unprocessed gridfiles were already done.

TYPE: AbstractFileSystem

final_results

Whether this is the final results summary "res.json" or an intermediate one "res_{iteration}.json"

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict

The summary that was written

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/main.py
def write_summary(
    optimizer_data: OptimizerData,
    repertoire: DiscreteMapElitesRepertoire,
    emitter_state: EmitterState,
    iteration: int,
    folder: str,
    cli_args: CLIArgs,
    processed_gridfile_fs: AbstractFileSystem,
    final_results: bool = False,
) -> dict:
    """Write a summary to a json file.

    Watch out : here, the optimizer data is out of sync with the jax_data

    Parameters
    ----------
    optimizer_data : OptimizerData
        The optimizer data of the optimization. This includes a jax_data, however as the jax_data
        is updated per-iteration and the optimizer_data only per-epoch, we need to pass the
        jax_data separately
    repertoire : DiscreteMapElitesRepertoire
        The current repertoire for this iteration, will be used instead of the one in the optimizer_data
    emitter_state : EmitterState
        The emitter state for this iteration, will be used instead of the one in the optimizer_data
    iteration : int
        The iteration number
    folder : str
        The folder to write the summary to, relative to the processed_gridfile_fs
    cli_args : CLIArgs
        The arguments used for invocation, will be added to the summary for
        documentation purposes
    processed_gridfile_fs: AbstractFileSystem
        The target filesystem for the preprocessing worker. This contains all processed grid files.
        During the import job,  a new folder import_results.data_folder was created
        which will be completed with the preprocess call to this function.
        Internally, only the data folder is passed around as a dirfs.
        Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the
        unprocessed gridfiles were already done.
    final_results : bool
        Whether this is the final results summary "res.json" or an intermediate one "res_{iteration}.json"

    Returns
    -------
    dict
        The summary that was written
    """
    processed_gridfile_fs.makedirs(folder, exist_ok=True)

    ga_config = cli_args.ga_config
    n_cells_per_dim = tuple(desc.num_cells for desc in ga_config.me_descriptors)
    descriptor_metrics = tuple(desc.metric for desc in ga_config.me_descriptors)
    args_dict = cli_args.model_dump()

    # Here we assume that contingency_ids are the same for all topos in the repertoire
    contingency_ids = optimizer_data.solver_configs[0].contingency_ids
    summary = summarize(
        repertoire=repertoire,
        emitter_state=emitter_state,
        initial_fitness=optimizer_data.initial_fitness,
        initial_metrics=optimizer_data.initial_metrics,
        contingency_ids=contingency_ids,
        grid_model_low_tap=optimizer_data.jax_data.dynamic_informations[0].nodal_injection_information.grid_model_low_tap
        if optimizer_data.jax_data.dynamic_informations[0].nodal_injection_information is not None
        else None,
    )
    summary.update(
        {
            "args": args_dict,
            "iteration": iteration,
        }
    )
    filename = "res.json" if final_results else f"res_{iteration}.json"
    with processed_gridfile_fs.open(os.path.join(folder, filename), "w") as f:
        json.dump(summary, f)
    if ga_config.plot:
        plot_repertoire(
            repertoire.fitnesses[: np.prod(n_cells_per_dim)],
            iteration,
            folder,
            n_cells_per_dim=n_cells_per_dim,
            descriptor_metrics=descriptor_metrics,
            save_plot=ga_config.plot,
        )
    return summary

main #

main(args, processed_gridfile_fs)

Run main optimization function for CLI execution.

PARAMETER DESCRIPTION
args

The arguments for the optimization

TYPE: CLIArgs

processed_gridfile_fs

The target filesystem for the preprocessing worker. This contains all processed grid files. During the import job, a new folder import_results.data_folder was created which will be completed with the preprocess call to this function. Internally, only the data folder is passed around as a dirfs. Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the unprocessed gridfiles were already done.

TYPE: AbstractFileSystem

RETURNS DESCRIPTION
dict

The final results of the optimization

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/main.py
def main(
    args: CLIArgs,
    processed_gridfile_fs: AbstractFileSystem,
) -> dict:
    """Run main optimization function for CLI execution.

    Parameters
    ----------
    args : CLIArgs
        The arguments for the optimization
    processed_gridfile_fs: AbstractFileSystem
        The target filesystem for the preprocessing worker. This contains all processed grid files.
        During the import job,  a new folder import_results.data_folder was created
        which will be completed with the preprocess call to this function.
        Internally, only the data folder is passed around as a dirfs.
        Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the
        unprocessed gridfiles were already done.

    Returns
    -------
    dict
        The final results of the optimization
    """
    jax.config.update("jax_enable_x64", True)

    logger.info(f"Starting with config {args}")

    args_dict = args.model_dump()

    partial_write_summary = partial(
        write_summary,
        folder=args.stats_dir,
        cli_args=args,
        processed_gridfile_fs=processed_gridfile_fs,
    )

    optimizer_data, stats, initial_topology = initialize_optimization(
        params=DCOptimizerParameters(
            ga_config=args.ga_config,
            loadflow_solver_config=args.lf_config,
            summary_frequency=args.summary_frequency,
            check_command_frequency=args.summary_frequency,
            double_limits=DoubleLimitsSetpoint(lower=args.double_limits[0], upper=args.double_limits[1])
            if args.double_limits != (1.0, 1.0)
            else None,
        ),
        optimization_id="CLI",
        static_information_files=args.fixed_files,
        processed_gridfile_fs=processed_gridfile_fs,
    )

    running_means = init_running_means(
        n_outages=optimizer_data.jax_data.dynamic_informations[0].n_nminus1_cases,
        n_devices=len(jax.devices()) if args.lf_config.distributed else 1,
    )

    logger.info(f"Optimization started: {stats}")

    writer = SummaryWriter(f"{args.tensorboard_dir}/{datetime.datetime.now()}")
    writer.add_hparams(args_dict, {})

    # Log initial results
    log_tensorboard(
        fitness=initial_topology.timesteps[0].metrics.fitness,
        metrics=initial_topology.timesteps[0].metrics.extra_scores,
        iteration=0,
        writer=writer,
    )

    epoch = 0
    with tqdm(
        total=args.ga_config.runtime_seconds,
        bar_format="{l_bar}{bar}[{elapsed}<{remaining}]{postfix}",
        postfix={
            "f0": optimizer_data.initial_fitness,
            "f": optimizer_data.initial_fitness,
            "br/s": 0,
            "inj/s": 0,
            "lfs": 0,
            "epoch": 0,
        },
    ) as pbar:
        while time.time() - running_means.start_time < args.ga_config.runtime_seconds:
            optimizer_data = run_epoch(optimizer_data)

            with jax.default_device(jax.devices("cpu")[0]):
                repertoire = (
                    jax.tree_util.tree_map(lambda x: x[0], optimizer_data.jax_data.repertoire)
                    if args.lf_config.distributed
                    else optimizer_data.jax_data.repertoire
                )
                emitter_state = (
                    jax.tree_util.tree_map(lambda x: x[0], optimizer_data.jax_data.emitter_state)
                    if args.lf_config.distributed
                    else optimizer_data.jax_data.emitter_state
                )

                fitness, metrics = get_repertoire_metrics(repertoire, args.ga_config.observed_metrics)
                log_tensorboard(fitness, metrics, epoch, writer)
                partial_write_summary(
                    optimizer_data,
                    repertoire,
                    emitter_state,
                    iteration=epoch,
                    final_results=False,
                )

                running_means = update_running_means(running_means=running_means, emitter_state=emitter_state)
            pbar.update(time.time() - running_means.start_time - pbar.n)
            pbar.set_postfix(
                {
                    "f0": optimizer_data.initial_fitness,
                    "f": fitness,
                    "br/s": running_means.br_per_sec.get(),
                    "inj/s": running_means.inj_per_sec.get(),
                    "lfs": running_means.total_inj_combis * running_means.n_outages,
                    "epoch": epoch,
                }
            )
            epoch += 1

    final_results = partial_write_summary(
        optimizer_data,
        jax.tree_util.tree_map(lambda x: x[0], optimizer_data.jax_data.repertoire)
        if args.lf_config.distributed
        else optimizer_data.jax_data.repertoire,
        jax.tree_util.tree_map(lambda x: x[0], optimizer_data.jax_data.emitter_state)
        if args.lf_config.distributed
        else optimizer_data.jax_data.emitter_state,
        iteration=epoch,
        final_results=True,
    )
    return final_results

Genetic Algorithm Functions#

toop_engine_topology_optimizer.dc.genetic_functions.evolution_functions #

Contains the genetic operations for the topologies.

This module contains the functions to perform the genetic operations of mutation and crossover on the topologies of the grid. The topologies are represented as Genotype dataclasses, which contain the substation ids, the branch topology, the injection topology and the disconnections.

Genotype #

A single genome in the repertoire representing a topology.

action_index instance-attribute #

action_index

An action index into the action set

disconnections instance-attribute #

disconnections

The disconnections to apply, padded with int_max for disconnection slots that are unused. These are indices into the disconnectable branches set.

nodal_injections_optimized instance-attribute #

nodal_injections_optimized

The results of the nodal injection optimization, if any was performed.

deduplicate_genotypes #

deduplicate_genotypes(genotypes, desired_size=None)

Deduplicate the genotypes in the repertoire.

This version is jittable because we set the size

PARAMETER DESCRIPTION
genotypes

The genotypes to deduplicate

TYPE: Genotype

desired_size

How many unique values you are expecting. If not given, this is not jittable

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
Genotype

The deduplicated genotypes

Int[Array, ' n_unique']

The indices of the unique genotypes

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/evolution_functions.py
def deduplicate_genotypes(
    genotypes: Genotype,
    desired_size: Optional[int] = None,
) -> tuple[Genotype, Int[Array, " n_unique"]]:
    """Deduplicate the genotypes in the repertoire.

    This version is jittable because we set the size

    Parameters
    ----------
    genotypes : Genotype
        The genotypes to deduplicate
    desired_size : Optional[int]
        How many unique values you are expecting. If not given, this is not jittable

    Returns
    -------
    Genotype
        The deduplicated genotypes
    Int[Array, " n_unique"]
        The indices of the unique genotypes
    """
    # Purposefully not taking into account nodal_injections_optimized, as these are not part of the topology
    genotype_flat = jnp.concatenate(
        [
            genotypes.action_index,
            genotypes.disconnections,
        ],
        axis=1,
    )

    _, indices = jnp.unique(
        genotype_flat,
        axis=0,
        return_index=True,
        size=desired_size,
        # fill_value takes the minimum flattened topology by default
        # it also corresponds to the first index in the list
    )
    unique_genotypes = jax.tree_util.tree_map(lambda x: x[indices], genotypes)
    return unique_genotypes, indices

fix_dtypes #

fix_dtypes(genotypes)

Fix the dtypes of the genotypes to their native type.

For some reason, qdax aggressively converts everything to float

PARAMETER DESCRIPTION
genotypes

The genotypes to fix

TYPE: Genotype

RETURNS DESCRIPTION
Genotype

The genotypes with fixed dtypes

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/evolution_functions.py
def fix_dtypes(genotypes: Genotype) -> Genotype:
    """Fix the dtypes of the genotypes to their native type.

    For some reason, qdax aggressively converts everything to float

    Parameters
    ----------
    genotypes : Genotype
        The genotypes to fix

    Returns
    -------
    Genotype
        The genotypes with fixed dtypes
    """
    return Genotype(
        action_index=genotypes.action_index.astype(int),
        disconnections=genotypes.disconnections.astype(int),
        nodal_injections_optimized=genotypes.nodal_injections_optimized,
    )

empty_repertoire #

empty_repertoire(
    batch_size,
    max_num_splits,
    max_num_disconnections,
    n_timesteps,
    starting_taps=None,
)

Create an initial genotype repertoire with all zeros for all entries and int_max for all subs.

PARAMETER DESCRIPTION
batch_size

The batch size

TYPE: int

max_num_splits

The maximum number of splits per topology

TYPE: int

max_num_disconnections

The maximum number of disconnections as topological measures per topology

TYPE: int

n_timesteps

The number of timesteps in the optimization horizon, used for the nodal injection optimization results

TYPE: int

starting_taps

The starting taps for the psts. If None, no nodal inj optimization will be enabled and nodal_injections_optimized will be set to None. If provided, nodal_injections_optimized will be initialized with these starting taps.

TYPE: Optional[Int[Array, ' num_psts']] DEFAULT: None

RETURNS DESCRIPTION
Genotype

The initial genotype

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/evolution_functions.py
def empty_repertoire(
    batch_size: int,
    max_num_splits: int,
    max_num_disconnections: int,
    n_timesteps: int,
    starting_taps: Optional[Int[Array, " num_psts"]] = None,
) -> Genotype:
    """Create an initial genotype repertoire with all zeros for all entries and int_max for all subs.

    Parameters
    ----------
    batch_size : int
        The batch size
    max_num_splits : int
        The maximum number of splits per topology
    max_num_disconnections : int
        The maximum number of disconnections as topological measures per topology
    n_timesteps : int
        The number of timesteps in the optimization horizon, used for the nodal injection optimization results
    starting_taps : Optional[Int[Array, " num_psts"]]
        The starting taps for the psts. If None, no nodal inj optimization will be enabled and nodal_injections_optimized
        will be set to None. If provided, nodal_injections_optimized will be initialized with these starting taps.

    Returns
    -------
    Genotype
        The initial genotype
    """
    if starting_taps is not None:
        nodal_injections_optimized = NodalInjOptimResults(
            pst_tap_idx=jnp.tile(starting_taps[None, None, :], (batch_size, n_timesteps, 1))
        )
    else:
        nodal_injections_optimized = None

    return Genotype(
        action_index=jnp.full((batch_size, max_num_splits), int_max(), dtype=int),
        disconnections=jnp.full((batch_size, max_num_disconnections), int_max(), dtype=int),
        nodal_injections_optimized=nodal_injections_optimized,
    )

mutate #

mutate(
    topologies,
    random_key,
    substation_split_prob,
    substation_unsplit_prob,
    action_set,
    n_disconnectable_branches,
    n_subs_mutated_lambda,
    disconnect_prob,
    reconnect_prob,
    pst_mutation_sigma,
    pst_n_taps,
    mutation_repetition=1,
)

Mutate the topologies by splitting substations and changing the branch and injection topos.

Makes sure that at all times, a substation is split at most once and that all branch and injection actions are in range of the available actions for the substation. If a substation is not split, this is indicated by the value int_max in the substation, branch and injection.

We mutate mutation_repetition copies of the initial repertoire to increase the chance of getting unique mutations.

PARAMETER DESCRIPTION
topologies

The topologies to mutate

TYPE: Genotype

random_key

The random key to use for the mutation

TYPE: PRNGKey

substation_split_prob

The probability to split a substation. In case all substations are already split, this probability is ignored

TYPE: float

substation_unsplit_prob

The probability to reset a substation to the unsplit state

TYPE: float

action_set

The action set containing available actions on a per-substation basis.

TYPE: ActionSet

n_disconnectable_branches

The number of disconnectable branches in the action set.

TYPE: int

n_subs_mutated_lambda

The lambda for the poisson distribution to determine the number of substations to mutate

TYPE: float

disconnect_prob

The probability to disconnect a new branch

TYPE: float

reconnect_prob

The probability to reconnect a disconnected branch, will overwrite a possible disconnect

TYPE: float

pst_mutation_sigma

The sigma to use for the normal distribution to sample the PST tap mutation from.

TYPE: float

pst_n_taps

The number of taps for each PST, from nodal_injection_information.pst_n_taps.

TYPE: Int[Array, ' num_psts']

mutation_repetition

More chance to get unique mutations by mutating mutation_repetition copies of the repertoire

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
Genotype

The mutated topologies

PRNGKey

The new random key

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/evolution_functions.py
def mutate(  # noqa: PLR0913
    topologies: Genotype,
    random_key: jax.random.PRNGKey,
    substation_split_prob: float,
    substation_unsplit_prob: float,
    action_set: ActionSet,
    n_disconnectable_branches: int,
    n_subs_mutated_lambda: float,
    disconnect_prob: float,
    reconnect_prob: float,
    pst_mutation_sigma: float,
    pst_n_taps: Int[Array, " num_psts"],
    mutation_repetition: int = 1,
) -> tuple[Genotype, jax.random.PRNGKey]:
    """Mutate the topologies by splitting substations and changing the branch and injection topos.

    Makes sure that at all times, a substation is split at most once and that all branch and
    injection actions are in range of the available actions for the substation. If a substation is
    not split, this is indicated by the value int_max in the substation, branch and injection.

    We mutate mutation_repetition copies of the initial repertoire to increase the chance of getting
    unique mutations.

    Parameters
    ----------
    topologies : Genotype
        The topologies to mutate
    random_key : jax.random.PRNGKey
        The random key to use for the mutation
    substation_split_prob : float
        The probability to split a substation. In case all substations are already split, this
        probability is ignored
    substation_unsplit_prob : float
        The probability to reset a substation to the unsplit state
    action_set : ActionSet
        The action set containing available actions on a per-substation basis.
    n_disconnectable_branches: int
        The number of disconnectable branches in the action set.
    n_subs_mutated_lambda : float
        The lambda for the poisson distribution to determine the number of substations to mutate
    disconnect_prob : float
        The probability to disconnect a new branch
    reconnect_prob : float
        The probability to reconnect a disconnected branch, will overwrite a possible disconnect
    pst_mutation_sigma : float
        The sigma to use for the normal distribution to sample the PST tap mutation from.
    pst_n_taps : Int[Array, " num_psts"]
        The number of taps for each PST, from nodal_injection_information.pst_n_taps.
    mutation_repetition : int
        More chance to get unique mutations by mutating mutation_repetition copies of the repertoire

    Returns
    -------
    Genotype
        The mutated topologies
    jax.random.PRNGKey
        The new random key
    """
    max_num_splits = topologies.action_index.shape[1]

    def _mutate_single_topo(
        sub_ids: Int[Array, " max_num_splits"],
        action: Int[Array, " max_num_splits"],
        disconnections_topo: Int[Array, " max_num_disconnections"],
        random_key: jax.random.PRNGKey,
    ) -> tuple[
        Int[Array, " max_num_splits"],
        Int[Array, " max_num_splits"],
        Int[Array, " max_num_disconnections"],
        jax.random.PRNGKey,
    ]:
        """Mutates a single topology n_subs_mutated times and adds disconnections."""
        # Sample number of subs mutated from a poisson
        random_key, subkey = jax.random.split(random_key)
        n_subs_mutated = jax.random.poisson(subkey, lam=n_subs_mutated_lambda, shape=())
        n_subs_mutated = jnp.clip(n_subs_mutated, 1, max_num_splits)

        # Mutate sub_ids, action and injection_topo n_subs_mutated times
        sub_ids, action, random_key = jax.lax.fori_loop(
            0,
            n_subs_mutated,
            lambda _i, args: mutate_sub(
                sub_ids=args[0],
                action=args[1],
                random_key=args[2],
                substation_split_prob=substation_split_prob,
                substation_unsplit_prob=substation_unsplit_prob,
                action_set=action_set,
            ),
            (
                sub_ids,
                action,
                random_key,
            ),
        )

        # Mutate the disconnections
        random_key, subkey = jax.random.split(random_key)
        disconnections_topo = mutate_disconnections(
            random_key=subkey,
            disconnections=disconnections_topo,
            n_disconnectable_branches=n_disconnectable_branches,
            disconnect_prob=disconnect_prob,
            reconnect_prob=reconnect_prob,
        )

        return sub_ids, action, disconnections_topo, random_key

    topologies = fix_dtypes(topologies)
    batch_size = len(topologies.action_index)

    # Repeat the topologies to increase the chance of getting unique mutations
    if mutation_repetition == 1:
        repeated_topologies = topologies
    else:
        repeated_topologies: Genotype = jax.tree.map(
            lambda x: jnp.repeat(
                x, repeats=mutation_repetition, axis=0, total_repeat_length=batch_size * mutation_repetition
            ),
            topologies,
        )

    random_keys = jax.random.split(random_key, batch_size * mutation_repetition)
    sub_ids = extract_sub_ids(
        repeated_topologies.action_index,
        action_set,
    )

    (
        sub_ids,
        action,
        disconnections_topo,
        random_keys,
    ) = jax.vmap(_mutate_single_topo)(
        sub_ids,
        repeated_topologies.action_index,
        repeated_topologies.disconnections,
        random_keys,
    )
    random_key = random_keys[0]

    nodal_injections_optimized = mutate_nodal_injections(
        random_key=random_key,
        nodal_inj_info=repeated_topologies.nodal_injections_optimized,
        pst_mutation_sigma=pst_mutation_sigma,
        pst_n_taps=pst_n_taps,
    )

    topologies_mutated = Genotype(
        action_index=action,
        disconnections=disconnections_topo,
        nodal_injections_optimized=nodal_injections_optimized,
    )

    if mutation_repetition > 1:
        topologies_mutated, _ = deduplicate_genotypes(
            topologies_mutated,
            desired_size=batch_size,
        )

    return topologies_mutated, random_key

mutate_sub_id #

mutate_sub_id(
    random_key, sub_ids, n_subs_rel, substation_split_prob
)

Mutate the substation ids of a single topology.

PARAMETER DESCRIPTION
random_key

The random key to use for the mutation

TYPE: PRNGKey

sub_ids

The substation ids before mutation

TYPE: Int[Array, ' max_num_splits']

n_subs_rel

The number of relevant substations in the grid

TYPE: int

substation_split_prob

The probability to split a substation

TYPE: float

RETURNS DESCRIPTION
Int[Array, ' max_num_splits']

The mutated substation ids

Int[Array, ' ']

The index of the substation that was mutated. If no substation was mutated, this is set to a random substation that was already split, so branch and injection mutations can still happen despite no new split. If no substation was split yet and no split happened, this is set to int_max

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/evolution_functions.py
def mutate_sub_id(
    random_key: jax.random.PRNGKey,
    sub_ids: Int[Array, " max_num_splits"],
    n_subs_rel: int,
    substation_split_prob: float,
) -> tuple[Int[Array, " max_num_splits"], Int[Array, " "]]:
    """Mutate the substation ids of a single topology.

    Parameters
    ----------
    random_key : jax.random.PRNGKey
        The random key to use for the mutation
    sub_ids : Int[Array, " max_num_splits"]
        The substation ids before mutation
    n_subs_rel : int
        The number of relevant substations in the grid
    substation_split_prob : float
        The probability to split a substation

    Returns
    -------
    Int[Array, " max_num_splits"]
        The mutated substation ids
    Int[Array, " "]
        The index of the substation that was mutated. If no substation was mutated, this is set to
        a random substation that was already split, so branch and injection mutations can still
        happen despite no new split. If no substation was split yet and no split happened, this is
        set to int_max
    """
    int_max = jnp.iinfo(jnp.int32).max
    randkeys = jax.random.split(random_key, 3)
    max_num_splits = sub_ids.shape[0]

    # Write a boolean array which substations have been split already, we don't want to split these
    # again
    substations_split = jnp.zeros(n_subs_rel, dtype=bool).at[sub_ids].set(True, mode="drop")
    all_split = jnp.all(substations_split)

    # Sample a new substation from the substations which have not been split yet for every topology
    # in the batch
    new_substation_idx: Int[Array, " "] = jax.random.categorical(randkeys[0], jnp.log(1 - substations_split.astype(float)))

    # Now, decide on which index to update the substation. We only have max_num_splits substations
    # that can be split, so we need to choose one of the spots.
    # Additionally, we want a certain probability to leave the substation unchanged, we implement
    # This by sampling an index that could be out of bounds, and dropping the update if it is
    split_idx: Int[Array, " "] = jax.random.randint(
        randkeys[1],
        shape=(1,),
        minval=0,
        maxval=int(max_num_splits * (1 / substation_split_prob)),
    )[0]
    # If all substations have been split, do not update
    split_idx = jnp.where(all_split, int_max, split_idx)
    sub_ids = sub_ids.at[split_idx].set(new_substation_idx, mode="drop")

    # Furthermore, we want to mutate the branch topology and the injection topology. We want to
    # mutate exactly one substation per topology every time
    # If the substation id has changed, we're forced to also change the branch topology and the
    # injection topology of that substation. Otherwise (split_idx >= max_num_splits), we'll resample
    # During resampling, we want to make sure we're sampling a spot that has a split (if any)
    resampled_split = jax.random.categorical(randkeys[2], jnp.log((sub_ids != int_max).astype(float)))
    split_idx = jnp.where(split_idx >= max_num_splits, resampled_split, split_idx)
    # Edge case: No substations have been split yet
    split_idx = jnp.where(sub_ids[split_idx] == int_max, int_max, split_idx)

    return sub_ids, split_idx

mutate_disconnections #

mutate_disconnections(
    random_key,
    disconnections,
    n_disconnectable_branches,
    disconnect_prob,
    reconnect_prob,
)

Mutate the disconnections of a single topology.

PARAMETER DESCRIPTION
random_key

The random key to use for the mutation

TYPE: PRNGKey

disconnections

The disconnections before mutation of one individual

TYPE: Int[Array, ' max_num_disconnections']

n_disconnectable_branches

The number of disconnectable branches in the action set. It is not necessary to know the contents of the action set here because we only sample an index into the action set.

TYPE: int

disconnect_prob

The probability to disconnect a new branch

TYPE: float

reconnect_prob

The probability to reconnect a disconnected branch, will overwrite a possible disconnect

TYPE: float

RETURNS DESCRIPTION
Int[Array, ' max_num_disconnections']

The mutated disconnections

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/evolution_functions.py
def mutate_disconnections(
    random_key: jax.random.PRNGKey,
    disconnections: Int[Array, " max_num_disconnections"],
    n_disconnectable_branches: int,
    disconnect_prob: float,
    reconnect_prob: float,
) -> Int[Array, " max_num_disconnections"]:
    """Mutate the disconnections of a single topology.

    Parameters
    ----------
    random_key : jax.random.PRNGKey
        The random key to use for the mutation
    disconnections : Int[Array, " max_num_disconnections"]
        The disconnections before mutation of one individual
    n_disconnectable_branches: int,
        The number of disconnectable branches in the action set. It is not necessary to know the
        contents of the action set here because we only sample an index into the action set.
    disconnect_prob : float
        The probability to disconnect a new branch
    reconnect_prob : float
        The probability to reconnect a disconnected branch, will overwrite a possible disconnect

    Returns
    -------
    Int[Array, " max_num_disconnections"]
        The mutated disconnections
    """
    key1, key2, key3 = jax.random.split(random_key, 3)
    if disconnect_prob > 0 and disconnections.size > 0 and n_disconnectable_branches > 0:
        # List available disconnections
        available_disconnections = ~jnp.isin(jnp.arange(n_disconnectable_branches), disconnections, assume_unique=True)

        # Choose a slot so that the probability of choosing out-of-bounds is equal to 1 - disconnect_prob
        disconnect_idx = jax.random.randint(
            key1,
            shape=(1,),
            minval=0,
            maxval=int(disconnections.shape[0] * (1 / disconnect_prob)),
        )[0]

        # Sample a new disconnection from the available disconnections
        new_disconnection = jax.random.categorical(key2, jnp.log(available_disconnections.astype(float)))
        # If no disconnection is available, set disconnect_idx to int_max
        disconnect_idx = jnp.where(available_disconnections.sum() == 0, int_max(), disconnect_idx)
        disconnections = disconnections.at[disconnect_idx].set(new_disconnection, mode="drop")

    # Choose a slot to overwrite with a reconnect
    if reconnect_prob > 0 and disconnections.size > 0 and n_disconnectable_branches > 0:
        reconnect_idx = jax.random.randint(
            key3,
            shape=(1,),
            minval=0,
            maxval=int(disconnections.shape[0] * (1 / reconnect_prob)),
        )[0]
        disconnections = disconnections.at[reconnect_idx].set(int_max(), mode="drop")

    return disconnections

mutate_sub #

mutate_sub(
    sub_ids,
    action,
    random_key,
    substation_split_prob,
    substation_unsplit_prob,
    action_set,
)

Mutate a single substation, changing the sub_ids, branch and inj topos.

The sub-ids are implicit to the branch topo action index, however we pass them explicitely to aid substation mutation.

PARAMETER DESCRIPTION
sub_ids

The substation ids before mutation

TYPE: Int[Array, ' max_num_splits']

action

The branch topology before mutation

TYPE: Int[Array, ' max_num_splits']

random_key

The random key to use for the mutation

TYPE: PRNGKey

substation_split_prob

The probability to split a substation

TYPE: float

substation_unsplit_prob

The probability to reset a split substation to the unsplit state

TYPE: float

action_set

The actions for every substation. If not provided, will sample from all possible actions per substation

TYPE: ActionSet

RETURNS DESCRIPTION
sub_ids

The substation ids after mutation

TYPE: Int[Array, ' max_num_splits']

action

The branch topology after mutation

TYPE: Int[Array, ' max_num_splits']

random_key

The random key used for the mutation

TYPE: PRNGKey

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/evolution_functions.py
def mutate_sub(
    sub_ids: Int[Array, " max_num_splits"],
    action: Int[Array, " max_num_splits"],
    random_key: jax.random.PRNGKey,
    substation_split_prob: float,
    substation_unsplit_prob: float,
    action_set: ActionSet,
) -> tuple[
    Int[Array, " max_num_splits"],
    Int[Array, " max_num_splits"],
    jax.random.PRNGKey,
]:
    """Mutate a single substation, changing the sub_ids, branch and inj topos.

    The sub-ids are implicit to the branch topo action index, however we pass them explicitely
    to aid substation mutation.

    Parameters
    ----------
    sub_ids : Int[Array, " max_num_splits"]
        The substation ids before mutation
    action : Int[Array, " max_num_splits"]
        The branch topology before mutation
    random_key : jax.random.PRNGKey
        The random key to use for the mutation
    substation_split_prob : float
        The probability to split a substation
    substation_unsplit_prob : float
        The probability to reset a split substation to the unsplit state
    action_set : ActionSet
        The actions for every substation. If not provided, will sample from all possible
        actions per substation

    Returns
    -------
    sub_ids : Int[Array, " max_num_splits"]
        The substation ids after mutation
    action : Int[Array, " max_num_splits"]
        The branch topology after mutation
    random_key : jax.random.PRNGKey
        The random key used for the mutation
    """
    if substation_split_prob == 0:
        return sub_ids, action, random_key

    randkeys = jax.random.split(random_key, 7)

    n_subs_rel = len(action_set.n_actions_per_sub)
    assert substation_split_prob > 0 and substation_split_prob <= 1
    assert substation_unsplit_prob >= 0 and substation_unsplit_prob <= 1

    sub_ids, split_idx = mutate_sub_id(
        randkeys[0],
        sub_ids,
        n_subs_rel,
        substation_split_prob,
    )
    selected_sub = sub_ids.at[split_idx].get(mode="fill", fill_value=int_max())

    # The branch action set stores the available actions
    new_action = sample_action_index_from_branch_actions(
        rng_key=randkeys[3],
        sub_id=selected_sub,
        branch_action_set=action_set,
    )

    action = action.at[split_idx].set(new_action, mode="drop")

    sub_ids, action = unsplit_substation(
        random_key=randkeys[5],
        sub_ids=sub_ids,
        action=action,
        split_idx=split_idx,
        substation_unsplit_prob=substation_unsplit_prob,
    )

    # Order by sub_ids
    indices = jnp.argsort(sub_ids)
    sub_ids = sub_ids[indices]
    action = action[indices]

    return sub_ids, action, randkeys[6]

mutate_nodal_injections #

mutate_nodal_injections(
    random_key,
    nodal_inj_info,
    pst_mutation_sigma,
    pst_n_taps,
)

Mutate the nodal injection optimization results, currently only the PST taps.

PARAMETER DESCRIPTION
random_key

The random key to use for the mutation

TYPE: PRNGKey

nodal_inj_info

The nodal injection optimization results before mutation. If None, no mutation is performed and None is returned.

TYPE: Optional[NodalInjOptimResults]

pst_mutation_sigma

The sigma to use for the normal distribution to sample the mutation from. The mutation will be sampled as an integer from a normal distribution with mean 0 and sigma pst_mutation_sigma.

TYPE: float

pst_n_taps

The number of taps for each PST. If a PST has N taps in this array, then it is assumed that all taps from 0 to N-1 are valid tap positions. Output taps will be clipped to this range.

TYPE: Int[Array, ' num_psts']

RETURNS DESCRIPTION
Optional[NodalInjOptimResults]

The mutated nodal injection optimization results. If nodal_inj_info was None, returns None.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/evolution_functions.py
def mutate_nodal_injections(
    random_key: jax.random.PRNGKey,
    nodal_inj_info: Optional[NodalInjOptimResults],
    pst_mutation_sigma: float,
    pst_n_taps: Int[Array, " num_psts"],
) -> Optional[NodalInjOptimResults]:
    """Mutate the nodal injection optimization results, currently only the PST taps.

    Parameters
    ----------
    random_key : jax.random.PRNGKey
        The random key to use for the mutation
    nodal_inj_info : Optional[NodalInjOptimResults]
        The nodal injection optimization results before mutation. If None, no mutation is performed and None is returned.
    pst_mutation_sigma : float
        The sigma to use for the normal distribution to sample the mutation from. The mutation will be sampled as an
        integer from a normal distribution with mean 0 and sigma pst_mutation_sigma.
    pst_n_taps : Int[Array, " num_psts"]
        The number of taps for each PST. If a PST has N taps in this array, then it is assumed that all taps from
        0 to N-1 are valid tap positions. Output taps will be clipped to this range.

    Returns
    -------
    Optional[NodalInjOptimResults]
        The mutated nodal injection optimization results. If nodal_inj_info was None, returns None.
    """
    if nodal_inj_info is None:
        return None

    if pst_mutation_sigma <= 0:
        return nodal_inj_info

    batch_size = nodal_inj_info.pst_tap_idx.shape[0]
    n_timesteps = nodal_inj_info.pst_tap_idx.shape[1]
    random_key = jax.random.split(random_key, (batch_size, n_timesteps))

    # vmap to mutate the PST taps for each timestep + batch independently
    new_pst_taps = jax.vmap(jax.vmap(partial(mutate_psts, pst_n_taps=pst_n_taps, pst_mutation_sigma=pst_mutation_sigma)))(
        random_key=random_key,
        pst_taps=nodal_inj_info.pst_tap_idx.astype(int),
    )

    return NodalInjOptimResults(pst_tap_idx=new_pst_taps)

mutate_psts #

mutate_psts(
    random_key, pst_taps, pst_n_taps, pst_mutation_sigma
)

Mutate the PST taps of a single topology.

PARAMETER DESCRIPTION
random_key

The random key to use for the mutation

TYPE: PRNGKey

pst_taps

The PST tap positions before mutation

TYPE: Int[Array, ' num_psts']

pst_n_taps

The number of taps for each PST. If a PST has N taps in this array, then it is assumed that all taps from 0 to N-1 are valid tap positions. Output taps will be clipped to this range.

TYPE: Int[Array, ' num_psts']

pst_mutation_sigma

The sigma to use for the normal distribution to sample the mutation from. The mutation will be sampled as an integer from a normal distribution with mean 0 and sigma pst_mutation_sigma.

TYPE: float

RETURNS DESCRIPTION
Int[Array, ' num_psts']

The mutated PST tap positions, clipped to the valid range of taps for each PST.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/evolution_functions.py
def mutate_psts(
    random_key: jax.random.PRNGKey,
    pst_taps: Int[Array, " num_psts"],
    pst_n_taps: Int[Array, " num_psts"],
    pst_mutation_sigma: float,
) -> Int[Array, " num_psts"]:
    """Mutate the PST taps of a single topology.

    Parameters
    ----------
    random_key : jax.random.PRNGKey
        The random key to use for the mutation
    pst_taps : Int[Array, " num_psts"]
        The PST tap positions before mutation
    pst_n_taps : Int[Array, " num_psts"]
        The number of taps for each PST. If a PST has N taps in this array, then it is assumed that all taps from
        0 to N-1 are valid tap positions. Output taps will be clipped to this range.
    pst_mutation_sigma : float
        The sigma to use for the normal distribution to sample the mutation from. The mutation will be sampled as an
        integer from a normal distribution with mean 0 and sigma pst_mutation_sigma.

    Returns
    -------
    Int[Array, " num_psts"]
        The mutated PST tap positions, clipped to the valid range of taps for each PST.
    """
    mutation = jax.random.normal(random_key, shape=pst_taps.shape) * pst_mutation_sigma
    mutation = jnp.round(mutation).astype(int)
    new_pst_taps = pst_taps + mutation
    new_pst_taps = jnp.clip(new_pst_taps, a_min=0, a_max=pst_n_taps - 1)
    return new_pst_taps

sample_unique_from_array #

sample_unique_from_array(
    random_key, sample_pool, sample_probs, n_samples
)

Sample n unique elements from an array (returning indices), counting int_max as always unique.

PARAMETER DESCRIPTION
random_key

The random key to use for the sampling

TYPE: PRNGKey

sample_pool

The array to sample from. Only unique elements are sampled, however int_max entries are not checked for uniqueness

TYPE: Int[Array, ' n']

sample_probs

The probabilities to sample each element

TYPE: Float[Array, ' n']

n_samples

The number of samples to take, should be less than the number of non-int_max entries in array.

TYPE: int

RETURNS DESCRIPTION
Int[Array, ' n_samples']

The sampled indices into sample_pool

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/evolution_functions.py
def sample_unique_from_array(
    random_key: jax.random.PRNGKey,
    sample_pool: Int[Array, " n"],
    sample_probs: Float[Array, " n"],
    n_samples: int,
) -> Int[Array, " n_samples"]:
    """Sample n unique elements from an array (returning indices), counting int_max as always unique.

    Parameters
    ----------
    random_key : jax.random.PRNGKey
        The random key to use for the sampling
    sample_pool : Int[Array, " n"]
        The array to sample from. Only unique elements are sampled, however int_max entries are
        not checked for uniqueness
    sample_probs : Float[Array, " n"]
        The probabilities to sample each element
    n_samples : int
        The number of samples to take, should be less than the number of non-int_max entries in
        array.

    Returns
    -------
    Int[Array, " n_samples"]
        The sampled indices into sample_pool
    """
    subkeys = jax.random.split(random_key, n_samples)

    def _body_fn(
        i: Int,
        entries_sampled: tuple[Int[Array, " max_num_splits"], Bool[Array, " n_subs_rel"]],
    ) -> tuple[Int[Array, " max_num_splits"], Bool[Array, " n_subs_rel"]]:
        indices_sampled, choice_mask = entries_sampled

        probs = sample_probs * choice_mask
        probs = probs / jnp.sum(probs)

        current_index = jax.random.choice(subkeys[i], jnp.arange(sample_pool.shape[0]), shape=(1,), p=probs)[0]

        # Update the choice mask
        # If a substation has been sampled, we can't sample this substation again
        # If a no-split (int_max) has been sampled, we only mask out the sampled index
        choice_mask = jnp.where(
            sample_pool[current_index] == int_max(),
            choice_mask.at[current_index].set(False, mode="promise_in_bounds"),
            jnp.where(
                sample_pool == sample_pool[current_index],
                False,
                choice_mask,
            ),
        )

        # Update the indices sampled
        indices_sampled = indices_sampled.at[i].set(current_index, mode="promise_in_bounds")

        return (indices_sampled, choice_mask)

    indices_sampled, _choice_mask = jax.lax.fori_loop(
        lower=0,
        upper=n_samples,
        body_fun=_body_fn,
        init_val=(
            jnp.full((n_samples,), int_max(), dtype=int),
            jnp.ones(sample_pool.shape, dtype=bool),
        ),
        unroll=True,
    )

    return indices_sampled

crossover_unbatched #

crossover_unbatched(
    topologies_a,
    topologies_b,
    random_key,
    action_set,
    prob_take_a,
)

Crossover two topologies while making sure that no substation is present twice.

This version is unbatched, i.e. it only works on a single topology. Use crossover for batched inputs.

PARAMETER DESCRIPTION
topologies_a

The first topology

TYPE: Genotype

topologies_b

The second topology

TYPE: Genotype

random_key

The random key to use for the crossover

TYPE: PRNGKey

action_set

The branch action set containing available actions on a per-substation basis. This is needed to resolve the sub_ids for the branch actions

TYPE: ActionSet

prob_take_a

The probability to take the value from topology_a, otherwise the value from topology_b is taken

TYPE: float

RETURNS DESCRIPTION
Genotype

The new topology

PRNGKey

The new random key

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/evolution_functions.py
def crossover_unbatched(
    topologies_a: Genotype,
    topologies_b: Genotype,
    random_key: jax.random.PRNGKey,
    action_set: ActionSet,
    prob_take_a: float,
) -> tuple[Genotype, jax.random.PRNGKey]:
    """Crossover two topologies while making sure that no substation is present twice.

    This version is unbatched, i.e. it only works on a single topology. Use crossover for batched
    inputs.

    Parameters
    ----------
    topologies_a : Genotype
        The first topology
    topologies_b : Genotype
        The second topology
    random_key : jax.random.PRNGKey
        The random key to use for the crossover
    action_set : ActionSet
        The branch action set containing available actions on a per-substation basis.
        This is needed to resolve the sub_ids for the branch actions
    prob_take_a : float
        The probability to take the value from topology_a, otherwise the value from topology_b is
        taken

    Returns
    -------
    Genotype
        The new topology
    jax.random.PRNGKey
        The new random key
    """
    # The tricky part in the crossover is that both topologies could have the same sub-id on
    # different indices. We need to make sure that we don't end up with the same substation twice
    # in the new topology

    max_num_splits = topologies_a.action_index.shape[0]
    max_num_disconnections = topologies_a.disconnections.shape[0]
    subkeys = jax.random.split(random_key, 3)

    # The probability to take the value from a is prob_take_a
    # The probability to take the value from b is 1 - prob_take_a
    base_sample_probs = jnp.ones(2 * max_num_splits)
    base_sample_probs = base_sample_probs.at[:max_num_splits].set(prob_take_a)
    base_sample_probs = base_sample_probs.at[max_num_splits:].set(1 - prob_take_a)

    topologies_concatenated: Genotype = jax.tree_util.tree_map(
        lambda x, y: jnp.concatenate([x, y], axis=0),
        topologies_a,
        topologies_b,
    )
    sub_ids_concatenated = extract_sub_ids(topologies_concatenated.action_index, action_set)

    indices_sampled = sample_unique_from_array(
        random_key=subkeys[0],
        sample_pool=sub_ids_concatenated,
        sample_probs=base_sample_probs,
        n_samples=max_num_splits,
    )

    actions = topologies_concatenated.action_index.at[indices_sampled].get(mode="fill", fill_value=int_max())

    # Sample disconnection choices
    if max_num_disconnections != 0:
        base_sample_probs = jnp.ones(2 * max_num_disconnections)
        base_sample_probs = base_sample_probs.at[:max_num_disconnections].set(prob_take_a)
        base_sample_probs = base_sample_probs.at[max_num_disconnections:].set(1 - prob_take_a)

        disconnections_concatenated: Int[Array, " 2*max_num_disconnections"] = topologies_concatenated.disconnections
        indices_sampled = sample_unique_from_array(
            random_key=subkeys[1],
            sample_pool=disconnections_concatenated,
            sample_probs=base_sample_probs,
            n_samples=max_num_disconnections,
        )

        disconnections = disconnections_concatenated.at[indices_sampled].get(mode="fill", fill_value=int_max())
    else:
        disconnections = jnp.array([], dtype=int)

    return Genotype(
        action_index=actions,
        disconnections=disconnections,
        nodal_injections_optimized=topologies_a.nodal_injections_optimized,
    ), subkeys[-1]

crossover #

crossover(
    topologies_a,
    topologies_b,
    random_key,
    action_set,
    prob_take_a,
)

Crossover two topologies while making sure that no substation is present twice.

PARAMETER DESCRIPTION
topologies_a

The first topology

TYPE: Genotype

topologies_b

The second topology

TYPE: Genotype

random_key

The random key to use for the crossover

TYPE: PRNGKey

action_set

The branch action set containing available actions on a per-substation basis.

TYPE: ActionSet

prob_take_a

The probability to take the value from topology_a, otherwise the value from topology_b is taken

TYPE: float

RETURNS DESCRIPTION
Genotype

The new topology

PRNGKey

The new random key

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/evolution_functions.py
def crossover(
    topologies_a: Genotype,
    topologies_b: Genotype,
    random_key: jax.random.PRNGKey,
    action_set: ActionSet,
    prob_take_a: float,
) -> tuple[Genotype, jax.random.PRNGKey]:
    """Crossover two topologies while making sure that no substation is present twice.

    Parameters
    ----------
    topologies_a : Genotype
        The first topology
    topologies_b : Genotype
        The second topology
    random_key : jax.random.PRNGKey
        The random key to use for the crossover
    action_set : ActionSet
        The branch action set containing available actions on a per-substation basis.
    prob_take_a : float
        The probability to take the value from topology_a, otherwise the value from topology_b is
        taken

    Returns
    -------
    Genotype
        The new topology
    jax.random.PRNGKey
        The new random key
    """
    batch_size = topologies_a.action_index.shape[0]
    random_key = jax.random.split(random_key, batch_size)
    topo, random_keys = jax.vmap(crossover_unbatched, in_axes=(0, 0, 0, None, None))(
        topologies_a, topologies_b, random_key, action_set, prob_take_a
    )
    return topo, random_keys[0]

unsplit_substation #

unsplit_substation(
    random_key,
    sub_ids,
    action,
    split_idx,
    substation_unsplit_prob,
)

Reset a split substation to the unsplit state, with a certain probability.

PARAMETER DESCRIPTION
random_key

The random key to use for the reset

TYPE: PRNGKey

sub_ids

The substation ids before the reset

TYPE: Int[Array, ' max_num_splits']

action

The branch/inj topology action before the reset

TYPE: Int[Array, ' max_num_splits']

split_idx

The index of the substation to reset

TYPE: Int[Array, ' ']

substation_unsplit_prob

The probability to reset the substation to the unsplit state

TYPE: float

RETURNS DESCRIPTION
Int[Array, ' max_num_splits']

The substation ids after the reset. If a reset occurred, the substation id at split_idx will be set to int_max

Int[Array, ' max_num_splits']

The topology action after the reset. If a reset occurred, the topology at split_idx will be set to int_max

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/evolution_functions.py
def unsplit_substation(
    random_key: jax.random.PRNGKey,
    sub_ids: Int[Array, " max_num_splits"],
    action: Int[Array, " max_num_splits"],
    split_idx: Int[Array, " "],
    substation_unsplit_prob: float,
) -> tuple[
    Int[Array, " max_num_splits"],
    Int[Array, " max_num_splits"],
]:
    """Reset a split substation to the unsplit state, with a certain probability.

    Parameters
    ----------
    random_key : jax.random.PRNGKey
        The random key to use for the reset
    sub_ids : Int[Array, " max_num_splits"]
        The substation ids before the reset
    action : Int[Array, " max_num_splits"]
        The branch/inj topology action before the reset
    split_idx : Int[Array, " "]
        The index of the substation to reset
    substation_unsplit_prob : float
        The probability to reset the substation to the unsplit state

    Returns
    -------
    Int[Array, " max_num_splits"]
        The substation ids after the reset. If a reset occurred, the substation id at split_idx
        will be set to int_max
    Int[Array, " max_num_splits"]
        The topology action after the reset. If a reset occurred, the topology at split_idx
        will be set to int_max
    """
    # There is a certain probability that we reset the substation to the unsplit state
    reset_mask = jax.random.bernoulli(random_key, p=substation_unsplit_prob, shape=(1,))[0]

    sub_ids = sub_ids.at[split_idx].set(
        jnp.where(reset_mask, int_max(), sub_ids[split_idx]),
        mode="drop",
    )
    action = action.at[split_idx].set(
        jnp.where(reset_mask, int_max(), action[split_idx]),
        mode="drop",
    )

    return sub_ids, action

toop_engine_topology_optimizer.dc.genetic_functions.initialization #

Initialization of the genetic algorithm for branch and injection choice optimization.

logger module-attribute #

logger = Logger(__name__)

JaxOptimizerData #

The part of the optimizer data that lives on GPU.

If distributed is enabled, every item will have a leading device dimension.

repertoire instance-attribute #

repertoire

The repertoire object

emitter_state instance-attribute #

emitter_state

The emitter state object

dynamic_informations instance-attribute #

dynamic_informations

The list containing the dynamic information objects

random_key instance-attribute #

random_key

The random key

latest_iteration instance-attribute #

latest_iteration

The iteration that this emitter_state/repertoire belong to

update_max_mw_flows_according_to_double_limits #

update_max_mw_flows_according_to_double_limits(
    dynamic_informations,
    solver_configs,
    lower_limit,
    upper_limit,
)

Update all dynamic informations max mw loads.

Runs an initial n-1 analysis to determine limits in mw.

PARAMETER DESCRIPTION
dynamic_informations

List of static informations to calculate with max_mw_flow limits set at 1.0

TYPE: tuple[DynamicInformation, ...]

solver_configs

List of solver configurations to use for the loadflow

TYPE: tuple[SolverConfig, ...]

lower_limit

The relative lower limit to set, for branches whose n-1 flows are below the lower limit

TYPE: float

upper_limit

The relative upper_limit determining at what relative load a branch is considered overloaded. Branches in the band between lower and upper limit are considered overloaded if more load is added.

TYPE: float

RETURNS DESCRIPTION
tuple[DynamicInformation, ...]

The updated dynamic informations with new limits set.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/initialization.py
def update_max_mw_flows_according_to_double_limits(
    dynamic_informations: tuple[DynamicInformation, ...],
    solver_configs: tuple[SolverConfig, ...],
    lower_limit: float,
    upper_limit: float,
) -> tuple[DynamicInformation, ...]:
    """Update all dynamic informations max mw loads.

    Runs an initial n-1 analysis to determine limits in mw.

    Parameters
    ----------
    dynamic_informations: tuple[DynamicInformation, ...]
        List of static informations to calculate with max_mw_flow limits set at 1.0
    solver_configs: tuple[SolverConfig, ...]
        List of solver configurations to use for the loadflow
    lower_limit: float
        The relative lower limit to set, for branches whose n-1 flows are below the lower limit
    upper_limit: float
        The relative upper_limit determining at what relative load a branch is considered overloaded.
        Branches in the band between lower and upper limit are considered overloaded if more load is added.

    Returns
    -------
    tuple[DynamicInformation, ...]
        The updated dynamic informations with new limits set.

    """
    if lower_limit > upper_limit:
        raise ValueError(f"Lower limit {lower_limit} must be smaller than upper limit {upper_limit}")

    updated_dynamic_informations = []
    for dynamic_information, solver_config in zip(dynamic_informations, solver_configs, strict=True):
        solver_config_local = replace(solver_config, batch_size_bsdf=1)
        lf_res, success = compute_symmetric_batch(
            topology_batch=default_topology(solver_config_local),
            disconnection_batch=None,
            injections=None,
            nodal_inj_start_options=None,
            dynamic_information=dynamic_information,
            solver_config=solver_config_local,
        )
        assert jnp.all(success)
        # We will always have N-1 limits, so we compute the N-0 limits on the N-0 loadflow results
        # However, the N-0 results lack a dimension, so we need to add a virtual "failure" dim
        limited_max_mw_flow = compute_double_limits(
            lf_res.n_0_matrix[0, :, None, :],
            dynamic_information.branch_limits.max_mw_flow,
            lower_limit=lower_limit,
            upper_limit=upper_limit,
        )

        limited_max_mw_flow_n_1 = compute_double_limits(
            lf_res.n_1_matrix[0],
            dynamic_information.branch_limits.max_mw_flow_n_1
            if dynamic_information.branch_limits.max_mw_flow_n_1 is not None
            else dynamic_information.branch_limits.max_mw_flow,
            lower_limit=lower_limit,
            upper_limit=upper_limit,
        )

        updated_dynamic_informations.append(
            replace(
                dynamic_information,
                branch_limits=replace(
                    dynamic_information.branch_limits,
                    max_mw_flow_limited=limited_max_mw_flow,
                    max_mw_flow_n_1_limited=limited_max_mw_flow_n_1,
                ),
            )
        )

    return tuple(updated_dynamic_informations)

verify_static_information #

verify_static_information(
    static_informations, max_num_disconnections
)

Verify the static information.

This function will be called after loading the static information. It should be used to verify that the static information is correct and can be used for the optimization run.

PARAMETER DESCRIPTION
static_informations

The static information to verify

TYPE: Iterable[StaticInformation]

max_num_disconnections

The maximum number of disconnections that can be made

TYPE: int

RAISES DESCRIPTION
AssertionError

If the static information is not correct

RETURNS DESCRIPTION
None
Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/initialization.py
def verify_static_information(
    static_informations: Iterable[StaticInformation],
    max_num_disconnections: int,
) -> None:
    """Verify the static information.

    This function will be called after loading the static information. It should be used to verify
    that the static information is correct and can be used for the optimization run.

    Parameters
    ----------
    static_informations : Iterable[StaticInformation]
        The static information to verify
    max_num_disconnections : int
        The maximum number of disconnections that can be made

    Raises
    ------
    AssertionError
        If the static information is not correct

    Returns
    -------
    None
    """
    first_static_information = next(iter(static_informations))

    assert all(
        [
            jnp.array_equal(
                static_information.solver_config.branches_per_sub.val,
                first_static_information.solver_config.branches_per_sub.val,
            )
            for static_information in static_informations
        ]
    )
    first_set = first_static_information.dynamic_information.action_set
    if first_set is not None:
        assert all(
            static_information.dynamic_information.action_set is not None for static_information in static_informations
        )
        assert all(
            [(static_information.dynamic_information.action_set == first_set) for static_information in static_informations]
        ), "All static informations must have the same branch actions"
        assert all(
            [
                jnp.array_equal(
                    static_information.dynamic_information.action_set.n_actions_per_sub,
                    first_set.n_actions_per_sub,
                )
                for static_information in static_informations
            ]
        ), "All static informations must have the same number of branch actions"

    assert first_static_information.dynamic_information.disconnectable_branches.shape[0] >= max_num_disconnections, (
        "Not enough disconnectable branches for the maximum number of disconnections, "
        + f"got {first_static_information.dynamic_information.disconnectable_branches.shape[0]} and {max_num_disconnections}"
    )
    if first_static_information.dynamic_information.disconnectable_branches.shape[0] > 0:
        assert all(
            [
                jnp.array_equal(
                    first_static_information.dynamic_information.disconnectable_branches,
                    static_information.dynamic_information.disconnectable_branches,
                )
                for static_information in static_informations
            ]
        ), "All static informations must have the same disconnectable branches"

update_static_information #

update_static_information(static_informations, batch_size)

Perform any necessary preprocessing on the static information.

This harmonizes the static informations and makes sure some information that is optional in the solver is always there.

PARAMETER DESCRIPTION
static_informations

The list of static informations to preprocess

TYPE: list[StaticInformation]

batch_size

The batch size to use, will replace the batch size in the solver config

TYPE: int

RETURNS DESCRIPTION
list[StaticInformation]

The updated static informations

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/initialization.py
def update_static_information(
    static_informations: tuple[StaticInformation, ...],
    batch_size: int,
) -> tuple[StaticInformation, ...]:
    """Perform any necessary preprocessing on the static information.

    This harmonizes the static informations and makes sure some information that is optional in the solver is always there.

    Parameters
    ----------
    static_informations : list[StaticInformation]
        The list of static informations to preprocess
    batch_size : int
        The batch size to use, will replace the batch size in the solver config

    Returns
    -------
    list[StaticInformation]
        The updated static informations
    """
    dynamic_informations = [static_information.dynamic_information for static_information in static_informations]
    # Furthermore, we want to make sure that we always have limited max mw flows to be able to
    # compute those metrics
    dynamic_informations = [
        replace(
            dynamic_information,
            branch_limits=replace(
                dynamic_information.branch_limits,
                max_mw_flow_limited=dynamic_information.branch_limits.max_mw_flow
                if dynamic_information.branch_limits.max_mw_flow_limited is None
                else dynamic_information.branch_limits.max_mw_flow_limited,
                n0_n1_max_diff=jnp.zeros_like(dynamic_information.branch_limits.max_mw_flow)
                if dynamic_information.branch_limits.n0_n1_max_diff is None
                else dynamic_information.branch_limits.n0_n1_max_diff,
            ),
        )
        for dynamic_information in dynamic_informations
    ]

    # Make sure all the solver configs have the correct batch size
    solver_configs = [static_information.solver_config for static_information in static_informations]
    solver_configs = [
        replace(solver_config, batch_size_bsdf=batch_size, batch_size_injection=batch_size)
        for solver_config in solver_configs
    ]

    static_informations = [
        replace(
            static_information,
            solver_config=solver_config,
            dynamic_information=dynamic_information,
        )
        for static_information, solver_config, dynamic_information in zip(
            static_informations, solver_configs, dynamic_informations, strict=True
        )
    ]

    return static_informations

initialize_genetic_algorithm #

initialize_genetic_algorithm(
    batch_size,
    max_num_splits,
    max_num_disconnections,
    static_informations,
    target_metrics,
    substation_split_prob,
    substation_unsplit_prob,
    action_set,
    n_subs_mutated_lambda,
    disconnect_prob,
    reconnect_prob,
    pst_mutation_sigma,
    proportion_crossover,
    crossover_mutation_ratio,
    random_seed,
    observed_metrics,
    me_descriptors,
    distributed,
    devices=None,
    cell_depth=1,
    mutation_repetition=1,
    n_worst_contingencies=10,
)

Initialize the mapelites algorithm.

PARAMETER DESCRIPTION
batch_size

The batch size to use

TYPE: int

max_num_splits

The maximum number of substations that can be split

TYPE: int

max_num_disconnections

The maximum number of disconnections that can be made

TYPE: int

static_informations

The static information to use for the optimization run

TYPE: list[StaticInformation]

target_metrics

The target metrics to use for the optimization run

TYPE: tuple[tuple[MetricType, float], ...]

substation_split_prob

The probability to split a substation

TYPE: float

substation_unsplit_prob

The probability to reset a split substation to the unsplit state

TYPE: float

action_set

The action set to use for mutations

TYPE: ActionSet

n_subs_mutated_lambda

The lambda parameter for the Poisson distribution to determine the number of substations to mutate

TYPE: float

disconnect_prob

The probability to disconnect a new branch

TYPE: float

reconnect_prob

The probability to reconnect a disconnected branch, will overwrite a possible disconnect

TYPE: float

pst_mutation_sigma

The sigma to use for the normal distribution to sample the PST tap mutation from.

TYPE: float

proportion_crossover

The proportion of crossover to mutation

TYPE: float

crossover_mutation_ratio

The ratio of crossover to mutation

TYPE: float

random_seed

The random seed to use for reproducibility

TYPE: int

observed_metrics

The observed metrics, i.e. which metrics are to be computed for logging purposes.

TYPE: tuple[MetricType, ...]

me_descriptors

The descriptors to use for map elites

TYPE: tuple[DescriptorDef, ...]

distributed

Whether to run the optimization on multiple devices

TYPE: bool

devices

The devices to run the optimization on, if distributed

TYPE: Optional[list[Device]] DEFAULT: None

cell_depth

The cell depth to use if applicable

TYPE: int DEFAULT: 1

mutation_repetition

More chance to get unique mutations by mutating mutation_repetition copies of the repertoire

TYPE: int DEFAULT: 1

n_worst_contingencies

The number of worst contingencies to consider in the scoring function for calculating top_k_overloads_n_1.

TYPE: int DEFAULT: 10

RETURNS DESCRIPTION
DiscreteMapElites

The genetic algorithm object including scoring, mutate and crossover functions

JaxOptimizerData

The initialized jax dataclass

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/initialization.py
def initialize_genetic_algorithm(
    batch_size: int,
    max_num_splits: int,
    max_num_disconnections: int,
    static_informations: tuple[StaticInformation, ...],
    target_metrics: tuple[tuple[MetricType, float], ...],
    substation_split_prob: float,
    substation_unsplit_prob: float,
    action_set: ActionSet,
    n_subs_mutated_lambda: float,
    disconnect_prob: float,
    reconnect_prob: float,
    pst_mutation_sigma: float,
    proportion_crossover: float,
    crossover_mutation_ratio: float,
    random_seed: int,
    observed_metrics: tuple[MetricType, ...],
    me_descriptors: tuple[DescriptorDef, ...],
    distributed: bool,
    devices: Optional[list[jax.Device]] = None,
    cell_depth: int = 1,
    mutation_repetition: int = 1,
    n_worst_contingencies: int = 10,
) -> tuple[DiscreteMapElites, JaxOptimizerData]:
    """Initialize the mapelites algorithm.

    Parameters
    ----------
    batch_size : int
        The batch size to use
    max_num_splits : int
        The maximum number of substations that can be split
    max_num_disconnections : int
        The maximum number of disconnections that can be made
    static_informations : list[StaticInformation]
        The static information to use for the optimization run
    target_metrics : tuple[tuple[MetricType, float], ...]
        The target metrics to use for the optimization run
    substation_split_prob : float
        The probability to split a substation
    substation_unsplit_prob : float
        The probability to reset a split substation to the unsplit state
    action_set : ActionSet
        The action set to use for mutations
    n_subs_mutated_lambda : float
        The lambda parameter for the Poisson distribution to determine the number of substations to mutate
    disconnect_prob : float
        The probability to disconnect a new branch
    reconnect_prob : float
        The probability to reconnect a disconnected branch, will overwrite a possible disconnect
    pst_mutation_sigma : float
        The sigma to use for the normal distribution to sample the PST tap mutation from.
    proportion_crossover : float
        The proportion of crossover to mutation
    crossover_mutation_ratio : float
        The ratio of crossover to mutation
    random_seed: int
        The random seed to use for reproducibility
    observed_metrics: tuple[MetricType, ...]
        The observed metrics, i.e. which metrics are to be computed for logging purposes.
    me_descriptors: tuple[Descriptor, ...]
        The descriptors to use for map elites
    distributed: bool
        Whether to run the optimization on multiple devices
    devices: Optional[list[jax.Device]]
        The devices to run the optimization on, if distributed
    cell_depth: int
        The cell depth to use if applicable
    mutation_repetition: int
        More chance to get unique mutations by mutating mutation_repetition copies of the repertoire
    n_worst_contingencies: int
        The number of worst contingencies to consider in the scoring function for calculating
        top_k_overloads_n_1.

    Returns
    -------
    DiscreteMapElites
        The genetic algorithm object including scoring, mutate and crossover functions
    JaxOptimizerData
        The initialized jax dataclass
    """
    n_devices = len(jax.devices()) if distributed else 1

    dynamic_informations = tuple([static_information.dynamic_information for static_information in static_informations])
    solver_configs = tuple(
        [replace(static_information.solver_config, batch_size_bsdf=batch_size) for static_information in static_informations]
    )

    initial_topologies = empty_repertoire(
        batch_size=batch_size * n_devices,
        max_num_splits=max_num_splits,
        max_num_disconnections=max_num_disconnections,
        n_timesteps=dynamic_informations[0].n_timesteps,
        starting_taps=dynamic_informations[0].nodal_injection_information.starting_tap_idx
        if dynamic_informations[0].nodal_injection_information is not None
        else None,
    )

    scoring_function_partial = partial(
        scoring_function,
        solver_configs=solver_configs,
        target_metrics=target_metrics,
        observed_metrics=observed_metrics,
        descriptor_metrics=tuple([desc.metric for desc in me_descriptors]),
        n_worst_contingencies=n_worst_contingencies,
    )

    mutate_partial = partial(
        mutate,
        substation_split_prob=substation_split_prob,
        substation_unsplit_prob=substation_unsplit_prob,
        action_set=action_set,
        n_disconnectable_branches=len(dynamic_informations[0].disconnectable_branches),
        n_subs_mutated_lambda=n_subs_mutated_lambda,
        disconnect_prob=disconnect_prob,
        reconnect_prob=reconnect_prob,
        pst_mutation_sigma=pst_mutation_sigma,
        pst_n_taps=dynamic_informations[0].nodal_injection_information.pst_n_taps
        if dynamic_informations[0].nodal_injection_information is not None
        else None,
        mutation_repetition=mutation_repetition,
    )
    crossover_partial = partial(crossover, action_set=action_set, prob_take_a=proportion_crossover)

    emitter = TrackingMixingEmitter(
        mutate_partial,
        crossover_partial,
        crossover_mutation_ratio,
        batch_size,
    )
    algo = DiscreteMapElites(
        scoring_function=scoring_function_partial,
        emitter=emitter,
        metrics_function=default_ga_metrics,  # TODO: Why do we set this to default and not observed?
        distributed=distributed,
        n_cells_per_dim=tuple([desc.num_cells for desc in me_descriptors]),
        cell_depth=cell_depth,
    )

    random_key = jax.random.PRNGKey(random_seed)
    latest_iteration = jnp.array(1, dtype=int)

    init_fn = algo.init
    # If we are running on multiple devices, we need to replicate the data so it lives on every
    # device. The only exception is the random key, where we want a different one on every device
    if distributed:
        initial_topologies = jax.tree_util.tree_map(
            lambda x: jnp.reshape(
                x,
                (
                    n_devices,
                    batch_size,
                )
                + x.shape[1:],
            ),
            initial_topologies,
        )
        random_key = jax.random.split(random_key, n_devices)
        dynamic_informations = jax.tree_util.tree_map(
            lambda x: jax.device_put_replicated(x, devices),
            dynamic_informations,
        )
        latest_iteration = jax.device_put_replicated(latest_iteration, devices)

        init_fn = jax.pmap(
            init_fn,
            axis_name="p",
            in_axes=(
                jax.tree_util.tree_map(lambda _x: 0, initial_topologies),
                0,
                jax.tree_util.tree_map(
                    lambda _x: 0,
                    dynamic_informations,
                ),
            ),
        )

    repertoire, emitter_state, random_key = init_fn(initial_topologies, random_key, dynamic_informations)

    jax_data = JaxOptimizerData(
        repertoire=repertoire,
        emitter_state=emitter_state,
        dynamic_informations=dynamic_informations,
        random_key=random_key,
        latest_iteration=latest_iteration,
    )
    return algo, jax_data

flatten_fitnesses_if_distributed #

flatten_fitnesses_if_distributed(fitnesses)

Flatten the fitnesses if distributed.

PARAMETER DESCRIPTION
fitnesses

The fitnesses to flatten

TYPE: Float[Array, ' ... individuals metrics']

RETURNS DESCRIPTION
Float[Array, ' individuals metrics']

The flattened fitnesses

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/initialization.py
def flatten_fitnesses_if_distributed(
    fitnesses: Float[Array, " ... individuals metrics"],
) -> Float[Array, " individuals metrics"]:
    """Flatten the fitnesses if distributed.

    Parameters
    ----------
    fitnesses : Float[Array, " ... individuals metrics"]
        The fitnesses to flatten

    Returns
    -------
    Float[Array, " individuals metrics"]
        The flattened fitnesses
    """
    if len(fitnesses.shape) == 3:
        fitnesses = jnp.reshape(
            fitnesses,
            (
                fitnesses.shape[0] * fitnesses.shape[1],
                fitnesses.shape[2],
            ),
        )
    assert len(fitnesses.shape) == 2
    return fitnesses

get_repertoire_metrics #

get_repertoire_metrics(repertoire, observed_metrics)

Get the metrics of the best individual in the Mapelites repertoire.

PARAMETER DESCRIPTION
repertoire

The repertoire

TYPE: DiscreteMapElitesRepertoire

observed_metrics

The metrics to observe (max_flow_n_0, median_flow_n_0 ...)

TYPE: tuple[MetricType, ...]

RETURNS DESCRIPTION
float

The fitness

dict[MetricType, float]

The metrics as defined in METRICS

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/initialization.py
def get_repertoire_metrics(
    repertoire: DiscreteMapElitesRepertoire, observed_metrics: tuple[MetricType, ...]
) -> tuple[float, dict[MetricType, float], Float[Array, " ... n_cells_per_dim"]]:
    """Get the metrics of the best individual in the Mapelites repertoire.

    Parameters
    ----------
    repertoire : DiscreteMapElitesRepertoire
        The repertoire

    observed_metrics : tuple[MetricType, ...]
        The metrics to observe (max_flow_n_0, median_flow_n_0 ...)

    Returns
    -------
    float
        The fitness
    dict[MetricType, float]
        The metrics as defined in METRICS
    """
    distributed = len(repertoire.fitnesses.shape) > 1
    repertoire = jax.tree_util.tree_map(lambda x: x[0], repertoire) if distributed else repertoire

    fitnesses = repertoire.fitnesses
    # Get best individual and its metrics
    best_idx = jnp.argsort(fitnesses, descending=True)
    metrics = jax.tree_util.tree_map(lambda x: x[best_idx], repertoire.extra_scores)
    # only keep metrics in observed_metrics
    metrics = {key: metrics[key] for key in observed_metrics}
    fitnesses = fitnesses[best_idx]

    best_individual_fitness = fitnesses[0].item()
    # best_individual_metrics corresponds to the first element of each observed metric
    best_individual_metrics = {key: value[0].item() for key, value in metrics.items()}

    return best_individual_fitness, best_individual_metrics  # , descriptors[0]

algo_setup #

algo_setup(
    ga_args,
    lf_args,
    double_limits,
    static_information_files,
    processed_gridfile_fs,
)

Set up the genetic algorithm run.

PARAMETER DESCRIPTION
ga_args

The genetic algorithm parameters

TYPE: GeneticAlgorithParameters

lf_args

The loadflow solver parameters

TYPE: LoadflowSolverParameters

double_limits

The lower and upper limit for the relative max mw flow if double limits are used

TYPE: Optional[tuple[float, float]]

static_information_files

A list of files with static information to load

TYPE: list[str]

processed_gridfile_fs

The target filesystem for the preprocessing worker. This contains all processed grid files. During the import job, a new folder import_results.data_folder was created which will be completed with the preprocess call to this function. Internally, only the data folder is passed around as a dirfs. Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the unprocessed gridfiles were already done.

TYPE: AbstractFileSystem

RETURNS DESCRIPTION
DiscreteMapElites

The initialized genetic algorithm object, can be used to update the optimization run

JaxOptimizerData

The jax dataclass of all GPU data including dynamic information and the GA data

tuple[SolverConfig, ...]

The solver configurations for every timestep (the dynamic information is part of the jax dataclass)

float

The initial fitness, for logging purposes

dict

The initial metrics, for logging purposes

list[StaticInformationDescription]

Some statistics on the static information dataclasses that were loaded

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/initialization.py
def algo_setup(
    ga_args: BatchedMEParameters,
    lf_args: LoadflowSolverParameters,
    double_limits: Optional[tuple[float, float]],
    static_information_files: list[str],
    processed_gridfile_fs: AbstractFileSystem,
) -> tuple[
    DiscreteMapElites,
    JaxOptimizerData,
    tuple[SolverConfig, ...],
    float,
    dict,
    list[StaticInformationStats],
]:
    """Set up the genetic algorithm run.

    Parameters
    ----------
    ga_args : GeneticAlgorithParameters
        The genetic algorithm parameters
    lf_args : LoadflowSolverParameters
        The loadflow solver parameters
    double_limits: Optional[tuple[float, float]]
        The lower and upper limit for the relative max mw flow if double limits are used
    static_information_files : list[str]
        A list of files with static information to load
    processed_gridfile_fs: AbstractFileSystem
        The target filesystem for the preprocessing worker. This contains all processed grid files.
        During the import job,  a new folder import_results.data_folder was created
        which will be completed with the preprocess call to this function.
        Internally, only the data folder is passed around as a dirfs.
        Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the
        unprocessed gridfiles were already done.

    Returns
    -------
    DiscreteMapElites
        The initialized genetic algorithm object, can be used to update the optimization run
    JaxOptimizerData
        The jax dataclass of all GPU data including dynamic information and the GA data
    tuple[SolverConfig, ...]
        The solver configurations for every timestep (the dynamic information is part of the jax
        dataclass)
    float
        The initial fitness, for logging purposes
    dict
        The initial metrics, for logging purposes
    list[StaticInformationDescription]
        Some statistics on the static information dataclasses that were loaded
    """
    n_devices = len(jax.devices()) if lf_args.distributed else 1

    static_informations = tuple(
        [load_static_information_fs(filesystem=processed_gridfile_fs, filename=str(f)) for f in static_information_files]
    )

    logger.info(f"Running {n_devices} GPUs with config {ga_args}, {lf_args}")

    verify_static_information(static_informations, lf_args.max_num_disconnections)

    static_informations = update_static_information(static_informations, lf_args.batch_size)

    if double_limits is not None:
        logger.info(f"Updating double limits to {double_limits}")
        dynamic_infos = update_max_mw_flows_according_to_double_limits(
            dynamic_informations=[s.dynamic_information for s in static_informations],
            solver_configs=[s.solver_config for s in static_informations],
            lower_limit=double_limits[0],
            upper_limit=double_limits[1],
        )
        static_informations = [
            replace(static_information, dynamic_information=dynamic_info)
            for static_information, dynamic_info in zip(static_informations, dynamic_infos, strict=True)
        ]

    algo, jax_data = initialize_genetic_algorithm(
        batch_size=lf_args.batch_size,
        max_num_splits=lf_args.max_num_splits,
        max_num_disconnections=lf_args.max_num_disconnections,
        static_informations=static_informations,
        target_metrics=ga_args.target_metrics,
        substation_split_prob=ga_args.substation_split_prob,
        substation_unsplit_prob=ga_args.substation_unsplit_prob,
        action_set=static_informations[0].dynamic_information.action_set,
        n_subs_mutated_lambda=ga_args.n_subs_mutated_lambda,
        disconnect_prob=ga_args.disconnect_prob,
        reconnect_prob=ga_args.reconnect_prob,
        pst_mutation_sigma=ga_args.pst_mutation_sigma,
        proportion_crossover=ga_args.proportion_crossover,
        crossover_mutation_ratio=ga_args.crossover_mutation_ratio,
        random_seed=ga_args.random_seed,
        observed_metrics=ga_args.observed_metrics,
        distributed=lf_args.distributed,
        devices=jax.devices() if lf_args.distributed else None,
        me_descriptors=ga_args.me_descriptors,
        mutation_repetition=ga_args.mutation_repetition,
        cell_depth=ga_args.cell_depth,
        n_worst_contingencies=ga_args.n_worst_contingencies,
    )

    initial_fitness, initial_metrics = get_repertoire_metrics(
        jax.tree_util.tree_map(lambda x: x[0], jax_data.repertoire) if lf_args.distributed else jax_data.repertoire,
        ga_args.observed_metrics,
    )

    static_information_descriptions = [
        extract_static_information_stats(
            static_information=static_information,
            overload_n0=initial_metrics.get("overload_energy_n_0", 0.0),
            overload_n1=initial_metrics.get("overload_energy_n_1", 0.0),
            time="",
        )
        for static_information in static_informations
    ]

    for desc in static_information_descriptions:
        logger.info(f"Starting optimization with static information: {desc}")

    return (
        algo,
        jax_data,
        [static_information.solver_config for static_information in static_informations],
        initial_fitness,
        initial_metrics,
        static_information_descriptions,
    )

toop_engine_topology_optimizer.dc.genetic_functions.scoring_functions #

Create scoring functions for the genetic algorithm.

METRICS module-attribute #

METRICS = {
    "max_flow_n_0": maximum,
    "median_flow_n_0": maximum,
    "overload_energy_n_0": add,
    "overload_energy_limited_n_0": add,
    "underload_energy_n_0": add,
    "transport_n_0": add,
    "exponential_overload_energy_n_0": add,
    "exponential_overload_energy_limited_n_0": add,
    "critical_branch_count_n_0": maximum,
    "critical_branch_count_limited_n_0": maximum,
    "max_flow_n_1": maximum,
    "median_flow_n_1": maximum,
    "overload_energy_n_1": add,
    "overload_energy_limited_n_1": add,
    "underload_energy_n_1": add,
    "transport_n_1": add,
    "exponential_overload_energy_n_1": add,
    "exponential_overload_energy_limited_n_1": add,
    "critical_branch_count_n_1": maximum,
    "critical_branch_count_limited_n_1": maximum,
    "n0_n1_delta": add,
    "cross_coupler_flow": add,
    "switching_distance": maximum,
    "split_subs": maximum,
    "n_2_penalty": add,
}

compute_overloads #

compute_overloads(
    topologies,
    dynamic_information,
    solver_config,
    observed_metrics,
    n_worst_contingencies=10,
)

Compute the overloads for a single timestep by invoking the solver and aggregating the results.

PARAMETER DESCRIPTION
topologies

The topologies to score, where the first max_num_splits entries are the substations, the second max_num_splits entries are the branch topos and the last max_num_splits entries are the injection topos

TYPE: Genotype

dynamic_information

The dynamic information of the grid

TYPE: DynamicInformation

solver_config

The static solver configuration

TYPE: SolverConfig

observed_metrics

The metrics to observe

TYPE: tuple[MetricType, ...]

n_worst_contingencies

The number of worst contingencies to return, by default 10

TYPE: int DEFAULT: 10

RETURNS DESCRIPTION
dict[str, Float[Array, ' batch_size']]

A dictionary with the overload energy, transport, max flow and other metrics, from aggregate_to_metric_batched

NodalInjOptimResults

The results of the nodal injection optimization

Bool[Array, ' batch_size']

Whether the topologies were successfully solved

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/scoring_functions.py
def compute_overloads(
    topologies: Genotype,
    dynamic_information: DynamicInformation,
    solver_config: SolverConfig,
    observed_metrics: tuple[MetricType, ...],
    n_worst_contingencies: int = 10,
) -> tuple[dict[str, Float[Array, " batch_size"]], Optional[NodalInjOptimResults], Bool[Array, " batch_size"]]:
    """Compute the overloads for a single timestep by invoking the solver and aggregating the results.

    Parameters
    ----------
    topologies : Genotype
        The topologies to score, where the first max_num_splits entries are the substations, the
        second max_num_splits entries are the branch topos and the last max_num_splits entries are
        the injection topos
    dynamic_information : DynamicInformation
        The dynamic information of the grid
    solver_config : SolverConfig
        The static solver configuration
    observed_metrics : tuple[MetricType, ...]
        The metrics to observe
    n_worst_contingencies : int, optional
        The number of worst contingencies to return, by default 10

    Returns
    -------
    dict[str, Float[Array, " batch_size"]]
        A dictionary with the overload energy, transport, max flow and other metrics, from aggregate_to_metric_batched
    NodalInjOptimResults
        The results of the nodal injection optimization
    Bool[Array, " batch_size"]
        Whether the topologies were successfully solved
    """
    topo_comp, disconnections, nodal_inj_start = translate_topology(topologies)

    lf_res, success = compute_symmetric_batch(
        topology_batch=topo_comp,
        disconnection_batch=disconnections,
        injections=None,  # Use from action set
        nodal_inj_start_options=nodal_inj_start,
        dynamic_information=dynamic_information,
        solver_config=solver_config,
    )

    aggregates = {}
    for metric_name in observed_metrics:
        metric = aggregate_to_metric_batched(
            lf_res_batch=lf_res,
            branch_limits=dynamic_information.branch_limits,
            reassignment_distance=dynamic_information.action_set.reassignment_distance,
            n_relevant_subs=dynamic_information.n_sub_relevant,
            metric=metric_name,
        )
        metric = jnp.where(success, metric, jnp.inf)
        aggregates[metric_name] = metric

    # Note: compute_overloads is called for each timestep separately, so the results are not batched.
    # As we don't have multi timestep optimisation support yet, we compute the worst k contingencies
    # sequentially one timestep at a time. This means that the timestep dimension will always be 1.
    #  TODO This is a temporary solution until we have multi timestep support.
    worst_k_res = jax.vmap(get_worst_k_contingencies, in_axes=(None, 0, None))(
        n_worst_contingencies, lf_res.n_1_matrix, dynamic_information.branch_limits.max_mw_flow
    )
    aggregates["top_k_overloads_n_1"] = worst_k_res.top_k_overloads[:, 0]  # Take the first timestep only
    aggregates["case_indices"] = worst_k_res.case_indices[:, 0, :]

    return aggregates, lf_res.nodal_injections_optimized, success

scoring_function #

scoring_function(
    topologies,
    random_key,
    dynamic_informations,
    solver_configs,
    target_metrics,
    observed_metrics,
    descriptor_metrics,
    n_worst_contingencies=10,
)

Create scoring function for the genetic algorithm.

PARAMETER DESCRIPTION
topologies

The topologies to score

TYPE: Genotype

random_key

The random key to use for the scoring (currently not used)

TYPE: PRNGKey

dynamic_informations

The dynamic information of the grid for every timestep

TYPE: tuple[DynamicInformation, ...]

solver_configs

The solver configuration for every timestep

TYPE: tuple[SolverConfig, ...]

target_metrics

The list of metrics to optimize for with their weights

TYPE: tuple[tuple[MetricType, float], ...]

observed_metrics

The observed metrics

TYPE: tuple[MetricType, ...]

descriptor_metrics

The metrics to use as descriptors

TYPE: tuple[MetricType, ...]

n_worst_contingencies

The number of worst contingencies to consider for calculating top_k_overloads_n_1

TYPE: int DEFAULT: 10

RETURNS DESCRIPTION
Float[Array, ' batch_size']

The metrics of the topologies

Int[Array, ' batch_size n_dims']

The descriptors of the topologies

dict

The extra scores

dict

Emitter Information

PRNGKey

The random key that was passed in, unused

Genotype

The genotypes that were passed in, but updated to account for in-the-loop optimizations such as the nodal injection optimization.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/scoring_functions.py
def scoring_function(
    topologies: Genotype,
    random_key: jax.random.PRNGKey,
    dynamic_informations: tuple[DynamicInformation, ...],
    solver_configs: tuple[SolverConfig, ...],
    target_metrics: tuple[tuple[MetricType, float], ...],
    observed_metrics: tuple[MetricType, ...],
    descriptor_metrics: tuple[MetricType, ...],
    n_worst_contingencies: int = 10,
) -> tuple[
    Float[Array, " batch_size"],
    Int[Array, " batch_size n_dims"],
    dict,
    dict,
    jax.random.PRNGKey,
    Genotype,
]:
    """Create scoring function for the genetic algorithm.

    Parameters
    ----------
    topologies : Genotype
        The topologies to score
    random_key : jax.random.PRNGKey
        The random key to use for the scoring (currently not used)
    dynamic_informations : tuple[DynamicInformation, ...]
        The dynamic information of the grid for every timestep
    solver_configs : tuple[SolverConfig, ...]
        The solver configuration for every timestep
    target_metrics : tuple[tuple[MetricType, float], ...]
        The list of metrics to optimize for with their weights
    observed_metrics : tuple[MetricType, ...]
        The observed metrics
    descriptor_metrics : tuple[MetricType, ...]
        The metrics to use as descriptors
    n_worst_contingencies : int, optional
        The number of worst contingencies to consider for calculating
        top_k_overloads_n_1

    Returns
    -------
    Float[Array, " batch_size"]
        The metrics of the topologies
    Int[Array, " batch_size n_dims"]
        The descriptors of the topologies
    dict
        The extra scores
    dict
        Emitter Information
    jax.random.PRNGKey
        The random key that was passed in, unused
    Genotype
        The genotypes that were passed in, but updated to account for in-the-loop optimizations such as the nodal
        injection optimization.
    """
    n_topologies = len(topologies.action_index)

    metrics, nodal_injections_optimized, success = compute_overloads(
        topologies=topologies,
        dynamic_information=dynamic_informations[0],
        solver_config=solver_configs[0],
        observed_metrics=observed_metrics,
        n_worst_contingencies=n_worst_contingencies,
    )
    # Sequentially compute each subsequent timestep
    for dynamic_information, solver_config in zip(dynamic_informations[1:], solver_configs[1:], strict=True):
        metrics_local, _nodal_injections_optimized_local, success_local = compute_overloads(
            topologies=topologies,
            dynamic_information=dynamic_information,
            solver_config=solver_config,
            observed_metrics=observed_metrics,
        )
        success = success & success_local

        # TODO figure out how to stack nodal_inj optim results for multiple timesteps
        for key in observed_metrics:
            combine_fn = METRICS[key]
            metrics[key] = combine_fn(metrics[key], metrics_local[key])

    fitness = sum(-metrics[metric_name] * weight for metric_name, weight in target_metrics)

    emitter_info = {
        "n_branch_combis": jnp.array(n_topologies, dtype=int),
        "n_inj_combis": jnp.array(n_topologies, dtype=int),
        "n_split_grids": jnp.sum(~success),
    }

    descriptors = jnp.stack([metrics[key].astype(int) for key in descriptor_metrics], axis=1)

    topologies = replace(topologies, nodal_injections_optimized=nodal_injections_optimized)

    return (
        fitness,
        descriptors,
        metrics,
        emitter_info,
        random_key,
        topologies,
    )

translate_topology #

translate_topology(topology)

Translate the topology into the format used by the solver.

PARAMETER DESCRIPTION
topology

The topology in genotype form

TYPE: Genotype

RETURNS DESCRIPTION
ActionIndexComputations

The topology computations

Int[Array, ' batch_size max_num_disconnections']

The branch disconnections to apply

NodalInjStartOptions | None

The nodal injection optimization start options containing pst taps, or None if no PST optimization

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/scoring_functions.py
def translate_topology(
    topology: Genotype,
) -> tuple[
    ActionIndexComputations,
    Int[Array, " batch_size max_num_disconnections"],
    NodalInjStartOptions | None,
]:
    """Translate the topology into the format used by the solver.

    Parameters
    ----------
    topology : Genotype
        The topology in genotype form

    Returns
    -------
    ActionIndexComputations
        The topology computations
    Int[Array, " batch_size max_num_disconnections"]
        The branch disconnections to apply
    NodalInjStartOptions | None
        The nodal injection optimization start options containing pst taps, or None if no PST optimization
    """
    batch_size, _max_num_splits = topology.action_index.shape

    topology = fix_dtypes(topology)

    # Branch actions can be read straight out of the branch actions array
    topo_comp = ActionIndexComputations(
        action=topology.action_index,
        pad_mask=jnp.ones((batch_size,), dtype=bool),
    )

    nodal_inj_start = make_start_options(topology.nodal_injections_optimized)

    return topo_comp, topology.disconnections, nodal_inj_start

filter_repo #

filter_repo(repertoire, initial_fitness)

Reduce the repertoire to only valid and deduplicated topologies.

This will not return any topologies that are worse than the initial fitness and deduplicate the topologies. The function is currently not jax jit compatible.

PARAMETER DESCRIPTION
repertoire

The repertoire to reduce

TYPE: DiscreteMapElitesRepertoire

initial_fitness

The initial fitness of the grid. This is used to filter out topologies that are worse than this fitness.

TYPE: float

RETURNS DESCRIPTION
DiscreteMapElitesRepertoire

The reduced repertoire with only valid and deduplicated topologies.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/scoring_functions.py
def filter_repo(
    repertoire: DiscreteMapElitesRepertoire,
    initial_fitness: float,
) -> DiscreteMapElitesRepertoire:
    """Reduce the repertoire to only valid and deduplicated topologies.

    This will not return any topologies that are worse than the initial fitness and deduplicate the topologies.
    The function is currently not jax jit compatible.

    Parameters
    ----------
    repertoire : DiscreteMapElitesRepertoire
        The repertoire to reduce
    initial_fitness : float
        The initial fitness of the grid. This is used to filter out topologies that are worse than this fitness.

    Returns
    -------
    DiscreteMapElitesRepertoire
        The reduced repertoire with only valid and deduplicated topologies.
    """
    distributed = len(repertoire.fitnesses.shape) > 1
    if distributed:
        repertoire = repertoire[0]

    assert len(repertoire.fitnesses.shape) == 1, "Wrong shape on repertoire"
    # Deduplicate the repertoire and remove invalid entries
    valid_mask = jnp.isfinite(repertoire.fitnesses) & (repertoire.fitnesses > initial_fitness)
    repertoire = repertoire[valid_mask]

    _, indices = deduplicate_genotypes(repertoire.genotypes)
    repertoire = repertoire[indices]

    return repertoire

convert_to_topologies #

convert_to_topologies(
    repertoire, contingency_ids, grid_model_low_tap=None
)

Take a repertoire and convert it to a list of kafka-sendable topologies.

PARAMETER DESCRIPTION
repertoire

The repertoire to convert. You might want to filter it using filter_repo first.

TYPE: DiscreteMapElitesRepertoire

contingency_ids

The contingency IDs for each topology

TYPE: list[str]

grid_model_low_tap

The lowest tap value in the grid model, used to convert the relative tap values in the genotype to absolute tap values that can be sent to the kafka topics. This will only be read if nodal_injection results are present in the genotype.

TYPE: Int[Array, ' n_controllable_psts'] | None DEFAULT: None

RETURNS DESCRIPTION
list[Topology]

The list of topologies in the format used by the kafka topics.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/scoring_functions.py
def convert_to_topologies(
    repertoire: DiscreteMapElitesRepertoire,
    contingency_ids: list[str],
    grid_model_low_tap: Int[Array, " n_controllable_psts"] | None = None,
) -> list[Topology]:
    """Take a repertoire and convert it to a list of kafka-sendable topologies.

    Parameters
    ----------
    repertoire : DiscreteMapElitesRepertoire
        The repertoire to convert. You might want to filter it using filter_repo first.
    contingency_ids : list[str]
        The contingency IDs for each topology
    grid_model_low_tap : Int[Array, " n_controllable_psts"] | None
        The lowest tap value in the grid model, used to convert the relative tap values in the genotype to absolute tap
        values that can be sent to the kafka topics. This will only be read if nodal_injection results are present
        in the genotype.

    Returns
    -------
    list[Topology]
        The list of topologies in the format used by the kafka topics.
    """
    topologies = []

    for i in range(len(repertoire.fitnesses)):
        iter_repertoire = repertoire[i]

        action_indices = [int(act) for act in iter_repertoire.genotypes.action_index if act != int_max()]

        disconnections = [int(disc) for disc in iter_repertoire.genotypes.disconnections if disc != int_max()]

        nodal_inj = iter_repertoire.genotypes.nodal_injections_optimized
        pst_setpoints = None
        if nodal_inj is not None:
            assert grid_model_low_tap is not None, (
                "grid_model_low_tap must be provided if nodal_injections_optimized is present"
            )
            assert len(nodal_inj.pst_tap_idx.shape) == 2
            assert nodal_inj.pst_tap_idx.shape[0] == 1, "Only one timestep is supported, but found shape " + str(
                nodal_inj.pst_tap_idx.shape
            )
            tap_array = nodal_inj.pst_tap_idx[0].astype(int) + grid_model_low_tap
            pst_setpoints = tap_array.tolist()

        case_indices = iter_repertoire.extra_scores.pop("case_indices", [])
        case_ids = np.array(contingency_ids)[case_indices].tolist()
        metrics = Metrics(
            fitness=float(iter_repertoire.fitnesses),
            extra_scores={key: value.item() for key, value in iter_repertoire.extra_scores.items()},
            worst_k_contingency_cases=case_ids,
        )

        topologies.append(
            Topology(
                actions=action_indices,
                disconnections=disconnections,
                pst_setpoints=pst_setpoints,
                metrics=metrics,
            )
        )
    return topologies

summarize_repo #

summarize_repo(
    repertoire,
    initial_fitness,
    contingency_ids,
    grid_model_low_tap=None,
)

Summarize the repertoire into a list of topologies.

PARAMETER DESCRIPTION
repertoire

The repertoire to summarize

TYPE: DiscreteMapElitesRepertoire

initial_fitness

The initial fitness of the grid

TYPE: float

contingency_ids

The contingency IDs for each topology. Here we assume that this list is common for all the topologies in the repertoire. TODO: Fix me to have per topology contingency ids if needed

TYPE: list[str]

grid_model_low_tap

The lowest tap value in the grid model, from nodal_injection_information.grid_model_low_tap.

TYPE: Int[Array, ' n_controllable_psts'] | None DEFAULT: None

RETURNS DESCRIPTION
list[Topology]

The summarized topologies

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/scoring_functions.py
def summarize_repo(
    repertoire: DiscreteMapElitesRepertoire,
    initial_fitness: float,
    contingency_ids: list[str],
    grid_model_low_tap: Int[Array, " n_controllable_psts"] | None = None,
) -> list[Topology]:
    """Summarize the repertoire into a list of topologies.

    Parameters
    ----------
    repertoire : DiscreteMapElitesRepertoire
        The repertoire to summarize
    initial_fitness : float
        The initial fitness of the grid
    contingency_ids : list[str]
        The contingency IDs for each topology. Here we assume that this list is common for all the topologies
        in the repertoire.
        TODO: Fix me to have per topology contingency ids if needed
    grid_model_low_tap : Int[Array, " n_controllable_psts"] | None
        The lowest tap value in the grid model, from nodal_injection_information.grid_model_low_tap.

    Returns
    -------
    list[Topology]
        The summarized topologies
    """
    with jax.default_device(jax.devices("cpu")[0]):
        best_repo = filter_repo(
            repertoire=repertoire,
            initial_fitness=initial_fitness,
        )

        topologies = convert_to_topologies(best_repo, contingency_ids, grid_model_low_tap=grid_model_low_tap)

    return topologies

summarize #

summarize(
    repertoire,
    emitter_state,
    initial_fitness,
    initial_metrics,
    contingency_ids,
    grid_model_low_tap=None,
)

Summarize the repertoire and emitter state into a serializable dictionary.

PARAMETER DESCRIPTION
repertoire

The repertoire to summarize

TYPE: DiscreteMapElitesRepertoire

emitter_state

The emitter state to summarize

TYPE: EmitterState

initial_fitness

The initial fitness of the grid

TYPE: float

initial_metrics

The initial metrics of the grid

TYPE: dict

contingency_ids

A list of contingency ids. Here we assume that the list of contingency ids is common for all the topologies

TYPE: list[str]

grid_model_low_tap

The lowest tap value in the grid model, from nodal_injection_information.grid_model_low_tap.

TYPE: Int[Array, ' n_controllable_psts'] | None DEFAULT: None

RETURNS DESCRIPTION
dict

The summarized dictionary, json serializable

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/genetic_functions/scoring_functions.py
def summarize(
    repertoire: DiscreteMapElitesRepertoire,
    emitter_state: EmitterState,
    initial_fitness: float,
    initial_metrics: dict,
    contingency_ids: list[str],
    grid_model_low_tap: Int[Array, " n_controllable_psts"] | None = None,
) -> dict:
    """Summarize the repertoire and emitter state into a serializable dictionary.

    Parameters
    ----------
    repertoire : DiscreteMapElitesRepertoire
        The repertoire to summarize
    emitter_state : EmitterState
        The emitter state to summarize
    initial_fitness : float
        The initial fitness of the grid
    initial_metrics : dict
        The initial metrics of the grid
    contingency_ids : list[str]
        A list of contingency ids. Here we assume that the list of contingency ids is common for all the topologies
    grid_model_low_tap : Int[Array, " n_controllable_psts"] | None
        The lowest tap value in the grid model, from nodal_injection_information.grid_model_low_tap.

    Returns
    -------
    dict
        The summarized dictionary, json serializable
    """
    topologies = summarize_repo(
        repertoire=repertoire,
        initial_fitness=initial_fitness,
        contingency_ids=contingency_ids,
        grid_model_low_tap=grid_model_low_tap,
    )
    max_fitness = max(t.metrics.fitness for t in topologies) if len(topologies) > 0 else initial_fitness

    # Store the topologies
    best_topos = [t.model_dump() for t in topologies]
    retval = {k: v.item() for k, v in emitter_state.items()}
    retval.update(
        {
            "max_fitness": max_fitness,
            "best_topos": best_topos,
            "initial_fitness": initial_fitness,
            "initial_metrics": initial_metrics,
        }
    )
    return retval

Repertoire#

toop_engine_topology_optimizer.dc.repertoire.discrete_map_elites #

Core components of the MAP-Elites algorithm.

Adapted from QDax (https://github.com/adaptive-intelligent-robotics/QDax)

EmitterScores module-attribute #

EmitterScores = PyTree

DiscreteMapElites #

DiscreteMapElites(
    scoring_function,
    emitter,
    metrics_function,
    n_cells_per_dim,
    cell_depth=1,
    distributed=False,
)

Discrete MAP-Elites algorithm.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/repertoire/discrete_map_elites.py
def __init__(
    self,
    scoring_function: Callable[
        [Genotype, RNGKey, PyTree],
        Tuple[Fitness, Descriptor, ExtraScores, EmitterScores, RNGKey, Genotype],
    ],
    emitter: Emitter,
    metrics_function: Callable[[DiscreteMapElitesRepertoire], Metrics],
    n_cells_per_dim: tuple[int, ...],
    cell_depth: PositiveInt = 1,
    distributed: bool = False,
) -> None:
    self._scoring_function = scoring_function
    self._emitter = emitter
    self._metrics_function = metrics_function
    self._distributed = distributed
    self._n_cells_per_dim = n_cells_per_dim
    self._cell_depth = cell_depth

init #

init(genotypes, random_key, scoring_data)

Initialize a Map-Elites repertoire with an initial population of genotypes.

Requires the definition of centroids that can be computed with any method such as CVT or Euclidean mapping.

Before the repertoire is initialised, individuals are gathered from all the devices.

Args: genotypes: initial genotypes, pytree in which leaves have shape (batch_size, num_features) random_key: a random key used for stochastic operations. n_cells_per_dim: number of cells per dimension in the repertoire

RETURNS DESCRIPTION
An initialized MAP-Elite repertoire with the initial state of the emitter,

and a random key.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/repertoire/discrete_map_elites.py
def init(
    self,
    genotypes: Genotype,
    random_key: RNGKey,
    scoring_data: PyTree,
) -> Tuple[DiscreteMapElitesRepertoire, Optional[EmitterState], RNGKey]:
    """Initialize a Map-Elites repertoire with an initial population of genotypes.

    Requires the definition of centroids that can be computed with any method
    such as CVT or Euclidean mapping.

    Before the repertoire is initialised, individuals are gathered from all the
    devices.

    Args:
        genotypes: initial genotypes, pytree in which leaves
            have shape (batch_size, num_features)
        random_key: a random key used for stochastic operations.
        n_cells_per_dim: number of cells per dimension in the repertoire

    Returns
    -------
        An initialized MAP-Elite repertoire with the initial state of the emitter,
        and a random key.
    """
    # score initial genotypes
    (
        fitnesses,
        descriptors,
        extra_scores,
        emitter_scores,
        random_key,
        genotypes,
    ) = self._scoring_function(genotypes, random_key, scoring_data)

    # gather across all devices
    if self._distributed:
        (
            genotypes,
            fitnesses,
            descriptors,
            extra_scores,
        ) = jax.tree_util.tree_map(
            lambda x: jnp.concatenate(jax.lax.all_gather(x, axis_name="p"), axis=0),
            (genotypes, fitnesses, descriptors, extra_scores),
        )

    # init the repertoire
    repertoire = init_repertoire(
        genotypes=genotypes,
        descriptors=descriptors,
        fitnesses=fitnesses,
        extra_scores=extra_scores,
        n_cells_per_dim=self._n_cells_per_dim,
        cell_depth=self._cell_depth,
    )

    # get initial state of the emitter
    emitter_state, random_key = self._emitter.init(
        random_key=random_key,
        init_genotypes=genotypes,
    )

    # update emitter state
    emitter_state = self._emitter.state_update(
        emitter_state=emitter_state,
        repertoire=repertoire,
        genotypes=genotypes,
        fitnesses=fitnesses,
        descriptors=descriptors,
        extra_scores=emitter_scores,
    )

    return repertoire, emitter_state, random_key

update #

update(repertoire, emitter_state, random_key, scoring_data)

Perform one iteration of the MAP-Elites algorithm.

  1. A batch of genotypes is sampled in the repertoire and the genotypes are copied.
  2. The copies are mutated and crossed-over
  3. The obtained offsprings are scored and then added to the repertoire.

Before the repertoire is updated, individuals are gathered from all the devices.

Args: repertoire: the MAP-Elites repertoire emitter_state: state of the emitter random_key: a jax PRNG random key

RETURNS DESCRIPTION
the updated MAP-Elites repertoire

the updated (if needed) emitter state metrics about the updated repertoire a new jax PRNG key

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/repertoire/discrete_map_elites.py
@partial(jax.jit, static_argnames=("self",))
def update(
    self,
    repertoire: DiscreteMapElitesRepertoire,
    emitter_state: Optional[EmitterState],
    random_key: RNGKey,
    scoring_data: PyTree,
) -> Tuple[DiscreteMapElitesRepertoire, Optional[EmitterState], Metrics, RNGKey]:
    """Perform one iteration of the MAP-Elites algorithm.

    1. A batch of genotypes is sampled in the repertoire and the genotypes
        are copied.
    2. The copies are mutated and crossed-over
    3. The obtained offsprings are scored and then added to the repertoire.

    Before the repertoire is updated, individuals are gathered from all the
    devices.

    Args:
        repertoire: the MAP-Elites repertoire
        emitter_state: state of the emitter
        random_key: a jax PRNG random key

    Returns
    -------
        the updated MAP-Elites repertoire
        the updated (if needed) emitter state
        metrics about the updated repertoire
        a new jax PRNG key
    """
    # generate offsprings with the emitter
    genotypes, _extra_info, random_key = self._emitter.emit(repertoire, emitter_state, random_key)
    # scores the offsprings
    (
        fitnesses,
        descriptors,
        extra_scores,
        emitter_info,
        random_key,
        genotypes,
    ) = self._scoring_function(genotypes, random_key, scoring_data)

    # gather across all devices
    if self._distributed:
        (
            genotypes,
            fitnesses,
            descriptors,
            extra_scores,
        ) = jax.tree_util.tree_map(
            lambda x: jnp.concatenate(jax.lax.all_gather(x, axis_name="p"), axis=0),
            (genotypes, fitnesses, descriptors, extra_scores),
        )

    # add genotypes in the repertoire
    repertoire = add_to_repertoire(
        repertoire=repertoire,
        batch_of_genotypes=genotypes,
        batch_of_descriptors=descriptors,
        batch_of_fitnesses=fitnesses,
        batch_of_extra_scores=extra_scores,
    )

    # update emitter state after scoring is made
    emitter_state = self._emitter.state_update(
        emitter_state=emitter_state,
        repertoire=repertoire,
        genotypes=genotypes,
        fitnesses=fitnesses,
        descriptors=descriptors,
        extra_scores=emitter_info,
    )

    # update the metrics
    metrics = self._metrics_function(repertoire)

    return repertoire, emitter_state, metrics, random_key

toop_engine_topology_optimizer.dc.repertoire.discrete_me_repertoire #

Contains the DiscreteMapElitesRepertoire class and utility functions.

This file contains util functions and a class to define a repertoire, used to store individuals in the MAP-Elites algorithm as well as several variants. Adapted from QDax (https://github.com/adaptive-intelligent-robotics/QDax)

DiscreteMapElitesRepertoire #

A class to store the MAP-Elites repertoire.

genotypes instance-attribute #

genotypes

The genotypes in the repertoire.

The PyTree can be a simple Jax array or a more complex nested structure such as to represent parameters of neural network in Flax.

fitnesses instance-attribute #

fitnesses

The fitness of solutions in each cell of the repertoire.

descriptors instance-attribute #

descriptors

The descriptors of solutions in each cell of the repertoire.

extra_scores instance-attribute #

extra_scores

The extra scores of solutions in each cell of the repertoire. Usually the metrics

n_cells_per_dim instance-attribute #

n_cells_per_dim

The number of cells per dimension.

cell_depth class-attribute instance-attribute #

cell_depth = 1

Each cell contains cell_depth unique individuals

sample #

sample(random_key, num_samples)

Sample elements in the repertoire.

PARAMETER DESCRIPTION
random_key

the random key to be used for sampling

TYPE: RNGKey

num_samples

the number of samples to be drawn from the repertoire

TYPE: int

RETURNS DESCRIPTION
samples

the sampled genotypes

TYPE: Genotype

random_key

the updated random key

TYPE: jax PRNG key

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/repertoire/discrete_me_repertoire.py
@partial(jax.jit, static_argnames=("num_samples",))
def sample(self: DiscreteMapElitesRepertoire, random_key: RNGKey, num_samples: int) -> Tuple[Genotype, RNGKey]:
    """Sample elements in the repertoire.

    Parameters
    ----------
    random_key: jax PRNG key
        the random key to be used for sampling
    num_samples: int
        the number of samples to be drawn from the repertoire

    Returns
    -------
    samples: Genotype
        the sampled genotypes
    random_key: jax PRNG key
        the updated random key
    """
    repertoire_empty = self.fitnesses == -jnp.inf
    p = (1.0 - repertoire_empty) / jnp.sum(1.0 - repertoire_empty)

    random_key, subkey = jax.random.split(random_key)
    samples = jax.tree_util.tree_map(
        lambda x: jax.random.choice(subkey, x, shape=(num_samples,), p=p),
        self.genotypes,
    )

    return samples, random_key

__getitem__ #

__getitem__(index)

Get a slice of the repertoire.

PARAMETER DESCRIPTION
index

the index of the elements to be selected

TYPE: Union[int, slice, ndarray]

RETURNS DESCRIPTION
a new repertoire with the selected elements
Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/repertoire/discrete_me_repertoire.py
def __getitem__(self, index: Union[int, slice, jnp.ndarray]) -> DiscreteMapElitesRepertoire:
    """Get a slice of the repertoire.

    Parameters
    ----------
    index: int, slice or jnp.ndarray
        the index of the elements to be selected

    Returns
    -------
        a new repertoire with the selected elements
    """
    return DiscreteMapElitesRepertoire(
        genotypes=jax.tree_util.tree_map(lambda x: x[index], self.genotypes),
        fitnesses=self.fitnesses[index],
        descriptors=self.descriptors[index],
        extra_scores=jax.tree_util.tree_map(lambda x: x[index], self.extra_scores),
        n_cells_per_dim=self.n_cells_per_dim,
        cell_depth=self.cell_depth,
    )

get_cells_indices #

get_cells_indices(descriptors, n_cells_per_dim)

Compute the index for many descriptors at once.

Args: descriptors: an array that contains the descriptors of the solutions. Its shape is (batch_size, num_descriptors) n_cells_per_dim: the number of cells per dimension

RETURNS DESCRIPTION
The index of the cell in which each descriptor belongs.
Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/repertoire/discrete_me_repertoire.py
@partial(jax.jit, static_argnames=("n_cells_per_dim",))
def get_cells_indices(
    descriptors: Int[Array, " batch_size *dim"],
    n_cells_per_dim: tuple[int, ...],
) -> Int[Array, " batch_size"]:
    """Compute the index for many descriptors at once.

    Args:
        descriptors: an array that contains the descriptors of the
            solutions. Its shape is (batch_size, num_descriptors)
        n_cells_per_dim: the number of cells per dimension

    Returns
    -------
        The index of the cell in which each descriptor belongs.
    """
    return jax.vmap(partial(get_cell_index, n_cells_per_dim=n_cells_per_dim))(descriptors)

get_cell_index #

get_cell_index(descriptor, n_cells_per_dim)

Compute the index of the cell in which each descriptor belongs.

The index is effectively like the reshape operation, spreading multiple dimensions to a flat single dimension.

Args: descriptor: an array that contains the descriptors. There is one descriptor per dimension n_cells_per_dim: the number of cells per dimension

RETURNS DESCRIPTION
The index of the cell in which each descriptor belongs.
Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/repertoire/discrete_me_repertoire.py
@partial(jax.jit, static_argnames=("n_cells_per_dim",))
def get_cell_index(
    descriptor: Int[Array, " n_dims"],
    n_cells_per_dim: tuple[int, ...],
) -> Int[Array, " "]:
    """Compute the index of the cell in which each descriptor belongs.

    The index is effectively like the reshape operation, spreading multiple dimensions to a flat
    single dimension.

    Args:
        descriptor: an array that contains the descriptors. There is one descriptor per dimension
        n_cells_per_dim: the number of cells per dimension

    Returns
    -------
        The index of the cell in which each descriptor belongs.
    """
    assert len(descriptor) == len(n_cells_per_dim)
    assert all(c > 0 for c in n_cells_per_dim)

    return jnp.ravel_multi_index(
        multi_index=descriptor,
        dims=n_cells_per_dim,
        mode="clip",
    )  # jittable thanks to clip mode

add_to_repertoire #

add_to_repertoire(
    repertoire,
    batch_of_genotypes,
    batch_of_descriptors,
    batch_of_fitnesses,
    batch_of_extra_scores=None,
)

Add a batch of elements to the repertoire.

Args: repertoire: the MAP-Elites repertoire to which the elements will be added. batch_of_genotypes: a batch of genotypes to be added to the repertoire. Similarly to the repertoire.genotypes argument, this is a PyTree in which the leaves have a shape (batch_size, num_features) batch_of_descriptors: an array that contains the descriptors of the aforementioned genotypes. Its shape is (batch_size, num_descriptors). This will determine the cell in which the genotype will be stored, all descriptors are clipped to be within the bounds of n_cells_per_dim. batch_of_fitnesses: an array that contains the fitnesses of the aforementioned genotypes. Its shape is (batch_size,) batch_of_extra_scores: tree that contains the extra_scores of aforementioned genotypes.

RETURNS DESCRIPTION
The updated MAP-Elites repertoire.
Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/repertoire/discrete_me_repertoire.py
@jax.jit
def add_to_repertoire(
    repertoire: DiscreteMapElitesRepertoire,
    batch_of_genotypes: PyTree[Shaped[Array, " batch_size *feature_dims"]],
    batch_of_descriptors: Int[Array, " batch_size n_dims"],
    batch_of_fitnesses: Float[Array, " batch_size"],
    batch_of_extra_scores: Optional[PyTree[Shaped[Array, " batch_size *extra_dims"]]] = None,
) -> DiscreteMapElitesRepertoire:
    """Add a batch of elements to the repertoire.

    Args:
        repertoire: the MAP-Elites repertoire to which the elements will be added.
        batch_of_genotypes: a batch of genotypes to be added to the repertoire.
            Similarly to the repertoire.genotypes argument, this is a PyTree in which
            the leaves have a shape (batch_size, num_features)
        batch_of_descriptors: an array that contains the descriptors of the
            aforementioned genotypes. Its shape is (batch_size, num_descriptors). This will
            determine the cell in which the genotype will be stored, all descriptors are clipped
            to be within the bounds of n_cells_per_dim.
        batch_of_fitnesses: an array that contains the fitnesses of the
            aforementioned genotypes. Its shape is (batch_size,)
        batch_of_extra_scores: tree that contains the extra_scores of
            aforementioned genotypes.

    Returns
    -------
        The updated MAP-Elites repertoire.
    """
    if repertoire.cell_depth > 1:
        return add_to_repertoire_with_cell_depth(
            repertoire,
            batch_of_genotypes,
            batch_of_descriptors,
            batch_of_fitnesses,
            batch_of_extra_scores,
        )
    return add_to_repertoire_without_cell_depth(
        repertoire,
        batch_of_genotypes,
        batch_of_descriptors,
        batch_of_fitnesses,
        batch_of_extra_scores,
    )

add_to_repertoire_without_cell_depth #

add_to_repertoire_without_cell_depth(
    repertoire,
    batch_of_genotypes,
    batch_of_descriptors,
    batch_of_fitnesses,
    batch_of_extra_scores=None,
)

Add a batch of elements to the repertoire.

PARAMETER DESCRIPTION
repertoire

the MAP-Elites repertoire to which the elements will be added.

TYPE: DiscreteMapElitesRepertoire

batch_of_genotypes

a batch of genotypes to be added to the repertoire.

TYPE: PyTree[Shaped[Array, ' batch_size *feature_dims']]

batch_of_descriptors

an array that contains the descriptors of the genotypes.

TYPE: Int[Array, ' batch_size n_dims']

batch_of_fitnesses

an array that contains the fitnesses of the genotypes.

TYPE: Float[Array, ' batch_size']

batch_of_extra_scores

tree that contains the extra_scores of genotypes.

TYPE: Optional[PyTree[Shaped[Array, ' batch_size *extra_dims']]] DEFAULT: None

RETURNS DESCRIPTION
DiscreteMapElitesRepertoire

The updated MAP-Elites repertoire.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/repertoire/discrete_me_repertoire.py
@jax.jit
def add_to_repertoire_without_cell_depth(
    repertoire: DiscreteMapElitesRepertoire,
    batch_of_genotypes: PyTree[Shaped[Array, " batch_size *feature_dims"]],
    batch_of_descriptors: Int[Array, " batch_size n_dims"],
    batch_of_fitnesses: Float[Array, " batch_size"],
    batch_of_extra_scores: Optional[PyTree[Shaped[Array, " batch_size *extra_dims"]]] = None,
) -> DiscreteMapElitesRepertoire:
    """Add a batch of elements to the repertoire.

    Parameters
    ----------
    repertoire: DiscreteMapElitesRepertoire
        the MAP-Elites repertoire to which the elements will be added.
    batch_of_genotypes: PyTree[Shaped[Array, " batch_size *feature_dims"]]
        a batch of genotypes to be added to the repertoire.
    batch_of_descriptors: Int[Array, " batch_size n_dims"]
        an array that contains the descriptors of the genotypes.
    batch_of_fitnesses: Float[Array, " batch_size"]
        an array that contains the fitnesses of the genotypes.
    batch_of_extra_scores: Optional[PyTree[Shaped[Array, " batch_size *extra_dims"]]]
        tree that contains the extra_scores of genotypes.

    Returns
    -------
    DiscreteMapElitesRepertoire
        The updated MAP-Elites repertoire.
    """
    batch_of_indices = get_cells_indices(batch_of_descriptors, repertoire.n_cells_per_dim)
    batch_of_indices = jnp.expand_dims(batch_of_indices, axis=-1)
    batch_of_fitnesses = jnp.expand_dims(batch_of_fitnesses, axis=-1)
    repertoire_size = repertoire.fitnesses.shape[0]

    # get fitness segment max
    # Necessary because there could be multiple scores with the same descriptor
    best_fitnesses = jax.ops.segment_max(
        batch_of_fitnesses,
        batch_of_indices.astype(jnp.int32).squeeze(axis=-1),
        num_segments=repertoire_size,
    )

    cond_values = jnp.take_along_axis(best_fitnesses, batch_of_indices, 0)

    # put dominated fitness to -jnp.inf
    batch_of_fitnesses = jnp.where(batch_of_fitnesses == cond_values, batch_of_fitnesses, -jnp.inf)

    # get addition condition
    repertoire_fitnesses = jnp.expand_dims(repertoire.fitnesses, axis=-1)
    current_fitnesses = jnp.take_along_axis(repertoire_fitnesses, batch_of_indices, 0)
    addition_condition = batch_of_fitnesses > current_fitnesses

    # assign fake position when relevant : num_centroids is out of bound
    batch_of_indices = jnp.where(addition_condition, batch_of_indices, repertoire_size).squeeze(axis=-1)

    # create new repertoire
    new_repertoire_genotypes = jax.tree_util.tree_map(
        lambda repertoire_genotypes, new_genotypes: repertoire_genotypes.at[batch_of_indices].set(
            new_genotypes, mode="drop"
        ),
        repertoire.genotypes,
        batch_of_genotypes,
    )

    # compute new fitness and descriptors
    new_fitnesses = repertoire.fitnesses.at[batch_of_indices].set(batch_of_fitnesses.squeeze(axis=-1), mode="drop")
    new_descriptors = repertoire.descriptors.at[batch_of_indices].set(batch_of_descriptors, mode="drop")
    if batch_of_extra_scores is None:
        new_extra_scores = None
    else:
        new_extra_scores = jax.tree_util.tree_map(
            lambda repertoire_extra_scores, new_extra_scores: repertoire_extra_scores.at[batch_of_indices].set(
                new_extra_scores, mode="drop"
            ),
            repertoire.extra_scores,
            batch_of_extra_scores,
        )

    return DiscreteMapElitesRepertoire(
        genotypes=new_repertoire_genotypes,
        fitnesses=new_fitnesses,
        descriptors=new_descriptors,
        extra_scores=new_extra_scores,
        n_cells_per_dim=repertoire.n_cells_per_dim,
        cell_depth=repertoire.cell_depth,
    )

add_to_repertoire_with_cell_depth #

add_to_repertoire_with_cell_depth(
    repertoire,
    batch_of_genotypes,
    batch_of_descriptors,
    batch_of_fitnesses,
    batch_of_extra_scores=None,
)

Add a batch of elements to a repertoire with depth.

Assumes the repertoire puts elements on a same depth level next to another (layer1 layer1 layer2 layer2 ...) Manipulates abstract indices instead of moving genotypes directly.

PARAMETER DESCRIPTION
repertoire

the MAP-Elites repertoire to which the elements will be added.

TYPE: DiscreteMapElitesRepertoire

batch_of_genotypes

a batch of genotypes to be added to the repertoire.

TYPE: PyTree[Shaped[Array, ' batch_size *feature_dims']]

batch_of_descriptors

an array that contains the descriptors of the genotypes.

TYPE: Int[Array, ' batch_size n_dims']

batch_of_fitnesses

an array that contains the fitnesses of the genotypes.

TYPE: Float[Array, ' batch_size']

batch_of_extra_scores

tree that contains the extra_scores of genotypes.

TYPE: Optional[PyTree[Shaped[Array, ' batch_size *extra_dims']]] DEFAULT: None

RETURNS DESCRIPTION
DiscreteMapElitesRepertoire

The updated MAP-Elites repertoire.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/repertoire/discrete_me_repertoire.py
@jax.jit
def add_to_repertoire_with_cell_depth(
    repertoire: DiscreteMapElitesRepertoire,
    batch_of_genotypes: PyTree[Shaped[Array, " batch_size *feature_dims"]],
    batch_of_descriptors: Int[Array, " batch_size n_dims"],
    batch_of_fitnesses: Float[Array, " batch_size"],
    batch_of_extra_scores: Optional[PyTree[Shaped[Array, " batch_size *extra_dims"]]] = None,
) -> DiscreteMapElitesRepertoire:
    """Add a batch of elements to a repertoire with depth.

    Assumes the repertoire puts elements on a same depth level next to another (layer1 layer1 layer2 layer2 ...)
    Manipulates abstract indices instead of moving genotypes directly.

    Parameters
    ----------
    repertoire: DiscreteMapElitesRepertoire
        the MAP-Elites repertoire to which the elements will be added.
    batch_of_genotypes: PyTree[Shaped[Array, " batch_size *feature_dims"]]
        a batch of genotypes to be added to the repertoire.
    batch_of_descriptors: Int[Array, " batch_size n_dims"]
        an array that contains the descriptors of the genotypes.
    batch_of_fitnesses: Float[Array, " batch_size"]
        an array that contains the fitnesses of the genotypes.
    batch_of_extra_scores: Optional[PyTree[Shaped[Array, " batch_size *extra_dims"]]]
        tree that contains the extra_scores of genotypes.

    Returns
    -------
    DiscreteMapElitesRepertoire
        The updated MAP-Elites repertoire.
    """
    cell_depth = repertoire.cell_depth
    num_cells = np.prod(np.array(repertoire.n_cells_per_dim)).item()
    repertoire_size = cell_depth * num_cells
    batch_size = batch_of_fitnesses.shape[0]

    # Work on indices in order to abstract the genotype
    abstract_current_genotypes = jnp.arange(repertoire_size)
    abstract_new_genotypes = jnp.arange(start=repertoire_size, stop=repertoire_size + batch_size)

    """
    Part 1 : Sort
    """

    # rearrange repertoire fitnesses so cell fitnesses are in the same dimension
    current_fitnesses_per_cell: Float[Array, "num_cells cell_depth"] = jnp.reshape(
        repertoire.fitnesses,
        (-1, num_cells),  # one dimension per cell
    ).T  # transpose to have cells in the first dimension

    # calculate new indices for each layer
    new_indices: Int[Array, "batch_size"] = get_cells_indices(
        batch_of_descriptors, repertoire.n_cells_per_dim
    )  # example : [0, 0, 0, 1]

    # repeat to get :
    # [[0, 0, 0, 1],
    #  [0, 0, 0, 1],
    #  [0, 0, 0, 1]]
    # shape (num_cells, batch_size)
    repeated_indices: Int[Array, "num_cells batch_size"] = jnp.tile(new_indices, reps=(num_cells, 1))

    # Create an array of shape (num_cells, batch_size) where the nth element contains the number n
    # example : [[0, 0, 0, 0],
    #            [1, 1, 1, 1],
    #            [2, 2, 2, 2]]
    cell_number: Int[Array, "num_cells"] = jnp.arange(num_cells)
    repeated_cell_number: Int[Array, "num_cells batch_size"] = jnp.repeat(
        cell_number.reshape(-1, 1),
        repeats=batch_size,
        axis=-1,
        total_repeat_length=batch_size,  # to make it jittable
    )

    # Repeat new fitnesses
    repeated_new_fitnesses: Float[Array, "num_cells batch_size"] = jnp.tile(batch_of_fitnesses, reps=(num_cells, 1))

    # condition is true if the fitness is at the right index
    # example : [[True , True , True , False],
    #            [False, False, False, True ],
    #            [False, False, False, False]]
    belongs_to_right_cell = repeated_cell_number == repeated_indices

    filtered_new_fitnesses_per_cell: Float[Array, "num_cells batch_size"] = jnp.where(
        belongs_to_right_cell,
        repeated_new_fitnesses,
        -jnp.inf,
    )

    # extend current fitnesses per cell to welcome new ones
    all_fitnesses_per_cell: Float[Array, "num_cells max_size_per_cell"] = jnp.concatenate(
        [current_fitnesses_per_cell, filtered_new_fitnesses_per_cell], axis=1
    )

    sorted_fitness_indices_per_cell: Int[Array, "num_cells max_size_per_cell"] = jnp.argsort(
        all_fitnesses_per_cell, axis=-1, descending=True
    )

    # we only want the first cell_depth elements of each cell
    cropped_best_fitness_indices_per_cell: Int[Array, "num_cells cell_depth"] = sorted_fitness_indices_per_cell[
        :, :cell_depth
    ]

    # adapt indices to select genotypes : must be laid flat with first num_cells belonging to the first depth etc
    # adapted_indices_for_selection = cropped_best_fitness_indices_per_cell.T.reshape(-1)

    # Shape genotypes per-cell to use the indices
    current_genotypes_per_cell: Int[Array, "num_cells cell_depth"] = abstract_current_genotypes.reshape(-1, num_cells).T
    repeated_batch_of_genotypes: Int[Array, "num_cells batch_size"] = (
        jnp.repeat(abstract_new_genotypes, repeats=num_cells).reshape(-1, num_cells).T
    )
    all_genotypes_per_cell: Int[Array, "num_cells max_size_per_cell"] = jnp.concatenate(
        [current_genotypes_per_cell, repeated_batch_of_genotypes],
        axis=1,  # concatenate on cells
    )

    # select the genotypes by order of fitness. If a cell doesn't belong there,
    # it will be placed last (-inf fitness) and thus cropped out
    cropped_selected_genotypes_per_cell: Int[Array, "num_cells cell_depth"] = jnp.take_along_axis(
        all_genotypes_per_cell, cropped_best_fitness_indices_per_cell, axis=1
    )

    # reshape to the original shape to apply on a concat of repertoire and new batch
    final_indices_selection: Int[Array, "repertoire_size"] = cropped_selected_genotypes_per_cell.T.reshape(
        num_cells * cell_depth
    )

    """
    Part 2 : Selection
    """

    # genotypes
    selected_genotypes = jax.tree.map(
        lambda x, y: jnp.concat([x, y])[final_indices_selection],
        repertoire.genotypes,
        batch_of_genotypes,
    )

    # fitness
    selected_fitness = jnp.concat([repertoire.fitnesses, batch_of_fitnesses])[final_indices_selection]

    # descriptors
    selected_descriptors = jnp.concat([repertoire.descriptors, batch_of_descriptors], axis=0)[final_indices_selection]

    # extra scores
    if batch_of_extra_scores is None:
        selected_extra_scores = None
    else:
        selected_extra_scores = jax.tree_util.tree_map(
            lambda x, y: jnp.concatenate([x, y], axis=0)[final_indices_selection],
            repertoire.extra_scores,
            batch_of_extra_scores,
        )

    return DiscreteMapElitesRepertoire(
        genotypes=selected_genotypes,
        fitnesses=selected_fitness,
        descriptors=selected_descriptors,
        extra_scores=selected_extra_scores,
        n_cells_per_dim=repertoire.n_cells_per_dim,
        cell_depth=cell_depth,
    )

init_repertoire #

init_repertoire(
    genotypes,
    fitnesses,
    descriptors,
    extra_scores,
    n_cells_per_dim,
    cell_depth,
)

Initialize a Map-Elites repertoire with an initial population of genotypes.

Requires the definition of centroids that can be computed with any method such as CVT or Euclidean mapping.

Note: this function has been kept outside of the object MapElites, so it can be called easily called from other modules.

Args: genotypes: initial genotypes, pytree in which leaves have shape (batch_size, num_features) fitnesses: fitness of the initial genotypes of shape (batch_size,) descriptors: descriptors of the initial genotypes of shape (batch_size, num_descriptors) extra_scores: the observed load flow metrics n_cells_per_dim: the number of cells per dimension cell_depth: the number of topologies per cell

RETURNS DESCRIPTION
an initialized MAP-Elite repertoire
Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/repertoire/discrete_me_repertoire.py
def init_repertoire(
    genotypes: Genotype,
    fitnesses: Fitness,
    descriptors: Descriptor,
    extra_scores: Optional[ExtraScores],
    n_cells_per_dim: tuple[int, ...],
    cell_depth: Static[int],
) -> DiscreteMapElitesRepertoire:
    """Initialize a Map-Elites repertoire with an initial population of genotypes.

    Requires the definition of centroids that can be computed with any method
    such as CVT or Euclidean mapping.

    Note: this function has been kept outside of the object MapElites, so it can
    be called easily called from other modules.

    Args:
        genotypes: initial genotypes, pytree in which leaves
            have shape (batch_size, num_features)
        fitnesses: fitness of the initial genotypes of shape (batch_size,)
        descriptors: descriptors of the initial genotypes
            of shape (batch_size, num_descriptors)
        extra_scores: the observed load flow metrics
        n_cells_per_dim: the number of cells per dimension
        cell_depth: the number of topologies per cell

    Returns
    -------
        an initialized MAP-Elite repertoire
    """
    # retrieve one genotype from the population
    (first_genotype, first_extra_score) = jax.tree_util.tree_map(lambda x: x[0], (genotypes, extra_scores))

    # create a repertoire with default values
    repertoire = _init_default(
        genotype=first_genotype,
        extra_scores=first_extra_score,
        n_cells_per_dim=n_cells_per_dim,
        cell_depth=cell_depth,
    )

    # add initial population to the repertoire
    new_repertoire = add_to_repertoire(repertoire, genotypes, descriptors, fitnesses, extra_scores)

    return new_repertoire  # type: ignore

toop_engine_topology_optimizer.dc.repertoire.plotting #

Plotting functions for the Map-Elites algorithm.

plot_repertoire_1d #

plot_repertoire_1d(
    fitnesses, n_cells_per_dim, descriptor_metrics
)

Plot a bar chart of the repertoire's fitnesses.

PARAMETER DESCRIPTION
fitnesses

The fitnesses of the repertoire

TYPE: Fitness

n_cells_per_dim

The number of cells per dimension

TYPE: tuple[int, ...]

descriptor_metrics

The descriptor metrics

TYPE: tuple[str, ...]

RETURNS DESCRIPTION
Figure

The plot

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/repertoire/plotting.py
def plot_repertoire_1d(
    fitnesses: Fitness,
    n_cells_per_dim: tuple[int, ...],
    descriptor_metrics: tuple[str, ...],
) -> plt.Figure:
    """Plot a bar chart of the repertoire's fitnesses.

    Parameters
    ----------
    fitnesses : Fitness
        The fitnesses of the repertoire
    n_cells_per_dim : tuple[int, ...]
        The number of cells per dimension
    descriptor_metrics : tuple[str, ...]
        The descriptor metrics

    Returns
    -------
    plt.Figure
        The plot
    """
    plt.bar(
        x=range(n_cells_per_dim[0]),
        height=fitnesses,
    )
    plt.xlabel(descriptor_metrics[0])
    plt.ylabel("Fitness")
    return plt

plot_repertoire_2d #

plot_repertoire_2d(
    fitnesses, n_cells_per_dim, descriptor_metrics
)

Plot a heatmap of the repertoire's fitnesses.

PARAMETER DESCRIPTION
fitnesses

The fitnesses of the repertoire

TYPE: Fitness

n_cells_per_dim

The number of cells per dimension

TYPE: tuple[int, ...]

descriptor_metrics

The descriptor metrics

TYPE: tuple[str, ...]

RETURNS DESCRIPTION
Axes

The plot

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/repertoire/plotting.py
def plot_repertoire_2d(
    fitnesses: Fitness,
    n_cells_per_dim: tuple[int, ...],
    descriptor_metrics: tuple[str, ...],
) -> Axes:
    """Plot a heatmap of the repertoire's fitnesses.

    Parameters
    ----------
    fitnesses : Fitness
        The fitnesses of the repertoire
    n_cells_per_dim : tuple[int, ...]
        The number of cells per dimension
    descriptor_metrics : tuple[str, ...]
        The descriptor metrics

    Returns
    -------
    Axes
        The plot
    """
    reshaped_fitnesses = fitnesses.reshape(n_cells_per_dim)
    ax = sns.heatmap(
        reshaped_fitnesses,
        cbar_kws={"label": "Fitness"},
        linewidths=0.01,
        linecolor=(0.3, 0.3, 0.3, 0.3),
    )  # vmin=-15000, vmax=-5000
    ax.invert_yaxis()
    ax.set_xlabel(descriptor_metrics[1])  # first axis is vertical for some reason
    ax.set_ylabel(descriptor_metrics[0])
    return ax

plot_repertoire #

plot_repertoire(
    fitnesses,
    iteration,
    folder,
    n_cells_per_dim,
    descriptor_metrics,
    save_plot,
)

Plot the repertoire (1D or 2D) and saves the figure.

PARAMETER DESCRIPTION
fitnesses

The fitnesses of the repertoire

TYPE: Fitness

iteration

The current iteration number

TYPE: Optional[int]

folder

The folder to save the plot. Will create a "plots" folder inside.

TYPE: str

n_cells_per_dim

The number of cells per dimension

TYPE: tuple[int, ...]

descriptor_metrics

The descriptor metrics

TYPE: tuple[str, ...]

save_plot

Whether to save the plot

TYPE: bool

RETURNS DESCRIPTION
Axes | Figure

The plot. Axes for heatmap, plt.Figure for barchart.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/repertoire/plotting.py
def plot_repertoire(
    fitnesses: Fitness,
    iteration: Optional[int],
    folder: Optional[str],
    n_cells_per_dim: tuple[int, ...],
    descriptor_metrics: tuple[str, ...],
    save_plot: bool,
) -> Axes | plt.Figure:
    """Plot the repertoire (1D or 2D) and saves the figure.

    Parameters
    ----------
    fitnesses : Fitness
        The fitnesses of the repertoire
    iteration : Optional[int]
        The current iteration number
    folder : str
        The folder to save the plot. Will create a "plots" folder inside.
    n_cells_per_dim : tuple[int, ...]
        The number of cells per dimension
    descriptor_metrics : tuple[str, ...]
        The descriptor metrics
    save_plot : bool
        Whether to save the plot

    Returns
    -------
    Axes | plt.Figure
        The plot. Axes for heatmap, plt.Figure for barchart.
    """
    plt.clf()  # clear or it plots on top of itself

    # Prepare fitnesses : convert neginf to minimum fitness * weight
    weight = 1.05  # Any number greater than 1, can fine-tune to your liking
    minimum_fitness = jnp.min(fitnesses, where=jnp.isfinite(fitnesses), initial=0)
    fitnesses = jnp.nan_to_num(fitnesses, neginf=weight * minimum_fitness)

    # plot in N dimensions
    if len(n_cells_per_dim) == 1:  # can replace with plotting_fn = plot_repertoire_1d
        plot = plot_repertoire_1d(fitnesses, n_cells_per_dim, descriptor_metrics)
    elif len(n_cells_per_dim) == 2:
        plot = plot_repertoire_2d(fitnesses, n_cells_per_dim, descriptor_metrics)
    else:
        raise ValueError(
            "Only 1D and 2D plots are supported"
        )  # TODO : 3D heatmaps https://www.geeksforgeeks.org/3d-heatmap-in-python/

    # add iteration to the title
    if iteration is not None:
        plt.title(f"Map-Elites repertoire iteration {iteration}")
    else:
        plt.title("Map-Elites repertoire")

    # save the figure
    if save_plot:
        folder = os.path.join(folder, "plots")
        os.makedirs(folder, exist_ok=True)
        filename = "repertoire.png" if iteration is None else f"repertoire_{iteration}.png"
        plt.savefig(os.path.join(folder, filename))
    return plot

Worker#

toop_engine_topology_optimizer.dc.worker.optimizer #

Callable functions for the optimizer worker.

OptimizerData dataclass #

OptimizerData(
    start_params,
    optimization_id,
    solver_configs,
    algo,
    initial_fitness,
    initial_metrics,
    jax_data,
    start_time,
)

A wrapper dataclass for all the data stored by the optimizer.

Because this dataclass holds irrelevant information for the GPU optimization, it is split into two dataclasses. The parent (this one) is used to store all the information and the child (JaxOptimizerData) is used to store the information that is needed on the GPU.

start_params instance-attribute #

start_params

The initial args for the optimization run

optimization_id instance-attribute #

optimization_id

The id of the optimization run

solver_configs instance-attribute #

solver_configs

The solver config for every timestep

algo instance-attribute #

algo

The genetic algorithm object

initial_fitness instance-attribute #

initial_fitness

The initial fitness value

initial_metrics instance-attribute #

initial_metrics

The initial metrics

jax_data instance-attribute #

jax_data

Everything that needs to live on GPU. This dataclass is updated per-iteration while OptimizerData is only updated per-epoch, hence there are points in the command flow where this variable is out of sync. At the end of an epoch, this dataclass is updated to match the state in the optimization.

start_time instance-attribute #

start_time

The time the optimization run started

initialize_optimization #

initialize_optimization(
    params,
    optimization_id,
    static_information_files,
    processed_gridfile_fs,
)

Initialize the optimization run.

This function will be called at the start of the optimization run. It should be used to load the static information files and set up the optimization run.

PARAMETER DESCRIPTION
params

The parameters for the optimization run

TYPE: DCOptimizerParameters

optimization_id

The id of the optimization run, used to annotate results and heartbeats

TYPE: str

static_information_files

The paths to the static information files to load

TYPE: tuple[str, ...]

processed_gridfile_fs

The target filesystem for the preprocessing worker. This contains all processed grid files. During the import job, a new folder import_results.data_folder was created which will be completed with the preprocess call to this function. Internally, only the data folder is passed around as a dirfs. Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the unprocessed gridfiles were already done.

TYPE: AbstractFileSystem

RETURNS DESCRIPTION
OptimizerData

The data to store for the optimization run

list[StaticInformationStats]

The static information descriptions, will be sent via the heartbeats channel

Strategy

The initial strategy (unsplit) for the grid, including the initial fitness and metrics

RAISES DESCRIPTION
Exception

If an error occurs during the initialization. It will be caught by the worker and sent back to the results topic

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/worker/optimizer.py
def initialize_optimization(
    params: DCOptimizerParameters,
    optimization_id: str,
    static_information_files: tuple[str, ...],
    processed_gridfile_fs: AbstractFileSystem,
) -> tuple[OptimizerData, list[StaticInformationStats], Strategy]:
    """Initialize the optimization run.

    This function will be called at the start of the optimization run. It should be used to load
    the static information files and set up the optimization run.

    Parameters
    ----------
    params : DCOptimizerParameters
        The parameters for the optimization run
    optimization_id : str
        The id of the optimization run, used to annotate results and heartbeats
    static_information_files : tuple[str, ...]
        The paths to the static information files to load
    processed_gridfile_fs: AbstractFileSystem
        The target filesystem for the preprocessing worker. This contains all processed grid files.
        During the import job,  a new folder import_results.data_folder was created
        which will be completed with the preprocess call to this function.
        Internally, only the data folder is passed around as a dirfs.
        Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the
        unprocessed gridfiles were already done.


    Returns
    -------
    OptimizerData
        The data to store for the optimization run
    list[StaticInformationStats]
        The static information descriptions, will be sent via the heartbeats channel
    Strategy
        The initial strategy (unsplit) for the grid, including the initial fitness and metrics

    Raises
    ------
    Exception
        If an error occurs during the initialization. It will be caught by the worker and sent back to
        the results topic
    """
    (
        algo,
        jax_data,
        solver_configs,
        initial_fitness,
        initial_metrics,
        static_information_descriptions,
    ) = algo_setup(
        ga_args=params.ga_config,
        lf_args=params.loadflow_solver_config,
        double_limits=(
            params.double_limits.lower,
            params.double_limits.upper,
        )
        if params.double_limits is not None
        else None,
        static_information_files=static_information_files,
        processed_gridfile_fs=processed_gridfile_fs,
    )

    metrics = convert_metrics(initial_fitness, initial_metrics)

    initial_topology = Strategy(
        timesteps=[
            Topology(
                actions=[],
                disconnections=[],
                pst_setpoints=None,
                metrics=metrics,
            )
            for _ in static_information_descriptions
        ]
    )

    optimization_data = OptimizerData(
        start_params=params,
        optimization_id=optimization_id,
        solver_configs=solver_configs,
        algo=algo,
        jax_data=jax_data,
        initial_fitness=initial_fitness,
        initial_metrics=initial_metrics,
        start_time=time.time(),
    )

    return optimization_data, static_information_descriptions, initial_topology

convert_metrics #

convert_metrics(fitness, metrics_dict)

Convert a metrics dictionary to a Metrics dataclass.

PARAMETER DESCRIPTION
fitness

The fitness value

TYPE: float

metrics_dict

The metrics dictionary

TYPE: dict[MetricType, float]

RETURNS DESCRIPTION
Metrics

The metrics dataclass

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/worker/optimizer.py
def convert_metrics(fitness: float, metrics_dict: dict[MetricType, float]) -> Metrics:
    """Convert a metrics dictionary to a Metrics dataclass.

    Parameters
    ----------
    fitness : float
        The fitness value

    metrics_dict : dict[MetricType, float]
        The metrics dictionary

    Returns
    -------
    Metrics
        The metrics dataclass
    """
    case_indices = metrics_dict.pop("case_indices", None)
    metrics = Metrics(
        fitness=fitness,
        extra_scores=metrics_dict,
        worst_k_contingency_cases=case_indices,
    )

    return metrics

run_single_iteration #

run_single_iteration(_i, jax_data, update_fn)

Run a single iteration of the optimization.

This involves updating the genetic algorithm and calling the metrics callback

PARAMETER DESCRIPTION
_i

The iteration number, will be ignored. Its only purpose is to make the function signature compatible with lax.fori_loop

TYPE: Int[Array, '']

jax_data

The data stored for the optimization run from the last epoch or from initialize_optimization

TYPE: JaxOptimizerData

update_fn
1
2
3
            [GARepertoire, EmitterState, jax.random.PRNGKey, Any],
            tuple[GARepertoire, EmitterState, Any, jax.random.PRNGKey]
        )]

The update function of the genetic algorithm

TYPE: Callable[(

RETURNS DESCRIPTION
JaxOptimizerData

The updated data for the optimization run

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/worker/optimizer.py
@partial(
    jax.jit,
    static_argnames=("update_fn"),
    donate_argnames=("jax_data",),
)
def run_single_iteration(
    _i: Int[Array, ""],
    jax_data: JaxOptimizerData,
    update_fn: Callable[
        [DiscreteMapElitesRepertoire, EmitterState, jax.random.PRNGKey, Any],
        tuple[DiscreteMapElitesRepertoire, EmitterState, Any, jax.random.PRNGKey],
    ],
) -> JaxOptimizerData:
    """Run a single iteration of the optimization.

    This involves updating the genetic algorithm and calling the metrics callback

    Parameters
    ----------
    _i : Int[Array, ""]
        The iteration number, will be ignored. Its only purpose is to make the function signature
        compatible with lax.fori_loop
    jax_data : JaxOptimizerData
        The data stored for the optimization run from the last epoch or from initialize_optimization
    update_fn : Callable[(
                        [GARepertoire, EmitterState, jax.random.PRNGKey, Any],
                        tuple[GARepertoire, EmitterState, Any, jax.random.PRNGKey]
                    )]
        The update function of the genetic algorithm

    Returns
    -------
    JaxOptimizerData
        The updated data for the optimization run
    """
    repertoire, emitter_state, _metrics, random_key = update_fn(
        jax_data.repertoire,
        jax_data.emitter_state,
        jax_data.random_key,
        jax_data.dynamic_informations,
    )

    jax_data = replace(
        jax_data,
        repertoire=repertoire,
        emitter_state=emitter_state,
        random_key=random_key,
        latest_iteration=jax_data.latest_iteration + 1,
    )

    return jax_data

run_single_device_epoch #

run_single_device_epoch(
    jax_data, iterations_per_epoch, update_fn
)

Run one epoch of the optimization on a single device.

Basically this is just a fori loop over the iterations_per_epoch, calling run_single_iteration This can be used to pass to pmap

PARAMETER DESCRIPTION
jax_data

The data stored for the optimization run from the last epoch or from initialize_optimization

TYPE: JaxOptimizerData

iterations_per_epoch

The number of iterations to run in this epoch

TYPE: int

update_fn
1
2
3
            [GARepertoire, EmitterState, jax.random.PRNGKey, Any],
            tuple[GARepertoire, EmitterState, Any, jax.random.PRNGKey]
        )]

The update function of the genetic algorithm

TYPE: Callable[(

RETURNS DESCRIPTION
JaxOptimizerData

The updated data for the optimization run

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/worker/optimizer.py
@partial(
    jax.jit,
    static_argnames=("iterations_per_epoch", "update_fn"),
    # donate_argnames=("jax_data",),
)
def run_single_device_epoch(
    jax_data: JaxOptimizerData,
    iterations_per_epoch: int,
    update_fn: Callable[
        [DiscreteMapElitesRepertoire, EmitterState, jax.random.PRNGKey, Any],
        tuple[DiscreteMapElitesRepertoire, EmitterState, Any, jax.random.PRNGKey],
    ],
) -> JaxOptimizerData:
    """Run one epoch of the optimization on a single device.

    Basically this is just a fori loop over the iterations_per_epoch, calling run_single_iteration
    This can be used to pass to pmap

    Parameters
    ----------
    jax_data : JaxOptimizerData
        The data stored for the optimization run from the last epoch or from initialize_optimization
    iterations_per_epoch : int
        The number of iterations to run in this epoch
    update_fn : Callable[(
                        [GARepertoire, EmitterState, jax.random.PRNGKey, Any],
                        tuple[GARepertoire, EmitterState, Any, jax.random.PRNGKey]
                    )]
        The update function of the genetic algorithm

    Returns
    -------
    JaxOptimizerData
        The updated data for the optimization run
    """
    return lax.fori_loop(
        0,
        iterations_per_epoch,
        partial(run_single_iteration, update_fn=update_fn),
        jax_data,
    )

run_epoch #

run_epoch(optimizer_data)

Run one epoch of the optimization.

This function will be called repeatedly by the worker. It should include multiple iterations, according to the configuration of the optimizer. Furthermore it should call the metrics_callback during the epoch, at least at the beginning. This could happen through a jax callback to not block the main thread.

PARAMETER DESCRIPTION
optimizer_data

The data stored for the optimization run from the last epoch or from initialize_optimization

TYPE: OptimizerData

RETURNS DESCRIPTION
OptimizerData

The updated data for the optimization run

RAISES DESCRIPTION
Exception

If an error occurs during the epoch. It will be caught by the worker and sent back to the results topic

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/worker/optimizer.py
def run_epoch(
    optimizer_data: OptimizerData,
) -> OptimizerData:
    """Run one epoch of the optimization.

    This function will be called repeatedly by the worker. It should include multiple iterations,
    according to the configuration of the optimizer. Furthermore it should call the metrics_callback
    during the epoch, at least at the beginning. This could happen through a jax callback to not
    block the main thread.

    Parameters
    ----------
    optimizer_data : OptimizerData
        The data stored for the optimization run from the last epoch or from initialize_optimization

    Returns
    -------
    OptimizerData
        The updated data for the optimization run

    Raises
    ------
    Exception
        If an error occurs during the epoch. It will be caught by the worker and sent back to the
        results topic
    """
    epoch_fn = run_single_device_epoch
    if optimizer_data.start_params.loadflow_solver_config.distributed:
        epoch_fn = jax.pmap(
            epoch_fn,
            axis_name="p",
            static_broadcasted_argnums=(1, 2),
            donate_argnums=(0,),
        )

    jax_data = epoch_fn(
        optimizer_data.jax_data,
        optimizer_data.start_params.ga_config.iterations_per_epoch,
        optimizer_data.algo.update,
    )

    return replace(
        optimizer_data,
        jax_data=jax_data,
    )

extract_results #

extract_results(optimizer_data)

Pull the results from the optimizer.

This should give the current best topologies along with metrics for each topology, where the number of saved topologies is a configuration parameter in the StartOptimizationCommand.

PARAMETER DESCRIPTION
optimizer_data

The data stored for the optimization run

TYPE: OptimizerData

RETURNS DESCRIPTION
TopologyPushResult

The current best topologies extracted and in topo-vect form.

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/worker/optimizer.py
def extract_results(optimizer_data: OptimizerData) -> TopologyPushResult:
    """Pull the results from the optimizer.

    This should give the current best topologies along with metrics for each topology, where the
    number of saved topologies is a configuration parameter in the StartOptimizationCommand.

    Parameters
    ----------
    optimizer_data : OptimizerData
        The data stored for the optimization run

    Returns
    -------
    TopologyPushResult
        The current best topologies extracted and in topo-vect form.
    """
    # Assuming that contingency_ids stay the same for all timesteps
    contingency_ids = optimizer_data.solver_configs[0].contingency_ids

    # Get grid_model_low_tap if nodal injection information is available
    nodal_inj_info = optimizer_data.jax_data.dynamic_informations[0].nodal_injection_information
    grid_model_low_tap = nodal_inj_info.grid_model_low_tap if nodal_inj_info is not None else None

    topologies = summarize_repo(
        optimizer_data.jax_data.repertoire,
        initial_fitness=optimizer_data.initial_fitness,
        contingency_ids=contingency_ids,
        grid_model_low_tap=grid_model_low_tap,
    )

    # Convert it to strategies
    # TODO change once we support multiple timesteps
    formatted_topologies = [Strategy(timesteps=[topo]) for topo in topologies]

    return TopologyPushResult(strategies=formatted_topologies, epoch=optimizer_data.jax_data.latest_iteration.max().item())

toop_engine_topology_optimizer.dc.worker.worker #

Kafka worker for the genetic algorithm optimization.

logger module-attribute #

logger = Logger(__name__)

Args #

Bases: BaseModel

Launch arguments for the worker, which can not be changed during the optimization run.

kafka_broker class-attribute instance-attribute #

kafka_broker = 'localhost:9092'

The Kafka broker to connect to.

optimizer_command_topic class-attribute instance-attribute #

optimizer_command_topic = 'commands'

The Kafka topic to listen for commands on.

optimizer_results_topic class-attribute instance-attribute #

optimizer_results_topic = 'results'

The topic to push results to.

optimizer_heartbeat_topic class-attribute instance-attribute #

optimizer_heartbeat_topic = 'heartbeat'

The topic to push heartbeats to.

heartbeat_interval_ms class-attribute instance-attribute #

heartbeat_interval_ms = 1000

The interval in milliseconds to send heartbeats.

idle_loop #

idle_loop(
    consumer, send_heartbeat_fn, heartbeat_interval_ms
)

Run idle loop of the worker.

This will be running when the worker is currently not optimizing This will wait until a StartOptimizationCommand is received and return it. In case a ShutdownCommand is received, the worker will exit with the exit code provided in the command.

PARAMETER DESCRIPTION
consumer

The initialized Kafka consumer to listen for commands on.

TYPE: LongRunningKafkaConsumer

send_heartbeat_fn

A function to call when there were no messages received for a while.

TYPE: Callable[[HeartbeatUnion], None]

heartbeat_interval_ms

The time to wait for a new command in milliseconds. If no command has been received, a heartbeat will be sent and then the receiver will wait for commands again.

TYPE: int

RETURNS DESCRIPTION
StartOptimizationCommand

The start optimization command to start the optimization run with

RAISES DESCRIPTION
SystemExit

If a ShutdownCommand is received

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/worker/worker.py
def idle_loop(
    consumer: LongRunningKafkaConsumer,
    send_heartbeat_fn: Callable[[HeartbeatUnion], None],
    heartbeat_interval_ms: int,
) -> StartOptimizationCommand:
    """Run idle loop of the worker.

    This will be running when the worker is currently not optimizing
    This will wait until a StartOptimizationCommand is received and return it. In case a
    ShutdownCommand is received, the worker will exit with the exit code provided in the command.

    Parameters
    ----------
    consumer : LongRunningKafkaConsumer
        The initialized Kafka consumer to listen for commands on.
    send_heartbeat_fn : Callable[[HeartbeatUnion], None]
        A function to call when there were no messages received for a while.
    heartbeat_interval_ms : int
        The time to wait for a new command in milliseconds. If no command has been received, a
        heartbeat will be sent and then the receiver will wait for commands again.

    Returns
    -------
    StartOptimizationCommand
        The start optimization command to start the optimization run with

    Raises
    ------
    SystemExit
        If a ShutdownCommand is received
    """
    send_heartbeat_fn(IdleHeartbeat())
    logger.info("Entering idle loop")
    while True:
        message = consumer.poll(timeout=heartbeat_interval_ms / 1000)

        # Wait timeout exceeded
        if not message:
            send_heartbeat_fn(IdleHeartbeat())
            continue

        command = Command.model_validate_json(deserialize_message(message.value()))

        if isinstance(command.command, StartOptimizationCommand):
            return command.command

        if isinstance(command.command, ShutdownCommand):
            logger.info("Shutting down due to ShutdownCommand")
            consumer.commit()
            consumer.consumer.close()
            raise SystemExit(command.command.exit_code)

        # If we are here, we received a command that we do not know
        logger.warning(f"Received unknown command, dropping: {command} / {message.value}")
        consumer.commit()

optimization_loop #

optimization_loop(
    dc_params,
    grid_files,
    send_result_fn,
    send_heartbeat_fn,
    optimization_id,
    processed_gridfile_fs,
)

Run an optimization until the optimization has converged

PARAMETER DESCRIPTION
dc_params

The parameters for the optimization run, usually from the start command

TYPE: DCOptimizerParameters

grid_files

The grid files to load, where each gridfile represents one timestep.

TYPE: list[GridFile]

send_result_fn

A function to call to send results back to the results topic.

TYPE: Callable[[ResultUnion], None]

send_heartbeat_fn

A function to call after every epoch to signal that the worker is still alive.

TYPE: Callable[[HeartbeatUnion], None]

optimization_id

The id of the optimization run. This will be used to identify the optimization run in the results. Should stay the same for the whole optimization run and should be equal to the kafka event key.

TYPE: str

processed_gridfile_fs

The target filesystem for the preprocessing worker. This contains all processed grid files. During the import job, a new folder import_results.data_folder was created which will be completed with the preprocess call to this function. Internally, only the data folder is passed around as a dirfs. Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the unprocessed gridfiles were already done.

TYPE: AbstractFileSystem

RAISES DESCRIPTION
SystemExit

If a ShutdownCommand is received

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/worker/worker.py
def optimization_loop(
    dc_params: DCOptimizerParameters,
    grid_files: list[GridFile],
    send_result_fn: Callable[[ResultUnion], None],
    send_heartbeat_fn: Callable[[HeartbeatUnion], None],
    optimization_id: str,
    processed_gridfile_fs: AbstractFileSystem,
) -> None:
    """Run an optimization until the optimization has converged

    Parameters
    ----------
    dc_params : DCOptimizerParameters
        The parameters for the optimization run, usually from the start command
    grid_files : list[GridFile]
        The grid files to load, where each gridfile represents one timestep.
    send_result_fn : Callable[[ResultUnion], None]
        A function to call to send results back to the results topic.
    send_heartbeat_fn : Callable[[HeartbeatUnion], None]
        A function to call after every epoch to signal that the worker is still alive.
    optimization_id : str
        The id of the optimization run. This will be used to identify the optimization run in the
        results. Should stay the same for the whole optimization run and should be equal to the kafka
        event key.
    processed_gridfile_fs: AbstractFileSystem
        The target filesystem for the preprocessing worker. This contains all processed grid files.
        During the import job,  a new folder import_results.data_folder was created
        which will be completed with the preprocess call to this function.
        Internally, only the data folder is passed around as a dirfs.
        Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the
        unprocessed gridfiles were already done.

    Raises
    ------
    SystemExit
        If a ShutdownCommand is received
    """
    logger.info(f"Initializing DC optimization {optimization_id}")
    try:
        send_heartbeat_fn(
            OptimizationStartedHeartbeat(
                optimization_id=optimization_id,
            )
        )
        optimizer_data, stats, initial_strategy = initialize_optimization(
            params=dc_params,
            optimization_id=optimization_id,
            static_information_files=[gf.static_information_file for gf in grid_files],
            processed_gridfile_fs=processed_gridfile_fs,
        )
        send_result_fn(
            OptimizationStartedResult(
                initial_topology=initial_strategy,
                initial_stats=stats,
            )
        )

    except Exception as e:
        send_result_fn(OptimizationStoppedResult(reason="error", message=str(e)))
        logger.error(f"Error during initialization of optimization {optimization_id}: {e}")
        return

    def push_topologies(optimizer_data: OptimizerData) -> None:
        """Push topologies to the results topic."""
        with jax.default_device(jax.devices("cpu")[0]):
            push_result = extract_results(optimizer_data)
            if len(push_result.strategies):
                send_result_fn(push_result)
                best_fitness = max(
                    timestep.metrics.fitness for strategy in push_result.strategies for timestep in strategy.timesteps
                )
                logger.info(
                    f"Sent {len(push_result.strategies)} strategies with best fitness {best_fitness} to results topic"
                )
            else:
                logger.warning("No strategies extracted, skipping push.")

    logger.info(f"Starting optimization {optimization_id}")
    epoch = 0
    running = True
    start_time = time.time()
    while running:
        try:
            optimizer_data = run_epoch(optimizer_data)
            push_topologies(optimizer_data)
        except Exception as e:
            # Send a stop message to the results
            send_result_fn(OptimizationStoppedResult(reason="error", message=str(e)))

            logger.error(f"Error during optimization {optimization_id}, epoch {epoch}: {e}")
            return
        epoch += 1

        if time.time() - start_time > dc_params.ga_config.runtime_seconds:
            logger.info(f"Stopping optimization {optimization_id} at epoch {epoch} due to runtime limit")
            send_result_fn(OptimizationStoppedResult(epoch=epoch, reason="converged", message="runtime limit"))
            running = False
            break

        send_heartbeat_fn(
            OptimizationStatsHeartbeat(
                optimization_id=optimization_id,
                wall_time=time.time() - start_time,
                iteration=epoch,
                num_branch_topologies_tried=optimizer_data.jax_data.emitter_state["total_branch_combis"].sum().item(),
                num_injection_topologies_tried=optimizer_data.jax_data.emitter_state["total_inj_combis"].sum().item(),
            )
        )

main #

main(
    args, processed_gridfile_fs, producer, command_consumer
)

Run the main DC worker loop.

PARAMETER DESCRIPTION
args

The command line arguments

TYPE: Args

processed_gridfile_fs

The target filesystem for the preprocessing worker. This contains all processed grid files. During the import job, a new folder import_results.data_folder was created which will be completed with the preprocess call to this function. Internally, only the data folder is passed around as a dirfs. Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the unprocessed gridfiles were already done.

TYPE: AbstractFileSystem

producer

The Kafka producer to send results and heartbeats with.

TYPE: Producer

command_consumer

The Kafka consumer to receive commands with.

TYPE: LongRunningKafkaConsumer

RAISES DESCRIPTION
SystemExit

If the worker receives a ShutdownCommand

Source code in packages/topology_optimizer_pkg/src/toop_engine_topology_optimizer/dc/worker/worker.py
def main(
    args: Args,
    processed_gridfile_fs: AbstractFileSystem,
    producer: Producer,
    command_consumer: LongRunningKafkaConsumer,
) -> None:
    """Run the main DC worker loop.

    Parameters
    ----------
    args : Args
        The command line arguments
    processed_gridfile_fs: AbstractFileSystem
        The target filesystem for the preprocessing worker. This contains all processed grid files.
        During the import job,  a new folder import_results.data_folder was created
        which will be completed with the preprocess call to this function.
        Internally, only the data folder is passed around as a dirfs.
        Note that the unprocessed_gridfile_fs is not needed here anymore, as all preprocessing steps that need the
        unprocessed gridfiles were already done.
    producer : Producer
        The Kafka producer to send results and heartbeats with.
    command_consumer : LongRunningKafkaConsumer
        The Kafka consumer to receive commands with.

    Raises
    ------
    SystemExit
        If the worker receives a ShutdownCommand
    """
    instance_id = str(uuid4())
    logger.info(f"Starting DC worker {instance_id} with config {args}")
    jax.config.update("jax_enable_x64", True)
    jax.config.update("jax_logging_level", "INFO")

    def send_heartbeat(message: HeartbeatUnion, ping_consumer: bool) -> None:
        heartbeat = Heartbeat(
            optimizer_type=OptimizerType.DC,
            instance_id=instance_id,
            message=message,
        )
        producer.produce(
            args.optimizer_heartbeat_topic,
            value=serialize_message(heartbeat.model_dump_json()),
            key=heartbeat.instance_id.encode(),
        )
        producer.flush()
        if ping_consumer:
            command_consumer.heartbeat()

    def send_result(message: ResultUnion, optimization_id: str) -> None:
        result = Result(
            result=message,
            optimization_id=optimization_id,
            optimizer_type=OptimizerType.DC,
            instance_id=instance_id,
        )
        producer.produce(
            args.optimizer_results_topic,
            value=serialize_message(result.model_dump_json()),
            key=optimization_id.encode(),
        )
        producer.flush()

    while True:
        command = idle_loop(
            consumer=command_consumer,
            send_heartbeat_fn=partial(send_heartbeat, ping_consumer=False),
            heartbeat_interval_ms=args.heartbeat_interval_ms,
        )
        command_consumer.start_processing()
        optimization_loop(
            dc_params=command.dc_params,
            grid_files=command.grid_files,
            send_result_fn=partial(send_result, optimization_id=command.optimization_id),
            send_heartbeat_fn=partial(send_heartbeat, ping_consumer=True),
            optimization_id=command.optimization_id,
            processed_gridfile_fs=processed_gridfile_fs,
        )
        command_consumer.stop_processing()