Skip to content

Contingency Analysis#

AC Loadflow Service#

toop_engine_contingency_analysis.ac_loadflow_service.ac_loadflow_service #

Get the results of the AC Contingency Analysis for the given network

get_ac_loadflow_results #

get_ac_loadflow_results(
    net,
    n_minus_1_definition,
    timestep=0,
    job_id="",
    n_processes=1,
    batch_size=None,
    lf_params=None,
)

Get the results of the AC loadflow for the given network

PARAMETER DESCRIPTION
net

The network to run the contingency analysis on

TYPE: pandapowerNet | Network

n_minus_1_definition

The N-1 definition to use for the contingency analysis. Contains outages and monitored elements

TYPE: Nminus1Definition

timestep

The timestep of the results. Used to identify the results in the database

TYPE: int DEFAULT: 0

job_id

The job id of the current job

TYPE: str DEFAULT: ''

n_processes

The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially. If > 1, the analysis is run in parallel. Parallelization is done by splitting the contingencies into chunks and running each chunk in a separate process.

TYPE: int DEFAULT: 1

batch_size

The size of the batches to use for the parallelization. This is ignored for Powsybl at the moment. If None, the batch size is computed based on the number of contingencies and the number of processes.

TYPE: Optional[int] DEFAULT: None

lf_params

Loadflow parameters to use for the computation. dict for pandapower, pypowsybl.loadflow.Parameters for powsybl. If None, default parameters are used.

TYPE: Parameters | dict | None DEFAULT: None

RETURNS DESCRIPTION
LoadflowResultsPolars

The results of the Contingency analysis

RAISES DESCRIPTION
ValueError

If the network is not a PandapowerNetwork or PowsyblNetwork

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/ac_loadflow_service.py
def get_ac_loadflow_results(
    net: PandapowerNetwork | PowsyblNetwork,
    n_minus_1_definition: Nminus1Definition,
    timestep: int = 0,
    job_id: str = "",
    n_processes: int = 1,
    batch_size: Optional[int] = None,
    lf_params: pypowsybl.loadflow.Parameters | dict | None = None,
) -> LoadflowResultsPolars:
    """Run the AC contingency analysis with the backend matching the network type.

    Pandapower networks are dispatched to ``run_contingency_analysis_pandapower``, powsybl networks
    to ``run_contingency_analysis_powsybl``. Both are invoked with ``method="ac"`` and ``polars=True``.

    Parameters
    ----------
    net : PandapowerNetwork | PowsyblNetwork
        The network to run the contingency analysis on
    n_minus_1_definition: Nminus1Definition
        The N-1 definition to use for the contingency analysis. Contains outages and monitored elements
    timestep: int, default=0
        The timestep of the results. Used to identify the results in the database
    job_id: str, default=""
        The job id of the current job
    n_processes: int, default=1
        The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially.
        If > 1, the analysis is run in parallel.
        Parallelization is done by splitting the contingencies into chunks and running each chunk in a
        separate process
    batch_size: int, optional
        The size of the batches to use for the parallelization.
        Only forwarded to the pandapower backend; the powsybl call does not receive it.
        If None, the batch size is computed based on the number of contingencies and the number of processes.
    lf_params: pypowsybl.loadflow.Parameters or dict, optional
        Loadflow parameters to use for the computation.
        dict for pandapower, pypowsybl.loadflow.Parameters for powsybl. A value of the wrong type for the
        selected backend is replaced by None, i.e. default parameters are used.

    Returns
    -------
    LoadflowResultsPolars
        The results of the Contingency analysis

    Raises
    ------
    ValueError
        If the network is not a PandapowerNetwork or PowsyblNetwork
    """
    if isinstance(net, PandapowerNetwork):
        # pandapower takes its loadflow options as a kwargs dict; anything else means "use defaults"
        runpp_kwargs = lf_params if isinstance(lf_params, dict) else None
        parallel_cfg = ParallelConfig(n_processes=n_processes, batch_size=batch_size)
        pandapower_cfg = ContingencyAnalysisConfig(
            runpp_kwargs=runpp_kwargs,
            method="ac",
            polars=True,
            parallel=parallel_cfg,
        )
        return run_contingency_analysis_pandapower(
            net,
            n_minus_1_definition,
            job_id,
            timestep,
            cfg=pandapower_cfg,
        )

    if isinstance(net, PowsyblNetwork):
        # powsybl only accepts its own Parameters object; anything else means "use defaults"
        powsybl_params = lf_params if isinstance(lf_params, pypowsybl.loadflow.Parameters) else None
        return run_contingency_analysis_powsybl(
            net,
            n_minus_1_definition,
            job_id,
            timestep,
            n_processes=n_processes,
            method="ac",
            polars=True,
            lf_params=powsybl_params,
        )

    raise ValueError("net must be a pandapowerNet or powsybl network")

toop_engine_contingency_analysis.ac_loadflow_service.compute_metrics #

Provides functions to compute the metrics directly from the results dataframes.

This is similar to jax.aggregate_results.py but straight on the results dataframes.

compute_overload_column #

compute_overload_column(branch_results, field='i')

Compute the overload column for further aggregation.

This is just a max operation

PARAMETER DESCRIPTION
branch_results

The branch results dataframe containing the loading information.

TYPE: LazyFrame[BranchResultSchemaPolars]

field

The field to use for the overload calculation, either "p" for power or "i" for current, by default "i".

TYPE: Literal[p, i] DEFAULT: 'i'

RETURNS DESCRIPTION
LazyFrame

The branch results dataframe with an additional "overload" column.
Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/compute_metrics.py
def compute_overload_column(
    branch_results: patpl.LazyFrame[BranchResultSchemaPolars], field: Literal["p", "i"] = "i"
) -> pl.LazyFrame:
    """Add an "overload" column for further aggregation.

    Two columns are appended to the frame:

    - ``_val_max``: ``abs(field / loading)``. Assuming ``loading`` is the ratio of the value to its
      rated maximum, this reconstructs the rated maximum of the row.
    - ``overload``: ``abs(field) - _val_max``, i.e. positive where the value exceeds ``_val_max``.

    The helper column ``_val_max`` is left in the returned frame.

    Parameters
    ----------
    branch_results : patpl.LazyFrame[BranchResultSchemaPolars]
        The branch results dataframe containing the loading information.
    field : Literal["p", "i"], optional
        The field to use for the overload calculation, either "p" for power or "i" for current, by default "i".

    Returns
    -------
    pl.LazyFrame
        The branch results dataframe with the additional "_val_max" and "overload" columns.
    """
    rated_limit = (pl.col(field) / pl.col("loading")).abs()
    with_limit = branch_results.with_columns(_val_max=rated_limit)

    # The second step has to be a separate with_columns call because it reads the column added above
    excess = pl.col(field).abs() - pl.col("_val_max")
    return with_limit.with_columns(overload=excess)

compute_max_load #

compute_max_load(branch_results)

Compute the highest loading of the branches in the results.

This is just a max operation

PARAMETER DESCRIPTION
branch_results

The branch results dataframe containing the loading information.

TYPE: LazyFrame[BranchResultSchemaPolars]

RETURNS DESCRIPTION
float | None

The maximum loading in factor of maximum rated current (percent / 100) of any branch in the results. None if the loading column is missing or if there are no valid loading values (e.g., all NaN).

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/compute_metrics.py
@pa.check_types
def compute_max_load(branch_results: patpl.LazyFrame[BranchResultSchemaPolars]) -> float | None:
    """Return the highest value found in the "loading" column.

    Parameters
    ----------
    branch_results : patpl.LazyFrame[BranchResultSchemaPolars]
        The branch results dataframe containing the loading information.

    Returns
    -------
    float | None
        The maximum loading in factor of maximum rated current (percent / 100) of any branch in the results,
        exactly as produced by the polars ``max`` aggregation (presumably None when the frame has no
        non-null loading values).
        NOTE(review): no NaN handling is done here, unlike compute_max_va_diff; an all-NaN column is
        therefore not mapped to None by this function - confirm whether that is intended.
    """
    peak_loading = pl.col("loading").max()
    return branch_results.select(peak_loading).collect().item()

compute_overload_energy #

compute_overload_energy(branch_results, field='i')

Compute the total overload current or power of the branches in the results.

For every (timestep, element) pair the largest positive overload is taken, and these maxima are summed.

PARAMETER DESCRIPTION
branch_results

The branch results dataframe containing the loading information.

TYPE: LazyFrame[BranchResultSchemaPolars]

field

The field to use for the overload calculation, either "p" for power or "i" for current, by default "i".

TYPE: Literal[p, i] DEFAULT: 'i'

RETURNS DESCRIPTION
float | None

The maximum overload total current or power, or None if there are no valid overload values.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/compute_metrics.py
@pa.check_types
def compute_overload_energy(
    branch_results: patpl.LazyFrame[BranchResultSchemaPolars], field: Literal["p", "i"] = "i"
) -> float | None:
    """Sum up the worst overload of every overloaded branch.

    The overload column is computed via compute_overload_column. Rows with a NaN or non-positive
    overload are discarded, the largest remaining overload is taken per (timestep, element) pair and
    these maxima are added up.

    Parameters
    ----------
    branch_results : patpl.LazyFrame[BranchResultSchemaPolars]
        The branch results dataframe containing the loading information.
    field : Literal["p", "i"], optional
        The field to use for the overload calculation, either "p" for power or "i" for current, by default "i".

    Returns
    -------
    float | None
        The summed overload in units of the chosen field, as returned by the polars ``sum`` aggregation.
        NOTE(review): with no overloaded rows the sum is presumably 0 rather than None - confirm.
    """
    with_overload = compute_overload_column(branch_results, field=field)

    # Keep only genuine overloads: NaNs and values at or below the limit do not contribute
    positive_overloads = (
        with_overload.select("timestep", "element", "overload")
        .drop_nans("overload")
        .filter(pl.col("overload") > 0)
    )

    # Collapse all rows of the same branch within a timestep to the worst case
    worst_per_element = (
        positive_overloads.group_by(["timestep", "element"])
        .agg(pl.max("overload").alias("overload"))
        .drop_nans("overload")
    )

    total_overload = pl.col("overload").sum()
    return worst_per_element.select(total_overload).collect().item()

count_critical_branches #

count_critical_branches(
    branch_results, critical_threshold=1.0
)

Count how many branches are above 100% in any side/contingency

PARAMETER DESCRIPTION
branch_results

The branch results dataframe containing the loading information.

TYPE: LazyFrame[BranchResultSchemaPolars]

critical_threshold

The loading threshold to consider a branch as critical, by default 1.0 (100%)

TYPE: float DEFAULT: 1.0

RETURNS DESCRIPTION
int | None

The number of branches that are overloaded in any side/contingency, or None if there are no valid loading values.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/compute_metrics.py
@pa.check_types
def count_critical_branches(
    branch_results: patpl.LazyFrame[BranchResultSchemaPolars], critical_threshold: float = 1.0
) -> int | None:
    """Count the distinct (timestep, element) pairs whose loading exceeds the threshold in any row.

    Parameters
    ----------
    branch_results : patpl.LazyFrame[BranchResultSchemaPolars]
        The branch results dataframe containing the loading information.
    critical_threshold : float, optional
        The loading threshold to consider a branch as critical, by default 1.0 (100%)

    Returns
    -------
    int | None
        The number of distinct (timestep, element) pairs with at least one row strictly above the
        threshold. The visible code always converts the count to an int.
    """
    # NaN loadings are replaced by -1.0 before comparing, so they do not count as critical
    # for the default threshold
    is_critical = pl.col("loading").fill_nan(-1.0) > critical_threshold

    # A branch may exceed the threshold in several rows (sides/contingencies); deduplicate on
    # timestep/element so each one is counted once
    critical_elements = branch_results.filter(is_critical).select("timestep", "element").unique()

    n_critical = critical_elements.select(pl.len()).collect().item()
    return int(n_critical)

compute_max_va_diff #

compute_max_va_diff(va_diff_results)

Compute the maximum voltage angle difference.

PARAMETER DESCRIPTION
va_diff_results

The voltage angle difference results dataframe.

TYPE: LazyFrame[VADiffResultSchemaPolars]

RETURNS DESCRIPTION
float | None

The maximum voltage angle difference in degrees, or None if there are no valid values.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/compute_metrics.py
def compute_max_va_diff(va_diff_results: patpl.LazyFrame[VADiffResultSchemaPolars]) -> float | None:
    """Return the largest value of the "va_diff" column.

    Parameters
    ----------
    va_diff_results : patpl.LazyFrame[VADiffResultSchemaPolars]
        The voltage angle difference results dataframe.

    Returns
    -------
    float | None
        The maximum voltage angle difference in degrees, or None if the aggregation yields
        None or a missing value (as judged by ``pd.isna``).
    """
    peak = va_diff_results.select(pl.col("va_diff").max()).collect().item()

    # Treat both "no value at all" and NaN-like results as "no valid maximum"
    has_value = peak is not None and not pd.isna(peak)
    return float(peak) if has_value else None

get_worst_k_contingencies_ac #

get_worst_k_contingencies_ac(
    branch_results, k=10, field="p", base_case_id="BASECASE"
)

Get the worst k contingencies based on overload energy.

If k is greater than the number of contingencies, all contingencies will be returned.

PARAMETER DESCRIPTION
branch_results

The branch results dataframe containing the loading information.

TYPE: DataFrame[BranchResultSchemaPolars]

k

The number of worst contingencies to return, by default 10.

TYPE: int DEFAULT: 10

field

The field to use for the overload calculation, either "p" for power or "i" for current, by default "p".

TYPE: Literal[p, i] DEFAULT: 'p'

base_case_id

The contingency ID for the base case (N-0), by default "BASECASE".

TYPE: str DEFAULT: 'BASECASE'

RETURNS DESCRIPTION
tuple[list[list[str]], list[float]]

A tuple containing: - A list of lists with the contingency IDs for each timestep. The length of the outer list is the number of timesteps while the inner lists contain the top k contingencies for that timestep. - A list of total overload energy for each timestep. The length matches the number of timesteps.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/compute_metrics.py
def get_worst_k_contingencies_ac(
    branch_results: patpl.LazyFrame[BranchResultSchemaPolars],
    k: int = 10,
    field: Literal["p", "i"] = "p",
    base_case_id: str = "BASECASE",
) -> tuple[list[list[str]], list[float]]:
    """Get the worst k contingencies based on overload energy.

    Contingencies are ranked per timestep by their largest positive overload; the base case is
    excluded from the ranking. If k is greater than the number of overloaded contingencies, all
    of them are returned.

    Parameters
    ----------
    branch_results : patpl.LazyFrame[BranchResultSchemaPolars]
        The branch results dataframe containing the loading information.
    k : int, optional
        The number of worst contingencies to return, by default 10.
    field : Literal["p", "i"], optional
        The field to use for the overload calculation, either "p" for power or "i" for current, by default "p".
    base_case_id : str, optional
        The contingency ID for the base case (N-0), by default "BASECASE".

    Returns
    -------
    tuple[list[list[str]], list[float]]
        A tuple containing:
        - A list of lists with the contingency IDs for each timestep that has at least one overloaded
        N-1 contingency, in ascending timestep order. The inner lists contain the top k contingencies
        for that timestep, worst first.
        - A list of total overload energy of those top k contingencies, restricted to the respective
        timestep. The length matches the first list.
        Both lists are empty if no N-1 contingency is overloaded.
    """
    overload_n1 = (
        compute_overload_column(branch_results, field=field)
        .drop_nans("overload")
        .filter(pl.col("overload") > 0)
        .filter(pl.col("contingency") != base_case_id)
    )
    # Worst overload per (timestep, contingency); this is the ranking criterion
    overload_per_cont = (
        overload_n1.group_by(["timestep", "contingency"]).agg(pl.max("overload").alias("overload")).collect()
    )

    if overload_per_cont.height == 0:
        return [], []

    contingencies: list[list[str]] = []
    overloads: list[float] = []

    # unique() gives no ordering guarantee, so sort to keep the output order reproducible
    timesteps = overload_per_cont.get_column("timestep").unique().sort().to_list()
    for t in timesteps:
        same_timestep = pl.col("timestep") == t
        top_k = overload_per_cont.filter(same_timestep).sort("overload", descending=True).head(k)
        cont_ids = top_k.get_column("contingency").to_list()
        contingencies.append(cont_ids)

        if not cont_ids:
            # Only reachable when k <= 0: nothing selected, hence nothing to sum up
            overloads.append(0.0)
            continue

        # Restrict to the current timestep as well; without it, the rows of the selected
        # contingencies from every other timestep would be added to this timestep's total.
        br_results_top_k = branch_results.filter(same_timestep & pl.col("contingency").is_in(cont_ids))
        overload_top_k = compute_overload_energy(br_results_top_k, field=field)
        overloads.append(0.0 if overload_top_k is None else float(overload_top_k))

    return contingencies, overloads

compute_metrics #

compute_metrics(loadflow_results, base_case_id=None)

Compute the metrics from the loadflow results.

N-1 overload energy will exclude the base case results if base_case_id is provided, otherwise it will include all contingencies. This method will return None for metrics that cannot be computed due to missing or invalid data. For example, if basecase is provided and is the only contingency, then N-1 metrics will be None since there are no valid N-1 contingencies to compute on.

PARAMETER DESCRIPTION
loadflow_results

The loadflow results containing the branch results.

TYPE: LoadflowResultsPolars

base_case_id

The contingency ID for the base case (N-0). If not provided, no n-0 metrics will be computed.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
dict[MetricType, float | None]

A dictionary with the computed metrics.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/compute_metrics.py
def compute_metrics(
    loadflow_results: LoadflowResultsPolars,
    base_case_id: Optional[str] = None,
) -> dict[MetricType, float | None]:
    """Compute the N-1 (and optionally N-0) metrics from the loadflow results.

    If base_case_id is given, rows of that contingency are excluded from the N-1 metrics and a second
    set of N-0 metrics is computed from them. Without base_case_id, the N-1 metrics cover all
    contingencies and no N-0 metrics are produced.
    N-1 metrics may be None when they cannot be computed, e.g. if the base case is the only contingency.

    Parameters
    ----------
    loadflow_results : LoadflowResultsPolars
        The loadflow results containing the branch results and the voltage angle difference results.
    base_case_id : Optional[str], optional
        The contingency ID for the base case (N-0). If not provided, no n-0 metrics will be computed.

    Returns
    -------
    dict[MetricType, float | None]
        A dictionary with the computed metrics.

    Raises
    ------
    AssertionError
        If base_case_id is given and one of the N-0 metrics evaluates to None.
    """
    branch_res = loadflow_results.branch_results
    va_diff_res = loadflow_results.va_diff_results

    if base_case_id is None:
        # No base case known: every contingency counts as N-1
        n_1_branch_res, n_1_va_diff_res = branch_res, va_diff_res
    else:
        not_base_case = pl.col("contingency") != base_case_id
        n_1_branch_res = branch_res.filter(not_base_case)
        n_1_va_diff_res = va_diff_res.filter(not_base_case)

    metrics = {
        "max_flow_n_1": compute_max_load(n_1_branch_res),
        "overload_energy_n_1": compute_overload_energy(n_1_branch_res, field="p"),
        "max_va_diff_n_1": compute_max_va_diff(n_1_va_diff_res),
        "overload_current_n_1": compute_overload_energy(n_1_branch_res, field="i"),
        "critical_branch_count_n_1": count_critical_branches(n_1_branch_res),
    }
    if base_case_id is None:
        return metrics

    # Base case (N-0) results as Polars LazyFrames
    is_base_case = pl.col("contingency") == base_case_id
    n_0_branch_res = branch_res.filter(is_base_case)
    # Va diff may not contain the base case; filtering then yields an empty frame, for which
    # compute_max_va_diff returns None. That case is mapped to 0.0 below.
    n_0_va_diff = va_diff_res.filter(is_base_case)

    n_0_metrics = {
        "max_flow_n_0": compute_max_load(n_0_branch_res),
        "overload_energy_n_0": compute_overload_energy(n_0_branch_res, field="p"),
        "max_va_diff_n_0": compute_max_va_diff(n_0_va_diff) or 0.0,
        "overload_current_n_0": compute_overload_energy(n_0_branch_res, field="i"),
        "critical_branch_count_n_0": count_critical_branches(n_0_branch_res),
    }

    # Unlike the N-1 metrics, every N-0 metric is required to be available
    for metric_name, value in n_0_metrics.items():
        assert value is not None, (
            f"{metric_name} could not be computed, possibly due to missing or invalid base case results."
        )

    metrics.update(n_0_metrics)
    return metrics

toop_engine_contingency_analysis.ac_loadflow_service.kafka_client #

Provides a wrapper around the confluent_kafka Consumer to allow long running processes.

There are fundamentally two ways how to deal with long running processes in kafka: - increase the max.poll.interval.ms to a very high value so the processing can happen in between. - pause the consumer while processing and resume it afterwards, regularly polling the paused consumer to reset the max.poll.interval.ms timeout. These polls will not consume any messages, but will reset the timeout.

The first method is not recommended as it inhibits rebalances during frozen time, does not detect frozen consumers and is generally not what kafka was designed for. However, kafka does not provide a way to pause and resume a topic, only a topic-partition. That means if a consumer is paused but then a rebalance happens, new topic-partitions will not be paused and the consumer may receive messages for them. However, the polls we are doing are happening in the processing loop and we are fundamentally unable to process anything there. Hence, this wrapper provides a way to pause and resume a consumer, listening to the assignment changes and pausing or resuming the new TPs accordingly. As a drawback, this consumer then loses the ability to consume multiple topics.

logger module-attribute #

logger = get_logger(__name__)

LongRunningKafkaConsumer #

LongRunningKafkaConsumer(
    topic,
    group_id,
    bootstrap_servers,
    client_id,
    max_poll_interval_ms=1800000,
    kafka_auth_config=None,
)

A kafka consumer for long running processes that need to pause and resume the topic consumption.

Initialize the LongRunningKafkaConsumer.

PARAMETER DESCRIPTION
topic

The topic to subscribe to. This can only be a single topic as the consumer will pause and resume it.

TYPE: str

group_id

The consumer group id

TYPE: str

bootstrap_servers

The bootstrap servers to connect to, e.g. "localhost:9092"

TYPE: str

client_id

The client id to use for the consumer. This is used for logging and debugging purposes.

TYPE: str

max_poll_interval_ms

The maximum time in milliseconds between polls before the consumer is considered dead. Defaults to 1_800_000 (30 minutes). Set this long enough so the process fits in with confidence.

TYPE: int DEFAULT: 1800000

kafka_auth_config

