logo cosasdedevs
Parte 4: Autenticación con JWT en FastAPI

Parte 4: Autenticación con JWT en FastAPI



My Profile
Dic 08, 2021

¡Hola! Cuatro tutoriales ya 🙀, esta serie de tutoriales se está acercando a su fin, pero todavía nos quedan unas cuantas cosas por ver. En el tutorial de esta semana configuraremos nuestro proyecto para poder realizar la autenticación con JWT y aprenderemos cómo realizar el login con el usuario que creamos en el paso anterior.

Si queréis acceder al resto de los tutoriales, aquí tenéis los enlaces 👇

Modelo de Pydantic para los tokens

Lo primero que vamos a necesitar es un modelo para los tokens. Para ello vamos a la carpeta /app/v1/schema y creamos un archivo llamado token_schema.py que contendrá el siguiente código:

from pydantic import BaseModel
from typing import Optional

class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None

En este caso tendremos la clase Token que será el objeto que usaremos para retornarle el token de autenticación y el tipo de autenticación y luego tendremos una clase TokenData que almacenará el nombre de usuario en el token. Aquí podríamos guardar más información que nos podría ser útil en el token y cuando lo decodificásemos poder utilizarla.

Como podéis observar, aquí empleamos un tipo nuevo llamado Optional que recibe el tipo de dato e indica que ese campo será opcional. En este caso no estamos usando Field y por defecto le daremos un valor de None.

Nuevas variables de entorno

Ahora que ya tenemos nuestro modelo para los tokens, el siguiente paso será crear un para de variables de entorno llamadas ACCESS_TOKEN_EXPIRE_MINUTES y SECRET_KEY. La primera contendrá el tiempo de validez máximo de un token en minutos y la segunda una clave para codificar y decodificar nuestros tokens. Si estáis en Linux esta clave podéis generarla con el siguiente comando:

openssl rand -hex 32

Si no tenéis Linux, podéis copiaros las que pondré a continuación para probar el proyecto en local, pero si subís un proyecto a producción si os aconsejo que generéis una nueva.

Abrís el archivo .env y añadís las variables:


# Auth
ACCESS_TOKEN_EXPIRE_MINUTES=1440
SECRET_KEY=e97965045c7df14cb4d5760371e7325104a8f33ad5d00c0a506d6fb09d0047db

Como tiempo de expiración yo le he dado 24 horas, sin embargo, podéis darle el tiempo que queráis.

Ahora abrimos el archivo /app/v1/utils/settings.py y añadimos las nuevas variables:

import os

from pydantic import BaseSettings
from dotenv import load_dotenv
load_dotenv()


class Settings(BaseSettings):

    db_name: str = os.getenv('DB_NAME')
    db_user: str = os.getenv('DB_USER')
    db_pass: str = os.getenv('DB_PASS')
    db_host: str = os.getenv('DB_HOST')
    db_port: str = os.getenv('DB_PORT')

    secret_key: str = os.getenv('SECRET_KEY')
    token_expire: int = os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES')

Autenticación

Una vez añadidas las nuevas variables de entorno al proyecto es hora de ponernos manos a la obra con la autenticación. Para ello, primero vamos a instalar librería que usaremos para trabajar con JWT así que lanzamos el siguiente comando para instalarla:

pip install "python-jose[cryptography]"

También necesitaremos instalar python-multipart. Para ello lanzamos el siguiente comando:

pip install python-multipart

Una vez hecho esto, vamos a crear un archivo llamado auth_service.py dentro de /app/v1/service y vamos a añadir el siguiente contenido:

from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

from jose import JWTError, jwt
from passlib.context import CryptContext

from app.v1.model.user_model import User as UserModel
from app.v1.schema.token_schema import TokenData
from app.v1.utils.settings import Settings

settings = Settings()


SECRET_KEY = settings.secret_key
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = settings.token_expire


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/v1/login")


def verify_password(plain_password, password):
    return pwd_context.verify(plain_password, password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(username: str):
    return UserModel.filter((UserModel.email == username) | (UserModel.username == username)).first()


def authenticate_user(username: str, password: str):
    user = get_user(username)
    if not user:
        return False
    if not verify_password(password, user.password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


def generate_token(username, password):
    user = authenticate_user(username, password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email/username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    return create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception

    user = get_user(username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

De lo imports nuevos podemos ver datetime que lo utilizaremos para darle un tiempo de vida al token.

También está OAuth2PasswordBearer que lo usaremos para indicar la url de login y para validar el token.

Y por último la librería jose para validar y generar los tokens con JWT.

Después definimos tres constantes en las que guardamos la clave secreta, el algoritmo de codificación y el tiempo de vida del token.

En las siguientes líneas volvemos a usar pwd_context que nos servirá para verificar la validez del password y para generar un hash a partir del password. Una vez explique todo el auth_service.py eliminaremos esa parte del user_service.py para no tenerla duplicada.

Creamos una instancia de OAuth2PasswordBearer y por parámetro definimos la url a la que tendrá que acceder nuestro usuario para poder hacer login.

Luego tenemos las funciones verify_password y get_password_hash para validar un password y para generar un hash de él (que es lo que guardamos como password en la base de datos).

La siguiente función que tenemos es get_user que puede recibir un username o email, esto lo hacemos así para poder autenticarnos tanto por el username como por el email. Retornamos el usuario si existe.

Posteriormente, tenemos la función authenticate_user que recibe un username y el password y comprueba que exista, si es así, verifica que el password es correcto.

La función create_access_token recibe un diccionario con la información que queremos guardar en el token y el tiempo de expiración de este y después lo genera con la función jwt.encode que recibe por parámetro la información a guardar, nuestra clave secreta y el algoritmo que utilizaremos.

Seguidamente, tenemos la función generate_token la cual llamada a la función authenticate_user para revisar la validez de los datos enviados. Si no es así lanzará una excepción y si todo ha ido bien llamará a la función create_access_token y retornará el token del usuario.

Por último tenemos la función get_current_user que recibe un token por parámetro y retorna la información del usuario si es válido.

Ahora que hemos explicado el auth_service.py, vamos a eliminar el código duplicado en el archivo user_service.py, ya que en auth_service.py tenemos también la función get_password_hash y tiene más sentido que esté ahí así que abrimos el archivo y sustituimos nuestro código por el siguiente:

from fastapi import HTTPException, status

from app.v1.model.user_model import User as UserModel
from app.v1.schema import user_schema
from app.v1.service.auth_service import get_password_hash


def create_user(user: user_schema.UserRegister):

    get_user = UserModel.filter((UserModel.email == user.email) | (UserModel.username == user.username)).first()
    if get_user:
        msg = "Email already registered"
        if get_user.username == user.username:
            msg = "Username already registered"
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=msg
        )

    db_user = UserModel(
        username=user.username,
        email=user.email,
        password=get_password_hash(user.password)
    )

    db_user.save()

    return user_schema.User(
        id = db_user.id,
        username = db_user.username,
        email = db_user.email
    )

Lo único que hemos hecho es eliminar la función get_password_hash, el import de CryptContext y la instancia de este para importar get_password_hash de auth_service.py.

Listo, ya tenemos todo casi preparado, ahora solo falta definir la ruta del login en el archivo /app/v1/router/user_route.py así que lo abrimos y sustituimos el código existente por este:

from fastapi import APIRouter
from fastapi import Depends
from fastapi import status
from fastapi import Body
from fastapi.security import OAuth2PasswordRequestForm

from app.v1.schema import user_schema
from app.v1.service import user_service
from app.v1.service import auth_service
from app.v1.schema.token_schema import Token

from app.v1.utils.db import get_db


router = APIRouter(
    prefix="/api/v1",
    tags=["users"]
)

@router.post(
    "/user/",
    status_code=status.HTTP_201_CREATED,
    response_model=user_schema.User,
    dependencies=[Depends(get_db)],
    summary="Create a new user"
)
def create_user(user: user_schema.UserRegister = Body(...)):
    """
    ## Create a new user in the app

    ### Args
    The app can receive next fields into a JSON
    - email: A valid email
    - username: Unique username
    - password: Strong password for authentication

    ### Returns
    - user: User info
    """
    return user_service.create_user(user)

@router.post(
    "/login",
    tags=["users"],
    response_model=Token
)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """
    ## Login for access token

    ### Args
    The app can receive next fields by form data
    - username: Your username or email
    - password: Your password

    ### Returns
    - access token and token type
    """
    access_token = auth_service.generate_token(form_data.username, form_data.password)
    return Token(access_token=access_token, token_type="bearer")

El primero de los nuevos imports sería OAuth2PasswordRequestForm que es una dependencia de clase con los datos del formulario de autenticación.

También importamos auth_service para utilizar la función para generar nuestro token.

Y por último el modelo Token que creamos al principio de este tutorial y que será el modelo que retornaremos al usuario.

La otra parte que hemos añadido es la función login_for_access_token la cual tiene como decorador un router de tipo post, apuntará al path /api/v1/login, estará dentro de las tags de user en la doc y como respuesta retornará nuestro modelo Token.

La función login_for_access_token recibe por parámetro el formulario con la autenticación que será de tipo OAuth2PasswordRequestForm y después solo debemos llamar a la función generate_token de auth_service para crear el token y retornarlo al usuario.

Listo, ahora vamos a probar que todo está funcionando correctamente. Para ello y al igual que cuando creamos al usuario en el tutorial anterior, vamos a nuestro navegador a la dirección http://127.0.0.1:8000/docs y accedemos a la doc para autenticarnos. Pulsamos en Try it out y aunque aparecen varios campos nosotros únicamente debemos rellenar username y password.

Gracias a la configuración que añadimos, podremos utilizar en el campo username tanto el username como el email, ya que en mi opinión es muy fácil que olvidemos el username con el que nos registramos en una web.

Pulsamos en execute y si todo ha ido bien debería retornar el token de acceso.

Perfecto, ya tenemos el login integrado en nuestro proyecto 🤘 así que el siguiente paso será generar nuestras tareas por hacer, actualizarlas y eliminarlas usando la autenticación, pero eso lo veremos en el siguiente tutorial.

Como en tutoriales anteriores, os dejo repositorio por si necesitáis echarle un vistazo https://github.com/albertorc87/fastapi-todo-api/tree/tutorial-4-auth-jwt.

Espero que este post te ayude y como siempre, te recomiendo seguirme en Twitter para estar al tanto de los nuevo contenido. Ahora también puedes seguirme en Instagram donde estoy subiendo tips, tutoriales en vídeo e información sobre herramientas para developers.

Por último os dejo mi guía para aprender a trabajar con APIs donde explico todo el funcionamiento de una API, el protocolo HTTP y veremos como construir una API con arquitectura REST.

Nos leemos 👋.

11255 vistas

🐍 Sígueme en Twitter

Si te gusta el contenido que subo y no quieres perderte nada, sígueme en Twitter y te avisaré cada vez que cree contenido nuevo 💪
Luego ¡Te sigo!

Nos tomamos en serio tu privacidad

Utilizamos cookies propias y de terceros para recopilar y analizar datos sobre la interacción de los usuarios con cosasdedevs.com. Ver política de cookies.