Skip to content

Contingency Analysis#

AC Loadflow Service#

toop_engine_contingency_analysis.ac_loadflow_service.ac_loadflow_service #

Get the results of the AC Contingency Analysis for the given network

get_ac_loadflow_results #

get_ac_loadflow_results(
    net,
    n_minus_1_definition,
    timestep=0,
    job_id="",
    n_processes=1,
    batch_size=None,
)

Get the results of the AC loadflow for the given network

PARAMETER DESCRIPTION
net

The network to run the contingency analysis on

TYPE: pandapowerNet | Network

n_minus_1_definition

The N-1 definition to use for the contingency analysis. Contains outages and monitored elements

TYPE: Nminus1Definition

timestep

The timestep of the results. Used to identify the results in the database

TYPE: int DEFAULT: 0

job_id

The job id of the current job

TYPE: str DEFAULT: ''

n_processes

The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially. If > 1, the analysis is run in parallel. Parallelization is done by splitting the contingencies into chunks and running each chunk in a separate process

TYPE: int DEFAULT: 1

batch_size

The size of the batches to use for the parallelization. This is ignored for Powsybl at the moment. If None, the batch size is computed based on the number of contingencies and the number of processes.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
LoadflowResultsPolars

The results of the Contingency analysis

RAISES DESCRIPTION
ValueError

If the network is not a PandapowerNetwork or PowsyblNetwork

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/ac_loadflow_service.py
def get_ac_loadflow_results(
    net: PandapowerNetwork | PowsyblNetwork,
    n_minus_1_definition: Nminus1Definition,
    timestep: int = 0,
    job_id: str = "",
    n_processes: int = 1,
    batch_size: Optional[int] = None,
) -> LoadflowResultsPolars:
    """Run the AC contingency analysis on the given network and return its results.

    Dispatches to the pandapower or powsybl backend depending on the concrete
    network type.

    Parameters
    ----------
    net : PandapowerNetwork | PowsyblNetwork
        The network to run the contingency analysis on
    n_minus_1_definition : Nminus1Definition
        The N-1 definition to use for the contingency analysis. Contains outages and monitored elements
    timestep : int, default=0
        The timestep of the results. Used to identify the results in the database
    job_id : str, default=""
        The job id of the current job
    n_processes : int, default=1
        The number of processes to use for the contingency analysis. If 1, the analysis is run
        sequentially. If > 1, the analysis is run in parallel. Parallelization is done by splitting
        the contingencies into chunks and running each chunk in a separate process
    batch_size : int, optional
        The size of the batches to use for the parallelization.
        This is ignored for Powsybl at the moment.
        If None, the batch size is computed based on the number of contingencies and the number of processes.

    Returns
    -------
    LoadflowResultsPolars
        The results of the Contingency analysis

    Raises
    ------
    ValueError
        If the network is not a PandapowerNetwork or PowsyblNetwork
    """
    if isinstance(net, PandapowerNetwork):
        return run_contingency_analysis_pandapower(
            net,
            n_minus_1_definition,
            job_id,
            timestep,
            n_processes=n_processes,
            batch_size=batch_size,
            method="ac",
            polars=True,
        )
    if isinstance(net, PowsyblNetwork):
        # NOTE: batch_size is intentionally not forwarded here; the powsybl backend
        # does not support it at the moment (see parameter documentation).
        return run_contingency_analysis_powsybl(
            net,
            n_minus_1_definition,
            job_id,
            timestep,
            n_processes=n_processes,
            method="ac",
            polars=True,
        )
    raise ValueError("net must be a pandapowerNet or powsybl network")

toop_engine_contingency_analysis.ac_loadflow_service.compute_metrics #

Provides functions to compute the metrics directly from the results dataframes.

This is similar to jax.aggregate_results.py but straight on the results dataframes.

compute_overload_column #

compute_overload_column(branch_results, field='i')

Compute the overload column for further aggregation.

This is just a max operation

PARAMETER DESCRIPTION
branch_results

The branch results dataframe containing the loading information.

TYPE: LazyFrame[BranchResultSchemaPolars]

field

The field to use for the overload calculation, either "p" for power or "i" for current, by default "i".

TYPE: Literal[p, i] DEFAULT: 'i'

RETURNS DESCRIPTION
LazyFrame

The branch results dataframe with an additional "overload" column.
Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/compute_metrics.py
def compute_overload_column(
    branch_results: patpl.LazyFrame[BranchResultSchemaPolars], field: Literal["p", "i"] = "i"
) -> pl.LazyFrame:
    """Compute the overload column for further aggregation.

    The "overload" column holds the amount by which ``field`` exceeds the element's
    rating; it is negative for elements that are not overloaded.

    Parameters
    ----------
    branch_results : patpl.LazyFrame[BranchResultSchemaPolars]
        The branch results dataframe containing the loading information.
    field : Literal["p", "i"], optional
        The field to use for the overload calculation, either "p" for power or "i" for current, by default "i".

    Returns
    -------
    pl.LazyFrame
        The branch results dataframe with additional "overload" (and intermediate
        "_val_max") columns.
    """
    # NOTE(review): assumes `loading` is the ratio of `field` to the element's rating,
    # so |field / loading| recovers the rating itself — confirm against the schema.
    branch_results_with_overload = branch_results.with_columns(
        _val_max=(pl.col(field) / pl.col("loading")).abs(),
    ).with_columns(
        # overload = |field| - rating; > 0 only for genuinely overloaded rows.
        overload=(pl.col(field).abs() - pl.col("_val_max")),
    )
    return branch_results_with_overload

compute_max_load #

compute_max_load(branch_results)

Compute the highest loading of the branches in the results.

This is just a max operation

PARAMETER DESCRIPTION
branch_results

The branch results dataframe containing the loading information.

TYPE: LazyFrame[BranchResultSchemaPolars]

RETURNS DESCRIPTION
float

The maximum loading in factor of maximum rated current (percent / 100) of any branch in the results.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/compute_metrics.py
@pa.check_types
def compute_max_load(branch_results: patpl.LazyFrame[BranchResultSchemaPolars]) -> float:
    """Return the highest loading found on any branch in the results.

    A single max aggregation over the "loading" column.

    Parameters
    ----------
    branch_results : patpl.LazyFrame[BranchResultSchemaPolars]
        The branch results dataframe containing the loading information.

    Returns
    -------
    float
        The maximum loading in factor of maximum rated current (percent / 100) of any branch in the results.
    """
    return branch_results.select(pl.max("loading")).collect().item()

compute_overload_energy #

compute_overload_energy(branch_results, field='i')

Compute the total overload of the branches in the results.

This takes the worst overload per element (and timestep) and sums these maxima into a single scalar.

PARAMETER DESCRIPTION
branch_results

The branch results dataframe containing the loading information.

TYPE: LazyFrame[BranchResultSchemaPolars]

field

The field to use for the overload calculation, either "p" for power or "i" for current, by default "i".

TYPE: Literal[p, i] DEFAULT: 'i'

RETURNS DESCRIPTION
float

The total overload current or power, summed over all overloaded elements

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/compute_metrics.py
@pa.check_types
def compute_overload_energy(
    branch_results: patpl.LazyFrame[BranchResultSchemaPolars], field: Literal["p", "i"] = "i"
) -> float:
    """Compute the total overload of the branches in the results.

    For every (timestep, element) pair, the worst overload across all contingencies and
    sides is taken, and these per-element maxima are summed into a single scalar.
    Elements that are not overloaded do not contribute.

    Parameters
    ----------
    branch_results : patpl.LazyFrame[BranchResultSchemaPolars]
        The branch results dataframe containing the loading information.
    field : Literal["p", "i"], optional
        The field to use for the overload calculation, either "p" for power or "i" for current, by default "i".

    Returns
    -------
    float
        The total overload current or power, summed over all overloaded elements.
    """
    branch_results_with_overload = compute_overload_column(branch_results, field=field)
    overload = (
        branch_results_with_overload.select("timestep", "element", "overload")
        .drop_nulls()
        .filter(pl.col("overload") > 0)  # keep only rows that are actually overloaded
        .group_by(["timestep", "element"])
        .agg(pl.max("overload").alias("overload"))  # worst overload per element/timestep
        .drop_nans("overload")
        .select(pl.col("overload").sum())  # sum of the per-element maxima
        .collect()
        .item()
    )

    return overload

count_critical_branches #

count_critical_branches(
    branch_results, critical_threshold=1.0
)

Count how many branches are above 100% in any side/contingency

PARAMETER DESCRIPTION
branch_results

The branch results dataframe containing the loading information.

TYPE: LazyFrame[BranchResultSchemaPolars]

critical_threshold

The loading threshold to consider a branch as critical, by default 1.0 (100%)

TYPE: float DEFAULT: 1.0

RETURNS DESCRIPTION
int

The number of branches that are overloaded in any side/contingency.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/compute_metrics.py
@pa.check_types
def count_critical_branches(
    branch_results: patpl.LazyFrame[BranchResultSchemaPolars], critical_threshold: float = 1.0
) -> int:
    """Count how many branches exceed the critical loading in any side/contingency.

    Parameters
    ----------
    branch_results : patpl.LazyFrame[BranchResultSchemaPolars]
        The branch results dataframe containing the loading information.
    critical_threshold : float, optional
        The loading threshold to consider a branch as critical, by default 1.0 (100%)

    Returns
    -------
    int
        The number of branches that are overloaded in any side/contingency.
    """
    # NaN loadings are mapped to -1.0 so they can never exceed the threshold.
    critical_rows = branch_results.filter(pl.col("loading").fill_nan(-1.0) > critical_threshold)
    # Deduplicate on (timestep, element): a branch counts once, no matter how many
    # contingencies or sides push it above the threshold.
    distinct_branches = critical_rows.select("timestep", "element").unique()
    return int(distinct_branches.select(pl.len()).collect().item())

compute_max_va_diff #

compute_max_va_diff(va_diff_results)

Compute the maximum voltage angle difference.

PARAMETER DESCRIPTION
va_diff_results

The voltage angle difference results dataframe.

TYPE: LazyFrame[VADiffResultSchemaPolars]

RETURNS DESCRIPTION
float

The maximum voltage angle difference in degrees.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/compute_metrics.py
def compute_max_va_diff(va_diff_results: patpl.LazyFrame[VADiffResultSchemaPolars]) -> float:
    """Compute the maximum voltage angle difference.

    Parameters
    ----------
    va_diff_results : patpl.LazyFrame[VADiffResultSchemaPolars]
        The voltage angle difference results dataframe.

    Returns
    -------
    float
        The maximum voltage angle difference in degrees; 0.0 if no values are present.
    """
    result = va_diff_results.select(pl.max("va_diff")).collect().item()
    # An empty frame yields None and an all-NaN column yields NaN; both map to 0.0.
    if result is None or pd.isna(result):
        return 0.0
    return float(result)

get_worst_k_contingencies_ac #

get_worst_k_contingencies_ac(
    branch_results, k=10, field="p", base_case_id="BASECASE"
)

Get the worst k contingencies based on overload energy.

If k is greater than the number of contingencies, all contingencies will be returned.

PARAMETER DESCRIPTION
branch_results

The branch results dataframe containing the loading information.

TYPE: DataFrame[BranchResultSchemaPolars]

k

The number of worst contingencies to return, by default 10.

TYPE: int DEFAULT: 10

field

The field to use for the overload calculation, either "p" for power or "i" for current, by default "p".

TYPE: Literal[p, i] DEFAULT: 'p'

base_case_id

The contingency ID for the base case (N-0), by default "BASECASE".

TYPE: str DEFAULT: 'BASECASE'

RETURNS DESCRIPTION
tuple[list[list[str]], list[float]]

A tuple containing: - A list of lists with the contingency IDs for each timestep. The length of the outer list is the number of timesteps while the inner lists contain the top k contingencies for that timestep. - A list of total overload energy for each timestep. The length matches the number of timesteps.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/compute_metrics.py
def get_worst_k_contingencies_ac(
    branch_results: patpl.LazyFrame[BranchResultSchemaPolars],
    k: int = 10,
    field: Literal["p", "i"] = "p",
    base_case_id: str = "BASECASE",
) -> tuple[list[list[str]], list[float]]:
    """Get the worst k contingencies based on overload energy.

    If k is greater than the number of contingencies, all contingencies will be returned.

    Parameters
    ----------
    branch_results : patpl.LazyFrame[BranchResultSchemaPolars]
        The branch results dataframe containing the loading information.
    k : int, optional
        The number of worst contingencies to return, by default 10.
    field : Literal["p", "i"], optional
        The field to use for the overload calculation, either "p" for power or "i" for current, by default "p".
    base_case_id : str, optional
        The contingency ID for the base case (N-0), by default "BASECASE".

    Returns
    -------
    tuple[list[list[str]], list[float]]
        A tuple containing:
        - A list of lists with the contingency IDs for each timestep, in ascending timestep
        order. The length of the outer list is the number of timesteps while the inner lists
        contain the top k contingencies for that timestep.
        - A list of total overload energy for each timestep. The length matches the number of timesteps.
    """
    branch_results_with_overload = compute_overload_column(branch_results, field=field).drop_nans("overload")
    overload = branch_results_with_overload.filter(pl.col("overload") > 0)
    # Rank only true N-1 contingencies; the base case is excluded.
    overload_n1 = overload.filter(pl.col("contingency") != base_case_id)
    # Worst overload per (timestep, contingency) pair.
    overload_per_cont = overload_n1.group_by(["timestep", "contingency"]).agg(pl.max("overload").alias("overload")).collect()

    if overload_per_cont.height == 0:
        return [], []

    contingencies: list[list[str]] = []
    overloads: list[float] = []

    # Sort the timesteps explicitly: polars `unique()` gives no ordering guarantee, and
    # without sorting the positions of the returned lists would be nondeterministic.
    for t in sorted(overload_per_cont.get_column("timestep").unique().to_list()):
        df_t = overload_per_cont.filter(pl.col("timestep") == t).sort("overload", descending=True).head(k)
        cont_ids = df_t.get_column("contingency").to_list()
        contingencies.append(cont_ids)

        if cont_ids:
            # Restrict to the current timestep as well as the top-k contingencies; otherwise
            # the "per timestep" energy would sum these contingencies over ALL timesteps.
            br_results_top_k = branch_results.filter(
                (pl.col("timestep") == t) & pl.col("contingency").is_in(cont_ids)
            )
            overload_top_k = compute_overload_energy(br_results_top_k, field=field)
        else:
            overload_top_k = 0.0
        overloads.append(float(overload_top_k))

    return contingencies, overloads

compute_metrics #

compute_metrics(loadflow_results, base_case_id=None)

Compute the metrics from the loadflow results.

PARAMETER DESCRIPTION
loadflow_results

The loadflow results containing the branch results.

TYPE: LoadflowResultsPolars

base_case_id

The contingency ID for the base case (N-0). If not provided, no n-0 metrics will be computed.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
dict[MetricType, float]

A dictionary with the computed metrics.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/compute_metrics.py
def compute_metrics(
    loadflow_results: LoadflowResultsPolars,
    base_case_id: Optional[str] = None,
) -> dict[MetricType, float]:
    """Compute the metrics from the loadflow results.

    Always computes the N-1 metrics; additionally computes the N-0 metrics when a
    base case contingency ID is supplied.

    Parameters
    ----------
    loadflow_results : LoadflowResultsPolars
        The loadflow results containing the branch results.
    base_case_id : Optional[str], optional
        The contingency ID for the base case (N-0). If not provided, no n-0 metrics will be computed.

    Returns
    -------
    dict[MetricType, float]
        A dictionary with the computed metrics.
    """
    branch_res = loadflow_results.branch_results
    va_diff_res = loadflow_results.va_diff_results

    metrics = {
        "max_flow_n_1": compute_max_load(branch_res),
        "overload_energy_n_1": compute_overload_energy(branch_res, field="p"),
        "max_va_diff_n_1": compute_max_va_diff(va_diff_res),
        "overload_current_n_1": compute_overload_energy(branch_res, field="i"),
        "critical_branch_count_n_1": count_critical_branches(branch_res),
    }

    if base_case_id is None:
        return metrics

    # Base case (N-0) results as Polars LazyFrames.
    n_0_branch_res = branch_res.filter(pl.col("contingency") == base_case_id)
    # Va diff may not contain the base case; filtering then yields an empty frame, and
    # compute_max_va_diff already returns 0.0 for empty input, so no fallback is needed.
    n_0_va_diff = va_diff_res.filter(pl.col("contingency") == base_case_id)

    metrics["max_flow_n_0"] = compute_max_load(n_0_branch_res)
    metrics["overload_energy_n_0"] = compute_overload_energy(n_0_branch_res, field="p")
    metrics["max_va_diff_n_0"] = compute_max_va_diff(n_0_va_diff)
    metrics["overload_current_n_0"] = compute_overload_energy(n_0_branch_res, field="i")
    metrics["critical_branch_count_n_0"] = count_critical_branches(n_0_branch_res)
    return metrics

toop_engine_contingency_analysis.ac_loadflow_service.kafka_client #

Provides a wrapper around the confluent_kafka Consumer to allow long running processes.

There are fundamentally two ways how to deal with long running processes in kafka: - increase the max.poll.interval.ms to a very high value so the processing can happen in between. - pause the consumer while processing and resume it afterwards, regularly polling the paused consumer to reset the max.poll.interval.ms timeout. These polls will not consume any messages, but will reset the timeout.

The first method is not recommended as it inhibits rebalances during frozen time, does not detect frozen consumers and is generally not what kafka was designed for. However, kafka does not provide a way to pause and resume a topic, only a topic-partition. That means if a consumer is paused but then a rebalance happens, new topic-partitions will not be paused and the consumer may receive messages for it. However, the polls we are doing are happening in the processing loop and we are fundamentally unable to process anything there. Hence, this wrapper provides a way to pause and resume a consumer, listening to the assignment changes and pausing or resuming the new TPs accordingly. As a drawback, this consumer then loses the ability to consume multiple topics.

logger module-attribute #

logger = Logger(__name__)

LongRunningKafkaConsumer #

LongRunningKafkaConsumer(
    topic,
    group_id,
    bootstrap_servers,
    client_id,
    max_poll_interval_ms=1800000,
    kafka_auth_config=None,
)

A kafka consumer for long running processes that need to pause and resume the topic consumption.

Initialize the LongRunningKafkaConsumer.

PARAMETER DESCRIPTION
topic

The topic to subscribe to. This can only be a single topic as the consumer will pause and resume it.

TYPE: str

group_id

The consumer group id

TYPE: str

bootstrap_servers

The bootstrap servers to connect to, e.g. "localhost:9092"

TYPE: str

client_id

The client id to use for the consumer. This is used for logging and debugging purposes.

TYPE: str

max_poll_interval_ms

The maximum time in milliseconds between polls before the consumer is considered dead. Defaults to 1_800_000 (30 minutes). Set this long enough so the process fits in with confidence.

TYPE: int DEFAULT: 1800000

kafka_auth_config

Additional kafka authentication configuration to pass to the consumer. Defaults to None.

