Многопроцессорность и многопоточность в Concurrent.futures

Для чего это нужно?

Многопроцессорность и многопоточность реализуется через модуль concurrent.futures в Python позволяет выполнять задачи одновременно, используя ThreadPoolExecutor для многопоточности или ProcessPoolExecutor для многопроцессорности.

Представьте что вам нужно запустить 200 потоков или процессов, но сделать так, чтобы они обрабатывались к примеру по 5-10 за раз, и когда какой-то из них выполнен, чтобы воркер брал новый, и поддерживал тем самым нужное количество одновременного выполнения в пулле.

Вы можете сделать это вручную через multiprocessing.Pool / threading.Thread / multiprocessing.Process, а можете взять уже готовое решение.

1. Параллельное выполнение используя потоки и воркеров:

import concurrent.futures
import time
# Задание, которое будем выполнять
def task_function(seconds):
    print(f"Started ZProger task: {seconds}s")
    time.sleep(seconds)
    print(f"Finished ZProger task: {seconds}s")
    return f"Result: {seconds}"
# Запускаем 5 потоков, которые будут обрабатывать 5 воркеров
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as
executor:
    seconds_list = [5, 4, 3, 3, 3]
    futures = [executor.submit(task_function, sec) for sec in
seconds_list]
Python

2. Используем пулл процессов:

Позволяет обойти глобальную блокировку интерпретатора (GIL).

import concurrent.futures
import time
def task_function(seconds):
    print(f"Started ZProger task: {seconds}s")
    time.sleep(seconds)
    print(f"Finished ZProger task: {seconds}s")
    return f"Result: {seconds}"
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as
executor:
    seconds_list = [5, 4, 3, 3, 3]
    futures = [executor.submit(task_function, sec) for sec in
seconds_list]
Python

Если установить 5 воркеров, получаем следующий результат:

многопроцессорность и многопоточность

А теперь сравните этот результат, если поставить 3 воркера. У нас будет 5 активных заданий, но лишь 3 воркера будут их обрабатывать. Проще говоря изначально они возьмут первые 3 задания, а когда кто-то закончит, то на смену возьмет остальные:

многопроцессорность и многопоточность

Стоит заметить, что еще не все задания были запущены, но уже были те процессы, которые закончили выполнение. Все дело в том, что воркеров меньше, чем заданий в пулле.

3. Получить результат из потоков / процессов:

import concurrent.futures
import time
def task_function(seconds):
    print(f"Started ZProger task: {seconds}s")
    time.sleep(seconds)
    print(f"Finished ZProger task: {seconds}s")
    return f"Result: {seconds}"
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as
executor:
    seconds_list = [5, 4, 3, 3, 3]
    futures = [executor.submit(task_function, sec) for sec in
seconds_list]
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
Python
многопроцессорность и многопоточность

Как можем заметить, возврат результата также идет постепенно, по мере выполнения.

многопроцессорность и многопоточность

Как видим, executor.submit  выполняется сразу и запускает пулл в работу, далее результат из as_completed собирается по мере выполнения, ожидая пока обработается все задания, и лишь после этого оно идет дальше. Если бы проверки на результат не было, то мы бы просто не вышли из with пока не выполнился бы наш пулл.

4. Передать несколько аргументов:

import concurrent.futures
import time
def zproger_func(a, b, c):
    return f"ZProger Result: {a} / {b} / {c}"
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as
executor:
    futures = [executor.submit(zproger_func, 1, 2, 3) for _ in
range(3)]
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
# Результат:
# ZProger Result: 1 / 2 / 3
# ZProger Result: 1 / 2 / 3
# ZProger Result: 1 / 2 / 3
Python

5. Взаимоблокировки, как делать не стоит:

import concurrent.futures as cf
import time
def wait_on_b():
    time.sleep(5)
    print(b.result())
    return 5
def wait_on_a():
    time.sleep(5)
    print(a.result())
    return 6
executor = cf.ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
# Они никогда не завершатся, так как каждое задание ожидает результат
от другого. В итоге они будут бесконечно ждать.
Python