Additional kafka authentication configuration to pass to the consumer. Defaults to None.

TYPE: dict | None DEFAULT: None

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def __init__(
    self,
    topic: str,
    group_id: str,
    bootstrap_servers: str,
    client_id: str,
    max_poll_interval_ms: int = 1_800_000,
    kafka_auth_config: dict | None = None,
) -> None:
    """Create the underlying consumer and subscribe it to the topic.

    Auto-commit is disabled; offsets are committed explicitly by the methods of this class.

    Parameters
    ----------
    topic : str
        The topic to subscribe to. This can only be a single topic as the consumer will pause and resume it.
    group_id : str
        The consumer group id
    bootstrap_servers : str
        The bootstrap servers to connect to, e.g. "localhost:9092"
    client_id : str
        The client id to use for the consumer. It is also bound to the logger handed to the consumer.
    max_poll_interval_ms : int, optional
        The maximum time in milliseconds between polls before the consumer is considered dead. Defaults to 1_800_000
        (30 minutes). Set this long enough so the process fits in with confidence.
    kafka_auth_config : dict | None, optional
        Additional kafka authentication configuration to pass to the consumer. Entries override the
        defaults set here. Defaults to None.
    """
    self.topic = topic
    self.client_id = client_id
    self.assignment: list[TopicPartition] = []
    self.last_msg: Optional[Message] = None
    self.is_paused = False

    base_config = {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        "client.id": client_id,
        "max.poll.interval.ms": max_poll_interval_ms,
        "log_level": 2,
    }
    # Caller-supplied settings are applied last so they win over the defaults above
    consumer_config = {**base_config, **kafka_auth_config} if kafka_auth_config else base_config
    self.consumer = Consumer(
        consumer_config,
        logger=logger.bind(consumer=client_id),
    )

    # Every assignment change is routed through _update_assignment
    def _partitions_added(_consumer, assignment):  # noqa: ANN001, ANN202
        return self._update_assignment(new_tps=assignment, removed_tps=[])

    def _partitions_removed(_consumer, assignment):  # noqa: ANN001, ANN202
        return self._update_assignment(new_tps=[], removed_tps=assignment)

    self.consumer.subscribe(
        [self.topic],
        on_assign=_partitions_added,
        on_revoke=_partitions_removed,
        on_lost=_partitions_removed,
    )

topic instance-attribute #

topic = topic

consumer instance-attribute #

consumer = Consumer(
    consumer_config, logger=bind(consumer=client_id)
)

client_id instance-attribute #

client_id = client_id

assignment instance-attribute #

assignment = []

last_msg instance-attribute #

last_msg = None

is_paused instance-attribute #

is_paused = False

consume #

consume(timeout, num_messages)

Consume a batch of messages from the kafka topic at once.

This will commit all offsets directly after consuming the messages.

PARAMETER DESCRIPTION
timeout

The maximum time to wait for messages in seconds. If no messages are available, returns an empty list.

TYPE: float | int

num_messages

The maximum number of messages to consume. If more messages are available, they will not be consumed.

TYPE: int

RETURNS DESCRIPTION
list[Message]

The consumed messages, or an empty list if no messages are available within the timeout.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def consume(self, timeout: float | int, num_messages: int) -> list[Message]:
    """Consume a batch of messages from the kafka topic at once.

    This will commit all offsets directly after consuming the messages. The commit is issued
    asynchronously, so it is not awaited before the messages are returned.

    Parameters
    ----------
    timeout : float | int
        The maximum time to wait for messages in seconds. If no messages are available, returns an empty list.
    num_messages : int
        The maximum number of messages to consume. If more messages are available, they will not be consumed.

    Returns
    -------
    list[Message]
        The consumed messages, or an empty list if no messages are available within the timeout.

    Raises
    ------
    RuntimeError
        If a message obtained through poll() has not been committed yet.
    """
    if self.last_msg is not None:
        raise RuntimeError("Commit the last message either through commit or stop_processing before consuming again")

    messages = self.consumer.consume(num_messages=num_messages, timeout=float(timeout))
    if not messages:
        return []

    # Committing one message only advances the offset of that message's partition, but a batch can
    # contain messages from several partitions. Committing without a message commits the consumer's
    # current offsets for the whole assignment, so nothing from this batch gets redelivered.
    self.consumer.commit(asynchronous=True)
    return messages

poll #

poll(timeout)

Consume a single message from the Kafka topic.

This will not commit the offset to the broker

PARAMETER DESCRIPTION
timeout

The maximum time to wait for a message in seconds. If no message is available, returns None.

TYPE: float | int

RETURNS DESCRIPTION
Optional[Message]

The consumed message, or None if no message is available within the timeout.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def poll(self, timeout: float | int) -> Optional[Message]:
    """Fetch at most one message from the Kafka topic without committing it.

    The result is remembered as ``last_msg`` so that it can be committed later on.

    Parameters
    ----------
    timeout : float | int
        The maximum time to wait for a message in seconds. If no message is available, returns None.

    Returns
    -------
    Optional[Message]
        The consumed message, or None if no message is available within the timeout.

    Raises
    ------
    RuntimeError
        If the previously polled message has not been committed yet.
    """
    has_pending_message = self.last_msg is not None
    if has_pending_message:
        raise RuntimeError("Commit the last message either through commit or stop_processing before consuming again")

    # The underlying consumer is always called with a float timeout
    self.last_msg = self.consumer.poll(timeout=float(timeout))
    return self.last_msg

commit #

commit()

Commit the last consumed message.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def commit(self) -> None:
    """Synchronously commit the message remembered as ``last_msg`` and forget it.

    Raises
    ------
    RuntimeError
        If there is no remembered message to commit.
    """
    pending = self.last_msg
    if pending is None:
        raise RuntimeError("No message to commit")

    self.consumer.commit(message=pending, asynchronous=False)
    self.last_msg = None

start_processing #

start_processing()

Start a long running process to consume the message.

This will internally pause the consumer. To not exceed the poll timeout, call heartbeat() periodically while processing, e.g. every epoch

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def start_processing(self) -> None:
    """Mark the consumer as paused before a long running process starts.

    The paused flag is set first and the assignment handling is then re-run without any added or
    removed partitions, presumably so that the currently assigned partitions pick up the new state.
    To not exceed the poll timeout, call heartbeat() periodically while processing, e.g. every epoch
    """
    self.is_paused = True
    self._update_assignment(new_tps=[], removed_tps=[])

heartbeat #

heartbeat()

Send a heartbeat to the kafka topic while processing to reset the max poll interval timeout.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def heartbeat(self) -> None:
    """Poll the paused consumer once so the max poll interval timeout is reset.

    Raises
    ------
    RuntimeError
        If the consumer is not paused, or if the poll unexpectedly returned a message.
    """
    if not self.is_paused:
        raise RuntimeError("Cannot send heartbeat while not processing")

    # Re-run the assignment handling before polling
    self._update_assignment(new_tps=[], removed_tps=[])

    unexpected = self.consumer.poll(timeout=0)
    if unexpected is not None:
        raise RuntimeError("Heartbeat should not consume messages")

stop_processing #

stop_processing()

Stop the long running process and commit the last message

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def stop_processing(self) -> None:
    """Stop the long running process and commit the last message

    The paused flag is cleared and the assignment handling is re-run without any added or removed
    partitions. Afterwards the remembered message, if any, is committed synchronously and forgotten.
    """
    self.is_paused = False
    self._update_assignment([], [])
    # Compare against None instead of relying on truthiness: a message object that reports a
    # length of zero (e.g. an empty payload) is falsy and would otherwise never be committed
    # or cleared, blocking the next poll().
    if self.last_msg is not None:
        self.consumer.commit(message=self.last_msg, asynchronous=False)
        self.last_msg = None

close #

close()

Close the consumer and commit the last message if any.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def close(self) -> None:
    """Close the consumer and commit the last message if any.

    The underlying consumer is closed even when the final commit raises, so a failed
    commit does not leak the consumer. The commit error still propagates to the caller.
    """
    try:
        if self.last_msg:
            self.consumer.commit(message=self.last_msg, asynchronous=False)
            self.last_msg = None
    finally:
        # Always release the consumer, regardless of the commit outcome.
        self.consumer.close()

toop_engine_contingency_analysis.ac_loadflow_service.lf_worker #

Module contains functions for the kafka communication of the ac loadflow worker.

General Idea: - The worker will listen for commands on the preprocessing kafka topic - Once the initial conversion is done, it runs an initial loadflow - Once the optimization is done, ... - The command contains a path to the pandapower or powsybl grid file - The command contains the N-1 definition

  • The worker will load the grid file and the N-1 definition
  • The worker will run the N-1 analysis on the grid file with as many processes as possible
  • The worker will send the results to a kafka topic - ErrorResult, if anything goes wrong - SuccessResult, if everything goes well even if loadflow fails - LoadflowStartedResult? # Use the LoadflowResultsClass as the result
  • The worker will send a heartbeat to a kafka topic every X seconds

Questions: - Does it make sense to return the results in batches? - Faster results, but they don't really tell the full story - How to deal with grid updates? - Separate Service? (Load would be doubled) - Is passing the grid file as a path valid? - Otherwise large files need to be passed as bytes, which kafka supports but is not really intended for

File: worker.py Author: Leonard Hilfrich Created: 05/2024

logger module-attribute #

logger = get_logger(__name__)

args module-attribute #

args = cli(LoadflowWorkerArgs)

LoadflowWorkerArgs dataclass #

LoadflowWorkerArgs(
    kafka_broker="localhost:9092",
    loadflow_command_topic="loadflow_commands",
    loadflow_results_topic="loadflow_results",
    loadflow_heartbeat_topic="loadflow_heartbeat",
    heartbeat_interval_ms=1000,
    instance_id="loadflow_worker",
    processed_gridfile_folder=Path("processed_gridfiles"),
    loadflow_result_folder=Path("loadflow_results"),
    n_processes=1,
)

Holds arguments which must be provided at the launch of the worker.

Contains arguments that are static for each loadflow run.

kafka_broker class-attribute instance-attribute #

kafka_broker = 'localhost:9092'

The Kafka broker to connect to.

loadflow_command_topic class-attribute instance-attribute #

loadflow_command_topic = 'loadflow_commands'

The Kafka topic to listen for commands on.

loadflow_results_topic class-attribute instance-attribute #

loadflow_results_topic = 'loadflow_results'

The topic to push results to.

loadflow_heartbeat_topic class-attribute instance-attribute #

loadflow_heartbeat_topic = 'loadflow_heartbeat'

The topic to push heartbeats to.

heartbeat_interval_ms class-attribute instance-attribute #

heartbeat_interval_ms = 1000

The interval in milliseconds to send heartbeats.

instance_id class-attribute instance-attribute #

instance_id = 'loadflow_worker'

The instance id of the worker, used to identify the worker in the logs.

processed_gridfile_folder class-attribute instance-attribute #

processed_gridfile_folder = Path('processed_gridfiles')

A folder where pre-processed grid files are stored - this should be a NFS share together with the backend and optimizer.

loadflow_result_folder class-attribute instance-attribute #

loadflow_result_folder = Path('loadflow_results')

A folder where the loadflow results are stored - this should be a NFS share together with the backend and optimizer.

n_processes class-attribute instance-attribute #

n_processes = 1

The number of processes to use for the loadflow calculation. If 1, the analysis is run sequentially. If > 1, the analysis is run in parallel

idle_loop #

idle_loop(
    consumer, send_heartbeat_fn, heartbeat_interval_ms
)

Start the idle loop of the worker.

This will be running while the worker is idle, i.e. not running a loadflow calculation. It waits until a StartCalculationCommand is received and returns it. In case a ShutdownCommand is received, the worker will exit with the exit code provided in the command.

PARAMETER DESCRIPTION
consumer

The initialized Kafka consumer to listen for commands on.

TYPE: Consumer

send_heartbeat_fn

A function to call when there were no messages received for a while.

TYPE: callable

heartbeat_interval_ms

The time to wait for a new command in milliseconds. If no command has been received, a heartbeat will be sent and then the receiver will wait for commands again.

TYPE: int

RETURNS DESCRIPTION
StartCalculationCommand

The start calculation command to start the loadflow calculation run with

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/lf_worker.py
def idle_loop(
    consumer: LongRunningKafkaConsumer,
    send_heartbeat_fn: Callable[[], None],
    heartbeat_interval_ms: int,
) -> StartCalculationCommand:
    """Wait for the next command while the worker has nothing to compute.

    An idle heartbeat is sent once on entry and again every time a poll times out.
    A StartCalculationCommand ends the loop and is returned to the caller. A
    ShutdownCommand commits, closes the underlying consumer and exits the process
    with the exit code carried by the command. Any other command is logged,
    committed and dropped.

    Parameters
    ----------
    consumer : LongRunningKafkaConsumer
        The initialized Kafka consumer to listen for commands on.
    send_heartbeat_fn : Callable[[], None]
        Called whenever no message was received within the poll timeout.
    heartbeat_interval_ms : int
        Poll timeout in milliseconds, i.e. the time between two idle heartbeats.

    Returns
    -------
    StartCalculationCommand
        The command to start the loadflow calculation run with.

    Raises
    ------
    SystemExit
        When a ShutdownCommand is received.
    """
    send_heartbeat_fn()
    logger.info("Entering idle loop")
    poll_timeout_s = heartbeat_interval_ms / 1000.0

    while True:
        received = consumer.poll(timeout=poll_timeout_s)
        if not received:
            # Poll timed out without a message: signal that we are still alive.
            send_heartbeat_fn()
            continue

        envelope = LoadflowServiceCommand.model_validate_json(deserialize_message(received.value()))
        payload = envelope.command

        if isinstance(payload, StartCalculationCommand):
            return payload
        elif isinstance(payload, ShutdownCommand):
            consumer.commit()
            consumer.consumer.close()
            raise SystemExit(payload.exit_code)
        else:
            # Unknown command type: commit it so it is not delivered again.
            logger.warning(f"Received unknown command, dropping: {envelope}")
            consumer.commit()

solver_loop #

solver_loop(
    command,
    producer,
    processed_grid_path,
    loadflow_solver_path,
    heartbeat_fn,
    instance_id,
    n_processes,
    results_topic,
)

Start the solver loop of the worker.

This will be running while the worker is solving the loadflow. For every job of the command it publishes a LoadflowStartedResult, solves each grid file (one per timestep) and publishes a LoadflowStreamResult after every intermediate timestep and a LoadflowSuccessResult after the last one. If anything goes wrong, an ErrorResult is published instead.

PARAMETER DESCRIPTION
command

The command to start the optimization run with.

TYPE: StartCalculationCommand

producer

The initialized Kafka producer to send results to.

TYPE: KafkaProducer

processed_grid_path

The path to the pre-processed grid files. This is used to load the grid files.

TYPE: Path

loadflow_solver_path

The path to the loadflow solver results. This is used to save the loadflow results.

TYPE: Path

heartbeat_fn

A function to call to send a heartbeat message to the kafka topic.

TYPE: Callable

instance_id

The instance id of the worker, used to identify the worker in the logs.

TYPE: str

n_processes

The number of processes to use for the optimization run. If 1, the analysis is run sequentially. If > 1, the analysis is run in parallel. Parallelization is done by splitting the contingencies into chunks and running each chunk in a separate process.

TYPE: int

results_topic

The topic to push results to.

TYPE: str

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/lf_worker.py
def solver_loop(
    command: StartCalculationCommand,
    producer: Producer,
    processed_grid_path: Path,
    loadflow_solver_path: Path,
    heartbeat_fn: Callable,
    instance_id: str,
    n_processes: int,
    results_topic: str,
) -> None:
    """Run the loadflow calculation of one command and publish its results.

    For every job of the command a ``LoadflowStartedResult`` is published first. Each
    grid file (one per timestep) is then loaded and solved. After every timestep the
    accumulated results of the job are saved and announced with a
    ``LoadflowStreamResult`` (intermediate timesteps) or a ``LoadflowSuccessResult``
    (last timestep). Any exception aborts the run, is logged with its traceback and
    reported as an ``ErrorResult``; it is not re-raised.

    Parameters
    ----------
    command : StartCalculationCommand
        The command to start the loadflow run with.
    producer : Producer
        The initialized Kafka producer to send results to.
    processed_grid_path : Path
        The path to the pre-processed grid files. This is used to load the grid files.
    loadflow_solver_path : Path
        The path to the loadflow solver results. This is used to save the loadflow results.
    heartbeat_fn : Callable
        A function to call to send a heartbeat message to the kafka topic. It is called
        with the loadflow id, the elapsed runtime in seconds and a status message.
    instance_id : str
        The instance id of the worker, used to identify the worker in the logs.
    n_processes : int
        The number of processes to use for the loadflow run. If 1, the analysis is run sequentially.
        If > 1, the analysis is run in parallel.
        Parallelization is done by splitting the contingencies into chunks and running each chunk
        in a separate process.
    results_topic : str
        The topic to push results to.
    """
    start_time = time.time()
    dirfs = DirFileSystem(str(loadflow_solver_path))

    def publish_result(job_id: str, runtime: float, result: object) -> None:
        """Wrap ``result`` in a LoadflowBaseResult and send it to the results topic."""
        producer.produce(
            topic=results_topic,
            value=serialize_message(
                LoadflowBaseResult(
                    job_id=job_id,
                    instance_id=instance_id,
                    loadflow_id=command.loadflow_id,
                    runtime=runtime,
                    result=result,
                ).model_dump_json()
            ),
            key=command.loadflow_id.encode(),
        )

    try:
        if command.grid_data.n_1_definition is None:
            raise ValueError("No N-1 definition provided. This is currently not supported.")
        n_minus_1_definition = command.grid_data.n_1_definition
        grid_files = command.grid_data.grid_files
        n_timesteps = len(grid_files)

        for job in command.jobs:
            publish_result(job.id, 0.0, LoadflowStartedResult())
            job_loadflow_results_polars = LoadflowResultsPolars(job_id=job.id)
            for i, grid in enumerate(grid_files):
                heartbeat_fn(
                    command.loadflow_id, time.time() - start_time, f"Loadflow Calculation run started for timestep {i}"
                )
                net = load_base_grid(processed_grid_path / grid, command.grid_data.grid_type)
                timestep_result_polars = get_ac_loadflow_results(
                    net=net, n_minus_1_definition=n_minus_1_definition, timestep=i, job_id=job.id, n_processes=n_processes
                )
                job_loadflow_results_polars = concatenate_loadflow_results_polars(
                    [job_loadflow_results_polars, timestep_result_polars]
                )
                # Persist everything solved so far so intermediate results can be consumed.
                ref = save_loadflow_results_polars(dirfs, job.id, job_loadflow_results_polars)
                if i < n_timesteps - 1:
                    result_msg = LoadflowStreamResult(
                        loadflow_reference=ref,
                        solved_timesteps=list(range(i + 1)),
                        # Keyword spelling kept exactly as in the existing call.
                        remainging_timesteps=list(range(i + 1, n_timesteps)),
                    )
                else:
                    result_msg = LoadflowSuccessResult(loadflow_reference=ref)

                publish_result(job.id, time.time() - start_time, result_msg)
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error would drop.
        logger.exception(f"Error while processing {command.loadflow_id}: {e}")
        # The error is reported under the loadflow id because it may occur before any job started.
        publish_result(command.loadflow_id, time.time() - start_time, ErrorResult(error=str(e)))

load_base_grid_fs #

load_base_grid_fs(filesystem, grid_path, grid_type)

Load the base grid from the grid file.

Force loading pandapower if grid type is pandapower, otherwise load powsybl.

PARAMETER DESCRIPTION
filesystem

The filesystem to load the grid from

TYPE: AbstractFileSystem

grid_path

The grid to load

TYPE: Path

grid_type

The type of the grid, either "pandapower", "powsybl", "ucte" or "cgmes".

TYPE: Literal[pandapower, powsybl, ucte, cgmes]

RETURNS DESCRIPTION
PandapowerNet | Network

The loaded grid

RAISES DESCRIPTION
ValueError

If the grid type is not supported.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/lf_worker.py
def load_base_grid_fs(
    filesystem: AbstractFileSystem,
    grid_path: Path,
    grid_type: Literal["pandapower", "powsybl", "ucte", "cgmes"],
) -> pandapower.pandapowerNet | Network:
    """Load the base grid from a grid file on the given filesystem.

    The pandapower loader is used for grid type "pandapower"; the powsybl loader is
    used for "powsybl", "ucte" and "cgmes".

    Parameters
    ----------
    filesystem : AbstractFileSystem
        The filesystem to load the grid from
    grid_path : Path
        The grid to load
    grid_type: Literal["pandapower", "powsybl", "ucte", "cgmes"]
        The type of the grid, either "pandapower", "powsybl", "ucte" or "cgmes".

    Returns
    -------
    PandapowerNet | Network
        The loaded grid

    Raises
    ------
    ValueError
        If the grid type is not supported.
    """
    # Pick the loader first, then call it once at the end.
    if grid_type == "pandapower":
        loader = load_pandapower_from_fs
    elif grid_type in ("powsybl", "ucte", "cgmes"):
        loader = load_powsybl_from_fs
    else:
        raise ValueError(f"Unknown grid type: {grid_type}")
    return loader(filesystem, grid_path)

load_base_grid #

load_base_grid(grid_path, grid_type)

Load the base grid from the grid file.

PARAMETER DESCRIPTION
grid_path

The grid to load

TYPE: Path

grid_type

The type of the grid, either "pandapower", "powsybl", "ucte" or "cgmes".

TYPE: Literal[pandapower, powsybl, ucte, cgmes]

RETURNS DESCRIPTION
PandapowerNet | Network

The loaded grid

RAISES DESCRIPTION
ValueError

If the grid type is not supported.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/lf_worker.py
def load_base_grid(
    grid_path: Path, grid_type: Literal["pandapower", "powsybl", "ucte", "cgmes"]
) -> pandapower.pandapowerNet | Network:
    """Load the base grid from a grid file on the local filesystem.

    Convenience wrapper around :func:`load_base_grid_fs` using a ``LocalFileSystem``.

    Parameters
    ----------
    grid_path : Path
        The grid to load
    grid_type: Literal["pandapower", "powsybl", "ucte", "cgmes"]
        The type of the grid, either "pandapower", "powsybl", "ucte" or "cgmes".

    Returns
    -------
    PandapowerNet | Network
        The loaded grid

    Raises
    ------
    ValueError
        If the grid type is not supported.
    """
    local_fs = LocalFileSystem()
    return load_base_grid_fs(filesystem=local_fs, grid_path=grid_path, grid_type=grid_type)

main #

main(args)

