from __future__ import annotations
import hashlib
import logging
from typing import TYPE_CHECKING, Protocol, cast
from scrapy.utils.misc import build_from_crawler
if TYPE_CHECKING:
from collections.abc import Iterable
# typing.Self requires Python 3.11
from typing_extensions import Self
from scrapy import Request
from scrapy.core.downloader import Downloader
from scrapy.crawler import Crawler
# Logger named after this module, so records can be filtered per module.
logger = logging.getLogger(name=__name__)
def _path_safe(text: str) -> str:
"""
Return a filesystem-safe version of a string ``text``
>>> _path_safe('simple.org').startswith('simple.org')
True
>>> _path_safe('dash-underscore_.org').startswith('dash-underscore_.org')
True
>>> _path_safe('some@symbol?').startswith('some_symbol_')
True
"""
pathable_slot = "".join([c if c.isalnum() or c in "-._" else "_" for c in text])
# as we replace some letters we can get collision for different slots
# add we add unique part
unique_slot = hashlib.md5(text.encode("utf8")).hexdigest() # noqa: S324
return f"{pathable_slot}-{unique_slot}"
class QueueProtocol(Protocol):
    """Structural interface expected from the downstream queues that
    ``ScrapyPriorityQueue`` creates for each priority."""

    def push(self, request: Request) -> None:
        """Add *request* to the queue."""

    def pop(self) -> Request | None:
        """Remove and return the next request (``None`` is allowed by the type)."""

    def close(self) -> None:
        """Release whatever the queue holds when it is no longer used."""

    def __len__(self) -> int:
        """Return the number of requests currently stored."""
