Source code for apps.bazel_parser.parsing
"""Handle converting raw types to the data structures that get used."""
import datetime
import pathlib
import networkx
from apps.bazel_parser import repo_graph_data
from third_party.bazel.src.main.protobuf import build_pb2
from tools import git_utils
def _get_rules(
    query_result: build_pb2.QueryResult,
) -> dict[str, build_pb2.Rule]:
    """Collect the rule targets of a query result, keyed by rule name.

    Targets of type SOURCE_FILE, GENERATED_FILE, PACKAGE_GROUP and
    ENVIRONMENT_GROUP are skipped. If two rule targets share a name, the
    later one replaces the earlier one.

    Args:
        query_result: Query result whose ``target`` entries are inspected.

    Returns:
        Mapping of ``rule.name`` to the rule message.

    Raises:
        ValueError: If a target is neither a rule nor one of the skipped
            target types.
    """
    # XXX: Should we allow SOURCE_FILE?
    skipped_types = {
        build_pb2.Target.SOURCE_FILE,
        build_pb2.Target.GENERATED_FILE,
        build_pb2.Target.PACKAGE_GROUP,
        build_pb2.Target.ENVIRONMENT_GROUP,
    }
    rules: dict[str, build_pb2.Rule] = {}
    for target in query_result.target:
        if target.type in skipped_types:
            continue
        if target.type != build_pb2.Target.RULE:
            # Only resolve the enum name when it is needed for the message.
            type_name = build_pb2.Target.Discriminator.Name(  # type: ignore
                target.type
            )
            raise ValueError(
                f"Invalid target type: {type_name}({target.type})"
            )
        # We are a rule type now.
        # Didn't see much use with these:
        # - rule.configured_rule_input
        # - rule.default_setting
        rule = target.rule
        rules[rule.name] = rule
    return rules
[docs]
def get_dependency_digraph(
    rules: dict[str, build_pb2.Rule], ignore_external: bool
) -> networkx.DiGraph:
    """Build a directed graph of rules, their inputs and their outputs.

    "X depends on Y" is stored as the edge X -> Y, i.e. X is a parent of Y.
    Each rule gets an edge to every one of its inputs and an edge from every
    one of its outputs.

    Args:
        rules: Rules to add to the graph; only the values are used.
        ignore_external: When true, inputs whose label starts with "@" are
            left out of the graph.

    Returns:
        The graph, containing every rule name as a node even when the rule
        has no edges.
    """
    digraph: networkx.DiGraph = networkx.DiGraph()
    for current in rules.values():
        name = current.name
        digraph.add_edges_from(
            (name, dependency)
            for dependency in current.rule_input
            if not (ignore_external and dependency.startswith("@"))
        )
        digraph.add_edges_from(
            (produced, name) for produced in current.rule_output
        )
        # Adding an existing node is a no-op, so this only matters for
        # rules that ended up without any edges.
        digraph.add_node(name)
    return digraph
def _normalize_paths_to_bazel_intermediates(
    files: list[pathlib.Path],
) -> dict[str, pathlib.Path]:
    """Map each path to a "//"-prefixed lookup key.

    The key is "//" followed by the path written with forward slashes, so
    the result does not depend on the separator of the host platform.

    Args:
        files: Paths to index.

    Returns:
        Mapping of the "//"-prefixed key to the original path object. If
        two paths produce the same key, the later one wins.
    """
    # as_posix() rather than str(): str() would emit backslashes on Windows,
    # and those keys could never equal a forward-slash label.
    return {f"//{path.as_posix()}": path for path in files}
def _normalize_bazel_target_to_intermediate(target: str) -> str:
    """Rewrite a target label into its path-like lookup form.

    For labels starting with "//:" every ":" is removed; for any other
    label every ":" is replaced with "/".
    """
    replacement = "" if target.startswith("//:") else "/"
    return target.replace(":", replacement)
def _get_node_probability(
    nodes: list[str],
    file_commit_map: git_utils.FileCommitMap,
) -> dict[str, float]:
    """Compute a probability value for every node that matches a known file.

    A node matches when its normalized label equals the normalized form of
    a key in ``file_commit_map.file_map``. Its value is
    ``1 - len(file_map[file]) / len(commit_map)``. Nodes without a matching
    file are omitted from the result.

    NOTE(review): presumably ``file_map[file]`` holds the commits touching
    that file -- confirm against ``git_utils.FileCommitMap``.
    """
    # XXX: Test case with BUILD further up, ensure we still get the right match
    path_by_intermediate = _normalize_paths_to_bazel_intermediates(
        list(file_commit_map.file_map.keys())
    )
    commit_count = len(file_commit_map.commit_map)
    probabilities = {}
    for label in nodes:
        matched_path = path_by_intermediate.get(
            _normalize_bazel_target_to_intermediate(label)
        )
        if matched_path is None:
            continue
        touched = len(file_commit_map.file_map[matched_path])
        probabilities[label] = 1 - (touched / commit_count)
    return probabilities
def _get_node_to_class(
    nodes: list[str],
    node_probability: dict[str, float],
    rules: dict[str, build_pb2.Rule],
) -> dict[str, str]:
    """Assign a class name to every node, in the order the nodes are given.

    Every node gets an entry (no filtering happens here) so that anything
    built from the result lines up with the node list.

    A node present in ``node_probability`` is classed as "source_file";
    otherwise a node with a rule gets that rule's ``rule_class``; anything
    else is "unknown".
    """

    def classify(name: str) -> str:
        # Source files win over rules of the same name.
        if name in node_probability:
            return "source_file"
        matching_rule = rules.get(name)
        if matching_rule:
            return matching_rule.rule_class
        return "unknown"

    return {name: classify(name) for name in nodes}
[docs]
def get_repo_graph_data(
    query_result: build_pb2.QueryResult,
    label_to_runtime: dict[str, datetime.timedelta],
    file_commit_map: git_utils.FileCommitMap,
) -> repo_graph_data.RepoGraphData:
    """Assemble the repo graph data from the raw inputs.

    Args:
        query_result: Query result the rules are extracted from.
        label_to_runtime: Runtime per label; converted to seconds.
        file_commit_map: File/commit information used to compute the node
            probabilities.

    Returns:
        A ``RepoGraphData`` holding the dependency graph (external inputs
        ignored), the class and probability of its nodes, and the runtimes
        in seconds.
    """
    rules_by_name = _get_rules(query_result)
    dependency_graph = get_dependency_digraph(
        rules_by_name, ignore_external=True
    )
    node_names = list(dependency_graph.nodes)
    probabilities = _get_node_probability(
        nodes=node_names, file_commit_map=file_commit_map
    )
    classes = _get_node_to_class(
        nodes=node_names,
        node_probability=probabilities,
        rules=rules_by_name,
    )
    durations_s = {
        label: runtime.total_seconds()
        for label, runtime in label_to_runtime.items()
    }
    return repo_graph_data.RepoGraphData(
        graph=dependency_graph,
        node_to_class=classes,
        node_probability=probabilities,
        node_duration_s=durations_s,
    )