TYPE: dict | None DEFAULT: None

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def __init__(
    self,
    topic: str,
    group_id: str,
    bootstrap_servers: str,
    client_id: str,
    max_poll_interval_ms: int = 1_800_000,
    kafka_auth_config: dict | None = None,
) -> None:
    """Initialize the LongRunningKafkaConsumer.

    Parameters
    ----------
    topic : str
        The topic to subscribe to. This can only be a single topic as the consumer will pause and resume it.
    group_id : str
        The consumer group id
    bootstrap_servers : str
        The bootstrap servers to connect to, e.g. "localhost:9092"
    client_id : str
        The client id to use for the consumer. This is used for logging and debugging purposes.
    max_poll_interval_ms : int, optional
        The maximum time in milliseconds between polls before the consumer is considered dead. Defaults to 1_800_000
        (30 minutes). Set this long enough so the process fits in with confidence.
    kafka_auth_config : dict | None, optional
        Additional kafka authentication configuration to pass to the consumer. Defaults to None.
    """
    self.topic = topic
    # Auto-commit is disabled: offsets are committed explicitly by this wrapper.
    config: dict = {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        "client.id": client_id,
        "max.poll.interval.ms": max_poll_interval_ms,
        "log_level": 2,
    }
    config.update(kafka_auth_config or {})
    self.consumer = Consumer(config, logger=getLogger(f"consumer_{client_id}"))
    self.client_id = client_id
    self.assignment: list[TopicPartition] = []
    # Track every assignment change so pause/resume state survives rebalances.
    self.consumer.subscribe(
        [self.topic],
        on_assign=lambda _consumer, assignment: self._update_assignment(new_tps=assignment, removed_tps=[]),
        on_revoke=lambda _consumer, assignment: self._update_assignment(new_tps=[], removed_tps=assignment),
        on_lost=lambda _consumer, assignment: self._update_assignment(new_tps=[], removed_tps=assignment),
    )
    self.last_msg: Optional[Message] = None
    self.is_paused = False

topic instance-attribute #

topic = topic

consumer instance-attribute #

consumer = Consumer(
    consumer_config,
    logger=getLogger(f"consumer_{client_id}"),
)

client_id instance-attribute #

client_id = client_id

assignment instance-attribute #

assignment = []

last_msg instance-attribute #

last_msg = None

is_paused instance-attribute #

is_paused = False

consume #

consume(timeout, num_messages)

Consume a batch of messages from the kafka topic at once.

This will commit all offsets directly after consuming the messages.

PARAMETER DESCRIPTION
timeout

The maximum time to wait for messages in seconds. If no messages are available, returns an empty list.

TYPE: float | int

num_messages

The maximum number of messages to consume. If more messages are available, they will not be consumed.

TYPE: int

RETURNS DESCRIPTION
list[Message]

The consumed messages, or an empty list if no messages are available within the timeout.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def consume(self, timeout: float | int, num_messages: int) -> list[Message]:
    """Consume a batch of messages from the kafka topic at once.

    This will commit all offsets directly after consuming the messages.

    Parameters
    ----------
    timeout : float | int
        The maximum time to wait for messages in seconds. If no messages are available, returns an empty list.
    num_messages : int
        The maximum number of messages to consume. If more messages are available, they will not be consumed.

    Returns
    -------
    list[Message]
        The consumed messages, or an empty list if no messages are available within the timeout.

    Raises
    ------
    RuntimeError
        If a previously polled message has not been committed yet.
    """
    if self.last_msg is not None:
        raise RuntimeError("Commit the last message either through commit or stop_processing before consuming again")

    batch = self.consumer.consume(num_messages=num_messages, timeout=float(timeout))
    if batch:
        # Committing the final message of the batch implicitly commits everything before it.
        self.consumer.commit(message=batch[-1], asynchronous=True)
        return batch
    return []

poll #

poll(timeout)

Consume a single message from the Kafka topic.

This will not commit the offset to the broker

PARAMETER DESCRIPTION
timeout

The maximum time to wait for a message in seconds. If no message is available, returns None.

TYPE: float | int

RETURNS DESCRIPTION
Optional[Message]

The consumed message, or None if no message is available within the timeout.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def poll(self, timeout: float | int) -> Optional[Message]:
    """Consume a single message from the Kafka topic.

    This will not commit the offset to the broker

    Parameters
    ----------
    timeout : float | int
        The maximum time to wait for a message in seconds. If no message is available, returns None.

    Returns
    -------
    Optional[Message]
        The consumed message, or None if no message is available within the timeout.

    Raises
    ------
    RuntimeError
        If a previously polled message has not been committed yet.
    """
    if self.last_msg is not None:
        raise RuntimeError("Commit the last message either through commit or stop_processing before consuming again")

    # Remember the message so commit()/stop_processing() can commit it later.
    self.last_msg = self.consumer.poll(timeout=float(timeout))
    return self.last_msg

commit #

commit()

Commit the last consumed message.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def commit(self) -> None:
    """Commit the last consumed message synchronously.

    Raises
    ------
    RuntimeError
        If there is no uncommitted message to commit.
    """
    if self.last_msg is None:
        raise RuntimeError("No message to commit")
    self.consumer.commit(message=self.last_msg, asynchronous=False)
    self.last_msg = None

start_processing #

start_processing()

Start a long running process to consume the message.

This will internally pause the consumer. To not exceed the poll timeout, call heartbeat() periodically while processing, e.g. every epoch

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def start_processing(self) -> None:
    """Start a long running process to consume the message.

    This will internally pause the consumer. To not exceed the poll timeout, call heartbeat() periodically while
    processing, e.g. every epoch
    """
    # NOTE(review): is_paused is set before the assignment update — presumably
    # _update_assignment pauses the assigned partitions based on this flag; confirm there.
    self.is_paused = True
    self._update_assignment([], [])

heartbeat #

heartbeat()

Send a heartbeat to the kafka topic while processing to reset the max poll interval timeout.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def heartbeat(self) -> None:
    """Send a heartbeat to the kafka topic while processing to reset the max poll interval timeout.

    Raises
    ------
    RuntimeError
        If called while not processing, or if the heartbeat poll unexpectedly
        returned a message.
    """
    if not self.is_paused:
        raise RuntimeError("Cannot send heartbeat while not processing")
    # Re-apply the pause to any partitions picked up via rebalance before polling.
    self._update_assignment([], [])
    unexpected = self.consumer.poll(timeout=0)
    if unexpected is not None:
        raise RuntimeError("Heartbeat should not consume messages")

stop_processing #

stop_processing()

Stop the long running process and commit the last message

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def stop_processing(self) -> None:
    """Stop the long running process and commit the last message"""
    # Clear the flag before the assignment update so the partitions are resumed.
    self.is_paused = False
    self._update_assignment([], [])
    # Commit synchronously so the processed message is not redelivered after a restart.
    if self.last_msg:
        self.consumer.commit(message=self.last_msg, asynchronous=False)
        self.last_msg = None

close #

close()

Close the consumer and commit the last message if any.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/kafka_client.py
def close(self) -> None:
    """Close the consumer and commit the last message if any."""
    # Flush the outstanding offset synchronously before shutting down, so the last
    # processed message is not consumed again by the next group member.
    if self.last_msg:
        self.consumer.commit(message=self.last_msg, asynchronous=False)
        self.last_msg = None
    self.consumer.close()

toop_engine_contingency_analysis.ac_loadflow_service.lf_worker #

Module contains functions for the kafka communication of the ac loadflow worker.

General Idea: - The worker will listen for commands on the preprocessing kafka topic - Once the initial conversion is done, it runs an initial loadflow - Once the optimization is done, the command contains a path to the pandapower or powsybl grid file and the N-1 Definition

  • The worker will load the grid file and the N-1 definition
  • The worker will run the N-1 analysis on the grid file with as many processes as possible
  • The worker will send the results to a kafka topic - ErrorResult, if anything goes wrong - SuccessResult, if everything goes well even if loadflow fails - LoadflowStartedResult? # Use the LoadflowResultsClass as the result result
  • The worker will send a heartbeat to a kafka topic every X seconds

Questions: - Does it make sense to return the results in batches? - Faster results, but dont really tell the full story - How to deal with grid updates? - Separate Service? (Load would be doubled) - Passing the grid file as path valid? - Otherwise large files need to be passed as bytes which kafka supports but is not really intended

File: worker.py Author: Leonard Hilfrich Created: 05/2024

logger module-attribute #

logger = getLogger(__name__)

args module-attribute #

args = cli(LoadflowWorkerArgs)

LoadflowWorkerArgs dataclass #

LoadflowWorkerArgs(
    kafka_broker="localhost:9092",
    loadflow_command_topic="loadflow_commands",
    loadflow_results_topic="loadflow_results",
    loadflow_heartbeat_topic="loadflow_heartbeat",
    heartbeat_interval_ms=1000,
    instance_id="loadflow_worker",
    processed_gridfile_folder=Path("processed_gridfiles"),
    loadflow_result_folder=Path("loadflow_results"),
    n_processes=1,
)

Holds arguments which must be provided at the launch of the worker.

Contains arguments that are static for each loadflow run.

kafka_broker class-attribute instance-attribute #

kafka_broker = 'localhost:9092'

The Kafka broker to connect to.

loadflow_command_topic class-attribute instance-attribute #

loadflow_command_topic = 'loadflow_commands'

The Kafka topic to listen for commands on.

loadflow_results_topic class-attribute instance-attribute #

loadflow_results_topic = 'loadflow_results'

The topic to push results to.

loadflow_heartbeat_topic class-attribute instance-attribute #

loadflow_heartbeat_topic = 'loadflow_heartbeat'

The topic to push heartbeats to.

heartbeat_interval_ms class-attribute instance-attribute #

heartbeat_interval_ms = 1000

The interval in milliseconds to send heartbeats.

instance_id class-attribute instance-attribute #

instance_id = 'loadflow_worker'

The instance id of the worker, used to identify the worker in the logs.

processed_gridfile_folder class-attribute instance-attribute #

processed_gridfile_folder = Path('processed_gridfiles')

A folder where pre-processed grid files are stored - this should be a NFS share together with the backend and optimizer.

loadflow_result_folder class-attribute instance-attribute #

loadflow_result_folder = Path('loadflow_results')

A folder where the loadflow results are stored - this should be a NFS share together with the backend and optimizer.

n_processes class-attribute instance-attribute #

n_processes = 1

The number of processes to use for the loadflow calculation. If 1, the analysis is run sequentially. If > 1, the analysis is run in parallel

idle_loop #

idle_loop(
    consumer, send_heartbeat_fn, heartbeat_interval_ms
)

Start the idle loop of the worker.

This will be running when the worker is currently not processing. This will wait until a StartCalculationCommand is received and return it. In case a ShutdownCommand is received, the worker will exit with the exit code provided in the command.

PARAMETER DESCRIPTION
consumer

The initialized Kafka consumer to listen for commands on.

TYPE: Consumer

send_heartbeat_fn

A function to call when there were no messages received for a while.

TYPE: callable

heartbeat_interval_ms

The time to wait for a new command in milliseconds. If no command has been received, a heartbeat will be sent and then the receiver will wait for commands again.

TYPE: int

RETURNS DESCRIPTION
StartCalculationCommand

The start calculation command to start the calculation run with

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/lf_worker.py
def idle_loop(
    consumer: LongRunningKafkaConsumer,
    send_heartbeat_fn: Callable[[], None],
    heartbeat_interval_ms: int,
) -> StartCalculationCommand:
    """Start the idle loop of the worker.

    This runs while the worker is not performing any calculation. It waits until a
    StartCalculationCommand is received and returns it. In case a ShutdownCommand is
    received, the worker exits with the exit code provided in the command. Any other
    command is logged, committed and dropped.

    Parameters
    ----------
    consumer : LongRunningKafkaConsumer
        The initialized Kafka consumer to listen for commands on.
    send_heartbeat_fn : Callable[[], None]
        A function to call when there were no messages received for a while.
    heartbeat_interval_ms : int
        The time to wait for a new command in milliseconds. If no command has been received, a
        heartbeat will be sent and then the receiver will wait for commands again.

    Returns
    -------
    StartCalculationCommand
        The start calculation command to start the loadflow run with
    """
    # Signal liveness immediately so the worker is not reported dead while idle.
    send_heartbeat_fn()
    logger.info("Entering idle loop")
    while True:
        message = consumer.poll(timeout=heartbeat_interval_ms / 1000.0)

        # Wait timeout exceeded: no command arrived, emit a heartbeat and poll again.
        if not message:
            send_heartbeat_fn()
            continue

        command = LoadflowServiceCommand.model_validate_json(deserialize_message(message.value()))

        if isinstance(command.command, StartCalculationCommand):
            # NOTE(review): the message is intentionally not committed here — presumably the
            # caller commits after processing finishes; confirm against LongRunningKafkaConsumer.
            return command.command

        if isinstance(command.command, ShutdownCommand):
            consumer.commit()
            consumer.consumer.close()
            raise SystemExit(command.command.exit_code)

        # If we are here, we received a command that we do not know
        logger.warning(f"Received unknown command, dropping: {command}")
        consumer.commit()

solver_loop #

solver_loop(
    command,
    producer,
    processed_grid_path,
    loadflow_solver_path,
    heartbeat_fn,
    instance_id,
    n_processes,
    results_topic,
)

Start the solver loop of the worker.

This will be running while the worker is processing the given StartCalculationCommand: it runs the loadflow for each grid file of every job and publishes progress, streamed intermediate results and the final result (or an error) to the results topic.

PARAMETER DESCRIPTION
command

The command to start the optimization run with.

TYPE: StartCalculationCommand

producer

The initialized Kafka producer to send results to.

TYPE: KafkaProducer

processed_grid_path

The path to the pre-processed grid files. This is used to load the grid files.

TYPE: Path

loadflow_solver_path

The path to the loadflow solver results. This is used to save the loadflow results.

TYPE: Path

heartbeat_fn

A function to call to send a heartbeat message to the kafka topic.

TYPE: Callable

instance_id

The instance id of the worker, used to identify the worker in the logs.

TYPE: str

n_processes

The number of processes to use for the loadflow calculation. If 1, the analysis is run sequentially. If > 1, the analysis is run in parallel. Parallelization is done by splitting the contingencies into chunks and running each chunk in a separate process.

TYPE: int

results_topic

The topic to push results to.

TYPE: str

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/lf_worker.py
def solver_loop(
    command: StartCalculationCommand,
    producer: Producer,
    processed_grid_path: Path,
    loadflow_solver_path: Path,
    heartbeat_fn: Callable,
    instance_id: str,
    n_processes: int,
    results_topic: str,
) -> None:
    """Run the loadflow calculation described by a StartCalculationCommand.

    For every job in the command a "started" message is published first, then each grid
    file (one per timestep) is loaded and solved. After every timestep the accumulated
    results are saved and an intermediate (stream) result is published; the last timestep
    publishes a success result instead. Any exception aborts the run and publishes a
    single error result for the whole command.

    Parameters
    ----------
    command : StartCalculationCommand
        The command describing the run: grid files, jobs and the N-1 definition.
    producer : Producer
        The initialized Kafka producer to send results to.
    processed_grid_path : Path
        The path to the pre-processed grid files. This is used to load the grid files.
    loadflow_solver_path : Path
        The path to the loadflow solver results. This is used to save the loadflow results.
    heartbeat_fn : Callable
        A function to call to send a heartbeat message to the kafka topic.
    instance_id : str
        The instance id of the worker, used to identify the worker in the logs.
    n_processes : int
        The number of processes to use for the loadflow. If 1, the analysis is run sequentially.
        If > 1, the analysis is run in parallel.
        Parallelization is done by splitting the contingencies into chunks and running each chunk in a separate process
    results_topic : str
        The topic to push results to.
    """
    start_time = time.time()
    # All result artifacts are saved relative to the solver result folder.
    dirfs = DirFileSystem(str(loadflow_solver_path))
    try:
        if command.grid_data.n_1_definition is None:
            raise ValueError("No N-1 definition provided. This is currently not supported.")
        n_minus_1_definition = command.grid_data.n_1_definition

        for job in command.jobs:
            # Announce the job start (runtime 0) before doing any heavy work.
            producer.produce(
                results_topic,
                value=serialize_message(
                    LoadflowBaseResult(
                        job_id=job.id,
                        instance_id=instance_id,
                        loadflow_id=command.loadflow_id,
                        runtime=0.0,
                        result=LoadflowStartedResult(),
                    ).model_dump_json()
                ),
                key=command.loadflow_id.encode(),
            )
            job_loadflow_results_polars = LoadflowResultsPolars(job_id=job.id)
            # One grid file per timestep; results are accumulated across timesteps.
            for i, grid in enumerate(command.grid_data.grid_files):
                heartbeat_fn(
                    command.loadflow_id, time.time() - start_time, f"Loadflow Calculation run started for timestep {i}"
                )
                net = load_base_grid(processed_grid_path / grid, command.grid_data.grid_type)
                timestep_result_polars = get_ac_loadflow_results(
                    net=net, n_minus_1_definition=n_minus_1_definition, timestep=i, job_id=job.id, n_processes=n_processes
                )
                job_loadflow_results_polars = concatenate_loadflow_results_polars(
                    [job_loadflow_results_polars, timestep_result_polars]
                )
                # Save after every timestep so consumers can read partial results.
                ref = save_loadflow_results_polars(dirfs, job.id, job_loadflow_results_polars)
                # Intermediate timesteps stream a partial result; the last one is the success message.
                if i < len(command.grid_data.grid_files) - 1:
                    result_msg = LoadflowStreamResult(
                        loadflow_reference=ref,
                        solved_timesteps=list(range(i + 1)),
                        remainging_timesteps=list(range(i + 1, len(command.grid_data.grid_files))),
                    )
                else:
                    result_msg = LoadflowSuccessResult(loadflow_reference=ref)

                producer.produce(
                    topic=results_topic,
                    value=serialize_message(
                        LoadflowBaseResult(
                            job_id=job.id,
                            loadflow_id=command.loadflow_id,
                            instance_id=instance_id,
                            runtime=time.time() - start_time,
                            result=result_msg,
                        ).model_dump_json()
                    ),
                    key=command.loadflow_id.encode(),
                )
    except Exception as e:
        logger.error(f"Error while processing {command.loadflow_id}: {e}")
        # NOTE(review): job_id is set to the loadflow_id here because the failing job is not
        # known in this scope — confirm that consumers of ErrorResult expect this fallback.
        producer.produce(
            topic=results_topic,
            value=serialize_message(
                LoadflowBaseResult(
                    job_id=command.loadflow_id,
                    instance_id=instance_id,
                    loadflow_id=command.loadflow_id,
                    runtime=time.time() - start_time,
                    result=ErrorResult(error=str(e)),
                ).model_dump_json()
            ),
            key=command.loadflow_id.encode(),
        )

load_base_grid_fs #

load_base_grid_fs(filesystem, grid_path, grid_type)

Load the base grid from the grid file.

Force loading pandapower if grid type is pandapower, otherwise load powsybl.

PARAMETER DESCRIPTION
filesystem

The filesystem to load the grid from

TYPE: AbstractFileSystem

grid_path

The grid to load

TYPE: Path

grid_type

The type of the grid, either "pandapower", "powsybl", "ucte" or "cgmes".

TYPE: Literal[pandapower, powsybl, ucte, cgmes]

RETURNS DESCRIPTION
PandapowerNet | Network

The loaded grid

RAISES DESCRIPTION
ValueError

If the grid type is not supported.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/lf_worker.py
def load_base_grid_fs(
    filesystem: AbstractFileSystem,
    grid_path: Path,
    grid_type: Literal["pandapower", "powsybl", "ucte", "cgmes"],
) -> pandapower.pandapowerNet | Network:
    """Load the base grid from the grid file.

    Pandapower grids go through the pandapower loader; every other supported
    format ("powsybl", "ucte", "cgmes") is loaded via powsybl.

    Parameters
    ----------
    filesystem : AbstractFileSystem
        The filesystem to load the grid from
    grid_path : Path
        The grid to load
    grid_type : Literal["pandapower", "powsybl", "ucte", "cgmes"]
        The type of the grid, either "pandapower", "powsybl", "ucte" or "cgmes".

    Returns
    -------
    pandapower.pandapowerNet | Network
        The loaded grid

    Raises
    ------
    ValueError
        If the grid type is not supported.
    """
    powsybl_formats = ("powsybl", "ucte", "cgmes")
    if grid_type == "pandapower":
        loaded_grid = load_pandapower_from_fs(filesystem, grid_path)
    elif grid_type in powsybl_formats:
        loaded_grid = load_powsybl_from_fs(filesystem, grid_path)
    else:
        raise ValueError(f"Unknown grid type: {grid_type}")
    return loaded_grid

load_base_grid #

load_base_grid(grid_path, grid_type)

Load the base grid from the grid file.

PARAMETER DESCRIPTION
grid_path

The grid to load

TYPE: Path

grid_type

The type of the grid, either "pandapower", "powsybl", "ucte" or "cgmes".

TYPE: Literal[pandapower, powsybl, ucte, cgmes]

RETURNS DESCRIPTION
PandapowerNet | Network

The loaded grid

RAISES DESCRIPTION
ValueError

If the grid type is not supported.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/lf_worker.py
def load_base_grid(
    grid_path: Path, grid_type: Literal["pandapower", "powsybl", "ucte", "cgmes"]
) -> pandapower.pandapowerNet | Network:
    """Load the base grid from a file on the local filesystem.

    Thin convenience wrapper around :func:`load_base_grid_fs` using a local filesystem.

    Parameters
    ----------
    grid_path : Path
        The grid to load
    grid_type : Literal["pandapower", "powsybl", "ucte", "cgmes"]
        The type of the grid, either "pandapower", "powsybl", "ucte" or "cgmes".

    Returns
    -------
    pandapower.pandapowerNet | Network
        The loaded grid

    Raises
    ------
    ValueError
        If the grid type is not supported.
    """
    local_filesystem = LocalFileSystem()
    return load_base_grid_fs(local_filesystem, grid_path, grid_type)

main #

main(args)

Start main function of the worker.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/ac_loadflow_service/lf_worker.py
def main(args: LoadflowWorkerArgs) -> None:
    """Start main function of the worker.

    Sets up the Kafka consumer and producer, then alternates forever between the
    idle loop (waiting for a StartCalculationCommand) and the solver loop
    (processing the received command).
    """
    # NOTE(review): log message says "importer" but this is the loadflow worker —
    # looks like a copy-paste; confirm before changing the runtime string.
    logger.info(f"Starting importer instance {args.instance_id}")
    consumer = LongRunningKafkaConsumer(
        topic=args.loadflow_command_topic,
        group_id="loadflow-worker",
        bootstrap_servers=args.kafka_broker,
        client_id=args.instance_id,
    )

    producer = Producer(
        {
            "bootstrap.servers": args.kafka_broker,
            "client.id": args.instance_id,
            "log_level": 2,
        },
        logger=logger,
    )

    def heartbeat_idle() -> None:
        # Heartbeat variant used while no calculation is running (idle=True, no status).
        producer.produce(
            args.loadflow_heartbeat_topic,
            value=serialize_message(
                LoadflowHeartbeat(
                    idle=True,
                    status_info=None,
                ).model_dump_json()
            ),
            key=args.instance_id.encode("utf-8"),
        )
        producer.flush()

    def heartbeat_fn(job_id: str, runtime: float, message: str = "") -> None:
        # Heartbeat variant used during a calculation: carries job id, runtime and a
        # status message, and also refreshes the consumer's liveness.
        producer.produce(
            args.loadflow_heartbeat_topic,
            value=serialize_message(
                LoadflowHeartbeat(
                    idle=False,
                    status_info=LoadflowStatusInfo(
                        loadflow_id=job_id,
                        runtime=runtime,
                        message=message,
                    ),
                ).model_dump_json()
            ),
            key=args.instance_id.encode("utf-8"),
        )
        producer.flush()
        consumer.heartbeat()

    # Main worker loop: idle until a command arrives, solve it, then go idle again.
    while True:
        command = idle_loop(
            consumer=consumer,
            send_heartbeat_fn=heartbeat_idle,
            heartbeat_interval_ms=args.heartbeat_interval_ms,
        )
        consumer.start_processing()
        solver_loop(
            command=command,
            producer=producer,
            processed_grid_path=args.processed_gridfile_folder,
            loadflow_solver_path=args.loadflow_result_folder,
            heartbeat_fn=heartbeat_fn,
            instance_id=args.instance_id,
            n_processes=args.n_processes,
            results_topic=args.loadflow_results_topic,
        )
        producer.flush()
        consumer.stop_processing()

Contingency Analysis PandaPower#

toop_engine_contingency_analysis.pandapower.contingency_analysis_pandapower #

Compute the N-1 AC/DC power flow for the pandapower network.

run_single_outage #

run_single_outage(
    net,
    contingency,
    monitored_elements,
    timestep,
    job_id,
    method,
    runpp_kwargs=None,
)

Compute a single outage for the given network

PARAMETER DESCRIPTION
net

The network to compute the outage for

TYPE: pandapowerNet

contingency

The contingency to compute the outage for

TYPE: PandapowerContingency

monitored_elements

The elements to monitor during the outage

TYPE: DataFrame[PandapowerMonitoredElementSchema]

timestep

The timestep of the results

TYPE: int

job_id

The job id of the current job

TYPE: str

method

The method to use for the loadflow. Either "ac" or "dc"

TYPE: Literal[ac, dc]

runpp_kwargs

Additional keyword arguments to pass to runpp/rundcpp functions, by default None

TYPE: Optional[dict] DEFAULT: None

RETURNS DESCRIPTION
LoadflowResults

The results of the ContingencyAnalysis computation

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pandapower/contingency_analysis_pandapower.py
@pa.check_types
def run_single_outage(
    net: pp.pandapowerNet,
    contingency: PandapowerContingency,
    monitored_elements: pat.DataFrame[PandapowerMonitoredElementSchema],
    timestep: int,
    job_id: str,
    method: Literal["ac", "dc"],
    runpp_kwargs: Optional[dict] = None,
) -> LoadflowResults:
    """Compute a single outage for the given network

    The contingency's elements are switched out of service, the loadflow is run,
    result frames are collected, and the elements are restored to their previous
    in_service state before returning.

    Parameters
    ----------
    net : pp.pandapowerNet
        The network to compute the outage for
    contingency : PandapowerContingency
        The contingency to compute the outage for
    monitored_elements : pat.DataFrame[PandapowerMonitoredElementSchema]
        The elements to monitor during the outage
    timestep : int
        The timestep of the results
    job_id : str
        The job id of the current job
    method : Literal["ac", "dc"]
        The method to use for the loadflow. Either "ac" or "dc"
    runpp_kwargs : Optional[dict], optional
        Additional keyword arguments to pass to runpp/rundcpp functions, by default None

    Returns
    -------
    LoadflowResults
        The results of the ContingencyAnalysis computation
    """
    outaged_elements = contingency.elements

    were_in_service = set_outaged_elements_out_of_service(net, outaged_elements)
    if not any(were_in_service):
        # If no elements were outaged, this is the base case and we should not run the loadflow
        status = ConvergenceStatus.NO_CALCULATION
    else:
        runpp_kwargs = runpp_kwargs or {}
        try:
            # Run DC or AC loadflow depending on the requested method.
            pp.rundcpp(net, **runpp_kwargs) if method == "dc" else pp.runpp(net, **runpp_kwargs)
            status = ConvergenceStatus.CONVERGED
        except pp.LoadflowNotConverged:
            status = ConvergenceStatus.FAILED

    convergence_df = get_convergence_df(timestep=timestep, contingency=contingency, status=status.value)
    regulating_elements_df = get_regulating_element_results(timestep, monitored_elements, contingency)

    branch_results_df, node_results_df, va_diff_results = get_element_results_df(
        net, contingency, monitored_elements, timestep, status
    )

    # Undo the outage so the caller's network is left in its original state.
    restore_elements_to_service(net, outaged_elements, were_in_service)

    # Fill in element names that the result builders left empty, using the
    # monitored-elements lookup keyed by element index.
    element_name_map = monitored_elements["name"].to_dict()
    for df in [branch_results_df, node_results_df, regulating_elements_df, va_diff_results]:
        no_name_yet = df["element_name"] == ""
        df.loc[no_name_yet, "element_name"] = df.loc[no_name_yet].index.get_level_values("element").map(element_name_map)
        df["contingency_name"] = contingency.name
    lf_result = LoadflowResults(
        job_id=job_id,
        branch_results=branch_results_df,
        node_results=node_results_df,
        converged=convergence_df,
        regulating_element_results=regulating_elements_df,
        va_diff_results=va_diff_results,
        warnings=[],
    )
    return lf_result

restore_elements_to_service #

restore_elements_to_service(
    net, outaged_elements, were_in_service
)

Restore the outaged elements to their original in_service status.

PARAMETER DESCRIPTION
net

The pandapower network to restore the elements in

TYPE: pandapowerNet

outaged_elements

The elements that were outaged

TYPE: list[PandapowerElements]

were_in_service

A list indicating whether each element was in service before being set out of service

TYPE: list[bool]

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pandapower/contingency_analysis_pandapower.py
def restore_elements_to_service(
    net: pp.pandapowerNet, outaged_elements: list[PandapowerElements], were_in_service: list[bool]
) -> None:
    """Restore the outaged elements to their original in_service status.

    Only elements that were in service before the outage was applied are switched
    back on; elements that were already out of service stay untouched.

    Parameters
    ----------
    net : pp.pandapowerNet
        The pandapower network to restore the elements in
    outaged_elements : list[PandapowerElements]
        The elements that were outaged
    were_in_service : list[bool]
        A list indicating whether each element was in service before being set out of service
    """
    for element, was_active in zip(outaged_elements, were_in_service):
        if was_active:
            net[element.table].loc[int(element.table_id), "in_service"] = True

get_element_results_df #

get_element_results_df(
    net, contingency, monitored_elements, timestep, status
)

Get the element results dataframes for the given contingency and monitored elements.

PARAMETER DESCRIPTION
net

The pandapower network to get the results from

TYPE: pandapowerNet

contingency

The contingency to get the results for

TYPE: PandapowerContingency

monitored_elements

The monitored elements to get the results for

TYPE: DataFrame[PandapowerMonitoredElementSchema]

timestep

The timestep of the results

TYPE: int

status

The convergence status of the loadflow computation

TYPE: ConvergenceStatus

RETURNS DESCRIPTION
tuple[DataFrame, DataFrame, DataFrame]

The branch results dataframe, node results dataframe and va diff results dataframe

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pandapower/contingency_analysis_pandapower.py
def get_element_results_df(
    net: pp.pandapowerNet,
    contingency: PandapowerContingency,
    monitored_elements: pat.DataFrame[PandapowerMonitoredElementSchema],
    timestep: int,
    status: ConvergenceStatus,
) -> tuple[pat.DataFrame[BranchResultSchema], pat.DataFrame[NodeResultSchema], pat.DataFrame[VADiffResultSchema]]:
    """Get the element results dataframes for the given contingency and monitored elements.

    If the loadflow converged the actual results are extracted from the network;
    otherwise placeholder "failed" result frames are built for the monitored elements.

    Parameters
    ----------
    net : pp.pandapowerNet
        The pandapower network to get the results from
    contingency : PandapowerContingency
        The contingency to get the results for
    monitored_elements : pat.DataFrame[PandapowerMonitoredElementSchema]
        The monitored elements to get the results for
    timestep : int
        The timestep of the results
    status : ConvergenceStatus
        The convergence status of the loadflow computation

    Returns
    -------
    tuple[pat.DataFrame[BranchResultSchema], pat.DataFrame[NodeResultSchema], pat.DataFrame[VADiffResultSchema]]
        The branch results dataframe, node results dataframe and va diff results dataframe
    """
    if status == ConvergenceStatus.CONVERGED:
        branch_results_df = get_branch_results(net, contingency, monitored_elements, timestep)
        node_results_df = get_node_result_df(net, contingency, monitored_elements, timestep)
        va_diff_results = get_va_diff_results(net, timestep, monitored_elements, contingency)
    else:
        # Loadflow failed or was skipped: build placeholder results per element kind.
        # 3-winding transformers are branches but need separate handling downstream.
        monitored_trafo3w = monitored_elements.query("table == 'trafo3w'").index.to_list()
        monitored_branches = monitored_elements.query("kind == 'branch' & table != 'trafo3w'").index.to_list()
        monitored_buses = monitored_elements.query("kind == 'bus'").index.to_list()
        branch_results_df = get_failed_branch_results(
            timestep, [contingency.unique_id], monitored_branches, monitored_trafo3w
        )
        node_results_df = get_failed_node_results(timestep, [contingency.unique_id], monitored_buses)
        va_diff_results = get_failed_va_diff_results(timestep, monitored_elements, contingency)
    return branch_results_df, node_results_df, va_diff_results

set_outaged_elements_out_of_service #

set_outaged_elements_out_of_service(net, outaged_elements)

Set the outaged elements in the network to out of service.

Returns info if the elements were in service before being set out of service.

PARAMETER DESCRIPTION
net

The pandapower network to set the elements out of service in

TYPE: pandapowerNet

outaged_elements

The elements to set out of service

TYPE: list[PandapowerElements]

RETURNS DESCRIPTION
list[bool]

A list indicating whether each element was in service before being set out of service

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pandapower/contingency_analysis_pandapower.py
def set_outaged_elements_out_of_service(net: pp.pandapowerNet, outaged_elements: list[PandapowerElements]) -> list[bool]:
    """Set the outaged elements in the network to out of service.

    Returns info if the elements were in service before being set out of service.

    Parameters
    ----------
    net : pp.pandapowerNet
        The pandapower network to set the elements out of service in
    outaged_elements : list[PandapowerElements]
        The elements to set out of service

    Returns
    -------
    list[bool]
        A list indicating whether each element was in service before being set out of service
    """
    # Base case: nothing to outage. Return a dummy True so callers checking
    # any(were_in_service) do not treat this as "no element was in service".
    if not outaged_elements:
        return [True]

    previous_states: list[bool] = []
    for element in outaged_elements:
        element_table = net[element.table]
        previous_states.append(bool(element_table.loc[element.table_id, "in_service"]))
        element_table.loc[element.table_id, "in_service"] = False
    return previous_states

run_contingency_analysis_sequential #

run_contingency_analysis_sequential(
    net,
    n_minus_1_definition,
    job_id,
    timestep,
    slack_allocation_config,
    method="dc",
    runpp_kwargs=None,
)

Compute a full N-1 analysis for the given network, but a single timestep

PARAMETER DESCRIPTION
net

The network to compute the N-1 analysis for

TYPE: pandapowerNet

n_minus_1_definition

The N-1 definition to use for the analysis

TYPE: PandapowerNMinus1Definition

job_id

The job id of the current job

TYPE: str

timestep

The timestep of the results

TYPE: int

slack_allocation_config

Precomputed configuration for slack allocation per island.

TYPE: SlackAllocationConfig

method

The method to use for the loadflow, by default "dc"

TYPE: Literal[ac, dc] DEFAULT: 'dc'

runpp_kwargs

Additional keyword arguments to pass to runpp/rundcpp functions, by default None

TYPE: Optional[dict] DEFAULT: None

RETURNS DESCRIPTION
list[LoadflowResults]

A list of the results per contingency

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pandapower/contingency_analysis_pandapower.py
def run_contingency_analysis_sequential(
    net: pp.pandapowerNet,
    n_minus_1_definition: PandapowerNMinus1Definition,
    job_id: str,
    timestep: int,
    slack_allocation_config: SlackAllocationConfig,
    method: Literal["ac", "dc"] = "dc",
    runpp_kwargs: Optional[dict] = None,
) -> list[LoadflowResults]:
    """Compute a full N-1 analysis for the given network, but a single timestep

    Parameters
    ----------
    net : pp.pandapowerNet
        The network to compute the N-1 analysis for
    n_minus_1_definition : PandapowerNMinus1Definition
        The N-1 definition to use for the analysis
    job_id : str
        The job id of the current job
    timestep : int
        The timestep of the results
    slack_allocation_config : SlackAllocationConfig
        Precomputed configuration for slack allocation per island.
    method : Literal["ac", "dc"], optional
        The method to use for the loadflow, by default "dc"
    runpp_kwargs : Optional[dict], optional
        Additional keyword arguments to pass to runpp/rundcpp functions, by default None

    Returns
    -------
    list[LoadflowResults]
        A list of the results per contingency
    """
    results = []

    for contingency in n_minus_1_definition.contingencies:
        # Work on a copy so slack reassignment does not leak into the caller's network.
        copy_net = deepcopy(net)
        elements_ids = [element.unique_id for element in contingency.elements]
        removed_edges = assign_slack_per_island(
            net=copy_net,
            net_graph=slack_allocation_config.net_graph,
            bus_lookup=slack_allocation_config.bus_lookup,
            elements_ids=elements_ids,
            min_island_size=slack_allocation_config.min_island_size,
        )

        single_res = run_single_outage(
            net=copy_net,
            contingency=contingency,
            monitored_elements=n_minus_1_definition.monitored_elements,
            timestep=timestep,
            job_id=job_id,
            method=method,
            runpp_kwargs=runpp_kwargs,
        )

        results.append(single_res)
        # Put the edges removed for this contingency back so the shared graph is
        # intact for the next contingency.
        slack_allocation_config.net_graph.add_edges_from(removed_edges)

    return results

run_contingency_analysis_parallel #

run_contingency_analysis_parallel(
    net,
    n_minus_1_definition,
    job_id,
    timestep,
    slack_allocation_config,
    method="dc",
    n_processes=1,
    batch_size=None,
    runpp_kwargs=None,
)

Compute the N-1 AC/DC power flow for the network.

PARAMETER DESCRIPTION
net

The pandapower network to compute the N-1 power flow for, with the topology already applied. You can either pass the network directly or a ray.ObjectRef to the network (wrapped in a list to avoid dereferencing the object).

TYPE: pandapowerNet

n_minus_1_definition

The N-1 definition to use for the analysis. Contains outages and monitored elements

TYPE: PandapowerNMinus1Definition

job_id

The job id of the current job

TYPE: str

timestep

The timestep of the results

TYPE: int

slack_allocation_config

Precomputed configuration for slack allocation per island.

TYPE: SlackAllocationConfig

method

The method to use for the loadflow, by default "dc"

TYPE: Literal[ac, dc] DEFAULT: 'dc'

n_processes

The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially. If > 1, the analysis is run in parallel. Parallelization is done by splitting the contingencies into chunks and running each chunk in a separate process.

TYPE: int DEFAULT: 1

batch_size

The size of the batches to use for the parallelization. If None, the batch size is set to the number of contingencies divided by the number of processes, rounded up. This is used to avoid creating too many process handles at once.
TYPE: Optional[int] DEFAULT: None

runpp_kwargs

Additional keyword arguments to pass to runpp/rundcpp functions, by default None

TYPE: Optional[dict] DEFAULT: None

RETURNS DESCRIPTION
list[LoadflowResults]

