"""Contain the core data-classes for analyzing repo's and their composition."""
import logging
import pathlib
from typing import TypedDict
from typing import cast
import networkx
import pandas
import tqdm
from utils import graph_algorithms
logger = logging.getLogger(__name__)
class Node(TypedDict):
    """The fields of a given node.

    These can be used as columns of panda dataframes
    """

    # The bazel label
    node_name: str
    # The class of the rule, eg. java_library
    node_class: str
    # How many things depend on you, in_degree
    num_parents: int
    # Total number of nodes that transitively depend on you
    num_ancestors: int
    # Whether this has a computed duration. We find this more explicit than
    # simply checking > 0 for duration_s
    has_duration: bool
    # How many things you depend on, out_degree
    num_children: int
    # Total number of nodes that you transitively depend on
    num_descendants: int
    # Total number of nodes that you transitively depend on and have a < 100%
    # chance of being cached
    num_source_descendants: int
    # Whether this is a source or not
    is_source: bool

    # ===== Link Analysis =====
    pagerank: float

    # ===== Node Specific =====
    # How long this node took to "execute"
    # - mostly capturing test time at the moment
    node_duration_s: float
    # (1 - (# of commits this file changed / Total # of commits))
    node_probability_cache_hit: float

    # ===== Accumulated =====
    # Sum of ancestors' node durations + your own
    group_duration_s: float
    # node_probability_cache_hit * PRODUCT{descendants}
    # - This makes the simplifying assumption that all changes are independent
    #   of one another, which isn't true, but convenient.
    group_probability_cache_hit: float
    # group_duration_s * (1 - group_probability_cache_hit)
    expected_duration_s: float
    # Max ancestor depth from this node
    # - The max shortest path of this node in a reversed graph
    ancestor_depth: int
    # ancestor_depth, but descendant
    # - The max shortest path of this node in a graph
    descendant_depth: int
    # An unweighted version of expected_duration_s wrt duration
    # (1 - group_probability_cache_hit) * num_ancestors
    # > This can be interpreted as a score for bottleneck-ness, and captures
    # > how a single target act as a force multiplier of graph invalidations in
    # > the graph. This is an improved way to identify bottlenecks that could
    # > benefit from isolation and cutting off the dependencies between the
    # > upstream and downstream side.
    # related to rebuilt_targets_by_transitive_dependencies
    ancestors_by_group_p: float
    # An unweighted version of expected_duration_s wrt probability
    # num_descendants * num_ancestors
    ancestors_by_descendants: int
    # Betweenness centrality of a node is the sum of the fraction of all-pairs
    # shortest paths that pass through
    #
    # Related to this metric, an important centrality measure from the research
    # field of graph structures is "Betweenness centrality".
    #
    # This is the fraction of all dependency chains between other targets that
    # passes through a single build target like A.
    #
    # This can be seen as a score for the broker-ness in the dependency network
    # and that could also benefit from isolation.
    betweenness_centrality: float
    # Closeness centrality of a node u is the reciprocal of the average
    # shortest path distance to u over all n-1 reachable nodes.
    closeness_centrality: float

    # Proposed new fields
    # - sum(num_ancestors + num_descendants), can get an idea of size of
    #   sub-graph
class GraphMetrics(TypedDict):
    """Graph-wide metrics."""

    # Longest directed path length in the DAG; can be more than max_depth
    longest_path: int
    # Max ancestor_depth over all nodes (equivalent of max descendant_depth)
    max_depth: int
    # aka) order
    num_nodes: int
    # aka) size
    num_edges: int
    # Edge density of the graph
    density: float
    num_connected_components: int
    total_duration_s: float
    expected_duration_s: float
    avg_files_changed_per_commit: float
    avg_nodes_affected_per_commit: float

    # Proposal for new attributes
    # max/aggregation of most of the node attributes
    # - networkx.average_shortest_path_length
    # Not defining
    # unable to capture these since requires strongly connected components
    # diameter: int
    # radius: int