6. Блокировка выполнения с 1 воркером:

# Это также никогда не завершится, так как есть только 1 воркер,
который выполняет 1 задание, и он не может получить результат от себя
же.
def wait_on_future():
    f = executor.submit(pow, 5, 2)
    print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
Python

7. Выполнение задания с каждым аргументом используя map:

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
def get_value(value):
    return value * 2
with ProcessPoolExecutor(max_workers=3) as executor:
    result_list = executor.map(get_value, [1, 2, 3, 4, 5])
    for result in result_list:
        print(result)
# Вывод:
2 4 6 8 10
Python

8. Комбинация netmiko + map:

import time
from datetime import datetime
from itertools import repeat
from concurrent.futures import ThreadPoolExecutor
import logging
import netmiko
import yaml
logging.getLogger('paramiko').setLevel(logging.WARNING)
logging.basicConfig(
    format = '%(threadName)s %(name)s %(levelname)s: %(message)s',
    level=logging.INFO,
)
def send_show(device, show):
    start_msg = '===> {} Connection: {}'
    received_msg = '<=== {} Received:   {}'
    ip = device['ip']
    logging.info(start_msg.format(datetime.now().time(), ip))
    if ip == '192.168.100.1':
        time.sleep(5)
    with netmiko.ConnectHandler(**device) as ssh:
        ssh.enable()
        result = ssh.send_command(show)
        logging.info(received_msg.format(datetime.now().time(), ip))
        return result
with open('devices.yaml') as f:
    devices = yaml.safe_load(f)
with ThreadPoolExecutor(max_workers=3) as executor:
    result = executor.map(send_show, devices, repeat('sh clock'))
    for device, output in zip(devices, result):
print(device['ip'], output)
Python

9. Пример обработки ошибок:

from concurrent.futures import ThreadPoolExecutor, as_completed
from pprint import pprint
from datetime import datetime
import time
from itertools import repeat
import logging
import yaml
from netmiko import ConnectHandler
from netmiko.ssh_exception import NetMikoAuthenticationException
logging.getLogger("paramiko").setLevel(logging.WARNING)
logging.basicConfig(
    format = '%(threadName)s %(name)s %(levelname)s: %(message)s',
    level=logging.INFO)
start_msg = '===> {} Connection: {}'
received_msg = '<=== {} Received: {}'
def send_show(device_dict, command):
    ip = device_dict['ip']
    logging.info(start_msg.format(datetime.now().time(), ip))
    if ip == '192.168.100.1': time.sleep(5)
    with ConnectHandler(**device_dict) as ssh:
        ssh.enable()
        result = ssh.send_command(command)
        logging.info(received_msg.format(datetime.now().time(), ip))
    return {ip: result}
def send_command_to_devices(devices, command):
    data = {}
    with ThreadPoolExecutor(max_workers=2) as executor:
        future_ssh = [
            executor.submit(send_show, d, command) for d in devices
        ]
        for f in as_completed(future_ssh):
            try:
                result = f.result()
            except NetMikoAuthenticationException as e:
                print(e)
            else:
                data.update(result)
    return data
 
if __name__ == '__main__':
    with open('devices.yaml') as f:
        devices = yaml.safe_load(f)
    pprint(send_command_to_devices(devices, 'sh clock'))
Python

Как воркеры распределяют задания?

многопроцессорность и многопоточность

Я установил 2 воркера и запустил 8 заданий, как можете заметить, выполняются только задания «2» и «3». Дело в том, что они первые попали в пулл, и они будут выполняться до тех пор, пока не вернут результат. Если скажем спустя какое-то время задание «2» закончит выполнение, то вместо него воркер начнет обрабатывать задание «4», в итоге вывод будет составлять уже «3» и «4».

ProcessPoolExecutor или ThreadPoolExecutor?

