magicclass.ext.dask package

Submodules

magicclass.ext.dask.progress module

class magicclass.ext.dask.progress.DaskProgressBar(max: int = 100, minimum: float = 0.5, dt: float = 0.1)[source]

Bases: magicgui.widgets._bases.widget.Widget, magicgui.widgets._bases.mixins._OrientationMixin, MutableSequence[magicgui.widgets._bases.widget.Widget]

A progress bar widget for dask computation.

computed

Signal descriptor, for declaring a signal on a class.

This is designed to be used as a class attribute, with the supported signature(s) provided in the constructor:

from psygnal import Signal

class MyEmitter:
    changed = Signal(int)  # changed will emit an int

def receiver(arg: int):
    print("new value:", arg)

emitter = MyEmitter()
emitter.changed.connect(receiver)
emitter.changed.emit(1)  # prints "new value: 1"

Note: in the example above, MyEmitter.changed is an instance of Signal, and emitter.changed is an instance of SignalInstance.

Parameters
  • *types (sequence of Type) -- A sequence of individual types

  • description (str, optional) -- Optional descriptive text for the signal. (not used internally).

  • name (str, optional) -- Optional name of the signal. If it is not specified, the name of the class attribute that is bound to the signal will be used. By default, None.

  • check_nargs_on_connect (bool, optional) -- Whether to check the number of positional args against the signature when connecting a new callback. This can also be provided at connection time using .connect(..., check_nargs=True). By default, True.

  • check_types_on_connect (bool, optional) -- Whether to check the callback parameter types against the signature when connecting a new callback. This can also be provided at connection time using .connect(..., check_types=True). By default, False.
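
The note above distinguishes the class-level Signal from the per-object SignalInstance. As a rough illustration of how the Python descriptor protocol can produce that split, here is a minimal standard-library sketch. SketchSignal and SketchSignalInstance are hypothetical names invented for this example; they are not the real implementation and they skip the argument and type checks described in the parameters.

```python
from typing import Any, Callable, List, Optional


class SketchSignalInstance:
    """Hypothetical per-object signal that stores and calls callbacks."""

    def __init__(self, types: tuple) -> None:
        self.types = types
        self._callbacks: List[Callable[..., Any]] = []

    def connect(self, callback: Callable[..., Any]) -> Callable[..., Any]:
        self._callbacks.append(callback)
        return callback  # returning it allows use as a decorator

    def emit(self, *args: Any) -> None:
        for callback in self._callbacks:
            callback(*args)


class SketchSignal:
    """Hypothetical descriptor that is declared once on the class."""

    def __init__(self, *types: type) -> None:
        self.types = types
        self.name: Optional[str] = None

    def __set_name__(self, owner: type, name: str) -> None:
        # Default the signal name to the class attribute it is bound to.
        self.name = name

    def __get__(self, obj: Any, owner: Optional[type] = None) -> Any:
        if obj is None:
            return self  # class access returns the descriptor itself
        # Create the per-object instance on first access and cache it in the
        # object's __dict__, so later lookups bypass this descriptor.
        instance = SketchSignalInstance(self.types)
        obj.__dict__[self.name] = instance
        return instance


class MyEmitter:
    changed = SketchSignal(int)


received: List[int] = []
emitter = MyEmitter()
emitter.changed.connect(received.append)
emitter.changed.emit(1)
```

In this sketch, MyEmitter.changed is the shared SketchSignal, while each MyEmitter object gets its own SketchSignalInstance, so callbacks connected on one object are not called when another object emits.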

set_worker(worker: GeneratorWorker | FunctionWorker)[source]

Set the currently running worker.

class magicclass.ext.dask.progress.dask_thread_worker(f: Callable | None = None, *, ignore_errors: bool = False, progress: ProgressDict | bool | None = True)[source]

Bases: magicclass.utils.qthreading.thread_worker

Create a dask worker in the superqt/napari style.

This thread worker class can monitor the progress of a dask computation. Callback functions connected to the computed signal are called each time one of the tasks finishes, and the value returned by that task is passed to the callback as its argument. The returned value is useful when delayed functions are computed, but it is not always meaningful when dask mapping functions such as map_blocks are used. Unlike the standard thread_worker, you should not specify the total parameter, since the dask progress bar already knows the total.
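
To make the per-task callback and the "known total" concrete, the following sketch uses dask's own dask.callbacks.Callback hooks outside of any GUI. It is only an illustration of the general mechanism and makes no claim about how dask_thread_worker is implemented; TaskCounter and double are hypothetical names introduced for the example.

```python
import dask
from dask.callbacks import Callback


class TaskCounter(Callback):
    """Hypothetical helper: records each finished task and the task total."""

    def __init__(self):
        super().__init__()
        self.total = 0
        self.results = []

    def _start_state(self, dsk, state):
        # The scheduler state lists every task up front, so the total is
        # known before any task runs.
        self.total = sum(
            len(state[k]) for k in ("ready", "waiting", "running", "finished")
        )

    def _posttask(self, key, result, dsk, state, worker_id):
        # Called after each task finishes, with that task's return value.
        self.results.append(result)


@dask.delayed
def double(x):
    return 2 * x


counter = TaskCounter()
with counter:
    # The synchronous scheduler keeps the example deterministic.
    out = dask.compute(*[double(i) for i in range(4)], scheduler="synchronous")

print(out, counter.total, len(counter.results))
```

With delayed functions each recorded result is the return value of one user function, which is the case where the value passed to a computed callback is most useful. With array operations such as map_blocks, the per-task results are intermediate chunks and are usually less meaningful on their own.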

Examples

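import dask.array as da
from magicclass import magicclass
from magicclass.ext.dask.progress import dask_thread_worker

@magicclass
class A:
    @dask_thread_worker
    def func(self):
        # Build a lazy random array; nothing is computed yet.
        arr = da.random.random((30000, 30000))
        # compute() runs the task graph in the worker thread.
        da.mean(arr).compute()

    @func.computed.connect
    def _on_computed(self, _=None):
        # Called each time a dask task finishes.
        print("computed")

property computed: magicclass.utils.qthreading.CallbackList[Any]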

magicclass.ext.dask.resource module

class magicclass.ext.dask.resource.DaskResourceProfiler(dt: float = 1.0, **kwargs)[source]

Bases: magicgui.widgets._bases.widget.Widget, magicgui.widgets._bases.mixins._OrientationMixin, MutableSequence[magicgui.widgets._bases.widget.Widget]

clear() -> None[source]

Remove all items from the sequence.

class magicclass.ext.dask.resource.EventedTracker(dt=1)[source]

Bases: dask.diagnostics.profile._Tracker

run()[source]

Method to be run in sub-process; can be overridden in sub-class

Module contents