[docs]
class ScrapyPriorityQueue:
    """A priority queue implemented using multiple internal queues (typically,
    FIFO queues). It uses one internal queue for each priority value. The
    internal queue must implement the following methods:

    * push(obj)
    * pop()
    * close()
    * __len__()

    Optionally, the queue could provide a ``peek`` method, that should return
    the next object to be returned by ``pop``, but without removing it from the
    queue.

    ``__init__`` method of ScrapyPriorityQueue receives a downstream_queue_cls
    argument, which is a class used to instantiate a new (internal) queue when
    a new priority is allocated.

    Only integer priorities should be used. Lower numbers are higher
    priorities.

    startprios is a sequence of priorities to start with. If the queue was
    previously closed leaving some priority buckets non-empty, those priorities
    should be passed in startprios.

    When *start_queue_cls* is given, start requests are kept in a separate set
    of internal queues. At equal priority, the regular queue is drained before
    the start-request queue.

    Disk persistence
    ================

    .. warning:: The files that this class generates on disk are an
        implementation detail, and may change without a warning in a future
        version of Scrapy. Do not rely on the following information for
        anything other than debugging purposes.

    When a component instantiates this class with a non-empty *key* argument,
    *key* is used as a persistence directory.

    For every request enqueued, this class checks:

    - Whether the request is a :ref:`start request <start-requests>` or not.

    - The :data:`~scrapy.Request.priority` of the request.

    For each combination of the above seen, this class creates an instance of
    *downstream_queue_cls* with *key* set to a subdirectory of the persistence
    directory, named as the request priority (e.g. ``1``), with an ``s`` suffix
    in case of a start request (e.g. ``1s``).
    """

    @classmethod
    def from_crawler(
        cls,
        crawler: Crawler,
        downstream_queue_cls: type[QueueProtocol],
        key: str,
        startprios: Iterable[int] = (),
        *,
        start_queue_cls: type[QueueProtocol] | None = None,
    ) -> Self:
        """Build an instance from *crawler*; all arguments are forwarded
        unchanged to ``__init__``."""
        return cls(
            crawler,
            downstream_queue_cls,
            key,
            startprios,
            start_queue_cls=start_queue_cls,
        )

    def __init__(
        self,
        crawler: Crawler,
        downstream_queue_cls: type[QueueProtocol],
        key: str,
        startprios: Iterable[int] = (),
        *,
        start_queue_cls: type[QueueProtocol] | None = None,
    ):
        self.crawler: Crawler = crawler
        self.downstream_queue_cls: type[QueueProtocol] = downstream_queue_cls
        self._start_queue_cls: type[QueueProtocol] | None = start_queue_cls
        self.key: str = key
        # priority -> internal queue, for regular and start requests
        self.queues: dict[int, QueueProtocol] = {}
        self._start_queues: dict[int, QueueProtocol] = {}
        # Priority that ``pop`` looks at first. ``pop`` refreshes it lazily,
        # so it may briefly point at a priority with no queue left.
        self.curprio: int | None = None
        self.init_prios(startprios)

    def init_prios(self, startprios: Iterable[int]) -> None:
        """Re-open the internal queues for each priority in *startprios*.

        Queues that come back non-empty are kept. Empty ones are closed
        immediately instead of being silently dropped, so any resources they
        opened are released. ``curprio`` is then set to the lowest priority
        that actually holds requests.
        """
        # Materialize first: *startprios* may be a one-shot iterator. Such an
        # iterator is always truthy, and it would be exhausted by the loop
        # below before the current priority could be computed from it.
        prios = list(startprios)
        if not prios:
            return
        for priority in prios:
            q = self.qfactory(priority)
            if q:
                self.queues[priority] = q
            else:
                q.close()
            if self._start_queue_cls:
                sq = self._sqfactory(priority)
                if sq:
                    self._start_queues[priority] = sq
                else:
                    sq.close()
        self._update_curprio()

    def qfactory(self, key: int) -> QueueProtocol:
        """Build the regular internal queue for priority *key*, persisted
        under ``<self.key>/<key>``."""
        return build_from_crawler(
            self.downstream_queue_cls,
            self.crawler,
            self.key + "/" + str(key),
        )

    def _sqfactory(self, key: int) -> QueueProtocol:
        """Build the start-request internal queue for priority *key*,
        persisted under ``<self.key>/<key>s``."""
        assert self._start_queue_cls is not None
        return build_from_crawler(
            self._start_queue_cls,
            self.crawler,
            f"{self.key}/{key}s",
        )

    def priority(self, request: Request) -> int:
        """Map a request to an internal priority (negated, so that a higher
        ``request.priority`` yields a lower internal number)."""
        return -request.priority

    def push(self, request: Request) -> None:
        """Enqueue *request* in the internal queue for its priority, creating
        that queue on first use."""
        priority = self.priority(request)
        is_start_request = request.meta.get("is_start_request", False)
        if is_start_request and self._start_queue_cls:
            if priority not in self._start_queues:
                self._start_queues[priority] = self._sqfactory(priority)
            q = self._start_queues[priority]
        else:
            if priority not in self.queues:
                self.queues[priority] = self.qfactory(priority)
            q = self.queues[priority]
        q.push(request)  # this may fail (eg. serialization error)
        if self.curprio is None or priority < self.curprio:
            self.curprio = priority

    def pop(self) -> Request | None:
        """Remove and return the next request, or ``None`` if none is left.

        At the current priority the regular queue is consumed before the
        start-request queue. Queues are closed and forgotten once emptied.
        """
        while self.curprio is not None:
            try:
                q = self.queues[self.curprio]
            except KeyError:
                pass
            else:
                m = q.pop()
                if not q:
                    del self.queues[self.curprio]
                    q.close()
                    # A start queue may still exist at this priority, so
                    # only move on when there are no start queues at all.
                    if not self._start_queues:
                        self._update_curprio()
                return m
            if self._start_queues:
                try:
                    q = self._start_queues[self.curprio]
                except KeyError:
                    self._update_curprio()
                else:
                    m = q.pop()
                    if not q:
                        del self._start_queues[self.curprio]
                        q.close()
                        self._update_curprio()
                    return m
            else:
                self._update_curprio()
        return None

    def _min_active_prio(self) -> int | None:
        """Return the lowest priority that has a non-empty internal queue,
        or ``None`` if every internal queue is empty."""
        prios = {
            p
            for queues in (self.queues, self._start_queues)
            for p, q in queues.items()
            if q
        }
        return min(prios) if prios else None

    def _update_curprio(self) -> None:
        self.curprio = self._min_active_prio()

    def peek(self) -> Request | None:
        """Returns the next object to be returned by :meth:`pop`,
        but without removing it from the queue.

        Like :meth:`pop`, the regular queue takes precedence over the
        start-request queue at the same priority. This method does not
        modify the queue's state.

        Raises :exc:`NotImplementedError` if the underlying queue class does
        not implement a ``peek`` method, which is optional for queues.
        """
        prio = self.curprio
        if prio is None:
            return None
        if prio not in self.queues and prio not in self._start_queues:
            # ``curprio`` is stale (``pop`` refreshes it lazily). Resolve it
            # the way ``pop`` would, without mutating state.
            prio = self._min_active_prio()
            if prio is None:
                return None
        queue = self.queues.get(prio)
        if queue is None:
            queue = self._start_queues[prio]
        # Protocols can't declare optional members
        return cast("Request", queue.peek())  # type: ignore[attr-defined]

    def close(self) -> list[int]:
        """Close every internal queue and return the priorities that had a
        queue, suitable for passing back as *startprios* later."""
        active: set[int] = set()
        for queues in (self.queues, self._start_queues):
            for p, q in queues.items():
                active.add(p)
                q.close()
        return list(active)

    def __len__(self) -> int:
        return sum(
            len(q)
            for queues in (self.queues, self._start_queues)
            for q in queues.values()
        )
