Spans
=====
.. currentmodule:: distributed

.. note::

    This is an experimental feature and may rapidly change without a deprecation cycle.

Dask offers various diagnostics and :doc:`fine-performance-metrics` about tasks,
grouping them by their prefix (commonly, the name of the function being called). This
can be suboptimal:

- your client code may be sophisticated enough that correlating a line of client code
  with the tasks being run on the cluster may prove difficult;
- the same function may be applied to different parts of your workflow, with a different
  performance profile;
- you may be in a multitenancy setup, where part of the load on the cluster was not
  caused by your client code.

In these situations, it may be useful to attach meaningful tags to your workflow, or
segments of it.
To do so, you should use the :func:`span` context manager inside the client code.

For example:

.. code-block:: python

    import dask.config
    import dask.array as da
    from distributed import Client, span

    # Read important note below
    dask.config.set({"optimization.fuse.active": False})
    client = Client()

    with span("Alice's workflow"):
        with span("data load"):
            a = da.read_zarr(...)
        with span("ML preprocessing"):
            a = preprocess(a)
        with span("Model training"):
            model = train(a)
        model = model.compute()

Note how the :func:`span` context manager can be nested.
The example will create the following spans on the scheduler:

- ``("Alice's workflow", )``
- ``("Alice's workflow", "data load")``
- ``("Alice's workflow", "ML preprocessing")``
- ``("Alice's workflow", "Model training")``

Each of the spans will be mapped to the tasks matching the segment of the graph that was
defined inside its context manager. The parent span will be mapped to all tasks of its
children.
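
To illustrate, here is a minimal, pure-Python sketch of how nested context managers
can compose tag tuples like the ones above. This is a toy model, not the actual
implementation (the real machinery lives in ``distributed.spans`` and also assigns
unique span IDs); ``toy_span`` and ``_current_span`` are hypothetical names.

.. code-block:: python

    from contextlib import contextmanager
    from contextvars import ContextVar

    # Tasks submitted outside any span fall under ("default",)
    _current_span: ContextVar[tuple] = ContextVar("span", default=("default",))

    @contextmanager
    def toy_span(*tags):
        parent = _current_span.get()
        # A nested span extends its parent's tag tuple; the implicit
        # ("default",) root is replaced rather than extended
        name = tags if parent == ("default",) else parent + tags
        token = _current_span.set(name)
        try:
            yield name
        finally:
            _current_span.reset(token)

    with toy_span("Alice's workflow") as outer:
        with toy_span("data load") as inner:
            pass

    # outer == ("Alice's workflow",)
    # inner == ("Alice's workflow", "data load")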

Tags are arbitrary and nothing stops you from parameterizing them; for example

>>> with span(f"{user}'s workflow"):
...     ...

Which may give you

- ``("Alice's workflow", "data load")``
- ``("Bob's workflow", "data load")``
- etc.

This is useful, for example, if you want to observe all the workload submitted by
Alice while hiding Bob's activity, or alternatively all the data loading activity,
regardless of who submitted it.

The possibilities are more or less endless - instead of, *or in addition to*, a
username at the top, you could store information on what dataset you're processing,
etc.


The default span
----------------
If you don't use the :func:`span` context manager, your tasks will be automatically
attributed to the ``("default", )`` span.


Viewing the spans
-----------------
You can use span tags in the :doc:`fine-performance-metrics` dashboard widget to
filter your workload:

.. image:: images/fine-performance-metrics/spans.png
   :alt: Span selection in the Fine Performance Metrics dashboard

Additionally, spans can be queried using scheduler extensions or
:meth:`~distributed.Client.run_on_scheduler`; see :ref:`spans_developer_api`.
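
For example, here is a sketch of a function you could pass to
:meth:`~distributed.Client.run_on_scheduler` to list the names of all spans. It
assumes the ``spans`` mapping on the scheduler extension and the ``name`` attribute
of span objects documented in :ref:`spans_developer_api`; ``list_span_names`` is a
hypothetical helper, not part of the distributed API.

.. code-block:: python

    def list_span_names(dask_scheduler):
        """Return the sorted tag tuples of all spans known to the scheduler."""
        ext = dask_scheduler.extensions["spans"]
        return sorted(span.name for span in ext.spans.values())

    # On a live cluster you would then run (not executed here):
    # names = client.run_on_scheduler(list_span_names)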


User API
--------
.. important::

    Dataframes have a minimum granularity of a single call to ``compute()`` or
    ``persist()``; spans can't break them down further into groups of operations - if
    the example above used dataframes, everything would have been uniformly tagged as
    "Alice's workflow", as that is the span active during ``compute()``.

    In other collections, such as arrays and delayed objects, spans that don't wrap
    around a call to ``compute()`` or ``persist()`` can get lost during the
    optimization phase. To prevent this issue, you must set

    >>> dask.config.set({"optimization.fuse.active": False})

    Or in dask.yaml:

    .. code-block:: yaml

        optimization:
          fuse:
            active: false

    A possible workaround, which also works for dataframes, is to perform
    intermediate calls to ``persist()``. Note, however, that this can significantly
    impact optimization and reduce overall performance.

    .. code-block:: python

        with span("Alice's workflow"):
            with span("data load"):
                a = dd.read_parquet(...).persist()
            with span("ML preprocessing"):
                b = preprocess(a).persist()
                del a  # Release distributed memory for a as soon as possible
            with span("Model training"):
                model = train(b).persist()
                del b  # Release distributed memory for b as soon as possible
                model = model.compute()

.. autofunction:: span


.. _spans_developer_api:

Dask Developer API
------------------
.. currentmodule:: distributed.spans

.. admonition:: Intended audience

    This section is only of interest to developers maintaining Dask or writing scheduler
    extensions, e.g. to create an alternative dashboard or to store metrics long-term.

Spans can be accessed on the scheduler through ``Scheduler.extensions["spans"]``, which
contains a singleton instance of :class:`SpansSchedulerExtension`. In turn, the
extension contains a mapping of all :class:`Span` objects, plus a variety of convenience
methods to access and aggregate them.

Note how :class:`Span` objects offer a variety of methods that the dashboard currently
doesn't use - such as start/stop times, task counts, and size of the output.

.. autoclass:: Span
   :members:

.. autoclass:: SpansSchedulerExtension
   :members:

.. autoclass:: SpansWorkerExtension
   :members:
