logo cosasdedevs

Cómo crear un CRUD en python. Parte 3: Modificar, borrar y buscar

Cómo crear un CRUD en python. Parte 3: Modificar, borrar y buscar

My Profile
Mar 08, 2020

Bienvenidos a la tercera parte de este tutorial. En esta parte veremos como realizar la modificación, el borrado y la búsqueda de un contacto.

Para acceder a las otras partes del tutorial, aquí os dejo los enlaces:

Buscar en un csv

Nuestro primer paso será implementar la búsqueda, para ello iremos al archivo classes/dbcsv.py y añadiremos una función de búsqueda llamada get_by_filters, que podrá recibir como parámetros de búsqueda parte del nombre, apellidos o el email.

    def get_by_filters(self, filters):

        list_data = []
        list_header = []
        with open(self._filename, mode='r', encoding='utf-16') as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=';')
            is_header = True
            for row in csv_reader:
                if is_header:
                    list_header = row
                    is_header = False
                    continue

                if row:
                    file = {}

                    for key, value in enumerate(row):
                        file[list_header[key]] = value
                    
                    for key_filter, value_filter in filters.items():
                        matches = re.search(rf"{value_filter}", file[key_filter], re.IGNORECASE)
                        if matches:
                            list_data.append(file)
                            break

        return list_data

Esta función recibe un diccionario filters que puede contener las claves NAME, SURNAME y EMAIL que contendrán como valores las posibles partes de un nombre, apellido o email, después lee el csv que utilizamos para guardar los datos y utiliza una exprexión regular para buscar coincidencias entre nuestros filtros y los campos del csv, si hacemos match al menos con uno de los filtros, lo guardamos en una lista que posteriormente retornaremos al usuario.

Ahora que ya tenemos esta parte, volvemos al archivo classes/dbcontacts.py y creamos la función search_contacts que la usaremos para realizar nuestras búsquedas.

    def search_contacts(self, filters):
        if 'NAME' not in filters and 'SURNAME' not in filters and 'EMAIL' not in filters:
            raise ValueError('Debes envíar al menos un filtro')

        list_contacts = self.get_by_filters(filters)
        return self._create_object_contacts(list_contacts)


    def _create_object_contacts(self, list_contacts):

        if not list_contacts:
            return None

        object_contacts = []
        # Convertimos los datos a objectos de tipo contact
        for contact in list_contacts:
            c = Contact(contact['ID'], contact['NAME'], contact['SURNAME'], contact['EMAIL'], contact['PHONE'], contact['BIRTHDAY'])
            object_contacts.append(c)

        return object_contacts

Aquí comprobamos si al menos nos ha pasado un filtro, si no es así enviamos una excepción, si pasa al menos un parámetro de búsqueda, llamamos a la función get_by_filters y después creamos un listado de objetos de tipo Contact con la función _create_object_contats, esta función se ha creado a partir de la parte de la función list_contacts en la que creaba el listado de objetos, ya que como lo utilizamos aquí también, la hemos extraido en una nueva función.

La función list_contacts quedaría ahora así:

    def list_contacts(self):
        list_contacts = self.get_all()
        return self._create_object_contacts(list_contacts)


Ahora vamos al archivo main.py y modificamos run para añadir una función para realizar las búsquedas que se llamará search_contact.

def run():
    print_options()

    command = input()
    command = command.upper()

    if command == 'C':
        create_contact()
    elif command == 'L':
        list_contacts()
    elif command == 'M':
        pass
    elif command == 'E':
        pass
    elif command == 'B':
        search_contact()
    elif command == 'S':
        os._exit(1)
    else:
        print('Comando inválido')

    time.sleep(1)
    run()

Una vez hecho esto nos ponemos manos a la obra con la función para realizar las búsquedas en la que podremos pasar una parte del nombre, apellidos o el email para realizar el filtrado.

