Cómo crear un CRUD en python. Parte 5: Test unitarios
Hoy nos toca terminar con esta serie de tutoriales para principiantes en Python, en este tutorial vamos a ver como integrar test unitarios en nuestra aplicación. Para el que no sepa que son, básicamente son scripts que creamos para testar bloques de código y sino nos devuelve la respuesta que esperamos mostrará un error. Así nos aseguramos que todo está en perfectas condiciones cuando realizamos un cambio en nuestra aplicación.
Os dejo los enlaces a las otras partes del tutorial por si os habéis perdido alguna ;)
Cómo crear un CRUD en python. Parte 1: Estructura y clase
Cómo crear un CRUD en python. Parte 2: Creación y listado
Cómo crear un CRUD en python. Parte 3: Modificar, borrar y buscar
Cómo crear un CRUD en python. Parte 4: Conexión a Postgresql
Si queréis más información acerca de los test unitarios en Python, podéis ver más información desde la documentación oficial.
Lo primero de todo será añadir unas mejoras en nuestra clase DBPostgresql para que sus funciones sean más testables, así que abrimos el archivo classes/dbpostgresql.py y lo primero que modificaremos será el constructor.
def __init__(self, schema, table_name, is_test):
self._table_name = table_name
self._schema = schema
self._is_test = is_test
env = Env()
env.read_env()
self._connect = psycopg2.connect(
host=env('POSTGRES_HOST'),
database=env('POSTGRES_DB'),
user=env('POSTGRES_USER'),
password=env('POSTGRES_PASSWORD')
)
self._cur = self._connect.cursor()
self._launch_query('SELECT 1')
print('Conexión establecida con éxito')
self._create_table()
Ahora pasamos un nuevo parámetro llamado is_test que lo usaremos en la función _launch_query que es la que vamos a modificar ahora.
def _launch_query(self, query):
print(query)
self._cur.execute(query)
matches = re.search(r"^SELECT", query, re.IGNORECASE)
affected = self._cur.rowcount
if not matches and not self._is_test:
self._connect.commit()
return affected
Cuando estemos realizando un test no vamos a querer que los cambios se apliquen a la base de datos, para ello utilizaremos la variable is_test, si estamos en un test no se realizará el commit. Además ahora retornaremos el número de filas afectadas por nuestra consulta con rowcount.
def insert(self, data):
values = "'" + "', '".join(data.values()) + "'"
query = f'INSERT INTO public.{self._table_name} ({", ".join(data.keys())}) VALUES ({values}) RETURNING id;'
return self._launch_query(query)
La siguiente función que hemos modificado es el insert, ahora al realizar el insert tendremos la posibilidad de recuperar el id de la fila insertada más adelante.
def delete(self, id_object):
query = f'DELETE FROM public.{self._table_name} WHERE id = {id_object};'
return self._launch_query(query)
def update(self, id_object, data):
list_update = []
for field_name, field_value in data.items():
list_update.append(f"{field_name}='{field_value}'")
query = f'UPDATE public.{self._table_name} SET {", ".join(list_update)} WHERE id = {id_object};'
return self._launch_query(query)
En delete y update ahora retornamos el resultado de _launch_query que es el número de filas afectados, de esa forma confirmaremos si se ha realizado algún cambio.
def get_by_id(self, id_object):
query = f'SELECT * FROM public.{self._table_name} WHERE id = {id_object};'
table_keys = []
for schema_key in self._schema.keys():
table_keys.append(schema_key)
data = {}
self._launch_query(query)
row = self._cur.fetchone()
if row:
for key, value in enumerate(row):
data[table_keys[key]] = value
return data
En la función get_by_id hemos corregido un problema, solo guardamos datos en data si el row tiene datos, así evitamos que guarde None en data y nos devuelva una respuesta inesperada.
def get_last_id(self):
return self._cur.fetchone()[0]
Por último añadiremos la función get_last_id que nos devolverá el id de la última fila insertada.
Ahora que ya tenemos esta parte lista, abriremos el archivo classes/dbcontacts.py para realizar también unas modificaciones en la clase DBcontacts.
def __init__(self, is_test=False):
table_name = 'contacts'
super().__init__(SCHEMA, table_name, is_test)
En el constructor ahora le pasaremos si estamos realizando un test, por defecto está a False, así no tendremos que realizar más modificaciones en el código.
def delete_contact(self, id_object):
if not id_object:
raise ValueError('Debes enviar el id del contacto')
return self.delete(id_object)
Por último modificaremos la función delete_contact para devolver el resultado de la acción de borrado.
Una vez hecho esto, lo que haremos será crear una carpeta en nuestro directorio raiz llamada tests y dentro de ella crearemos el archivo test_dbcontacts, este archivo se encargará de comprobar que toda la funcionalidad de la clase DBContacts es la correcta.
Abrimos el archivo test_dbcontacts y añadimos la primera parte.
import unittest
from classes.contact import Contact
from classes.dbcontacts import DBContacts
class TestDBContacts(unittest.TestCase):
def setUp(self):
self.db = DBContacts(True)
self.db.save_contact(self._object_contact())
def _object_contact(self):
return Contact(None, 'Usertest', 'User Test', 'user@gmail.com', '999999999', '1987-11-23')
def _dict_contact(self):
return {
'name':'Usertest2',
'surname':'User Test2',
'email':'user2@gmail.com',
'phone':900000000,
'birthday':'1987-11-24'
}
Aquí estamos importando unittest que es la librería que utilizaremos para los tests y que ya viene integrada con python por lo que no tendremos que descargarla, después creamos nuestra clase que extenderá de unittest.TestCase. Una cosa muy importante, debe empezar por la palabra Test, sino no funcionará.
Después declaramos la función setUp que se utiliza para declarar funcionalidades antes de que empiece cada test, nosotros la usaremos para que en cada test declare una instancia de nuestra base de datos y que cree un contacto.
Por último declaramos dos funciones, una con un objeto de tipo Contact y otro un diccionario también con datos de un contacto para poder recuperar esta información rápidamente a la hora de usar tests.
También existe la función tearDown que se ejecuta cada vez que finaliza uno de nuestros métodos de test pero en este caso no la necesitaremos, por lo tanto, no la vamos a declarar en nuestro test.
Ahora que ya lo tenemos todo prepararado, empezaremos creando el test para la creación de un contacto, para ello creamos la función test_save_contact. Para el caso de las funciones pasa igual que con la clase, para que funcionen deben empezar por la palabra test_.
def test_save_contact(self):
result = self.db.save_contact(self._object_contact())
self.assertEqual(result, 1)
Esta función llama a save_contact para crear un contacto y recupera el resultado que si recordaréis es el número de filas afectadas por este cambio, una vez hecho esto, utilizaremos una de las funciones de validación de test, en este caso assertEqual que verifica que el resultado obtenido es igual al que pasamos por segundo parámetro.
Para ver todos los tipos de funciones de validación podéis verlo desde la documentación.
Ahora crearemos la función para validar la actualización de un contacto, para ello creamos la función test_update_contact.
def test_update_contact(self):
contact = self._dict_contact()
last_id = self.db.get_last_id()
result = self.db.update_contact(last_id, contact)
self.assertNotEqual(result, 0)
ddbb_contact = self.db.get_by_id(last_id)
self.assertEqual(contact['name'], ddbb_contact['name'])
self.assertEqual(contact['surname'], ddbb_contact['surname'])
self.assertEqual(contact['email'], ddbb_contact['email'])
self.assertEqual(contact['phone'], ddbb_contact['phone'])
self.assertEqual(contact['birthday'], ddbb_contact['birthday'].strftime("%Y-%m-%d"))
Para testar el update, obtenemos el último id insertado con el nuevo método que añadimos a la clase DBPostgresql llamado get_last_id y como cada vez que se instancia la base de datos también estamos insertando un contacto, lo tendremos disponible.
Después llamamos a update_contact para actualizar el contacto y pasamos la configuración del contacto que creamos con la función _dict_contact.
Una vez hecho esto usamos assertNotEqual para comprobar que se ha realizado algún cambio, esto lo hacemos para que veais otros posibles usos pero podríamos haberlo hecho exactamente que en el test de guardado de contacto.
Por último recuperamos el contacto de la base de datos y contrastamos que coinciden con el diccionario para actualizarlo.
Ahora testaremos la funcionalidad para obtener un contacto en concreto, para ello crearemos la función test_get_contact.
def test_get_contact(self):
last_id = self.db.get_last_id()
contact = self.db.get_by_id(last_id)
self.assertNotEqual(contact, {})
my_contact = self._object_contact()
self.assertEqual(contact['name'], my_contact.name)
self.assertEqual(contact['surname'], my_contact.surname)
self.assertEqual(contact['email'], my_contact.email)
self.assertEqual(str(contact['phone']), my_contact.phone)
self.assertEqual(contact['birthday'].strftime("%Y-%m-%d"), my_contact.birthday)
Obtendremos el último id insertado y nos traeremos el contacto por id, una vez hecho esto, simplemente lo comparamos con los datos de la función que usamos para pasar un contacto a la función de guardado y verificamos que coinciden.
def test_search_contact(self):
last_id = self.db.get_last_id()
my_contact = self._object_contact()
filters = {
'name': my_contact.name,
'surname': my_contact.surname,
'email': my_contact.email
}
list_contacts = self.db.search_contacts(filters)
self.assertNotEqual(list_contacts, [])
for contact in list_contacts:
self.assertIsInstance(contact, Contact)
self.assertEqual(contact.name, my_contact.name)
self.assertEqual(contact.surname, my_contact.surname)
self.assertEqual(contact.email, my_contact.email)
self.assertEqual(str(contact.phone), my_contact.phone)
self.assertEqual(contact.birthday.strftime("%Y-%m-%d"), my_contact.birthday)
break
Para la búsqueda crearemos la función test_search_contact, filtraremos por los datos del contacto definido en _object_contact y obtendremos un listado con search_contacts. Una vez hecho esto, comprobamos que no devuelve un listado vacío con assertNotEqual y recorremos el listado.
Con assertIsInstance comprobamos que es una instancia de la clase Contact y luego comparamos el resultado con el objeto que tenemos guardado para nuestras pruebas.
def test_list_contacts(self):
list_contacts = self.db.list_contacts()
self.assertNotEqual(list_contacts, [])
for contact in list_contacts:
self.assertIsInstance(contact, Contact)
Para el listado de contactos haremos casi lo mismo que en la función anterior pero esta vez sin usar filtros.
def test_remove_contact(self):
last_id = self.db.get_last_id()
self.assertEqual(self.db.delete_contact(last_id), 1)
contact = self.db.get_by_id(last_id)
self.assertEqual(contact, {})
Como último test, comprobaremos el borrado, obtenemos el id del último contacto insertado y después llamamos a delete_contact, si todo ha ido bien, nos debe devolver 1 que significa que ha afectado a una fila. Por último comprobamos que al intentar recuperar el contacto eliminado el resultado es un diccionario vacío y con eso ya tendríamos todos los tests.
Ahora que ya tenemos esto al final de archivo añadiremos la función de entrada a la funcionalidad del test y ya estaría listo.
if __name__ == "__main__":
unittest.main()
Bien, pues ya tenemos montados todos nuestros tests, ahora solo falta ejecutarlos, para ello en nuestro directorio raiz lanzaremos la siguiente instrucción:
python -m unittest tests/test_dbcontacts.py --verbose
Y este debe ser el resultado:
Una cosa que no he comentado antes es que --verbose lo he añadido para tener más información pero realmente no sería necesario.
Conclusiones
En este tutorial hemos aprendido como utilizar los tests unitarios en python y como usar algunos de sus asserts. Os ánimo a que leais más sobre este tema porque solo hemos rascado la superficie y hay un montón de cosas que puedes hacer con esto además de lo que habéis visto.
Por último, espero que os ayude esta serie de tutoriales, recordad que podéis descargar el proyecto al completo desde mi github y que para cualquier duda podéis usar la caja de comentarios :)
Espero que este post te ayude y como siempre, te recomiendo seguirme en Twitter para estar al tanto de los nuevo contenido. Ahora también puedes seguirme en Instagram donde estoy subiendo tips, tutoriales en vídeo e información sobre herramientas para developers.
Por último os dejo mi guía para aprender a trabajar con APIs donde explico todo el funcionamiento de una API, el protocolo HTTP y veremos como construir una API con arquitectura REST.
Nos leemos 👋.