# Put graph and dataframe into a class to make it a bit easier to work with
class RepoGraphData:
    """Hold the dependency graph alongside its per-node metrics dataframe.

    The dataframe is indexed by node name with one column per ``Node`` field;
    it is (re)built from the graph by ``dependency_analysis`` via ``refresh``.
    """

    graph: networkx.DiGraph

    def __init__(
        self,
        graph: networkx.DiGraph,
        node_to_class: dict[str, str],
        node_probability: dict[str, float],
        node_duration_s: dict[str, float],
    ):
        """Seed the dataframe from the raw inputs and run a first analysis.

        Args:
            graph: The repo's dependency graph.
            node_to_class: Maps node name to its rule class
                (eg. java_library).
            node_probability: Per-node cache-hit probability; a node's
                presence here marks it as a source node.
            node_duration_s: Per-node duration in seconds; a node's presence
                here marks it as having a computed duration.
        """
        self.graph = graph
        data: dict[str, Node] = {}
        for node_name, node_class in node_to_class.items():
            data[node_name] = {
                "node_class": node_class,
                "node_name": node_name,
                "node_probability_cache_hit": node_probability.get(
                    node_name, 1.0
                ),
                "is_source": node_name in node_probability,
                "node_duration_s": node_duration_s.get(node_name, 0.0),
                "has_duration": node_name in node_duration_s,
                # Everything below is expected to be updated in refresh
                "num_parents": 0,
                "num_ancestors": 0,
                "num_children": 0,
                "num_descendants": 0,
                "num_source_descendants": 0,
                "pagerank": 0,
                "group_duration_s": 0,
                "expected_duration_s": 0,
                "group_probability_cache_hit": 0,
                "ancestor_depth": 0,
                "descendant_depth": 0,
                "ancestors_by_group_p": 0,
                "ancestors_by_descendants": 0,
                "betweenness_centrality": 0,
                "closeness_centrality": 0,
            }
        self.df = pandas.DataFrame.from_dict(data, orient="index")
        self.refresh()

    def refresh(self) -> None:
        """Update .df based on updates to graph, etc."""
        nodes = dependency_analysis(self)
        self.df = pandas.DataFrame.from_dict(nodes, orient="index")

    def get_node(self, node: str) -> Node:
        """Return the metrics row for ``node`` as a Node dict."""
        return cast(Node, self.df.loc[node].to_dict())

    def get_graph_metrics(self) -> GraphMetrics:
        """Compute graph-wide metrics.

        Raises:
            ValueError: If the graph contains a cycle.
        """
        expected_duration_s = (
            self.df["node_duration_s"]
            * (1 - self.df["group_probability_cache_hit"])
        ).sum()
        # Ensure there are no cycles. Only affects computation of longest_path,
        # but still if there's a cycle that indicates the build graph may not
        # be computed properly. This was noticed when some build rules
        # specified self-loops, which we now trim during graph creation.
        try:
            cycle = networkx.find_cycle(self.graph)
            raise ValueError(f"INVALID: Graph has atleast one cycle: {cycle}")
        except networkx.exception.NetworkXNoCycle:
            longest_path = networkx.dag_longest_path_length(self.graph)
        metrics: GraphMetrics = {
            # longest path can be more than max depth
            "longest_path": longest_path,
            # Equivalent of max descendant_depth
            "max_depth": self.df["ancestor_depth"].max(),
            "num_nodes": self.graph.number_of_nodes(),
            "num_edges": self.graph.number_of_edges(),
            "density": networkx.density(self.graph),
            "num_connected_components": (
                networkx.number_weakly_connected_components(self.graph)
            ),
            "total_duration_s": self.df["node_duration_s"].sum(),
            "expected_duration_s": expected_duration_s,
            "avg_files_changed_per_commit": (
                1 - self.df["node_probability_cache_hit"]
            ).sum(),
            "avg_nodes_affected_per_commit": (
                1 - self.df["group_probability_cache_hit"]
            ).sum(),
        }
        return metrics

    def to_gml(self, out_gml: pathlib.Path) -> None:
        """Write the graph, annotated with the dataframe columns, as GML."""
        # Add node attributes to copy of the graph.
        # node_name column duplicates the df index intentionally: to_csv uses
        # index=False so node_name is the label in CSV output. "Node" attribute
        # duplicates it again for gephi, which uses "Node" as the display
        # label.
        graph = self.graph.copy()
        for name, row in self.df.iterrows():
            for k, v in row.items():
                graph.nodes[str(name)][str(k)] = v
            # Additional fields for visualization
            graph.nodes[name]["Node"] = row["node_name"]
            graph.nodes[name]["Highlight"] = "No"
        # Write the graph
        networkx.write_gml(graph, out_gml)

    def to_csv(self, out_csv: pathlib.Path) -> None:
        """Write the per-node dataframe to ``out_csv``."""
        # node_name is preserved, so don't need to re-emit
        self.df.to_csv(out_csv, index=False)
