Concorrência e Paralelismo – Threads, múltiplos processos e AsyncIO – Parte 2

No post anterior falei sobre a diferença entre concorrência e paralelismo, classificação de processos e soluções para implementar aplicações com código concorrente. Se você ainda não leu esse post, recomendo que você comece por lá.


É muito comum ver programadores escreverem código para executar uma tarefa qualquer, decidirem empiricamente que o desempenho não está adequado e que podem resolver isso fazendo com que exatamente o mesmo código seja executado em uma CPU mais rápida, ou por mais de uma CPU. Surpreendentemente, a situação não muda e a quantidade de recursos computacionais alocados para tarefa continua não sendo proporcional ao benefício. Neste momento, se o último post serviu de alguma coisa, deve estar óbvio que a solução lida com o problema de forma inadequada.

Antes de dar um exemplo mais concreto pra essa situação, vamos à revisão de definição:

  • Concorrência: Conjunto de tarefas sendo executadas de forma intercalada em um mesmo intervalo de tempo. É uma forma de lidar com muitas coisas ao mesmo tempo e está ligado a forma de se estruturar tarefas.
  • Paralelismo: Execução simultânea de um conjunto de tarefas, que podem ou não estar relacionadas. É uma forma de fazer muitas coisas ao mesmo tempo e está ligado a forma de se executar tarefas.

Cenários de tarefas I/O Bound

Realizar 1 requisição HTTP

Nossa tarefa é realizar uma requisição HTTP para o site http://www.americanas.com e imprimir o resultado no stdout. Que em python, a solução mais simples seria:

import requests

response = requests.get('http://www.americanas.com')
print(response.text)

O módulo requests é uma dependência e pode ser instalado com o comando pip install requests.

Essa execução pode ser representada da seguinte forma:

Screen Shot 2017-04-16 at 5.58.55 PM.png

Fig.1: Simulação de requisição de 1 url

Não existe situação de concorrência e não há nenhuma tarefa que poderia ser executada pela aplicação no tempo ocioso da CPU. Nesse caso, o tempo gasto com operações de CPU é bem menor, e o custo dessas operações é sempre o mesmo independe de fatores externos. Já o tempo gasto na barra verde pode ser determinado por diversos fatores, como:

  • Tamanho do conteúdo
  • Latência da rede
  • Banda de download do cliente
  • Banda de upload do servidor

Essa é claramente uma operação I/O bound. O que será que acontece quando tentarmos realizar não 1, mas 4 requisições?

Realizar 4 requisições HTTP

O intuito aqui é realizar a mesma tarefa no cenário anterior, porém, para 4 urls diferentes. Vamos alterar o código:

import requests

urls = (
    'http://www.americanas.com',
    'http://www.submarino.com',
    'http://www.shoptime.com',
    'http://www.soubarato.com',
)

for url in urls:
    response = requests.get(url)
    print(response.text)

Desta vez temos temos 4 urls, e o trabalho continua se dando de forma sequencial. Da mesma forma que no exemplo1. Como podemos ver abaixo, desta vez fica mais aparente que a CPU passa boa parte do tempo esperando que uma requisição termine até que possa começar a realizar outra.

Screen Shot 2017-04-16 at 6.29.39 PM.png

Fig.2: Simulação de requisição de 4 urls de forma sequencial

Você que leu o post anterior, já deve imaginar que uma forma não escalável de se atacar esse problema seria usar múltiplos processos, não é? Mesmo assim, vamos fazer só por diversão.

Solução com multiplos processos

import os
import requests
from multiprocessing import Pool

urls = (
    'http://www.americanas.com',
    'http://www.submarino.com',
    'http://www.shoptime.com',
    'http://www.soubarato.com',
)

def get_and_print(url):
    response = requests.get(url)
    print(response.text)

pool = Pool(processes=os.cpu_count())
pool.map(get_and_print, urls)

Esse é um exemplo bem simples e que não envolve compartilhamento de estados entre os processos. Na linha #19, criamos um worker para cada CPU disponível e na linha #20, dividimos o trabalho de entre cada um deles.

Vamos fazer uma simulação “inocente” da execução. Desconsiderar todos os custos adicionais de se criar e manter processos adicionais (como falamos no post anterior), e considerar que o código se comportará da seguinte forma em um ambiente com 4 núcleos:

Screen Shot 2017-04-16 at 7.01.49 PM.png

Fig.3: Simulação inocente da solução de requisições por múltiplos processos.

Solução com múltiplas threads

E o que aconteceria se adaptássemos esse mesmo código para utilizar múltiplas threads? Tudo que você precisa fazer é mudar um import.

import os
import requests
from multiprocessing.pool import ThreadPool as Pool

urls = (
    'http://www.americanas.com',
    'http://www.submarino.com',
    'http://www.shoptime.com',
    'http://www.soubarato.com',
)

def get_and_print(url):
    response = requests.get(url)
    print(response.text)

pool = Pool(processes=os.cpu_count())
pool.map(get_and_print, urls)