class DownloaderInterface:
    """Read-only helper exposing per-slot activity of the crawler's
    downloader."""

    def __init__(self, crawler: Crawler):
        assert crawler.engine
        engine = crawler.engine
        self.downloader: Downloader = engine.downloader

    def stats(self, possible_slots: Iterable[str]) -> list[tuple[int, str]]:
        """Pair each slot of *possible_slots*, in order, with its number of
        active downloads: ``[(active_count, slot), ...]``."""
        pairs: list[tuple[int, str]] = []
        for slot_name in possible_slots:
            pairs.append((self._active_downloads(slot_name), slot_name))
        return pairs

    def get_slot_key(self, request: Request) -> str:
        """Return the downloader slot key that *request* belongs to."""
        downloader = self.downloader
        return downloader.get_slot_key(request)

    def _active_downloads(self, slot: str) -> int:
        """Return a number of requests in a Downloader for a given slot"""
        known_slots = self.downloader.slots
        return len(known_slots[slot].active) if slot in known_slots else 0
[docs]
class DownloaderAwarePriorityQueue:
    """PriorityQueue which takes Downloader activity into account:
    domains (slots) with the least amount of active downloads are dequeued
    first. Ties are broken round-robin by slot name, and slots whose queues
    hold no requests are never selected.

    Disk persistence
    ================

    .. warning:: The files that this class generates on disk are an
        implementation detail, and may change without a warning in a future
        version of Scrapy. Do not rely on the following information for
        anything other than debugging purposes.

    When a component instantiates this class with a non-empty *key* argument,
    *key* is used as a persistence directory, and inside that directory this
    class creates a subdirectory per download slot (domain).

    Those subdirectories are named after the corresponding download slot, with
    path-unsafe characters replaced by underscores and an MD5 hash suffix to
    avoid collisions.

    For each download slot, this class creates an instance of
    :class:`ScrapyPriorityQueue` with the download slot subdirectory as *key*
    and its own *downstream_queue_cls*.
    """

    @classmethod
    def from_crawler(
        cls,
        crawler: Crawler,
        downstream_queue_cls: type[QueueProtocol],
        key: str,
        startprios: dict[str, Iterable[int]] | None = None,
        *,
        start_queue_cls: type[QueueProtocol] | None = None,
    ) -> Self:
        """Build an instance from *crawler*; *startprios* is forwarded as
        ``slot_startprios`` and the other arguments unchanged."""
        return cls(
            crawler,
            downstream_queue_cls,
            key,
            startprios,
            start_queue_cls=start_queue_cls,
        )

    def __init__(
        self,
        crawler: Crawler,
        downstream_queue_cls: type[QueueProtocol],
        key: str,
        slot_startprios: dict[str, Iterable[int]] | None = None,
        *,
        start_queue_cls: type[QueueProtocol] | None = None,
    ):
        """
        :raises ValueError: if ``CONCURRENT_REQUESTS_PER_IP`` is non-zero, or
            if *slot_startprios* is truthy but not a dict (state saved by a
            different priority queue class).
        """
        if crawler.settings.getint("CONCURRENT_REQUESTS_PER_IP") != 0:
            raise ValueError(
                f'"{self.__class__}" does not support CONCURRENT_REQUESTS_PER_IP'
            )
        if slot_startprios and not isinstance(slot_startprios, dict):
            raise ValueError(
                "DownloaderAwarePriorityQueue accepts "
                "``slot_startprios`` as a dict; "
                f"{slot_startprios.__class__!r} instance "
                "is passed. Most likely, it means the state is "
                "created by an incompatible priority queue. "
                "Only a crawl started with the same priority "
                "queue class can be resumed."
            )
        self._downloader_interface: DownloaderInterface = DownloaderInterface(crawler)
        self.downstream_queue_cls: type[QueueProtocol] = downstream_queue_cls
        self._start_queue_cls: type[QueueProtocol] | None = start_queue_cls
        self.key: str = key
        self.crawler: Crawler = crawler
        self.pqueues: dict[str, ScrapyPriorityQueue] = {}  # slot -> priority queue
        # Slot returned by the previous pop(), used for round-robin tie-breaks.
        self._last_selected_slot: str | None = None
        if slot_startprios:
            for slot, startprios in slot_startprios.items():
                self.pqueues[slot] = self.pqfactory(slot, startprios)

    def _next_slot(self, stats: list[tuple[int, str]], *, update_state: bool) -> str:
        """Pick the slot to serve next from ``(active_downloads, slot)`` pairs.

        The slot with the fewest active downloads wins. Ties are broken
        round-robin: the smallest slot name greater than the last selected
        slot, or else the smallest slot name overall. When *update_state* is
        true, the choice is remembered for the next tie-break. *stats* must
        not be empty.
        """
        last = self._last_selected_slot
        min_active: int | None = None
        best_slot: str | None = None
        best_slot_after_last: str | None = None
        for active, slot in stats:
            if min_active is None or active < min_active:
                # New minimum: restart both candidates from this slot.
                min_active = active
                best_slot = slot
                best_slot_after_last = None
                if last is not None and slot > last:
                    best_slot_after_last = slot
            elif active == min_active:
                if best_slot is None or slot < best_slot:
                    best_slot = slot
                if (
                    last is not None
                    and slot > last
                    and (best_slot_after_last is None or slot < best_slot_after_last)
                ):
                    best_slot_after_last = slot
        assert best_slot is not None
        slot = best_slot_after_last if best_slot_after_last is not None else best_slot
        if update_state:
            self._last_selected_slot = slot
        return slot

    def pqfactory(
        self, slot: str, startprios: Iterable[int] = ()
    ) -> ScrapyPriorityQueue:
        """Create the per-slot priority queue, persisted under a path-safe
        subdirectory of ``self.key`` named after *slot*."""
        return ScrapyPriorityQueue(
            self.crawler,
            self.downstream_queue_cls,
            self.key + "/" + _path_safe(slot),
            startprios,
            start_queue_cls=self._start_queue_cls,
        )

    def pop(self) -> Request | None:
        """Remove and return a request from the least busy slot, or ``None``
        when no slot holds a request.

        Exhausted slot queues are closed and forgotten. A slot that turns out
        to be empty (e.g. one left behind by a push that raised) is skipped,
        so it cannot hide requests waiting in other slots.
        """
        while True:
            stats = self._downloader_interface.stats(self.pqueues)
            if not stats:
                return None
            slot = self._next_slot(stats, update_state=False)
            queue = self.pqueues[slot]
            request = queue.pop()
            if len(queue) == 0:
                del self.pqueues[slot]
                # Release anything the exhausted slot queue still keeps open.
                queue.close()
                if request is None:
                    # The slot held nothing; choose among the remaining ones.
                    continue
            self._last_selected_slot = slot
            return request

    def push(self, request: Request) -> None:
        """Enqueue *request* in the priority queue of its download slot,
        creating that queue on first use."""
        slot = self._downloader_interface.get_slot_key(request)
        if slot not in self.pqueues:
            self.pqueues[slot] = self.pqfactory(slot)
        queue = self.pqueues[slot]
        queue.push(request)

    def peek(self) -> Request | None:
        """Returns the next object to be returned by :meth:`pop`,
        but without removing it from the queue.

        Raises :exc:`NotImplementedError` if the underlying queue class does
        not implement a ``peek`` method, which is optional for queues.
        """
        # Only consider slots that still hold requests: those are the only
        # slots pop() can return a request from.
        stats = self._downloader_interface.stats(
            [slot for slot, queue in self.pqueues.items() if len(queue)]
        )
        if not stats:
            return None
        slot = self._next_slot(stats, update_state=False)
        return self.pqueues[slot].peek()

    def close(self) -> dict[str, list[int]]:
        """Close every slot queue and return ``{slot: active priorities}``,
        suitable for passing back as *slot_startprios* later."""
        active = {slot: queue.close() for slot, queue in self.pqueues.items()}
        self.pqueues.clear()
        return active

    def __len__(self) -> int:
        return sum(len(queue) for queue in self.pqueues.values())

    def __contains__(self, slot: str) -> bool:
        return slot in self.pqueues