352 lines
14 KiB
Python
352 lines
14 KiB
Python
"""
|
||
live_variables.py — Live Variables backward dataflow analysis for TRIPLA CFGs.
|
||
|
||
A variable v is *live* at the entry of node n if there exists a path
|
||
n → … → use(v) where v is not redefined along the way.
|
||
|
||
Data structures
|
||
---------------
|
||
gen dict[int, set[Var]] — GEN(n) = variables *used* at n
|
||
kill dict[int, set[Var]] — KILL(n) = variables *defined* at n
|
||
in_sets dict[int, set[Var]] — live variables at node *entry*
|
||
out_sets dict[int, set[Var]] — live variables at node *exit*
|
||
|
||
Transfer equations (backward):
|
||
OUT(n) = ∪ IN(s) for all successors s
|
||
IN(n) = (OUT(n) − KILL(n)) ∪ GEN(n)
|
||
|
||
Variables are represented in scoped form ``(scope, name)``, e.g. ``("f","x")``.
|
||
This avoids collisions between equal variable names in different functions.
|
||
|
||
This module also exports ``_BackwardAnalysisBase``, the shared base class
|
||
that ``ReachedUsesAnalysis`` in reached_uses.py inherits from. The base
|
||
provides:
|
||
• AST traversal to collect function-nesting and parameter metadata
|
||
• Lexical variable resolution (parameter shadowing handled correctly)
|
||
• BFS-based CFG-node → owning-function assignment
|
||
• Unified uses / defs extraction for all node types
|
||
|
||
Type alias: ``Var = tuple[str, str]`` — a ``(scope, name)`` pair.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
from collections import deque
|
||
from typing import TYPE_CHECKING
|
||
|
||
import cfg_build
|
||
import syntax
|
||
from cfg.CFG_Node import CFG_START
|
||
|
||
if TYPE_CHECKING:
|
||
from cfg.CFG import CFG
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Public names: GLOBAL_SCOPE constant and Var type alias (Var is imported by reached_uses.py)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Scope component used for variables resolved outside any function
# (_resolve_var returns (GLOBAL_SCOPE, name) when no owning function is known).
GLOBAL_SCOPE = ""
|
||
# Scoped variable identifier: the first element is the owning function's name
# (or GLOBAL_SCOPE), the second is the variable's source-level name.
Var = tuple[str, str] # (function_name|GLOBAL_SCOPE, variable_name)
|
||
|
||
|
||
# ============================================================================
|
||
# Shared base: function metadata, scope assignment, uses/defs extraction
|
||
# ============================================================================
|
||
|
||
class _BackwardAnalysisBase:
    """Common groundwork for backward dataflow analyses over a CFG.

    Constructing an instance performs, in this order:

    1. a shallow copy of ``cfg_build.FUNCTIONS`` into ``self._functions``;
    2. an AST walk that records, per declared function, its lexically
       enclosing function and its formal parameters;
    3. a breadth-first sweep that maps every reachable CFG node id to the
       function that owns it;
    4. a pass over ``cfg.nodes()`` that fills the per-node ``uses`` and
       ``defs`` sets.

    Attributes available afterwards:

        self.cfg          — the CFG object handed to the constructor
        self._functions   — dict[str, tuple]: name → (start node, end node)
        self._func_parent — dict[str, str | None]: enclosing function name
        self._func_params — dict[str, tuple[str, ...]]: formal parameters
        self._func_scope  — dict[int, str]: node id → owning function name
        self.uses         — dict[int, set[Var]]: variables read at a node
        self.defs         — dict[int, set[Var]]: variables written at a node
    """

    def __init__(self, cfg: "CFG") -> None:
        """Run all preparation steps for *cfg* (see the class docstring)."""
        self.cfg = cfg
        # Shallow copy: later insertions/removals in cfg_build.FUNCTIONS do
        # not show up in this instance's view of the function table.
        self._functions: dict[str, tuple] = dict(cfg_build.FUNCTIONS)

        self.uses: dict[int, set[Var]] = {}
        self.defs: dict[int, set[Var]] = {}

        parents, params = self._collect_function_metadata()
        self._func_parent = parents
        self._func_params = params
        self._func_scope: dict[int, str] = self._compute_function_scope()
        self._extract_uses_defs()

    # ------------------------------------------------------------------
    # Step 1a — lexical nesting and parameter lists from the AST
    # ------------------------------------------------------------------

    def _collect_function_metadata(
        self,
    ) -> tuple[dict[str, str | None], dict[str, tuple[str, ...]]]:
        """Gather enclosing-function and parameter data from ``self.cfg.ast``.

        Returns
        -------
        parents : dict[str, str | None]
            For each declared function name, the name of the function whose
            body lexically contains the declaration, or None when the
            declaration is not inside any function body.
        params : dict[str, tuple[str, ...]]
            For each declared function name, its formal parameters in
            declaration order.
        """
        parents: dict[str, str | None] = {}
        params: dict[str, tuple[str, ...]] = {}

        def walk(node: syntax.EXPRESSION | None, owner: str | None) -> None:
            if node is None:
                return

            if not isinstance(node, syntax.LET):
                # Ordinary expression: descend into every child unchanged.
                for _label, sub in node.children():
                    walk(sub, owner)
                return

            declared = node.decl if isinstance(node.decl, list) else [node.decl]

            # First pass: record every function declared by this LET.  Plain
            # assignment means a later declaration with the same name
            # replaces an earlier one (last-seen wins).
            for entry in declared:
                if isinstance(entry, syntax.DECL):
                    parents[entry.f_name] = owner
                    params[entry.f_name] = tuple(entry.params)

            # Second pass: descend.  A function body is walked with that
            # function as the new owner; anything else keeps the current one.
            for entry in declared:
                if isinstance(entry, syntax.DECL):
                    walk(entry.body, entry.f_name)
                else:
                    walk(entry, owner)

            walk(node.body, owner)

        walk(self.cfg.ast, None)
        return parents, params

    # ------------------------------------------------------------------
    # Step 1b — lexical resolution of a variable name
    # ------------------------------------------------------------------

    def _resolve_var(self, func: str | None, name: str) -> Var:
        """Return the scoped form of *name* as seen from function *func*.

        With no owning function the result is ``(GLOBAL_SCOPE, name)``.
        Otherwise the chain func → parent → grandparent … is searched for
        the first function that lists *name* among its parameters.  If none
        does, the variable is attributed to *func* itself.
        """
        if func is None:
            return (GLOBAL_SCOPE, name)

        visited: set[str] = set()
        scope: str | None = func
        # The visited set stops the walk should the parent map be cyclic.
        while not (scope is None or scope in visited):
            visited.add(scope)
            if name in self._func_params.get(scope, ()):
                return (scope, name)
            scope = self._func_parent.get(scope)

        return (func, name)

    # ------------------------------------------------------------------
    # Step 2 — map CFG nodes to their owning function
    # ------------------------------------------------------------------

    def _compute_function_scope(self) -> dict[int, str]:
        """Return a node-id → function-name map built by per-function BFS.

        For every entry of ``self._functions`` a breadth-first sweep starts
        at that function's start node.  Two rules bound each sweep:

        1. A successor that is a ``CFG_START`` belonging to another
           registered function is not entered.
        2. The function's own end node is claimed but not expanded, so its
           successors are never attributed through this sweep.

        A node already claimed by an earlier sweep keeps its first owner.
        """
        owner_of: dict[int, str] = {}
        entry_ids: set[int] = {start.id for start, _end in self._functions.values()}

        for fn_name, (start, end) in self._functions.items():
            pending: deque = deque([start])
            while pending:
                current = pending.popleft()
                if current.id in owner_of:
                    # First claim wins; do not re-expand either.
                    continue
                owner_of[current.id] = fn_name

                if current.id == end.id:
                    # Rule 2: never walk past this function's end node.
                    continue

                for succ in current.children:
                    foreign_entry = (
                        isinstance(succ, CFG_START)
                        and succ.id in entry_ids
                        and succ.id != start.id
                    )
                    if not foreign_entry:
                        pending.append(succ)

        return owner_of

    # ------------------------------------------------------------------
    # Step 3 — per-node uses / defs
    # ------------------------------------------------------------------

    def _extract_uses_defs(self) -> None:
        """Fill ``self.uses`` and ``self.defs`` for each node of the CFG.

        Rules, keyed on the node and its ``ast_node``:

        • ``CFG_START`` wrapping ``DECL f(p1, …, pk)`` → defs = {(f, p1), …}
        • ``ID(x)``                                   → uses = {resolve(x)}
        • ``ASSIGN(x = e)``                           → defs = {resolve(x)}
        • anything else (including no AST node)       → both sets empty

        Only the node's own ``ast_node`` is inspected; nested
        sub-expressions are not traversed here.
        """
        for node in self.cfg.nodes():
            nid = node.id
            owner = self._func_scope.get(nid)  # None → resolved as global
            ast = node.ast_node

            used: set[Var] = set()
            defined: set[Var] = set()

            if isinstance(node, CFG_START) and isinstance(ast, syntax.DECL):
                # Entering a function binds each of its formal parameters.
                defined.update((ast.f_name, param) for param in ast.params)
            elif isinstance(ast, syntax.ID):
                used.add(self._resolve_var(owner, ast.name))
            elif isinstance(ast, syntax.ASSIGN):
                defined.add(self._resolve_var(owner, ast.var.name))

            self.uses[nid] = used
            self.defs[nid] = defined
|
||
|
||
|
||
# ============================================================================
|
||
# Live Variables Analysis
|
||
# ============================================================================
|
||
|
||
class LiveVariablesAnalysis(_BackwardAnalysisBase):
    """Backward dataflow analysis: Live Variables.

    A scoped variable (f, x) is *live* at the entry of node n when some path
    starting at n reaches a use of (f, x) without passing a definition of
    (f, x) first.

    Attributes
    ----------
    gen       dict[int, set[Var]]   GEN(n)  — copy of uses(n)
    kill      dict[int, set[Var]]   KILL(n) — copy of defs(n)
    in_sets   dict[int, set[Var]]   live variables at n's *entry*
    out_sets  dict[int, set[Var]]   live variables at n's *exit*

    ``uses`` and ``defs`` themselves are populated by the base class.

    Transfer equations (backward):
        OUT(n) = ∪ IN(s)   for all successors s
        IN(n)  = (OUT(n) − KILL(n)) ∪ GEN(n)
    """

    def __init__(self, cfg: "CFG") -> None:
        """Prepare gen/kill for *cfg* and solve to a fixpoint immediately."""
        # The base constructor fills uses, defs, _func_scope, etc.
        super().__init__(cfg)

        self.gen: dict[int, set[Var]] = {}
        self.kill: dict[int, set[Var]] = {}
        self.in_sets: dict[int, set[Var]] = {}
        self.out_sets: dict[int, set[Var]] = {}

        self._build_gen_kill()
        self.solve()

    # ------------------------------------------------------------------
    # Build gen / kill; initialise in / out to ∅
    # ------------------------------------------------------------------

    def _build_gen_kill(self) -> None:
        """Set GEN(n) = uses(n), KILL(n) = defs(n) and empty IN/OUT sets.

        GEN and KILL are stored as copies so that later mutation of one
        mapping does not leak into ``self.uses`` / ``self.defs``.
        """
        for node in self.cfg.nodes():
            nid = node.id
            self.gen[nid] = set(self.uses[nid])
            self.kill[nid] = set(self.defs[nid])
            self.in_sets[nid] = set()
            self.out_sets[nid] = set()

    # ------------------------------------------------------------------
    # Backward worklist fixpoint
    # ------------------------------------------------------------------

    def solve(self) -> None:
        """Iterate the transfer equations with a worklist until nothing changes.

        Transfer:
            OUT(n) = ∪ IN(s)   for all successors s
            IN(n)  = (OUT(n) − KILL(n)) ∪ GEN(n)

        Only nodes returned by ``cfg.nodes()`` take part; a successor whose
        id has no gen/kill entry is ignored.  A node is never queued twice
        at the same time: ids currently waiting in the worklist are tracked
        in a set, so a predecessor shared by several changed successors is
        recomputed once rather than once per notification.
        """
        nodes = list(self.cfg.nodes())
        known: set[int] = set(self.gen)
        id_to_node = {n.id: n for n in nodes}

        # Predecessors are derived from the children edges instead of reading
        # node.parents.
        # NOTE(review): the original rationale is that parent links can be
        # stale after CFG rewiring — not verifiable from this file.
        preds: dict[int, set[int]] = {nid: set() for nid in known}
        for node in nodes:
            for child in node.children:
                if child.id in known:
                    preds[child.id].add(node.id)

        worklist: deque = deque(nodes)
        queued: set[int] = {n.id for n in nodes}  # ids currently pending

        while worklist:
            node = worklist.popleft()
            nid = node.id
            # Unmark before recomputing so that a self-loop (or any change
            # made while this node is being processed) can re-queue it.
            queued.discard(nid)

            new_out: set[Var] = set()
            for child in node.children:
                if child.id in known:
                    new_out |= self.in_sets[child.id]

            new_in: set[Var] = (new_out - self.kill[nid]) | self.gen[nid]

            if new_out == self.out_sets[nid] and new_in == self.in_sets[nid]:
                continue

            self.out_sets[nid] = new_out
            self.in_sets[nid] = new_in
            # IN(n) feeds OUT of every predecessor: schedule those that are
            # not already waiting.
            for pred_id in preds[nid]:
                if pred_id not in queued:
                    queued.add(pred_id)
                    worklist.append(id_to_node[pred_id])

    # ------------------------------------------------------------------
    # Result
    # ------------------------------------------------------------------

    def live_vars_by_node(self) -> dict[int, set[Var]]:
        """Return the live-variable set at the *entry* of each node.

        Returns
        -------
        dict[int, set[Var]]
            Keys: CFG node ids whose entry set is non-empty.
            Values: a fresh copy of that node's entry set, so callers may
            mutate the result without affecting the analysis state.
        """
        return {nid: set(live) for nid, live in self.in_sets.items() if live}
|