Assim como citei com o caso de múltiplos processos, não entrarei no mérito de discutir todos os overheads criados por cada uma dessas abordagens. Não é o propósito aqui, e você pode achar facilmente conteúdo sobre isso pela internet.

Screen Shot 2017-04-16 at 7.56.31 PM.png

Fig.4: Simulação inocente de requisições com múltiplas threads.

Como já vimos anteriormente, cada vez que uma thread pede um um recurso de I/O o controle, com mudança de contexto, pode ser passado para uma nova thread que então será executada até pedir por I/O, etc…

E se precisássemos realizar 4.000 requisições HTTP?

Digamos que cada umas das 4000 requisições tenham o mesmo custo T, ou seja, todas levam o mesmo tempo desde o momento em que um Request HTTP sai do cliente, até o momento em que todo o html está disponível e tempos um Response.

Uma solução sequencial repetiria, 4 mil vezes, o gráfico mostrado na Figura 1 e podemos dizer que seu custo seria 4000 * T.

Screen Shot 2017-04-16 at 9.01.45 PM.png

Fig.5: Simulação ingênua de muitas urls com múltiplos processos

A solução com múltiplos processos, como já vimos, implementa o mesmo código da solução sequencial, mas dividido por P núcleos. Sendo assim, desconsiderando todos os custos relacionados a criação e manutenção de processos, repetiria o mesmo custo da solução sequencial, dividido pelos P núcleos, ou seja, (4000 * T) / P. A solução multi thread apresentada tem o mesmo custo aproximado, já que a quantidade de threads estava atrelada à quantidade de núcleos disponíveis.

E se juntássemos múltiplos processos com múltiplas threads?

Cada um dos P processos poderia ter P threads e, em teoria, existiria um custo total de ((4000 * T) / P) / P).

Parece bom? Não é. E se tivéssemos que realizar 400.000 ou 4.000.000 de requisições? Com essas quantidades, conseguimos ver que é claramente possível aumentar o custo total elevando o valor de T, mas não é tão fácil aumentar o valor de P, fazendo com que o peso do numerador seja muito mais alto, gerando por consequência um custo alto. Vale lembrar também que o benefício de múltiplos processos não cresce linearmente para quantidades de processos > quantidade de núcleos, e que para múltiplas threads, o crescimento de benefício também não é linear e temos que lidar com todas as coisas citadas no post anterior.

Nesse caso, é claro que há algo de errado ou subutilizado na arquitetura da nossa solução, se na maior parte do tempo em que nossa solução está rodando, a CPU está ociosa ou trabalhando em questões não relacionadas a aplicação em si – que é criar uma request e tratar sua response. Então, qual é a alternativa?

Asynchronous I/O

Introduzida por Guido na PEP 3156 em 2012 (sim, 2012. Não é algo tão novo, né?) e acompanha as distribuições python >=3.4 desde 2013 no módulo asyncio que, a grosso modo, provê uma forma de se escalonar eficientemente aplicações I/O bound com código concorrente, em uma única thread. O módulo asyncio possui muitas outras features, e recomendo que se você tiver interesse de ir mais a fundo, leia a documentação oficial.

Vale dizer que features semelhantes existem em vários outras linguagens, como: Go,  Java, Javascript, C#, C++, Objective-C e muitas outras.

Coroutines

Para que seja possível implementar um modelo de programação concorrente, é  preciso que seja possível quebrar uma tarefa em pedaços que podem ser executados independentemente. Corotinas, em português, são funções que podem ser suspensas em determinados pontos de execução para serem retomadas depois, mantendo todos os estados de quando foi suspensa. Nós também podemos definir corotinas como um conjunto de sub-rotinas (instruções), que permitem vários pontos de entrada, de suspensão e retomada de execução em certas partes do código, realizando uma troca de contexto. Em funções python, essa troca de contexto já se dava através da sintaxe yield, e em corotinas (“funções assíncronas”), utilizando a sintaxe yield from  introduzida na PEP 380 ou await(python >=3.5).

Em python >=3.5, são funções declaradas com async def e em python <3.5, funções geradoras decoradas com @asyncio.coroutine.

Loop de Eventos

Corotinas são executadas em um loop de eventos. Loops de eventos, por sua vez, são os responsáveis por:

  • controlar e manter o agendamento de corotinas e seus pontos de suspensão e retomada toda vez que uma corotina necessitar de tempo para execução de uma tarefa;
  • lidar com sinais do sistemas operacional;
  • transmitir dados através da rede;
  • prover abstrações para transporte para diversos canais de comunicação;…
sj46yn.jpg

Fig.6: Ok, vamos voltar aos exemplos!

Realizar 1 requisição HTTP

Vamos voltar ao primeiro exemplo e escrever o mesmo código com asyncio. Para isso, vamos utilizar a biblioteca aiohttp.

import asyncio
from aiohttp import ClientSession

async def get_and_print(loop):
    async with ClientSession(loop=loop) as session:
        async with session.get('http://www.americanas.com') as response:
            print(await response.text())