A list of the results per contingency

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pandapower/contingency_analysis_pandapower.py
def run_contingency_analysis_parallel(
    net: pp.pandapowerNet,
    n_minus_1_definition: PandapowerNMinus1Definition,
    job_id: str,
    timestep: int,
    slack_allocation_config: SlackAllocationConfig,
    method: Literal["ac", "dc"] = "dc",
    n_processes: int = 1,
    batch_size: Optional[int] = None,
    runpp_kwargs: Optional[dict] = None,
) -> list[LoadflowResults]:
    """Compute the N-1 AC/DC power flow for the network in parallel via ray.

    Parameters
    ----------
    net: pp.pandapowerNet,
        The pandapower network to compute the N-1 power flow for, with the topology already applied.
        You can either pass the network directly or a ray.ObjectRef to the network (wrapped in a
        list to avoid dereferencing the object).
    n_minus_1_definition: PandapowerNMinus1Definition,
        The N-1 definition to use for the analysis. Contains outages and monitored elements
    job_id : str
        The job id of the current job
    timestep : int
        The timestep of the results
    slack_allocation_config : SlackAllocationConfig
        Precomputed configuration for slack allocation per island.
    method : Literal["ac", "dc"], optional
        The method to use for the loadflow, by default "dc"
    n_processes : int, optional
        The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially.
        If > 1, the analysis is run in parallel. Parallelization is done by splitting the contingencies into
        chunks and running each chunk in a separate process
    batch_size : Optional[int]
        The size of the batches to use for the parallelization. If None, the batch size is set to the number of
        contingencies divided by the number of processes, rounded up. This bounds the number of in-flight
        ray handles.
    runpp_kwargs : Optional[dict], optional
        Additional keyword arguments to pass to runpp/rundcpp functions, by default None

    Returns
    -------
    list[LoadflowResults]
        A list of the results per contingency, in the original contingency order
    """
    n_outages = len(n_minus_1_definition.contingencies)
    if batch_size is None:
        # max(1, ...) guards against a zero range-step when there are no contingencies
        batch_size = max(1, math.ceil(n_outages / n_processes))
    work = [n_minus_1_definition[i : i + batch_size] for i in range(0, n_outages, batch_size)]

    _compute_remote = ray.remote(run_contingency_analysis_sequential)

    # Schedule batches, keeping at most n_processes tasks in flight.
    # ray.wait returns handles in *completion* order, so remember which batch
    # each handle belongs to in order to restore the original order afterwards.
    # (The previous implementation concatenated results in completion order
    # despite a comment claiming they were sorted back.)
    handle_to_batch: dict = {}
    handles: list = []
    batch_results: dict[int, list[LoadflowResults]] = {}
    for batch_index, batch in enumerate(work):
        handle = _compute_remote.remote(
            net=net,
            n_minus_1_definition=batch,
            job_id=job_id,
            timestep=timestep,
            slack_allocation_config=slack_allocation_config,
            method=method,
            runpp_kwargs=runpp_kwargs,
        )
        handle_to_batch[handle] = batch_index
        handles.append(handle)
        if len(handles) >= n_processes:
            # Wait for the first finished task before scheduling more work
            finished, handles = ray.wait(handles, num_returns=1)
            for done in finished:
                batch_results[handle_to_batch[done]] = ray.get(done)
    # Collect whatever is still running
    for handle in handles:
        batch_results[handle_to_batch[handle]] = ray.get(handle)

    # Flatten the per-batch result lists in the original batch order
    results = [result for batch_index in sorted(batch_results) for result in batch_results[batch_index]]
    return results

run_contingency_analysis_pandapower #

run_contingency_analysis_pandapower(
    net,
    n_minus_1_definition,
    job_id,
    timestep,
    min_island_size=11,
    method="ac",
    n_processes=1,
    batch_size=None,
    runpp_kwargs=None,
    polars=False,
)

Compute the N-1 AC/DC power flow for the network.

PARAMETER DESCRIPTION
net

The pandapower network to compute the N-1 power flow for, with the topology already applied.

TYPE: pandapowerNet

n_minus_1_definition

The N-1 definition to use for the analysis. Contains outages and monitored elements

TYPE: Nminus1Definition

job_id

The job id of the current job

TYPE: str

timestep

The timestep of the results

TYPE: int

min_island_size

The minimum island size to consider

TYPE: int DEFAULT: 11

method

The method to use for the loadflow, by default "ac"

TYPE: Literal[ac, dc] DEFAULT: 'ac'

n_processes

The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially. If > 1, the analysis is run in parallel. Parallelization is done by splitting the contingencies into chunks and running each chunk in a separate process

TYPE: int DEFAULT: 1

batch_size

The size of the batches to use for the parallelization. If None, the batch size is set to the number of contingencies divided by the number of processes, rounded up.

TYPE: Optional[int] DEFAULT: None

runpp_kwargs

Additional keyword arguments to pass to runpp/rundcpp functions, by default None

TYPE: Optional[dict] DEFAULT: None

polars

Whether to return the results as a LoadflowResultsPolars object. If False, returns a LoadflowResults object. Note that this only affects the type of the returned object, the computations are the same.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Union[LoadflowResults, LoadflowResultsPolars]

The results of the loadflow computation

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pandapower/contingency_analysis_pandapower.py
def run_contingency_analysis_pandapower(
    net: pp.pandapowerNet,
    n_minus_1_definition: Nminus1Definition,
    job_id: str,
    timestep: int,
    min_island_size: int = 11,
    method: Literal["ac", "dc"] = "ac",
    n_processes: int = 1,
    batch_size: Optional[int] = None,
    runpp_kwargs: Optional[dict] = None,
    polars: bool = False,
) -> Union[LoadflowResults, LoadflowResultsPolars]:
    """Compute the N-1 AC/DC power flow for the network.

    Translates the generic N-1 definition for pandapower, precomputes the slack
    allocation configuration, dispatches to the sequential or parallel runner,
    and attaches warnings about any ids that could not be resolved.

    Parameters
    ----------
    net : pp.pandapowerNet
        The pandapower network to compute the N-1 power flow for, with the topology already applied.
    n_minus_1_definition : Nminus1Definition
        The N-1 definition to use for the analysis. Contains outages and monitored elements
    job_id : str
        The job id of the current job
    timestep : int
        The timestep of the results
    min_island_size: int
        The minimum island size to consider
    method : Literal["ac", "dc"], optional
        The method to use for the loadflow, by default "ac"
    n_processes : int, optional
        The number of processes to use for the contingency analysis. If 1, the analysis is run
        sequentially. If > 1, the analysis is run in parallel by splitting the contingencies into
        chunks and running each chunk in a separate process
    batch_size : Optional[int]
        The size of the batches to use for the parallelization. If None, the batch size is set to the
        number of contingencies divided by the number of processes, rounded up.
    runpp_kwargs : Optional[dict], optional
        Additional keyword arguments to pass to runpp/rundcpp functions, by default None
    polars: bool, default=False
        Whether to return the results as a LoadflowResultsPolars object. If False, returns a
        LoadflowResults object. Only the type of the returned object differs; the computations
        are identical.

    Returns
    -------
    Union[LoadflowResults, LoadflowResultsPolars]
        The results of the loadflow computation
    """
    translated_definition = translate_nminus1_for_pandapower(n_minus_1_definition, net)

    # Precompute the topology graph and bus lookup once; both runners need them
    graph = top.create_nxgraph(net)
    lookup, _ = create_bus_lookup_simple(net)
    slack_config = SlackAllocationConfig(
        net_graph=graph,
        bus_lookup=lookup,
        min_island_size=min_island_size,
    )

    # An explicit batch_size forces the parallel (ray) code path even for one process
    run_sequentially = n_processes == 1 and batch_size is None
    if run_sequentially:
        per_batch_results = run_contingency_analysis_sequential(
            net=net,
            n_minus_1_definition=translated_definition,
            job_id=job_id,
            timestep=timestep,
            slack_allocation_config=slack_config,
            method=method,
            runpp_kwargs=runpp_kwargs,
        )
    else:
        per_batch_results = run_contingency_analysis_parallel(
            net=net,
            n_minus_1_definition=translated_definition,
            job_id=job_id,
            timestep=timestep,
            slack_allocation_config=slack_config,
            method=method,
            n_processes=n_processes,
            batch_size=batch_size,
            runpp_kwargs=runpp_kwargs,
        )
    combined_result = concatenate_loadflow_results(per_batch_results)

    # Prepend translation warnings (duplicates first, then missing ids) to any runner warnings
    translation_warnings: list[str] = []
    for element_id in translated_definition.duplicated_grid_elements:
        translation_warnings.append(f"Element with id {element_id} is not unique in the grid.")
    for element in translated_definition.missing_elements:
        translation_warnings.append(f"Element with id {element.id} not found in the network.")
    for contingency in translated_definition.missing_contingencies:
        translation_warnings.append(
            f"Contingency with id {contingency.id} contains elements that are not found in the network."
        )
    combined_result.warnings = [*translation_warnings, *combined_result.warnings]

    if polars:
        return convert_pandas_loadflow_results_to_polars(combined_result)
    return combined_result

toop_engine_contingency_analysis.pandapower.pandapower_helpers #

Contingency Analysis PyPowsybl#

toop_engine_contingency_analysis.pypowsybl.contingency_analysis_powsybl #

Compute the N-1 AC/DC power flow for the network.

run_powsybl_analysis #

run_powsybl_analysis(
    net, n_minus_1_definition, method="ac", n_processes=1
)

Run the powsybl security analysis for the given network and N-1 definition.

PARAMETER DESCRIPTION
net

The powsybl network to compute the Contingency Analysis for

TYPE: Network

n_minus_1_definition

The N-1 definition to use for the contingency analysis. Contains outages and monitored elements

TYPE: Nminus1Definition

method

The method to use for the contingency analysis. Either "ac" or "dc", by default "ac"

TYPE: Literal[ac, dc] DEFAULT: 'ac'

n_processes

The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
res

The security analysis result from powsybl containing the monitored elements and the results of the contingencies.

TYPE: SecurityAnalysisResult

basecase_id

The name of the basecase contingency, if it is included in the run. Otherwise None.

TYPE: str | None

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/contingency_analysis_powsybl.py
def run_powsybl_analysis(
    net: Network,
    n_minus_1_definition: PowsyblNMinus1Definition,
    method: Literal["ac", "dc"] = "ac",
    n_processes: int = 1,
) -> tuple[SecurityAnalysisResult, str | None]:
    """Run the powsybl security analysis for the given network and N-1 definition.

    Parameters
    ----------
    net : Network
        The powsybl network to compute the Contingency Analysis for
    n_minus_1_definition : Nminus1Definition
        The N-1 definition to use for the contingency analysis. Contains outages and monitored elements
    method : Literal["ac", "dc"], optional
        The method to use for the contingency analysis. Either "ac" or "dc", by default "ac"
    n_processes : int, optional
        The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially.

    Returns
    -------
    res: SecurityAnalysisResult
        The security analysis result from powsybl containing the monitored elements and the results of the contingencies.
    basecase_id : str | None
        The name of the basecase contingency, if it is included in the run. Otherwise None.
    """
    analysis = pypowsybl.security.create_analysis()
    monitored = n_minus_1_definition.monitored_elements
    analysis.add_monitored_elements(
        branch_ids=monitored["branches"],
        three_windings_transformer_ids=monitored["trafo3w"],
        voltage_level_ids=monitored["voltage_levels"],
    )

    # Register each contingency; an empty element list marks the basecase run
    basecase_id = None
    for contingency in n_minus_1_definition.contingencies:
        element_ids = contingency.elements
        if not element_ids:
            basecase_id = contingency.id
        elif len(element_ids) == 1:
            analysis.add_single_element_contingency(element_ids[0], contingency_id=contingency.id)
        else:
            analysis.add_multiple_elements_contingency(element_ids, contingency_id=contingency.id)

    if method == "ac" and n_minus_1_definition.distributed_slack:
        # With distributed slack and AC loadflows the slack is handled by setting
        # the generator target values to the loadflow values beforehand
        lf_params = DISTRIBUTED_SLACK
    else:
        # DC security analysis always runs with a single slack so gen values are not
        # changed per N-1 case; this matches the GPU-solver N-1 setup
        lf_params = SINGLE_SLACK

    propagation_flag = "true" if n_minus_1_definition.contingency_propagation else "false"
    security_params = pypowsybl.security.impl.parameters.Parameters(
        load_flow_parameters=lf_params,
        provider_parameters={"threadCount": str(n_processes), "contingencyPropagation": propagation_flag},
    )

    if method == "ac":
        res = analysis.run_ac(net, security_params)
    else:
        res = analysis.run_dc(net, security_params)
    return res, basecase_id

run_contingency_analysis_polars #

run_contingency_analysis_polars(
    net,
    pow_n1_definition,
    job_id,
    timestep,
    method="dc",
    n_processes=1,
)

Compute the N-0 + N-1 power flow for the network.

PARAMETER DESCRIPTION
net

The powsybl network to compute the Contingency Analysis for

TYPE: Network

pow_n1_definition

The N-1 definition to use for the contingency analysis. Contains outages and monitored elements

TYPE: PowsyblNMinus1Definition

job_id

The job id of the current job

TYPE: str

timestep

The timestep to use for the contingency analysis

TYPE: int

method

Whether to compute the AC or DC power flow, by default "dc"

TYPE: Literal[ac, dc] DEFAULT: 'dc'

n_processes

The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially. If > 1, the analysis is run in parallel. Parallelization is done via the openloadflow native threadCount parameter, which is set in the powsybl security analysis parameters.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
LoadflowResultsPolars

The results of the loadflow computation. Invalid or otherwise failed results will be set to NaN.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/contingency_analysis_powsybl.py
def run_contingency_analysis_polars(
    net: Network,
    pow_n1_definition: PowsyblNMinus1Definition,
    job_id: str,
    timestep: int,
    method: Literal["ac", "dc"] = "dc",
    n_processes: int = 1,
) -> LoadflowResultsPolars:
    """Compute the N-0 + N-1 power flow for the network.

    Runs the powsybl security analysis and converts the raw results into a
    LoadflowResultsPolars bundle of lazy polars frames (branch, node,
    regulating-element, voltage-angle-difference and convergence results).

    Parameters
    ----------
    net : Network
        The powsybl network to compute the Contingency Analysis for
    pow_n1_definition : PowsyblNMinus1Definition
        The N-1 definition to use for the contingency analysis. Contains outages and monitored elements
    job_id : str
        The job id of the current job
    timestep : int
        The timestep to use for the contingency analysis
    method : Literal["ac", "dc"], optional
        Whether to compute the AC or DC power flow, by default "dc"
    n_processes : int, optional
        The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially.
        If > 1, the analysis is run in parallel.
        Parallelization is done via the openloadflow native threadCount parameter,
        which is set in the powsybl security analysis parameters.

    Returns
    -------
    LoadflowResultsPolars
        The results of the loadflow computation. Invalid or otherwise failed results will be set to NaN.
    """
    monitored_elements = pow_n1_definition.monitored_elements
    # Run the actual powsybl security analysis and pull the raw frames out lazily
    ca_result, basecase_id = run_powsybl_analysis(net, pow_n1_definition, method, n_processes=n_processes)
    bus_results = get_ca_bus_results(ca_result, lazy=True)
    branch_results = get_ca_branch_results(ca_result, lazy=True)
    three_windings_transformer_results = get_ca_three_windings_transformer_results(ca_result, lazy=True)
    post_contingency_results = ca_result.post_contingency_results
    pre_contingency_result = ca_result.pre_contingency_result

    # Build the convergence frame; failed_outages drives NaN-filling in the result frames below
    all_outage_ids = [contingency.id for contingency in pow_n1_definition.contingencies if not contingency.is_basecase()]
    convergence_df, failed_outages = get_convergence_result_df(
        post_contingency_results, pre_contingency_result, all_outage_ids, timestep, basecase_id
    )
    add_name_column(convergence_df, pow_n1_definition.contingency_name_mapping, index_level="contingency")
    convergence_df = pl.from_pandas(convergence_df, include_index=True, nan_to_null=False).lazy()

    branch_limit_polars = pl.from_pandas(pow_n1_definition.branch_limits, include_index=True, nan_to_null=False).lazy()

    # Assemble the individual result frames (branches incl. 3w transformers, nodes,
    # regulating elements, voltage-angle differences)
    branch_results_df = get_branch_results_polars(
        branch_results,
        three_windings_transformer_results,
        monitored_elements["branches"],
        monitored_elements["trafo3w"],
        failed_outages,
        timestep,
        branch_limit_polars,
    )
    node_results_df = get_node_results_polars(
        bus_results,
        monitored_elements["buses"],
        pl.from_pandas(pow_n1_definition.bus_map, include_index=True, nan_to_null=False).lazy(),
        pl.from_pandas(pow_n1_definition.voltage_levels, include_index=True, nan_to_null=False).lazy(),
        failed_outages,
        timestep,
        method,
    )
    regulating_elements_df = get_regulating_element_results(
        monitored_elements["buses"], timestep=timestep, basecase_name=basecase_id
    )
    regulating_elements_df = pl.from_pandas(regulating_elements_df, include_index=True, nan_to_null=False).lazy()
    va_diff_results_df = get_va_diff_results_polars(
        bus_results=bus_results,
        outages=all_outage_ids,
        va_diff_with_buses=pl.from_pandas(pow_n1_definition.blank_va_diff, include_index=True, nan_to_null=False).lazy(),
        bus_map=pl.from_pandas(pow_n1_definition.bus_map, include_index=True, nan_to_null=False).lazy(),
        timestep=timestep,
    )
    # For each frame: rename the basecase row and fill in the human-readable
    # element/contingency name columns
    branch_results_df = update_basename_polars(branch_results_df, basecase_id)
    branch_results_df = add_name_column_polars(
        branch_results_df, pow_n1_definition.element_name_mapping, index_level="element"
    )
    branch_results_df = add_name_column_polars(
        branch_results_df, pow_n1_definition.contingency_name_mapping, index_level="contingency"
    )

    node_results_df = update_basename_polars(node_results_df, basecase_id)
    node_results_df = add_name_column_polars(node_results_df, pow_n1_definition.element_name_mapping, index_level="element")
    node_results_df = add_name_column_polars(
        node_results_df, pow_n1_definition.contingency_name_mapping, index_level="contingency"
    )

    regulating_elements_df = update_basename_polars(regulating_elements_df, basecase_id)
    regulating_elements_df = add_name_column_polars(
        regulating_elements_df, pow_n1_definition.element_name_mapping, index_level="element"
    )
    regulating_elements_df = add_name_column_polars(
        regulating_elements_df, pow_n1_definition.contingency_name_mapping, index_level="contingency"
    )

    va_diff_results_df = update_basename_polars(va_diff_results_df, basecase_id)
    va_diff_results_df = add_name_column_polars(
        va_diff_results_df, pow_n1_definition.element_name_mapping, index_level="element"
    )
    va_diff_results_df = add_name_column_polars(
        va_diff_results_df, pow_n1_definition.contingency_name_mapping, index_level="contingency"
    )

    lf_results = LoadflowResultsPolars(
        job_id=job_id,
        branch_results=branch_results_df,
        node_results=node_results_df,
        regulating_element_results=regulating_elements_df,
        va_diff_results=va_diff_results_df,
        converged=convergence_df,
        warnings=[],
        additional_information=[],
        lazy=True,
    )
    return lf_results

run_contingency_analysis_powsybl #

run_contingency_analysis_powsybl(
    net,
    n_minus_1_definition,
    job_id,
    timestep,
    method="ac",
    n_processes=1,
    polars=False,
)

Compute the Contingency Analysis for the network.

PARAMETER DESCRIPTION
net

The powsybl network to compute the Contingency Analysis for

TYPE: Network

n_minus_1_definition

The N-1 definition to use for the contingency analysis. Contains outages and monitored elements

TYPE: Nminus1Definition

job_id

The job id of the current job

TYPE: str

timestep

The timestep to use for the contingency analysis

TYPE: int

method

The method to use for the contingency analysis. Either "ac" or "dc", by default "ac"

TYPE: Literal[ac, dc] DEFAULT: 'ac'

n_processes

The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially. If > 1, the analysis is run in parallel. Parallelization is done by splitting the contingencies into chunks and running each chunk in a separate process

TYPE: int DEFAULT: 1

polars

Whether to use polars for the dataframe operations.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Union[LoadflowResults, LoadflowResultsPolars]

