162 lines
5.4 KiB
Python
162 lines
5.4 KiB
Python
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
|
|
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
|
|
|
|
"""Invasive patches for coverage.py."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import atexit
|
|
import contextlib
|
|
import os
|
|
import site
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Any, NoReturn
|
|
|
|
from coverage import env
|
|
from coverage.debug import NoDebugging, DevNullDebug
|
|
from coverage.exceptions import ConfigError, CoverageException
|
|
|
|
if TYPE_CHECKING:
|
|
from coverage import Coverage
|
|
from coverage.config import CoverageConfig
|
|
from coverage.types import TDebugCtl
|
|
|
|
|
|
def apply_patches(
|
|
cov: Coverage,
|
|
config: CoverageConfig,
|
|
debug: TDebugCtl,
|
|
*,
|
|
make_pth_file: bool = True,
|
|
) -> None:
|
|
"""Apply invasive patches requested by `[run] patch=`."""
|
|
debug = debug if debug.should("patch") else DevNullDebug()
|
|
for patch in sorted(set(config.patch)):
|
|
if patch == "_exit":
|
|
_patch__exit(cov, debug)
|
|
|
|
elif patch == "execv":
|
|
_patch_execv(cov, config, debug)
|
|
|
|
elif patch == "fork":
|
|
_patch_fork(debug)
|
|
|
|
elif patch == "subprocess":
|
|
_patch_subprocess(config, debug, make_pth_file)
|
|
|
|
else:
|
|
raise ConfigError(f"Unknown patch {patch!r}")
|
|
|
|
|
|
def _patch__exit(cov: Coverage, debug: TDebugCtl) -> None:
|
|
"""Patch os._exit."""
|
|
debug.write("Patching _exit")
|
|
|
|
old_exit = os._exit
|
|
|
|
def coverage_os_exit_patch(status: int) -> NoReturn:
|
|
with contextlib.suppress(Exception):
|
|
debug.write(f"Using _exit patch with {cov = }")
|
|
with contextlib.suppress(Exception):
|
|
cov.save()
|
|
old_exit(status)
|
|
|
|
os._exit = coverage_os_exit_patch
|
|
|
|
|
|
def _patch_execv(cov: Coverage, config: CoverageConfig, debug: TDebugCtl) -> None:
|
|
"""Patch the execv family of functions."""
|
|
if env.WINDOWS:
|
|
raise CoverageException("patch=execv isn't supported yet on Windows.")
|
|
|
|
debug.write("Patching execv")
|
|
|
|
def make_execv_patch(fname: str, old_execv: Any) -> Any:
|
|
def coverage_execv_patch(*args: Any, **kwargs: Any) -> Any:
|
|
with contextlib.suppress(Exception):
|
|
debug.write(f"Using execv patch for {fname} with {cov = }")
|
|
with contextlib.suppress(Exception):
|
|
cov.save()
|
|
|
|
if fname.endswith("e"):
|
|
# Assume the `env` argument is passed positionally.
|
|
new_env = args[-1]
|
|
# Pass our configuration in the new environment.
|
|
new_env["COVERAGE_PROCESS_CONFIG"] = config.serialize()
|
|
if env.TESTING:
|
|
# The subprocesses need to use the same core as the main process.
|
|
new_env["COVERAGE_CORE"] = os.getenv("COVERAGE_CORE")
|
|
|
|
# When testing locally, we need to honor the pyc file location
|
|
# or they get written to the .tox directories and pollute the
|
|
# next run with a different core.
|
|
if (cache_prefix := os.getenv("PYTHONPYCACHEPREFIX")) is not None:
|
|
new_env["PYTHONPYCACHEPREFIX"] = cache_prefix
|
|
|
|
# Without this, it fails on PyPy and Ubuntu.
|
|
new_env["PATH"] = os.getenv("PATH")
|
|
old_execv(*args, **kwargs)
|
|
|
|
return coverage_execv_patch
|
|
|
|
# All the exec* and spawn* functions eventually call execv or execve.
|
|
os.execv = make_execv_patch("execv", os.execv)
|
|
os.execve = make_execv_patch("execve", os.execve)
|
|
|
|
|
|
def _patch_fork(debug: TDebugCtl) -> None:
|
|
"""Ensure Coverage is properly reset after a fork."""
|
|
from coverage.control import _after_fork_in_child
|
|
if env.WINDOWS:
|
|
raise CoverageException("patch=fork isn't supported yet on Windows.")
|
|
|
|
debug.write("Patching fork")
|
|
os.register_at_fork(after_in_child=_after_fork_in_child)
|
|
|
|
|
|
def _patch_subprocess(config: CoverageConfig, debug: TDebugCtl, make_pth_file: bool) -> None:
|
|
"""Write .pth files and set environment vars to measure subprocesses."""
|
|
debug.write("Patching subprocess")
|
|
|
|
if make_pth_file:
|
|
pth_files = create_pth_files(debug)
|
|
def delete_pth_files() -> None:
|
|
for p in pth_files:
|
|
debug.write(f"Deleting subprocess .pth file: {str(p)!r}")
|
|
p.unlink(missing_ok=True)
|
|
atexit.register(delete_pth_files)
|
|
assert config.config_file is not None
|
|
os.environ["COVERAGE_PROCESS_CONFIG"] = config.serialize()
|
|
|
|
|
|
# Writing .pth files is not obvious. On Windows, getsitepackages() returns two
|
|
# directories. A .pth file in the first will be run, but coverage isn't
|
|
# importable yet. We write into all the places we can, but with defensive
|
|
# import code.
|
|
|
|
PTH_CODE = """\
|
|
try:
|
|
import coverage
|
|
except:
|
|
pass
|
|
else:
|
|
coverage.process_startup()
|
|
"""
|
|
|
|
PTH_TEXT = f"import sys; exec({PTH_CODE!r})\n"
|
|
|
|
def create_pth_files(debug: TDebugCtl = NoDebugging()) -> list[Path]:
|
|
"""Create .pth files for measuring subprocesses."""
|
|
pth_files = []
|
|
for pth_dir in site.getsitepackages():
|
|
pth_file = Path(pth_dir) / f"subcover_{os.getpid()}.pth"
|
|
try:
|
|
if debug.should("patch"):
|
|
debug.write(f"Writing subprocess .pth file: {str(pth_file)!r}")
|
|
pth_file.write_text(PTH_TEXT, encoding="utf-8")
|
|
except OSError: # pragma: cant happen
|
|
continue
|
|
else:
|
|
pth_files.append(pth_file)
|
|
return pth_files
|