Skip to content

Operations

Duqtools uses delayed operations for filesystem-changing operations. They are implemented mostly with decorators, but a function could be added directly to the op_queue.

Operation(action, description, extra_description=None, *args, **kwargs)

Bases: BaseModel

Operation, simple class which has a callable action.

Usually not called directly but used through Operations. It has the following members:

Parameters:

  • description –

    description of the operation to be done

  • extra_description (Optional[str]) –

    extra description, (not colorized)

  • action –

    function to be eventually evaluated

  • *args –

    positional arguments to action

  • **kwargs –

    keyword arguments to action

Source code in duqtools/operations.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def __init__(self,
             action,
             description,
             extra_description: Optional[str] = None,
             *args,
             **kwargs):
    """Create an operation, optionally decorating its description.

    Parameters
    ----------
    action: Callable
        function to be eventually evaluated
    description: str
        description of the operation to be done; it is colorized green
        only when `extra_description` is given
    extra_description: Optional[str]
        extra description, appended after ' : ' (not colorized)
    *args: tuple
        remaining positional arguments, forwarded to the parent
        constructor
    **kwargs: dict
        remaining keyword arguments, forwarded to the parent constructor
    """
    if extra_description:
        styled = click.style(description, fg='green')
        description = ' : '.join((styled, extra_description))

    super().__init__(*args,
                     action=action,
                     description=description,
                     **kwargs)

    # Guarantee a mapping so that `**self.kwargs` can always be unpacked.
    if not self.kwargs:
        self.kwargs = {}

__call__()

Execute the action with the args and kwargs.

Returns:

  • Operation –

    The operation that was executed

Source code in duqtools/operations.py
69
70
71
72
73
74
75
76
77
78
79
def __call__(self) -> Operation:
    """Run the stored action and hand back this operation.

    The description is written to the debug log first; whatever the
    action itself returns is discarded.

    Returns
    -------
    Operation
        The operation that was executed
    """
    logger.debug(self.description)
    positional, keywords = self.args, self.kwargs
    self.action(*positional, **keywords)
    return self

Operations()

Bases: deque

Operations Queue which keeps track of all the operations that need to be done.

It's basically dask.delayed, but custom-made and with a few drawbacks: the return value from an action is eventually discarded; communication between queue items is possible through references or global values, but this is not really recommended and no guidance for it is provided.

Source code in duqtools/operations.py
 99
100
101
102
103
def __init__(self):
    """Create an empty queue and attach a plain stream handler.

    The handler prints only the bare message (`%(message)s`) and is
    added to `self.logger`.
    """
    super().__init__()
    message_only = logging.Formatter('%(message)s')
    handler = logging.StreamHandler()
    handler.setFormatter(message_only)
    self.logger.addHandler(handler)

add(*args, **kwargs)

convenience Operation wrapper around put.

from duqtools.operations import op_queue

op_queue.add(print, args=('Hello World',),
        description="Function that prints hello world")
Source code in duqtools/operations.py
111
112
113
114
115
116
117
118
119
120
121
122
def add(self, *args, **kwargs) -> None:
    """Build an Operation from the arguments and append it to the queue.

    All arguments are passed on unchanged to `Operation`.

    ```python
    from duqtools.operations import op_queue

    op_queue.add(print, args=('Hello World',),
            description="Function that prints hello world")
    ```
    """
    operation = Operation(*args, **kwargs)
    self.append(operation)

append(item)

Restrict our diet to Operation objects only.

Source code in duqtools/operations.py
128
129
130
131
132
133
134
135
136
def append(self, item: Operation) -> None:  # type: ignore
    """Queue the operation, or run it right away if the queue is disabled.

    Parameters
    ----------
    item: Operation
        operation to enqueue (queue enabled) or to announce and execute
        immediately (queue disabled)
    """
    if not self.enabled:
        # Queue is off: report the operation and execute it immediately.
        self.logger.info('- ' + item.description)
        item()
        return

    logger.debug(f'Appended {item.description} to the operations queue')
    super().append(item)

apply()

Apply the next operation in the queue and remove it.

