Source code for apps.bazel_parser.refinement
from __future__ import annotations
import dataclasses
import enum
import numpy as np
import pandas
from apps.bazel_parser import repo_graph_data


class Verbosity(enum.Enum):
    # How much detail to report about exclusions. SILENT, COUNT, and LIST are
    # the levels used below; their values are assumed here (enum.auto()),
    # since only the member names matter to this module.
    SILENT = enum.auto()
    COUNT = enum.auto()
    LIST = enum.auto()


# XXX: Maybe load from yaml or something other than json for comments?
@dataclasses.dataclass
class RefinementConfig:
"""This specifies how to refine a dataframe of nodes to a smaller set.
We expect node_name to be the index of the dataframe.
All of these fields are exclusionary.
"""

    name_patterns: list[str]
    class_patterns: list[str]
    class_pattern_to_name_patterns: dict[str, list[str]]
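

# Usage sketch (illustrative only, not part of the original module): building
# a RefinementConfig by hand. The patterns below are hypothetical examples.
def _example_refinement_config() -> RefinementConfig:
    return RefinementConfig(
        # Exclude any node whose name (dataframe index) fully matches one of
        # these regexes.
        name_patterns=[r"//third_party/.*"],
        # Exclude any node whose node_class fully matches one of these regexes.
        class_patterns=[r"filegroup"],
        # Exclude nodes of a class only when their name also matches one of
        # the associated name regexes.
        class_pattern_to_name_patterns={r"py_test": [r".*_smoke_test"]},
    )
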
def _show_exclusions(
    pattern: str,
    exclusion: pandas.Series[bool] | np.ndarray[tuple[int]],
    df: pandas.DataFrame,
    verbosity: Verbosity,
) -> None:
    # XXX: print/log/ or return string?
    if verbosity == Verbosity.SILENT:
        return
    elif verbosity == Verbosity.COUNT:
        # XXX: Check we don't need == True
        count = len(np.where(exclusion)[0])
        print(f"{pattern} = {count}")
    elif verbosity == Verbosity.LIST:
        print(f"{pattern} ->")
        for node in sorted(df.loc[exclusion].index.tolist()):
            print(f"- {node}")
    else:
        raise ValueError(f"Unhandled verbosity: {verbosity}")
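

# Illustration (not part of the original module): what _show_exclusions reports
# for a toy dataframe. Node names and classes here are made up.
def _example_show_exclusions() -> None:
    toy_df = pandas.DataFrame(
        {"node_class": ["py_library", "py_test"]},
        index=pandas.Index(["//app:lib", "//app:lib_test"], name="node_name"),
    )
    mask = toy_df.index.str.fullmatch(r".*_test")
    # With Verbosity.COUNT this would print ".*_test = 1"; with Verbosity.LIST
    # it prints the pattern followed by each matching node name.
    _show_exclusions(
        pattern=r".*_test",
        exclusion=mask,
        df=toy_df,
        verbosity=Verbosity.LIST,
    )
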
def refine_dataframe(
    df: pandas.DataFrame,
    refinement: RefinementConfig,
    verbosity: Verbosity,
) -> pandas.DataFrame:
    """Return the rows of df not excluded by refinement.

    Exclusions are reported per pattern at the given verbosity.
    """
    include = np.full(len(df), True, dtype=bool)

    # Exclude nodes whose name (the index) fully matches any name pattern.
    exclude_by_name = []
    for pattern in refinement.name_patterns:
        match = df.index.str.fullmatch(pattern)
        exclude_by_name.append(match)
        include &= ~match

    # Exclude nodes whose node_class fully matches any class pattern.
    exclude_by_class = []
    for pattern in refinement.class_patterns:
        cls_match = df["node_class"].str.fullmatch(pattern)
        exclude_by_class.append(cls_match)
        include &= ~cls_match

    # Exclude nodes whose class matches a pattern *and* whose name matches one
    # of that pattern's associated name patterns.
    exclude_by_class_then_name = {}
    for (
        class_pattern,
        name_patterns,
    ) in refinement.class_pattern_to_name_patterns.items():
        name_exclusions = np.full(len(df), False, dtype=bool)
        for name_pattern in name_patterns:
            name_exclusions |= df.index.str.fullmatch(name_pattern)
        node_cls_match = (
            df["node_class"].str.fullmatch(class_pattern) & name_exclusions
        )
        exclude_by_class_then_name[class_pattern] = node_cls_match
        include &= ~node_cls_match

    # Report what each pattern excluded.
    for pattern, pat_exclusion in zip(
        refinement.name_patterns, exclude_by_name
    ):
        _show_exclusions(
            pattern=pattern,
            exclusion=pat_exclusion,
            df=df,
            verbosity=verbosity,
        )
    for pattern, pat_cls_exclusion in zip(
        refinement.class_patterns, exclude_by_class
    ):
        _show_exclusions(
            pattern=pattern,
            exclusion=pat_cls_exclusion,
            df=df,
            verbosity=verbosity,
        )
    for pattern, pat_cls_name_exclusion in exclude_by_class_then_name.items():
        # XXX: Maybe display more than just the top-level class pattern
        _show_exclusions(
            pattern=pattern,
            exclusion=pat_cls_name_exclusion,
            df=df,
            verbosity=verbosity,
        )

    # XXX: Log / return the individual exclusions
    return df.loc[include]
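

# Usage sketch (illustrative only, not part of the original module): refining a
# small hand-built dataframe. Node names, classes, and patterns are made up.
def _example_refine_dataframe() -> pandas.DataFrame:
    toy_df = pandas.DataFrame(
        {"node_class": ["py_library", "filegroup", "py_test"]},
        index=pandas.Index(
            ["//app:lib", "//app:data", "//app:lib_smoke_test"],
            name="node_name",
        ),
    )
    config = RefinementConfig(
        name_patterns=[],
        class_patterns=[r"filegroup"],
        class_pattern_to_name_patterns={r"py_test": [r".*_smoke_test"]},
    )
    # Only //app:lib survives: //app:data is excluded by class, and
    # //app:lib_smoke_test by the class-then-name rule.
    return refine_dataframe(
        df=toy_df, refinement=config, verbosity=Verbosity.COUNT
    )
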
def remove_node_from_repo(
    node: str, repo: repo_graph_data.RepoGraphData
) -> None:
    """Remove node from the graph while preserving connectivity.

    Every predecessor of the removed node is connected directly to every
    successor, so paths that ran through the node survive its removal.

    XXX: How to handle probability / duration attributes of removed nodes?
    """
    for parent in repo.graph.predecessors(node):
        for child in repo.graph.successors(node):
            repo.graph.add_edge(parent, child)
    repo.graph.remove_node(node)
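

# Illustration (not part of the original module) of the rewiring step above.
# repo.graph is used like a networkx DiGraph; assuming that, removing "b" from
# the chain a -> b -> c leaves a direct a -> c edge behind.
def _example_edge_preserving_removal():
    import networkx as nx  # assumed dependency, imported locally for the sketch

    graph = nx.DiGraph([("a", "b"), ("b", "c")])
    for parent in graph.predecessors("b"):
        for child in graph.successors("b"):
            graph.add_edge(parent, child)
    graph.remove_node("b")
    # graph now has nodes {"a", "c"} and the single edge ("a", "c").
    return graph
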
def full_refinement(
    repo: repo_graph_data.RepoGraphData,
    refinement: RefinementConfig,
    verbosity: Verbosity,
) -> pandas.DataFrame:
    """Refine repo.df and drop the removed nodes from repo.graph in place."""
    refined_df = refine_dataframe(
        df=repo.df,
        refinement=refinement,
        verbosity=verbosity,
    )
    removed_nodes = set(repo.df.index.tolist()) - set(
        refined_df.index.tolist()
    )
    repo.df = refined_df
    for node in removed_nodes:
        remove_node_from_repo(node, repo)
    return repo.df
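

# Usage sketch (illustrative only, not part of the original module): applying a
# refinement end to end to an already-loaded RepoGraphData instance. How repo
# is constructed is left to repo_graph_data; the patterns are hypothetical.
def _example_full_refinement(
    repo: repo_graph_data.RepoGraphData,
) -> pandas.DataFrame:
    config = RefinementConfig(
        name_patterns=[r"//third_party/.*"],
        class_patterns=[],
        class_pattern_to_name_patterns={},
    )
    # Shrinks repo.df and rewires repo.graph so that paths through the removed
    # third_party nodes are preserved.
    return full_refinement(repo=repo, refinement=config, verbosity=Verbosity.COUNT)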