Стоит добавить, что ThreadPoolExecutor использует те же потоки из Threading , поэтому они ограничены GIL. Если же вам нужно обойти GIL, тогда используйте ProcessPoolExecutor. Что выбрать многопроцессорность и многопоточность зависит от определенной задачи, если необходимо работать с операциями ввода/вывода и ваши операции не требуют вычислений, тогда идеальным решением будут потоки.

В случае с вычислениями на CPU используем ProcessPoolExecutor. Хотя он и в разы быстрее нежели ThreadPoolExecutor и выполняется параллельно, стоит учитывать, что процессы намного прожорливей, нежели потоки.

При выборе между многопроцессорность и многопоточность, рассмотрим следующую аналогию:

  • ThreadPoolExecutor подобен нескольким поварам на общей кухне
  • ProcessPoolExecutor подобен нескольким поварам, у каждого из которых своя собственная кухня

ThreadPoolExecutor идеально подходит для задач, связанных с вводом- выводом, где задачи часто ожидают внешних ресурсов, таких как чтение файлов, создание сетевых запросов или загрузка данных. В этих случаях совместное использование ресурсов допустимо и эффективно.

Преимущества:

  • Потоки легче с точки зрения использования памяти и ресурсов по сравнение с процессами, что приводит к снижению накладных расходов.
  • Потоки совместно используют одно и то же пространство памяти, поэтому обмен данными между потоками быстрый и простой, что может быть полезно, когда задачам необходимо обмениваться данными или состоянием.

Недостатки:

  • Глобальная блокировка интерпретатора (GIL): в CPython GIL предотвращает одновременное выполнение несколькими потоками байт-кода Python, что может ограничить преимущества многопоточности в производительности для задач, связанных с процессором.
  • Риск повреждения данных: из-за разделяемой памяти неправильная синхронизация между потоками может привести к повреждение данных или состояние гонки.

ProcessPoolExecutor лучше подходит для задач, связанных с CPU, где выполняются тяжелые вычисления, а совместное использование ресурсов может привести к снижению производительности. Каждый процесс выполняется в своем собственном пространстве памяти, что обеспечивает настоящий параллелизм для задач, требующих больших вычислительных ресурсов.

Преимущества:

  • Поскольку каждый процесс выполняется независимо, задачи могут использовать несколько ядер CPU для вычислений, обеспечивая более высокую производительность.
  • GIL не применяется к нескольким процессам, что позволяет выполнять задачи одновременно без ограничений производительности, связанных с GIL.
  • Изоляция: у процессов есть собственное пространство памяти, что снижает риск повреждения данных или состояния гонки.

Недостатки:

  • Процессы более ресурсоемки по сравнение с потоками, что приводит к увеличению использования памяти и времени запуска.
  • Взаимодействие между процессами происходит медленнее, чем взаимодействие между потоками, и часто требует сериализации и десериализации данных, что может повлиять на производительность.

Зачем это использовать, если есть обычные потоки?

Представьте что у вас есть задача, которую нужно запустить таким образом, чтобы она автоматически распределяла задания по 10 воркерам, и не превышала это значение. Как только 1 задание выполнено, то на его место сразу приходит другое.

Это реально сделать на Threading, но нужно писать целый модуль с использованием очередей, каналов и т.д. И именно это уже реализовано в concurrent.futures, что избавляет вас от лишней рутины.

Какой результат при отправке запросов?

Как говорилось выше, при операциях ввода-вывода (в том числе сетевых) потоки не блокируются, а значит мы можем использовать ThreadPoolExecutor. Пока один поток будет ожидать ответ, другой в это время будет обрабатывать результат. Здесь нам нужен пулл, и именно с этой задачей справляется concurrent.futures.

from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(worker_count) as executor:
    for _ in executor.map(get_info, data):
pass
Python

Здесь worker_count — число рабочих потоков, get_info — функция, выполняющую саму задачу, data — итератор объектов, которые по одному будут передаваться в задачу. Уже этот код при worker_count 150, смог отправить ~400 запросов в секунду.


Опубликовано

в

,

от

Комментарии

Добавить комментарий