Source code in duqtools/operations.py
138
139
140
141
142
143
def apply(self) -> Operation:
    """Pop the operation at the front of the queue and execute it.

    Returns
    -------
    Operation
        The operation that was removed from the queue and called
    """
    next_operation = self.popleft()
    next_operation()
    return next_operation

apply_all()

Apply all queued operations and empty the queue.

A fancy progress bar is shown while the operations are applied.

Source code in duqtools/operations.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def apply_all(self) -> None:
    """Run every queued operation in order until the queue is empty.

    Two tqdm bars are used while applying: a counting progress bar
    (position 1) and a description-only line (position 0) that is updated
    with the description of each operation after it has been called.
    """
    from tqdm import tqdm

    logger.info(click.style('Applying Operations', fg='red', bold=True))
    with tqdm(total=len(self), position=1) as progress:
        progress.set_description('Applying operations')
        with tqdm(iterable=False, bar_format='{desc}',
                  position=0) as status_line:
            while len(self) > 0:
                operation = self.popleft()
                operation()
                # Reported after the call, i.e. once the operation returned.
                status_line.set_description(operation.description)
                logger.info(operation.description)
                progress.update()

check_unconfirmed_operations()

Safety check: it should never happen that operations are not executed.

Check it anyway at program exit.

Source code in duqtools/operations.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def check_unconfirmed_operations(self):
    """Warn about operations that are still sitting in the queue.

    Safety check meant for program exit: operations should never be left
    unexecuted, but if any remain, each one is listed as a warning together
    with a hint about the `@confirm_operations` decorator.
    """
    if len(self) == 0:
        return

    headline = click.style(
        'There are unconfirmed operations in the queue at program exit!',
        fg='yellow',
        bold=True)
    hint = click.style(
        'Did you forget to use the @confirm_operations decorator?',
        fg='yellow')

    self.logger.warning(headline)
    for pending in self:
        self.logger.warning('- ' + pending.description)
    self.logger.warning(hint)

confirm_apply_all()

First asks the user whether they want to apply everything.

Returns:

  • bool – did we apply everything or not
Source code in duqtools/operations.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def confirm_apply_all(self) -> bool:
    """List the queued operations and apply them once confirmed.

    Nothing is applied when `self.dry_run` is set. Otherwise the
    operations are applied if `self.yes` is truthy or the user answers the
    interactive prompt positively.

    Returns
    -------
    bool: did we apply everything or not
    """
    info = self.logger.info

    # Show what is queued before asking anything.
    info('')
    info(click.style('Operations in the Queue:', fg='red', bold=True))
    info(click.style('========================', fg='red'))
    for queued in self:
        info('- ' + queued.description)

    if self.dry_run:
        info('Dry run enabled, not applying op_queue')
        return False

    # A truthy `self.yes` skips the interactive prompt entirely.
    confirmed = self.yes
    if not confirmed:
        confirmed = click.confirm(
            'Do you want to apply all these operations?', default=False)

    if confirmed:
        self.apply_all()
    return confirmed

put(item)

synonym for append.

Source code in duqtools/operations.py
124
125
126
def put(self, item: Operation) -> None:
    """Alias of `append`: hand the operation over to the queue.

    Parameters
    ----------
    item: Operation
        operation passed on unchanged to `append`
    """
    self.append(item)

add_to_op_queue(op_desc, extra_desc=None)

Decorator which adds the function call to the op_queue instead of executing it directly. The description strings can be format strings that use the function arguments.

from duqtools.operations import add_to_op_queue, op_queue

@add_to_op_queue("Printing hello world", "{name}")
def print_hello_world(name):
    print(f"Hello World {name}")


print_hello_world("Snoozy")