Start main function of the worker.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/lf_worker.py
def main(args: LoadflowWorkerArgs) -> None:
    """Start main function of the worker.

    Creates the Kafka consumer and producer and then alternates forever between
    :func:`idle_loop` (waiting for a command) and :func:`solver_loop` (processing it).
    The loop is only left through the ``SystemExit`` that :func:`idle_loop` raises when
    a shutdown command is received.

    Parameters
    ----------
    args : LoadflowWorkerArgs
        The launch arguments of the worker (broker, topics, folders, process count).
    """
    logger.info(f"Starting loadflow worker instance {args.instance_id}")
    consumer = LongRunningKafkaConsumer(
        topic=args.loadflow_command_topic,
        group_id="loadflow-worker",
        bootstrap_servers=args.kafka_broker,
        client_id=args.instance_id,
    )

    producer = Producer(
        {
            "bootstrap.servers": args.kafka_broker,
            "client.id": args.instance_id,
            "log_level": 2,
        },
        logger=logger,
    )

    def publish_heartbeat(heartbeat: LoadflowHeartbeat) -> None:
        """Send one heartbeat message, keyed by the instance id, and flush it immediately."""
        producer.produce(
            args.loadflow_heartbeat_topic,
            value=serialize_message(heartbeat.model_dump_json()),
            key=args.instance_id.encode("utf-8"),
        )
        producer.flush()

    def heartbeat_idle() -> None:
        """Report that the worker is idle."""
        publish_heartbeat(LoadflowHeartbeat(idle=True, status_info=None))

    def heartbeat_fn(job_id: str, runtime: float, message: str = "") -> None:
        """Report progress of a running calculation and keep the consumer alive."""
        publish_heartbeat(
            LoadflowHeartbeat(
                idle=False,
                status_info=LoadflowStatusInfo(
                    loadflow_id=job_id,
                    runtime=runtime,
                    message=message,
                ),
            )
        )
        # Also poll the paused consumer so the max poll interval is not exceeded.
        consumer.heartbeat()

    while True:
        command = idle_loop(
            consumer=consumer,
            send_heartbeat_fn=heartbeat_idle,
            heartbeat_interval_ms=args.heartbeat_interval_ms,
        )
        consumer.start_processing()
        solver_loop(
            command=command,
            producer=producer,
            processed_grid_path=args.processed_gridfile_folder,
            loadflow_solver_path=args.loadflow_result_folder,
            heartbeat_fn=heartbeat_fn,
            instance_id=args.instance_id,
            n_processes=args.n_processes,
            results_topic=args.loadflow_results_topic,
        )
        # Make sure all results are delivered before the command is committed.
        producer.flush()
        consumer.stop_processing()

Contingency Analysis PandaPower#

toop_engine_contingency_analysis.pandapower.contingency_analysis_pandapower #

Compute the N-1 AC/DC power flow for the pandapower network.

OutageElementResults dataclass #

OutageElementResults(
    branch_results,
    full_branch_results,
    node_results,
    va_diff_results,
    regulating_element_results,
    switch_results,
)

Result tables collected for one outage calculation.

branch_results instance-attribute #

branch_results

full_branch_results instance-attribute #

full_branch_results

node_results instance-attribute #

node_results

va_diff_results instance-attribute #

va_diff_results

regulating_element_results instance-attribute #

regulating_element_results

switch_results instance-attribute #

switch_results

run_single_outage #

run_single_outage(
    net,
    grouped_contingency,
    ctx,
    slack_allocation_config=None,
)

Compute a single outage for the given network.

When slack_allocation_config is provided it is forwarded to :func:run_outage_power_flow, which owns all slack-allocation logic (initial PF assignment and in-loop SpPS reassignment).

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pandapower/contingency_analysis_pandapower.py
@pa.check_types
def run_single_outage(
    net: pp.pandapowerNet,
    grouped_contingency: PandapowerContingencyGroup,
    ctx: SingleOutageContext,
    slack_allocation_config: SlackAllocationConfig | None = None,
) -> LoadflowResults:
    """Run one outage on the given network and collect all result tables.

    *slack_allocation_config* is passed through unchanged to
    :func:`run_outage_power_flow`, which owns all slack-allocation logic
    (initial PF assignment and in-loop SpPS reassignment).
    """
    elements_out = grouped_contingency.elements

    # Step 1: power flow for the outage (optionally including SpPS handling).
    status, spps_result = run_outage_power_flow(
        net=net,
        spps=ctx.spps,
        method=ctx.method,
        outaged_elements=elements_out,
        runpp_kwargs=ctx.runpp_kwargs,
        slack_allocation_config=slack_allocation_config,
        basecase_net=ctx.basecase_net,
    )

    # Step 2: SpPS table - empty schema-conforming frame when there is no SpPS result.
    if spps_result is None:
        spps_results = get_empty_dataframe_from_model(SppsResultsSchema)
    else:
        spps_results = _build_spps_results(
            spps_result=spps_result,
            contingencies=grouped_contingency.contingencies,
            timestep=ctx.timestep,
        )

    # Step 3: convergence, element and cascade tables (cascade reuses the element tables).
    converged = _build_convergence_results(
        grouped_contingency=grouped_contingency,
        timestep=ctx.timestep,
        status=status,
    )
    per_element = _collect_element_results(
        net=net,
        grouped_contingency=grouped_contingency,
        ctx=ctx,
        status=status,
    )
    cascades = _collect_cascade_results(
        net=net,
        ctx=ctx,
        grouped_contingency=grouped_contingency,
        status=status,
        branch_results_df=per_element.full_branch_results,
        switch_results_df=per_element.switch_results,
    )

    return LoadflowResults(
        job_id=ctx.job_id,
        branch_results=per_element.branch_results,
        node_results=per_element.node_results,
        converged=converged,
        regulating_element_results=per_element.regulating_element_results,
        va_diff_results=per_element.va_diff_results,
        switch_results=per_element.switch_results,
        warnings=[],
        spps_results=spps_results,
        cascade_results=cascades,
    )

update_results_with_names #

update_results_with_names(df, element_name_map)

Enrich results DataFrame with element names.

This function fills missing values in the element_name column using a provided mapping from element indices to human-readable names.

Args: df: Results DataFrame. Expected to have: - a MultiIndex containing level "element" - a column "element_name" element_name_map: Mapping from element index (as found in the "element" index level) to element name.

RETURNS DESCRIPTION
Updated DataFrame (same object, modified in-place).
Notes

- Only missing or empty `element_name` values are filled.
- If an element is not found in `element_name_map`, the value remains NaN.
Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pandapower/contingency_analysis_pandapower.py
def update_results_with_names(
    df: pd.DataFrame,
    element_name_map: dict[str, str],
) -> pd.DataFrame:
    """Fill in missing element names of a results DataFrame.

    Rows whose ``element_name`` is empty or NaN get the name that
    ``element_name_map`` holds for the row's ``"element"`` index value.

    Parameters
    ----------
    df : pd.DataFrame
        Results DataFrame with an index level ``"element"`` and a column
        ``"element_name"``.
    element_name_map : dict[str, str]
        Mapping from element index (as found in the ``"element"`` index level)
        to element name.

    Returns
    -------
    pd.DataFrame
        The same DataFrame object, modified in-place.

    Notes
    -----
    Rows that already carry a non-empty name are left untouched. Elements without
    an entry in ``element_name_map`` end up with NaN as their name.
    """
    names = df["element_name"]
    unnamed = names.eq("") | names.isna()
    elements = df.loc[unnamed].index.get_level_values("element")
    df.loc[unnamed, "element_name"] = elements.map(element_name_map)
    return df

get_element_results_df #

get_element_results_df(
    net,
    contingency,
    monitored_elements,
    timestep,
    status,
    basecase_net,
    switch_element_mapping,
)

Get the element results dataframes for the given contingency and monitored elements.

PARAMETER DESCRIPTION
net

The pandapower network to get the results from

TYPE: pandapowerNet

contingency

The contingency to get the results for

TYPE: PandapowerContingency

monitored_elements

The monitored elements to get the results for

TYPE: DataFrame[PandapowerMonitoredElementSchema]

timestep

The timestep of the results

TYPE: int

status

The convergence status of the loadflow computation

TYPE: ConvergenceStatus

basecase_net

Deep-copy of the network after the base-case load flow. res_bus.vm_pu is used to compute per-bus voltage deviation.

TYPE: pandapowerNet

switch_element_mapping

Mapping between switches and connected elements, used to compute switch-level results during each outage.

TYPE: DataFrame[SwitchElementMappingSchema]

RETURNS DESCRIPTION
tuple[DataFrame, DataFrame, DataFrame, DataFrame, DataFrame]

Filtered branch results, full branch results, node results, voltage-angle difference results, and switch results.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pandapower/contingency_analysis_pandapower.py
def get_element_results_df(
    net: pp.pandapowerNet,
    contingency: PandapowerContingency,
    monitored_elements: pat.DataFrame[PandapowerMonitoredElementSchema],
    timestep: int,
    status: ConvergenceStatus,
    basecase_net: pp.pandapowerNet,
    switch_element_mapping: pat.DataFrame[SwitchElementMappingSchema],
) -> tuple[
    pat.DataFrame[BranchResultSchema],
    pat.DataFrame[BranchResultSchema],
    pat.DataFrame[NodeResultSchema],
    pat.DataFrame[VADiffResultSchema],
    pat.DataFrame[SwitchResultsSchema],
]:
    """Build the per-element result tables for one contingency.

    For a converged load flow the tables are read from ``net``; otherwise the
    ``get_failed_*`` helpers produce the tables for the monitored elements.

    Parameters
    ----------
    net : pp.pandapowerNet
        The pandapower network to get the results from
    contingency : PandapowerContingency
        The contingency to get the results for
    monitored_elements : pat.DataFrame[PandapowerMonitoredElementSchema]
        The monitored elements to get the results for
    timestep : int
        The timestep of the results
    status : ConvergenceStatus
        The convergence status of the loadflow computation
    basecase_net : pp.pandapowerNet
        Network after the base-case load flow, passed on to the node result
        computation (only used in the converged case).
    switch_element_mapping : pat.DataFrame[SwitchElementMappingSchema]
        Mapping between switches and connected elements, used to compute
        switch-level results during each outage.

    Returns
    -------
    tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]
        Filtered branch results, full branch results, node results, voltage-angle
        difference results, and switch results. In the non-converged case the
        filtered and the full branch results are the same object.
    """
    if status != ConvergenceStatus.CONVERGED:
        trafo3w_ids = monitored_elements.query("table == 'trafo3w'").index.to_list()
        branch_ids = monitored_elements.query("kind == 'branch' & table != 'trafo3w'").index.to_list()
        bus_ids = monitored_elements.query("kind == 'bus'").index.to_list()

        failed_branches = get_failed_branch_results(timestep, [contingency.unique_id], branch_ids, trafo3w_ids)
        failed_nodes = get_failed_node_results(timestep, [contingency.unique_id], bus_ids)
        failed_va_diff = get_failed_va_diff_results(timestep, monitored_elements, contingency)
        failed_switches = get_failed_switch_results(timestep, switch_element_mapping, contingency)
        return failed_branches, failed_branches, failed_nodes, failed_va_diff, failed_switches

    all_branches = get_branch_results(net, contingency, timestep)
    all_nodes = get_node_result_df(net, contingency, timestep, basecase_net)
    va_diff = get_va_diff_results(net, timestep, monitored_elements, contingency)

    # IMPORTANT: the switch results must be computed from the UNFILTERED branch and
    # node tables. A monitored switch may be connected to a line/trafo that is not
    # monitored itself, so filtering has to happen only after this call.
    switches = get_switch_results(net, contingency, timestep, all_branches, all_nodes, switch_element_mapping)

    monitored_ids = monitored_elements.index
    monitored_branches = all_branches[all_branches.index.isin(monitored_ids, level="element")]
    monitored_nodes = all_nodes[all_nodes.index.isin(monitored_ids, level="element")]
    return monitored_branches, all_branches, monitored_nodes, va_diff, switches

run_contingency_analysis_sequential #

run_contingency_analysis_sequential(
    net, n_minus_1_definition, ctx
)

Compute a full N-1 analysis for the given network for a single timestep.

Iterates over every contingency group, deep-copies the network, and calls :func:run_single_outage. ctx.slack_allocation_config is forwarded to each outage call so that :func:run_outage_power_flow can handle slack-bus assignment (initial PF and SpPS in-loop reassignment) internally.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pandapower/contingency_analysis_pandapower.py
def run_contingency_analysis_sequential(
    net: pp.pandapowerNet,
    n_minus_1_definition: PandapowerNMinus1Definition,
    ctx: SequentialContingencyAnalysisContext,
) -> list[LoadflowResults]:
    """Run every contingency group of the definition one after another for a single timestep.

    Each group is computed by :func:`run_single_outage` on its own deep copy of
    ``net``.  ``ctx.slack_allocation_config`` is handed to every outage call so that
    :func:`run_outage_power_flow` can handle slack-bus assignment (initial PF and
    SpPS in-loop reassignment) internally.
    """
    # The per-outage context is identical for all groups, so it is built only once.
    outage_ctx = SingleOutageContext(
        monitored_elements=n_minus_1_definition.monitored_elements,
        timestep=ctx.timestep,
        job_id=ctx.job_id,
        method=ctx.method,
        runpp_kwargs=ctx.runpp_kwargs,
        basecase_net=ctx.basecase_net,
        switch_element_mapping=ctx.switch_element_mapping,
        spps=SingleOutageSppsContext(
            conditions=ctx.spps_conditions,
            actions=ctx.spps_actions,
            rules_max_iterations=ctx.spps_rules_max_iterations,
            on_power_flow_error=ctx.on_power_flow_error,
        ),
        cascade=ctx.cascade,
    )

    return [
        run_single_outage(
            net=deepcopy(net),
            grouped_contingency=group,
            ctx=outage_ctx,
            slack_allocation_config=ctx.slack_allocation_config,
        )
        for group in n_minus_1_definition.grouped_contingencies
    ]

run_contingency_analysis_parallel #

run_contingency_analysis_parallel(
    net, n_minus_1_definition, ctx
)

Compute the N-1 AC/DC power flow for the network in parallel.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pandapower/contingency_analysis_pandapower.py
def run_contingency_analysis_parallel(
    net: pp.pandapowerNet,
    n_minus_1_definition: PandapowerNMinus1Definition,
    ctx: ParallelContingencyAnalysisContext,
) -> list[LoadflowResults]:
    """Compute the N-1 AC/DC power flow for the network in parallel.

    The grouped contingencies are cut into batches of ``ctx.parallel.batch_size``
    entries.  Every batch is wrapped into a copy of ``n_minus_1_definition`` and
    submitted as a Ray task running :func:`run_contingency_analysis_sequential`.
    At most ``ctx.parallel.n_processes`` tasks are kept in flight at a time.

    Parameters
    ----------
    net : pp.pandapowerNet
        The network passed on to every batch task.
    n_minus_1_definition : PandapowerNMinus1Definition
        Definition whose ``grouped_contingencies`` are distributed over the batches.
    ctx : ParallelContingencyAnalysisContext
        Run settings. ``ctx.parallel.batch_size`` may be ``None``, in which case the
        contingency groups are spread evenly over ``ctx.parallel.n_processes`` batches.

    Returns
    -------
    list[LoadflowResults]
        The flattened per-group results of all batches. Batches are collected as
        their tasks finish, so the order across batches follows task completion.
        An empty list is returned when there are no contingency groups.
    """
    n_outages = len(n_minus_1_definition.grouped_contingencies)
    if n_outages == 0:
        # Nothing to distribute. Returning here also prevents a computed batch
        # size of 0, which range() below would reject with a ValueError.
        return []

    batch_size = ctx.parallel.batch_size
    if batch_size is None:
        # n_outages > 0 here, so the computed size is at least 1.
        batch_size = math.ceil(n_outages / ctx.parallel.n_processes)

    work = []
    for i in range(0, n_outages, batch_size):
        grouped_batch = n_minus_1_definition.grouped_contingencies[i : i + batch_size]
        # Each task only sees its own slice of grouped contingencies.
        work.append(
            n_minus_1_definition.model_copy(
                update={
                    "contingencies": [],
                    "grouped_contingencies": grouped_batch,
                }
            )
        )

    handles = []
    result_lists = []
    _compute_remote = ray.remote(run_contingency_analysis_sequential)

    sequential_ctx = SequentialContingencyAnalysisContext(
        job_id=ctx.job_id,
        timestep=ctx.timestep,
        slack_allocation_config=ctx.slack_allocation_config,
        method=ctx.method,
        runpp_kwargs=ctx.runpp_kwargs,
        basecase_net=ctx.basecase_net,
        switch_element_mapping=ctx.switch_element_mapping,
        spps_conditions=ctx.spps_conditions,
        spps_actions=ctx.spps_actions,
        spps_rules_max_iterations=ctx.spps_rules_max_iterations,
        on_power_flow_error=ctx.on_power_flow_error,
        cascade=ctx.cascade,
    )

    for batch in work:
        handles.append(
            _compute_remote.remote(
                net=net,
                n_minus_1_definition=batch,
                ctx=sequential_ctx,
            )
        )

        # Throttle submission: once n_processes tasks are pending, wait for one
        # of them to finish before submitting the next batch.
        if len(handles) >= ctx.parallel.n_processes:
            finished, handles = ray.wait(handles, num_returns=1)
            result_lists.extend(ray.get(finished))

    # Collect whatever is still running after the last submission.
    result_lists.extend(ray.get(handles))

    return [result for result_list in result_lists for result in result_list]

build_connectivity_df #

build_connectivity_df(groups)

Build a connectivity result table mapping contingencies to affected elements.

This function flattens a list of PandapowerContingencyGroup objects into a tabular representation where each row corresponds to a pair (contingency, element) along with the associated outage group identifier.

For each contingency in a group, all elements of that outage group are considered affected. This reflects the modeling assumption that outage groups represent sets of elements that become unavailable together when separated from the grid by circuit breakers.

PARAMETER DESCRIPTION
groups

List of contingency groups. Each group contains: - multiple contingencies affecting the same connected component(s), - a set of elements representing the full outage scope, - a unique outage_group_id.

TYPE: list[PandapowerContingencyGroup]

RETURNS DESCRIPTION
DataFrame[ConnectivityResultSchema]

A Pandas DataFrame with: - MultiIndex: * contingency (str): contingency identifier * element (str): element identifier - Column: * outage_group_id (str): identifier of the outage group

Each row indicates that a given element is affected by a given contingency through their shared outage group.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pandapower/contingency_analysis_pandapower.py
def build_connectivity_df(groups: list[PandapowerContingencyGroup]) -> pat.DataFrame[ConnectivityResultSchema]:
    """
    Flatten contingency groups into a (contingency, element) lookup table.

    Within each group, every contingency is paired with every element of that
    group, and the pair is tagged with the group's ``outage_group_id``.  The
    pairing reflects the modeling assumption that all elements of an outage
    group become unavailable together.

    Parameters
    ----------
    groups : list[PandapowerContingencyGroup]
        Groups providing ``contingencies``, ``elements`` and ``outage_group_id``.
        The ``unique_id`` of each contingency and element is used as identifier.

    Returns
    -------
    pat.DataFrame[ConnectivityResultSchema]
        A pandas DataFrame indexed by the MultiIndex ``(contingency, element)``
        with the single column ``outage_group_id``.  One row per pair; an empty
        ``groups`` list yields an empty frame with the same layout.
    """
    index_columns = ["contingency", "element"]

    rows = []
    for group in groups:
        for contingency in group.contingencies:
            for element in group.elements:
                rows.append((contingency.unique_id, element.unique_id, group.outage_group_id))

    flat_table = pd.DataFrame(rows, columns=[*index_columns, "outage_group_id"])
    return flat_table.set_index(index_columns)

run_contingency_analysis_pandapower #

run_contingency_analysis_pandapower(
    net, n_minus_1_definition, job_id, timestep, cfg
)

Compute the N-1 AC/DC power flow for the network.

PARAMETER DESCRIPTION
net

Pandapower network with topology already applied.

TYPE: pandapowerNet

n_minus_1_definition

N-1 definition containing contingencies and monitored elements.

TYPE: Nminus1Definition

job_id

Identifier of the current job.

TYPE: str

timestep

Timestep associated with the computed results.

TYPE: int

cfg

Execution configuration (method, islanding/slack settings, parallelization, cascade screening, etc.).

TYPE: ContingencyAnalysisConfig

RETURNS DESCRIPTION
Union[LoadflowResults, LoadflowResultsPolars]

