# Source code for apps.bazel_parser.refinement

from __future__ import annotations

import dataclasses
import enum

import numpy as np
import pandas

from apps.bazel_parser import repo_graph_data


class Verbosity(enum.Enum):
    """How much detail to emit when reporting excluded nodes."""

    SILENT = "SILENT"
    COUNT = "COUNT"
    LIST = "LIST"
# XXX: Maybe load from yaml or something other than json for comments?
@dataclasses.dataclass
class RefinementConfig:
    """This specifies how to refine a dataframe of nodes to a smaller set.

    We expect node_name to be the index of the dataframe.

    All of these fields are exclusionary.
    """

    # Regex patterns matched (fullmatch) against the node-name index.
    name_patterns: list[str]
    # Regex patterns matched (fullmatch) against the node_class column.
    class_patterns: list[str]
    # Class pattern -> name patterns; a row must match both to be excluded.
    class_pattern_to_name_patterns: dict[str, list[str]]
def _show_exclusions(
    pattern: str,
    exclusion: pandas.Series[bool] | np.ndarray[tuple[int]],
    df: pandas.DataFrame,
    verbosity: Verbosity,
) -> None:
    """Report which rows of ``df`` the given pattern excluded.

    SILENT prints nothing, COUNT prints the number of excluded rows,
    LIST prints each excluded node name in sorted order.
    """
    # XXX: print/log/ or return string?
    if verbosity == Verbosity.SILENT:
        return
    if verbosity == Verbosity.COUNT:
        # XXX: Check we don't need == True
        excluded_count = len(np.where(exclusion)[0])
        print(f"{pattern} = {excluded_count}")
    elif verbosity == Verbosity.LIST:
        print(f"{pattern} ->")
        excluded_nodes = df.loc[exclusion].index.tolist()
        for node in sorted(excluded_nodes):
            print(f"- {node}")
    else:
        raise ValueError(f"Unhandled verbosity: {verbosity}")
def refine_dataframe(
    df: pandas.DataFrame,
    refinement: RefinementConfig,
    verbosity: Verbosity,
) -> pandas.DataFrame:
    """Return ``df`` restricted to rows NOT excluded by ``refinement``.

    Rows are excluded when the index (node name) fullmatches a name
    pattern, the node_class column fullmatches a class pattern, or both
    legs of a class-pattern -> name-patterns entry match. Exclusions are
    reported via ``_show_exclusions`` at the requested ``verbosity``.
    """
    include = np.full(len(df), True, dtype=bool)

    # Exclude rows whose node name fullmatches any name pattern.
    # Index.str.fullmatch already yields a plain bool ndarray.
    exclude_by_name = []
    for pattern in refinement.name_patterns:
        match = df.index.str.fullmatch(pattern)
        exclude_by_name.append(match)
        include &= ~match

    # Exclude rows whose node_class fullmatches any class pattern.
    # .to_numpy() keeps `include` a plain ndarray: `ndarray &= Series`
    # would silently rebind `include` to a pandas Series (the Series'
    # higher array priority wins), mixing types mid-function.
    exclude_by_class = []
    for pattern in refinement.class_patterns:
        cls_match = df["node_class"].str.fullmatch(pattern).to_numpy()
        exclude_by_class.append(cls_match)
        include &= ~cls_match

    # Exclude rows matching a class pattern AND one of its name patterns.
    exclude_by_class_then_name = {}
    for (
        class_pattern,
        name_patterns,
    ) in refinement.class_pattern_to_name_patterns.items():
        name_exclusions = np.full(len(df), False, dtype=bool)
        for name_pattern in name_patterns:
            name_exclusions |= df.index.str.fullmatch(name_pattern)
        node_cls_match = (
            df["node_class"].str.fullmatch(class_pattern).to_numpy()
            & name_exclusions
        )
        exclude_by_class_then_name[class_pattern] = node_cls_match
        include &= ~node_cls_match

    # Report all exclusions after the fact, grouped by pattern kind.
    for pattern, pat_exclusion in zip(
        refinement.name_patterns, exclude_by_name
    ):
        _show_exclusions(
            pattern=pattern,
            exclusion=pat_exclusion,
            df=df,
            verbosity=verbosity,
        )
    for pattern, pat_cls_exclusion in zip(
        refinement.class_patterns, exclude_by_class
    ):
        _show_exclusions(
            pattern=pattern,
            exclusion=pat_cls_exclusion,
            df=df,
            verbosity=verbosity,
        )
    for pattern, pat_cls_name_exclusion in exclude_by_class_then_name.items():
        # XXX: Maybe display more than just the top-level class pattern
        _show_exclusions(
            pattern=pattern,
            exclusion=pat_cls_name_exclusion,
            df=df,
            verbosity=verbosity,
        )
    # XXX: Log / return the individual exclusions
    return df.loc[include]
def remove_node_from_repo(
    node: str, repo: repo_graph_data.RepoGraphData
) -> None:
    """Modify graph by removing node, but preserving edges.

    Every (parent, child) pair through ``node`` gets a direct edge
    before ``node`` is removed.

    XXX: How to handle probability / duration attributes of removed nodes?
    """
    # Materialize the neighbor views before mutating: add_edge() updates
    # adjacency dicts, and if `node` has a self-loop the very dicts being
    # iterated would change size mid-iteration (RuntimeError).
    parents = list(repo.graph.predecessors(node))
    children = list(repo.graph.successors(node))
    for parent in parents:
        for child in children:
            repo.graph.add_edge(parent, child)
    repo.graph.remove_node(node)
def full_refinement(
    repo: repo_graph_data.RepoGraphData,
    refinement: RefinementConfig,
    verbosity: Verbosity,
) -> pandas.DataFrame:
    """Apply ``refinement`` to ``repo``: shrink its df and prune its graph.

    Nodes dropped from the dataframe are also removed from the graph
    (edges through them are preserved). Returns the refined dataframe.
    """
    refined_df = refine_dataframe(
        df=repo.df,
        refinement=refinement,
        verbosity=verbosity,
    )
    # Compare against the pre-refinement index before rebinding repo.df.
    dropped_nodes = set(repo.df.index.tolist()) - set(
        refined_df.index.tolist()
    )
    repo.df = refined_df
    for dropped in dropped_nodes:
        remove_node_from_repo(dropped, repo)
    return repo.df