diff --git a/coverage/sysmon.py b/coverage/sysmon.py index e15c9ce42..576cf2c96 100644 --- a/coverage/sysmon.py +++ b/coverage/sysmon.py @@ -22,6 +22,7 @@ from typing import ( Any, Callable, + Iterable, NewType, Optional, cast, @@ -57,15 +58,18 @@ DISABLE_TYPE = NewType("DISABLE_TYPE", object) MonitorReturn = Optional[DISABLE_TYPE] DISABLE = cast(MonitorReturn, getattr(sys_monitoring, "DISABLE", None)) +TOffset = int -ALWAYS_JUMPS = { - dis.opmap[name] for name in - ["JUMP_FORWARD", "JUMP_BACKWARD", "JUMP_BACKWARD_NO_INTERRUPT"] -} +ALWAYS_JUMPS: set[int] = set() +RETURNS: set[int] = set() -RETURNS = { - dis.opmap[name] for name in ["RETURN_VALUE", "RETURN_GENERATOR"] -} +if env.PYBEHAVIOR.branch_right_left: + ALWAYS_JUMPS.update( + dis.opmap[name] + for name in ["JUMP_FORWARD", "JUMP_BACKWARD", "JUMP_BACKWARD_NO_INTERRUPT"] + ) + + RETURNS.update(dis.opmap[name] for name in ["RETURN_VALUE", "RETURN_GENERATOR"]) if LOG: # pragma: debugging @@ -175,16 +179,26 @@ def _decorator(meth: AnyCallable) -> AnyCallable: class InstructionWalker: - def __init__(self, code: CodeType): + """Utility to step through trails of instructions.""" + + def __init__(self, code: CodeType) -> None: self.code = code - self.insts: dict[int, dis.Instruction] = {} + self.insts: dict[TOffset, dis.Instruction] = {} + inst = None for inst in dis.get_instructions(code): self.insts[inst.offset] = inst + assert inst is not None self.max_offset = inst.offset - def walk(self, *, start_at=0, follow_jumps=True): + def walk( + self, *, start_at: TOffset = 0, follow_jumps: bool = True + ) -> Iterable[dis.Instruction]: + """ + Yield instructions starting from `start_at`. Follow unconditional + jumps if `follow_jumps` is true. + """ seen = set() offset = start_at while offset < self.max_offset + 1: @@ -199,52 +213,57 @@ def walk(self, *, start_at=0, follow_jumps=True): offset += 2 -def populate_branch_trails(code: CodeType, code_info: CodeInfo) -> tuple[list[int], TArc | None]: +def populate_branch_trails(code: CodeType, code_info: CodeInfo) -> None: + """ + Populate the `branch_trails` attribute on `code_info`. + """ iwalker = InstructionWalker(code) for inst in iwalker.walk(follow_jumps=False): log(f"considering {inst=}") if not inst.jump_target: - log(f"no jump_target") + log("no jump_target") continue if inst.opcode in ALWAYS_JUMPS: - log(f"always jumps") + log("always jumps") continue from_line = inst.line_number + assert from_line is not None - def walkabout(start_at, branch_kind): - insts = [] + def walk_one_branch( + start_at: TOffset, branch_kind: str + ) -> tuple[list[TOffset], TArc | None]: + # pylint: disable=cell-var-from-loop + inst_offsets: list[TOffset] = [] to_line = None for inst2 in iwalker.walk(start_at=start_at): - insts.append(inst2.offset) + inst_offsets.append(inst2.offset) if inst2.line_number and inst2.line_number != from_line: to_line = inst2.line_number break elif inst2.jump_target and (inst2.opcode not in ALWAYS_JUMPS): - log(f"stop: {inst2.jump_target=}, {inst2.opcode=} ({dis.opname[inst2.opcode]}), {ALWAYS_JUMPS=}") + log( + f"stop: {inst2.jump_target=}, " + + f"{inst2.opcode=} ({dis.opname[inst2.opcode]}), " + + f"{ALWAYS_JUMPS=}" + ) break elif inst2.opcode in RETURNS: to_line = -code.co_firstlineno break - # if to_line is None: - # import contextlib - # with open("/tmp/foo.out", "a") as f: - # with contextlib.redirect_stdout(f): - # print() - # print(f"{code = }") - # print(f"{from_line = }, {to_line = }, {start_at = }") - # dis.dis(code) - # 1/0 if to_line is not None: - log(f"possible branch from @{start_at}: {insts}, {(from_line, to_line)} {code}") - return insts, (from_line, to_line) + log( + f"possible branch from @{start_at}: " + + f"{inst_offsets}, {(from_line, to_line)} {code}" + ) + return inst_offsets, (from_line, to_line) else: - log(f" no possible branch from @{start_at}: {insts}") + log(f" no possible branch from @{start_at}: {inst_offsets}") return [], None code_info.branch_trails[inst.offset] = ( - walkabout(start_at=inst.offset + 2, branch_kind="not-taken"), - walkabout(start_at=inst.jump_target, branch_kind="taken"), + walk_one_branch(start_at=inst.offset + 2, branch_kind="not-taken"), + walk_one_branch(start_at=inst.jump_target, branch_kind="taken"), ) @@ -254,7 +273,7 @@ class CodeInfo: tracing: bool file_data: TTraceFileData | None - byte_to_line: dict[int, int] | None + byte_to_line: dict[TOffset, TLineNo] | None # Keys are start instruction offsets for branches. # Values are two tuples: # ( @@ -263,15 +282,15 @@ class CodeInfo: # ) # Two possible trails from the branch point, left and right. branch_trails: dict[ - int, + TOffset, tuple[ - tuple[list[int], TArc] | None, - tuple[list[int], TArc] | None, - ] + tuple[list[TOffset], TArc | None], + tuple[list[TOffset], TArc | None], + ], ] -def bytes_to_lines(code: CodeType) -> dict[int, int]: +def bytes_to_lines(code: CodeType) -> dict[TOffset, TLineNo]: """Make a dict mapping byte code offsets to line numbers.""" b2l = {} for bstart, bend, lineno in code.co_lines(): @@ -335,15 +354,21 @@ def start(self) -> None: sys_monitoring.use_tool_id(self.myid, "coverage.py") register = functools.partial(sys_monitoring.register_callback, self.myid) events = sys.monitoring.events - import contextlib sys_monitoring.set_events(self.myid, events.PY_START) register(events.PY_START, self.sysmon_py_start) if self.trace_arcs: register(events.PY_RETURN, self.sysmon_py_return) register(events.LINE, self.sysmon_line_arcs) - register(events.BRANCH_RIGHT, self.sysmon_branch_either) # type:ignore[attr-defined] - register(events.BRANCH_LEFT, self.sysmon_branch_either) # type:ignore[attr-defined] + if env.PYBEHAVIOR.branch_right_left: + register( + events.BRANCH_RIGHT, # type:ignore[attr-defined] + self.sysmon_branch_either, + ) + register( + events.BRANCH_LEFT, # type:ignore[attr-defined] + self.sysmon_branch_either, + ) else: register(events.LINE, self.sysmon_line_lines) sys_monitoring.restart_events() @@ -385,7 +410,7 @@ def get_stats(self) -> dict[str, int] | None: @panopticon("code", "@") def sysmon_py_start( # pylint: disable=useless-return - self, code: CodeType, instruction_offset: int + self, code: CodeType, instruction_offset: TOffset ) -> MonitorReturn: """Handle sys.monitoring.events.PY_START events.""" # Entering a new frame. Decide if we should trace in this file. @@ -433,7 +458,7 @@ def sysmon_py_start( # pylint: disable=useless-return branch_trails={}, ) self.code_infos[id(code)] = code_info - populate_branch_trails(code, code_info) # TODO: should be a method? + populate_branch_trails(code, code_info) # TODO: should be a method? self.code_objects.append(code) if tracing_code: @@ -445,7 +470,8 @@ def sysmon_py_start( # pylint: disable=useless-return if self.trace_arcs: assert env.PYBEHAVIOR.branch_right_left local_events |= ( - events.BRANCH_RIGHT | events.BRANCH_LEFT # type:ignore[attr-defined] + events.BRANCH_RIGHT # type:ignore[attr-defined] + | events.BRANCH_LEFT # type:ignore[attr-defined] ) sys_monitoring.set_local_events(self.myid, code, local_events) # 111963: @@ -457,7 +483,7 @@ def sysmon_py_start( # pylint: disable=useless-return def sysmon_py_return( # pylint: disable=useless-return self, code: CodeType, - instruction_offset: int, + instruction_offset: TOffset, retval: object, ) -> MonitorReturn: """Handle sys.monitoring.events.PY_RETURN events for branch coverage.""" @@ -472,7 +498,7 @@ def sysmon_py_return( # pylint: disable=useless-return return None @panopticon("code", "line") - def sysmon_line_lines(self, code: CodeType, line_number: int) -> MonitorReturn: + def sysmon_line_lines(self, code: CodeType, line_number: TLineNo) -> MonitorReturn: """Handle sys.monitoring.events.LINE events for line coverage.""" code_info = self.code_infos[id(code)] if code_info.file_data is not None: @@ -481,7 +507,7 @@ def sysmon_line_lines(self, code: CodeType, line_number: int) -> MonitorReturn: return DISABLE @panopticon("code", "line") - def sysmon_line_arcs(self, code: CodeType, line_number: int) -> MonitorReturn: + def sysmon_line_arcs(self, code: CodeType, line_number: TLineNo) -> MonitorReturn: """Handle sys.monitoring.events.LINE events for branch coverage.""" code_info = self.code_infos[id(code)] if code_info.file_data is not None: @@ -492,7 +518,7 @@ def sysmon_line_arcs(self, code: CodeType, line_number: int) -> MonitorReturn: @panopticon("code", "@", "@") def sysmon_branch_either( - self, code: CodeType, instruction_offset: int, destination_offset: int + self, code: CodeType, instruction_offset: TOffset, destination_offset: TOffset ) -> MonitorReturn: """Handle BRANCH_RIGHT and BRANCH_LEFT events.""" code_info = self.code_infos[id(code)]