The results of the loadflow computation

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pandapower/contingency_analysis_pandapower.py
def run_contingency_analysis_pandapower(
    net: pp.pandapowerNet,
    n_minus_1_definition: Nminus1Definition,
    job_id: str,
    timestep: int,
    cfg: ContingencyAnalysisConfig,
) -> Union[LoadflowResults, LoadflowResultsPolars]:
    """Run the pandapower N-1 AC/DC contingency analysis for one timestep.

    The N-1 definition is translated for pandapower, the contingencies are
    grouped (or wrapped one-by-one when grouping is disabled), the base case
    loadflow is run, and the contingency groups are then evaluated either
    sequentially or in parallel.  Warnings about duplicated ids, missing
    elements and missing contingencies are placed in front of the warnings
    produced by the computation.

    Parameters
    ----------
    net : pp.pandapowerNet
        Pandapower network with topology already applied.
    n_minus_1_definition : Nminus1Definition
        N-1 definition containing contingencies and monitored elements.
    job_id : str
        Identifier of the current job.
    timestep : int
        Timestep associated with the computed results.
    cfg : ContingencyAnalysisConfig
        Execution configuration (method, islanding/slack settings, parallelization,
        cascade screening, etc.).

    Returns
    -------
    Union[LoadflowResults, LoadflowResultsPolars]
        The results of the loadflow computation; converted to the polars
        representation when ``cfg.polars`` is set.
    """
    pp_n1_definition = translate_nminus1_for_pandapower(n_minus_1_definition, net)
    if not cfg.apply_outage_grouping:
        # Without grouping every contingency forms its own group with a random id.
        pp_n1_definition.grouped_contingencies = [
            PandapowerContingencyGroup(contingencies=[cont], elements=cont.elements, outage_group_id=str(uuid.uuid4()))
            for cont in pp_n1_definition.contingencies
        ]
    else:
        pp_n1_definition.grouped_contingencies = get_outage_group_for_contingency(
            net=net,
            contingencies=pp_n1_definition.contingencies,
        )

    slack_allocation_config = SlackAllocationConfig(min_island_size=cfg.min_island_size)

    _run_base_case_loadflow(net=net, cfg=cfg, slack_allocation_config=slack_allocation_config)

    switch_element_mapping = get_switch_mapped_elements(
        net=net,
        monitored_elements=pp_n1_definition.monitored_elements,
        side="bus",
    )

    # Settings shared by the sequential and the parallel context.
    shared_ctx_kwargs = {
        "job_id": job_id,
        "timestep": timestep,
        "slack_allocation_config": slack_allocation_config,
        "method": cfg.method,
        "runpp_kwargs": cfg.runpp_kwargs,
        # Copy taken after the base case loadflow call above.
        "basecase_net": deepcopy(net),
        "switch_element_mapping": switch_element_mapping,
        "spps_conditions": pp_n1_definition.spps_conditions,
        "spps_actions": pp_n1_definition.spps_actions,
        "spps_rules_max_iterations": cfg.spps_rules_max_iterations,
        "on_power_flow_error": cfg.on_power_flow_error,
        "cascade": cfg.cascade,
    }

    # Only a single process without an explicit batch size runs in-process.
    run_in_process = cfg.parallel.n_processes == 1 and cfg.parallel.batch_size is None
    if run_in_process:
        results = run_contingency_analysis_sequential(
            net=net,
            n_minus_1_definition=pp_n1_definition,
            ctx=SequentialContingencyAnalysisContext(**shared_ctx_kwargs),
        )
    else:
        results = run_contingency_analysis_parallel(
            net=net,
            n_minus_1_definition=pp_n1_definition,
            ctx=ParallelContingencyAnalysisContext(parallel=cfg.parallel, **shared_ctx_kwargs),
        )
    lf_result = concatenate_loadflow_results(results)

    translation_warnings = [
        f"Element with id {element_id} is not unique in the grid."
        for element_id in pp_n1_definition.duplicated_grid_elements
    ]
    translation_warnings += [
        f"Element with id {element.id} not found in the network." for element in pp_n1_definition.missing_elements
    ]
    translation_warnings += [
        f"Contingency with id {contingency.id} contains elements that are not found in the network."
        for contingency in pp_n1_definition.missing_contingencies
    ]
    # Translation warnings come first, followed by the warnings of the computation.
    lf_result.warnings = [*translation_warnings, *lf_result.warnings]

    if cfg.apply_outage_grouping:
        lf_result.connectivity_result = build_connectivity_df(pp_n1_definition.grouped_contingencies)

    if cfg.polars:
        return convert_pandas_loadflow_results_to_polars(lf_result)
    return lf_result

toop_engine_contingency_analysis.pandapower.pandapower_helpers #

Contingency Analysis PyPowsybl#

toop_engine_contingency_analysis.pypowsybl.contingency_analysis_powsybl #

Compute the N-1 AC/DC power flow for the network.

PowsyblBranchLimitCache dataclass #

PowsyblBranchLimitCache(
    chosen_limit,
    monitored_branches,
    current_limit_fingerprint,
    branch_limits,
)

Cached prepared branch limits for a specific monitored branch set and network limit snapshot.

It contains the branch_limits dataframe as the core cached item, along with metadata needed to validate whether the cache can be reused safely.

chosen_limit instance-attribute #

chosen_limit

The limit name used when preparing the cached branch limits.

monitored_branches instance-attribute #

monitored_branches

The normalized monitored branch ids for which the cached limits were prepared.

current_limit_fingerprint instance-attribute #

current_limit_fingerprint

A fingerprint of the raw current-limit table used to detect stale cache entries.

branch_limits instance-attribute #

branch_limits

The prepared branch-limit dataframe ready for contingency-analysis postprocessing.

matches #

matches(
    monitored_branches,
    chosen_limit,
    current_limit_fingerprint,
)

Check whether this cache can be reused for the current request.

PARAMETER DESCRIPTION
monitored_branches

The monitored branch ids needed for the current analysis.

TYPE: Sequence[str]

chosen_limit

The selected operational limit name for the current analysis.

TYPE: str

current_limit_fingerprint

The fingerprint of the current network's raw limit table.

TYPE: str

RETURNS DESCRIPTION
bool

True if the cache can be reused safely, otherwise False.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/contingency_analysis_powsybl.py
def matches(self, monitored_branches: Sequence[str], chosen_limit: str, current_limit_fingerprint: str) -> bool:
    """Tell whether the cached branch limits are valid for the given request.

    The checks run in this order and stop at the first mismatch: limit name,
    normalized monitored branch ids, fingerprint of the raw limit table.

    Parameters
    ----------
    monitored_branches : Sequence[str]
        The monitored branch ids needed for the current analysis. They are
        normalized before being compared with the cached ids.
    chosen_limit : str
        The selected operational limit name for the current analysis.
    current_limit_fingerprint : str
        The fingerprint of the current network's raw limit table.

    Returns
    -------
    bool
        True if the cache can be reused safely, otherwise False.
    """
    if not (self.chosen_limit == chosen_limit):
        return False

    # Normalization is only needed once the limit name is known to match.
    requested_branches = normalize_monitored_branches_for_powsybl(monitored_branches)
    if not (self.monitored_branches == requested_branches):
        return False

    return self.current_limit_fingerprint == current_limit_fingerprint

build_branch_limit_cache #

build_branch_limit_cache(
    net, monitored_branches, chosen_limit="permanent_limit"
)

Build a reusable cache for the expensive branch-limit preparation stage.

PARAMETER DESCRIPTION
net

The active Powsybl network whose current limits are used for cache construction.

TYPE: Network

monitored_branches

The monitored branch ids for which limits should be prepared.

TYPE: Sequence[str]

chosen_limit

The limit name to select, by default "permanent_limit".

TYPE: str DEFAULT: 'permanent_limit'

RETURNS DESCRIPTION
PowsyblBranchLimitCache

A cache entry containing prepared limits and the metadata needed to validate reuse.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/contingency_analysis_powsybl.py
def build_branch_limit_cache(
    net: Network, monitored_branches: Sequence[str], chosen_limit: str = "permanent_limit"
) -> PowsyblBranchLimitCache:
    """Prepare the branch limits once so later analyses can reuse them.

    The raw current limits are read from ``net`` a single time. The cache's
    fingerprint and its prepared branch-limit table are both derived from that
    same snapshot.

    Parameters
    ----------
    net : Network
        The active Powsybl network whose current limits are used for cache construction.
    monitored_branches : Sequence[str]
        The monitored branch ids for which limits should be prepared.
    chosen_limit : str, optional
        The limit name to select, by default "permanent_limit".

    Returns
    -------
    PowsyblBranchLimitCache
        A cache entry containing prepared limits and the metadata needed to validate reuse.
    """
    raw_limits = get_current_branch_limits_for_powsybl(net)

    # Metadata used later to decide whether the cache entry is still valid.
    normalized_branches = normalize_monitored_branches_for_powsybl(monitored_branches)
    limits_fingerprint = fingerprint_current_limits_for_powsybl(raw_limits)

    prepared_limits = translate_branch_limits_for_powsybl(
        branch_limits=raw_limits,
        monitored_branches=list(monitored_branches),
        chosen_limit=chosen_limit,
    )

    return PowsyblBranchLimitCache(
        chosen_limit=chosen_limit,
        monitored_branches=normalized_branches,
        current_limit_fingerprint=limits_fingerprint,
        branch_limits=prepared_limits,
    )

run_powsybl_analysis #

run_powsybl_analysis(
    net,
    n_minus_1_definition,
    lf_params,
    method="ac",
    n_processes=1,
)

Run the powsybl security analysis for the given network and N-1 definition.

PARAMETER DESCRIPTION
net

The powsybl network to compute the Contingency Analysis for

TYPE: Network

n_minus_1_definition

The N-1 definition to use for the contingency analysis. Contains outages and monitored elements

TYPE: Nminus1Definition

lf_params

Loadflow parameters to use for the computation.

TYPE: Parameters

method

The method to use for the contingency analysis. Either "ac" or "dc", by default "ac"

TYPE: Literal[ac, dc] DEFAULT: 'ac'

n_processes

The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
res

The security analysis result from powsybl containing the monitored elements and the results of the contingencies.

TYPE: SecurityAnalysisResult

basecase_id

The name of the basecase contingency, if it is included in the run. Otherwise None.

TYPE: str | None

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/contingency_analysis_powsybl.py
def run_powsybl_analysis(
    net: Network,
    n_minus_1_definition: PowsyblNMinus1Definition,
    lf_params: pypowsybl.loadflow.Parameters,
    method: Literal["ac", "dc"] = "ac",
    n_processes: int = 1,
) -> tuple[SecurityAnalysisResult, str | None]:
    """Set up and execute a powsybl security analysis for the N-1 definition.

    Monitored branches, three-winding transformers and voltage levels are
    registered, then one powsybl contingency is added per contingency that has
    at least one outaged element. A contingency without elements is not added
    to the analysis; its id is returned as the basecase id instead.

    Parameters
    ----------
    net : Network
        The powsybl network to compute the Contingency Analysis for
    n_minus_1_definition : PowsyblNMinus1Definition
        The N-1 definition to use for the contingency analysis. Contains outages and monitored elements
    lf_params: pypowsybl.loadflow.Parameters
        Loadflow parameters to use for the computation. For "dc" a copy with
        ``distributed_slack`` disabled is used; the passed object is left untouched.
    method : Literal["ac", "dc"], optional
        The method to use for the contingency analysis. Either "ac" or "dc", by default "ac"
    n_processes : int, optional
        Passed to the security analysis as the ``threadCount`` provider parameter, by default 1.

    Returns
    -------
    res: SecurityAnalysisResult
        The security analysis result from powsybl containing the monitored elements and the results of the contingencies.
    basecase_id : str | None
        The name of the basecase contingency, if it is included in the run. Otherwise None.
    """
    analysis = pypowsybl.security.create_analysis()
    monitored = n_minus_1_definition.monitored_elements
    analysis.add_monitored_elements(
        branch_ids=monitored["branches"],
        three_windings_transformer_ids=monitored["trafo3w"],
        voltage_level_ids=monitored["voltage_levels"],
    )

    basecase_id = None
    for contingency in n_minus_1_definition.contingencies:
        outages = contingency.elements
        n_elements = len(outages)
        if n_elements == 0:
            # A contingency without outages is the basecase; only its id is remembered.
            basecase_id = contingency.id
            continue
        if n_elements == 1:
            analysis.add_single_element_contingency(outages[0], contingency_id=contingency.id)
            continue
        analysis.add_multiple_elements_contingency(outages, contingency_id=contingency.id)

    if method == "dc":
        # Distributed slack is not supported in DC load flow, so it is switched
        # off on a copy to keep the caller's parameters unchanged.
        lf_params = deepcopy(lf_params)
        lf_params.distributed_slack = False

    security_params = pypowsybl.security.impl.parameters.Parameters(
        load_flow_parameters=lf_params,
        provider_parameters={"threadCount": str(n_processes), "contingencyPropagation": "false"},
    )

    if method == "ac":
        res = analysis.run_ac(net, security_params)
    else:
        res = analysis.run_dc(net, security_params)
    return res, basecase_id

run_contingency_analysis_polars #

run_contingency_analysis_polars(
    net,
    pow_n1_definition,
    job_id,
    timestep,
    lf_params,
    method="dc",
    n_processes=1,
)

Compute the N-0 + N-1 power flow for the network.

PARAMETER DESCRIPTION
net

The powsybl network to compute the Contingency Analysis for

TYPE: Network

pow_n1_definition

The N-1 definition to use for the contingency analysis. Contains outages and monitored elements

TYPE: PowsyblNMinus1Definition

job_id

The job id of the current job

TYPE: str

timestep

The timestep to use for the contingency analysis

TYPE: int

lf_params

Loadflow parameters to use for the computation.

TYPE: Parameters

method

Whether to compute the AC or DC power flow, by default "dc"

TYPE: Literal[ac, dc] DEFAULT: 'dc'

n_processes

The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially. If > 1, the analysis is run in parallel. Parallelization is done by splitting the contingencies into chunks and running each chunk in a separate process. This is done via the openloadflow native threadCount parameter, which is set in the powsybl security analysis parameters.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
LoadflowResultsPolars

The results of the loadflow computation. Invalid or otherwise failed results will be set to NaN.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/contingency_analysis_powsybl.py
def run_contingency_analysis_polars(
    net: Network,
    pow_n1_definition: PowsyblNMinus1Definition,
    job_id: str,
    timestep: int,
    lf_params: pypowsybl.loadflow.Parameters,
    method: Literal["ac", "dc"] = "dc",
    n_processes: int = 1,
) -> LoadflowResultsPolars:
    """Compute the N-0 + N-1 power flow for the network.

    Runs the powsybl security analysis and assembles the branch, node,
    regulating-element, voltage-angle-difference and convergence tables as lazy
    polars frames.

    Parameters
    ----------
    net : Network
        The powsybl network to compute the Contingency Analysis for
    pow_n1_definition : PowsyblNMinus1Definition
        The N-1 definition to use for the contingency analysis. Contains outages and monitored elements
    job_id : str
        The job id of the current job
    timestep : int
        The timestep to use for the contingency analysis
    lf_params: pypowsybl.loadflow.Parameters
        Loadflow parameters to use for the computation.
    method : Literal["ac", "dc"], optional
        Whether to compute the AC or DC power flow, by default "dc"
    n_processes : int, optional
        The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially.
        If > 1, the analysis is run in parallel.
        Parallelization is done via the openloadflow native threadCount parameter,
        which is set in the powsybl security analysis parameters.

    Returns
    -------
    LoadflowResultsPolars
        The results of the loadflow computation. Invalid or otherwise failed results will be set to NaN.
    """

    def _to_lazy(frame):
        """Convert a pandas frame to a lazy polars frame, keeping the index and NaN values."""
        return pl.from_pandas(frame, include_index=True, nan_to_null=False).lazy()

    monitored_elements = pow_n1_definition.monitored_elements
    ca_result, basecase_id = run_powsybl_analysis(
        net, pow_n1_definition, lf_params=lf_params, method=method, n_processes=n_processes
    )

    def _with_names(frame):
        """Apply the basecase renaming and add element and contingency names to a result frame."""
        frame = update_basename_polars(frame, basecase_id)
        frame = add_name_column_polars(frame, pow_n1_definition.element_name_mapping, index_level="element")
        return add_name_column_polars(frame, pow_n1_definition.contingency_name_mapping, index_level="contingency")

    bus_results = get_ca_bus_results(ca_result, lazy=True)
    branch_results = get_ca_branch_results(ca_result, lazy=True)
    three_windings_transformer_results = get_ca_three_windings_transformer_results(ca_result, lazy=True)
    post_contingency_results = ca_result.post_contingency_results
    pre_contingency_result = ca_result.pre_contingency_result

    all_outage_ids = [contingency.id for contingency in pow_n1_definition.contingencies if not contingency.is_basecase()]
    convergence_df, failed_outages = get_convergence_result_df(
        post_contingency_results, pre_contingency_result, all_outage_ids, timestep, basecase_id
    )
    # The return value is not used here; the pandas frame itself is converted afterwards.
    add_name_column(convergence_df, pow_n1_definition.contingency_name_mapping, index_level="contingency")
    convergence_df = _to_lazy(convergence_df)

    branch_limit_polars = _to_lazy(pow_n1_definition.branch_limits)
    # Converted once and shared by the node and the angle-difference computation.
    bus_map_polars = _to_lazy(pow_n1_definition.bus_map)

    branch_results_df = get_branch_results_polars(
        branch_results,
        three_windings_transformer_results,
        monitored_elements["branches"],
        monitored_elements["trafo3w"],
        failed_outages,
        timestep,
        branch_limit_polars,
    )
    node_results_df = get_node_results_polars(
        bus_results,
        monitored_elements["buses"],
        bus_map_polars,
        _to_lazy(pow_n1_definition.voltage_levels),
        failed_outages,
        timestep,
        method,
    )
    regulating_elements_df = _to_lazy(
        get_regulating_element_results(monitored_elements["buses"], timestep=timestep, basecase_name=basecase_id)
    )
    va_diff_results_df = get_va_diff_results_polars(
        bus_results=bus_results,
        outages=all_outage_ids,
        va_diff_with_buses=_to_lazy(pow_n1_definition.blank_va_diff),
        bus_map=bus_map_polars,
        timestep=timestep,
    )

    # All four result tables receive the same basecase renaming and name columns.
    branch_results_df = _with_names(branch_results_df)
    node_results_df = _with_names(node_results_df)
    regulating_elements_df = _with_names(regulating_elements_df)
    va_diff_results_df = _with_names(va_diff_results_df)

    return LoadflowResultsPolars(
        job_id=job_id,
        branch_results=branch_results_df,
        node_results=node_results_df,
        regulating_element_results=regulating_elements_df,
        va_diff_results=va_diff_results_df,
        converged=convergence_df,
        warnings=[],
        lazy=True,
    )

run_contingency_analysis_powsybl #

run_contingency_analysis_powsybl(
    net,
    n_minus_1_definition,
    job_id,
    timestep,
    method="ac",
    n_processes=1,
    polars=False,
    lf_params=None,
    branch_limit_cache=None,
)

Compute the Contingency Analysis for the network.

PARAMETER DESCRIPTION
net

The powsybl network to compute the Contingency Analysis for

TYPE: Network

n_minus_1_definition

The N-1 definition to use for the contingency analysis. Contains outages and monitored elements

TYPE: Nminus1Definition

job_id

The job id of the current job

TYPE: str

timestep

The timestep to use for the contingency analysis

TYPE: int

method

The method to use for the contingency analysis. Either "ac" or "dc", by default "ac"

TYPE: Literal[ac, dc] DEFAULT: 'ac'

n_processes

The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially. If > 1, the analysis is run in parallel. Parallelization is done by splitting the contingencies into chunks and running each chunk in a separate process.

TYPE: int DEFAULT: 1

polars

Whether to use polars for the dataframe operations.

TYPE: bool DEFAULT: False

lf_params

Loadflow parameters to use for the computation. If None, a standard set will be used

TYPE: Optional[Parameters] DEFAULT: None

branch_limit_cache

Optional cache for the expensive prepared branch limits. If stale or None, it will be ignored and recomputed. If you want to run many contingency analyses with the same network, use this and precompute the cache with build_branch_limit_cache() to save time.

TYPE: Optional[PowsyblBranchLimitCache] DEFAULT: None

RETURNS DESCRIPTION
Union[LoadflowResults, LoadflowResultsPolars]

The results of the loadflow computation.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/contingency_analysis_powsybl.py
def run_contingency_analysis_powsybl(
    net: Network,
    n_minus_1_definition: Nminus1Definition,
    job_id: str,
    timestep: int,
    method: Literal["ac", "dc"] = "ac",
    n_processes: int = 1,
    polars: bool = False,
    lf_params: Optional[pypowsybl.loadflow.Parameters] = None,
    branch_limit_cache: Optional[PowsyblBranchLimitCache] = None,
) -> Union[LoadflowResults, LoadflowResultsPolars]:
    """Run the powsybl contingency analysis and attach translation warnings.

    Parameters
    ----------
    net : Network
        The powsybl network to compute the Contingency Analysis for
    n_minus_1_definition : Nminus1Definition
        The N-1 definition to use for the contingency analysis. Contains outages and monitored elements
    job_id : str
        The job id of the current job
    timestep : int
        The timestep to use for the contingency analysis
    method : Literal["ac", "dc"], optional
        The method to use for the contingency analysis. Either "ac" or "dc", by default "ac"
    n_processes : int, optional
        The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially.
        If > 1, the analysis is run in parallel.
    polars: bool
        Whether to return the polars representation of the results. If False,
        the results are converted to pandas before being returned.
    lf_params: Optional[pypowsybl.loadflow.Parameters]
        Loadflow parameters to use for the computation.
        If None, DISTRIBUTED_SLACK is used for "ac" and SINGLE_SLACK otherwise.
    branch_limit_cache : Optional[PowsyblBranchLimitCache]
        Optional cache for the expensive prepared branch limits. If stale or None, it will be ignored and recomputed.
        If you want to run many contingency analyses with the same network, use this and precompute the cache with
        build_branch_limit_cache() to save time.

    Returns
    -------
    Union[LoadflowResults, LoadflowResultsPolars]
        The results of the loadflow computation. Warnings about missing elements
        and missing contingencies precede the warnings of the computation itself.
    """
    if lf_params is None:
        lf_params = DISTRIBUTED_SLACK if method == "ac" else SINGLE_SLACK

    if lf_params.distributed_slack:
        # Done a single time up front so it does not have to be repeated per iteration.
        net = set_target_values_to_lf_values_incl_distributed_slack(net, method, lf_params=lf_params)

    pow_n1_definition = translate_nminus1_for_powsybl(
        n_minus_1_definition,
        net,
        branch_limit_cache=branch_limit_cache,
    )

    lf_result = run_contingency_analysis_polars(
        net=net,
        pow_n1_definition=pow_n1_definition,
        job_id=job_id,
        timestep=timestep,
        method=method,
        n_processes=n_processes,
        lf_params=lf_params,
    )
    if not polars:
        lf_result = convert_polars_loadflow_results_to_pandas(lf_result)

    translation_warnings = [
        f"Element with id {element.id} not found in the network." for element in pow_n1_definition.missing_elements
    ]
    translation_warnings += [
        f"Contingency with id {contingency.id} contains elements that are not found in the network."
        for contingency in pow_n1_definition.missing_contingencies
    ]
    lf_result.warnings = [*translation_warnings, *lf_result.warnings]
    return lf_result

toop_engine_contingency_analysis.pypowsybl.powsybl_helpers_polars #

Helper functions to translate the N-1 definition into a usable format for Powsybl.

This includes translating contingencies, monitored elements and collecting the necessary data from the network, so this only has to happen once.

POWSYBL_CONVERGENCE_MAP module-attribute #

POWSYBL_CONVERGENCE_MAP = {
    value: value,
    value: value,
    value: value,
    value: value,
}

get_node_results_polars #

get_node_results_polars(
    bus_results,
    monitored_buses,
    bus_map,
    voltage_levels,
    failed_outages,
    timestep,
    method,
)

Get the node results for the given outages and timestep.

TODO: This is currently faking the sum of p and q at the node

PARAMETER DESCRIPTION
bus_results

The bus results from the powsybl security analysis

TYPE: LazyFrame

monitored_buses

The list of monitored buses to get the node results for

TYPE: list[str]

bus_map

A mapping from busbar sections or bus_breaker_buses to the electrical buses. This is used to map the buses from bus_results to electrical buses and back to the monitored buses.

TYPE: LazyFrame

voltage_levels

The voltage levels of the buses. This is used to determine voltage limits and nominal v in DC.

