req_tracker.py 4.29 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
import contextlib
import hashlib
import logging
import os
from types import TracebackType
from typing import Dict, Iterator, Optional, Set, Type, Union

from pip._internal.models.link import Link
from pip._internal.req.req_install import InstallRequirement
from pip._internal.utils.temp_dir import TempDirectory

logger = logging.getLogger(__name__)


@contextlib.contextmanager
def update_env_context_manager(**changes):
    # type: (str) -> Iterator[None]
    target = os.environ

    # Save values from the target and change them.
    non_existent_marker = object()
    saved_values = {}  # type: Dict[str, Union[object, str]]
    for name, new_value in changes.items():
        try:
            saved_values[name] = target[name]
        except KeyError:
            saved_values[name] = non_existent_marker
        target[name] = new_value

    try:
        yield
    finally:
        # Restore original values in the target.
        for name, original_value in saved_values.items():
            if original_value is non_existent_marker:
                del target[name]
            else:
                assert isinstance(original_value, str)  # for mypy
                target[name] = original_value


@contextlib.contextmanager
def get_requirement_tracker():
    # type: () -> Iterator[RequirementTracker]
    root = os.environ.get('PIP_REQ_TRACKER')
    with contextlib.ExitStack() as ctx:
        if root is None:
            root = ctx.enter_context(
                TempDirectory(kind='req-tracker')
            ).path
            ctx.enter_context(update_env_context_manager(PIP_REQ_TRACKER=root))
            logger.debug("Initialized build tracking at %s", root)

        with RequirementTracker(root) as tracker:
            yield tracker


class RequirementTracker:

    def __init__(self, root):
        # type: (str) -> None
        self._root = root
        self._entries = set()  # type: Set[InstallRequirement]
        logger.debug("Created build tracker: %s", self._root)

    def __enter__(self):
        # type: () -> RequirementTracker
        logger.debug("Entered build tracker: %s", self._root)
        return self

    def __exit__(
        self,
        exc_type,  # type: Optional[Type[BaseException]]
        exc_val,  # type: Optional[BaseException]
        exc_tb  # type: Optional[TracebackType]
    ):
        # type: (...) -> None
        self.cleanup()

    def _entry_path(self, link):
        # type: (Link) -> str
        hashed = hashlib.sha224(link.url_without_fragment.encode()).hexdigest()
        return os.path.join(self._root, hashed)

    def add(self, req):
        # type: (InstallRequirement) -> None
        """Add an InstallRequirement to build tracking.
        """

        assert req.link
        # Get the file to write information about this requirement.
        entry_path = self._entry_path(req.link)

        # Try reading from the file. If it exists and can be read from, a build
        # is already in progress, so a LookupError is raised.
        try:
            with open(entry_path) as fp:
                contents = fp.read()
        except FileNotFoundError:
            pass
        else:
            message = '{} is already being built: {}'.format(
                req.link, contents)
            raise LookupError(message)

        # If we're here, req should really not be building already.
        assert req not in self._entries

        # Start tracking this requirement.
        with open(entry_path, 'w', encoding="utf-8") as fp:
            fp.write(str(req))
        self._entries.add(req)

        logger.debug('Added %s to build tracker %r', req, self._root)

    def remove(self, req):
        # type: (InstallRequirement) -> None
        """Remove an InstallRequirement from build tracking.
        """

        assert req.link
        # Delete the created file and the corresponding entries.
        os.unlink(self._entry_path(req.link))
        self._entries.remove(req)

        logger.debug('Removed %s from build tracker %r', req, self._root)

    def cleanup(self):
        # type: () -> None
        for req in set(self._entries):
            self.remove(req)

        logger.debug("Removed build tracker: %r", self._root)

    @contextlib.contextmanager
    def track(self, req):
        # type: (InstallRequirement) -> Iterator[None]
        self.add(req)
        yield
        self.remove(req)