The results of the loadflow computation.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/contingency_analysis_powsybl.py
def run_contingency_analysis_powsybl(
    net: Network,
    n_minus_1_definition: Nminus1Definition,
    job_id: str,
    timestep: int,
    method: Literal["ac", "dc"] = "ac",
    n_processes: int = 1,
    polars: bool = False,
) -> Union[LoadflowResults, LoadflowResultsPolars]:
    """Compute the Contingency Analysis for the network.

    Parameters
    ----------
    net : Network
        The powsybl network to compute the Contingency Analysis for
    n_minus_1_definition : Nminus1Definition
        The N-1 definition to use for the contingency analysis. Contains outages and monitored elements
    job_id : str
        The job id of the current job
    timestep : int
        The timestep to use for the contingency analysis
    method : Literal["ac", "dc"], optional
        The method to use for the contingency analysis. Either "ac" or "dc", by default "dc"
    n_processes : int, optional
        The number of processes to use for the contingency analysis. If 1, the analysis is run sequentially.
        If > 1, the analysis is run in parallel
        Paralelization is done by splitting the contingencies into chunks and running each chunk in a separate process
    polars: bool
        Whether to use polars for the dataframe operations.

    Returns
    -------
    Union[LoadflowResults, LoadflowResultsPolars]
        The results of the loadflow computation.
    """
    if n_minus_1_definition.loadflow_parameters.distributed_slack:
        # We only do this once, before the first batch. So we dont have to redo it every iteration
        net = set_target_values_to_lf_values_incl_distributed_slack(net, method)
    pow_n1_definition = translate_nminus1_for_powsybl(n_minus_1_definition, net)

    lf_result = run_contingency_analysis_polars(
        net=net,
        pow_n1_definition=pow_n1_definition,
        job_id=job_id,
        timestep=timestep,
        method=method,
        n_processes=n_processes,
    )
    if not polars:
        lf_result = convert_polars_loadflow_results_to_pandas(lf_result)
    missing_element_warnings = [
        f"Element with id {element.id} not found in the network." for element in pow_n1_definition.missing_elements
    ]
    missing_contingency_warnings = [
        f"Contingency with id {contingency.id} contains elements that are not found in the network."
        for contingency in pow_n1_definition.missing_contingencies
    ]
    lf_result.warnings = [*missing_element_warnings, *missing_contingency_warnings, *lf_result.warnings]
    return lf_result

toop_engine_contingency_analysis.pypowsybl.powsybl_helpers_polars #

Helper functions to translate the N-1 definition into a usable format for Powsybl.

This includes translating contingencies, monitored elements and collecting the necessary data from the network, so this only has to happen once.

POWSYBL_CONVERGENCE_MAP module-attribute #

POWSYBL_CONVERGENCE_MAP = {
    value: value,
    value: value,
    value: value,
    value: value,
}

get_node_results_polars #

get_node_results_polars(
    bus_results,
    monitored_buses,
    bus_map,
    voltage_levels,
    failed_outages,
    timestep,
    method,
)

Get the node results for the given outages and timestep.

TODO: This is currently faking the sum of p and q at the node

PARAMETER DESCRIPTION
bus_results

The bus results from the powsybl security analysis

TYPE: LazyFrame

monitored_buses

The list of monitored buses to get the node results for

TYPE: list[str]

bus_map

A mapping from busbar sections or bus_breaker_buses to the electrical buses. This is used to map the buses from bus_results to electrical buses and back to the monitored buses.

TYPE: LazyFrame

voltage_levels

The voltage levels of the buses. This is used to determine voltage limits and nominal v in DC.

TYPE: LazyFrame

failed_outages

The list of failed outages to get nan-node results for

TYPE: list[str]

timestep

The timestep to get the node results for

TYPE: int

method

The method to use for the node results. Either "ac" or "dc"

TYPE: Literal[ac, dc]

RETURNS DESCRIPTION
DataFrame[NodeResultSchemaPolars]

The node results for the given outages and timestep

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers_polars.py
@pa.check_types
def get_node_results_polars(
    bus_results: pl.LazyFrame,
    monitored_buses: list[str],
    bus_map: pl.LazyFrame,
    voltage_levels: pl.LazyFrame,
    failed_outages: list[str],
    timestep: int,
    method: Literal["ac", "dc"],
) -> patpl.LazyFrame[NodeResultSchemaPolars]:
    """Get the node results for the given outages and timestep.

    TODO: This is currently faking the sum of p and q at the node

    Parameters
    ----------
    bus_results : pl.LazyFrame
        The bus results from the powsybl security analysis
    monitored_buses : list[str]
        The list of monitored buses to get the node results for
    bus_map: pl.LazyFrame
        A mapping from busbar sections or bus_breaker_buses to the electrical buses.
        Used to map the buses from bus_results to electrical buses and back to the monitored buses.
    voltage_levels: pl.LazyFrame
        The voltage levels of the buses, used to determine voltage limits and nominal v in DC.
    failed_outages : list[str]
        The list of failed outages to get nan-node results for
    timestep : int
        The timestep to get the node results for
    method : Literal["ac", "dc"]
        The method to use for the node results. Either "ac" or "dc"

    Returns
    -------
    patpl.DataFrame[NodeResultSchemaPolars]
        The node results for the given outages and timestep
    """
    # No bus results at all -> only the NaN rows for the failed outages remain
    if bus_results.limit(1).collect().is_empty():
        return get_failed_node_results_polars(timestep, failed_outages, monitored_buses)

    # Map bus ids in the results (busbar sections or bus_breaker buses) onto the
    # monitored buses; this works for both busbar and bus_breaker models
    monitored_lookup = bus_map.filter(pl.col("id").is_in(monitored_buses)).select(
        pl.col("bus_breaker_bus_id"),
        pl.col("id").alias("element"),
    )
    nodes = (
        bus_results.drop("operator_strategy_id")
        .rename({"contingency_id": "contingency"})
        .join(bus_map.select("id", "bus_breaker_bus_id"), left_on=["bus_id"], right_on=["id"], how="left")  # m:1 join
        .drop_nulls("bus_breaker_bus_id")
        .join(monitored_lookup, on=["bus_breaker_bus_id"], how="left")
        .drop_nulls("element")  # drop buses that are not monitored
        .join(  # merge the actual voltage level in kV
            voltage_levels,
            left_on=["voltage_level_id"],
            right_on=["id"],
            how="left",
        )
        .with_columns(timestep=pl.lit(timestep))
        .rename({"v_mag": "vm", "v_angle": "va"})
    )

    if method == "dc":
        # DC delivers no magnitudes: substitute nominal v wherever an angle exists,
        # otherwise keep the original vm
        nodes = nodes.with_columns(
            pl.when(pl.col("va").is_not_null())
            .then(pl.col("nominal_v"))
            .otherwise(pl.col("vm"))
            .alias("vm")
        )

    # Voltage deviation and its normalized loading against the limit band
    nodes = (
        nodes.with_columns((pl.col("vm") - pl.col("nominal_v")).alias("vm_deviation"))
        .with_columns(
            (pl.col("vm_deviation") / (pl.col("high_voltage_limit") - pl.col("nominal_v"))).alias("deviation_to_max"),
            (pl.col("vm_deviation") / (pl.col("nominal_v") - pl.col("low_voltage_limit"))).alias("deviation_to_min"),
        )
        .with_columns(
            pl.when(pl.col("vm_deviation") >= 0)
            .then(pl.col("deviation_to_max"))
            .otherwise(pl.col("deviation_to_min"))
            .alias("vm_loading")
        )
    )

    nan_rows = get_failed_node_results_polars(timestep, failed_outages, monitored_buses)

    # TODO: va_loading is not defined yet
    # TODO: add p and q calculation at the node
    nodes = (
        nodes.cast({"timestep": pl.Int64})
        .with_columns(
            p=pl.lit(float("nan")),  # TODO
            q=pl.lit(float("nan")),  # TODO
            element_name=pl.lit(""),  # will be filled later
            contingency_name=pl.lit(""),  # will be filled later
        )
        .select(
            [
                "timestep",
                "contingency",
                "element",
                "vm",
                "va",
                "vm_loading",
                "p",
                "q",
                "element_name",
                "contingency_name",
            ]
        )
    )

    return pl.concat([nodes, nan_rows])

get_branch_results_polars #

get_branch_results_polars(
    branch_results,
    three_winding_results,
    monitored_branches,
    monitored_trafo3w,
    failed_outages,
    timestep,
    branch_limits,
)

Get the branch results for the given outages and timestep.

PARAMETER DESCRIPTION
branch_results

The branch results from the powsybl security analysis

TYPE: LazyFrame

three_winding_results

The three winding transformer results from the powsybl security analysis

TYPE: LazyFrame

monitored_branches

The list of monitored branches with 2 sides to get the branch results for

TYPE: list[str]

monitored_trafo3w

The list of monitored three winding transformers to get the branch results for

TYPE: list[str]

failed_outages

The list of failed outages to get nan-branch results for

TYPE: list[str]

timestep

The timestep to get the branch results for

TYPE: int

branch_limits

The branch limits from the powsybl network

TYPE: LazyFrame

RETURNS DESCRIPTION
DataFrame[BranchResultSchemaPolars]

The polars branch results for the given outages and timestep

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers_polars.py
@pa.check_types
def get_branch_results_polars(
    branch_results: pl.LazyFrame,
    three_winding_results: pl.LazyFrame,
    monitored_branches: list[str],
    monitored_trafo3w: list[str],
    failed_outages: list[str],
    timestep: int,
    branch_limits: pl.LazyFrame,
) -> patpl.LazyFrame[BranchResultSchemaPolars]:
    """Get the branch results for the given outages and timestep.

    The per-side columns of the powsybl results (p1/q1/i1, p2/q2/i2 and, for
    three-winding transformers, p3/q3/i3) are stacked into a long format with one
    row per (contingency, element, side). Loadings are derived from the branch
    limits, and nan-rows are appended for non-convergent contingencies.

    Parameters
    ----------
    branch_results : pl.LazyFrame
        The branch results from the powsybl security analysis
    three_winding_results : pl.LazyFrame
        The three winding transformer results from the powsybl security analysis
    monitored_branches : list[str]
        The list of monitored branches with 2 sides to get the branch results for
    monitored_trafo3w : list[str]
        The list of monitored three winding transformers to get the branch results for
    failed_outages : list[str]
        The list of failed outages to get nan-branch results for
    timestep : int
        The timestep to get the branch results for
    branch_limits : pl.LazyFrame
        The branch limits from the powsybl network

    Returns
    -------
    patpl.LazyFrame[BranchResultSchemaPolars]
        The polars branch results for the given outages and timestep
    """
    # Align all indices: give both result frames the same key column names
    branch_results = branch_results.drop("operator_strategy_id")
    branch_results = branch_results.rename({"contingency_id": "contingency", "branch_id": "element"})
    three_winding_results = three_winding_results.rename({"contingency_id": "contingency", "transformer_id": "element"})

    # Stack the wide per-side columns into one long frame, side by side
    side_one_results = (
        pl.concat(
            [
                branch_results.select(["contingency", "element", "p1", "q1", "i1"]),
                three_winding_results.select(["contingency", "element", "p1", "q1", "i1"]),
            ]
        )
        .with_columns(side=pl.lit(BranchSide.ONE.value))
        .rename({"p1": "p", "q1": "q", "i1": "i"})
    )
    side_two_results = (
        pl.concat(
            [
                branch_results.select(["contingency", "element", "p2", "q2", "i2"]),
                three_winding_results.select(["contingency", "element", "p2", "q2", "i2"]),
            ]
        )
        .with_columns(side=pl.lit(BranchSide.TWO.value))
        .rename({"p2": "p", "q2": "q", "i2": "i"})
    )
    # Only three-winding transformers contribute a third side
    side_three_results = (
        three_winding_results.select(["contingency", "element", "p3", "q3", "i3"])
        .with_columns(side=pl.lit(BranchSide.THREE.value))
        .rename({"p3": "p", "q3": "q", "i3": "i"})
    )
    # Combine and add timestep column
    converted_branch_results = pl.concat([side_one_results, side_two_results, side_three_results]).with_columns(
        timestep=pl.lit(timestep)
    )
    converted_branch_results = converted_branch_results.cast({"timestep": pl.Int64, "side": pl.Int64})
    branch_limits = branch_limits.cast({"side": pl.Int64, "value": pl.Float64})

    if not converted_branch_results.limit(1).collect().is_empty():
        # loading = current / limit, matched on the (element, side) pair
        converted_branch_results = (
            converted_branch_results.join(
                branch_limits, left_on=["element", "side"], right_on=["element_id", "side"], how="left"
            )  # m:1 join
            .with_columns(loading=pl.col("i") / pl.col("value"))
            .drop("value")
        )
    else:
        # Empty result set: still add the schema columns with nan values
        # add i column
        converted_branch_results = converted_branch_results.with_columns(i=pl.lit(float("nan")))
        # add loading column
        converted_branch_results = converted_branch_results.with_columns(loading=pl.lit(float("nan")))
        # cast null to str
        converted_branch_results = converted_branch_results.cast({"contingency": pl.String, "element": pl.String})
    # fill loading nulls with nans (e.g. elements without a matching limit)
    converted_branch_results = converted_branch_results.with_columns(pl.col("loading").fill_null(float("nan")))
    # add empty element_name and contingency_name columns to match the schema
    converted_branch_results = converted_branch_results.with_columns(
        element_name=pl.lit(""),
        contingency_name=pl.lit(""),
    )

    # Add nan results for non convergent contingencies
    failed_branch_results = get_failed_branch_results_polars(timestep, failed_outages, monitored_branches, monitored_trafo3w)

    # Enforce the schema column order before concatenating with the failed rows
    converted_branch_results = converted_branch_results.select(
        [
            "timestep",
            "contingency",
            "element",
            "side",
            "p",
            "q",
            "i",
            "loading",
            "element_name",
            "contingency_name",
        ]
    )
    converted_branch_results = pl.concat([converted_branch_results, failed_branch_results])

    return converted_branch_results

get_va_diff_results_polars #

get_va_diff_results_polars(
    bus_results,
    outages,
    va_diff_with_buses,
    bus_map,
    timestep,
)

Get the voltage angle difference results for the given outages and bus results.

PARAMETER DESCRIPTION
bus_results

The dataframe containing the bus results of powsybl contingency analysis.

TYPE: LazyFrame

outages

The list of outages to be considered. These are the contingency ids that are outaged.

TYPE: list[str]

va_diff_with_buses

The dataframe containing the voltage angle difference results with the bus pairs that need checking.

TYPE: LazyFrame

bus_map

A mapping from busbar sections to bus breaker buses. This is used to convert the busbar sections to bus breaker buses in the Node Breaker model.

TYPE: LazyFrame

timestep

The timestep of the results.

TYPE: int

RETURNS DESCRIPTION
VADiffResultSchemaPolars

The dataframe containing the voltage angle difference results for the given outages.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers_polars.py
@pa.check_types
def get_va_diff_results_polars(
    bus_results: pl.LazyFrame, outages: list[str], va_diff_with_buses: pl.LazyFrame, bus_map: pl.LazyFrame, timestep: int
) -> patpl.LazyFrame[VADiffResultSchemaPolars]:
    """Get the voltage angle difference results for the given outages and bus results.

    Parameters
    ----------
    bus_results : pl.LazyFrame
        The dataframe containing the bus results of powsybl contingency analysis.
    outages : list[str]
        The list of outages to be considered. These are the contingency ids that are outaged.
    va_diff_with_buses : pl.LazyFrame
        The dataframe containing the voltage angle difference results with the bus pairs that need checking.
    bus_map: pl.LazyFrame
        A mapping from busbar sections to bus breaker buses. This is used to convert the busbar sections to bus breaker buses
        in the Node Breaker model.
    timestep : int
        The timestep of the results.

    Returns
    -------
    patpl.LazyFrame[VADiffResultSchemaPolars]
        The dataframe containing the voltage angle difference results for the given outages.
    """
    # Nothing to compute: return an empty frame that still satisfies the schema
    if len(outages) == 0 or bus_results.limit(1).collect().is_empty():
        return (
            pl.from_pandas(get_empty_dataframe_from_model(VADiffResultSchema), include_index=True, nan_to_null=False)
            .lazy()
            .cast({"timestep": pl.Int64, "va_diff": pl.Float64})
        )
    # Powsybl reports the basecase under the empty-string contingency id
    basecase_in_result = ""
    iteration_va_diff = va_diff_with_buses.filter(pl.col("contingency").is_in([basecase_in_result, *outages]))

    iteration_va_diff = iteration_va_diff.with_columns(timestep=pl.lit(timestep).cast(pl.Int64))
    # Map busbar sections where there are any. For the rest use the bus_breaker_bus_id from the results (here the bus id)
    bus_results = bus_results.join(
        bus_map.select("id", "bus_breaker_bus_id"), left_on=["bus_id"], right_on=["id"], how="left"
    )  # m:1 join

    # get the voltage angles for both buses in the va_diff definition
    iteration_va_diff = iteration_va_diff.join(
        bus_results.select("contingency_id", "bus_breaker_bus_id", "v_angle"),
        left_on=["contingency", "bus_breaker_bus1_id"],
        right_on=["contingency_id", "bus_breaker_bus_id"],
        how="left",
    )  # m:1 join
    iteration_va_diff = iteration_va_diff.rename({"v_angle": "v_angle_1"})
    iteration_va_diff = iteration_va_diff.join(
        bus_results.select("contingency_id", "bus_breaker_bus_id", "v_angle"),
        left_on=["contingency", "bus_breaker_bus2_id"],
        right_on=["contingency_id", "bus_breaker_bus_id"],
        how="left",
    )  # m:1 join
    iteration_va_diff = iteration_va_diff.rename({"v_angle": "v_angle_2"})

    # Calculate the voltage angle difference
    iteration_va_diff = iteration_va_diff.with_columns((pl.col("v_angle_1") - pl.col("v_angle_2")).alias("va_diff"))

    # drop duplicates (the joins above can multiply rows)
    iteration_va_diff = iteration_va_diff.unique()

    # add empty element_name and contingency_name columns to match the schema
    iteration_va_diff = iteration_va_diff.with_columns(
        element_name=pl.lit(""),  # will be filled later
        contingency_name=pl.lit(""),  # will be filled later
    )

    # Enforce the schema column order
    iteration_va_diff = iteration_va_diff.select(
        [
            "timestep",
            "contingency",
            "element",
            "va_diff",
            "element_name",
            "contingency_name",
        ]
    )

    return iteration_va_diff

update_basename_polars #

update_basename_polars(result_df, basecase_name=None)

Update the basecase name in the results dataframes.

This function updates the contingency index level of the results dataframes to reflect the basecase name. If the basecase is not included in the run, it will remove it from the results. Powsybl includes the basecase as an empty string by default.

The dataframes are expected to have a "contingency" column. A new dataframe is returned; the input is not modified.

PARAMETER DESCRIPTION
result_df

The dataframe containing the branch / node / VADiff results

TYPE: LoadflowResultTablePolars

basecase_name

The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
LoadflowResultTablePolars

The updated dataframes with the basecase name set or removed.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers_polars.py
@pa.check_types
def update_basename_polars(
    result_df: LoadflowResultTablePolars,
    basecase_name: Optional[str] = None,
) -> LoadflowResultTablePolars:
    """Update the basecase name in the results dataframe.

    Powsybl reports the basecase under an empty-string contingency id by default.
    If the basecase is part of the run, this function renames that id to
    ``basecase_name``; otherwise it removes the basecase rows from the results.

    The dataframe is expected to have a "contingency" column. A new (lazy)
    dataframe is returned; polars expressions do not mutate the input.

    Parameters
    ----------
    result_df: LoadflowResultTablePolars
        The dataframe containing the branch / node / VADiff results
    basecase_name: Optional[str], optional
        The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

    Returns
    -------
    LoadflowResultTablePolars
        The updated dataframe with the basecase name set or removed.
    """
    if basecase_name is None:
        # Basecase was not requested: drop the rows powsybl added for it
        return result_df.filter(pl.col("contingency") != "")

    # Replace the empty-string contingency id with the basecase name
    return result_df.with_columns(
        pl.when(pl.col("contingency") == "")
        .then(pl.lit(basecase_name))
        .otherwise(pl.col("contingency"))
        .alias("contingency")
    )

add_name_column_polars #

add_name_column_polars(
    result_df, name_map, index_level="element"
)

Translate the element ids in the results dataframes to the original names.

This function translates the element names in the results dataframes to the original names from the Powsybl network. This is useful for debugging and for displaying the results.

PARAMETER DESCRIPTION
result_df

The dataframe containing the node / branch / VADiff results

TYPE: LoadflowResultTablePolars

name_map

A mapping from the element ids to the original names. This is used to translate the element names in the results.

TYPE: dict[str, str]

index_level

The index level storing the ids that should be mapped to the names. by default "element" for the monitored elements.

TYPE: str DEFAULT: 'element'

RETURNS DESCRIPTION
LoadflowResultTablePolars