TYPE: LazyFrame

failed_outages

The list of failed outages to get nan-node results for

TYPE: list[str]

timestep

The timestep to get the node results for

TYPE: int

method

The method to use for the node results. Either "ac" or "dc"

TYPE: Literal[ac, dc]

RETURNS DESCRIPTION
DataFrame[NodeResultSchemaPolars]

The node results for the given outages and timestep

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers_polars.py
@pa.check_types
def get_node_results_polars(
    bus_results: pl.LazyFrame,
    monitored_buses: list[str],
    bus_map: pl.LazyFrame,
    voltage_levels: pl.LazyFrame,
    failed_outages: list[str],
    timestep: int,
    method: Literal["ac", "dc"],
) -> patpl.LazyFrame[NodeResultSchemaPolars]:
    """Build the node result table for one timestep from the powsybl bus results.

    TODO: p, q and vm_basecase_deviation are not computed yet and are filled with NaN.

    Parameters
    ----------
    bus_results : pl.LazyFrame
        The bus results from the powsybl security analysis
    monitored_buses : list[str]
        The ids of the monitored buses that should show up in the result
    bus_map: pl.LazyFrame
        Mapping from busbar sections or bus_breaker_buses to the electrical buses.
        It is used to translate the ids in bus_results to bus breaker buses and
        from there back to the monitored buses.
    voltage_levels: pl.LazyFrame
        The voltage levels of the buses. Provides the nominal voltage and the
        high/low voltage limits used for the loading calculation.
    failed_outages : list[str]
        The outages that failed; NaN rows are appended for them
    timestep : int
        The timestep written into the result rows
    method : Literal["ac", "dc"]
        The loadflow method. For "dc" the voltage magnitude is replaced by the
        nominal voltage wherever a voltage angle is available.

    Returns
    -------
    patpl.LazyFrame[NodeResultSchemaPolars]
        The node results for the given outages and timestep, followed by the
        NaN rows of the failed outages
    """
    # Without any bus result there is nothing to convert: only the failed rows remain
    if bus_results.limit(1).collect().is_empty():
        return get_failed_node_results_polars(timestep, failed_outages, monitored_buses)

    # Lookup from any bus id (busbar section or bus breaker bus) to its bus breaker bus
    id_to_breaker_bus = bus_map.select("id", "bus_breaker_bus_id")
    # Lookup from the bus breaker bus back to the monitored element ids
    breaker_bus_to_element = bus_map.filter(pl.col("id").is_in(monitored_buses)).select(
        pl.col("bus_breaker_bus_id"),
        pl.col("id").alias("element"),
    )

    results = (
        bus_results.drop("operator_strategy_id")
        .rename({"contingency_id": "contingency"})
        # m:1 join, works for both busbar and bus_breaker models
        .join(id_to_breaker_bus, left_on=["bus_id"], right_on=["id"], how="left")
        .drop_nulls("bus_breaker_bus_id")
        .join(breaker_bus_to_element, on=["bus_breaker_bus_id"], how="left")
        # rows without an element belong to buses that are not monitored
        .drop_nulls("element")
        # attach nominal voltage and voltage limits of the voltage level
        .join(
            voltage_levels,
            left_on=["voltage_level_id"],
            right_on=["id"],
            how="left",
        )
        .with_columns(timestep=pl.lit(timestep))
        .rename({"v_mag": "vm", "v_angle": "va"})
    )

    if method == "dc":
        # DC results carry no magnitude: use the nominal voltage wherever an angle exists
        results = results.with_columns(
            pl.when(pl.col("va").is_not_null()).then(pl.col("nominal_v")).otherwise(pl.col("vm")).alias("vm")
        )

    nominal_v = pl.col("nominal_v")
    vm_deviation = pl.col("vm_deviation")
    # Each step reads a column created by the previous one, so they stay separate calls
    results = (
        results.with_columns((pl.col("vm") - nominal_v).alias("vm_deviation"))
        .with_columns((vm_deviation / (pl.col("high_voltage_limit") - nominal_v)).alias("deviation_to_max"))
        .with_columns((vm_deviation / (nominal_v - pl.col("low_voltage_limit"))).alias("deviation_to_min"))
        .with_columns(
            # over-voltage is rated against the upper band, under-voltage against the lower band
            pl.when(vm_deviation >= 0)
            .then(pl.col("deviation_to_max"))
            .otherwise(pl.col("deviation_to_min"))
            .alias("vm_loading")
        )
    )

    failed_results = get_failed_node_results_polars(timestep, failed_outages, monitored_buses)

    # TODO: va_loading is not defined yet
    # TODO: add p and q calculation at the node
    results = (
        results.cast({"timestep": pl.Int64})
        .with_columns(
            p=pl.lit(float("nan")),  # TODO
            q=pl.lit(float("nan")),  # TODO
            vm_basecase_deviation=pl.lit(float("nan")),  # TODO
            element_name=pl.lit(""),  # will be filled later
            contingency_name=pl.lit(""),  # will be filled later
        )
        .select(
            [
                "timestep",
                "contingency",
                "element",
                "vm",
                "va",
                "vm_loading",
                "p",
                "q",
                "vm_basecase_deviation",
                "element_name",
                "contingency_name",
            ]
        )
    )

    return pl.concat([results, failed_results])

get_branch_results_polars #

get_branch_results_polars(
    branch_results,
    three_winding_results,
    monitored_branches,
    monitored_trafo3w,
    failed_outages,
    timestep,
    branch_limits,
)

Get the branch results for the given outages and timestep.

PARAMETER DESCRIPTION
branch_results

The branch results from the powsybl security analysis

TYPE: LazyFrame

three_winding_results

The three winding transformer results from the powsybl security analysis

TYPE: LazyFrame

monitored_branches

The list of monitored branches with 2 sides to get the branch results for

TYPE: list[str]

monitored_trafo3w

The list of monitored three winding transformers to get the branch results for

TYPE: list[str]

failed_outages

The list of failed outages to get nan-branch results for

TYPE: list[str]

timestep

The timestep to get the branch results for

TYPE: int

branch_limits

The branch limits from the powsybl network

TYPE: LazyFrame

RETURNS DESCRIPTION
LazyFrame[BranchResultSchemaPolars]

The polars branch results for the given outages and timestep

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers_polars.py
@pa.check_types
def get_branch_results_polars(
    branch_results: pl.LazyFrame,
    three_winding_results: pl.LazyFrame,
    monitored_branches: list[str],
    monitored_trafo3w: list[str],
    failed_outages: list[str],
    timestep: int,
    branch_limits: pl.LazyFrame,
) -> patpl.LazyFrame[BranchResultSchemaPolars]:
    """Get the branch results for the given outages and timestep.

    The per-side columns of the powsybl results (p1/q1/i1, p2/q2/i2 and, for
    three winding transformers, p3/q3/i3) are stacked into one long table with a
    ``side`` column. The loading is computed as ``i / value`` where ``value`` is
    taken from ``branch_limits``. Rows for the failed outages are appended at the end.

    Parameters
    ----------
    branch_results : pl.LazyFrame
        The branch results from the powsybl security analysis
    three_winding_results : pl.LazyFrame
        The three winding transformer results from the powsybl security analysis
    monitored_branches : list[str]
        The list of monitored branches with 2 sides to get the branch results for
    monitored_trafo3w : list[str]
        The list of monitored three winding transformers to get the branch results for
    failed_outages : list[str]
        The list of failed outages to get nan-branch results for
    timestep : int
        The timestep to get the branch results for
    branch_limits : pl.LazyFrame
        The branch limits from the powsybl network. Joined on (element_id, side);
        its ``value`` column is used as the current limit.

    Returns
    -------
    patpl.LazyFrame[BranchResultSchemaPolars]
        The polars branch results for the given outages and timestep
    """
    # Align the column names of both result tables
    branch_results = branch_results.drop("operator_strategy_id")
    branch_results = branch_results.rename({"contingency_id": "contingency", "branch_id": "element"})
    three_winding_results = three_winding_results.rename({"contingency_id": "contingency", "transformer_id": "element"})

    # Sides one and two exist for both two-sided branches and three winding transformers
    side_one_results = (
        pl.concat(
            [
                branch_results.select(["contingency", "element", "p1", "q1", "i1"]),
                three_winding_results.select(["contingency", "element", "p1", "q1", "i1"]),
            ]
        )
        .with_columns(side=pl.lit(BranchSide.ONE.value))
        .rename({"p1": "p", "q1": "q", "i1": "i"})
    )
    side_two_results = (
        pl.concat(
            [
                branch_results.select(["contingency", "element", "p2", "q2", "i2"]),
                three_winding_results.select(["contingency", "element", "p2", "q2", "i2"]),
            ]
        )
        .with_columns(side=pl.lit(BranchSide.TWO.value))
        .rename({"p2": "p", "q2": "q", "i2": "i"})
    )
    # Side three only exists for three winding transformers
    side_three_results = (
        three_winding_results.select(["contingency", "element", "p3", "q3", "i3"])
        .with_columns(side=pl.lit(BranchSide.THREE.value))
        .rename({"p3": "p", "q3": "q", "i3": "i"})
    )
    # Combine and Add timestep column
    converted_branch_results = pl.concat([side_one_results, side_two_results, side_three_results]).with_columns(
        timestep=pl.lit(timestep)
    )
    # Make the dtypes of the join keys and the limit value explicit on both sides
    converted_branch_results = converted_branch_results.cast({"timestep": pl.Int64, "side": pl.Int64})
    branch_limits = branch_limits.cast({"side": pl.Int64, "value": pl.Float64})

    # NOTE: this check collects one row, i.e. it executes part of the lazy plan eagerly
    if not converted_branch_results.limit(1).collect().is_empty():
        converted_branch_results = (
            converted_branch_results.join(
                branch_limits, left_on=["element", "side"], right_on=["element_id", "side"], how="left"
            )  # m:1 join
            .with_columns(loading=pl.col("i") / pl.col("value"))
            .drop("value")
        )
    else:
        # No rows at all: only make sure the expected columns exist with usable dtypes
        # overwrite the i column with a float NaN literal
        converted_branch_results = converted_branch_results.with_columns(i=pl.lit(float("nan")))
        # add loading column
        converted_branch_results = converted_branch_results.with_columns(loading=pl.lit(float("nan")))
        # cast null to str
        converted_branch_results = converted_branch_results.cast({"contingency": pl.String, "element": pl.String})
    # Elements without a matching limit get a null loading from the left join; report NaN instead
    converted_branch_results = converted_branch_results.with_columns(pl.col("loading").fill_null(float("nan")))
    # add empty element_name and contingency_name columns to match the schema
    converted_branch_results = converted_branch_results.with_columns(
        element_name=pl.lit(""),
        contingency_name=pl.lit(""),
    )

    # Add results for non convergent contingencies
    failed_branch_results = get_failed_branch_results_polars(timestep, failed_outages, monitored_branches, monitored_trafo3w)

    # Fix the column set and order so both frames can be concatenated
    converted_branch_results = converted_branch_results.select(
        [
            "timestep",
            "contingency",
            "element",
            "side",
            "p",
            "q",
            "i",
            "loading",
            "element_name",
            "contingency_name",
        ]
    )
    converted_branch_results = pl.concat([converted_branch_results, failed_branch_results])

    return converted_branch_results

get_va_diff_results_polars #

get_va_diff_results_polars(
    bus_results,
    outages,
    va_diff_with_buses,
    bus_map,
    timestep,
)

Get the voltage angle difference results for the given outages and bus results.

PARAMETER DESCRIPTION
bus_results

The dataframe containing the bus results of powsybl contingency analysis.

TYPE: LazyFrame

outages

The list of outages to be considered. These are the contingency ids that are outaged.

TYPE: list[str]

va_diff_with_buses

The dataframe containing the voltage angle difference results with the bus pairs that need checking.

TYPE: LazyFrame

bus_map

A mapping from busbar sections to bus breaker buses. This is used to convert the busbar sections to bus breaker buses in the Node Breaker model.

TYPE: LazyFrame

timestep

The timestep of the results.

TYPE: int

RETURNS DESCRIPTION
VADiffResultSchemaPolars

The dataframe containing the voltage angle difference results for the given outages.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers_polars.py
@pa.check_types
def get_va_diff_results_polars(
    bus_results: pl.LazyFrame, outages: list[str], va_diff_with_buses: pl.LazyFrame, bus_map: pl.LazyFrame, timestep: int
) -> patpl.LazyFrame[VADiffResultSchemaPolars]:
    """Compute the voltage angle differences for the requested outages.

    Parameters
    ----------
    bus_results : pl.LazyFrame
        The bus results of the powsybl contingency analysis.
    outages : list[str]
        The contingency ids that should be evaluated.
    va_diff_with_buses : pl.LazyFrame
        The voltage angle difference definitions together with the two bus breaker
        buses (bus_breaker_bus1_id / bus_breaker_bus2_id) whose angles are compared.
    bus_map: pl.LazyFrame
        Mapping from busbar sections to bus breaker buses. It is used to translate
        the bus ids of the results into bus breaker buses in the Node Breaker model.
    timestep : int
        The timestep written into the result rows.

    Returns
    -------
    patpl.LazyFrame[VADiffResultSchemaPolars]
        The voltage angle difference results for the basecase and the given outages.
        An empty, correctly typed frame if there are no outages or no bus results.
    """
    no_outages = len(outages) == 0
    if no_outages or bus_results.limit(1).collect().is_empty():
        empty_pandas = get_empty_dataframe_from_model(VADiffResultSchema)
        empty_lazy = pl.from_pandas(empty_pandas, include_index=True, nan_to_null=False).lazy()
        return empty_lazy.cast({"timestep": pl.Int64, "va_diff": pl.Float64})

    # Powsybl reports the basecase under the empty contingency id
    basecase_id = ""
    va_diff = va_diff_with_buses.filter(pl.col("contingency").is_in([basecase_id, *outages])).with_columns(
        timestep=pl.lit(timestep).cast(pl.Int64)
    )

    # Map busbar sections where there are any. For the rest use the bus_breaker_bus_id from the results (here the bus id)
    mapped_bus_results = bus_results.join(
        bus_map.select("id", "bus_breaker_bus_id"), left_on=["bus_id"], right_on=["id"], how="left"
    )  # m:1 join

    # Look up the voltage angle of both buses of every va_diff definition, first bus 1, then bus 2
    angle_lookups = (
        ("bus_breaker_bus1_id", "v_angle_1"),
        ("bus_breaker_bus2_id", "v_angle_2"),
    )
    for bus_column, angle_column in angle_lookups:
        va_diff = va_diff.join(
            mapped_bus_results.select("contingency_id", "bus_breaker_bus_id", "v_angle"),
            left_on=["contingency", bus_column],
            right_on=["contingency_id", "bus_breaker_bus_id"],
            how="left",
        ).rename({"v_angle": angle_column})  # m:1 join

    return (
        va_diff.with_columns((pl.col("v_angle_1") - pl.col("v_angle_2")).alias("va_diff"))
        # drop duplicates
        .unique()
        # add empty element_name and contingency_name columns to match the schema
        .with_columns(
            element_name=pl.lit(""),  # will be filled later
            contingency_name=pl.lit(""),  # will be filled later
        )
        .select(
            [
                "timestep",
                "contingency",
                "element",
                "va_diff",
                "element_name",
                "contingency_name",
            ]
        )
    )

update_basename_polars #

update_basename_polars(result_df, basecase_name=None)

Update the basecase name in the results dataframes.

This function updates the contingency index level of the results dataframes to reflect the basecase name. If the basecase is not included in the run, it will remove it from the results. Powsybl includes the basecase as an empty string by default.

The dataframes are expected to have a "contingency" column. The updated dataframe is returned; the input is not modified in place.

PARAMETER DESCRIPTION
result_df

The dataframe containing the branch / node / VADiff results

TYPE: LoadflowResultTablePolars

basecase_name

The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
LoadflowResultTablePolars

The updated dataframes with the basecase name set or removed.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers_polars.py
@pa.check_types
def update_basename_polars(
    result_df: LoadflowResultTablePolars,
    basecase_name: Optional[str] = None,
) -> LoadflowResultTablePolars:
    """Rename or remove the basecase rows of a result table.

    Powsybl includes the basecase as an empty string in the "contingency" column
    by default. If a basecase name is given, those rows get that name as their
    contingency. If the basecase is not part of the run (``basecase_name`` is
    None), those rows are removed from the results.

    Parameters
    ----------
    result_df: LoadflowResultTablePolars
        The dataframe containing the branch / node / VADiff results.
        It needs a "contingency" column.
    basecase_name: Optional[str], optional
        The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

    Returns
    -------
    LoadflowResultTablePolars
        The updated dataframe with the basecase name set or removed.
    """
    if basecase_name is None:
        # Remove the basecase from the results if it is not included in the run
        # A plain lazy filter can trigger a Polars optimizer bug here and resurrect
        # projected source columns like `p1` from the pre-conversion branch plan.
        # Adding a temporary row index changes the plan shape without changing the
        # result rows, which keeps the filter stable for disconnected-branch cases.
        # Issue: https://github.com/pola-rs/polars/issues/27509
        return result_df.with_row_index("_row_idx").filter(pl.col("contingency") != "").drop("_row_idx")

    # Replace the empty string with the basecase name, leave every other contingency untouched
    is_basecase_row = pl.col("contingency") == ""
    renamed_contingency = (
        pl.when(is_basecase_row).then(pl.lit(basecase_name)).otherwise(pl.col("contingency")).alias("contingency")
    )
    return result_df.with_columns(renamed_contingency)

add_name_column_polars #

add_name_column_polars(
    result_df, name_map, index_level="element"
)

Translate the element ids in the results dataframes to the original names.

This function translates the element names in the results dataframes to the original names from the Powsybl network. This is useful for debugging and for displaying the results.

PARAMETER DESCRIPTION
result_df

The dataframe containing the node / branch / VADiff results

TYPE: LoadflowResultTablePolars

name_map

A mapping from the element ids to the original names. This is used to translate the element names in the results.

TYPE: dict[str, str]

index_level

The index level storing the ids that should be mapped to the names. By default "element", for the monitored elements.

TYPE: str DEFAULT: 'element'

RETURNS DESCRIPTION
LoadflowResultTablePolars

The updated dataframe with the ids translated to the original names.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers_polars.py
@pa.check_types
def add_name_column_polars(
    result_df: LoadflowResultTablePolars,
    name_map: dict[str, str],
    index_level: str = "element",
) -> LoadflowResultTablePolars:
    """Fill the ``<index_level>_name`` column with the original names of the ids.

    The ids in the ``index_level`` column are looked up in ``name_map``. Ids that
    are not part of the mapping keep the value already present in the name column.
    This is useful for debugging and for displaying the results.

    Parameters
    ----------
    result_df: LoadflowResultTablePolars
        The dataframe containing the node / branch / VADiff results
    name_map: dict[str, str]
        A mapping from the ids to the original names.
    index_level: str, optional
        The column storing the ids that should be mapped to the names.
        By default "element", for the monitored elements.

    Returns
    -------
    LoadflowResultTablePolars
        The dataframe with the ``<index_level>_name`` column filled; missing names are empty strings.
    """
    name_column = f"{index_level}_name"
    # Fallback for ids without an entry in the mapping: the current name, or "" if there is none
    existing_name = pl.col(name_column).fill_null("")
    mapped_name = pl.col(index_level).replace(name_map, default=existing_name).alias(name_column)

    named_df = result_df.with_columns(mapped_name)
    # fill nulls with empty string
    return named_df.with_columns(pl.col(name_column).fill_null(""))

get_failed_node_results_polars #

get_failed_node_results_polars(
    timestep, failed_outages, monitored_nodes
)

Get the failed node results for the given outages and timestep.

A wrapper around get_failed_node_results to convert the pandas dataframe to a polars dataframe.

PARAMETER DESCRIPTION
timestep

The timestep to get the node results for

TYPE: int

failed_outages

The list of failed outages to get nan-node results for

TYPE: list[str]

monitored_nodes

The list of monitored nodes to get the node results for

TYPE: list[str]

RETURNS DESCRIPTION
LazyFrame[NodeResultSchemaPolars]

The polars dataframe containing the failed node results for the given outages and timestep

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers_polars.py
@pa.check_types
def get_failed_node_results_polars(
    timestep: int, failed_outages: list[str], monitored_nodes: list[str]
) -> patpl.LazyFrame[NodeResultSchemaPolars]:
    """Return the node results of the failed outages as a polars LazyFrame.

    Thin wrapper around get_failed_node_results that converts its pandas
    dataframe (including the index) into a lazy polars frame.

    Parameters
    ----------
    timestep : int
        The timestep to get the node results for
    failed_outages : list[str]
        The list of failed outages to get nan-node results for
    monitored_nodes : list[str]
        The list of monitored nodes to get the node results for

    Returns
    -------
    patpl.LazyFrame[NodeResultSchemaPolars]
        The lazy polars frame containing the failed node results for the given outages and timestep
    """
    pandas_results = get_failed_node_results(timestep, failed_outages, monitored_nodes)
    # Keep NaN as NaN (no conversion to null) and turn the pandas index into regular columns
    lazy_results = pl.from_pandas(pandas_results, include_index=True, nan_to_null=False).lazy()
    return lazy_results.cast({"timestep": pl.Int64})

get_failed_branch_results_polars #

get_failed_branch_results_polars(
    timestep,
    failed_outages,
    monitored_branches,
    monitored_trafo3w,
)

Get the failed branch results for the given outages and timestep.

A wrapper around get_failed_branch_results to convert the pandas dataframe to a polars dataframe.

PARAMETER DESCRIPTION
timestep

The timestep to get the branch results for

TYPE: int

failed_outages

The list of failed outages to get nan-branch results for

TYPE: list[str]

monitored_branches

The list of monitored branches with 2 sides to get the branch results for

TYPE: list[str]

monitored_trafo3w

The list of monitored three winding transformers to get the branch results for

TYPE: list[str]

RETURNS DESCRIPTION
LazyFrame[BranchResultSchemaPolars]