loop = asyncio.get_event_loop()
loop.run_until_complete(get_and_print(loop))

Nesse caso, escrevemos muito mais para chegar a mesma solução, mas muito mais foi feito nessas 10 linhas de código. Algumas linhas são bem importantes:

  • #4: Definimos uma corotina chamada get_and_print
  • #9: Instanciamos o nosso loop de eventos
  • #10: Dizemos que o loop deve rodar até que a coroutine get_and_print termine
  • #6 e #7: async with, assim como with, nada mais é do que um gerenciador de contexto.
  • #8: Utilizamos await para dizer que a corotina pode suspender sua execução neste ponto e retornar somente quando tiver o resultado da future response.text()

Em questões de desempenho e melhor uso da CPU, para o caso de uma aplicação cujo único propósito é fazer uma requisição para uma única url e imprimir na tela, tudo isso não faz sentido e é meramente didático. Vamos avançar um pouco e ver como faríamos o mesmo para 4 urls.

Realizar 4 requisições HTTP

import asyncio
from aiohttp import ClientSession

urls = (
    'http://www.americanas.com',
    'http://www.submarino.com',
    'http://www.shoptime.com',
    'http://www.soubarato.com',
)

async def get_and_print(session, url):
    async with session.get(url) as response:
        print(await response.text())

async def fetch(loop, urls):
    async with ClientSession(loop=loop) as session:
        tasks = (get_and_print(session, url) for url in urls)
        await asyncio.gather(*tasks, return_exceptions=True)

loop = asyncio.get_event_loop()
loop.run_until_complete(fetch(loop, urls))

Agora nós temos duas corotinas: fetch responsável por agendar e esperar a finalização de tarefas individuais de requisições e get_and_print, que reage a uma response e imprime em stdout.

Algumas linhas também valem ser comentadas aqui:

  • #19: Inicializamos ClientSession somente uma vez. Esse objeto encapsula um pool de conexões keep-alive que podemos tirar vantagem. Recomendo que você leia mais sobre na documentação oficial,
  • #20: Criamos um gerador de corotinas a serem executadas. Uma para cada url.
  • #21: Utilizamos a função asyncio.gather para agregar várias corotinas e esperar pela finalização das mesmas. Ou seja, é uma forma de se esperar pela execução concorrente de diversas corotinas até que todas cheguem ao fim. O resultado será uma lista, onde cada um dos itens é o resultado das corotinas ou, como utilizamos o return_exceptions=True, também pode ser uma exception não tratada durante sua execução.

O que tudo isso significa?

Screen Shot 2017-04-17 at 2.11.04 AM.png

Fig.7: Simulação de múltiplas urls com asyncio

  • Significa que o programa não é escrito para esperar até que tenha uma resposta da rede. Toda vez que uma corotina tem que esperar por um recurso, o controle é cedido de volta para o loop que pode alocar uma outra tarefa;
  • Nós programadores escrevemos “callbacks” que devem ser executados quando o resultado de uma chamada a rede terminar;
  • Estruturando bem a solução do problema, conseguimos garantir o melhor escalonamento possível de tarefas I/O bound.

E se quiséssemos que a execução do loop não parasse quando a fetch terminasse? Para fins didáticos, vamos reescrever o mesmo programa para utilizar o método run_forever.

import asyncio
from aiohttp import ClientSession

urls = (
    'http://www.americanas.com',
    'http://www.submarino.com',
    'http://www.shoptime.com',
    'http://www.soubarato.com',
)

async def get_and_print(session, url):
    async with session.get(url) as response:
        print(await response.text())

async def fetch(loop, urls):
    async with ClientSession(loop=loop) as session:
        tasks = (get_and_print(session, url) for url in urls)
        await asyncio.gather(*tasks, return_exceptions=True)

loop = asyncio.get_event_loop()
loop.create_task(fetch(loop, urls))
loop.run_forever()

  • #24: Usamos o método loop.create_task para agendar a chamada de fetch para quando o loop estiver rodando.
  • #25: Dizemos que o loop deve rodar até que o método stop seja chamado. Ou seja, se quisermos que o programa termine após a execução de todas as tarefas, basta chamar loop.stop na linha #22.

Conclusão

Desenvolver assincronamente é, de fato, algo estranho no começo se compararmos com um fluxo sequencial de desenvolvimento. Exige uma mudança de mindset e, como qualquer outra coisa, prática. Não pare por aqui!


Todos os códigos estão disponíveis no meu github.

Anúncios

Deixe um comentário

Preencha os seus dados abaixo ou clique em um ícone para log in:

Logotipo do WordPress.com

Você está comentando utilizando sua conta WordPress.com. Sair / Alterar )

Imagem do Twitter

Você está comentando utilizando sua conta Twitter. Sair / Alterar )

Foto do Facebook

Você está comentando utilizando sua conta Facebook. Sair / Alterar )

Foto do Google+

Você está comentando utilizando sua conta Google+. Sair / Alterar )

Conectando a %s