Tracking progress of joblib.Parallel execution

Date: 2022-06-27 00:11:48

Is there a simple way to track the overall progress of a joblib.Parallel execution?

I have a long-running execution composed of thousands of jobs, which I want to track and record in a database. To do that, whenever Parallel finishes a task, I need it to execute a callback that reports how many jobs remain.

I've accomplished a similar task before with Python's stdlib multiprocessing.Pool, by launching a thread that records the number of pending jobs in Pool's job list.
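
For reference, the Pool trick described above can be sketched roughly as follows. This is only an illustration and not the code from the original project: `work`, `run_with_monitor`, and `report` are hypothetical names, and it polls the public `AsyncResult.ready()` method instead of reading Pool's private job list. `ThreadPool` is used here because it shares Pool's interface and keeps the sketch easy to run; `multiprocessing.Pool` should be a drop-in replacement for process-based work.

```python
import threading
import time
from multiprocessing.pool import ThreadPool


def work(x):
    """Hypothetical stand-in task; a real job would do something slow here."""
    time.sleep(0.01)
    return x * x


def run_with_monitor(items, report, n_workers=4, interval=0.05):
    """Run work() over items while a thread reports the pending-job count."""
    with ThreadPool(n_workers) as pool:
        results = [pool.apply_async(work, (item,)) for item in items]
        stop = threading.Event()

        def monitor():
            # Poll until told to stop; ready() is public AsyncResult API.
            while not stop.is_set():
                report(sum(not r.ready() for r in results))
                stop.wait(interval)

        watcher = threading.Thread(target=monitor, daemon=True)
        watcher.start()
        values = [r.get() for r in results]  # blocks until every job is done
        stop.set()
        watcher.join()
    report(0)  # final report once everything has been collected
    return values


if __name__ == "__main__":
    print(run_with_monitor(range(20), lambda n: print("pending:", n)))
```

The `report` callable is where a database write would go; here it is just whatever function the caller passes in.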

Looking at the code, Parallel inherits Pool, so I thought I could pull off the same trick, but it doesn't seem to use that list, and I haven't been able to figure out any other way to "read" its internal status.

5 answers

#1


8  

The documentation you linked to states that Parallel has an optional progress meter. It's implemented by using the callback keyword argument provided by multiprocessing.Pool.apply_async:

# This is inside a dispatch function
self._lock.acquire()
job = self._pool.apply_async(SafeFunction(func), args,
            kwargs, callback=CallBack(self.n_dispatched, self))
self._jobs.append(job)
self.n_dispatched += 1

...

class CallBack(object):
    """ Callback used by parallel: it is used for progress reporting, and
        to add data to be processed
    """
    def __init__(self, index, parallel):
        self.parallel = parallel
        self.index = index

    def __call__(self, out):
        self.parallel.print_progress(self.index)
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()

And here's print_progress:

def print_progress(self, index):
    elapsed_time = time.time() - self._start_time

    # This is heuristic code to print only 'verbose' times a messages
    # The challenge is that we may not know the queue length
    if self._original_iterable:
        if _verbosity_filter(index, self.verbose):
            return
        self._print('Done %3i jobs       | elapsed: %s',
                    (index + 1,
                     short_format_time(elapsed_time),
                    ))
    else:
        # We are finished dispatching
        queue_length = self.n_dispatched
        # We always display the first loop
        if not index == 0:
            # Display depending on the number of remaining items
            # A message as soon as we finish dispatching, cursor is 0
            cursor = (queue_length - index + 1
                      - self._pre_dispatch_amount)
            frequency = (queue_length // self.verbose) + 1
            is_last_item = (index + 1 == queue_length)
            if (is_last_item or cursor % frequency):
                return
        remaining_time = (elapsed_time / (index + 1) *
                    (self.n_dispatched - index - 1.))
        self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
                    (index + 1,
                     queue_length,
                     short_format_time(elapsed_time),
                     short_format_time(remaining_time),
                    ))

The way they implement this is kind of weird, to be honest: it seems to assume that tasks always complete in the order they were started. The index passed to print_progress is just the value of self.n_dispatched at the time the job was dispatched. So the first job launched always reports an index of 0, even if, say, the third job finished first. It also means they don't actually keep track of the number of completed jobs, so there's no instance variable for you to monitor.
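
To see why a dispatch index is a poor stand-in for a completion count, here is a small sketch using only the standard library (not joblib's own code). The names `slow_then_fast`, `completion_order`, and `N_JOBS` are made up for the illustration. Jobs dispatched earlier sleep longer, so the callbacks typically fire in the reverse of dispatch order.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

N_JOBS = 3  # hypothetical job count for the demo


def slow_then_fast(dispatch_index):
    """Earlier jobs sleep longer, so they tend to finish later."""
    time.sleep(0.05 * (N_JOBS - dispatch_index))
    return dispatch_index


def completion_order():
    """Return dispatch indices in the order the jobs actually finished."""
    finished = []
    lock = threading.Lock()

    def on_done(dispatch_index):
        # Runs once per job, when that job's result arrives.
        with lock:
            finished.append(dispatch_index)

    with ThreadPool(N_JOBS) as pool:
        jobs = [pool.apply_async(slow_then_fast, (i,), callback=on_done)
                for i in range(N_JOBS)]
        for job in jobs:
            job.wait()
    return finished


if __name__ == "__main__":
    print(completion_order())
```

Counting callback invocations (the length of `finished`) gives the number of completed jobs regardless of the order in which they finish, which is the idea behind the replacement callback.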

I think your best bet is to make your own CallBack class, and monkey patch Parallel:

from math import sqrt
from collections import defaultdict
from joblib import Parallel, delayed

class CallBack(object):
    completed = defaultdict(int)

    def __init__(self, index, parallel):
        self.index = index
        self.parallel = parallel

    def __call__(self, index):
        CallBack.completed[self.parallel] += 1
        print("done with {}".format(CallBack.completed[self.parallel]))
        if self.parallel._original_iterable:
            self.parallel.dispatch_next()

import joblib.parallel
joblib.parallel.CallBack = CallBack

if __name__ == "__main__":
    print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))

Output:

done with 1
done with 2
done with 3
done with 4
done with 5
done with 6
done with 7
done with 8
done with 9
done with 10
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

That way, your callback gets called whenever a job completes, rather than the default one.
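
Since the question asks about recording progress in a database, the print call in the callback could be swapped for something along these lines. This is a sketch under assumptions: `ProgressRecorder`, its `progress` table, and the use of SQLite are all hypothetical choices, and the total number of jobs has to be known up front.

```python
import sqlite3
import threading


class ProgressRecorder:
    """Counts completed jobs and stores the remaining count in SQLite."""

    def __init__(self, total, db_path=":memory:"):
        self.total = total
        self.done = 0
        self._lock = threading.Lock()
        # check_same_thread=False because callbacks may fire on another thread.
        self._conn = sqlite3.connect(db_path, check_same_thread=False)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS progress (done INTEGER, remaining INTEGER)"
        )

    def job_finished(self, batch_size=1):
        """Call this from the completion callback; returns jobs remaining."""
        with self._lock:
            self.done += batch_size
            remaining = self.total - self.done
            self._conn.execute(
                "INSERT INTO progress VALUES (?, ?)", (self.done, remaining)
            )
            self._conn.commit()
        return remaining

    def history(self):
        """Return every (done, remaining) row in insertion order."""
        with self._lock:
            return self._conn.execute(
                "SELECT done, remaining FROM progress ORDER BY rowid"
            ).fetchall()


if __name__ == "__main__":
    recorder = ProgressRecorder(total=3)
    for _ in range(3):
        print("remaining:", recorder.job_finished())
```

Inside the patched `__call__`, the print line would become a call to `recorder.job_finished()` on a recorder created before the Parallel run starts.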

#2


3  

Expanding on dano's answer for the newest version of the joblib library. There were a couple of changes to the internal implementation.

from joblib import Parallel, delayed
from collections import defaultdict

# patch joblib progress callback
class BatchCompletionCallBack(object):
  completed = defaultdict(int)

  def __init__(self, time, index, parallel):
    self.index = index
    self.parallel = parallel

  def __call__(self, index):
    BatchCompletionCallBack.completed[self.parallel] += 1
    print("done with {}".format(BatchCompletionCallBack.completed[self.parallel]))
    if self.parallel._original_iterator is not None:
      self.parallel.dispatch_next()

import joblib.parallel
joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack

#3


2  

Why can't you simply use tqdm? The following worked for me

from joblib import Parallel, delayed
from datetime import datetime
from tqdm import tqdm

def myfun(x):
    return x**2

results = Parallel(n_jobs=8)(delayed(myfun)(i) for i in tqdm(range(1000)))
100%|██████████| 1000/1000 [00:00<00:00, 10563.37it/s]
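
One caveat worth knowing: wrapping the input iterator this way advances the bar when a task is pulled for dispatch, not when it finishes, so the bar can run ahead of the real work. For comparison, here is a completion-based sketch that uses the standard library's `concurrent.futures` in place of joblib (a swapped-in technique, not joblib's API). `run_with_completion_progress` and `report` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def myfun(x):
    time.sleep(0.001)  # pretend the job takes a moment
    return x ** 2


def run_with_completion_progress(items, report, n_workers=8):
    """Run myfun over items, calling report(done, total) as each job finishes."""
    items = list(items)
    results = [None] * len(items)
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        # Map each future back to its input position to keep results ordered.
        futures = {executor.submit(myfun, item): pos
                   for pos, item in enumerate(items)}
        for n_done, future in enumerate(as_completed(futures), start=1):
            results[futures[future]] = future.result()
            report(n_done, len(items))
    return results


if __name__ == "__main__":
    run_with_completion_progress(
        range(20), lambda done, total: print("{}/{}".format(done, total))
    )
```

Here `report` is only called after a result has actually come back, so the count reflects finished jobs.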

#4


1  

Here's another answer to your question with the following syntax:

aprun = ParallelExecutor(n_jobs=5)

a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))

https://*.com/a/40415477/232371

#5


0  

Text progress bar

One more variant, for those who want a text progress bar without additional modules like tqdm. Tested with joblib 0.11 and Python 3.5.2 on Linux as of 16.04.2018; it shows progress as each subtask completes.

Redefine the library's own class:

import time

class BatchCompletionCallBack(object):
    # Added code: total_n_jobs is a module-level name, read when __call__ runs
    def __init__(self, dispatch_timestamp, batch_size, parallel):
        self.dispatch_timestamp = dispatch_timestamp
        self.batch_size = batch_size
        self.parallel = parallel

    def __call__(self, out):
        self.parallel.n_completed_tasks += self.batch_size
        this_batch_duration = time.time() - self.dispatch_timestamp

        self.parallel._backend.batch_completed(self.batch_size,
                                           this_batch_duration)
        self.parallel.print_progress()
        # Added code - start
        progress = self.parallel.n_completed_tasks / total_n_jobs
        print(
            "\rProgress: [{0:50s}] {1:.1f}%".format('#' * int(progress * 50), progress*100)
            , end="", flush=True)
        if self.parallel.n_completed_tasks == total_n_jobs:
            print('\n')
        # Added code - end
        if self.parallel._original_iterator is not None:
            self.parallel.dispatch_next()

import joblib.parallel
joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack

Before running anything, define a global with the total number of jobs:

total_n_jobs = 10

This will result in something like this:

Progress: [########################################          ] 80.0%
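
The bar formatting can also be pulled out into a small helper so it can be checked without touching joblib. This is just a sketch: `render_bar` is a hypothetical name, and it uses integer arithmetic for the fill width to avoid float rounding surprises.

```python
def render_bar(done, total, width=50):
    """Return a text progress bar such as 'Progress: [####    ] 50.0%'."""
    if total <= 0:
        raise ValueError("total must be positive")
    done = min(max(done, 0), total)   # clamp to the valid range
    filled = done * width // total    # number of '#' characters
    percent = 100.0 * done / total
    return "Progress: [{0:{1}s}] {2:.1f}%".format("#" * filled, width, percent)


if __name__ == "__main__":
    # "\r" redraws the bar on the same terminal line, as in the callback above.
    print("\r" + render_bar(8, 10), end="", flush=True)
    print()
```

Inside the callback, the print call would then take `render_bar(self.parallel.n_completed_tasks, total_n_jobs)` as its text.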