The polars dataframe containing the failed branch results for the given outages and timestep

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers_polars.py
@pa.check_types
def get_failed_branch_results_polars(
    timestep: int, failed_outages: list[str], monitored_branches: list[str], monitored_trafo3w: list[str]
) -> patpl.LazyFrame[BranchResultSchemaPolars]:
    """Return the branch results of the failed outages as a polars LazyFrame.

    Thin wrapper around get_failed_branch_results that converts its pandas
    dataframe (including the index) into a lazy polars frame.

    Parameters
    ----------
    timestep : int
        The timestep to get the branch results for
    failed_outages : list[str]
        The list of failed outages to get nan-branch results for
    monitored_branches : list[str]
        The list of monitored branches with 2 sides to get the branch results for
    monitored_trafo3w : list[str]
        The list of monitored three winding transformers to get the branch results for

    Returns
    -------
    patpl.LazyFrame[BranchResultSchemaPolars]
        The lazy polars frame containing the failed branch results for the given outages and timestep
    """
    pandas_results = get_failed_branch_results(timestep, failed_outages, monitored_branches, monitored_trafo3w)
    # Keep NaN as NaN (no conversion to null) and turn the pandas index into regular columns
    lazy_results = pl.from_pandas(pandas_results, include_index=True, nan_to_null=False).lazy()
    return lazy_results.cast({"timestep": pl.Int64, "side": pl.Int64})

toop_engine_contingency_analysis.pypowsybl.powsybl_helpers #

Helper functions to translate the N-1 definition into a usable format for Powsybl.

This includes translating contingencies, monitored elements and collecting the necessary data from the network, so this only has to happen once.

POWSYBL_CONVERGENCE_MAP module-attribute #

POWSYBL_CONVERGENCE_MAP = {
    value: value,
    value: value,
    value: value,
    value: value,
}

PowsyblBranchLimitCacheProtocol #

Bases: Protocol

Protocol for cache objects that provide prepared branch limits and reuse validation.

branch_limits instance-attribute #

branch_limits

matches #

matches(
    monitored_branches,
    chosen_limit,
    current_limit_fingerprint,
)

Return whether the cache entry can be reused for the current request.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def matches(self, monitored_branches: Sequence[str], chosen_limit: str, current_limit_fingerprint: str) -> bool:
    """Return whether the cache entry can be reused for the current request.

    This is a protocol method without an implementation; the exact comparison
    rules are up to the implementing cache class.

    Parameters
    ----------
    monitored_branches : Sequence[str]
        The monitored branches of the current request.
    chosen_limit : str
        The limit chosen for the current request.
    current_limit_fingerprint : str
        Fingerprint of the current limits (presumably used to detect changed
        limits in the network; confirm against the implementations).

    Returns
    -------
    bool
        True if the cache entry can be reused for the current request, False otherwise.
    """

PowsyblContingency #

Bases: BaseModel

A Powsybl contingency.

This is a simplified version of the PandapowerContingency that is used in Powsybl. It contains only the necessary information to run an N-1 analysis in Powsybl.

id instance-attribute #

id

The unique id of the contingency.

name class-attribute instance-attribute #

name = ''

The name of the contingency.

elements instance-attribute #

elements

The list of outaged element ids.

is_basecase #

is_basecase()

Check if the contingency is a basecase.

A basecase contingency has no outaged elements.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def is_basecase(self) -> bool:
    """Tell whether this contingency represents the basecase.

    Returns
    -------
    bool
        True if the contingency has no outaged elements, False otherwise.
    """
    return not self.elements

PowsyblMonitoredElements #

Bases: TypedDict

A dictionary to hold the monitored element ids for the N-1 analysis.

This is used to store the monitored elements in a format that can be used in Powsybl.

branches instance-attribute #

branches

trafo3w instance-attribute #

trafo3w

switches instance-attribute #

switches

voltage_levels instance-attribute #

voltage_levels

buses instance-attribute #

buses

PowsyblNMinus1Definition #

Bases: BaseModel

A Powsybl N-1 definition.

This is a simplified version of the NMinus1Definition that is used in Powsybl. It contains only the necessary information to run an N-1 analysis in Powsybl.

model_config class-attribute instance-attribute #

model_config = {'arbitrary_types_allowed': True}

contingencies instance-attribute #

contingencies

The outages to be considered. Maps contingency id to outaged element ids.

monitored_elements instance-attribute #

monitored_elements

The elements to be monitored during the N-1 analysis (branches, three winding transformers, switches, voltage levels and buses).

missing_elements class-attribute instance-attribute #

missing_elements = []

A list of monitored elements that are not present in the network.

missing_contingencies class-attribute instance-attribute #

missing_contingencies = []

A list of contingencies whose elements are (partially) not present in the network.

branch_limits instance-attribute #

branch_limits

The branch limits to be used during the N-1 analysis. If None, the default limits will be used.

blank_va_diff instance-attribute #

blank_va_diff

The buses to be used during the N-1 analysis. This is used to determine the voltage levels of the monitored buses. Could be a busbar section or a bus_breaker_bus depending on the model type.

bus_map instance-attribute #

bus_map

A mapping from busbar sections and bus breaker buses to bus breaker buses, electrical buses and voltage_levels. This helps to always get the correct buses, even if the model type changes.

element_name_mapping instance-attribute #

element_name_mapping

A mapping from element ids to their names. This is used to convert the element ids to their names in the results.

contingency_name_mapping instance-attribute #

contingency_name_mapping

A mapping from contingency ids to their names. This is used to convert the contingency ids to their names in the results.

voltage_levels instance-attribute #

voltage_levels

The voltage levels of the buses. This is used to determine voltage limits.

distributed_slack class-attribute instance-attribute #

distributed_slack = True

Whether to distribute the slack across the generators in the grid. Only relevant for powsybl grids.

contingency_propagation class-attribute instance-attribute #

contingency_propagation = False

Whether to enable powsybl's contingency propagation in the N-1 analysis.

https://powsybl.readthedocs.io/projects/powsybl-open-loadflow/en/latest/security/parameters.html Security Analysis will determine by topological search the switches with type circuit breakers (i.e. capable of opening fault currents) that must be opened to isolate the fault. Depending on the network structure, this could lead to more equipments to be simulated as tripped, because disconnectors and load break switches (i.e., not capable of opening fault currents) are not considered.

__getitem__ #

__getitem__(key)

Get a subset of the nminus1definition based on the contingencies.

If a string is given, the contingency id must be in the contingencies list. If an integer or slice is given, the case id will be indexed by the integer or slice.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def __getitem__(self, key: str | int | slice) -> "PowsyblNMinus1Definition":
    """Get a subset of the nminus1definition based on the contingencies.

    All other fields are copied unchanged; only ``contingencies`` is reduced.

    Parameters
    ----------
    key : str | int | slice
        If a string is given, it must be the id of one of the contingencies.
        If an integer is given, the contingency at that position is selected;
        negative integers count from the end, like for a list.
        If a slice is given, it is applied to the contingencies list directly.

    Returns
    -------
    PowsyblNMinus1Definition
        A validated copy of this definition that only contains the selected contingencies.
        An integer that is out of range yields a definition without contingencies.

    Raises
    ------
    KeyError
        If a string key is not the id of any contingency.
    TypeError
        If the key is neither a string, an integer nor a slice.
    """
    if isinstance(key, str):
        contingency_ids = [contingency.id for contingency in self.contingencies]
        if key not in contingency_ids:
            raise KeyError(f"Contingency id {key} not in contingencies.")
        position = contingency_ids.index(key)
        index = slice(position, position + 1)
    elif isinstance(key, int):
        # slice(-1, 0) is always empty, so the last element needs an open-ended stop
        stop = None if key == -1 else key + 1
        index = slice(key, stop)
    elif isinstance(key, slice):
        index = key
    else:
        raise TypeError("Key must be a string, int or slice.")

    updated_definition = self.model_copy(
        update={
            "contingencies": self.contingencies[index],
        }
    )
    # pylint: disable=unsubscriptable-object
    return PowsyblNMinus1Definition.model_validate(updated_definition)

translate_contingency_to_powsybl #

translate_contingency_to_powsybl(
    contingencies, identifiables
)

Translate the contingencies to a format that can be used in Powsybl.

PARAMETER DESCRIPTION
contingencies

The list of contingencies to translate.

TYPE: list[Contingency]

identifiables

The index containing the ids of the identifiables of the network. This is used to check if the elements are present in the network.

TYPE: Index

RETURNS DESCRIPTION
pow_contingency

A list of PowsyblContingency objects, each containing the id, name and elements.

TYPE: list[PowsyblContingency]

missing_contingency

A list of all contingencies that are not fully present in the network.

TYPE: list[Contingency]

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def translate_contingency_to_powsybl(
    contingencies: list[Contingency], identifiables: pd.Index
) -> tuple[list[PowsyblContingency], list[Contingency]]:
    """Convert contingencies into their Powsybl representation.

    A contingency is only converted if every one of its elements is found in the identifiables.
    As soon as one element is not found, the whole contingency is reported as missing instead.

    Parameters
    ----------
    contingencies : list[Contingency]
        The contingencies to convert.
    identifiables : pd.Index
        The ids of the identifiables of the network.
        Used for the membership check of the outaged element ids.

    Returns
    -------
    pow_contingency: list[PowsyblContingency]
        One PowsyblContingency (id, name and element ids) per contingency that is fully present.
        A missing name is replaced by an empty string.
    missing_contingency: list[Contingency]
        The contingencies with at least one element id that is not in the identifiables.
    """
    translated = []
    not_in_network = []
    for contingency in contingencies:
        element_ids = [element.id for element in contingency.elements]
        if any(element_id not in identifiables for element_id in element_ids):
            not_in_network.append(contingency)
            continue
        translated.append(
            PowsyblContingency(
                id=contingency.id,
                name=contingency.name or "",
                elements=element_ids,
            )
        )
    return translated, not_in_network

translate_monitored_elements_to_powsybl #

translate_monitored_elements_to_powsybl(
    nminus1_definition, branches, buses, switches
)

Translate the monitored elements to a format that can be used in Powsybl.

Also adds buses that are not monitored per se, but are needed for the voltage angle difference calculation.

PARAMETER DESCRIPTION
nminus1_definition

The original Nminus1Definition containing the monitored elements and outages.

TYPE: Nminus1Definition

branches

The dataframe containing the branches of the network and their voltage_id including 3w-trafos.

TYPE: DataFrame

buses

The dataframe containing the buses of the network and their voltage_id. These include busbar sections and bus_breaker buses.

TYPE: DataFrame

switches

The dataframe containing the switches of the network and their voltage_id.

TYPE: DataFrame

RETURNS DESCRIPTION
monitored_elements

A dictionary containing the monitored elements in a format that can be used in Powsybl.

TYPE: PowsyblMonitoredElements

element_name_mapping

A mapping from element ids to their names. This is used to convert the element ids to their names in the results.

TYPE: dict[str, str]

missing_elements

A list of monitored elements that are not present in the network.

TYPE: list[GridElement]

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def translate_monitored_elements_to_powsybl(
    nminus1_definition: Nminus1Definition, branches: pd.DataFrame, buses: pd.DataFrame, switches: pd.DataFrame
) -> tuple[PowsyblMonitoredElements, dict[str, str], list[GridElement]]:
    """Convert the monitored elements of an N-1 definition into the Powsybl representation.

    Besides the monitored elements themselves, the voltage levels of the monitored buses, switches and
    two-sided branches as well as of the outaged two-sided branches are collected, because they are needed
    for the voltage angle difference calculation.

    Parameters
    ----------
    nminus1_definition: Nminus1Definition
        The original Nminus1Definition containing the monitored elements and outages.
    branches : pd.DataFrame
        The branches of the network (including 3w-trafos), indexed by id.
        The columns type, voltage_level1_id and voltage_level2_id are read.
    buses : pd.DataFrame
        The buses of the network, indexed by id. The column voltage_level_id is read.
        These include busbar sections and bus_breaker buses.
    switches : pd.DataFrame
        The switches of the network, indexed by id. The column voltage_level_id is read.

    Returns
    -------
    monitored_elements: PowsyblMonitoredElements
        The monitored branches, 3w-trafos, switches, buses and voltage levels that exist in the network.
    element_name_mapping: dict[str, str]
        A mapping from the id of every monitored element to its name (empty string if it has none).
    missing_elements: list[GridElement]
        The monitored elements whose id is not present in the corresponding dataframe.
    """
    two_sided_types = ["LINE", "TWO_WINDINGS_TRANSFORMER", "TIE_LINE"]
    monitored_elements = nminus1_definition.monitored_elements

    def split_by_presence(element_ids: list[str], known_ids: pd.Index) -> tuple[list[str], set[str]]:
        """Return the ids found in known_ids (order and duplicates kept) and the set of ids that are not."""
        absent = set(element_ids) - set(known_ids)
        present = [element_id for element_id in element_ids if element_id not in absent]
        return present, absent

    # Branches: split the ones present in the network into two-sided branches and 3w-trafos.
    requested_branches = [element.id for element in monitored_elements if element.kind == "branch"]
    present_branches, missing_branches = split_by_presence(requested_branches, branches.index)
    monitored_branch_df = branches.loc[present_branches]
    is_two_sided = monitored_branch_df["type"].isin(two_sided_types)
    monitored_branches = monitored_branch_df.loc[is_two_sided].index.tolist()
    is_trafo3w = monitored_branch_df["type"] == "THREE_WINDINGS_TRANSFORMER"
    monitored_trafo3w = monitored_branch_df.loc[is_trafo3w].index.tolist()

    # Buses and switches only need the presence check.
    requested_buses = [element.id for element in monitored_elements if element.kind == "bus"]
    monitored_buses, missing_buses = split_by_presence(requested_buses, buses.index)

    requested_switches = [element.id for element in monitored_elements if element.kind == "switch"]
    monitored_switches, missing_switches = split_by_presence(requested_switches, switches.index)

    # The voltagelevels of outaged branches are relevant for the voltage angle difference calculation.
    requested_outages = [
        elem.id for contingency in nminus1_definition.contingencies for elem in contingency.elements if elem.kind == "branch"
    ]
    present_outages, _ = split_by_presence(requested_outages, branches.index)
    outaged_branch_df = branches.loc[present_outages]
    outaged_branch_ids = outaged_branch_df.loc[outaged_branch_df["type"].isin(two_sided_types)].index.tolist()

    angle_relevant_branches = monitored_branches + outaged_branch_ids
    voltage_level_ids = (
        buses.loc[monitored_buses, "voltage_level_id"].unique().tolist()
        + switches.loc[monitored_switches, "voltage_level_id"].unique().tolist()
        + branches.loc[angle_relevant_branches, "voltage_level1_id"].unique().tolist()
        + branches.loc[angle_relevant_branches, "voltage_level2_id"].unique().tolist()
    )
    monitored_voltage_levels = set(voltage_level_ids)

    powsybl_monitored_elements = PowsyblMonitoredElements(
        branches=monitored_branches,
        trafo3w=monitored_trafo3w,
        switches=monitored_switches,
        buses=monitored_buses,
        voltage_levels=list(monitored_voltage_levels),
    )

    element_name_mapping = {element.id: element.name or "" for element in monitored_elements}
    missing_element_ids = missing_branches | missing_buses | missing_switches
    missing_elements = [element for element in monitored_elements if element.id in missing_element_ids]
    return powsybl_monitored_elements, element_name_mapping, missing_elements

prepare_branch_limits #

prepare_branch_limits(
    branch_limits, chosen_limit, monitored_branches
)

Prepare the branch limits for the N-1 analysis.

This is done here, to avoid having to do this in every process.

PARAMETER DESCRIPTION
branch_limits

The dataframe containing the branch limits of the network.

TYPE: DataFrame

chosen_limit

The name of the limit to be used for the N-1 analysis. This is usually "permanent_limit".

TODO: Decide if and how this could be extended to other limits.

TYPE: str

monitored_branches

The list of branches to be monitored during the N-1 analysis.

TYPE: list[str]

RETURNS DESCRIPTION
branch_limits

The dataframe containing the branch limits for the N-1 analysis in the right format.

TYPE: DataFrame

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def prepare_branch_limits(branch_limits: pd.DataFrame, chosen_limit: str, monitored_branches: list[str]) -> pd.DataFrame:
    """Prepare the branch limits for the N-1 analysis.

    This is done here, to avoid having to do this in every process.

    Parameters
    ----------
    branch_limits : pd.DataFrame
        The dataframe containing the branch limits of the network.
        After resetting the index it must provide the columns "element_id", "name", "side" and "value".
    chosen_limit : str
        The name of the limit to be used for the N-1 analysis. This is usually "permanent_limit".
        #TODO Decide if and how this could be extended to other limits.
    monitored_branches : list[str]
        The list of branches to be monitored during the N-1 analysis.

    Returns
    -------
    branch_limits : pd.DataFrame
        The smallest "value" of the chosen limit per monitored branch and side.
        Indexed by ("element_id", "side"), where the side is translated to 1, 2 or 3.
        Rows with a side other than "ONE", "TWO" or "THREE" are dropped by the grouping.
    """
    side_to_number = {"ONE": 1, "TWO": 2, "THREE": 3}

    limits = branch_limits.reset_index()
    is_relevant = (limits["name"] == chosen_limit) & limits["element_id"].isin(monitored_branches)
    # Work on an explicit copy of only the needed columns: this avoids assigning into a filtered
    # selection and keeps unrelated (possibly non-orderable) columns out of the aggregation.
    limits = limits.loc[is_relevant, ["element_id", "side", "value"]].copy()
    limits["side"] = limits["side"].map(side_to_number)
    return limits.groupby(by=["element_id", "side"])[["value"]].min()

get_blank_va_diff #

get_blank_va_diff(
    all_outages, single_branch_outages, monitored_switches
)

Get a blank dataframe for the voltage angle difference results.

This already includes all possible contingencies and monitored switches. The buses of the switches and the outaged branches are added later.

PARAMETER DESCRIPTION
all_outages

The list of all outages to be considered. For all of these cases, all switches need to be checked

TYPE: list[str]

single_branch_outages

A dictionary mapping contingency ids to single outaged element ids. For all of these cases, the specific outaged branch needs to be checked.

TYPE: dict[str, str]

monitored_switches

The list of monitored switches to be considered. These are only the switches that are open and retained.

TYPE: list[str]

RETURNS DESCRIPTION
DataFrame

A blank dataframe with the correct index for the voltage angle difference results. The index is a MultiIndex with the following levels: - contingency: The contingency id (including an empty string for the base case) - element: The element id (the switch or outaged branch). The timestep is not part of this index.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def get_blank_va_diff(
    all_outages: list[str], single_branch_outages: dict[str, str], monitored_switches: list[str]
) -> pd.DataFrame:
    """Build the empty frame that carries the index of the voltage angle difference results.

    The frame has no columns. The buses of the switches and of the outaged branches are added later.

    Parameters
    ----------
    all_outages : list[str]
        The ids of all outages to be considered. Every monitored switch is combined with each of them
        and with the base case.
    single_branch_outages : dict[str, str]
        A dictionary mapping contingency ids to single outaged element ids.
        Each pair contributes exactly one row for that contingency and branch.
    monitored_switches : list[str]
        The list of monitored switches to be considered. These are only the switches that are open and retained.

    Returns
    -------
    pd.DataFrame
        A column-less dataframe whose index is a MultiIndex with the following levels:
        - contingency: The contingency id (including an empty string for the base case)
        - element: The element id (the switch or outaged branch)
        The switch rows come first, followed by the rows of the single branch outages.
    """
    index_names = ["contingency", "element"]
    basecase_in_result = ""  # The base case is identified by an empty contingency id

    cases = [basecase_in_result, *all_outages]
    switch_index = pd.MultiIndex.from_product([cases, monitored_switches], names=index_names)
    switch_rows = pd.DataFrame(index=switch_index)

    outage_rows = pd.DataFrame(
        {
            "contingency": list(single_branch_outages.keys()),
            "element": list(single_branch_outages.values()),
        }
    ).set_index(index_names)

    return pd.concat([switch_rows, outage_rows], axis=0)

get_blank_va_diff_with_buses #

get_blank_va_diff_with_buses(
    branches,
    switches,
    pow_contingencies,
    monitored_switches,
)

Get a blank dataframe for the voltage angle difference results with the buspairs that need checking.

The buspairs are bus_breaker_buses (net.get_bus_breaker_view_buses)

PARAMETER DESCRIPTION
branches

The dataframe containing the branches of the network and their bus_breaker_buses.

TYPE: DataFrame

switches

The dataframe containing the switches of the network and their bus_breaker_buses.

TYPE: DataFrame

pow_contingencies

The list of all contingencies to be considered. For all of these cases, all switches need to be checked. For single outages we also consider the outaged branches.

TYPE: list[PowsyblContingency]

monitored_switches

The list of monitored switches to be considered. These are only the switches that are open and retained.

TYPE: list[str]

RETURNS DESCRIPTION
DataFrame

A blank dataframe with the correct index for the voltage angle difference results. The index is a MultiIndex with the following levels: - contingency: The contingency id (including an empty string for the base case) - element: The element id (the switch or outaged branch) - bus_breaker_bus1_id: The first bus_breaker_bus_id of the element - bus_breaker_bus2_id: The second bus_breaker_bus_id of the element

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def get_blank_va_diff_with_buses(
    branches: pd.DataFrame,
    switches: pd.DataFrame,
    pow_contingencies: list[PowsyblContingency],
    monitored_switches: list[str],
) -> pd.DataFrame:
    """Get a blank dataframe for the voltage angle difference results with the buspairs that need checking.

    The buspairs are bus_breaker_buses (net.get_bus_breaker_view_buses)

    Parameters
    ----------
    branches : pd.DataFrame
        The dataframe containing the branches of the network and their bus_breaker_buses.
        The columns type, bus_breaker_bus1_id and bus_breaker_bus2_id are read.
    switches : pd.DataFrame
        The dataframe containing the switches of the network and their bus_breaker_buses.
        The columns open, retained, bus_breaker_bus1_id and bus_breaker_bus2_id are read.
    pow_contingencies: list[PowsyblContingency]
        The list of all contingencies to be considered. For all of these cases, all switches need to be checked.
        For single outages of a line, tie line or 2w-trafo we also consider the outaged branch.
    monitored_switches : list[str]
        The list of monitored switches to be considered. Only the ones that are open and retained are kept.

    Returns
    -------
    pd.DataFrame
        A dataframe for the voltage angle difference results, indexed by a MultiIndex with the levels:
        - contingency: The contingency id (including an empty string for the base case)
        - element: The element id (the switch or outaged branch)
        and with the columns:
        - bus_breaker_bus1_id: The first bus_breaker_bus_id of the element
        - bus_breaker_bus2_id: The second bus_breaker_bus_id of the element

    """
    bus_columns = ["bus_breaker_bus1_id", "bus_breaker_bus2_id"]
    branch_indizes = branches.query("type in ['LINE', 'TWO_WINDINGS_TRANSFORMER', 'TIE_LINE']").index
    single_branch_outages = {
        contingency.id: contingency.elements[0]
        for contingency in pow_contingencies
        if len(contingency.elements) == 1 and contingency.elements[0] in branch_indizes
    }
    outaged_branches = branches.loc[list(single_branch_outages.values()), bus_columns]
    open_retained_switches = switches.loc[monitored_switches]
    open_retained_switches = open_retained_switches.loc[
        open_retained_switches["open"] & open_retained_switches["retained"], bus_columns
    ]
    element_df = pd.concat([outaged_branches, open_retained_switches], axis=0)
    # The same branch can be the single outage of several contingencies. Without de-duplicating the
    # lookup table, the left merge below would multiply the rows of every such branch.
    element_df = element_df.loc[~element_df.index.duplicated(keep="first")]

    all_outage_ids = [contingency.id for contingency in pow_contingencies if len(contingency.elements) > 0]
    blank_va_diff = get_blank_va_diff(all_outage_ids, single_branch_outages, open_retained_switches.index.tolist())

    # Merging on an array key adds a helper column "key_0", which is removed again.
    va_diff_with_buses = blank_va_diff.merge(
        element_df, left_on=blank_va_diff.index.get_level_values("element"), right_index=True, how="left"
    ).drop(columns="key_0")
    return va_diff_with_buses