def dependency_analysis(repo: RepoGraphData) -> dict[str, Node]:
    """Update repo based on a dependency analysis.

    Recomputes every derived ``Node`` field (degrees, transitive counts,
    depths, pagerank, centralities, group probability/duration) from
    ``repo.graph`` and the raw per-node columns of ``repo.df``.

    Args:
        repo: The repo graph data whose dataframe provides the raw
            per-node probability/duration columns.

    Returns:
        A mapping from node name to its fully-populated ``Node`` row.
    """
    num_nodes = repo.graph.number_of_nodes()
    logger.debug("num_nodes: %s", num_nodes)
    node_probability = repo.df["node_probability_cache_hit"].to_dict()
    node_duration_s = repo.df["node_duration_s"].to_dict()
    group_probability = graph_algorithms.compute_group_probability(
        graph=repo.graph, node_probability=node_probability
    )
    logger.debug("num_edges: %s", repo.graph.number_of_edges())
    group_duration = graph_algorithms.compute_group_duration(
        graph=repo.graph, node_duration_s=node_duration_s
    )
    logger.debug("computing pagerank")
    pagerank = networkx.pagerank(repo.graph)
    in_degree = repo.graph.in_degree()
    # Degree views are only plain ints when queried for a single node;
    # the asserts narrow the type for the lookups below.
    assert not isinstance(in_degree, int)
    out_degree = repo.graph.out_degree()
    assert not isinstance(out_degree, int)
    reversed_graph = repo.graph.reverse()
    logger.debug("computing depths")
    # Compute depths
    forward_all_pairs_shortest_path_length = (
        networkx.all_pairs_shortest_path_length(repo.graph)
    )
    reverse_all_pairs_shortest_path_length = (
        networkx.all_pairs_shortest_path_length(reversed_graph)
    )
    descendant_depth: dict[str, int] = {}
    ancestor_depth: dict[str, int] = {}
    # Max shortest-path length from each node in the forward graph ...
    for node_name, pair_len_dict in tqdm.tqdm(
        forward_all_pairs_shortest_path_length
    ):
        descendant_depth[node_name] = max(pair_len_dict.values())
    # ... and in the reversed graph
    for node_name, pair_len_dict in tqdm.tqdm(
        reverse_all_pairs_shortest_path_length
    ):
        ancestor_depth[node_name] = max(pair_len_dict.values())
    # Compute centrality metrics
    # k: number of pivot nodes for betweenness approximation. Floored at 1000
    # since sqrt(V) is too small for large graphs (~138 for V=19k). 1000 pivots
    # gives adequate relative rankings for top-N analysis without exact values.
    k = int(min(num_nodes, max(1_000, num_nodes**0.5)))
    logger.debug("computing betweenness centrality (k=%s)", k)
    betweenness = networkx.betweenness_centrality(repo.graph, k=k)
    logger.debug("computing closeness centrality")
    closeness = networkx.closeness_centrality(repo.graph)
    nodes: dict[str, Node] = {}
    for node_name, cur_node in repo.df.iterrows():
        node_name = str(node_name)
        num_parents = in_degree[node_name]
        num_children = out_degree[node_name]
        ancestors = list(networkx.ancestors(repo.graph, node_name))
        num_ancestors = len(ancestors)
        descendants = list(networkx.descendants(repo.graph, node_name))
        num_descendants = len(descendants)
        # int() so the row matches Node's declared type (pandas .sum()
        # returns a numpy integer)
        num_source_descendants = int(
            repo.df.loc[repo.df.index.intersection(descendants)][
                "is_source"
            ].sum()
        )
        node_p = node_probability.get(node_name, 1.0)
        group_p = group_probability.get(node_name, 1.0)
        group_d = group_duration.get(node_name, 0)
        row: Node = {
            "node_name": node_name,
            "node_class": cur_node["node_class"],
            "is_source": cur_node["is_source"],
            "has_duration": cur_node["has_duration"],
            "num_parents": int(num_parents),
            "num_ancestors": num_ancestors,
            "num_children": int(num_children),
            "num_descendants": num_descendants,
            "num_source_descendants": num_source_descendants,
            "pagerank": pagerank[node_name],
            "node_duration_s": node_duration_s.get(node_name, 0),
            "group_duration_s": group_d,
            "expected_duration_s": group_d * (1 - group_p),
            "node_probability_cache_hit": node_p,
            "group_probability_cache_hit": group_p,
            "ancestor_depth": ancestor_depth[node_name],
            "descendant_depth": descendant_depth[node_name],
            "ancestors_by_group_p": num_ancestors * (1 - group_p),
            "ancestors_by_descendants": num_ancestors * num_descendants,
            "betweenness_centrality": betweenness[node_name],
            "closeness_centrality": closeness[node_name],
        }
        nodes[node_name] = row
    logger.debug("dependency analysis complete")
    return nodes