def search_contact():

    filters = {}
    print('Introduce un nombre (vacío para usar otro filtro):')
    nombre = input()
    if nombre:
        filters['NAME'] = nombre
    print('Introduce un apellido (vacío para usar otro filtro):')
    apellidos = input()
    if apellidos:
        filters['SURNAME'] = apellidos
    print('Introduce un email (vacío para usar otro filtro):')
    email = input()
    if email:
        filters['EMAIL'] = email

    try:
        list_contacts = db.search_contacts(filters)
        if not list_contacts:
            return print('No hay ningún contacto con esos criterios de búsqueda')

        _print_table_contacts(list_contacts)
    except ValueError as err:
        print(err)
        time.sleep(1)
        search_contact()


def _print_table_contacts(list_contacts):
    table = PrettyTable(db.get_schema().keys())
    for contact in list_contacts:
        table.add_row([
            contact.id_contact,
            contact.name,
            contact.surname,
            contact.email,
            contact.phone,
            contact.birthday
        ])

    print(table)
    print('Pulsa cualquier letra para continuar')
    command = input()

En esta función estamos solicitando al usuario que introduzca un nombre, apellido o email, si los datos introducidos por el usuario no están vacíos se guardarán en el diccionario filters y se realizará la búsqueda llamando a la función search_contacts, si encuentra resultados, mostrará una tabla como cuando listamos todos los contactos.

En _print_table_contacts he cambiado el mensaje para que sea entendible en los dos casos.

Actualizar un csv

Ahora que ya tenemos esto vamos a realizar el update, para ello volvemos a la clase dbcsv.py en la que crearemos dos nuevas funciones, update para realizar la actualización de un contacto y get_by_id para traernos un contacto por su id además de importar dos librerías que ahora explicaremos su uso.

from tempfile import NamedTemporaryFile
import shutil
    def get_by_id(self, id_object):
        list_header = []
        with open(self._filename, mode='r', encoding='utf-16') as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=';')
            is_header = True
            for row in csv_reader:
                if is_header:
                    list_header = row
                    is_header = False
                    continue

                if row:
                    file = {}
                    for key, value in enumerate(row):
                        file[list_header[key]] = value
                    if file['ID'] == id_object:
                        return file

        return {}

La función get_by_id recibirá un id y luego leeremos el csv, si encontramos una coincidencia devolverá un diccionario con los datos del contacto, si no devolverá un diccionario vacío.

    def update(self, id_object, data):
        data_csv = self.get_by_id(id_object)

        if not data_csv:
            raise Exception('No se ha encontrado el objecto con el id enviado')

        for key, value in data.items():
            data_csv[key] = value

        tempfile = NamedTemporaryFile(mode='w', delete=False, encoding='utf-16')

        list_header = []
        with open(self._filename, mode='r', encoding='utf-16') as csv_file, tempfile:
            csv_reader = csv.reader(csv_file, delimiter=';')
            data_writer = csv.writer(tempfile, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL, lineterminator='\n')

            is_header = True
            for row in csv_reader:
                if is_header:
                    list_header = row
                    is_header = False
                    data_writer.writerow(row)
                    continue

                if row:
                    file = {}
                    for key, value in enumerate(row):
                        file[list_header[key]] = value
                    
                    if file['ID'] != data_csv['ID']:
                        data_writer.writerow(row)
                        continue

                    for key, value in data_csv.items():
                        file[key] = value

                    data_writer.writerow(file.values())

        shutil.move(tempfile.name, self._filename)
        return True

En el update, pasaremos por parámetros el id del contacto y un diccionario con los datos del contacto que queremos actualizar que no tienen porqué ser todos, después comprobamos si existe el contacto con la función get_by_id y si es así, actualizamos los datos de esa línea y creamos un objeto de tipo NamedTemporaryFile, esta función la usaremos para crear un archivo temporal. Después leemos el csv orginal y el objeto tmpfile lo usaremos para rehacer el csv. Una vez hecho esto, comprobamos si la línea que estamos recorriendo tiene él id que buscamos, si es así reemplazamos los datos y la guardamos, si no, guardamos los datos originales.

Una vez hecho esto, usaremos la librería shutil para mover nuestros cambios al archivo original.

Ahora que ya tenemos esta parte, volvemos a classes/dbcontacts.py y añadimos la función update_contact.

    def update_contact(self, id_object, data):
        if not id_object:
            raise ValueError('Debes envíar el id del contacto')
        if not data:
            raise ValueError('Debes envíar al menos un parámetro a actualizar')
        self.update(id_object, data)

Aquí simplemente validaremos que nos envía él id del contacto y un diccionario con al menos un dato y ya llamaremos a la función update que acabamos de crear.

Volvemos a main.py y modificamos la función run añadiendo la función update_contact.

def run():
    print_options()

    command = input()
    command = command.upper()

    if command == 'C':
        create_contact()
    elif command == 'L':
        list_contacts()
    elif command == 'M':
        update_contact()
    elif command == 'E':
        pass
    elif command == 'B':
        search_contact()
    elif command == 'S':
        os._exit(1)
    else:
        print('Comando inválido')

    time.sleep(1)
    run()

Antes de crear la función update_contact, vamos a ir a check_contact_data y vamos a hacer una pequeña modificación en la función para poder usarla en otros casos, ya que la vamos a necesitar de nuevo en update_contact.

def check_contact_data(message, data_name, force = True):
    print(message)
    input_data = input()
    if not force and not input_data:
        return
    try:
        getattr(validator, f'validate{data_name.capitalize()}')(input_data)
        return input_data
    except ValueError as err:
        print(err)
        check_contact_data(message, data_name) 

Hemos añadido un nuevo parametro llamado force y que contendrá el valor True por defecto, esto lo usaremos para los casos en los que el input venga vacío que no realice ninguna validación. En el caso del update, no es necesario que tenga todos los datos así que enviando False nos saltaremos la validación si no hay datos.

Ahora si, creamos la función update_contact para actualizar un contacto.

def update_contact():

    list_contacts()

    print('Introduce el id del contacto que quieres actualizar:')
    id_object = input()

    data = {}
    nombre = check_contact_data('Introduce un nombre (vacío para mantener el nombre actual):', 'name', False)
    if nombre:
        data['NAME'] = nombre
    apellidos = check_contact_data('Introduce un apellido (vacío para mantener los apellidos actuales):', 'surname', False)
    if apellidos:
        data['SURNAME'] = apellidos
    email = check_contact_data('Introduce un email (vacío para mantener el email actual):', 'email', False)
    if email:
        data['EMAIL'] = email
    phone = check_contact_data('Introduce un teléfono (vacío para mantener el teléfono actual):', 'phone', False)
    if phone:
        data['PHONE'] = phone
    birthday = check_contact_data('Introduce una fecha de nacimiento YYYY-MM-DD (vacío para mantener la fecha actual):', 'birthday', False)
    if birthday:
        data['BIRTHDAY'] = birthday
    
    try:
        res = db.update(id_object, data)
        if res:
            print('Contacto actualizado con éxito')
    except Exception as err:
        print(err)
        time.sleep(1)
        update_contact()

Primero mostraremos la lista de contactos, después solicitamos él id del contacto que queramos actualizar y validamos los datos que introduzca el usuario con check_contact_data. Si todo ha ido bien, llamamos a db.update y actualizamos el usuario, si no lanzará una excepción y reintentará el proceso.

Borrar en un csv

Por último haremos la parte del borrado, para ello volvemos a classes/dbcsv.py y creamos la función modify_file.

    def modify_file(self, id_object, data, action):
        data_csv = self.get_by_id(id_object)

        if not data_csv:
            raise Exception('No se ha encontrado el objecto con el id enviado')

        for key, value in data.items():
            data_csv[key] = value

        tempfile = NamedTemporaryFile(mode='w', delete=False, encoding='utf-16')

        list_header = []
        with open(self._filename, mode='r', encoding='utf-16') as csv_file, tempfile:
            csv_reader = csv.reader(csv_file, delimiter=';')
            data_writer = csv.writer(tempfile, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL, lineterminator='\n')

            is_header = True
            for row in csv_reader:
                if is_header:
                    list_header = row
                    is_header = False
                    data_writer.writerow(row)
                    continue

                # Si es update, actualizamos cuando hacemos match
                if row and action == 'update':
                    file = {}
                    for key, value in enumerate(row):
                        file[list_header[key]] = value
                    
                    if file['ID'] != data_csv['ID']:
                        data_writer.writerow(row)
                        continue

                    for key, value in data_csv.items():
                        file[key] = value

                    data_writer.writerow(file.values())
                # Si es delete cuando hacemos match continuamos para saltarnos el insertado de esa línea
                elif row and action == 'delete':
                    file = {}
                    for key, value in enumerate(row):
                        file[list_header[key]] = value
                    
                    if file['ID'] == data_csv['ID']:
                        continue

                    data_writer.writerow(row)

        shutil.move(tempfile.name, self._filename)
        return True

