Commit 14f7b403 authored by Ysobel Vera's avatar Ysobel Vera

Made the homepage

Made the content for the homepage app and started on making the content for the about app
parents

Too many changes to show.

To preserve performance only 1000 of 1000+ files are displayed.

Maria Ysobel Lourdes A. Vera
216232
BS CS-DGDD
CSCI 40-F
Lab 01: Song Library
February 13, 2023
This lab has been truthfully completed by me.
<sgd> Maria Ysobel Lourdes A. Vera, February 13, 2023
This diff is collapsed.
Copyright (c) Django Software Foundation and individual contributors.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of Django nor the names of its contributors may be used
to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Metadata-Version: 2.1
Name: Django
Version: 4.1.6
Summary: A high-level Python web framework that encourages rapid development and clean, pragmatic design.
Home-page: https://www.djangoproject.com/
Author: Django Software Foundation
Author-email: foundation@djangoproject.com
License: BSD-3-Clause
Project-URL: Documentation, https://docs.djangoproject.com/
Project-URL: Release notes, https://docs.djangoproject.com/en/stable/releases/
Project-URL: Funding, https://www.djangoproject.com/fundraising/
Project-URL: Source, https://github.com/django/django
Project-URL: Tracker, https://code.djangoproject.com/
Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Web Environment
Classifier: Framework :: Django
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: BSD License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Topic :: Internet :: WWW/HTTP
Classifier: Topic :: Internet :: WWW/HTTP :: Dynamic Content
Classifier: Topic :: Internet :: WWW/HTTP :: WSGI
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.8
License-File: LICENSE
License-File: LICENSE.python
License-File: AUTHORS
Requires-Dist: asgiref (<4,>=3.5.2)
Requires-Dist: sqlparse (>=0.2.2)
Requires-Dist: backports.zoneinfo ; python_version < "3.9"
Requires-Dist: tzdata ; sys_platform == "win32"
Provides-Extra: argon2
Requires-Dist: argon2-cffi (>=19.1.0) ; extra == 'argon2'
Provides-Extra: bcrypt
Requires-Dist: bcrypt ; extra == 'bcrypt'
======
Django
======
Django is a high-level Python web framework that encourages rapid development
and clean, pragmatic design. Thanks for checking it out.
All documentation is in the "``docs``" directory and online at
https://docs.djangoproject.com/en/stable/. If you're just getting started,
here's how we recommend you read the docs:
* First, read ``docs/intro/install.txt`` for instructions on installing Django.
* Next, work through the tutorials in order (``docs/intro/tutorial01.txt``,
``docs/intro/tutorial02.txt``, etc.).
* If you want to set up an actual deployment server, read
``docs/howto/deployment/index.txt`` for instructions.
* You'll probably want to read through the topical guides (in ``docs/topics``)
next; from there you can jump to the HOWTOs (in ``docs/howto``) for specific
problems, and check out the reference (``docs/ref``) for gory details.
* See ``docs/README`` for instructions on building an HTML version of the docs.
Docs are updated rigorously. If you find any problems in the docs, or think
they should be clarified in any way, please take 30 seconds to fill out a
ticket here: https://code.djangoproject.com/newticket
To get more help:
* Join the ``#django`` channel on ``irc.libera.chat``. Lots of helpful people
hang out there. See https://web.libera.chat if you're new to IRC.
* Join the django-users mailing list, or read the archives, at
https://groups.google.com/group/django-users.
To contribute to Django:
* Check out https://docs.djangoproject.com/en/dev/internals/contributing/ for
information about getting involved.
To run Django's test suite:
* Follow the instructions in the "Unit tests" section of
``docs/internals/contributing/writing-code/unit-tests.txt``, published online at
https://docs.djangoproject.com/en/dev/internals/contributing/writing-code/unit-tests/#running-the-unit-tests
Supporting the Development of Django
====================================
Django's development depends on your contributions.
If you depend on Django, remember to support the Django Software Foundation: https://www.djangoproject.com/fundraising/
This diff is collapsed.
Wheel-Version: 1.0
Generator: bdist_wheel (0.37.1)
Root-Is-Purelib: true
Tag: py3-none-any
[console_scripts]
django-admin = django.core.management:execute_from_command_line
# don't import any costly modules
import sys
import os
is_pypy = '__pypy__' in sys.builtin_module_names
def warn_distutils_present():
if 'distutils' not in sys.modules:
return
if is_pypy and sys.version_info < (3, 7):
# PyPy for 3.6 unconditionally imports distutils, so bypass the warning
# https://foss.heptapod.net/pypy/pypy/-/blob/be829135bc0d758997b3566062999ee8b23872b4/lib-python/3/site.py#L250
return
import warnings
warnings.warn(
"Distutils was imported before Setuptools, but importing Setuptools "
"also replaces the `distutils` module in `sys.modules`. This may lead "
"to undesirable behaviors or errors. To avoid these issues, avoid "
"using distutils directly, ensure that setuptools is installed in the "
"traditional way (e.g. not an editable install), and/or make sure "
"that setuptools is always imported before distutils."
)
def clear_distutils():
if 'distutils' not in sys.modules:
return
import warnings
warnings.warn("Setuptools is replacing distutils.")
mods = [
name
for name in sys.modules
if name == "distutils" or name.startswith("distutils.")
]
for name in mods:
del sys.modules[name]
def enabled():
"""
Allow selection of distutils by environment variable.
"""
which = os.environ.get('SETUPTOOLS_USE_DISTUTILS', 'local')
return which == 'local'
def ensure_local_distutils():
import importlib
clear_distutils()
# With the DistutilsMetaFinder in place,
# perform an import to cause distutils to be
# loaded from setuptools._distutils. Ref #2906.
with shim():
importlib.import_module('distutils')
# check that submodules load as expected
core = importlib.import_module('distutils.core')
assert '_distutils' in core.__file__, core.__file__
assert 'setuptools._distutils.log' not in sys.modules
def do_override():
"""
Ensure that the local copy of distutils is preferred over stdlib.
See https://github.com/pypa/setuptools/issues/417#issuecomment-392298401
for more motivation.
"""
if enabled():
warn_distutils_present()
ensure_local_distutils()
class _TrivialRe:
def __init__(self, *patterns):
self._patterns = patterns
def match(self, string):
return all(pat in string for pat in self._patterns)
class DistutilsMetaFinder:
def find_spec(self, fullname, path, target=None):
# optimization: only consider top level modules and those
# found in the CPython test suite.
if path is not None and not fullname.startswith('test.'):
return
method_name = 'spec_for_{fullname}'.format(**locals())
method = getattr(self, method_name, lambda: None)
return method()
def spec_for_distutils(self):
if self.is_cpython():
return
import importlib
import importlib.abc
import importlib.util
try:
mod = importlib.import_module('setuptools._distutils')
except Exception:
# There are a couple of cases where setuptools._distutils
# may not be present:
# - An older Setuptools without a local distutils is
# taking precedence. Ref #2957.
# - Path manipulation during sitecustomize removes
# setuptools from the path but only after the hook
# has been loaded. Ref #2980.
# In either case, fall back to stdlib behavior.
return
class DistutilsLoader(importlib.abc.Loader):
def create_module(self, spec):
mod.__name__ = 'distutils'
return mod
def exec_module(self, module):
pass
return importlib.util.spec_from_loader(
'distutils', DistutilsLoader(), origin=mod.__file__
)
@staticmethod
def is_cpython():
"""
Suppress supplying distutils for CPython (build and tests).
Ref #2965 and #3007.
"""
return os.path.isfile('pybuilddir.txt')
def spec_for_pip(self):
"""
Ensure stdlib distutils when running under pip.
See pypa/pip#8761 for rationale.
"""
if self.pip_imported_during_build():
return
clear_distutils()
self.spec_for_distutils = lambda: None
@classmethod
def pip_imported_during_build(cls):
"""
Detect if pip is being imported in a build script. Ref #2355.
"""
import traceback
return any(
cls.frame_file_is_setup(frame) for frame, line in traceback.walk_stack(None)
)
@staticmethod
def frame_file_is_setup(frame):
"""
Return True if the indicated frame suggests a setup.py file.
"""
# some frames may not have __file__ (#2940)
return frame.f_globals.get('__file__', '').endswith('setup.py')
def spec_for_sensitive_tests(self):
"""
Ensure stdlib distutils when running select tests under CPython.
python/cpython#91169
"""
clear_distutils()
self.spec_for_distutils = lambda: None
sensitive_tests = (
[
'test.test_distutils',
'test.test_peg_generator',
'test.test_importlib',
]
if sys.version_info < (3, 10)
else [
'test.test_distutils',
]
)
for name in DistutilsMetaFinder.sensitive_tests:
setattr(
DistutilsMetaFinder,
f'spec_for_{name}',
DistutilsMetaFinder.spec_for_sensitive_tests,
)
DISTUTILS_FINDER = DistutilsMetaFinder()
def add_shim():
DISTUTILS_FINDER in sys.meta_path or insert_shim()
class shim:
def __enter__(self):
insert_shim()
def __exit__(self, exc, value, tb):
remove_shim()
def insert_shim():
sys.meta_path.insert(0, DISTUTILS_FINDER)
def remove_shim():
try:
sys.meta_path.remove(DISTUTILS_FINDER)
except ValueError:
pass
__import__('_distutils_hack').do_override()
Copyright (c) Django Software Foundation and individual contributors.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of Django nor the names of its contributors may be used
to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Metadata-Version: 2.1
Name: asgiref
Version: 3.6.0
Summary: ASGI specs, helper code, and adapters
Home-page: https://github.com/django/asgiref/
Author: Django Software Foundation
Author-email: foundation@djangoproject.com
License: BSD-3-Clause
Project-URL: Documentation, https://asgi.readthedocs.io/
Project-URL: Further Documentation, https://docs.djangoproject.com/en/stable/topics/async/#async-adapter-functions
Project-URL: Changelog, https://github.com/django/asgiref/blob/master/CHANGELOG.txt
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Web Environment
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: BSD License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Topic :: Internet :: WWW/HTTP
Requires-Python: >=3.7
License-File: LICENSE
Requires-Dist: typing-extensions ; python_version < "3.8"
Provides-Extra: tests
Requires-Dist: pytest ; extra == 'tests'
Requires-Dist: pytest-asyncio ; extra == 'tests'
Requires-Dist: mypy (>=0.800) ; extra == 'tests'
asgiref
=======
.. image:: https://api.travis-ci.org/django/asgiref.svg
:target: https://travis-ci.org/django/asgiref
.. image:: https://img.shields.io/pypi/v/asgiref.svg
:target: https://pypi.python.org/pypi/asgiref
ASGI is a standard for Python asynchronous web apps and servers to communicate
with each other, and positioned as an asynchronous successor to WSGI. You can
read more at https://asgi.readthedocs.io/en/latest/
This package includes ASGI base libraries, such as:
* Sync-to-async and async-to-sync function wrappers, ``asgiref.sync``
* Server base classes, ``asgiref.server``
* A WSGI-to-ASGI adapter, in ``asgiref.wsgi``
Function wrappers
-----------------
These allow you to wrap or decorate async or sync functions to call them from
the other style (so you can call async functions from a synchronous thread,
or vice-versa).
In particular:
* AsyncToSync lets a synchronous subthread stop and wait while the async
function is called on the main thread's event loop, and then control is
returned to the thread when the async function is finished.
* SyncToAsync lets async code call a synchronous function, which is run in
a threadpool and control returned to the async coroutine when the synchronous
function completes.
The idea is to make it easier to call synchronous APIs from async code and
asynchronous APIs from synchronous code so it's easier to transition code from
one style to the other. In the case of Channels, we wrap the (synchronous)
Django view system with SyncToAsync to allow it to run inside the (asynchronous)
ASGI server.
Note that exactly what threads things run in is very specific, and aimed to
keep maximum compatibility with old synchronous code. See
"Synchronous code & Threads" below for a full explanation. By default,
``sync_to_async`` will run all synchronous code in the program in the same
thread for safety reasons; you can disable this for more performance with
``@sync_to_async(thread_sensitive=False)``, but make sure that your code does
not rely on anything bound to threads (like database connections) when you do.
Threadlocal replacement
-----------------------
This is a drop-in replacement for ``threading.local`` that works with both
threads and asyncio Tasks. Even better, it will proxy values through from a
task-local context to a thread-local context when you use ``sync_to_async``
to run things in a threadpool, and vice-versa for ``async_to_sync``.
If you instead want true thread- and task-safety, you can set
``thread_critical`` on the Local object to ensure this instead.
Server base classes
-------------------
Includes a ``StatelessServer`` class which provides all the hard work of
writing a stateless server (as in, does not handle direct incoming sockets
but instead consumes external streams or sockets to work out what is happening).
An example of such a server would be a chatbot server that connects out to
a central chat server and provides a "connection scope" per user chatting to
it. There's only one actual connection, but the server has to separate things
into several scopes for easier writing of the code.
You can see an example of this being used in `frequensgi <https://github.com/andrewgodwin/frequensgi>`_.
WSGI-to-ASGI adapter
--------------------
Allows you to wrap a WSGI application so it appears as a valid ASGI application.
Simply wrap it around your WSGI application like so::
asgi_application = WsgiToAsgi(wsgi_application)
The WSGI application will be run in a synchronous threadpool, and the wrapped
ASGI application will be one that accepts ``http`` class messages.
Please note that not all extended features of WSGI may be supported (such as
file handles for incoming POST bodies).
Dependencies
------------
``asgiref`` requires Python 3.7 or higher.
Contributing
------------
Please refer to the
`main Channels contributing docs <https://github.com/django/channels/blob/master/CONTRIBUTING.rst>`_.
Testing
'''''''
To run tests, make sure you have installed the ``tests`` extra with the package::
cd asgiref/
pip install -e .[tests]
pytest
Building the documentation
''''''''''''''''''''''''''
The documentation uses `Sphinx <http://www.sphinx-doc.org>`_::
cd asgiref/docs/
pip install sphinx
To build the docs, you can use the default tools::
sphinx-build -b html . _build/html # or `make html`, if you've got make set up
cd _build/html
python -m http.server
...or you can use ``sphinx-autobuild`` to run a server and rebuild/reload
your documentation changes automatically::
pip install sphinx-autobuild
sphinx-autobuild . _build/html
Releasing
'''''''''
To release, first add details to CHANGELOG.txt and update the version number in ``asgiref/__init__.py``.
Then, build and push the packages::
python -m build
twine upload dist/*
rm -r build/ dist/
Implementation Details
----------------------
Synchronous code & threads
''''''''''''''''''''''''''
The ``asgiref.sync`` module provides two wrappers that let you go between
asynchronous and synchronous code at will, while taking care of the rough edges
for you.
Unfortunately, the rough edges are numerous, and the code has to work especially
hard to keep things in the same thread as much as possible. Notably, the
restrictions we are working with are:
* All synchronous code called through ``SyncToAsync`` and marked with
``thread_sensitive`` should run in the same thread as each other (and if the
outer layer of the program is synchronous, the main thread)
* If a thread already has a running async loop, ``AsyncToSync`` can't run things
on that loop if it's blocked on synchronous code that is above you in the
call stack.
The first compromise you get to might be that ``thread_sensitive`` code should
just run in the same thread and not spawn in a sub-thread, fulfilling the first
restriction, but that immediately runs you into the second restriction.
The only real solution is to essentially have a variant of ThreadPoolExecutor
that executes any ``thread_sensitive`` code on the outermost synchronous
thread - either the main thread, or a single spawned subthread.
This means you now have two basic states:
* If the outermost layer of your program is synchronous, then all async code
run through ``AsyncToSync`` will run in a per-call event loop in arbitrary
sub-threads, while all ``thread_sensitive`` code will run in the main thread.
* If the outermost layer of your program is asynchronous, then all async code
runs on the main thread's event loop, and all ``thread_sensitive`` synchronous
code will run in a single shared sub-thread.
Crucially, this means that in both cases there is a thread which is a shared
resource that all ``thread_sensitive`` code must run on, and there is a chance
that this thread is currently blocked on its own ``AsyncToSync`` call. Thus,
``AsyncToSync`` needs to act as an executor for thread code while it's blocking.
The ``CurrentThreadExecutor`` class provides this functionality; rather than
simply waiting on a Future, you can call its ``run_until_future`` method and
it will run submitted code until that Future is done. This means that code
inside the call can then run code on your thread.
Maintenance and Security
------------------------
To report security issues, please contact security@djangoproject.com. For GPG
signatures and more security process information, see
https://docs.djangoproject.com/en/dev/internals/security/.
To report bugs or request new features, please open a new GitHub issue.
This repository is part of the Channels project. For the shepherd and maintenance team, please see the
`main Channels readme <https://github.com/django/channels/blob/master/README.rst>`_.
asgiref-3.6.0.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
asgiref-3.6.0.dist-info/LICENSE,sha256=uEZBXRtRTpwd_xSiLeuQbXlLxUbKYSn5UKGM0JHipmk,1552
asgiref-3.6.0.dist-info/METADATA,sha256=wuGJxjZfcfLGgT2MMqK9YK5kV9LEGpVoNzcXI2lRhWM,9203
asgiref-3.6.0.dist-info/RECORD,,
asgiref-3.6.0.dist-info/WHEEL,sha256=G16H4A3IeoQmnOrYV4ueZGKSjhipXx8zc8nu9FGlvMA,92
asgiref-3.6.0.dist-info/top_level.txt,sha256=bokQjCzwwERhdBiPdvYEZa4cHxT4NCeAffQNUqJ8ssg,8
asgiref/__init__.py,sha256=pnBAJI_5J7ByFo6sxUkzCfK3L9wcGJP7yeF7OJLugqk,22
asgiref/__pycache__/__init__.cpython-310.pyc,,
asgiref/__pycache__/compatibility.cpython-310.pyc,,
asgiref/__pycache__/current_thread_executor.cpython-310.pyc,,
asgiref/__pycache__/local.cpython-310.pyc,,
asgiref/__pycache__/server.cpython-310.pyc,,
asgiref/__pycache__/sync.cpython-310.pyc,,
asgiref/__pycache__/testing.cpython-310.pyc,,
asgiref/__pycache__/timeout.cpython-310.pyc,,
asgiref/__pycache__/typing.cpython-310.pyc,,
asgiref/__pycache__/wsgi.cpython-310.pyc,,
asgiref/compatibility.py,sha256=DhY1SOpOvOw0Y1lSEjCqg-znRUQKecG3LTaV48MZi68,1606
asgiref/current_thread_executor.py,sha256=oeH8zv2tTmcbpxdUmOSMzbEXzeY5nJzIMFvzprE95gA,2801
asgiref/local.py,sha256=nx5RqVFLYgUJVaxzApuQUW7dd9y21sruMYdgISoRs1k,4854
asgiref/py.typed,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
asgiref/server.py,sha256=egTQhZo1k4G0F7SSBQNp_VOekpGcjBJZU2kkCoiGC_M,6005
asgiref/sync.py,sha256=R5J8l_o5VmYtMGNWMANk5PUMOUApmIKK8vKogGCHTaM,20664
asgiref/testing.py,sha256=3byNRV7Oto_Fg8Z-fErQJ3yGf7OQlcUexbN_cDQugzQ,3119
asgiref/timeout.py,sha256=LtGL-xQpG8JHprdsEUCMErJ0kNWj4qwWZhEHJ3iKu4s,3627
asgiref/typing.py,sha256=NJiEadpn0IEZ6uCTDq3X6G8WtlwfnMgnt1AQ1u7Yem0,5843
asgiref/wsgi.py,sha256=-L0eo_uK_dq7EPjv1meW1BRGytURaO9NPESxnJc9CtA,6575
Wheel-Version: 1.0
Generator: bdist_wheel (0.37.1)
Root-Is-Purelib: true
Tag: py3-none-any
import inspect
from .sync import iscoroutinefunction
def is_double_callable(application):
"""
Tests to see if an application is a legacy-style (double-callable) application.
"""
# Look for a hint on the object first
if getattr(application, "_asgi_single_callable", False):
return False
if getattr(application, "_asgi_double_callable", False):
return True
# Uninstanted classes are double-callable
if inspect.isclass(application):
return True
# Instanted classes depend on their __call__
if hasattr(application, "__call__"):
# We only check to see if its __call__ is a coroutine function -
# if it's not, it still might be a coroutine function itself.
if iscoroutinefunction(application.__call__):
return False
# Non-classes we just check directly
return not iscoroutinefunction(application)
def double_to_single_callable(application):
"""
Transforms a double-callable ASGI application into a single-callable one.
"""
async def new_application(scope, receive, send):
instance = application(scope)
return await instance(receive, send)
return new_application
def guarantee_single_callable(application):
"""
Takes either a single- or double-callable application and always returns it
in single-callable style. Use this to add backwards compatibility for ASGI
2.0 applications to your server/test harness/etc.
"""
if is_double_callable(application):
application = double_to_single_callable(application)
return application
import queue
import threading
from concurrent.futures import Executor, Future
class _WorkItem:
"""
Represents an item needing to be run in the executor.
Copied from ThreadPoolExecutor (but it's private, so we're not going to rely on importing it)
"""
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as exc:
self.future.set_exception(exc)
# Break a reference cycle with the exception 'exc'
self = None
else:
self.future.set_result(result)
class CurrentThreadExecutor(Executor):
"""
An Executor that actually runs code in the thread it is instantiated in.
Passed to other threads running async code, so they can run sync code in
the thread they came from.
"""
def __init__(self):
self._work_thread = threading.current_thread()
self._work_queue = queue.Queue()
self._broken = False
def run_until_future(self, future):
"""
Runs the code in the work queue until a result is available from the future.
Should be run from the thread the executor is initialised in.
"""
# Check we're in the right thread
if threading.current_thread() != self._work_thread:
raise RuntimeError(
"You cannot run CurrentThreadExecutor from a different thread"
)
future.add_done_callback(self._work_queue.put)
# Keep getting and running work items until we get the future we're waiting for
# back via the future's done callback.
try:
while True:
# Get a work item and run it
work_item = self._work_queue.get()
if work_item is future:
return
work_item.run()
del work_item
finally:
self._broken = True
def submit(self, fn, *args, **kwargs):
# Check they're not submitting from the same thread
if threading.current_thread() == self._work_thread:
raise RuntimeError(
"You cannot submit onto CurrentThreadExecutor from its own thread"
)
# Check they're not too late or the executor errored
if self._broken:
raise RuntimeError("CurrentThreadExecutor already quit or is broken")
# Add to work queue
f = Future()
work_item = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(work_item)
# Return the future
return f
import random
import string
import sys
import threading
import weakref
class Local:
"""
A drop-in replacement for threading.locals that also works with asyncio
Tasks (via the current_task asyncio method), and passes locals through
sync_to_async and async_to_sync.
Specifically:
- Locals work per-coroutine on any thread not spawned using asgiref
- Locals work per-thread on any thread not spawned using asgiref
- Locals are shared with the parent coroutine when using sync_to_async
- Locals are shared with the parent thread when using async_to_sync
(and if that thread was launched using sync_to_async, with its parent
coroutine as well, with this working for indefinite levels of nesting)
Set thread_critical to True to not allow locals to pass from an async Task
to a thread it spawns. This is needed for code that truly needs
thread-safety, as opposed to things used for helpful context (e.g. sqlite
does not like being called from a different thread to the one it is from).
Thread-critical code will still be differentiated per-Task within a thread
as it is expected it does not like concurrent access.
This doesn't use contextvars as it needs to support 3.6. Once it can support
3.7 only, we can then reimplement the storage more nicely.
"""
def __init__(self, thread_critical: bool = False) -> None:
self._thread_critical = thread_critical
self._thread_lock = threading.RLock()
self._context_refs: "weakref.WeakSet[object]" = weakref.WeakSet()
# Random suffixes stop accidental reuse between different Locals,
# though we try to force deletion as well.
self._attr_name = "_asgiref_local_impl_{}_{}".format(
id(self),
"".join(random.choice(string.ascii_letters) for i in range(8)),
)
def _get_context_id(self):
"""
Get the ID we should use for looking up variables
"""
# Prevent a circular reference
from .sync import AsyncToSync, SyncToAsync
# First, pull the current task if we can
context_id = SyncToAsync.get_current_task()
context_is_async = True
# OK, let's try for a thread ID
if context_id is None:
context_id = threading.current_thread()
context_is_async = False
# If we're thread-critical, we stop here, as we can't share contexts.
if self._thread_critical:
return context_id
# Now, take those and see if we can resolve them through the launch maps
for i in range(sys.getrecursionlimit()):
try:
if context_is_async:
# Tasks have a source thread in AsyncToSync
context_id = AsyncToSync.launch_map[context_id]
context_is_async = False
else:
# Threads have a source task in SyncToAsync
context_id = SyncToAsync.launch_map[context_id]
context_is_async = True
except KeyError:
break
else:
# Catch infinite loops (they happen if you are screwing around
# with AsyncToSync implementations)
raise RuntimeError("Infinite launch_map loops")
return context_id
def _get_storage(self):
context_obj = self._get_context_id()
if not hasattr(context_obj, self._attr_name):
setattr(context_obj, self._attr_name, {})
self._context_refs.add(context_obj)
return getattr(context_obj, self._attr_name)
def __del__(self):
try:
for context_obj in self._context_refs:
try:
delattr(context_obj, self._attr_name)
except AttributeError:
pass
except TypeError:
# WeakSet.__iter__ can crash when interpreter is shutting down due
# to _IterationGuard being None.
pass
def __getattr__(self, key):
with self._thread_lock:
storage = self._get_storage()
if key in storage:
return storage[key]
else:
raise AttributeError(f"{self!r} object has no attribute {key!r}")
def __setattr__(self, key, value):
if key in ("_context_refs", "_thread_critical", "_thread_lock", "_attr_name"):
return super().__setattr__(key, value)
with self._thread_lock:
storage = self._get_storage()
storage[key] = value
def __delattr__(self, key):
with self._thread_lock:
storage = self._get_storage()
if key in storage:
del storage[key]
else:
raise AttributeError(f"{self!r} object has no attribute {key!r}")
import asyncio
import logging
import time
import traceback
from .compatibility import guarantee_single_callable
logger = logging.getLogger(__name__)
class StatelessServer:
"""
Base server class that handles basic concepts like application instance
creation/pooling, exception handling, and similar, for stateless protocols
(i.e. ones without actual incoming connections to the process)
Your code should override the handle() method, doing whatever it needs to,
and calling get_or_create_application_instance with a unique `scope_id`
and `scope` for the scope it wants to get.
If an application instance is found with the same `scope_id`, you are
given its input queue, otherwise one is made for you with the scope provided
and you are given that fresh new input queue. Either way, you should do
something like:
input_queue = self.get_or_create_application_instance(
"user-123456",
{"type": "testprotocol", "user_id": "123456", "username": "andrew"},
)
input_queue.put_nowait(message)
If you try and create an application instance and there are already
`max_application` instances, the oldest/least recently used one will be
reclaimed and shut down to make space.
Application coroutines that error will be found periodically (every 100ms
by default) and have their exceptions printed to the console. Override
application_exception() if you want to do more when this happens.
If you override run(), make sure you handle things like launching the
application checker.
"""
application_checker_interval = 0.1
def __init__(self, application, max_applications=1000):
# Parameters
self.application = application
self.max_applications = max_applications
# Initialisation
self.application_instances = {}
### Mainloop and handling
def run(self):
"""
Runs the asyncio event loop with our handler loop.
"""
event_loop = asyncio.get_event_loop()
asyncio.ensure_future(self.application_checker())
try:
event_loop.run_until_complete(self.handle())
except KeyboardInterrupt:
logger.info("Exiting due to Ctrl-C/interrupt")
async def handle(self):
raise NotImplementedError("You must implement handle()")
async def application_send(self, scope, message):
"""
Receives outbound sends from applications and handles them.
"""
raise NotImplementedError("You must implement application_send()")
### Application instance management
def get_or_create_application_instance(self, scope_id, scope):
"""
Creates an application instance and returns its queue.
"""
if scope_id in self.application_instances:
self.application_instances[scope_id]["last_used"] = time.time()
return self.application_instances[scope_id]["input_queue"]
# See if we need to delete an old one
while len(self.application_instances) > self.max_applications:
self.delete_oldest_application_instance()
# Make an instance of the application
input_queue = asyncio.Queue()
application_instance = guarantee_single_callable(self.application)
# Run it, and stash the future for later checking
future = asyncio.ensure_future(
application_instance(
scope=scope,
receive=input_queue.get,
send=lambda message: self.application_send(scope, message),
),
)
self.application_instances[scope_id] = {
"input_queue": input_queue,
"future": future,
"scope": scope,
"last_used": time.time(),
}
return input_queue
def delete_oldest_application_instance(self):
"""
Finds and deletes the oldest application instance
"""
oldest_time = min(
details["last_used"] for details in self.application_instances.values()
)
for scope_id, details in self.application_instances.items():
if details["last_used"] == oldest_time:
self.delete_application_instance(scope_id)
# Return to make sure we only delete one in case two have
# the same oldest time
return
def delete_application_instance(self, scope_id):
"""
Removes an application instance (makes sure its task is stopped,
then removes it from the current set)
"""
details = self.application_instances[scope_id]
del self.application_instances[scope_id]
if not details["future"].done():
details["future"].cancel()
async def application_checker(self):
"""
Goes through the set of current application instance Futures and cleans up
any that are done/prints exceptions for any that errored.
"""
while True:
await asyncio.sleep(self.application_checker_interval)
for scope_id, details in list(self.application_instances.items()):
if details["future"].done():
exception = details["future"].exception()
if exception:
await self.application_exception(exception, details)
try:
del self.application_instances[scope_id]
except KeyError:
# Exception handling might have already got here before us. That's fine.
pass
async def application_exception(self, exception, application_details):
"""
Called whenever an application coroutine has an exception.
"""
logging.error(
"Exception inside application: %s\n%s%s",
exception,
"".join(traceback.format_tb(exception.__traceback__)),
f" {exception}",
)
This diff is collapsed.
import asyncio
import time
from .compatibility import guarantee_single_callable
from .timeout import timeout as async_timeout
class ApplicationCommunicator:
"""
Runs an ASGI application in a test mode, allowing sending of
messages to it and retrieval of messages it sends.
"""
def __init__(self, application, scope):
self.application = guarantee_single_callable(application)
self.scope = scope
self.input_queue = asyncio.Queue()
self.output_queue = asyncio.Queue()
self.future = asyncio.ensure_future(
self.application(scope, self.input_queue.get, self.output_queue.put)
)
async def wait(self, timeout=1):
"""
Waits for the application to stop itself and returns any exceptions.
"""
try:
async with async_timeout(timeout):
try:
await self.future
self.future.result()
except asyncio.CancelledError:
pass
finally:
if not self.future.done():
self.future.cancel()
try:
await self.future
except asyncio.CancelledError:
pass
def stop(self, exceptions=True):
if not self.future.done():
self.future.cancel()
elif exceptions:
# Give a chance to raise any exceptions
self.future.result()
def __del__(self):
# Clean up on deletion
try:
self.stop(exceptions=False)
except RuntimeError:
# Event loop already stopped
pass
async def send_input(self, message):
"""
Sends a single message to the application
"""
# Give it the message
await self.input_queue.put(message)
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout(timeout):
return await self.output_queue.get()
except asyncio.TimeoutError as e:
# See if we have another error to raise inside
if self.future.done():
self.future.result()
else:
self.future.cancel()
try:
await self.future
except asyncio.CancelledError:
pass
raise e
async def receive_nothing(self, timeout=0.1, interval=0.01):
"""
Checks that there is no message to receive in the given time.
"""
# `interval` has precedence over `timeout`
start = time.monotonic()
while time.monotonic() - start < timeout:
if not self.output_queue.empty():
return False
await asyncio.sleep(interval)
return self.output_queue.empty()
# This code is originally sourced from the aio-libs project "async_timeout",
# under the Apache 2.0 license. You may see the original project at
# https://github.com/aio-libs/async-timeout
# It is vendored here to reduce chain-dependencies on this library, and
# modified slightly to remove some features we don't use.
import asyncio
import warnings
from types import TracebackType
from typing import Any # noqa
from typing import Optional, Type
class timeout:
"""timeout context manager.
Useful in cases when you want to apply timeout logic around block
of code or in cases when asyncio.wait_for is not suitable. For example:
>>> with timeout(0.001):
... async with aiohttp.get('https://github.com') as r:
... await r.text()
timeout - value in seconds or None to disable timeout logic
loop - asyncio compatible event loop
"""
def __init__(
self,
timeout: Optional[float],
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> None:
self._timeout = timeout
if loop is None:
loop = asyncio.get_running_loop()
else:
warnings.warn(
"""The loop argument to timeout() is deprecated.""", DeprecationWarning
)
self._loop = loop
self._task = None # type: Optional[asyncio.Task[Any]]
self._cancelled = False
self._cancel_handler = None # type: Optional[asyncio.Handle]
self._cancel_at = None # type: Optional[float]
def __enter__(self) -> "timeout":
return self._do_enter()
def __exit__(
self,
exc_type: Type[BaseException],
exc_val: BaseException,
exc_tb: TracebackType,
) -> Optional[bool]:
self._do_exit(exc_type)
return None
async def __aenter__(self) -> "timeout":
return self._do_enter()
async def __aexit__(
self,
exc_type: Type[BaseException],
exc_val: BaseException,
exc_tb: TracebackType,
) -> None:
self._do_exit(exc_type)
@property
def expired(self) -> bool:
return self._cancelled
@property
def remaining(self) -> Optional[float]:
if self._cancel_at is not None:
return max(self._cancel_at - self._loop.time(), 0.0)
else:
return None
def _do_enter(self) -> "timeout":
# Support Tornado 5- without timeout
# Details: https://github.com/python/asyncio/issues/392
if self._timeout is None:
return self
self._task = asyncio.current_task(self._loop)
if self._task is None:
raise RuntimeError(
"Timeout context manager should be used " "inside a task"
)
if self._timeout <= 0:
self._loop.call_soon(self._cancel_task)
return self
self._cancel_at = self._loop.time() + self._timeout
self._cancel_handler = self._loop.call_at(self._cancel_at, self._cancel_task)
return self
def _do_exit(self, exc_type: Type[BaseException]) -> None:
if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
self._task = None
raise asyncio.TimeoutError
if self._timeout is not None and self._cancel_handler is not None:
self._cancel_handler.cancel()
self._cancel_handler = None
self._task = None
return None
def _cancel_task(self) -> None:
if self._task is not None:
self._task.cancel()
self._cancelled = True
import sys
from typing import Awaitable, Callable, Dict, Iterable, Optional, Tuple, Type, Union
if sys.version_info >= (3, 8):
from typing import Literal, Protocol, TypedDict
else:
from typing_extensions import Literal, Protocol, TypedDict
__all__ = (
"ASGIVersions",
"HTTPScope",
"WebSocketScope",
"LifespanScope",
"WWWScope",
"Scope",
"HTTPRequestEvent",
"HTTPResponseStartEvent",
"HTTPResponseBodyEvent",
"HTTPResponseTrailersEvent",
"HTTPServerPushEvent",
"HTTPDisconnectEvent",
"WebSocketConnectEvent",
"WebSocketAcceptEvent",
"WebSocketReceiveEvent",
"WebSocketSendEvent",
"WebSocketResponseStartEvent",
"WebSocketResponseBodyEvent",
"WebSocketDisconnectEvent",
"WebSocketCloseEvent",
"LifespanStartupEvent",
"LifespanShutdownEvent",
"LifespanStartupCompleteEvent",
"LifespanStartupFailedEvent",
"LifespanShutdownCompleteEvent",
"LifespanShutdownFailedEvent",
"ASGIReceiveEvent",
"ASGISendEvent",
"ASGIReceiveCallable",
"ASGISendCallable",
"ASGI2Protocol",
"ASGI2Application",
"ASGI3Application",
"ASGIApplication",
)
class ASGIVersions(TypedDict):
spec_version: str
version: Union[Literal["2.0"], Literal["3.0"]]
class HTTPScope(TypedDict):
type: Literal["http"]
asgi: ASGIVersions
http_version: str
method: str
scheme: str
path: str
raw_path: bytes
query_string: bytes
root_path: str
headers: Iterable[Tuple[bytes, bytes]]
client: Optional[Tuple[str, int]]
server: Optional[Tuple[str, Optional[int]]]
extensions: Optional[Dict[str, Dict[object, object]]]
class WebSocketScope(TypedDict):
type: Literal["websocket"]
asgi: ASGIVersions
http_version: str
scheme: str
path: str
raw_path: bytes
query_string: bytes
root_path: str
headers: Iterable[Tuple[bytes, bytes]]
client: Optional[Tuple[str, int]]
server: Optional[Tuple[str, Optional[int]]]
subprotocols: Iterable[str]
extensions: Optional[Dict[str, Dict[object, object]]]
class LifespanScope(TypedDict):
type: Literal["lifespan"]
asgi: ASGIVersions
WWWScope = Union[HTTPScope, WebSocketScope]
Scope = Union[HTTPScope, WebSocketScope, LifespanScope]
class HTTPRequestEvent(TypedDict):
type: Literal["http.request"]
body: bytes
more_body: bool
class HTTPResponseStartEvent(TypedDict):
type: Literal["http.response.start"]
status: int
headers: Iterable[Tuple[bytes, bytes]]
trailers: bool
class HTTPResponseBodyEvent(TypedDict):
type: Literal["http.response.body"]
body: bytes
more_body: bool
class HTTPResponseTrailersEvent(TypedDict):
type: Literal["http.response.trailers"]
headers: Iterable[Tuple[bytes, bytes]]
more_trailers: bool
class HTTPServerPushEvent(TypedDict):
type: Literal["http.response.push"]
path: str
headers: Iterable[Tuple[bytes, bytes]]
class HTTPDisconnectEvent(TypedDict):
type: Literal["http.disconnect"]
class WebSocketConnectEvent(TypedDict):
type: Literal["websocket.connect"]
class WebSocketAcceptEvent(TypedDict):
type: Literal["websocket.accept"]
subprotocol: Optional[str]
headers: Iterable[Tuple[bytes, bytes]]
class WebSocketReceiveEvent(TypedDict):
type: Literal["websocket.receive"]
bytes: Optional[bytes]
text: Optional[str]
class WebSocketSendEvent(TypedDict):
type: Literal["websocket.send"]
bytes: Optional[bytes]
text: Optional[str]
class WebSocketResponseStartEvent(TypedDict):
type: Literal["websocket.http.response.start"]
status: int
headers: Iterable[Tuple[bytes, bytes]]
class WebSocketResponseBodyEvent(TypedDict):
type: Literal["websocket.http.response.body"]
body: bytes
more_body: bool
class WebSocketDisconnectEvent(TypedDict):
type: Literal["websocket.disconnect"]
code: int
class WebSocketCloseEvent(TypedDict):
type: Literal["websocket.close"]
code: int
reason: Optional[str]
class LifespanStartupEvent(TypedDict):
type: Literal["lifespan.startup"]
class LifespanShutdownEvent(TypedDict):
type: Literal["lifespan.shutdown"]
class LifespanStartupCompleteEvent(TypedDict):
type: Literal["lifespan.startup.complete"]
class LifespanStartupFailedEvent(TypedDict):
type: Literal["lifespan.startup.failed"]
message: str
class LifespanShutdownCompleteEvent(TypedDict):
type: Literal["lifespan.shutdown.complete"]
class LifespanShutdownFailedEvent(TypedDict):
type: Literal["lifespan.shutdown.failed"]
message: str
ASGIReceiveEvent = Union[
HTTPRequestEvent,
HTTPDisconnectEvent,
WebSocketConnectEvent,
WebSocketReceiveEvent,
WebSocketDisconnectEvent,
LifespanStartupEvent,
LifespanShutdownEvent,
]
ASGISendEvent = Union[
HTTPResponseStartEvent,
HTTPResponseBodyEvent,
HTTPResponseTrailersEvent,
HTTPServerPushEvent,
HTTPDisconnectEvent,
WebSocketAcceptEvent,
WebSocketSendEvent,
WebSocketResponseStartEvent,
WebSocketResponseBodyEvent,
WebSocketCloseEvent,
LifespanStartupCompleteEvent,
LifespanStartupFailedEvent,
LifespanShutdownCompleteEvent,
LifespanShutdownFailedEvent,
]
ASGIReceiveCallable = Callable[[], Awaitable[ASGIReceiveEvent]]
ASGISendCallable = Callable[[ASGISendEvent], Awaitable[None]]
class ASGI2Protocol(Protocol):
def __init__(self, scope: Scope) -> None:
...
async def __call__(
self, receive: ASGIReceiveCallable, send: ASGISendCallable
) -> None:
...
ASGI2Application = Type[ASGI2Protocol]
ASGI3Application = Callable[
[
Scope,
ASGIReceiveCallable,
ASGISendCallable,
],
Awaitable[None],
]
ASGIApplication = Union[ASGI2Application, ASGI3Application]
from io import BytesIO
from tempfile import SpooledTemporaryFile
from asgiref.sync import AsyncToSync, sync_to_async
class WsgiToAsgi:
"""
Wraps a WSGI application to make it into an ASGI application.
"""
def __init__(self, wsgi_application):
self.wsgi_application = wsgi_application
async def __call__(self, scope, receive, send):
"""
ASGI application instantiation point.
We return a new WsgiToAsgiInstance here with the WSGI app
and the scope, ready to respond when it is __call__ed.
"""
await WsgiToAsgiInstance(self.wsgi_application)(scope, receive, send)
class WsgiToAsgiInstance:
"""
Per-socket instance of a wrapped WSGI application
"""
def __init__(self, wsgi_application):
self.wsgi_application = wsgi_application
self.response_started = False
self.response_content_length = None
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
raise ValueError("WSGI wrapper received a non-HTTP scope")
self.scope = scope
with SpooledTemporaryFile(max_size=65536) as body:
# Alright, wait for the http.request messages
while True:
message = await receive()
if message["type"] != "http.request":
raise ValueError("WSGI wrapper received a non-HTTP-request message")
body.write(message.get("body", b""))
if not message.get("more_body"):
break
body.seek(0)
# Wrap send so it can be called from the subthread
self.sync_send = AsyncToSync(send)
# Call the WSGI app
await self.run_wsgi_app(body)
def build_environ(self, scope, body):
"""
Builds a scope and request body into a WSGI environ object.
"""
environ = {
"REQUEST_METHOD": scope["method"],
"SCRIPT_NAME": scope.get("root_path", "").encode("utf8").decode("latin1"),
"PATH_INFO": scope["path"].encode("utf8").decode("latin1"),
"QUERY_STRING": scope["query_string"].decode("ascii"),
"SERVER_PROTOCOL": "HTTP/%s" % scope["http_version"],
"wsgi.version": (1, 0),
"wsgi.url_scheme": scope.get("scheme", "http"),
"wsgi.input": body,
"wsgi.errors": BytesIO(),
"wsgi.multithread": True,
"wsgi.multiprocess": True,
"wsgi.run_once": False,
}
# Get server name and port - required in WSGI, not in ASGI
if "server" in scope:
environ["SERVER_NAME"] = scope["server"][0]
environ["SERVER_PORT"] = str(scope["server"][1])
else:
environ["SERVER_NAME"] = "localhost"
environ["SERVER_PORT"] = "80"
if "client" in scope:
environ["REMOTE_ADDR"] = scope["client"][0]
# Go through headers and make them into environ entries
for name, value in self.scope.get("headers", []):
name = name.decode("latin1")
if name == "content-length":
corrected_name = "CONTENT_LENGTH"
elif name == "content-type":
corrected_name = "CONTENT_TYPE"
else:
corrected_name = "HTTP_%s" % name.upper().replace("-", "_")
# HTTPbis say only ASCII chars are allowed in headers, but we latin1 just in case
value = value.decode("latin1")
if corrected_name in environ:
value = environ[corrected_name] + "," + value
environ[corrected_name] = value
return environ
def start_response(self, status, response_headers, exc_info=None):
"""
WSGI start_response callable.
"""
# Don't allow re-calling once response has begun
if self.response_started:
raise exc_info[1].with_traceback(exc_info[2])
# Don't allow re-calling without exc_info
if hasattr(self, "response_start") and exc_info is None:
raise ValueError(
"You cannot call start_response a second time without exc_info"
)
# Extract status code
status_code, _ = status.split(" ", 1)
status_code = int(status_code)
# Extract headers
headers = [
(name.lower().encode("ascii"), value.encode("ascii"))
for name, value in response_headers
]
# Extract content-length
self.response_content_length = None
for name, value in response_headers:
if name.lower() == "content-length":
self.response_content_length = int(value)
# Build and send response start message.
self.response_start = {
"type": "http.response.start",
"status": status_code,
"headers": headers,
}
@sync_to_async
def run_wsgi_app(self, body):
"""
Called in a subthread to run the WSGI app. We encapsulate like
this so that the start_response callable is called in the same thread.
"""
# Translate the scope and incoming request body into a WSGI environ
environ = self.build_environ(self.scope, body)
# Run the WSGI app
bytes_sent = 0
for output in self.wsgi_application(environ, self.start_response):
# If this is the first response, include the response headers
if not self.response_started:
self.response_started = True
self.sync_send(self.response_start)
# If the application supplies a Content-Length header
if self.response_content_length is not None:
# The server should not transmit more bytes to the client than the header allows
bytes_allowed = self.response_content_length - bytes_sent
if len(output) > bytes_allowed:
output = output[:bytes_allowed]
self.sync_send(
{"type": "http.response.body", "body": output, "more_body": True}
)
bytes_sent += len(output)
# The server should stop iterating over the response when enough data has been sent
if bytes_sent == self.response_content_length:
break
# Close connection
if not self.response_started:
self.response_started = True
self.sync_send(self.response_start)
self.sync_send({"type": "http.response.body"})
import os; var = 'SETUPTOOLS_USE_DISTUTILS'; enabled = os.environ.get(var, 'local') == 'local'; enabled and __import__('_distutils_hack').add_shim();
from django.utils.version import get_version
VERSION = (4, 1, 6, "final", 0)
__version__ = get_version(VERSION)
def setup(set_prefix=True):
"""
Configure the settings (this happens as a side effect of accessing the
first setting), configure logging and populate the app registry.
Set the thread-local urlresolvers script prefix if `set_prefix` is True.
"""
from django.apps import apps
from django.conf import settings
from django.urls import set_script_prefix
from django.utils.log import configure_logging
configure_logging(settings.LOGGING_CONFIG, settings.LOGGING)
if set_prefix:
set_script_prefix(
"/" if settings.FORCE_SCRIPT_NAME is None else settings.FORCE_SCRIPT_NAME
)
apps.populate(settings.INSTALLED_APPS)
"""
Invokes django-admin when the django module is run as a script.
Example: python -m django check
"""
from django.core import management
if __name__ == "__main__":
management.execute_from_command_line()
from .config import AppConfig
from .registry import apps
__all__ = ["AppConfig", "apps"]
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
from django.contrib import admin
# Register your models here.
from django.apps import AppConfig
class {{ camel_case_app_name }}Config(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = '{{ app_name }}'
from django.db import models
# Create your models here.
from django.test import TestCase
# Create your tests here.
from django.shortcuts import render
# Create your views here.
This diff is collapsed.
This diff is collapsed.
# This file is distributed under the same license as the Django package.
#
# The *_FORMAT strings use the Django date format syntax,
# see https://docs.djangoproject.com/en/dev/ref/templates/builtins/#date
DATE_FORMAT = "j F، Y"
TIME_FORMAT = "g:i A"
# DATETIME_FORMAT =
YEAR_MONTH_FORMAT = "F Y"
MONTH_DAY_FORMAT = "j F"
SHORT_DATE_FORMAT = "d‏/m‏/Y"
# SHORT_DATETIME_FORMAT =
# FIRST_DAY_OF_WEEK =
# The *_INPUT_FORMATS strings use the Python strftime format syntax,
# see https://docs.python.org/library/datetime.html#strftime-strptime-behavior
# DATE_INPUT_FORMATS =
# TIME_INPUT_FORMATS =
# DATETIME_INPUT_FORMATS =
DECIMAL_SEPARATOR = ","
THOUSAND_SEPARATOR = "."
# NUMBER_GROUPING =
# This file is distributed under the same license as the Django package.
#
# The *_FORMAT strings use the Django date format syntax,
# see https://docs.djangoproject.com/en/dev/ref/templates/builtins/#date
DATE_FORMAT = "j F Y"
TIME_FORMAT = "H:i"
DATETIME_FORMAT = "j F Y H:i"
YEAR_MONTH_FORMAT = "F Y"
MONTH_DAY_FORMAT = "j F"
SHORT_DATE_FORMAT = "j F Y"
SHORT_DATETIME_FORMAT = "j F Y H:i"
FIRST_DAY_OF_WEEK = 0 # Sunday
# The *_INPUT_FORMATS strings use the Python strftime format syntax,
# see https://docs.python.org/library/datetime.html#strftime-strptime-behavior
DATE_INPUT_FORMATS = [
"%Y/%m/%d", # '2006/10/25'
]
TIME_INPUT_FORMATS = [
"%H:%M", # '14:30
"%H:%M:%S", # '14:30:59'
]
DATETIME_INPUT_FORMATS = [
"%Y/%m/%d %H:%M", # '2006/10/25 14:30'
"%Y/%m/%d %H:%M:%S", # '2006/10/25 14:30:59'
]
DECIMAL_SEPARATOR = ","
THOUSAND_SEPARATOR = "."
NUMBER_GROUPING = 3
# This file is distributed under the same license as the Django package.
#
# The *_FORMAT strings use the Django date format syntax,
# see https://docs.djangoproject.com/en/dev/ref/templates/builtins/#date
DATE_FORMAT = "j E Y"
TIME_FORMAT = "G:i"
DATETIME_FORMAT = "j E Y, G:i"
YEAR_MONTH_FORMAT = "F Y"
MONTH_DAY_FORMAT = "j F"
SHORT_DATE_FORMAT = "d.m.Y"
SHORT_DATETIME_FORMAT = "d.m.Y H:i"
FIRST_DAY_OF_WEEK = 1 # Monday
# The *_INPUT_FORMATS strings use the Python strftime format syntax,
# see https://docs.python.org/library/datetime.html#strftime-strptime-behavior
DATE_INPUT_FORMATS = [
"%d.%m.%Y", # '25.10.2006'
"%d.%m.%y", # '25.10.06'
]
DATETIME_INPUT_FORMATS = [
"%d.%m.%Y %H:%M:%S", # '25.10.2006 14:30:59'
"%d.%m.%Y %H:%M:%S.%f", # '25.10.2006 14:30:59.000200'
"%d.%m.%Y %H:%M", # '25.10.2006 14:30'
"%d.%m.%y %H:%M:%S", # '25.10.06 14:30:59'
"%d.%m.%y %H:%M:%S.%f", # '25.10.06 14:30:59.000200'
"%d.%m.%y %H:%M", # '25.10.06 14:30'
]
DECIMAL_SEPARATOR = ","
THOUSAND_SEPARATOR = "\xa0" # non-breaking space
NUMBER_GROUPING = 3
# This file is distributed under the same license as the Django package.
#
# The *_FORMAT strings use the Django date format syntax,
# see https://docs.djangoproject.com/en/dev/ref/templates/builtins/#date
DATE_FORMAT = "d F Y"
TIME_FORMAT = "H:i"
# DATETIME_FORMAT =
# YEAR_MONTH_FORMAT =
MONTH_DAY_FORMAT = "j F"
SHORT_DATE_FORMAT = "d.m.Y"
# SHORT_DATETIME_FORMAT =
# FIRST_DAY_OF_WEEK =
# The *_INPUT_FORMATS strings use the Python strftime format syntax,
# see https://docs.python.org/library/datetime.html#strftime-strptime-behavior
# DATE_INPUT_FORMATS =
# TIME_INPUT_FORMATS =
# DATETIME_INPUT_FORMATS =
DECIMAL_SEPARATOR = ","
THOUSAND_SEPARATOR = " " # Non-breaking space
# NUMBER_GROUPING =
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment