logo cosasdedevs
Cómo crear un CRUD en python. Parte 4: Conexión a Postgresql

Cómo crear un CRUD en python. Parte 4: Conexión a Postgresql



My Profile
Mar 14, 2020

Bienvenidos a la cuarta parte de este tutorial, en esta parte vamos a cambiar nuestra base de datos que hasta ahora estaba basada en el uso de un CSV por el uso de una base de datos de verdad, en este caso Postgresql.

Para ello lo primero que haremos será instalar Postgresql en nuestro ordenador, para ello lo podemos descargar desde la página oficial de Postgresql.

Después accederemos a la consola de Postgresql, si estamos en Windows lo haremos con la Shell que nos instala (SQL shell) y con linux escribiendo el comando psql.

Una vez dentro, crearemos la base de datos, el usuario con el que nos conectaremos y le daremos permisos para que pueda usarla con lo siguientes comandos.

create database agenda;
CREATE USER <nombre-usuario> WITH PASSWORD '<contraseña>';
grant all privileges on database agenda to <nombre-usuario>;

Lo siguiente es instalar es activar nuestro entorno virtual e instalar la librería psycopg2 que es la que usaremos para realizar las conexiones.

pip install psycopg2

También instalaremos la librería environs, que la usaremos para acceder a las variables de entorno que vamos a crear para guardar la información relativa a los accesos de la base de datos.

pip install environs

Ahora en nuestro directorio raiz crearemos el archivo .env donde guardaremos las variables de entorno y asignaremos los datos de conexión a Postgresql.

POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=agenda
POSTGRES_USER=<nombre-usuario>
POSTGRES_PASSWORD=<contraseña>

Ahora que ya tenemos esto nos pondremos con la clase para acceder a la base de datos, para ello creamos el archivo classes/dbpostgresql.py y crearemos la clase DBPostgresql con la que gestionaremos todo. Esta clase sustituirá a la clase DBbyCSV por lo que contendrá la mayoria de sus funciones aunque obviamente adaptadas para Postgresql, es nos hará tener que realizar muy pocos cambios en la clase DBContacts que es la que extiende de ella.

import psycopg2
import re
from environs import Env

class DBPostgresql:

    def __init__(self, schema, table_name):
        self._table_name = table_name
        self._schema = schema
        env = Env()
        env.read_env()
        self._connect = psycopg2.connect(
            host=env('POSTGRES_HOST'), 
            database=env('POSTGRES_DB'), 
            user=env('POSTGRES_USER'), 
            password=env('POSTGRES_PASSWORD')
        )

        self._cur = self._connect.cursor()
        self._launch_query('SELECT 1')
        print('Conexión establecida con éxito')
            
        self._create_table()


En el constructor de la clase, recibiremos el schema y el nombre que le daremos a la tabla, después accedemos a las variables de entorno para crear la conexión a la base de datos. Una vez hecho esto comprobamos que funciona correctamente lanzando una query y después accederemos a la función _create_table que lo que hace es comprobar si existe ya una tabla con el nombre introducido por parámetro, si no es así la creamos.

     def _create_table(self):

        query = f'CREATE TABLE IF NOT EXISTS public.{self._table_name} ('
        primary_key = ''
        for field_name, config in self._schema.items():
            if config['type'] == 'autoincrement':
                query += f'{field_name} serial,'
                primary_key = f'PRIMARY KEY ({field_name})'

            elif config['type'] in ['string', 'int']:
                if config['type'] == 'string':
                    query += f'{field_name} character varying'
                else:
                    query += f'{field_name} integer'

                if 'max_length' in config:
                    query += f'({config["max_length"]})'
                query += ','

            elif config['type'] == 'date':
                query += f'{field_name} date,'

        query += f'{primary_key})'

        self._launch_query(query)

En esta función recibimos un schema como el que creamos para la clase que trabajaba con csvs y generará el código sql para crear la tabla. Al finalizar hemos creado una función donde lanzaremos la query llamada _launch_query, que usaremos también para todas las demás funciones que creemos en esta clase.

    def _launch_query(self, query):
        print(query)
        self._cur.execute(query)
        matches = re.search(r"^SELECT", query, re.IGNORECASE)
        if not matches:
            self._connect.commit()

En esta función mostramos la query introducida por el usuario (yo lo he dejado porque es un tutorial pero está feo que se vea eso en una aplicación en producción), ejecuta la query y comprueba si es un SELECT u otro tipo de orden, si es otra orden como por ejemplo un INSERT, realizará el commit que lo que hará es confirmar los cambios en la base de datos.

También añadiremos el destructor para cerrar las conexiones a la base de datos cuando se elimine el objeto de nuestra clase.

    def __del__(self):
        self._connect.close()
        self._cur.close()

Ahora vamos a crear las funciones de inserción, actualización y borrado.

INSERT

    def insert(self, data):

        values = "'" + "', '".join(data.values()) + "'"
        query = f'INSERT INTO public.{self._table_name} ({", ".join(data.keys())}) VALUES ({values});'

        self._launch_query(query)

        return True

Para el insert recibimos un diccionario con el nombre del campo y su valor y lo convertimos en una query para realizar posteriormente el insert.

UPDATE

    def update(self, id_object, data):

        list_update = []
        for field_name, field_value in data.items():
            list_update.append(f"{field_name}='{field_value}'")
        

        query = f'UPDATE public.{self._table_name} SET {", ".join(list_update)} WHERE id = {id_object};'
        self._launch_query(query)

Con el update, estamos recibiendo él id de la fila a actualizar y un diccionario con los campos que queremos actualizar, creamos la query y la lanzamos con el _launch_query.

DELETE

    def delete(self, id_object):
        query = f'DELETE FROM public.{self._table_name} WHERE id = {id_object};'

        self._launch_query(query)

Para el borrado, simplemente pasamos el id, creamos la query y la lanzamos.

Ahora crearemos la función para obtener una fila por id, para eso creamos la función get_by_id.

    def get_by_id(self, id_object):
        query = f'SELECT * FROM public.{self._table_name} WHERE id = {id_object};'

        table_keys = []
        for schema_key in self._schema.keys():
            table_keys.append(schema_key)
            
        data = {}
        self._launch_query(query)
        row = self._cur.fetchone()
        for key, value in enumerate(row):
            data[table_keys[key]] = value

        return data

Esta función recibe un id, obtiene las keys del schema y genera un diccionario con el resultado de la búsqueda, ya que estamos buscando una fila en concreto, usamos fetchone para obtener solo un objeto.

Después creamos la función get_by_filters que es la que realiza los filtrados de búsqueda.

    def get_by_filters(self, filters=None):

        list_filters = []

        where = '1=1'
        if filters is not None:
            for field_name, field_value in filters.items():
                list_filters.append(f"{field_name} LIKE '%{field_value}%'")

                where = " AND ".join(list_filters)

        query = f'SELECT * FROM public.{self._table_name} WHERE {where};'

        table_keys = []
        for schema_key in self._schema.keys():
            table_keys.append(schema_key)

        list_data = []
        self._launch_query(query)
        rows = self._cur.fetchall()

        for row in rows:
            data = {}
            for key, value in enumerate(row):
                data[table_keys[key]] = value

            list_data.append(data)

        return list_data

Aquí comprobamos si nos está pasando filtros, sino es así, nos traemos todos los datos, si vienen los filtros crearemos un WHERE con un LIKE de cada clave valor para buscar si nuestra tabla contiene parte de esas búsquedas. Generamos la query y la lanzamos, con el resultado devolvemos una lista con todos las filas encontradas.

Por último crearemos get_all que lo que hará será devolver todos las filas de nuestra tabla.

    def get_all(self):
        return self.get_by_filters()

Como veis, estamos aprovechando la función get_by_filters, no le pasamos ningún filtro, por lo que nos devuelve todos los resultados.

Ahora que ya tenemos la clase completada, vamos a ir al archivo classes/dbcontacts.py y realizaremos algunos cambios.

Lo primero será importar nuestra nueva clase y modificar las claves de la constante SCHEMA para que ahora estén en minúsculas.

from .contact import Contact
# from .dbcsv import DBbyCSV
from .dbpostgresql import DBPostgresql

SCHEMA = {
    'id': {
        'type': 'autoincrement',
    }, 
    'name': {
        'type': 'string',
        'min_length': 3,
        'max_length': 50
    }, 
    'surname': {
        'type': 'string',
        'min_length': 5,
        'max_length': 100
    }, 
    'email': {
        'type': 'string',
        'max_length': 254
    }, 
    'phone': {
        'type': 'int'
    }, 
    'birthday': {
        'type': 'date'
    }
}

Ahora modificamos la función save_contact, ya que necesitaremos enviarle un diccionario con el nombre de los campos y no un listado como hacíamos antes.

    def save_contact(self, contact):
        data = {
            'name':contact.name, 
            'surname':contact.surname, 
            'email':contact.email, 
            'phone':contact.phone, 
            'birthday':contact.birthday
        }
        return self.insert(data)

En search_contacts modificamos el control de envío de filtros para comprobar simplemente si hay datos en el diccionario y por si implementamos más parámetros de búsqueda que no tengamos que estar añadiéndolos.

    def search_contacts(self, filters):
        if not filters:
            raise ValueError('Debes envíar al menos un filtro')

        list_contacts = self.get_by_filters(filters)
        return self._create_object_contacts(list_contacts)

Por último, en _create_object_contacts, cambiamos las claves a minúsculas.

    def _create_object_contacts(self, list_contacts):

        if not list_contacts:
            return None

        object_contacts = []
        # Convertimos los datos a objectos de tipo contact
        for contact in list_contacts:
            c = Contact(contact['id'], contact['name'], contact['surname'], contact['email'], contact['phone'], contact['birthday'])
            object_contacts.append(c)

        return object_contacts

Para finalizar iremos al archivo main.py y realizaremos unas correcciones y modificaciones. 

Primero vamos a las funciones search_contact y update_contact y modificaremos todas las claves que tengamos en mayúsculas:

def search_contact():

    filters = {}
    print('Introduce un nombre (vacío para usar otro filtro):')
    nombre = input()
    if nombre:
        filters['name'] = nombre
    print('Introduce un apellido (vacío para usar otro filtro):')
    apellidos = input()
    if apellidos:
        filters['surname'] = apellidos
    print('Introduce un email (vacío para usar otro filtro):')
    email = input()
    if email:
        filters['email'] = email

    try:
        list_contacts = db.search_contacts(filters)
        if not list_contacts:
            return print('No hay ningún contacto con esos criterios de búsqueda')

        _print_table_contacts(list_contacts)
    except ValueError as err:
        print(err)
        time.sleep(1)
        search_contact()


def update_contact():

    list_contacts()

    print('Introduce el id del contacto que quieres actualizar:')
    id_object = input()

    data = {}
    nombre = check_contact_data('Introduce un nombre (vacío para mantener el nombre actual):', 'name', False)
    if nombre:
        data['name'] = nombre
    apellidos = check_contact_data('Introduce un apellido (vacío para mantener los apellidos actuales):', 'surname', False)
    if apellidos:
        data['surname'] = apellidos
    email = check_contact_data('Introduce un email (vacío para mantener el email actual):', 'email', False)
    if email:
        data['email'] = email
    phone = check_contact_data('Introduce un teléfono (vacío para mantener el teléfono actual):', 'phone', False)
    if phone:
        data['phone'] = phone
    birthday = check_contact_data('Introduce una fecha de nacimiento YYYY-MM-DD (vacío para mantener la fecha actual):', 'birthday', False)
    if birthday:
        data['birthday'] = birthday
    
    try:
        res = db.update(id_object, data)
        if res:
            print('Contacto actualizado con éxito')
    except Exception as err:
        print(err)
        time.sleep(1)
        update_contact()

Por último corregiremos un error en la función check_contact_data:

def check_contact_data(message, data_name, force = True):
    print(message)
    input_data = input()
    if not force and not input_data:
        return
    try:
        getattr(validator, f'validate{data_name.capitalize()}')(input_data)
        return input_data
    except ValueError as err:
        print(err)
        return check_contact_data(message, data_name, force)

El cambio que hemos realizado se encuentra en return check_contact_data(message, data_name, force), antes no estabamos realizando el return y no pasábamos el force, por lo que si el usuario escribía mal un campo y aparecía el reintento, el input se devolvía vacío.

Conclusiones

En este tutorial hemos aprendido como instalar Postgresql, crear un usuario y una base de datos. Por la parte de python, hemos aprendido a usar la librería psycopg2, realizar consultas, inserciones, actualizaciones y eliminación de datos.

La siguiente parte será la última, en ella aprenderemos como usar los test unitarios con la librería unittest y como aplicarlos a nuestro CRUD.

Recordad que podéis ver el código de esta cuarta parte en mi github y cualquier duda a los comentarios ;)

Espero que este post te ayude y como siempre, te recomiendo seguirme en Twitter para estar al tanto de los nuevo contenido. Ahora también puedes seguirme en Instagram donde estoy subiendo tips, tutoriales en vídeo e información sobre herramientas para developers.

Por último os dejo mi guía para aprender a trabajar con APIs donde explico todo el funcionamiento de una API, el protocolo HTTP y veremos como construir una API con arquitectura REST.

Nos leemos 👋.

7199 vistas

🐍 Sígueme en Twitter

Si te gusta el contenido que subo y no quieres perderte nada, sígueme en Twitter y te avisaré cada vez que cree contenido nuevo 💪
Luego ¡Te sigo!

Nos tomamos en serio tu privacidad

Utilizamos cookies propias y de terceros para recopilar y analizar datos sobre la interacción de los usuarios con cosasdedevs.com. Ver política de cookies.