logo cosasdedevs
Qué es el paralelismo y cómo utilizarlo en Python

Qué es el paralelismo y cómo utilizarlo en Python



My Profile
Sep 03, 2022

¡Hola! 👋. En este tutorial te voy a explicar qué es el paralelismo y vas a ver como puede mejorar de manera brutal el rendimiento de nuestras aplicaciones. Todo esto lo vamos a ver con un ejemplo real con Python para que veas su utilidad 💪.

¿Qué es el paralelismo? 🤔

El paralelismo es una herramienta que podemos utilizar para realizar acciones de forma simultánea y de esta forma reducir la cantidad de tiempo que consume la ejecución de un programa.

Siempre que encontramos el término paralelismo suele ir de la mano de la concurrencia, así que a continuación explicaré que es

¿Qué es la concurrencia?

La concurrencia es la forma de gestionar y coordinar los múltiples procesos que podemos generar utilizando el paralelismo.

Ejemplo de script en Python que no funciona en paralelo

Para explicar el funcionamiento primero vamos a ver un script en el que no vamos a usar paralelismo. Este va a consistir en realizar una serie de peticiones a la API de Rick and Morty para obtener todos los personajes. Ya que la API está paginada, deberemos realizar varias peticiones para obtener todos los personajes.

Si no tienes muy claro el funcionamiento de una API, te recomiendo que primero te leas mi guía para aprender a trabajar con APIs.

Dicho esto, vamos al ejemplo. Yo me he creado un entorno virtual y para realizar las peticiones he utilizado la librería requests de Python la cual puedes instalar con el siguiente comando:

pip install requests

Si quieres profundizar en el funcionamiento de este paquete, te dejo este tutorial en el que explico cómo utilizar requests de Python con una API Rest.

Una vez instalada, este es el código que he utilizado para obtener todos los personajes:

import requests
import time

all_characters = []

def get_characters(page: int = 1):
    url = f"https://rickandmortyapi.com/api/character/?page={page}"
    print(f"For url: {url}")

    response = requests.get(url)

    if(response.status_code == 200):
        characters = response.json()
        for character in characters['results']:
            all_characters.append(character)

        if(characters['info']['next']):
            page += 1
            return get_characters(page)
        else:
            return

if __name__ == '__main__':
    init = time.time()
    get_characters()
    print('Num results:')
    print(len(all_characters))
    print('Time ' + str(time.time() - init))

Como puedes ver, primero importo los paquetes requests y time. La primera para poder realizar las peticiones a la API y la segunda para poder calcular el tiempo de ejecución entre el inicio y final de la recolección de personajes.

Después he creado una lista donde guardaremos todos los personajes.

El siguiente paso es declarar la función get_characters que funcionará de forma recursiva y se encargará de obtener todos los personajes.

Ya que, como comenté antes, la respuesta de la API para obtener los usuarios está paginada, debemos realizar varias peticiones para obtener todos los personajes, así que por defecto pediremos la primera página y mientras nos indique que hay más páginas, incrementaremos el valor de la variable page y seguiremos llamando a la función get_characters de forma recursiva.

Esto lo podemos saber gracias a la clave "info" de la respuesta de la API en la que nos indica el número total de páginas y si hay una página siguiente a la que consultar. Mientras la clave "next" tenga un valor, significa que tenemos una página más a revisar:

{
  "info": {
    "count": 826,
    "pages": 42,
    "next": "https://rickandmortyapi.com/api/character/?page=2",
    "prev": null
  },
  "results": [
    {....

Por último, tenemos el punto de entrada de nuestro script en el que guardamos la fecha de inicio, ejecutamos nuestra función, imprimimos el número de personajes obtenidos y calculamos la diferencia de la fecha de inicio y la actual para saber el tiempo que ha tardado en obtener toda la información.

Si ejecutamos el script, deberíamos obtener una respuesta similar a esta:

For url: https://rickandmortyapi.com/api/character/?page=1
For url: https://rickandmortyapi.com/api/character/?page=2
For url: https://rickandmortyapi.com/api/character/?page=3
For url: https://rickandmortyapi.com/api/character/?page=4
For url: https://rickandmortyapi.com/api/character/?page=5
...
Num results:
826
Time 8.224845886230469

Como puedes observar, la ejecución ha tardado unos 8 segundos. No es mucho tiempo, pero si sigue creciendo el número de personajes cada vez se irá incrementando más y más el tiempo de ejecución. ¿Cómo mejoramos estos tiempos? Lo vas a ver ahora mismo.

Cómo usar el paralelismo en nuestro script de Python

Para usar el paralelismo con nuestro ejemplo, vamos a utilizar la librería nativa de Python threading la cual nos permite realizar el paralelismo mediante hilos o procesos. En nuestro caso vamos a utilizar hilos.

Dicho esto, te dejo el ejemplo ya modificado para utilizar hilos y a continuación te explicaré como funciona:

import requests
import time

from threading import Thread

all_characters = []

def get_characters_by_page(page: int = 1):
    url = f"https://rickandmortyapi.com/api/character/?page={page}"
    print(f"For url: {url}")

    response = requests.get(url)

    if(response.status_code == 200):
        characters = response.json()
        for character in characters['results']:
            all_characters.append(character)

        return characters

    raise Exception(response.json()['error'])

def get_characters_parallel():
    characters = get_characters_by_page()

    pages = characters['info']['pages']

    print(f'Num pages {pages}')

    request_threads = {}
    for page in range(2, (pages + 1)):
        request_threads[page] = Thread(target=get_characters_by_page, kwargs={'page':page})
        request_threads[page].start()

    while(True):
        for key, value in request_threads.copy().items():
            if not value.is_alive():
                del request_threads[key]

        if len(request_threads) < 1:
            break


if __name__ == '__main__':
    init = time.time()
    get_characters_parallel()
    print('Num results:')
    print(len(all_characters))
    print('Time ' + str(time.time() - init))

Como puedes observar, además de requests y time, en este caso importamos la clase Thread de la librería threading.

Como en el caso anterior, tenemos una lista donde guardaremos todos los personajes.

A continuación, he creado la función get_characters_by_page la cual obtiene los personajes de la página indicada y los guarda en la lista. Además, también retorna el número total de páginas.

La siguiente función es get_characters_parallel, aquí es donde está toda la chicha xd. Primero obtenemos los personajes de la primera página con la función get_characters_by_page y como retorna el número total de páginas, con este número podremos decidir el número total de hilos para que se ejecuten en paralelo.

Una vez obtenidos los personajes de la primera página y el número total de páginas, he creado un diccionario llamado request_threads donde vamos a guardar todos los hilos.

El siguiente paso es crear un ciclo for con un rango de números desde la página 2 hasta la última.

Dentro del ciclo crearemos nuestros hilos generando una instancia de la clase Thread en la que indicamos como primer parámetro target la función a ejecutar en un hilo y en el segundo parámetro kwargs indico los parámetros que utiliza esa función, en este caso el número de página.

Una vez instanciada la clase, debemos indicar al hilo que empiece, esto lo haremos con el método start(), esto es muy importante, ya que si no llamamos a este método la función dentro del hilo nunca se ejecutará.

Cuando el ciclo for termine, debemos verificar que los hilos que hemos creado han terminado. Para ello he creado un bucle while en el que voy verificando cada hilo creado con el método is_alive().

Si devuelve false, significa que el hilo ha terminado, por lo que lo elimino del diccionario. Cuando el diccionario se queda vacío, lanzamos break para salir del bucle y terminar la ejecución.

Por último, y al igual que en el caso anterior, tenemos el punto de entrada del script donde calculamos el tiempo de ejecución y el número de resultados.

Si ejecutamos el script, obtendremos una respuesta similar a esta:

For url: https://rickandmortyapi.com/api/character/?page=1
Num pages 42
For url: https://rickandmortyapi.com/api/character/?page=2
For url: https://rickandmortyapi.com/api/character/?page=3
For url: https://rickandmortyapi.com/api/character/?page=4
For url: https://rickandmortyapi.com/api/character/?page=5
...
Num results:
826
Time 1.1612300872802734

Como ves, obtenemos el mismo número de resultados, pero la ejecución del script pasa de ejecutarse en 8 segundos a solo 1 🙀.

¿Y esto porque pasa? 😯

Pues bien, en el primer caso, lanzamos las peticiones una a una, por lo que hasta que no termina una petición, no se ejecuta la siguiente. En el caso de usar paralelismo, las peticiones se realizan de forma simultánea, por lo que una petición no tiene que esperar a que termine la otra y de esta manera podemos obtener los datos antes.

Cuidado 🚨

En este caso, tanto la API como la CPU han respondido bien, pero tenemos que tener en cuenta cosas como el número de peticiones por segundo que podemos realizar a una API o la cantidad de recursos que consume el script si lanzamos un número X de hilos.

Una buena opción para resolver este tipo de problemas sería realizar el uso de la concurrencia, como por ejemplo poner un límite máximo de procesos en paralelo y que cuando lleguemos a ese límite esperar a que termine alguno de ellos para introducir nuevos procesos.

En este último ejemplo os muestro el script modificado para que ahora como máximo se puedan ejecutar 20 hilos en paralelo:

import requests
import time

from threading import Thread

all_characters = []

def get_characters_by_page(page: int = 1):
    url = f"https://rickandmortyapi.com/api/character/?page={page}"
    print(f"For url: {url}")

    response = requests.get(url)

    if(response.status_code == 200):
        characters = response.json()
        for character in characters['results']:
            all_characters.append(character)

        return characters['info']['pages']

    raise Exception(response.json()['error'])

def get_characters_parallel():
    pages = get_characters_by_page()

    print(f'Num pages {pages}')

    limit_threads = 20

    request_threads = {}
    for page in range(2, (pages + 1)):

        if len(request_threads) >= limit_threads:

            check_request_threads = True

            while check_request_threads:
                request_threads = clean_old_request_threads(request_threads)
                if len(request_threads) >= limit_threads:
                    print('Wait 1 second')
                    time.sleep(1)
                else:
                    check_request_threads = False

        request_threads[page] = Thread(target=get_characters_by_page, kwargs={'page':page})
        request_threads[page].start()


    while(True):
        request_threads = clean_old_request_threads(request_threads)

        if len(request_threads) < 1:
            break

def clean_old_request_threads(request_threads):

    for key, value in request_threads.copy().items():
        if not value.is_alive():
            del request_threads[key]

    return request_threads



if __name__ == '__main__':
    init = time.time()
    get_characters_parallel()
    print('Num results:')
    print(len(all_characters))
    print('Time ' + str(time.time() - init))

De esta forma, el proceso pasa a ejecutarse en unos 3 segundos. Tardamos un poco más, pero ganamos control del proceso y sigue siendo más rápido que si no utilizamos el paralelismo.

Espero que este post te ayude y como siempre, te recomiendo seguirme en Twitter para estar al tanto de los nuevo contenido. Ahora también puedes seguirme en Instagram donde estoy subiendo tips, tutoriales en vídeo e información sobre herramientas para developers.

Por último os dejo mi guía para aprender a trabajar con APIs donde explico todo el funcionamiento de una API, el protocolo HTTP y veremos como construir una API con arquitectura REST.

Nos leemos 👋.

6358 vistas

🐍 Sígueme en Twitter

Si te gusta el contenido que subo y no quieres perderte nada, sígueme en Twitter y te avisaré cada vez que cree contenido nuevo 💪
Luego ¡Te sigo!

Nos tomamos en serio tu privacidad

Utilizamos cookies propias y de terceros para recopilar y analizar datos sobre la interacción de los usuarios con cosasdedevs.com. Ver política de cookies.