The updated dataframe with the ids translated to the original names.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers_polars.py
@pa.check_types
def add_name_column_polars(
    result_df: LoadflowResultTablePolars,
    name_map: dict[str, str],
    index_level: str = "element",
) -> LoadflowResultTablePolars:
    """Translate the element ids in the results dataframes to the original names.

    This function translates the element names in the results dataframes to the original names
    from the Powsybl network. This is useful for debugging and for displaying the results.

    Parameters
    ----------
    result_df: LoadflowResultTablePolars
        The dataframe containing the node / branch / VADiff results
    name_map: dict[str, str]
        A mapping from the element ids to the original names. This is used to translate the element names in the results.
    index_level: str, optional
        The index level storing the ids that should be mapped to the names. by default "element" for the monitored elements.

    Returns
    -------
    LoadflowResultTablePolars
        The updated dataframe with the ids translated to the original names.
    """
    # Map ids through name_map; ids missing from the map keep the value already
    # present in the name column (with nulls coerced to "")
    result_df = result_df.with_columns(
        pl.col(index_level)
        .replace(name_map, default=pl.col(f"{index_level}_name").fill_null(""))
        .alias(f"{index_level}_name")
    )

    # fill nulls with empty string (defensive, e.g. null values inside name_map)
    result_df = result_df.with_columns(pl.col(f"{index_level}_name").fill_null(""))
    return result_df

get_failed_node_results_polars #

get_failed_node_results_polars(
    timestep, failed_outages, monitored_nodes
)

Get the failed node results for the given outages and timestep.

A wrapper around get_failed_node_results to convert the pandas dataframe to a polars dataframe.

PARAMETER DESCRIPTION
timestep

The timestep to get the node results for

TYPE: int

failed_outages

The list of failed outages to get nan-node results for

TYPE: list[str]

monitored_nodes

The list of monitored nodes to get the node results for

TYPE: list[str]

RETURNS DESCRIPTION
DataFrame[NodeResultSchemaPolars]

The polars dataframe containing the failed node results for the given outages and timestep

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers_polars.py
@pa.check_types
def get_failed_node_results_polars(
    timestep: int, failed_outages: list[str], monitored_nodes: list[str]
) -> patpl.LazyFrame[NodeResultSchemaPolars]:
    """Get the failed node results for the given outages and timestep.

    A thin polars wrapper: delegates to get_failed_node_results and converts the
    resulting pandas dataframe into a lazy polars frame.

    Parameters
    ----------
    timestep : int
        The timestep to get the node results for
    failed_outages : list[str]
        The list of failed outages to get nan-node results for
    monitored_nodes : list[str]
        The list of monitored nodes to get the node results for

    Returns
    -------
    patpl.LazyFrame[NodeResultSchemaPolars]
        The polars dataframe containing the failed node results for the given outages and timestep
    """
    pandas_results = get_failed_node_results(timestep, failed_outages, monitored_nodes)
    return (
        pl.from_pandas(pandas_results, include_index=True, nan_to_null=False)
        .lazy()
        .cast({"timestep": pl.Int64})
    )

get_failed_branch_results_polars #

get_failed_branch_results_polars(
    timestep,
    failed_outages,
    monitored_branches,
    monitored_trafo3w,
)

Get the failed branch results for the given outages and timestep.

A wrapper around get_failed_branch_results to convert the pandas dataframe to a polars dataframe.

PARAMETER DESCRIPTION
timestep

The timestep to get the branch results for

TYPE: int

failed_outages

The list of failed outages to get nan-branch results for

TYPE: list[str]

monitored_branches

The list of monitored branches with 2 sides to get the branch results for

TYPE: list[str]

monitored_trafo3w

The list of monitored three winding transformers to get the branch results for

TYPE: list[str]

RETURNS DESCRIPTION
DataFrame[BranchResultSchemaPolars]

The polars dataframe containing the failed branch results for the given outages and timestep

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers_polars.py
@pa.check_types
def get_failed_branch_results_polars(
    timestep: int, failed_outages: list[str], monitored_branches: list[str], monitored_trafo3w: list[str]
) -> patpl.LazyFrame[BranchResultSchemaPolars]:
    """Get the failed branch results for the given outages and timestep.

    A thin polars wrapper: delegates to get_failed_branch_results and converts the
    resulting pandas dataframe into a lazy polars frame.

    Parameters
    ----------
    timestep : int
        The timestep to get the branch results for
    failed_outages : list[str]
        The list of failed outages to get nan-branch results for
    monitored_branches : list[str]
        The list of monitored branches with 2 sides to get the branch results for
    monitored_trafo3w : list[str]
        The list of monitored three winding transformers to get the branch results for

    Returns
    -------
    patpl.LazyFrame[BranchResultSchemaPolars]
        The polars dataframe containing the failed branch results for the given outages and timestep
    """
    pandas_results = get_failed_branch_results(timestep, failed_outages, monitored_branches, monitored_trafo3w)
    return (
        pl.from_pandas(pandas_results, include_index=True, nan_to_null=False)
        .lazy()
        .cast({"timestep": pl.Int64, "side": pl.Int64})
    )

toop_engine_contingency_analysis.pypowsybl.powsybl_helpers #

Helper functions to translate the N-1 definition into a usable format for Powsybl.

This includes translating contingencies, monitored elements and collecting the necessary data from the network, so this only has to happen once.

POWSYBL_CONVERGENCE_MAP module-attribute #

POWSYBL_CONVERGENCE_MAP = {
    value: value,
    value: value,
    value: value,
    value: value,
}

PowsyblContingency #

Bases: BaseModel

A Powsybl contingency.

This is a simplified version of the PandapowerContingency that is used in Powsybl. It contains only the necessary information to run an N-1 analysis in Powsybl.

id instance-attribute #

id

The unique id of the contingency.

name class-attribute instance-attribute #

name = ''

The name of the contingency.

elements instance-attribute #

elements

The list of outaged element ids.

is_basecase #

is_basecase()

Check if the contingency is a basecase.

A basecase contingency has no outaged elements.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def is_basecase(self) -> bool:
    """Check whether this contingency represents the basecase.

    The basecase is characterised by an empty list of outaged elements.
    """
    return not len(self.elements)

PowsyblMonitoredElements #

Bases: TypedDict

A dictionary to hold the monitored element ids for the N-1 analysis.

This is used to store the monitored elements in a format that can be used in Powsybl.

branches instance-attribute #

branches

trafo3w instance-attribute #

trafo3w

switches instance-attribute #

switches

voltage_levels instance-attribute #

voltage_levels

buses instance-attribute #

buses

PowsyblNMinus1Definition #

Bases: BaseModel

A Powsybl N-1 definition.

This is a simplified version of the NMinus1Definition that is used in Powsybl. It contains only the necessary information to run an N-1 analysis in Powsybl.

model_config class-attribute instance-attribute #

model_config = {'arbitrary_types_allowed': True}

contingencies instance-attribute #

contingencies

The outages to be considered. Maps contingency id to outaged element ids.

monitored_elements instance-attribute #

monitored_elements

The list of branches with two sides, to be monitored during the N-1 analysis.

missing_elements class-attribute instance-attribute #

missing_elements = []

A list of monitored elements that are not present in the network.

missing_contingencies class-attribute instance-attribute #

missing_contingencies = []

A list of contingencies whose elements are (partially) not present in the network.

branch_limits instance-attribute #

branch_limits

The branch limits to be used during the N-1 analysis. If None, the default limits will be used.

blank_va_diff instance-attribute #

blank_va_diff

The buses to be used during the N-1 analysis. This is used to determine the voltage levels of the monitored buses. Could be a busbar section or a bus_breaker_bus depending on the model type.

bus_map instance-attribute #

bus_map

A mapping from busbar sections and bus breaker buses to bus breaker buses, electrical buses and voltage_levels. This helps to always get the correct buses, even if the model type changes.

element_name_mapping instance-attribute #

element_name_mapping

A mapping from element ids to their names. This is used to convert the element ids to their names in the results.

contingency_name_mapping instance-attribute #

contingency_name_mapping

A mapping from contingency ids to their names. This is used to convert the contingency ids to their names in the results.

voltage_levels instance-attribute #

voltage_levels

The voltage levels of the buses. This is used to determine voltage limits.

distributed_slack class-attribute instance-attribute #

distributed_slack = True

Whether to distribute the slack across the generators in the grid. Only relevant for powsybl grids.

contingency_propagation class-attribute instance-attribute #

contingency_propagation = False

Whether to enable powsybl's contingency propagation in the N-1 analysis.

https://powsybl.readthedocs.io/projects/powsybl-open-loadflow/en/latest/security/parameters.html Security Analysis will determine by topological search the switches with type circuit breakers (i.e. capable of opening fault currents) that must be opened to isolate the fault. Depending on the network structure, this could lead to more equipments to be simulated as tripped, because disconnectors and load break switches (i.e., not capable of opening fault currents) are not considered.

__getitem__ #

__getitem__(key)

Get a subset of the nminus1definition based on the contingencies.

If a string is given, the contingency id must be in the contingencies list. If an integer or slice is given, the case id will be indexed by the integer or slice.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def __getitem__(self, key: str | int | slice) -> "PowsyblNMinus1Definition":
    """Get a subset of the nminus1definition based on the contingencies.

    If a string is given, the contingency id must be in the contingencies list.
    If an integer or slice is given, the case id will be indexed by the integer or slice.
    """
    if isinstance(key, slice):
        index = key
    elif isinstance(key, int):
        index = slice(key, key + 1)
    elif isinstance(key, str):
        all_ids = [contingency.id for contingency in self.contingencies]
        if key not in all_ids:
            raise KeyError(f"Contingency id {key} not in contingencies.")
        position = all_ids.index(key)
        index = slice(position, position + 1)
    else:
        raise TypeError("Key must be a string, int or slice.")

    subset = self.model_copy(
        update={
            "contingencies": self.contingencies[index],
        }
    )
    # Re-validate so the result is a proper PowsyblNMinus1Definition instance
    # pylint: disable=unsubscriptable-object
    return PowsyblNMinus1Definition.model_validate(subset)

translate_contingency_to_powsybl #

translate_contingency_to_powsybl(
    contingencies, identifiables
)

Translate the contingencies to a format that can be used in Powsybl.

PARAMETER DESCRIPTION
contingencies

The list of contingencies to translate.

TYPE: list[Contingency]

identifiables

An index of the element ids present in the network. This is used to check if the elements are present in the network.

TYPE: Index

RETURNS DESCRIPTION
pow_contingency

A list of PowsyblContingency objects, each containing the id, name and elements.

TYPE: list[PowsyblContingency]

missing_contingency

A list of all contingencies that are not fully present in the network.

TYPE: list[Contingency]

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def translate_contingency_to_powsybl(
    contingencies: list[Contingency], identifiables: pd.Index
) -> tuple[list[PowsyblContingency], list[Contingency]]:
    """Translate the contingencies to a format that can be used in Powsybl.

    Parameters
    ----------
    contingencies : list[Contingency]
        The list of contingencies to translate.
    identifiables : pd.Index
        The identifiables (element ids) present in the network.
        This is used to check if the elements are present in the network.

    Returns
    -------
    pow_contingency: list[PowsyblContingency]
        A list of PowsyblContingency objects, each containing the id, name and elements.
    missing_contingency: list[Contingency]
        A list of all contingencies that are not fully present in the network.
    """
    pow_contingencies: list[PowsyblContingency] = []
    missing_contingencies: list[Contingency] = []
    for contingency in contingencies:
        element_ids = [element.id for element in contingency.elements]
        if any(element_id not in identifiables for element_id in element_ids):
            # At least one outaged element is unknown -> contingency cannot be simulated
            missing_contingencies.append(contingency)
            continue
        pow_contingencies.append(
            PowsyblContingency(
                id=contingency.id,
                name=contingency.name or "",
                elements=element_ids,
            )
        )

    return pow_contingencies, missing_contingencies

translate_monitored_elements_to_powsybl #

translate_monitored_elements_to_powsybl(
    nminus1_definition, branches, buses, switches
)

Translate the monitored elements to a format that can be used in Powsybl.

Also adds buses that are not monitored per se, but are needed for the voltage angle difference calculation.

PARAMETER DESCRIPTION
nminus1_definition

The original Nminus1Definition containing the monitored elements and outages.

TYPE: Nminus1Definition

branches

The dataframe containing the branches of the network and their voltage_id including 3w-trafos.

TYPE: DataFrame

buses

The dataframe containing the buses of the network and their voltage_id. These include busbar sections and bus_breaker buses.

TYPE: DataFrame

switches

The dataframe containing the switches of the network and their voltage_id.

TYPE: DataFrame

RETURNS DESCRIPTION
monitored_elements

A dictionary containing the monitored elements in a format that can be used in Powsybl.

TYPE: PowsyblMonitoredElements

element_name_mapping

A mapping from element ids to their names. This is used to convert the element ids to their names in the results.

TYPE: dict[str, str]

missing_elements

A list of monitored elements that are not present in the network.

TYPE: list[GridElement]

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def translate_monitored_elements_to_powsybl(
    nminus1_definition: Nminus1Definition, branches: pd.DataFrame, buses: pd.DataFrame, switches: pd.DataFrame
) -> tuple[PowsyblMonitoredElements, dict[str, str], list[GridElement]]:
    """Translate the monitored elements to a format that can be used in Powsybl.

    Also adds buses that are not monitored per se, but are needed for the voltage angle difference calculation.

    Parameters
    ----------
    nminus1_definition: Nminus1Definition
        The original Nminus1Definition containing the monitored elements and outages.
    branches : pd.DataFrame
        The dataframe containing the branches of the network and their voltage_id including 3w-trafos.
    buses : pd.DataFrame
        The dataframe containing the buses of the network and their voltage_id.
        These include busbar sections and bus_breaker buses.
    switches : pd.DataFrame
        The dataframe containing the switches of the network and their voltage_id.


    Returns
    -------
    monitored_elements: PowsyblMonitoredElements
        A dictionary containing the monitored elements in a format that can be used in Powsybl.
    element_name_mapping: dict[str, str]
        A mapping from element ids to their names. This is used to convert the element ids to their names in the results.
    missing_elements: list[GridElement]
        A list of monitored elements that are not present in the network.
    """
    monitored_elements = nminus1_definition.monitored_elements
    # Branches: keep only ids present in the network, then split by transformer type
    all_monitored_branches = [element.id for element in monitored_elements if element.kind == "branch"]
    missing_branches = set(all_monitored_branches) - set(branches.index)
    monitored_branch_df = branches.loc[
        [branch_id for branch_id in all_monitored_branches if branch_id not in missing_branches]
    ]
    monitored_branches = monitored_branch_df.loc[
        monitored_branch_df.type.isin(["LINE", "TWO_WINDINGS_TRANSFORMER", "TIE_LINE"])
    ].index.tolist()

    # Three-winding transformers are monitored via a separate powsybl API
    monitored_trafo3w = monitored_branch_df.loc[monitored_branch_df.type == "THREE_WINDINGS_TRANSFORMER"].index.tolist()

    # Buses: keep only ids present in the network
    all_monitored_buses = [element.id for element in monitored_elements if element.kind == "bus"]
    missing_buses = set(all_monitored_buses) - set(buses.index)
    monitored_buses = [bus_id for bus_id in all_monitored_buses if bus_id not in missing_buses]

    # Switches: keep only ids present in the network
    all_monitored_switches = [element.id for element in monitored_elements if element.kind == "switch"]
    missing_switches = set(all_monitored_switches) - set(switches.index)
    monitored_switches = [switch_id for switch_id in all_monitored_switches if switch_id not in missing_switches]

    # The voltagelevels of outaged branches are relevant for the voltage angle difference calculation.
    all_outaged_branch_ids = [
        elem.id for contingency in nminus1_definition.contingencies for elem in contingency.elements if elem.kind == "branch"
    ]
    missing_outage_branches = set(all_outaged_branch_ids) - set(branches.index)
    outaged_branch_df = branches.loc[
        [branch_id for branch_id in all_outaged_branch_ids if branch_id not in missing_outage_branches]
    ]
    outaged_branch_ids = outaged_branch_df.loc[
        outaged_branch_df.type.isin(["LINE", "TWO_WINDINGS_TRANSFORMER", "TIE_LINE"])
    ].index.tolist()
    # Collect every voltage level touched by a monitored bus/switch or by either
    # side of a monitored/outaged branch
    monitored_voltage_levels = set(
        buses.loc[monitored_buses, "voltage_level_id"].unique().tolist()
        + switches.loc[monitored_switches, "voltage_level_id"].unique().tolist()
        + branches.loc[monitored_branches + outaged_branch_ids, "voltage_level1_id"].unique().tolist()
        + branches.loc[monitored_branches + outaged_branch_ids, "voltage_level2_id"].unique().tolist()
    )
    powsybl_monitored_elements = PowsyblMonitoredElements(
        branches=monitored_branches,
        trafo3w=monitored_trafo3w,
        switches=monitored_switches,
        buses=monitored_buses,
        voltage_levels=list(monitored_voltage_levels),
    )

    # Build the id -> name mapping and report ids that were not found in the network
    element_name_mapping = {element.id: element.name or "" for element in monitored_elements}
    missing_element_ids = missing_branches | missing_buses | missing_switches
    missing_elements = [element for element in monitored_elements if element.id in missing_element_ids]
    return powsybl_monitored_elements, element_name_mapping, missing_elements

prepare_branch_limits #

prepare_branch_limits(
    branch_limits, chosen_limit, monitored_branches
)

Prepare the branch limits for the N-1 analysis.

This is done here, to avoid having to do this in every process.

PARAMETER DESCRIPTION
branch_limits

The dataframe containing the branch limits of the network.

TYPE: DataFrame

chosen_limit

The name of the limit to be used for the N-1 analysis. This is usually "permanent_limit".

TODO Decide if and how this could be extended to other limits.#

TYPE: str

monitored_branches

The list of branches to be monitored during the N-1 analysis.

TYPE: list[str]

RETURNS DESCRIPTION
branch_limits

The dataframe containing the branch limits for the N-1 analysis in the right format.

TYPE: DataFrame

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def prepare_branch_limits(branch_limits: pd.DataFrame, chosen_limit: str, monitored_branches: list[str]) -> pd.DataFrame:
    """Reduce the raw operational limits to the ones relevant for the N-1 analysis.

    Doing this once up front avoids repeating the filtering in every worker process.

    Parameters
    ----------
    branch_limits : pd.DataFrame
        The dataframe containing the branch limits of the network.
    chosen_limit : str
        The name of the limit to be used for the N-1 analysis. This is usually "permanent_limit".
        #TODO Decide if and how this could be extended to other limits.
    monitored_branches : list[str]
        The list of branches to be monitored during the N-1 analysis.

    Returns
    -------
    pd.DataFrame
        The minimum limit value per (element_id, side) pair, indexed by that pair.
    """
    limits = branch_limits.reset_index()
    is_chosen_limit = limits["name"] == chosen_limit
    is_monitored = limits["element_id"].isin(monitored_branches)
    relevant_limits = limits[is_chosen_limit & is_monitored].copy()
    # Powsybl reports sides as strings; downstream code expects integer side codes.
    relevant_limits["side"] = relevant_limits["side"].map({"ONE": 1, "TWO": 2, "THREE": 3})
    return relevant_limits.groupby(by=["element_id", "side"]).min()[["value"]]

get_blank_va_diff #

get_blank_va_diff(
    all_outages, single_branch_outages, monitored_switches
)

Get a blank dataframe for the voltage angle difference results.

This already includes all possible contingencies and monitored switches. The buses of the switches and the outaged branches are added later.

PARAMETER DESCRIPTION
all_outages

The list of all outages to be considered. For all of these cases, all switches need to be checked

TYPE: list[str]

single_branch_outages

A dictionary mapping contingency ids to single outaged element ids. For all of these cases, the specific outaged branch needs to be checked.

TYPE: dict[str, str]

monitored_switches

The list of monitored switches to be considered. These are only the switches that are open and retained.

TYPE: list[str]

