"""Source code for apps.bazel_parser.refinement."""

from __future__ import annotations

import dataclasses
import enum
import logging

import numpy as np
import pandas

from apps.bazel_parser import repo_graph_data

# Module-level logger, named after this module per the stdlib logging convention.
logger = logging.getLogger(__name__)
class Verbosity(enum.Enum):
    """How much detail to log about nodes excluded during refinement.

    SILENT logs nothing, COUNT logs the number of excluded nodes per
    pattern, and LIST logs every excluded node name under its pattern.
    """

    SILENT = "SILENT"
    COUNT = "COUNT"
    LIST = "LIST"
@dataclasses.dataclass
class RefinementConfig:
    """This specifies how to refine a dataframe of nodes to a smaller set.

    We expect node_name to be the index of the dataframe.

    All of these fields are exclusionary.
    """

    # Fullmatch regex patterns applied to the dataframe index (node names);
    # matching rows are excluded.
    name_patterns: list[str]
    # Fullmatch regex patterns applied to the "node_class" column; matching
    # rows are excluded.
    class_patterns: list[str]
    # Class pattern -> name patterns. A row is excluded only when its
    # "node_class" fullmatches the class pattern AND its name fullmatches
    # at least one of the associated name patterns.
    class_pattern_to_name_patterns: dict[str, list[str]]
def _show_exclusions(
    pattern: str,
    exclusion: pandas.Series[bool] | np.ndarray[tuple[int]],
    df: pandas.DataFrame,
    verbosity: Verbosity,
) -> None:
    """Log which rows of ``df`` a single pattern excluded.

    Args:
        pattern: The pattern that produced ``exclusion`` (used as a label).
        exclusion: Boolean mask aligned with ``df``; True means excluded.
        df: The dataframe the mask applies to (index holds node names).
        verbosity: Whether to log nothing, a count, or each node name.

    Raises:
        ValueError: If ``verbosity`` is not a handled ``Verbosity`` member.
    """
    if verbosity == Verbosity.SILENT:
        return
    elif verbosity == Verbosity.COUNT:
        # One C-level pass; equivalent to len(np.where(exclusion)[0]) but
        # without materializing the index array.
        count = int(np.count_nonzero(exclusion))
        # Lazy %-style args so formatting is skipped when INFO is disabled.
        logger.info("%s = %s", pattern, count)
    elif verbosity == Verbosity.LIST:
        logger.info("%s ->", pattern)
        for node in sorted(df.loc[exclusion].index.tolist()):
            logger.info("- %s", node)
    else:
        raise ValueError(f"Unhandled verbosity: {verbosity}")
def refine_dataframe(
    df: pandas.DataFrame,
    refinement: RefinementConfig,
    verbosity: Verbosity,
) -> pandas.DataFrame:
    """Drop the rows of ``df`` matched by the patterns in ``refinement``.

    All exclusion masks are computed first, then each one is reported via
    ``_show_exclusions`` in order: name patterns, class patterns, and
    finally the class-then-name combinations.

    Returns:
        The surviving rows of ``df`` (node_name is expected as the index).
    """
    keep = np.ones(len(df), dtype=bool)

    # Each list holds (pattern, mask) pairs in reporting order.
    name_reports = []
    for name_pattern in refinement.name_patterns:
        mask = df.index.str.fullmatch(name_pattern)
        name_reports.append((name_pattern, mask))
        keep &= ~mask

    class_reports = []
    for class_pattern in refinement.class_patterns:
        mask = df["node_class"].str.fullmatch(class_pattern)
        class_reports.append((class_pattern, mask))
        keep &= ~mask

    combo_reports = []
    for (
        class_pattern,
        name_patterns,
    ) in refinement.class_pattern_to_name_patterns.items():
        matched_by_name = np.zeros(len(df), dtype=bool)
        for name_pattern in name_patterns:
            matched_by_name |= df.index.str.fullmatch(name_pattern)
        mask = df["node_class"].str.fullmatch(class_pattern) & matched_by_name
        combo_reports.append((class_pattern, mask))
        keep &= ~mask

    for pattern, mask in name_reports + class_reports + combo_reports:
        _show_exclusions(
            pattern=pattern,
            exclusion=mask,
            df=df,
            verbosity=verbosity,
        )

    return df.loc[keep]
[docs] def remove_node_from_repo( node: str, repo: repo_graph_data.RepoGraphData ) -> None: """Modify graph by removing node, but preserving edges. Removed nodes are expected to be noise/artifact targets (probability=1.0, duration=0), so dropping their attributes is correct. Callers should call repo.refresh() afterward to recompute all metrics on the reduced graph. """ for parent in repo.graph.predecessors(node): for child in repo.graph.successors(node): repo.graph.add_edge(parent, child) repo.graph.remove_node(node)
def full_refinement(
    repo: repo_graph_data.RepoGraphData,
    refinement: RefinementConfig,
    verbosity: Verbosity,
) -> pandas.DataFrame:
    """Refine ``repo.df`` and drop the excluded nodes from ``repo.graph``.

    The dataframe is filtered first; every node that disappeared from it is
    then removed from the graph via ``remove_node_from_repo`` (which bridges
    edges through each removed node).

    Returns:
        The refined dataframe, which is also assigned back to ``repo.df``.
    """
    surviving = refine_dataframe(
        df=repo.df,
        refinement=refinement,
        verbosity=verbosity,
    )
    dropped = set(repo.df.index.tolist()).difference(
        surviving.index.tolist()
    )
    repo.df = surviving
    for node in dropped:
        remove_node_from_repo(node, repo)
    return repo.df