get_va_diff_results #

get_va_diff_results(
    bus_results,
    outages,
    va_diff_with_buses,
    bus_map,
    timestep,
)

Get the voltage angle difference results for the given outages and bus results.

PARAMETER DESCRIPTION
bus_results

The dataframe containing the bus results of powsybl contingency analysis.

TYPE: DataFrame

outages

The list of outages to be considered. These are the contingency ids that are outaged.

TYPE: list[str]

va_diff_with_buses

The dataframe containing the voltage angle difference results with the bus pairs that need checking.

TYPE: DataFrame

bus_map

A mapping from busbar sections to bus breaker buses. This is used to convert the busbar sections to bus breaker buses in the Node Breaker model.

TYPE: DataFrame

timestep

The timestep of the results.

TYPE: int

RETURNS DESCRIPTION
DataFrame

The dataframe containing the voltage angle difference results for the given outages.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
@pa.check_types
def get_va_diff_results(
    bus_results: pd.DataFrame, outages: list[str], va_diff_with_buses: pd.DataFrame, bus_map: pd.DataFrame, timestep: int
) -> pat.DataFrame[VADiffResultSchema]:
    """Get the voltage angle difference results for the given outages and bus results.

    The voltage angles of both buses of every element are looked up in the bus results of the same
    contingency and subtracted (bus 1 minus bus 2).

    Parameters
    ----------
    bus_results : pd.DataFrame
        The dataframe containing the bus results of powsybl contingency analysis.
        The index levels "contingency_id" and "bus_id" and the column "v_angle" are read.
    outages : list[str]
        The list of outages to be considered. These are the contingency ids that are outaged.
        The base case (empty contingency id) is always considered in addition.
    va_diff_with_buses : pd.DataFrame
        The dataframe containing the voltage angle difference results with the bus pairs that need checking.
        It needs the index levels "contingency" and "element" and the columns
        "bus_breaker_bus1_id" and "bus_breaker_bus2_id".
    bus_map: pd.DataFrame
        A mapping from busbar sections to bus breaker buses. This is used to convert the busbar sections to bus breaker buses
        in the Node Breaker model. Only the column "bus_breaker_bus_id" is read.
    timestep : int
        The timestep of the results.

    Returns
    -------
    pat.DataFrame[VADiffResultSchema]
        The dataframe containing the voltage angle difference results for the given outages,
        indexed by timestep, contingency and element. The columns "element_name" and "contingency_name"
        are filled with empty strings. If there are no outages or no bus pairs to check, an empty
        dataframe built from VADiffResultSchema is returned.
    """
    # Nothing to compute: return an empty frame that still matches the schema.
    if len(outages) == 0 or len(va_diff_with_buses) == 0:
        return get_empty_dataframe_from_model(VADiffResultSchema)
    basecase_in_result = ""
    # Keep only the rows of the base case and of the requested outages.
    iteration_va_diff = va_diff_with_buses.loc[
        va_diff_with_buses.index.get_level_values("contingency").isin([basecase_in_result, *outages])
    ]
    # NOTE(review): this assigns into a filtered selection, which may trigger pandas'
    # SettingWithCopyWarning - consider an explicit .copy() above.
    iteration_va_diff["timestep"] = timestep
    # Map busbar sections where there are any. For the rest use the bus_breaker_bus_id from the results (here the bus id)
    bus_results = bus_results.merge(
        bus_map.bus_breaker_bus_id, left_on=bus_results.index.get_level_values("bus_id"), right_index=True, how="left"
    )

    # Turn the index levels (contingency, element) into columns so they can be used as merge keys.
    iteration_va_diff = iteration_va_diff.reset_index()
    # Map the values from the results to the buses of the switches and the outaged branches
    iteration_va_diff = iteration_va_diff.merge(
        bus_results[["v_angle"]].add_suffix("_1"),
        left_on=["contingency", "bus_breaker_bus1_id"],
        right_on=[bus_results.index.get_level_values("contingency_id"), bus_results.bus_breaker_bus_id],
        how="left",
    )
    iteration_va_diff = iteration_va_diff.merge(
        bus_results[["v_angle"]].add_suffix("_2"),
        left_on=["contingency", "bus_breaker_bus2_id"],
        right_on=[bus_results.index.get_level_values("contingency_id"), bus_results.bus_breaker_bus_id],
        how="left",
    )
    # The left merges can produce identical rows; keep each of them only once.
    iteration_va_diff.drop_duplicates(inplace=True)
    iteration_va_diff.set_index(["timestep", "contingency", "element"], inplace=True)
    iteration_va_diff["va_diff"] = iteration_va_diff["v_angle_1"] - iteration_va_diff["v_angle_2"]

    # The helper columns are only needed to compute va_diff.
    iteration_va_diff = iteration_va_diff.drop(
        columns=["bus_breaker_bus1_id", "bus_breaker_bus2_id", "v_angle_1", "v_angle_2"]
    )

    # Add the name columns as empty strings (not NaN)
    iteration_va_diff["element_name"] = ""
    iteration_va_diff["contingency_name"] = ""

    return iteration_va_diff

get_busbar_mapping #

get_busbar_mapping(net)

Get a map between the different kinds of buses in the network.

Maps busbar sections and bus breaker buses to bus breaker buses and electrical buses. Maps the electrical buses to monitored buses (either bus breaker or busbar sections).

PARAMETER DESCRIPTION
net

The Powsybl network to use for the translation. This is used to get the busbar sections and bus breaker buses.

TYPE: Network

RETURNS DESCRIPTION
DataFrame

A dataframe containing the busbar mapping from busbar sections to bus breaker buses.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def get_busbar_mapping(net: Network) -> pd.DataFrame:
    """Build a lookup table from busbar sections and bus breaker buses to their related buses.

    The table stacks two groups of rows:
    - one row per busbar section (injections of type BUSBAR_SECTION), indexed by the busbar section id
    - one row per bus of the bus breaker view, indexed by its own id and mapped onto itself

    Parameters
    ----------
    net : Network
        The Powsybl network to use for the translation. This is used to get the busbar sections and bus breaker buses.

    Returns
    -------
    pd.DataFrame
        A dataframe with the columns bus_breaker_bus_id, bus_id and voltage_level_id,
        with the busbar section rows first, followed by the bus breaker bus rows.
    """
    mapping_cols = ["bus_breaker_bus_id", "bus_id", "voltage_level_id"]

    injections = net.get_injections(attributes=["type", "bus_breaker_bus_id", "bus_id", "voltage_level_id"])
    busbar_rows = injections.query("type == 'BUSBAR_SECTION'")[mapping_cols]

    breaker_view = net.get_bus_breaker_view_buses(attributes=["voltage_level_id", "bus_id"])
    # A bus breaker bus maps onto itself.
    breaker_view["bus_breaker_bus_id"] = breaker_view.index
    breaker_rows = breaker_view[mapping_cols]

    return pd.concat([busbar_rows, breaker_rows], axis=0)

translate_branch_limits_for_powsybl #

translate_branch_limits_for_powsybl(
    branch_limits,
    monitored_branches,
    chosen_limit="permanent_limit",
)

Translate current operational limits into prepared monitored branch limits for Powsybl.

PARAMETER DESCRIPTION
branch_limits

The raw operational limits dataframe, already filtered to the relevant limit type.

TYPE: DataFrame

monitored_branches

The monitored branch ids whose limits are needed for the analysis.

TYPE: list[str]

chosen_limit

The limit name to select, by default "permanent_limit".

TYPE: str DEFAULT: 'permanent_limit'

RETURNS DESCRIPTION
DataFrame

The prepared branch limit dataframe indexed by element id and side.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def translate_branch_limits_for_powsybl(
    branch_limits: pd.DataFrame,
    monitored_branches: list[str],
    chosen_limit: str = "permanent_limit",
) -> pd.DataFrame:
    """Turn raw operational limits into the prepared limits of the monitored branches.

    This is a thin wrapper around prepare_branch_limits with the limit name defaulting to "permanent_limit".

    Parameters
    ----------
    branch_limits : pd.DataFrame
        The raw operational limits dataframe, already filtered to the relevant limit type.
    monitored_branches : list[str]
        The ids of the monitored branches whose limits are needed for the analysis.
    chosen_limit : str, optional
        The name of the limit to select, by default "permanent_limit".

    Returns
    -------
    pd.DataFrame
        The prepared branch limit dataframe as returned by prepare_branch_limits.
    """
    prepared_limits = prepare_branch_limits(branch_limits, chosen_limit, monitored_branches)
    return prepared_limits

normalize_monitored_branches_for_powsybl #

normalize_monitored_branches_for_powsybl(
    monitored_branches,
)

Normalize monitored branch ids for cache-key comparisons.

PARAMETER DESCRIPTION
monitored_branches

The monitored branch ids to normalize.

TYPE: Sequence[str]

RETURNS DESCRIPTION
tuple[str, ...]

The monitored branch ids sorted into a stable tuple.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def normalize_monitored_branches_for_powsybl(monitored_branches: Sequence[str]) -> tuple[str, ...]:
    """Bring monitored branch ids into a canonical form for cache-key comparisons.

    Parameters
    ----------
    monitored_branches : Sequence[str]
        The monitored branch ids in arbitrary order. The input is not modified.

    Returns
    -------
    tuple[str, ...]
        The same ids (duplicates included) in ascending order as an immutable tuple.
    """
    ordered_ids = list(monitored_branches)
    ordered_ids.sort()
    return tuple(ordered_ids)

fingerprint_current_limits_for_powsybl #

fingerprint_current_limits_for_powsybl(current_limits)

Create a stable fingerprint for a current-limit snapshot.

PARAMETER DESCRIPTION
current_limits

The raw current-limit dataframe from the active network.

TYPE: DataFrame

RETURNS DESCRIPTION
str

A stable hash representing the current-limit contents.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def fingerprint_current_limits_for_powsybl(current_limits: pd.DataFrame) -> str:
    """Compute a fingerprint of a current-limit snapshot that does not depend on the row order.

    The index is turned into regular columns, the rows are sorted by all columns (taken in
    alphabetical column order) and the per-row hashes are digested with SHA-256.

    Parameters
    ----------
    current_limits : pd.DataFrame
        The raw current-limit dataframe from the active network.

    Returns
    -------
    str
        The hexadecimal SHA-256 digest of the row hashes of the normalized dataframe.
    """
    flat_limits = current_limits.reset_index()
    sort_columns = sorted(flat_limits.columns)
    if sort_columns:
        # A stable sort keeps the relative order of rows that are equal in all columns.
        flat_limits = flat_limits.sort_values(by=sort_columns, kind="mergesort")
    flat_limits = flat_limits.reset_index(drop=True)

    row_hashes = pd.util.hash_pandas_object(flat_limits, index=False)
    raw_bytes = row_hashes.to_numpy().tobytes()
    return hashlib.sha256(raw_bytes).hexdigest()

get_current_branch_limits_for_powsybl #

get_current_branch_limits_for_powsybl(net)

Load the raw current operational limits from the active network.

PARAMETER DESCRIPTION
net

The active Powsybl network.

TYPE: Network

RETURNS DESCRIPTION
DataFrame

The current operational limits filtered to current-type limits.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def get_current_branch_limits_for_powsybl(net: Network) -> pd.DataFrame:
    """Fetch the operational limits of the active network and keep the current limits only.

    Parameters
    ----------
    net : Network
        The active Powsybl network.

    Returns
    -------
    pd.DataFrame
        The rows of net.get_operational_limits() whose type is "CURRENT".
    """
    operational_limits = net.get_operational_limits()
    return operational_limits.query("type=='CURRENT'")

resolve_branch_limits_for_powsybl #

resolve_branch_limits_for_powsybl(
    net,
    monitored_branches,
    branch_limit_cache=None,
    chosen_limit="permanent_limit",
)

Resolve prepared branch limits, reusing a cache entry when it is still valid.

PARAMETER DESCRIPTION
net

The active Powsybl network whose current limits are used for resolution.

TYPE: Network

monitored_branches

The monitored branch ids whose prepared limits are needed.

TYPE: list[str]

branch_limit_cache

Optional cache for the prepared branch limits. If stale, it is ignored.

TYPE: Optional[PowsyblBranchLimitCacheProtocol] DEFAULT: None

chosen_limit

The limit name to select, by default "permanent_limit".

TYPE: str DEFAULT: 'permanent_limit'

RETURNS DESCRIPTION
DataFrame

The prepared branch-limit dataframe to use for the analysis.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def resolve_branch_limits_for_powsybl(
    net: Network,
    monitored_branches: list[str],
    branch_limit_cache: Optional[PowsyblBranchLimitCacheProtocol] = None,
    chosen_limit: str = "permanent_limit",
) -> pd.DataFrame:
    """Resolve prepared branch limits, reusing a cache entry when it is still valid.

    The cache is only read, never updated. It is considered valid if it matches the monitored branches,
    the chosen limit and the fingerprint of the current limits of the network.

    Parameters
    ----------
    net : Network
        The active Powsybl network whose current limits are used for resolution.
    monitored_branches : list[str]
        The monitored branch ids whose prepared limits are needed.
    branch_limit_cache : Optional[PowsyblBranchLimitCacheProtocol], optional
        Optional cache for the prepared branch limits. If stale, it is ignored.
    chosen_limit : str, optional
        The limit name to select, by default "permanent_limit".

    Returns
    -------
    pd.DataFrame
        The prepared branch-limit dataframe to use for the analysis.
    """
    current_limits = get_current_branch_limits_for_powsybl(net)
    if branch_limit_cache is not None:
        # The fingerprint is only needed to validate a cache, so it is not computed without one.
        current_limit_fingerprint = fingerprint_current_limits_for_powsybl(current_limits)
        if branch_limit_cache.matches(
            monitored_branches=monitored_branches,
            chosen_limit=chosen_limit,
            current_limit_fingerprint=current_limit_fingerprint,
        ):
            return branch_limit_cache.branch_limits
    return translate_branch_limits_for_powsybl(
        branch_limits=current_limits,
        monitored_branches=monitored_branches,
        chosen_limit=chosen_limit,
    )

translate_nminus1_components_for_powsybl #

translate_nminus1_components_for_powsybl(
    n_minus_1_definition, net
)

Translate contingencies, monitored elements, and network-derived metadata for Powsybl.

PARAMETER DESCRIPTION
n_minus_1_definition

The N-1 definition to translate.

TYPE: Nminus1Definition

net

The Powsybl network to use for the translation. This is used to get the busbarsections, buses, branches and switches.

TYPE: Network

RETURNS DESCRIPTION
PowsyblNMinus1Definition

The translated N-1 definition with all non-limit components prepared.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def translate_nminus1_components_for_powsybl(
    n_minus_1_definition: Nminus1Definition, net: Network
) -> PowsyblNMinus1Definition:
    """Prepare every non-limit part of the N-1 definition for a Powsybl run.

    Contingencies and monitored elements are translated against the given network and returned
    together with the network-derived lookup tables (busbar mapping, voltage levels and the blank
    voltage-angle-difference table). The branch limits are left as an empty dataframe.

    Parameters
    ----------
    n_minus_1_definition : Nminus1Definition
        The N-1 definition to translate.
    net : Network
        The Powsybl network to translate against. Its voltage levels, branches, three winding
        transformers, switches and identifiables are read.

    Returns
    -------
    PowsyblNMinus1Definition
        The translated N-1 definition with all non-limit components prepared.
    """
    _validate_powsybl_id_type(n_minus_1_definition)

    bus_mapping = get_busbar_mapping(net)

    # Voltage levels without explicit limits fall back to 120 % / 80 % of the nominal voltage.
    voltage_level_df = net.get_voltage_levels(attributes=["nominal_v", "high_voltage_limit", "low_voltage_limit"])
    nominal_voltage = voltage_level_df["nominal_v"]
    voltage_level_df["high_voltage_limit"] = voltage_level_df["high_voltage_limit"].fillna(nominal_voltage * 1.2)
    voltage_level_df["low_voltage_limit"] = voltage_level_df["low_voltage_limit"].fillna(nominal_voltage * 0.8)

    two_sided_branches = net.get_branches(
        attributes=["type", "voltage_level1_id", "voltage_level2_id", "bus_breaker_bus1_id", "bus_breaker_bus2_id"]
    )
    three_winding_attributes = [
        "voltage_level1_id",
        "voltage_level2_id",
        "voltage_level3_id",
        "bus_breaker_bus1_id",
        "bus_breaker_bus2_id",
        "bus_breaker_bus3_id",
    ]
    three_winding_trafos = net.get_3_windings_transformers(attributes=three_winding_attributes).assign(
        type="THREE_WINDINGS_TRANSFORMER"
    )
    # Monitored elements are resolved against two-sided branches and three winding transformers alike.
    branches_incl_trafo3w = pd.concat([two_sided_branches, three_winding_trafos], axis=0)
    switch_df = net.get_switches(
        attributes=["open", "retained", "voltage_level_id", "bus_breaker_bus1_id", "bus_breaker_bus2_id"]
    )
    known_ids = net.get_identifiables(attributes=[]).index

    powsybl_contingencies, unknown_contingencies = translate_contingency_to_powsybl(
        n_minus_1_definition.contingencies, known_ids
    )
    contingency_names = {}
    for contingency in n_minus_1_definition.contingencies:
        contingency_names[contingency.id] = contingency.name or ""

    translated_monitoring = translate_monitored_elements_to_powsybl(
        n_minus_1_definition, branches_incl_trafo3w, bus_mapping, switch_df
    )
    (monitored, element_names, unknown_elements) = translated_monitoring

    # Note: only the two-sided branches (without three winding transformers) are passed here.
    blank_va_diff = get_blank_va_diff_with_buses(
        two_sided_branches, switch_df, powsybl_contingencies, monitored["switches"]
    )
    return PowsyblNMinus1Definition(
        contingencies=powsybl_contingencies,
        blank_va_diff=blank_va_diff,
        monitored_elements=monitored,
        branch_limits=pd.DataFrame(),
        bus_map=bus_mapping,
        element_name_mapping=element_names,
        contingency_name_mapping=contingency_names,
        voltage_levels=voltage_level_df,
        missing_elements=unknown_elements,
        missing_contingencies=unknown_contingencies,
    )

translate_nminus1_for_powsybl #

translate_nminus1_for_powsybl(
    n_minus_1_definition, net, branch_limit_cache=None
)

Translate the N-1 definition to a format that can be used in Powsybl.

PARAMETER DESCRIPTION
n_minus_1_definition

The generic N-1 definition to translate.

TYPE: Nminus1Definition

net

The active Powsybl network whose state is used to resolve monitored elements and limits.

TYPE: Network

branch_limit_cache

Optional cache for the prepared branch limits. If stale, it is ignored.

TYPE: Optional[PowsyblBranchLimitCacheProtocol] DEFAULT: None

RETURNS DESCRIPTION
PowsyblNMinus1Definition

The translated N-1 definition including prepared branch limits.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def translate_nminus1_for_powsybl(
    n_minus_1_definition: Nminus1Definition,
    net: Network,
    branch_limit_cache: Optional[PowsyblBranchLimitCacheProtocol] = None,
) -> PowsyblNMinus1Definition:
    """Build the complete Powsybl N-1 definition, branch limits included.

    The non-limit components are translated first; the branch limits are then resolved for the
    monitored branches of that translation and merged into a validated copy.

    Parameters
    ----------
    n_minus_1_definition : Nminus1Definition
        The generic N-1 definition to translate.
    net : Network
        The active Powsybl network whose state is used to resolve monitored elements and limits.
    branch_limit_cache : Optional[PowsyblBranchLimitCacheProtocol], optional
        Optional cache for the prepared branch limits, forwarded to the limit resolution.

    Returns
    -------
    PowsyblNMinus1Definition
        The translated N-1 definition including prepared branch limits.
    """
    components = translate_nminus1_components_for_powsybl(n_minus_1_definition, net)
    branches_to_limit = components.monitored_elements["branches"]
    prepared_limits = resolve_branch_limits_for_powsybl(
        net=net,
        monitored_branches=branches_to_limit,
        branch_limit_cache=branch_limit_cache,
    )
    # The copy with the swapped-in limits is passed through model_validate before it is returned.
    with_limits = components.model_copy(update={"branch_limits": prepared_limits})
    return PowsyblNMinus1Definition.model_validate(with_limits)

get_regulating_element_results #

get_regulating_element_results(
    monitored_buses, timestep, basecase_name=None
)

Get the regulating element results for the given outages and timestep.

TODO: This is a fake implementation, we need to get the real results from the powsybl security analysis

PARAMETER DESCRIPTION
monitored_buses

The list of monitored buses to get the regulating element results for

TYPE: list[str]

timestep

The timestep to get the regulating element results for

TYPE: int

basecase_name

The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
DataFrame[RegulatingElementResultSchema]

The regulating element results for the given outages and timestep

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
@pa.check_types
def get_regulating_element_results(
    monitored_buses: list[str], timestep: int, basecase_name: str | None = None
) -> pat.DataFrame[RegulatingElementResultSchema]:
    """Return placeholder regulating element results for the basecase.

    TODO: This is a fake implementation, we need to get the real results from the powsybl security analysis

    Parameters
    ----------
    monitored_buses : list[str]
        The monitored buses; only the first one is used for the placeholder row.
    timestep : int
        The timestep to store in the result index.
    basecase_name : str | None, optional
        The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

    Returns
    -------
    pat.DataFrame[RegulatingElementResultSchema]
        An empty frame if there is no basecase name or no monitored bus, otherwise a frame
        holding the placeholder row.
    """
    placeholder_results = get_empty_dataframe_from_model(RegulatingElementResultSchema)
    if not basecase_name or len(monitored_buses) == 0:
        return placeholder_results

    # TODO dont fake this
    row_key = (timestep, basecase_name, monitored_buses[0])
    # Both fake entries are written to the same row key, so the second pair of writes
    # (9999.0 / SLACK_P) overwrites the first pair (-9999.0 / GENERATOR_Q).
    placeholder_results.loc[row_key, "value"] = -9999.0
    placeholder_results.loc[row_key, "regulating_element_type"] = RegulatingElementType.GENERATOR_Q.value
    placeholder_results.loc[row_key, "value"] = 9999.0
    placeholder_results.loc[row_key, "regulating_element_type"] = RegulatingElementType.SLACK_P.value
    return placeholder_results