RETURNS DESCRIPTION
DataFrame

A blank dataframe with the correct index for the voltage angle difference results. The index is a MultiIndex with the following levels: - timestep: The timestep of the results - contingency: The contingency id (including an empty string for the base case) - element: The element id (the switch or outaged branch)

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def get_blank_va_diff(
    all_outages: list[str], single_branch_outages: dict[str, str], monitored_switches: list[str]
) -> pd.DataFrame:
    """Build an empty dataframe skeleton for the voltage angle difference results.

    The skeleton already contains every (contingency, element) pair that needs a result:
    every monitored switch for every contingency (plus the base case), and the outaged
    branch itself for every single-branch contingency. The buses of the switches and
    the outaged branches are added later.

    Parameters
    ----------
    all_outages : list[str]
        The list of all outages to be considered. For all of these cases, all switches need to be checked
    single_branch_outages : dict[str, str]
        A dictionary mapping contingency ids to single outaged element ids.
        For all of these cases, the specific outaged branch needs to be checked.
    monitored_switches : list[str]
        The list of monitored switches to be considered. These are only the switches that are open and retained.

    Returns
    -------
    pd.DataFrame
        A blank dataframe with the correct MultiIndex for the voltage angle difference results:
        - contingency: The contingency id (including an empty string for the base case)
        - element: The element id (the switch or outaged branch)
    """
    base_case_id = ""  # the base case is encoded as an empty contingency id
    switch_index = pd.MultiIndex.from_product(
        [[base_case_id, *all_outages], monitored_switches],
        names=["contingency", "element"],
    )
    outage_index = pd.MultiIndex.from_arrays(
        [list(single_branch_outages.keys()), list(single_branch_outages.values())],
        names=["contingency", "element"],
    )
    return pd.concat([pd.DataFrame(index=switch_index), pd.DataFrame(index=outage_index)], axis=0)

get_blank_va_diff_with_buses #

get_blank_va_diff_with_buses(
    branches,
    switches,
    pow_contingencies,
    monitored_switches,
)

Get a blank dataframe for the voltage angle difference results with the buspairs that need checking.

The buspairs are bus_breaker_buses (net.get_bus_breaker_view_buses)

PARAMETER DESCRIPTION
branches

The dataframe containing the branches of the network and their bus_breaker_buses.

TYPE: DataFrame

switches

The dataframe containing the switches of the network and their bus_breaker_buses.

TYPE: DataFrame

pow_contingencies

The list of all contingencies to be considered. For all of these cases, all switches need to be checked. For single outages we also consider the outaged branches.

TYPE: list[PowsyblContingency]

monitored_switches

The list of monitored switches to be considered. These are only the switches that are open and retained.

TYPE: list[str]

RETURNS DESCRIPTION
DataFrame

A blank dataframe with the correct index for the voltage angle difference results. The index is a MultiIndex with the following levels: - contingency: The contingency id (including an empty string for the base case) - element: The element id (the switch or outaged branch) - bus_breaker_bus1_id: The first bus_breaker_bus_id of the element - bus_breaker_bus2_id: The second bus_breaker_bus_id of the element

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def get_blank_va_diff_with_buses(
    branches: pd.DataFrame,
    switches: pd.DataFrame,
    pow_contingencies: list[PowsyblContingency],
    monitored_switches: list[str],
) -> pd.DataFrame:
    """Get a blank dataframe for the voltage angle difference results with the buspairs that need checking.

    The buspairs are bus_breaker_buses (net.get_bus_breaker_view_buses)

    Parameters
    ----------
    branches : pd.DataFrame
        The dataframe containing the branches of the network and their bus_breaker_buses.
    switches : pd.DataFrame
        The dataframe containing the switches of the network and their bus_breaker_buses.
    pow_contingencies: list[PowsyblContingency]
        The list of all contingencies to be considered. For all of these cases, all switches need to be checked.
        For single outages we also consider the outaged branches.
    monitored_switches : list[str]
        The list of monitored switches to be considered. These are only the switches that are open and retained.

    Returns
    -------
    pd.DataFrame
        A blank dataframe with the correct index for the voltage angle difference results.
        The index is a MultiIndex with the following levels:
        - contingency: The contingency id (including an empty string for the base case)
        - element: The element id (the switch or outaged branch)
        with the columns bus_breaker_bus1_id / bus_breaker_bus2_id holding each element's bus pair.

    """
    # Only two-terminal branch types qualify for a va-diff check of their own outage.
    branch_indizes = branches.query("type in ['LINE', 'TWO_WINDINGS_TRANSFORMER', 'TIE_LINE']").index
    single_contingencies = [contingency for contingency in pow_contingencies if len(contingency.elements) == 1]
    # contingency id -> outaged element id, restricted to single-branch outages
    single_branch_outages = {
        contingency.id: contingency.elements[0]
        for contingency in single_contingencies
        if contingency.elements[0] in branch_indizes
    }
    # Keep only the bus-pair columns of the outaged branches and of the open+retained monitored switches.
    branches = branches.loc[single_branch_outages.values()][["bus_breaker_bus1_id", "bus_breaker_bus2_id"]]
    switches = switches.loc[monitored_switches]
    switches = switches[switches.open & switches.retained][["bus_breaker_bus1_id", "bus_breaker_bus2_id"]]
    element_df = pd.concat([branches, switches], axis=0)
    # Contingencies without any elements are excluded from the skeleton.
    all_outage_ids = [contingency.id for contingency in pow_contingencies if len(contingency.elements) > 0]
    blank_va_diff = get_blank_va_diff(all_outage_ids, single_branch_outages, switches.index.tolist())

    # Attach each element's bus pair to the (contingency, element) skeleton.
    # Merging on an index-level array creates a helper column "key_0", which is dropped again.
    va_diff_with_buses = blank_va_diff.merge(
        element_df, left_on=blank_va_diff.index.get_level_values("element"), right_index=True, how="left"
    ).drop(columns="key_0")
    return va_diff_with_buses

get_va_diff_results #

get_va_diff_results(
    bus_results,
    outages,
    va_diff_with_buses,
    bus_map,
    timestep,
)

Get the voltage angle difference results for the given outages and bus results.

PARAMETER DESCRIPTION
bus_results

The dataframe containing the bus results of powsybl contingency analysis.

TYPE: DataFrame

outages

The list of outages to be considered. These are the contingency ids that are outaged.

TYPE: list[str]

va_diff_with_buses

The dataframe containing the voltage angle difference results with the bus pairs that need checking.

TYPE: DataFrame

bus_map

A mapping from busbar sections to bus breaker buses. This is used to convert the busbar sections to bus breaker buses in the Node Breaker model.

TYPE: DataFrame

timestep

The timestep of the results.

TYPE: int

RETURNS DESCRIPTION
DataFrame

The dataframe containing the voltage angle difference results for the given outages.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
@pa.check_types
def get_va_diff_results(
    bus_results: pd.DataFrame, outages: list[str], va_diff_with_buses: pd.DataFrame, bus_map: pd.DataFrame, timestep: int
) -> pat.DataFrame[VADiffResultSchema]:
    """Get the voltage angle difference results for the given outages and bus results.

    Parameters
    ----------
    bus_results : pd.DataFrame
        The dataframe containing the bus results of powsybl contingency analysis.
    outages : list[str]
        The list of outages to be considered. These are the contingency ids that are outaged.
    va_diff_with_buses : pd.DataFrame
        The dataframe containing the voltage angle difference results with the bus pairs that need checking.
    bus_map: pd.DataFrame
        A mapping from busbar sections to bus breaker buses. This is used to convert the busbar sections to bus breaker buses
        in the Node Breaker model.
    timestep : int
        The timestep of the results.

    Returns
    -------
    pd.DataFrame
        The dataframe containing the voltage angle difference results for the given outages.
    """
    # Nothing to compute without outages or without any (contingency, element) pairs to check.
    if len(outages) == 0 or len(va_diff_with_buses) == 0:
        return get_empty_dataframe_from_model(VADiffResultSchema)
    basecase_in_result = ""  # the base case is encoded as an empty contingency id
    iteration_va_diff = va_diff_with_buses.loc[
        va_diff_with_buses.index.get_level_values("contingency").isin([basecase_in_result, *outages])
    ]
    # NOTE(review): this assigns into a .loc slice of va_diff_with_buses; pandas may emit a
    # SettingWithCopyWarning here — consider an explicit .copy() when slicing above.
    iteration_va_diff["timestep"] = timestep
    # Map busbar sections where there are any. For the rest use the bus_breaker_bus_id from the results (here the bus id)
    bus_results = bus_results.merge(
        bus_map.bus_breaker_bus_id, left_on=bus_results.index.get_level_values("bus_id"), right_index=True, how="left"
    )

    iteration_va_diff = iteration_va_diff.reset_index()
    # Map the values from the results to the buses of the switches and the outaged branches:
    # one merge per element side, yielding the two bus angles v_angle_1 and v_angle_2.
    iteration_va_diff = iteration_va_diff.merge(
        bus_results[["v_angle"]].add_suffix("_1"),
        left_on=["contingency", "bus_breaker_bus1_id"],
        right_on=[bus_results.index.get_level_values("contingency_id"), bus_results.bus_breaker_bus_id],
        how="left",
    )
    iteration_va_diff = iteration_va_diff.merge(
        bus_results[["v_angle"]].add_suffix("_2"),
        left_on=["contingency", "bus_breaker_bus2_id"],
        right_on=[bus_results.index.get_level_values("contingency_id"), bus_results.bus_breaker_bus_id],
        how="left",
    )
    # Drop exact duplicate rows that the merges may introduce.
    iteration_va_diff.drop_duplicates(inplace=True)
    iteration_va_diff.set_index(["timestep", "contingency", "element"], inplace=True)
    # The va-diff is the angle difference across the element's two buses.
    iteration_va_diff["va_diff"] = iteration_va_diff["v_angle_1"] - iteration_va_diff["v_angle_2"]

    iteration_va_diff = iteration_va_diff.drop(
        columns=["bus_breaker_bus1_id", "bus_breaker_bus2_id", "v_angle_1", "v_angle_2"]
    )

    # Name columns are filled with empty strings here; presumably populated later from the
    # name mappings — TODO confirm against callers.
    iteration_va_diff["element_name"] = ""
    iteration_va_diff["contingency_name"] = ""

    return iteration_va_diff

get_busbar_mapping #

get_busbar_mapping(net)

Get a map between the different kind of buses in the network.

Maps busbar sections and bus breaker buses to bus breaker buses and electrical buses. Maps the electrical buses to monitored buses (either bus breaker or busbar sections).

PARAMETER DESCRIPTION
net

The Powsybl network to use for the translation. This is used to get the busbar sections and bus breaker buses.

TYPE: Network

RETURNS DESCRIPTION
DataFrame

A dataframe containing the busbar mapping from busbar sections to bus breaker buses.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def get_busbar_mapping(net: Network) -> pd.DataFrame:
    """Get a map between the different kind of buses in the network.

    Maps busbar sections and bus breaker buses to bus breaker buses and electrical buses.
    Maps the electrical buses to monitored buses (either bus breaker or busbar sections).

    Parameters
    ----------
    net : Network
        The Powsybl network to use for the translation. This is used to get the busbar sections and bus breaker buses.

    Returns
    -------
    pd.DataFrame
        A dataframe containing the busbar mapping from busbar sections to bus breaker buses.
    """
    shared_columns = ["bus_breaker_bus_id", "bus_id", "voltage_level_id"]
    injections = net.get_injections(attributes=["type", "bus_breaker_bus_id", "bus_id", "voltage_level_id"])
    busbar_sections = injections[injections["type"] == "BUSBAR_SECTION"]
    # Bus breaker buses map to themselves: their own id is the bus_breaker_bus_id.
    breaker_buses = net.get_bus_breaker_view_buses(attributes=["voltage_level_id", "bus_id"])
    breaker_buses = breaker_buses.assign(bus_breaker_bus_id=breaker_buses.index)
    return pd.concat([busbar_sections[shared_columns], breaker_buses[shared_columns]], axis=0)

translate_nminus1_for_powsybl #

translate_nminus1_for_powsybl(n_minus_1_definition, net)

Translate the N-1 definition to a format that can be used in Powsybl.

PARAMETER DESCRIPTION
n_minus_1_definition

The N-1 definition to translate.

TYPE: Nminus1Definition

net

The Powsybl network to use for the translation. This is used to get the busbarsections, buses, branches and switches.

TYPE: Network

RETURNS DESCRIPTION
PowsyblNMinus1Definition

The translated N-1 definition that can be used in Powsybl.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def translate_nminus1_for_powsybl(n_minus_1_definition: Nminus1Definition, net: Network) -> PowsyblNMinus1Definition:
    """Translate the N-1 definition to a format that can be used in Powsybl.

    Parameters
    ----------
    n_minus_1_definition : Nminus1Definition
        The N-1 definition to translate.
    net : Network
        The Powsybl network to use for the translation. This is used to get the busbarsections, buses, branches and switches.

    Returns
    -------
    PowsyblNMinus1Definition
        The translated N-1 definition that can be used in Powsybl.

    Raises
    ------
    ValueError
        If the id_type of the N-1 definition is not supported for Powsybl.
    """
    id_type = n_minus_1_definition.id_type or "powsybl"
    # By default we assume the id_type is powsybl. This works for all powsybl identifiables
    if id_type not in (supported_ids := get_args(POWSYBL_SUPPORTED_ID_TYPES)):
        raise ValueError(
            f"Unsupported id_type {n_minus_1_definition.id_type}. Only {supported_ids} are supported for Powsybl."
        )

    # Load data once from the network
    busmap = get_busbar_mapping(net)
    voltage_levels = net.get_voltage_levels(attributes=["nominal_v", "high_voltage_limit", "low_voltage_limit"])
    # Fall back to +/-20% of nominal voltage where no explicit limits are defined
    voltage_levels["high_voltage_limit"] = voltage_levels["high_voltage_limit"].fillna(voltage_levels["nominal_v"] * 1.2)
    voltage_levels["low_voltage_limit"] = voltage_levels["low_voltage_limit"].fillna(voltage_levels["nominal_v"] * 0.8)

    branch_limits = net.get_operational_limits().query("type=='CURRENT'")
    branches = net.get_branches(
        attributes=["type", "voltage_level1_id", "voltage_level2_id", "bus_breaker_bus1_id", "bus_breaker_bus2_id"]
    )
    # Fetch three-winding transformers once; the previous second, unused fetch was removed.
    trafo3ws = net.get_3_windings_transformers(
        attributes=[
            "voltage_level1_id",
            "voltage_level2_id",
            "voltage_level3_id",
            "bus_breaker_bus1_id",
            "bus_breaker_bus2_id",
            "bus_breaker_bus3_id",
        ]
    ).assign(type="THREE_WINDINGS_TRANSFORMER")
    all_branches = pd.concat([branches, trafo3ws], axis=0)
    switches = net.get_switches(
        attributes=["open", "retained", "voltage_level_id", "bus_breaker_bus1_id", "bus_breaker_bus2_id"]
    )
    identifiables = net.get_identifiables(attributes=[]).index
    pow_contingencies, missing_contingencies = translate_contingency_to_powsybl(
        n_minus_1_definition.contingencies, identifiables
    )
    contingency_name_map = {contingency.id: contingency.name or "" for contingency in n_minus_1_definition.contingencies}
    (monitored_elements, element_name_map, missing_elements) = translate_monitored_elements_to_powsybl(
        n_minus_1_definition, all_branches, busmap, switches
    )

    # create an empty dataframe with the correct index
    va_diff_with_buses = get_blank_va_diff_with_buses(branches, switches, pow_contingencies, monitored_elements["switches"])
    branch_limits = prepare_branch_limits(branch_limits, "permanent_limit", monitored_elements["branches"])
    return PowsyblNMinus1Definition(
        contingencies=pow_contingencies,
        blank_va_diff=va_diff_with_buses,
        monitored_elements=monitored_elements,
        branch_limits=branch_limits,
        bus_map=busmap,
        element_name_mapping=element_name_map,
        contingency_name_mapping=contingency_name_map,
        voltage_levels=voltage_levels,
        distributed_slack=n_minus_1_definition.loadflow_parameters.distributed_slack,
        missing_elements=missing_elements,
        missing_contingencies=missing_contingencies,
        contingency_propagation=n_minus_1_definition.loadflow_parameters.contingency_propagation,
    )

get_regulating_element_results #

get_regulating_element_results(
    monitored_buses, timestep, basecase_name=None
)

Get the regulating element results for the given outages and timestep.

TODO: This is a fake implementation, we need to get the real results from the powsybl security analysis

PARAMETER DESCRIPTION
monitored_buses

The list of monitored buses to get the regulating element results for

TYPE: list[str]

timestep

The timestep to get the regulating element results for

TYPE: int

basecase_name

The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
DataFrame[RegulatingElementResultSchema]

The regulating element results for the given outages and timestep

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
@pa.check_types
def get_regulating_element_results(
    monitored_buses: list[str], timestep: int, basecase_name: str | None = None
) -> pat.DataFrame[RegulatingElementResultSchema]:
    """Get the regulating element results for the given outages and timestep.

    TODO: This is a fake implementation, we need to get the real results from the powsybl security analysis

    Parameters
    ----------
    monitored_buses : list[str]
        The list of monitored buses to get the regulating element results for
    timestep : int
        The timestep to get the regulating element results for
    basecase_name : str | None, optional
        The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

    Returns
    -------
    pat.DataFrame[RegulatingElementResultSchema]
        The regulating element results for the given outages and timestep
    """
    regulating_elements = get_empty_dataframe_from_model(RegulatingElementResultSchema)
    # TODO dont fake this
    # NOTE(review): both assignment pairs below target the same index key
    # (timestep, basecase_name, monitored_buses[0]), so the GENERATOR_Q / -9999.0 row is
    # immediately overwritten and only the SLACK_P / 9999.0 row survives. Confirm whether
    # two distinct rows were intended — the index keys would need to differ.
    if basecase_name and len(monitored_buses) > 0:
        regulating_elements.loc[(timestep, basecase_name, monitored_buses[0]), "value"] = -9999.0
        regulating_elements.loc[(timestep, basecase_name, monitored_buses[0]), "regulating_element_type"] = (
            RegulatingElementType.GENERATOR_Q.value
        )
        regulating_elements.loc[(timestep, basecase_name, monitored_buses[0]), "value"] = 9999.0
        regulating_elements.loc[(timestep, basecase_name, monitored_buses[0]), "regulating_element_type"] = (
            RegulatingElementType.SLACK_P.value
        )
    return regulating_elements

get_node_results #

get_node_results(
    bus_results,
    monitored_buses,
    bus_map,
    voltage_levels,
    failed_outages,
    timestep,
    method,
)

Get the node results for the given outages and timestep.

TODO: This is currently faking the sum of p and q at the node

PARAMETER DESCRIPTION
bus_results

The bus results from the powsybl security analysis

TYPE: DataFrame

monitored_buses

The list of monitored buses to get the node results for

TYPE: list[str]

bus_map

A mapping from busbar sections or bus_breaker_buses to the electrical buses. This is used to map the buses from bus_results to electrical buses and back to the monitored buses.

TYPE: DataFrame

voltage_levels

The voltage levels of the buses. This is used to determine voltage limits and nominal v in DC.

TYPE: DataFrame

failed_outages

The list of failed outages to get nan-node results for

TYPE: list[str]

timestep

The timestep to get the node results for

TYPE: int

method

The method to use for the node results. Either "ac" or "dc"

TYPE: Literal[ac, dc]

RETURNS DESCRIPTION
DataFrame[NodeResultSchema]

