collectstatic.py 14.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
import os

from django.apps import apps
from django.contrib.staticfiles.finders import get_finders
from django.contrib.staticfiles.storage import staticfiles_storage
from django.core.checks import Tags
from django.core.files.storage import FileSystemStorage
from django.core.management.base import BaseCommand, CommandError
from django.core.management.color import no_style
from django.utils.functional import cached_property


class Command(BaseCommand):
    """
    Copies or symlinks static files from different locations to the
    settings.STATIC_ROOT.
    """

    help = "Collect static files in a single location."
    requires_system_checks = [Tags.staticfiles]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.copied_files = []
        self.symlinked_files = []
        self.unmodified_files = []
        self.post_processed_files = []
        self.storage = staticfiles_storage
        self.style = no_style()

    @cached_property
    def local(self):
        try:
            self.storage.path("")
        except NotImplementedError:
            return False
        return True

    def add_arguments(self, parser):
        parser.add_argument(
            "--noinput",
            "--no-input",
            action="store_false",
            dest="interactive",
            help="Do NOT prompt the user for input of any kind.",
        )
        parser.add_argument(
            "--no-post-process",
            action="store_false",
            dest="post_process",
            help="Do NOT post process collected files.",
        )
        parser.add_argument(
            "-i",
            "--ignore",
            action="append",
            default=[],
            dest="ignore_patterns",
            metavar="PATTERN",
            help="Ignore files or directories matching this glob-style "
            "pattern. Use multiple times to ignore more.",
        )
        parser.add_argument(
            "-n",
            "--dry-run",
            action="store_true",
            help="Do everything except modify the filesystem.",
        )
        parser.add_argument(
            "-c",
            "--clear",
            action="store_true",
            help="Clear the existing files using the storage "
            "before trying to copy or link the original file.",
        )
        parser.add_argument(
            "-l",
            "--link",
            action="store_true",
            help="Create a symbolic link to each file instead of copying.",
        )
        parser.add_argument(
            "--no-default-ignore",
            action="store_false",
            dest="use_default_ignore_patterns",
            help=(
                "Don't ignore the common private glob-style patterns (defaults to "
                "'CVS', '.*' and '*~')."
            ),
        )

    def set_options(self, **options):
        """
        Set instance variables based on an options dict
        """
        self.interactive = options["interactive"]
        self.verbosity = options["verbosity"]
        self.symlink = options["link"]
        self.clear = options["clear"]
        self.dry_run = options["dry_run"]
        ignore_patterns = options["ignore_patterns"]
        if options["use_default_ignore_patterns"]:
            ignore_patterns += apps.get_app_config("staticfiles").ignore_patterns
        self.ignore_patterns = list({os.path.normpath(p) for p in ignore_patterns})
        self.post_process = options["post_process"]

    def collect(self):
        """
        Perform the bulk of the work of collectstatic.

        Split off from handle() to facilitate testing.
        """
        if self.symlink and not self.local:
            raise CommandError("Can't symlink to a remote destination.")

        if self.clear:
            self.clear_dir("")

        if self.symlink:
            handler = self.link_file
        else:
            handler = self.copy_file

        found_files = {}
        for finder in get_finders():
            for path, storage in finder.list(self.ignore_patterns):
                # Prefix the relative path if the source storage contains it
                if getattr(storage, "prefix", None):
                    prefixed_path = os.path.join(storage.prefix, path)
                else:
                    prefixed_path = path

                if prefixed_path not in found_files:
                    found_files[prefixed_path] = (storage, path)
                    handler(path, prefixed_path, storage)
                else:
                    self.log(
                        "Found another file with the destination path '%s'. It "
                        "will be ignored since only the first encountered file "
                        "is collected. If this is not what you want, make sure "
                        "every static file has a unique path." % prefixed_path,
                        level=1,
                    )

        # Storage backends may define a post_process() method.
        if self.post_process and hasattr(self.storage, "post_process"):
            processor = self.storage.post_process(found_files, dry_run=self.dry_run)
            for original_path, processed_path, processed in processor:
                if isinstance(processed, Exception):
                    self.stderr.write("Post-processing '%s' failed!" % original_path)
                    # Add a blank line before the traceback, otherwise it's
                    # too easy to miss the relevant part of the error message.
                    self.stderr.write()
                    raise processed
                if processed:
                    self.log(
                        "Post-processed '%s' as '%s'" % (original_path, processed_path),
                        level=2,
                    )
                    self.post_processed_files.append(original_path)
                else:
                    self.log("Skipped post-processing '%s'" % original_path)

        return {
            "modified": self.copied_files + self.symlinked_files,
            "unmodified": self.unmodified_files,
            "post_processed": self.post_processed_files,
        }

    def handle(self, **options):
        self.set_options(**options)
        message = ["\n"]
        if self.dry_run:
            message.append(
                "You have activated the --dry-run option so no files will be "
                "modified.\n\n"
            )

        message.append(
            "You have requested to collect static files at the destination\n"
            "location as specified in your settings"
        )

        if self.is_local_storage() and self.storage.location:
            destination_path = self.storage.location
            message.append(":\n\n    %s\n\n" % destination_path)
            should_warn_user = self.storage.exists(destination_path) and any(
                self.storage.listdir(destination_path)
            )
        else:
            destination_path = None
            message.append(".\n\n")
            # Destination files existence not checked; play it safe and warn.
            should_warn_user = True

        if self.interactive and should_warn_user:
            if self.clear:
                message.append("This will DELETE ALL FILES in this location!\n")
            else:
                message.append("This will overwrite existing files!\n")

            message.append(
                "Are you sure you want to do this?\n\n"
                "Type 'yes' to continue, or 'no' to cancel: "
            )
            if input("".join(message)) != "yes":
                raise CommandError("Collecting static files cancelled.")

        collected = self.collect()

        if self.verbosity >= 1:
            modified_count = len(collected["modified"])
            unmodified_count = len(collected["unmodified"])
            post_processed_count = len(collected["post_processed"])
            return (
                "\n%(modified_count)s %(identifier)s %(action)s"
                "%(destination)s%(unmodified)s%(post_processed)s."
            ) % {
                "modified_count": modified_count,
                "identifier": "static file" + ("" if modified_count == 1 else "s"),
                "action": "symlinked" if self.symlink else "copied",
                "destination": (
                    " to '%s'" % destination_path if destination_path else ""
                ),
                "unmodified": (
                    ", %s unmodified" % unmodified_count
                    if collected["unmodified"]
                    else ""
                ),
                "post_processed": (
                    collected["post_processed"]
                    and ", %s post-processed" % post_processed_count
                    or ""
                ),
            }

    def log(self, msg, level=2):
        """
        Small log helper
        """
        if self.verbosity >= level:
            self.stdout.write(msg)

    def is_local_storage(self):
        return isinstance(self.storage, FileSystemStorage)

    def clear_dir(self, path):
        """
        Delete the given relative path using the destination storage backend.
        """
        if not self.storage.exists(path):
            return

        dirs, files = self.storage.listdir(path)
        for f in files:
            fpath = os.path.join(path, f)
            if self.dry_run:
                self.log("Pretending to delete '%s'" % fpath, level=1)
            else:
                self.log("Deleting '%s'" % fpath, level=1)
                try:
                    full_path = self.storage.path(fpath)
                except NotImplementedError:
                    self.storage.delete(fpath)
                else:
                    if not os.path.exists(full_path) and os.path.lexists(full_path):
                        # Delete broken symlinks
                        os.unlink(full_path)
                    else:
                        self.storage.delete(fpath)
        for d in dirs:
            self.clear_dir(os.path.join(path, d))

    def delete_file(self, path, prefixed_path, source_storage):
        """
        Check if the target file should be deleted if it already exists.
        """
        if self.storage.exists(prefixed_path):
            try:
                # When was the target file modified last time?
                target_last_modified = self.storage.get_modified_time(prefixed_path)
            except (OSError, NotImplementedError, AttributeError):
                # The storage doesn't support get_modified_time() or failed
                pass
            else:
                try:
                    # When was the source file modified last time?
                    source_last_modified = source_storage.get_modified_time(path)
                except (OSError, NotImplementedError, AttributeError):
                    pass
                else:
                    # The full path of the target file
                    if self.local:
                        full_path = self.storage.path(prefixed_path)
                        # If it's --link mode and the path isn't a link (i.e.
                        # the previous collectstatic wasn't with --link) or if
                        # it's non-link mode and the path is a link (i.e. the
                        # previous collectstatic was with --link), the old
                        # links/files must be deleted so it's not safe to skip
                        # unmodified files.
                        can_skip_unmodified_files = not (
                            self.symlink ^ os.path.islink(full_path)
                        )
                    else:
                        # In remote storages, skipping is only based on the
                        # modified times since symlinks aren't relevant.
                        can_skip_unmodified_files = True
                    # Avoid sub-second precision (see #14665, #19540)
                    file_is_unmodified = target_last_modified.replace(
                        microsecond=0
                    ) >= source_last_modified.replace(microsecond=0)
                    if file_is_unmodified and can_skip_unmodified_files:
                        if prefixed_path not in self.unmodified_files:
                            self.unmodified_files.append(prefixed_path)
                        self.log("Skipping '%s' (not modified)" % path)
                        return False
            # Then delete the existing file if really needed
            if self.dry_run:
                self.log("Pretending to delete '%s'" % path)
            else:
                self.log("Deleting '%s'" % path)
                self.storage.delete(prefixed_path)
        return True

    def link_file(self, path, prefixed_path, source_storage):
        """
        Attempt to link ``path``
        """
        # Skip this file if it was already copied earlier
        if prefixed_path in self.symlinked_files:
            return self.log("Skipping '%s' (already linked earlier)" % path)
        # Delete the target file if needed or break
        if not self.delete_file(path, prefixed_path, source_storage):
            return
        # The full path of the source file
        source_path = source_storage.path(path)
        # Finally link the file
        if self.dry_run:
            self.log("Pretending to link '%s'" % source_path, level=1)
        else:
            self.log("Linking '%s'" % source_path, level=2)
            full_path = self.storage.path(prefixed_path)
            os.makedirs(os.path.dirname(full_path), exist_ok=True)
            try:
                if os.path.lexists(full_path):
                    os.unlink(full_path)
                os.symlink(source_path, full_path)
            except NotImplementedError:
                import platform

                raise CommandError(
                    "Symlinking is not supported in this "
                    "platform (%s)." % platform.platform()
                )
            except OSError as e:
                raise CommandError(e)
        if prefixed_path not in self.symlinked_files:
            self.symlinked_files.append(prefixed_path)

    def copy_file(self, path, prefixed_path, source_storage):
        """
        Attempt to copy ``path`` with storage
        """
        # Skip this file if it was already copied earlier
        if prefixed_path in self.copied_files:
            return self.log("Skipping '%s' (already copied earlier)" % path)
        # Delete the target file if needed or break
        if not self.delete_file(path, prefixed_path, source_storage):
            return
        # The full path of the source file
        source_path = source_storage.path(path)
        # Finally start copying
        if self.dry_run:
            self.log("Pretending to copy '%s'" % source_path, level=1)
        else:
            self.log("Copying '%s'" % source_path, level=2)
            with source_storage.open(path) as source_file:
                self.storage.save(prefixed_path, source_file)
        self.copied_files.append(prefixed_path)