get_node_results #

get_node_results(
    bus_results,
    monitored_buses,
    bus_map,
    voltage_levels,
    failed_outages,
    timestep,
    method,
)

Get the node results for the given outages and timestep.

TODO: This is currently faking the sum of p and q at the node

PARAMETER DESCRIPTION
bus_results

The bus results from the powsybl security analysis

TYPE: DataFrame

monitored_buses

The list of monitored buses to get the node results for

TYPE: list[str]

bus_map

A mapping from busbar sections or bus_breaker_buses to the electrical buses. This is used to map the buses from bus_results to electrical buses and back to the monitored buses.

TYPE: DataFrame

voltage_levels

The voltage levels of the buses. This is used to determine voltage limits and nominal v in DC.

TYPE: DataFrame

failed_outages

The list of failed outages to get nan-node results for

TYPE: list[str]

timestep

The timestep to get the node results for

TYPE: int

method

The method to use for the node results. Either "ac" or "dc"

TYPE: Literal[ac, dc]

RETURNS DESCRIPTION
DataFrame[NodeResultSchema]

The node results for the given outages and timestep

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
@pa.check_types
def get_node_results(
    bus_results: pd.DataFrame,
    monitored_buses: list[str],
    bus_map: pd.DataFrame,
    voltage_levels: pd.DataFrame,
    failed_outages: list[str],
    timestep: int,
    method: Literal["ac", "dc"],
) -> pat.DataFrame[NodeResultSchema]:
    """Get the node results for the given outages and timestep.

    TODO: This is currently faking the sum of p and q at the node

    Parameters
    ----------
    bus_results : pd.DataFrame
        The bus results from the powsybl security analysis. The index levels "bus_id",
        "voltage_level_id" and "contingency_id" and the columns "v_mag" and "v_angle" are read.
    monitored_buses : list[str]
        The list of monitored buses to get the node results for
    bus_map : pd.DataFrame
        A mapping from busbar sections or bus_breaker_buses to the electrical buses.
        This is used to map the buses from bus_results to electrical buses and back to the monitored buses.
    voltage_levels : pd.DataFrame
        The voltage levels of the buses. This is used to determine
        voltage limits and nominal v in DC.
    failed_outages : list[str]
        The list of failed outages to get nan-node results for
    timestep : int
        The timestep to get the node results for. It is written into the "timestep" index level
        of both the converged and the failed rows.
    method : Literal["ac", "dc"]
        The method to use for the node results. Either "ac" or "dc"

    Returns
    -------
    pat.DataFrame[NodeResultSchema]
        The node results for the given outages and timestep
    """
    if bus_results.empty:
        return get_failed_node_results(timestep, failed_outages, monitored_buses)
    # Translate bus_ids that could be busbar sections or bus_breaker_buses to the monitored buses
    # Should work for both busbar and bus_breaker models
    node_results = deepcopy(bus_results)
    node_results["bus_breaker_bus_id"] = node_results.index.get_level_values("bus_id").map(bus_map.bus_breaker_bus_id)
    node_results = node_results.dropna(subset=["bus_breaker_bus_id"])
    monitored_bus_map = bus_map.loc[monitored_buses]
    bus_to_element_map = pd.DataFrame(
        data={"element": monitored_bus_map.index.values}, index=monitored_bus_map.bus_breaker_bus_id.values
    )
    node_results = node_results.merge(bus_to_element_map, right_index=True, left_on="bus_breaker_bus_id")

    # Merge the actual voltage level in kV
    voltage_columns = voltage_levels.columns.to_list()
    node_results[voltage_columns] = voltage_levels.loc[node_results.index.get_level_values("voltage_level_id")].values
    # Use the requested timestep (not a hard-coded 0) so converged rows share the index
    # value of the failed rows appended below.
    node_results = node_results.assign(timestep=timestep)
    node_results.index = pd.MultiIndex.from_arrays(
        [
            node_results.timestep.values,
            node_results.index.get_level_values("contingency_id").values,
            node_results.element.values,
        ],
        names=["timestep", "contingency", "element"],
    )

    node_results.rename(columns={"v_mag": "vm", "v_angle": "va"}, inplace=True)

    # Calculate the values
    if method == "dc":
        # DC yields no voltage magnitude: wherever an angle exists, use the nominal voltage.
        has_va = node_results["va"].notna().values
        node_results.loc[has_va, "vm"] = node_results.loc[has_va, "nominal_v"]
    # vm_loading is the deviation from nominal, scaled by the distance between nominal and the
    # upper limit (for over-voltage) or the lower limit (otherwise).
    vm_deviation = node_results["vm"].values - node_results["nominal_v"].values
    deviation_to_max = vm_deviation / (node_results["high_voltage_limit"].values - node_results["nominal_v"].values)
    deviation_to_min = vm_deviation / (node_results["nominal_v"].values - node_results["low_voltage_limit"].values)
    higher_voltage = vm_deviation > 0
    node_results.loc[higher_voltage, "vm_loading"] = deviation_to_max[higher_voltage]
    node_results.loc[~higher_voltage, "vm_loading"] = deviation_to_min[~higher_voltage]
    # TODO Add sum of p and q at the node
    failed_node_results = get_failed_node_results(timestep, failed_outages, monitored_buses)

    all_node_results = pd.concat([node_results, failed_node_results], axis=0)[["vm", "va", "vm_loading"]]

    # Columns that are not computed here are filled with placeholders.
    all_node_results["p"] = np.nan
    all_node_results["q"] = np.nan
    all_node_results["vm_basecase_deviation"] = np.nan
    all_node_results["element_name"] = ""
    all_node_results["contingency_name"] = ""

    return all_node_results

get_branch_results #

get_branch_results(
    branch_results,
    three_winding_results,
    monitored_branches,
    monitored_trafo3w,
    failed_outages,
    timestep,
    branch_limits,
)

Get the branch results for the given outages and timestep.

PARAMETER DESCRIPTION
branch_results

The branch results from the powsybl security analysis

TYPE: DataFrame

three_winding_results

The three winding transformer results from the powsybl security analysis

TYPE: DataFrame

monitored_branches

The list of monitored branches with 2 sides to get the branch results for

TYPE: list[str]

monitored_trafo3w

The list of monitored three winding transformers to get the branch results for

TYPE: list[str]

failed_outages

The list of failed outages to get nan-branch results for

TYPE: list[str]

timestep

The timestep to get the branch results for

TYPE: int

branch_limits

The branch limits from the powsybl network

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame[BranchResultSchema]

The branch results for the given outages and timestep

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
@pa.check_types
def get_branch_results(
    branch_results: pd.DataFrame,
    three_winding_results: pd.DataFrame,
    monitored_branches: list[str],
    monitored_trafo3w: list[str],
    failed_outages: list[str],
    timestep: int,
    branch_limits: pd.DataFrame,
) -> pat.DataFrame[BranchResultSchema]:
    """Convert the per-branch security analysis output into one row per branch side.

    Parameters
    ----------
    branch_results : pd.DataFrame
        The branch results from the powsybl security analysis
    three_winding_results : pd.DataFrame
        The three winding transformer results from the powsybl security analysis.
        Its index names are renamed in place.
    monitored_branches : list[str]
        The list of monitored branches with 2 sides to get the branch results for
    monitored_trafo3w : list[str]
        The list of monitored three winding transformers to get the branch results for
    failed_outages : list[str]
        The list of failed outages to get nan-branch results for
    timestep : int
        The timestep to get the branch results for
    branch_limits : pd.DataFrame
        The branch limits from the powsybl network. It is reindexed by (element, side) and its
        "value" column is used as the divisor for the loading.

    Returns
    -------
    pat.DataFrame[BranchResultSchema]
        The branch results for the given outages and timestep
    """
    # Bring both result tables onto the shared "contingency" / "element" index names.
    branch_results = branch_results.droplevel("operator_strategy_id")
    branch_results.index.rename({"contingency_id": "contingency", "branch_id": "element"}, inplace=True)
    three_winding_results.index.rename({"contingency_id": "contingency", "transformer_id": "element"}, inplace=True)

    # Sides one and two exist for all branches, side three only for three winding transformers.
    per_side_frames = []
    for side_number, side in ((1, BranchSide.ONE), (2, BranchSide.TWO), (3, BranchSide.THREE)):
        side_columns = [f"{quantity}{side_number}" for quantity in ("p", "q", "i")]
        if side_number == 3:
            flows = three_winding_results[side_columns]
        else:
            flows = pd.concat([branch_results[side_columns], three_winding_results[side_columns]], axis=0)
        unsuffixed = dict(zip(side_columns, ("p", "q", "i")))
        per_side_frames.append(flows.assign(side=side.value).rename(columns=unsuffixed))

    # Stack the sides and move side and timestep into the index.
    stacked = pd.concat(per_side_frames, axis=0).assign(timestep=timestep)
    stacked = stacked.set_index(["side", "timestep"], append=True)
    stacked = stacked.reorder_levels(["timestep", "contingency", "element", "side"])

    # Look up the limit of every (element, side) row; reindex keeps exactly the rows of the results.
    element_side_index = pd.MultiIndex.from_arrays(
        [stacked.index.get_level_values("element"), stacked.index.get_level_values("side")]
    )
    limit_values = branch_limits.reindex(element_side_index).value.values
    stacked["loading"] = stacked["i"].values / limit_values

    # Append the rows for contingencies that did not converge.
    non_converged = get_failed_branch_results(timestep, failed_outages, monitored_branches, monitored_trafo3w)
    return pd.concat([stacked, non_converged], axis=0)

get_convergence_result_df #

get_convergence_result_df(
    post_contingency_results,
    pre_contingency_result,
    outages,
    timestep,
    basecase_name=None,
)

Get the convergence dataframe for the given outages and timestep.

PARAMETER DESCRIPTION
post_contingency_results

The post contingency results from the powsybl security analysis. Maps contingency id to PostContingencyResult.

TYPE: dict[str, PostContingencyResult]

pre_contingency_result

The pre contingency result from the powsybl security analysis. Holds the Basecase.

TYPE: PreContingencyResult

outages

The list of outages to get the convergence results for

TYPE: list[str]

timestep

The timestep to get the convergence results for

TYPE: int

basecase_name

The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
DataFrame[ConvergedSchema]

The convergence dataframe for the given outages and timestep

list[str]

The list of failed outages

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
@pa.check_types
def get_convergence_result_df(
    post_contingency_results: dict[str, PostContingencyResult],
    pre_contingency_result: PreContingencyResult,
    outages: list[str],
    timestep: int,
    basecase_name: Optional[str] = None,
) -> tuple[pat.DataFrame[ConvergedSchema], list[str]]:
    """Build the convergence table for the outages and collect the outages that failed.

    Parameters
    ----------
    post_contingency_results : dict[str, PostContingencyResult]
        The post contingency results from the powsybl security analysis.
        Maps contingency id to PostContingencyResult. Outages missing from this mapping are
        treated as NO_CALCULATION.
    pre_contingency_result : PreContingencyResult
        The pre contingency result from the powsybl security analysis. Holds the Basecase.
    outages : list[str]
        The list of outages to get the convergence results for
    timestep : int
        The timestep to get the convergence results for
    basecase_name : Optional[str], optional
        The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

    Returns
    -------
    pat.DataFrame[ConvergedSchema]
        The convergence dataframe for the given outages and timestep
    list[str]
        The outages whose mapped status is not "CONVERGED". The basecase is not part of this list.
    """
    # One row per outage, indexed by (timestep, contingency).
    convergence = pd.DataFrame(index=outages)
    convergence.index.name = "contingency"
    convergence["timestep"] = timestep
    convergence = convergence.set_index(["timestep"], append=True)
    convergence = convergence.reorder_levels(["timestep", "contingency"], axis=0)

    raw_status = []
    for outage in outages:
        if outage in post_contingency_results:
            raw_status.append(post_contingency_results[outage].status.value)
        else:
            raw_status.append(pypowsybl.loadflow.ComponentStatus.NO_CALCULATION.value)
    convergence["status"] = raw_status
    convergence["status"] = convergence["status"].map(POWSYBL_CONVERGENCE_MAP)

    converged_mask = convergence["status"].values == "CONVERGED"
    failed_outages = [outage for outage, converged in zip(outages, converged_mask, strict=True) if not converged]

    # The basecase row is added after the failed outages were collected.
    if basecase_name is not None:
        basecase_status = POWSYBL_CONVERGENCE_MAP[pre_contingency_result.status.value]
        convergence.loc[(timestep, basecase_name), "status"] = basecase_status

    # Remaining columns are filled with placeholders.
    convergence["iteration_count"] = np.nan
    convergence["warnings"] = ""
    convergence["contingency_name"] = ""

    return convergence, failed_outages

update_basename #

update_basename(result_df, basecase_name=None)

Update the basecase name in the results dataframes.

This function updates the contingency index level of the results dataframes to reflect the basecase name. If the basecase is not included in the run, it will remove it from the results. Powsybl includes the basecase as an empty string by default.

The Dataframes are expected to have a multi-index with a "contingency" level. The Dataframes are updated inplace.

PARAMETER DESCRIPTION
result_df

The dataframe containing the branch / node / VADiff results

TYPE: LoadflowResultTable

basecase_name

The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
LOADFLOW_RESULT_TABLE

The updated dataframes with the basecase name set or removed.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
@pa.check_types(inplace=True)
def update_basename(
    result_df: LoadflowResultTable,
    basecase_name: Optional[str] = None,
) -> LoadflowResultTable:
    """Rename or remove the basecase rows of a result dataframe.

    The basecase is expected under the empty string in the "contingency" index level.
    With a basecase name, that level value is replaced by the name; without one, the rows with
    the empty-string contingency are dropped (a missing empty-string entry is ignored).

    The dataframe needs a multi-index with a "contingency" level and is modified in place.

    Parameters
    ----------
    result_df: LoadflowResultTable
        The dataframe containing the branch / node / VADiff results
    basecase_name: Optional[str], optional
        The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

    Returns
    -------
    LoadflowResultTable
        The same dataframe with the basecase name set or the basecase rows removed.
    """
    level_position = result_df.index.names.index("contingency")

    if basecase_name is None:
        result_df.drop("", level=level_position, axis=0, inplace=True, errors="ignore")
        return result_df

    def _rename_basecase(label):
        return basecase_name if label == "" else label

    renamed_levels = result_df.index.levels[level_position].map(_rename_basecase)
    result_df.index = result_df.index.set_levels(renamed_levels, level=level_position)
    return result_df

add_name_column #

add_name_column(result_df, name_map, index_level='element')

Translate the element ids in the results dataframes to the original names.

This function translates the element names in the results dataframes to the original names from the Powsybl network. This is useful for debugging and for displaying the results.

PARAMETER DESCRIPTION
result_df

The dataframe containing the node / branch / VADiff results

TYPE: LoadflowResultTable

name_map

A mapping from the element ids to the original names. This is used to translate the element names in the results.

TYPE: dict[str, str]

index_level

The index level storing the ids that should be mapped to the names. by default "element" for the monitored elements.

TYPE: str DEFAULT: 'element'

RETURNS DESCRIPTION
LoadflowResultTable

The updated dataframe with the ids translated to the original names.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
@pa.check_types(inplace=True)
def add_name_column(
    result_df: LoadflowResultTable,
    name_map: dict[str, str],
    index_level: str = "element",
) -> LoadflowResultTable:
    """Add a "<index_level>_name" column holding the names of the ids in an index level.

    The ids of the chosen index level are looked up in ``name_map``; ids without an entry get
    an empty string. The column is written into ``result_df`` itself.

    Parameters
    ----------
    result_df: LoadflowResultTable
        The dataframe containing the node / branch / VADiff results
    name_map: dict[str, str]
        A mapping from the ids to the original names.
    index_level: str, optional
        The index level storing the ids that should be mapped to the names,
        by default "element" for the monitored elements.

    Returns
    -------
    LoadflowResultTable
        The same dataframe with the name column added.
    """
    ids = result_df.index.get_level_values(index_level)
    names = ids.map(name_map).fillna("")
    result_df[f"{index_level}_name"] = names
    return result_df

set_target_values_to_lf_values_incl_distributed_slack #

set_target_values_to_lf_values_incl_distributed_slack(
    net, method, lf_params
)

Update the target values of generators to include the distributed slack.

This is necessary if you want to run the security analysis for generators without distributing their outaged power across the whole network, but still want to maintain the original n0-flows.

PARAMETER DESCRIPTION
net

The powsybl network to update

TYPE: Network

method

The method to use for the loadflow, either "ac" or "dc"

TYPE: Literal[ac, dc]

lf_params

The loadflow parameters to use for the loadflow calculation. This is used to run the loadflow and get the original flows of the network.

TYPE: Parameters

RETURNS DESCRIPTION
Network

The updated network

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def set_target_values_to_lf_values_incl_distributed_slack(
    net: Network, method: Literal["ac", "dc"], lf_params: pypowsybl.loadflow.Parameters
) -> Network:
    """Write the loadflow results of generators and batteries back as their target values.

    A loadflow is run on the network and the negated resulting p (and, for AC, q) of every
    generator and battery becomes its new target value. Elements without a result keep their
    previous target. This is necessary if you want to run the security analysis for generators
    without distributing their outaged power across the whole network, but still want to
    maintain the original n0-flows.

    Parameters
    ----------
    net : Network
        The powsybl network to update
    method : Literal["ac", "dc"]
        The method to use for the loadflow, either "ac" or "dc"
    lf_params : pypowsybl.loadflow.Parameters
        The loadflow parameters to use for the loadflow calculation.

    Returns
    -------
    Network
        The same network object, after the update.
    """
    is_ac = method == "ac"
    run_loadflow = pypowsybl.loadflow.run_ac if is_ac else pypowsybl.loadflow.run_dc
    run_loadflow(net, lf_params)

    def _targets_from_flows(injections: pd.DataFrame) -> pd.DataFrame:
        # Targets are set to the negated flows; missing flows keep the old target.
        injections["target_p"] = (-injections["p"]).fillna(injections["target_p"])
        if is_ac:
            # Reactive targets are only taken over for AC runs.
            injections["target_q"] = (-injections["q"]).fillna(injections["target_q"])
        return injections[["target_p", "target_q"]]

    net.update_generators(_targets_from_flows(net.get_generators()))
    net.update_batteries(_targets_from_flows(net.get_batteries()))
    return net

get_full_nminus1_definition_powsybl #

get_full_nminus1_definition_powsybl(net)

Get the full N-1 definition from a Powsybl network.

This function retrieves the N-1 definition from a Powsybl network, including: Monitored elements: all lines, trafos, buses and switches. Contingencies: all lines, trafos, generators and loads, plus a basecase contingency with name "BASECASE".

PARAMETER DESCRIPTION
net

The Powsybl network to retrieve the N-1 definition from.

TYPE: Network

RETURNS DESCRIPTION
Nminus1Definition

The complete N-1 definition for the given Powsybl network.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def get_full_nminus1_definition_powsybl(net: pypowsybl.network.Network) -> Nminus1Definition:
    """Get the full N-1 definition from a Powsybl network.

    This function retrieves the N-1 definition from a Powsybl network, including:
        Monitored Elements
            all lines, trafos, buses and switches
        Contingencies
            all lines, trafos, generators and loads
            Basecase contingency with name "BASECASE"

    Buses are taken from the busbar sections; if the network has none, the bus-breaker-view
    buses are used instead. Buses are fetched without attributes and therefore get an empty name.

    Parameters
    ----------
    net : pypowsybl.network.Network
        The Powsybl network to retrieve the N-1 definition from.

    Returns
    -------
    Nminus1Definition
        The complete N-1 definition for the given Powsybl network.
    """

    def _grid_elements(frame: pd.DataFrame, element_type: str, kind: str) -> list[GridElement]:
        """Build one GridElement per row: id from the index, name from the "name" column."""
        # The "name" column is read explicitly. Attribute access on a row yielded by
        # ``iterrows`` (``row.name``) returns the Series name, i.e. the index label (the id),
        # and never the value of the "name" column.
        if "name" in frame.columns:
            names = frame["name"].fillna("").astype(str)
        else:
            names = pd.Series("", index=frame.index, dtype=object)
        return [
            GridElement(id=element_id, name=element_name, type=element_type, kind=kind)
            for element_id, element_name in names.items()
        ]

    lines = _grid_elements(net.get_lines(attributes=["name"]), "LINE", "branch")
    trafo2w = _grid_elements(
        net.get_2_windings_transformers(attributes=["name"]), "TWO_WINDINGS_TRANSFORMER", "branch"
    )
    trafos3w = _grid_elements(
        net.get_3_windings_transformers(attributes=["name"]), "THREE_WINDINGS_TRANSFORMER", "branch"
    )
    branch_elements = [*lines, *trafo2w, *trafos3w]

    switches = _grid_elements(net.get_switches(attributes=["name"]), "SWITCH", "switch")

    # Prefer busbar sections; fall back to bus-breaker-view buses if there are none.
    bus_frame = net.get_busbar_sections(attributes=[])
    if bus_frame.empty:
        bus_frame = net.get_bus_breaker_view_buses(attributes=[])
    buses = _grid_elements(bus_frame, "BUS", "bus")
    monitored_elements = [*branch_elements, *switches, *buses]

    generators = _grid_elements(net.get_generators(attributes=["name"]), "GENERATOR", "injection")
    loads = _grid_elements(net.get_loads(attributes=["name"]), "LOAD", "injection")
    outaged_elements = [*branch_elements, *generators, *loads]

    # The basecase is modelled as a contingency without outaged elements.
    basecase_contingency = [Contingency(id="BASECASE", name="BASECASE", elements=[])]
    single_contingencies = [
        Contingency(id=element.id, name=element.name or "", elements=[element]) for element in outaged_elements
    ]

    return Nminus1Definition(
        contingencies=[*basecase_contingency, *single_contingencies],
        monitored_elements=monitored_elements,
        id_type="powsybl",
    )