The node results for the given outages and timestep

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
@pa.check_types
def get_node_results(
    bus_results: pd.DataFrame,
    monitored_buses: list[str],
    bus_map: pd.DataFrame,
    voltage_levels: pd.DataFrame,
    failed_outages: list[str],
    timestep: int,
    method: Literal["ac", "dc"],
) -> pat.DataFrame[NodeResultSchema]:
    """Get the node results for the given outages and timestep.

    TODO: This is currently faking the sum of p and q at the node

    Parameters
    ----------
    bus_results : pd.DataFrame
        The bus results from the powsybl security analysis
    monitored_buses : list[str]
        The list of monitored buses to get the node results for
    bus_map : pd.DataFrame
        A mapping from busbar sections or bus_breaker_buses to the electrical buses.
        This is used to map the buses from bus_results to electrical buses and back to the monitored buses.
    voltage_levels : pd.DataFrame
        The voltage levels of the buses. This is used to determine
        voltage limits and nominal v in DC.
    failed_outages : list[str]
        The list of failed outages to get nan-node results for
    timestep : int
        The timestep to get the node results for
    method : Literal["ac", "dc"]
        The method to use for the node results. Either "ac" or "dc"

    Returns
    -------
    pat.DataFrame[NodeResultSchema]
        The node results for the given outages and timestep
    """
    if bus_results.empty:
        return get_failed_node_results(timestep, failed_outages, monitored_buses)
    # Translate bus_ids that could be busbar sections or bus_breaker_buses to the monitored buses
    # Should work for both busbar and bus_breaker models
    node_results = bus_results.copy()  # DataFrame.copy() is deep by default; avoids mutating the caller's frame
    node_results["bus_breaker_bus_id"] = node_results.index.get_level_values("bus_id").map(bus_map.bus_breaker_bus_id)
    node_results = node_results.dropna(subset=["bus_breaker_bus_id"])
    monitored_bus_map = bus_map.loc[monitored_buses]
    bus_to_element_map = pd.DataFrame(
        data={"element": monitored_bus_map.index.values}, index=monitored_bus_map.bus_breaker_bus_id.values
    )
    node_results = node_results.merge(bus_to_element_map, right_index=True, left_on="bus_breaker_bus_id")

    # Merge the actual voltage level in kV
    voltage_columns = voltage_levels.columns.to_list()
    node_results[voltage_columns] = voltage_levels.loc[node_results.index.get_level_values("voltage_level_id")].values
    # BUGFIX: the timestep column was previously hardcoded to 0, so converged rows
    # disagreed with the failed rows (which use the timestep parameter).
    node_results = node_results.assign(timestep=timestep)
    node_results.index = pd.MultiIndex.from_arrays(
        [
            node_results.timestep.values,
            node_results.index.get_level_values("contingency_id").values,
            node_results.element.values,
        ],
        names=["timestep", "contingency", "element"],
    )

    node_results.rename(columns={"v_mag": "vm", "v_angle": "va"}, inplace=True)

    # Calculate the values
    if method == "dc":
        # DC loadflow has no voltage magnitudes; use the nominal voltage where an angle exists
        has_va = node_results["va"].notna().values
        node_results.loc[has_va, "vm"] = node_results.loc[has_va, "nominal_v"]
    # vm_loading: deviation from nominal, scaled by the distance to the violated limit
    vm_deviation = node_results["vm"].values - node_results["nominal_v"].values
    deviation_to_max = vm_deviation / (node_results["high_voltage_limit"].values - node_results["nominal_v"].values)
    deviation_to_min = vm_deviation / (node_results["nominal_v"].values - node_results["low_voltage_limit"].values)
    higher_voltage = vm_deviation > 0
    node_results.loc[higher_voltage, "vm_loading"] = deviation_to_max[higher_voltage]
    node_results.loc[~higher_voltage, "vm_loading"] = deviation_to_min[~higher_voltage]
    # TODO Add sum of p and q at the node
    failed_node_results = get_failed_node_results(timestep, failed_outages, monitored_buses)

    all_node_results = pd.concat([node_results, failed_node_results], axis=0)[["vm", "va", "vm_loading"]]

    # p/q are not computed yet (see TODO above); name columns are filled later
    all_node_results["p"] = np.nan
    all_node_results["q"] = np.nan
    all_node_results["element_name"] = ""
    all_node_results["contingency_name"] = ""

    return all_node_results

get_branch_results #

get_branch_results(
    branch_results,
    three_winding_results,
    monitored_branches,
    monitored_trafo3w,
    failed_outages,
    timestep,
    branch_limits,
)

Get the branch results for the given outages and timestep.

PARAMETER DESCRIPTION
branch_results

The branch results from the powsybl security analysis

TYPE: DataFrame

three_winding_results

The three winding transformer results from the powsybl security analysis

TYPE: DataFrame

monitored_branches

The list of monitored branches with 2 sides to get the branch results for

TYPE: list[str]

monitored_trafo3w

The list of monitored three winding transformers to get the branch results for

TYPE: list[str]

failed_outages

The list of failed outages to get nan-branch results for

TYPE: list[str]

timestep

The timestep to get the branch results for

TYPE: int

branch_limits

The branch limits from the powsybl network

TYPE: DataFrame

RETURNS DESCRIPTION
DataFrame[BranchResultSchema]

The branch results for the given outages and timestep

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
@pa.check_types
def get_branch_results(
    branch_results: pd.DataFrame,
    three_winding_results: pd.DataFrame,
    monitored_branches: list[str],
    monitored_trafo3w: list[str],
    failed_outages: list[str],
    timestep: int,
    branch_limits: pd.DataFrame,
) -> pat.DataFrame[BranchResultSchema]:
    """Assemble the per-side branch results for the given outages and timestep.

    Parameters
    ----------
    branch_results : pd.DataFrame
        The branch results from the powsybl security analysis
    three_winding_results : pd.DataFrame
        The three winding transformer results from the powsybl security analysis
    monitored_branches : list[str]
        The list of monitored branches with 2 sides to get the branch results for
    monitored_trafo3w : list[str]
        The list of monitored three winding transformers to get the branch results for
    failed_outages : list[str]
        The list of failed outages to get nan-branch results for
    timestep : int
        The timestep to get the branch results for
    branch_limits : pd.DataFrame
        The branch limits from the powsybl network

    Returns
    -------
    pat.DataFrame[BranchResultSchema]
        The branch results for the given outages and timestep
    """
    # Bring both result frames onto a common (contingency, element) index
    branch_results = branch_results.droplevel("operator_strategy_id")
    branch_results.index.rename({"contingency_id": "contingency", "branch_id": "element"}, inplace=True)
    three_winding_results.index.rename({"contingency_id": "contingency", "transformer_id": "element"}, inplace=True)

    # Stack the per-side columns (p1/q1/i1, p2/q2/i2, p3/q3/i3) into long format with a "side" column.
    # Two-winding branches have sides 1 and 2; only three-winding transformers have a side 3.
    per_side_frames = []
    side_specs = [
        ("1", BranchSide.ONE.value, [branch_results, three_winding_results]),
        ("2", BranchSide.TWO.value, [branch_results, three_winding_results]),
        ("3", BranchSide.THREE.value, [three_winding_results]),
    ]
    for suffix, side_value, source_frames in side_specs:
        column_map = {f"p{suffix}": "p", f"q{suffix}": "q", f"i{suffix}": "i"}
        stacked = pd.concat([frame[list(column_map)] for frame in source_frames], axis=0)
        per_side_frames.append(stacked.assign(side=side_value).rename(columns=column_map))

    # Combine all sides and attach the timestep as an index level
    long_results = pd.concat(per_side_frames, axis=0).assign(timestep=timestep)
    long_results = long_results.set_index(["side", "timestep"], append=True)
    long_results = long_results.reorder_levels(["timestep", "contingency", "element", "side"])

    # Loading is the current flow divided by the (element, side) current limit;
    # rows without a matching limit get NaN via the reindex.
    limit_lookup = pd.MultiIndex.from_arrays(
        [long_results.index.get_level_values("element"), long_results.index.get_level_values("side")]
    )
    long_results["loading"] = long_results["i"].values / branch_limits.reindex(limit_lookup).value.values

    # Non-convergent contingencies are reported as NaN rows
    failed_branch_results = get_failed_branch_results(timestep, failed_outages, monitored_branches, monitored_trafo3w)
    return pd.concat([long_results, failed_branch_results], axis=0)

get_convergence_result_df #

get_convergence_result_df(
    post_contingency_results,
    pre_contingency_result,
    outages,
    timestep,
    basecase_name=None,
)

Get the convergence dataframe for the given outages and timestep.

PARAMETER DESCRIPTION
post_contingency_results

The post contingency results from the powsybl security analysis. Maps contingency id to PostContingencyResult.

TYPE: dict[str, PostContingencyResult]

pre_contingency_result

The pre contingency result from the powsybl security analysis. Holds the Basecase.

TYPE: PreContingencyResult

outages

The list of outages to get the convergence results for

TYPE: list[str]

timestep

The timestep to get the convergence results for

TYPE: int

basecase_name

The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
DataFrame[ConvergedSchema]

The convergence dataframe for the given outages and timestep

list[str]

The list of failed outages

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
@pa.check_types
def get_convergence_result_df(
    post_contingency_results: dict[str, PostContingencyResult],
    pre_contingency_result: PreContingencyResult,
    outages: list[str],
    timestep: int,
    basecase_name: Optional[str] = None,
) -> tuple[pat.DataFrame[ConvergedSchema], list[str]]:
    """Get the convergence dataframe for the given outages and timestep.

    Parameters
    ----------
    post_contingency_results: dict[str, PostContingencyResult],
        The post contingency results from the powsybl security analysis.
        Maps contingency id to PostContingencyResult.
    pre_contingency_result : PreContingencyResult
        The pre contingency result from the powsybl security analysis. Holds the Basecase.
    outages : list[str]
        The list of outages to get the convergence results for
    timestep : int
        The timestep to get the convergence results for
    basecase_name : Optional[str], optional
        The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

    Returns
    -------
    pat.DataFrame[ConvergedSchema]
        The convergence dataframe for the given outages and timestep
    list[str]
        The list of failed outages
    """
    # One row per outage, keyed by a (timestep, contingency) MultiIndex.
    row_index = pd.MultiIndex.from_product([[timestep], outages], names=["timestep", "contingency"])
    convergence_df = pd.DataFrame(index=row_index)

    # Outages without a post-contingency result were never calculated by powsybl.
    raw_statuses = []
    for contingency in outages:
        result = post_contingency_results.get(contingency)
        if result is None:
            raw_statuses.append(pypowsybl.loadflow.ComponentStatus.NO_CALCULATION.value)
        else:
            raw_statuses.append(result.status.value)
    convergence_df["status"] = raw_statuses
    convergence_df["status"] = convergence_df["status"].map(POWSYBL_CONVERGENCE_MAP)

    # Collect the failed outages before the basecase row is appended, so the
    # basecase never shows up in the failed list.
    success_flags = convergence_df["status"].values == "CONVERGED"
    failed_outages = [outage for outage, ok in zip(outages, success_flags, strict=True) if not ok]

    if basecase_name is not None:
        # The basecase comes from the pre-contingency result, not from the outage loop.
        convergence_df.loc[(timestep, basecase_name), "status"] = POWSYBL_CONVERGENCE_MAP[
            pre_contingency_result.status.value
        ]

    # Placeholder columns required by ConvergedSchema; powsybl does not report these here.
    convergence_df["iteration_count"] = np.nan
    convergence_df["warnings"] = ""
    convergence_df["contingency_name"] = ""

    return convergence_df, failed_outages

update_basename #

update_basename(result_df, basecase_name=None)

Update the basecase name in the results dataframes.

This function updates the contingency index level of the results dataframes to reflect the basecase name. If the basecase is not included in the run, it will remove it from the results. Powsybl includes the basecase as an empty string by default.

The Dataframes are expected to have a multi-index with a "contingency" level. The Dataframes are updated inplace.

PARAMETER DESCRIPTION
result_df

The dataframe containing the branch / node / VADiff results

TYPE: LoadflowResultTable

basecase_name

The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
LOADFLOW_RESULT_TABLE

The updated dataframes with the basecase name set or removed.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
@pa.check_types(inplace=True)
def update_basename(
    result_df: LoadflowResultTable,
    basecase_name: Optional[str] = None,
) -> LoadflowResultTable:
    """Update the basecase name in the results dataframes.

    This function updates the contingency index level of the results dataframes to
    reflect the basecase name. If the basecase is not included in the run, it will
    remove it from the results. Powsybl includes the basecase as an empty string by default.

    The Dataframes are expected to have a multi-index with a "contingency" level.
    The Dataframes are updated inplace.

    Parameters
    ----------
    result_df: LOADFLOW_RESULT_TABLE
        The dataframe containing the branch / node / VADiff results
    basecase_name: Optional[str], optional
        The name of the basecase contingency, if it is included in the run. Otherwise None, by default None

    Returns
    -------
    LOADFLOW_RESULT_TABLE
        The updated dataframes with the basecase name set or removed.
    """
    level_pos = result_df.index.names.index("contingency")
    if basecase_name is None:
        # No basecase requested: drop powsybl's empty-string basecase rows if present.
        result_df.drop("", level=level_pos, axis=0, inplace=True, errors="ignore")
        return result_df
    # Rename the empty-string level value to the requested basecase name.
    renamed_level = result_df.index.levels[level_pos].map(
        lambda value: value if value != "" else basecase_name
    )
    result_df.index = result_df.index.set_levels(renamed_level, level=level_pos)
    return result_df

add_name_column #

add_name_column(result_df, name_map, index_level='element')

Translate the element ids in the results dataframes to the original names.

This function translates the element names in the results dataframes to the original names from the Powsybl network. This is useful for debugging and for displaying the results.

PARAMETER DESCRIPTION
result_df

The dataframe containing the node / branch / VADiff results

TYPE: LoadflowResultTable

name_map

A mapping from the element ids to the original names. This is used to translate the element names in the results.

TYPE: dict[str, str]

index_level

The index level storing the ids that should be mapped to the names. by default "element" for the monitored elements.

TYPE: str DEFAULT: 'element'

RETURNS DESCRIPTION
LoadflowResultTable

The updated dataframe with the ids translated to the original names.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
@pa.check_types(inplace=True)
def add_name_column(
    result_df: LoadflowResultTable,
    name_map: dict[str, str],
    index_level: str = "element",
) -> LoadflowResultTable:
    """Translate the element ids in the results dataframes to the original names.

    This function translates the element names in the results dataframes to the original names
    from the Powsybl network. This is useful for debugging and for displaying the results.

    Parameters
    ----------
    result_df: LoadflowResultTable
        The dataframe containing the node / branch / VADiff results
    name_map: dict[str, str]
        A mapping from the element ids to the original names. This is used to translate the element names in the results.
    index_level: str, optional
        The index level storing the ids that should be mapped to the names. by default "element" for the monitored elements.

    Returns
    -------
    LoadflowResultTable
        The updated dataframe with the ids translated to the original names.
    """
    # Ids absent from the mapping get an empty name instead of NaN.
    result_df[f"{index_level}_name"] = result_df.index.get_level_values(index_level).map(name_map).fillna("")
    return result_df

set_target_values_to_lf_values_incl_distributed_slack #

set_target_values_to_lf_values_incl_distributed_slack(
    net, method
)

Update the target values of generators to include the distributed slack.

This is necessary if you want to run the security analysis for generators without distributing their outaged power across the whole network, but still want to maintain the original n0-flows.

PARAMETER DESCRIPTION
net

The powsybl network to update

TYPE: Network

method

The method to use for the loadflow, either "ac" or "dc"

TYPE: Literal[ac, dc]

RETURNS DESCRIPTION
Network

The updated network

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def set_target_values_to_lf_values_incl_distributed_slack(net: Network, method: Literal["ac", "dc"]) -> Network:
    """Update the target values of generators and batteries to include the distributed slack.

    This is necessary if you want to run the security analysis for generators without
    distributing their outaged power across the whole network, but still want to maintain
    the original n0-flows.

    Parameters
    ----------
    net : Network
        The powsybl network to update
    method : Literal["ac", "dc"]
        The method to use for the loadflow, either "ac" or "dc"

    Returns
    -------
    Network
        The updated network
    """

    def _absorb_slack(elements: pd.DataFrame) -> pd.DataFrame:
        """Replace target values with the sign-flipped loadflow results, keeping old targets where no result exists."""
        elements["target_p"] = (-elements["p"]).fillna(elements["target_p"])
        if method == "ac":
            # Reactive power results only exist for AC loadflows.
            elements["target_q"] = (-elements["q"]).fillna(elements["target_q"])
        return elements[["target_p", "target_q"]]

    # Run a loadflow with distributed slack so p/q include each unit's slack share.
    if method == "ac":
        pypowsybl.loadflow.run_ac(net, DISTRIBUTED_SLACK)
    else:
        pypowsybl.loadflow.run_dc(net, DISTRIBUTED_SLACK)
    net.update_generators(_absorb_slack(net.get_generators()))
    net.update_batteries(_absorb_slack(net.get_batteries()))
    return net

get_full_nminus1_definition_powsybl #

get_full_nminus1_definition_powsybl(net)

Get the full N-1 definition from a Powsybl network.

This function retrieves the N-1 definition from a Powsybl network, including: monitored elements (all lines, trafos, buses and switches) and contingencies (all lines, trafos, generators and loads, plus a basecase contingency named "BASECASE").

PARAMETER DESCRIPTION
net

The Powsybl network to retrieve the N-1 definition from.

TYPE: Network

RETURNS DESCRIPTION
Nminus1Definition

The complete N-1 definition for the given Powsybl network.

Source code in packages/contingency_analysis_pkg/src/toop_engine_contingency_analysis/pypowsybl/powsybl_helpers.py
def get_full_nminus1_definition_powsybl(net: pypowsybl.network.Network) -> Nminus1Definition:
    """Get the full N-1 definition from a Powsybl network.

    This function retrieves the N-1 definition from a Powsybl network, including:
        Monitored Elements
            all lines, trafos, buses and switches
        Contingencies
            all lines, trafos, generators and loads
            Basecase contingency with name "BASECASE"

    Parameters
    ----------
    net : pypowsybl.network.Network
        The Powsybl network to retrieve the N-1 definition from.

    Returns
    -------
    Nminus1Definition
        The complete N-1 definition for the given Powsybl network.
    """

    def _as_elements(table: pd.DataFrame, element_type: str, kind: str) -> list[GridElement]:
        """Wrap each row of a powsybl element table into a GridElement."""
        return [
            GridElement(id=element_id, name=getattr(row, "name", ""), type=element_type, kind=kind)
            for element_id, row in table.iterrows()
        ]

    branch_elements = [
        *_as_elements(net.get_lines(attributes=["name"]), "LINE", "branch"),
        *_as_elements(net.get_2_windings_transformers(attributes=["name"]), "TWO_WINDINGS_TRANSFORMER", "branch"),
        *_as_elements(net.get_3_windings_transformers(attributes=["name"]), "THREE_WINDINGS_TRANSFORMER", "branch"),
    ]

    switches = _as_elements(net.get_switches(attributes=["name"]), "SWITCH", "switch")
    bus_table = net.get_busbar_sections(attributes=[])
    if bus_table.empty:
        # Bus-breaker topology networks have no busbar sections; fall back to bus-breaker buses.
        bus_table = net.get_bus_breaker_view_buses(attributes=[])
    buses = _as_elements(bus_table, "BUS", "bus")
    monitored_elements = [*branch_elements, *switches, *buses]

    outaged_elements = [
        *branch_elements,
        *_as_elements(net.get_generators(attributes=["name"]), "GENERATOR", "injection"),
        *_as_elements(net.get_loads(attributes=["name"]), "LOAD", "injection"),
    ]

    # Basecase first, then one single-element contingency per outaged element.
    contingencies = [Contingency(id="BASECASE", name="BASECASE", elements=[])]
    contingencies.extend(
        Contingency(id=element.id, name=element.name or "", elements=[element]) for element in outaged_elements
    )

    return Nminus1Definition(
        contingencies=contingencies,
        monitored_elements=monitored_elements,
        id_type="powsybl",
        loadflow_parameters=LoadflowParameters(
            distributed_slack=True,  # This is the default for Powsybl
        ),
    )