op_queue.confirm_apply_all()
Source code in duqtools/operations.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def add_to_op_queue(op_desc: str, extra_desc: Optional[str] = None):
    """Decorator which adds the function call to the op_queue instead of
    executing it directly.

    Calling the decorated function queues an operation for that call and
    returns `None`. Both descriptions can be format strings that refer to
    the function's parameters by name; parameters that were not passed
    explicitly are filled in from their default values.

    Parameters
    ----------
    op_desc: str
        description of the operation, may contain `{parameter}` fields
    extra_desc: Optional[str]
        extra description, may contain `{parameter}` fields

    Returns
    -------
    Callable
        the actual decorator, to be applied to the function

    ```python
    from duqtools.operations import add_to_op_queue, op_queue

    @add_to_op_queue("Printing hello world", "{name}")
    def print_hello_world(name):
        print(f"Hello World {name}")


    print_hello_world("Snoozy")

    op_queue.confirm_apply_all()
    ```
    """
    from functools import wraps

    def op_queue_real(func):

        # Keep the name, docstring and signature of the decorated function.
        @wraps(func)
        def wrapper(*args, **kwargs) -> None:
            sig = signature(func)

            # Values available to the format strings, lowest priority
            # first: declared defaults, explicit keyword arguments, then
            # positional arguments mapped onto their parameter names.
            format_values = {
                name: param.default
                for name, param in sig.parameters.items()
                if param.default is not param.empty
            }
            format_values.update(kwargs)
            format_values.update(zip(sig.parameters, args))

            extra_formatted = None
            if extra_desc:
                extra_formatted = extra_desc.format(**format_values)
            op_formatted = op_desc.format(**format_values)

            # add the function to the queue
            op_queue.add(action=func,
                         args=args,
                         kwargs=kwargs,
                         description=op_formatted,
                         extra_description=extra_formatted)

        return wrapper

    return op_queue_real

confirm_operations(func)

Decorator which confirms and applies queued operations after the function.

from duqtools.operations import confirm_operations, op_queue

@confirm_operations
def complicated_stuff():
    op_queue.add(print, args=('Hello World',),
            description="Function that prints hello world")
    op_queue.add(print, args=('Hello World again',),
            description="Function that prints hello world, again")
Source code in duqtools/operations.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def confirm_operations(func):
    """Decorator which confirms and applies queued operations after the
    function.

    The op_queue is enabled while `func` runs. Afterwards the queued
    operations are presented for confirmation, and the queue is cleared
    and disabled again. The queue is also cleared and disabled when `func`
    (or the confirmation step) raises, so that a failure does not leave
    the queue enabled with stale operations; in that case the exception is
    propagated and nothing is applied by this decorator.

    Parameters
    ----------
    func: Callable
        function whose queued operations must be confirmed

    Returns
    -------
    Callable
        wrapper returning whatever `func` returns

    Raises
    ------
    RuntimeError
        when the wrapper is called while the op_queue is already enabled

    ```python
    from duqtools.operations import confirm_operations, op_queue

    @confirm_operations
    def complicated_stuff():
        op_queue.add(print, args=('Hello World',),
                description="Function that prints hello world")
        op_queue.add(print, args=('Hello World again',),
                description="Function that prints hello world, again")
    ```
    """
    from functools import wraps

    # Keep the name, docstring and signature of the decorated function.
    @wraps(func)
    def wrapper(*args, **kwargs):
        if op_queue.enabled:
            raise RuntimeError('op_queue already enabled')
        op_queue.enabled = True
        try:
            ret = func(*args, **kwargs)
            op_queue.confirm_apply_all()
            return ret
        finally:
            # Always reset the queue, also when an exception is raised.
            op_queue.clear()
            op_queue.enabled = False

    return wrapper

op_queue_context()

Context manager to enable the op_queue, and confirm_operations on exit. Also disables the op_queue on exit.

Works more or less the same as the @confirm_operations decorator

Source code in duqtools/operations.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
@contextmanager
def op_queue_context():
    """Context manager to enable the op_queue, and confirm_operations on
    exit. Also disables the op_queue on exit.

    Works more or less the same as the `@confirm_operations` decorator.
    When the body of the `with` statement raises, the queued operations
    are not presented for confirmation; the queue is still cleared and
    disabled so that it can be used again, and the exception propagates.

    Raises
    ------
    RuntimeError
        when the op_queue is already enabled on entry
    """

    if op_queue.enabled:
        raise RuntimeError('op_queue already enabled')
    op_queue.enabled = True
    try:
        yield
        op_queue.confirm_apply_all()
    finally:
        # Always reset the queue, also when the with-body raised.
        op_queue.clear()
        op_queue.enabled = False