Simultaneidade em Python - pool de processos

O pool de processos pode ser criado e usado da mesma maneira que criamos e usamos o pool de threads. O pool de processos pode ser definido como o grupo de processos pré-instanciados e ociosos, que estão prontos para receber trabalho. A criação de pool de processos é preferível em vez de instanciar novos processos para cada tarefa quando precisamos fazer um grande número de tarefas.

Módulo Python - Concurrent.futures

A biblioteca padrão do Python tem um módulo chamado concurrent.futures. Este módulo foi adicionado ao Python 3.2 para fornecer aos desenvolvedores uma interface de alto nível para iniciar tarefas assíncronas. É uma camada de abstração no topo dos módulos de threading e multiprocessamento do Python para fornecer a interface para executar as tarefas usando pool de thread ou processos.

Em nossas seções subsequentes, examinaremos as diferentes subclasses do módulo concurrent.futures.

Classe Executor

Executor é uma classe abstrata de concurrent.futuresMódulo Python. Não pode ser usado diretamente e precisamos usar uma das seguintes subclasses concretas -

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ProcessPoolExecutor - uma subclasse concreta

É uma das subclasses concretas da classe Executor. Ele usa multiprocessamento e obtemos um pool de processos para enviar as tarefas. Este pool atribui tarefas aos processos disponíveis e os programa para execução.

Como criar um ProcessPoolExecutor?

Com a ajuda do concurrent.futures módulo e sua subclasse concreta Executor, podemos criar facilmente um pool de processos. Para isso, precisamos construir umProcessPoolExecutorcom o número de processos que queremos no pool. Por padrão, o número é 5. Isso é seguido pelo envio de uma tarefa ao pool de processos.

Exemplo

Vamos agora considerar o mesmo exemplo que usamos ao criar pool de threads, a única diferença é que agora usaremos ProcessPoolExecutor ao invés de ThreadPoolExecutor .

from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ProcessPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

Resultado

False
False
Completed

No exemplo acima, um ProcessoPoolExecutorfoi construído com 5 fios. Em seguida, uma tarefa, que aguardará 2 segundos antes de dar a mensagem, é enviada ao executor do pool de processos. Como visto na saída, a tarefa não é concluída até 2 segundos, então a primeira chamada paradone()retornará False. Após 2 segundos, a tarefa está concluída e obtemos o resultado do futuro chamando oresult() método sobre ele.

Instanciar ProcessPoolExecutor - Gerenciador de Contexto

Outra maneira de instanciar ProcessPoolExecutor é com a ajuda do gerenciador de contexto. Funciona de forma semelhante ao método usado no exemplo acima. A principal vantagem de usar o gerenciador de contexto é que ele parece sintaticamente bom. A instanciação pode ser feita com a ajuda do seguinte código -

with ProcessPoolExecutor(max_workers = 5) as executor

Exemplo

Para melhor compreensão, estamos usando o mesmo exemplo usado durante a criação do pool de threads. Neste exemplo, precisamos começar importando oconcurrent.futuresmódulo. Então, uma função chamadaload_url()é criado, o que carregará o url solicitado. oProcessPoolExecutoré então criado com o número 5 de threads no pool. O processoPoolExecutorfoi utilizado como gerenciador de contexto. Podemos obter o resultado do futuro chamando oresult() método sobre ele.

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except Exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
   main()

Resultado

O script Python acima irá gerar a seguinte saída -

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

Uso da função Executor.map ()

O Python map()função é amplamente utilizada para realizar uma série de tarefas. Uma dessas tarefas é aplicar uma determinada função a cada elemento dentro dos iteráveis. Da mesma forma, podemos mapear todos os elementos de um iterador para uma função e enviá-los como trabalhos independentes para oProcessPoolExecutor. Considere o seguinte exemplo de script Python para entender isso.

Exemplo

Vamos considerar o mesmo exemplo que usamos ao criar pool de threads usando o Executor.map()função. No exemplo dado abaixo, a função de mapa é usada para aplicarsquare() função para cada valor na matriz de valores.

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ProcessPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

Resultado

O script Python acima irá gerar a seguinte saída

4
9
16
25

Quando usar ProcessPoolExecutor e ThreadPoolExecutor?

Agora que estudamos sobre as classes de Executor - ThreadPoolExecutor e ProcessPoolExecutor, precisamos saber quando usar qual executor. Precisamos escolher ProcessPoolExecutor no caso de cargas de trabalho vinculadas à CPU e ThreadPoolExecutor no caso de cargas de trabalho vinculadas a E / S.

Se usarmos ProcessPoolExecutor, então não precisamos nos preocupar com o GIL porque ele usa multiprocessamento. Além disso, o tempo de execução será menor quando comparado comThreadPoolExecution. Considere o seguinte exemplo de script Python para entender isso.

Exemplo

import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ProcessPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
   print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Resultado

Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207

Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ThreadPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
      print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Resultado

Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645

A partir das saídas de ambos os programas acima, podemos ver a diferença do tempo de execução ao usar ProcessPoolExecutor e ThreadPoolExecutor.