Esta función la vamos a usar para evitar duplicar código, ya que el borrado es muy similar al update, enviaremos por parámetros el id del contacto, los datos a actualizar si procede y la acción que será update o delete.

Una vez hecho esto, el proceso será igual que en el update pero cuando estemos recorriendo el csv comprobaremos si el action es update o delete. Si es update actualizamos la fila cuando haga match, si no, cuando encuentra la coincidencia, ignoraremos esa línea y no la insertaremos en el archivo temporal.

Ahora simplemente actualizamos las funciones update y delete y ya lo tendríamos listo.

    def delete(self, id_object):
        return self.modify_file(id_object, {}, 'delete')

    def update(self, id_object, data):
        return self.modify_file(id_object, data, 'update')

Volvemos a classes/dbcontacts.py y creamos la función delete_contact.

    def delete_contact(self, id_object):
        if not id_object:
            raise ValueError('Debes envíar el id del contacto')
        self.delete(id_object)

Aquí simplemente, comprobamos que nos ha enviado un id de contacto y llamamos a la función de borrado.

Ahora que ya tenemos el borrado, volvemos a main.py y en el run añadimos la función delete_contact.

def run():
    print_options()

    command = input()
    command = command.upper()

    if command == 'C':
        create_contact()
    elif command == 'L':
        list_contacts()
    elif command == 'M':
        update_contact()
    elif command == 'E':
        delete_contact()
    elif command == 'B':
        search_contact()
    elif command == 'S':
        os._exit(1)
    else:
        print('Comando inválido')

    time.sleep(1)
    run()

Después creamos la función delete_contact.

def delete_contact():
    list_contacts()

    print('Introduce el id del contacto que quieres eliminar:')
    id_object = input()
    try:
        res = db.delete(id_object)
        if res:
            print('Contacto eliminado con éxito')
    except Exception as err:
        print(err)
        time.sleep(1)
        delete_contact()

Aquí mostraremos el listado de contactos y le pediremos un id de contacto al usuario, si lo encontramos lo eliminará, sino mostrará un mensaje de error y llamará de nuevo a la función delete_contact.

Conclusiones

En este tutorial hemos aprendido a manipular un csv actualizando y eliminando datos además de poder realizar búsquedas, como mejora en las búsquedas se podría hacer un cambio para que los filtros para que tenga que hacer match con todos los filtros que se envíen en vez de solo con uno de ellos pero eso ya los os lo dejo a vosotros.

En el siguiente tutorial aprenderemos a instalar Postgresql y a conectarnos a esta bbdd desde Python.

Para cualquier duda, mi código está en mi github y podéis usar la caja de comentarios y recordad que si queréis estar al tanto de cuando suba el siguiente tutorial, seguidme en Twitter, ya que voy comentando todas las noticias del blog y voy subiendo los nuevos tutoriales.

Nos leemos 👋

2153 vistas

Nos tomamos en serio tu privacidad

Utilizamos cookies propias y de terceros para mejorar la experiencia del usuario a través de su navegación. Si pulsas entendido aceptas su uso. Ver política de cookies.

🐍 Sígueme en Twitter

Si te gusta el contenido que subo y no quieres perderte nada, sígueme en Twitter y te avisaré cada vez que cree contenido nuevo 💪
Luego ¡Te sigo!