Cómo crear un CRUD en python. Parte 3: Modificar, borrar y buscar
Bienvenidos a la tercera parte de este tutorial. En esta parte veremos como realizar la modificación, el borrado y la búsqueda de un contacto.
Para acceder a las otras partes del tutorial, aquí os dejo los enlaces:
- Cómo crear un CRUD en Python. Parte 1: Estructura y clase
- Cómo crear un CRUD en Python. Parte 2: Creación y listado
- Cómo crear un CRUD en Python. Parte 4: Conexión a PostgreSQL
- Cómo crear un CRUD en Python. Parte 5: Test unitarios
Buscar en un csv
Nuestro primer paso será implementar la búsqueda, para ello iremos al archivo classes/dbcsv.py y añadiremos una función de búsqueda llamada get_by_filters, que podrá recibir como parámetros de búsqueda parte del nombre, apellidos o el email.
def get_by_filters(self, filters):
list_data = []
list_header = []
with open(self._filename, mode='r', encoding='utf-16') as csv_file:
csv_reader = csv.reader(csv_file, delimiter=';')
is_header = True
for row in csv_reader:
if is_header:
list_header = row
is_header = False
continue
if row:
file = {}
for key, value in enumerate(row):
file[list_header[key]] = value
for key_filter, value_filter in filters.items():
matches = re.search(rf"{value_filter}", file[key_filter], re.IGNORECASE)
if matches:
list_data.append(file)
break
return list_data
Esta función recibe un diccionario filters que puede contener las claves NAME, SURNAME y EMAIL que contendrán como valores las posibles partes de un nombre, apellido o email, después lee el csv que utilizamos para guardar los datos y utiliza una exprexión regular para buscar coincidencias entre nuestros filtros y los campos del csv, si hacemos match al menos con uno de los filtros, lo guardamos en una lista que posteriormente retornaremos al usuario.
Ahora que ya tenemos esta parte, volvemos al archivo classes/dbcontacts.py y creamos la función search_contacts que la usaremos para realizar nuestras búsquedas.
def search_contacts(self, filters):
if 'NAME' not in filters and 'SURNAME' not in filters and 'EMAIL' not in filters:
raise ValueError('Debes envíar al menos un filtro')
list_contacts = self.get_by_filters(filters)
return self._create_object_contacts(list_contacts)
def _create_object_contacts(self, list_contacts):
if not list_contacts:
return None
object_contacts = []
# Convertimos los datos a objectos de tipo contact
for contact in list_contacts:
c = Contact(contact['ID'], contact['NAME'], contact['SURNAME'], contact['EMAIL'], contact['PHONE'], contact['BIRTHDAY'])
object_contacts.append(c)
return object_contacts
Aquí comprobamos si al menos nos ha pasado un filtro, si no es así enviamos una excepción, si pasa al menos un parámetro de búsqueda, llamamos a la función get_by_filters y después creamos un listado de objetos de tipo Contact con la función _create_object_contats, esta función se ha creado a partir de la parte de la función list_contacts en la que creaba el listado de objetos, ya que como lo utilizamos aquí también, la hemos extraido en una nueva función.
La función list_contacts quedaría ahora así:
def list_contacts(self):
list_contacts = self.get_all()
return self._create_object_contacts(list_contacts)
Ahora vamos al archivo main.py y modificamos run para añadir una función para realizar las búsquedas que se llamará search_contact.
def run():
print_options()
command = input()
command = command.upper()
if command == 'C':
create_contact()
elif command == 'L':
list_contacts()
elif command == 'M':
pass
elif command == 'E':
pass
elif command == 'B':
search_contact()
elif command == 'S':
os._exit(1)
else:
print('Comando inválido')
time.sleep(1)
run()
Una vez hecho esto nos ponemos manos a la obra con la función para realizar las búsquedas en la que podremos pasar una parte del nombre, apellidos o el email para realizar el filtrado.
def search_contact():
filters = {}
print('Introduce un nombre (vacío para usar otro filtro):')
nombre = input()
if nombre:
filters['NAME'] = nombre
print('Introduce un apellido (vacío para usar otro filtro):')
apellidos = input()
if apellidos:
filters['SURNAME'] = apellidos
print('Introduce un email (vacío para usar otro filtro):')
email = input()
if email:
filters['EMAIL'] = email
try:
list_contacts = db.search_contacts(filters)
if not list_contacts:
return print('No hay ningún contacto con esos criterios de búsqueda')
_print_table_contacts(list_contacts)
except ValueError as err:
print(err)
time.sleep(1)
search_contact()
def _print_table_contacts(list_contacts):
table = PrettyTable(db.get_schema().keys())
for contact in list_contacts:
table.add_row([
contact.id_contact,
contact.name,
contact.surname,
contact.email,
contact.phone,
contact.birthday
])
print(table)
print('Pulsa cualquier letra para continuar')
command = input()
En esta función estamos solicitando al usuario que introduzca un nombre, apellido o email, si los datos introducidos por el usuario no están vacíos se guardarán en el diccionario filters y se realizará la búsqueda llamando a la función search_contacts, si encuentra resultados, mostrará una tabla como cuando listamos todos los contactos.
En _print_table_contacts he cambiado el mensaje para que sea entendible en los dos casos.
Actualizar un csv
Ahora que ya tenemos esto vamos a realizar el update, para ello volvemos a la clase dbcsv.py en la que crearemos dos nuevas funciones, update para realizar la actualización de un contacto y get_by_id para traernos un contacto por su id además de importar dos librerías que ahora explicaremos su uso.
from tempfile import NamedTemporaryFile
import shutil
def get_by_id(self, id_object):
list_header = []
with open(self._filename, mode='r', encoding='utf-16') as csv_file:
csv_reader = csv.reader(csv_file, delimiter=';')
is_header = True
for row in csv_reader:
if is_header:
list_header = row
is_header = False
continue
if row:
file = {}
for key, value in enumerate(row):
file[list_header[key]] = value
if file['ID'] == id_object:
return file
return {}
La función get_by_id recibirá un id y luego leeremos el csv, si encontramos una coincidencia devolverá un diccionario con los datos del contacto, si no devolverá un diccionario vacío.
def update(self, id_object, data):
data_csv = self.get_by_id(id_object)
if not data_csv:
raise Exception('No se ha encontrado el objecto con el id enviado')
for key, value in data.items():
data_csv[key] = value
tempfile = NamedTemporaryFile(mode='w', delete=False, encoding='utf-16')
list_header = []
with open(self._filename, mode='r', encoding='utf-16') as csv_file, tempfile:
csv_reader = csv.reader(csv_file, delimiter=';')
data_writer = csv.writer(tempfile, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL, lineterminator='\n')
is_header = True
for row in csv_reader:
if is_header:
list_header = row
is_header = False
data_writer.writerow(row)
continue
if row:
file = {}
for key, value in enumerate(row):
file[list_header[key]] = value
if file['ID'] != data_csv['ID']:
data_writer.writerow(row)
continue
for key, value in data_csv.items():
file[key] = value
data_writer.writerow(file.values())
shutil.move(tempfile.name, self._filename)
return True
En el update, pasaremos por parámetros el id del contacto y un diccionario con los datos del contacto que queremos actualizar que no tienen porqué ser todos, después comprobamos si existe el contacto con la función get_by_id y si es así, actualizamos los datos de esa línea y creamos un objeto de tipo NamedTemporaryFile, esta función la usaremos para crear un archivo temporal. Después leemos el csv orginal y el objeto tmpfile lo usaremos para rehacer el csv. Una vez hecho esto, comprobamos si la línea que estamos recorriendo tiene él id que buscamos, si es así reemplazamos los datos y la guardamos, si no, guardamos los datos originales.
Una vez hecho esto, usaremos la librería shutil para mover nuestros cambios al archivo original.
Ahora que ya tenemos esta parte, volvemos a classes/dbcontacts.py y añadimos la función update_contact.
def update_contact(self, id_object, data):
if not id_object:
raise ValueError('Debes envíar el id del contacto')
if not data:
raise ValueError('Debes envíar al menos un parámetro a actualizar')
self.update(id_object, data)
Aquí simplemente validaremos que nos envía él id del contacto y un diccionario con al menos un dato y ya llamaremos a la función update que acabamos de crear.
Volvemos a main.py y modificamos la función run añadiendo la función update_contact.
def run():
print_options()
command = input()
command = command.upper()
if command == 'C':
create_contact()
elif command == 'L':
list_contacts()
elif command == 'M':
update_contact()
elif command == 'E':
pass
elif command == 'B':
search_contact()
elif command == 'S':
os._exit(1)
else:
print('Comando inválido')
time.sleep(1)
run()
Antes de crear la función update_contact, vamos a ir a check_contact_data y vamos a hacer una pequeña modificación en la función para poder usarla en otros casos, ya que la vamos a necesitar de nuevo en update_contact.
def check_contact_data(message, data_name, force = True):
print(message)
input_data = input()
if not force and not input_data:
return
try:
getattr(validator, f'validate{data_name.capitalize()}')(input_data)
return input_data
except ValueError as err:
print(err)
check_contact_data(message, data_name)
Hemos añadido un nuevo parametro llamado force y que contendrá el valor True por defecto, esto lo usaremos para los casos en los que el input venga vacío que no realice ninguna validación. En el caso del update, no es necesario que tenga todos los datos así que enviando False nos saltaremos la validación si no hay datos.
Ahora si, creamos la función update_contact para actualizar un contacto.
def update_contact():
list_contacts()
print('Introduce el id del contacto que quieres actualizar:')
id_object = input()
data = {}
nombre = check_contact_data('Introduce un nombre (vacío para mantener el nombre actual):', 'name', False)
if nombre:
data['NAME'] = nombre
apellidos = check_contact_data('Introduce un apellido (vacío para mantener los apellidos actuales):', 'surname', False)
if apellidos:
data['SURNAME'] = apellidos
email = check_contact_data('Introduce un email (vacío para mantener el email actual):', 'email', False)
if email:
data['EMAIL'] = email
phone = check_contact_data('Introduce un teléfono (vacío para mantener el teléfono actual):', 'phone', False)
if phone:
data['PHONE'] = phone
birthday = check_contact_data('Introduce una fecha de nacimiento YYYY-MM-DD (vacío para mantener la fecha actual):', 'birthday', False)
if birthday:
data['BIRTHDAY'] = birthday
try:
res = db.update(id_object, data)
if res:
print('Contacto actualizado con éxito')
except Exception as err:
print(err)
time.sleep(1)
update_contact()
Primero mostraremos la lista de contactos, después solicitamos él id del contacto que queramos actualizar y validamos los datos que introduzca el usuario con check_contact_data. Si todo ha ido bien, llamamos a db.update y actualizamos el usuario, si no lanzará una excepción y reintentará el proceso.
Borrar en un csv
Por último haremos la parte del borrado, para ello volvemos a classes/dbcsv.py y creamos la función modify_file.
def modify_file(self, id_object, data, action):
data_csv = self.get_by_id(id_object)
if not data_csv:
raise Exception('No se ha encontrado el objecto con el id enviado')
for key, value in data.items():
data_csv[key] = value
tempfile = NamedTemporaryFile(mode='w', delete=False, encoding='utf-16')
list_header = []
with open(self._filename, mode='r', encoding='utf-16') as csv_file, tempfile:
csv_reader = csv.reader(csv_file, delimiter=';')
data_writer = csv.writer(tempfile, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL, lineterminator='\n')
is_header = True
for row in csv_reader:
if is_header:
list_header = row
is_header = False
data_writer.writerow(row)
continue
# Si es update, actualizamos cuando hacemos match
if row and action == 'update':
file = {}
for key, value in enumerate(row):
file[list_header[key]] = value
if file['ID'] != data_csv['ID']:
data_writer.writerow(row)
continue
for key, value in data_csv.items():
file[key] = value
data_writer.writerow(file.values())
# Si es delete cuando hacemos match continuamos para saltarnos el insertado de esa línea
elif row and action == 'delete':
file = {}
for key, value in enumerate(row):
file[list_header[key]] = value
if file['ID'] == data_csv['ID']:
continue
data_writer.writerow(row)
shutil.move(tempfile.name, self._filename)
return True
Esta función la vamos a usar para evitar duplicar código, ya que el borrado es muy similar al update, enviaremos por parámetros el id del contacto, los datos a actualizar si procede y la acción que será update o delete.
Una vez hecho esto, el proceso será igual que en el update pero cuando estemos recorriendo el csv comprobaremos si el action es update o delete. Si es update actualizamos la fila cuando haga match, si no, cuando encuentra la coincidencia, ignoraremos esa línea y no la insertaremos en el archivo temporal.
Ahora simplemente actualizamos las funciones update y delete y ya lo tendríamos listo.
def delete(self, id_object):
return self.modify_file(id_object, {}, 'delete')
def update(self, id_object, data):
return self.modify_file(id_object, data, 'update')
Volvemos a classes/dbcontacts.py y creamos la función delete_contact.
def delete_contact(self, id_object):
if not id_object:
raise ValueError('Debes envíar el id del contacto')
self.delete(id_object)
Aquí simplemente, comprobamos que nos ha enviado un id de contacto y llamamos a la función de borrado.
Ahora que ya tenemos el borrado, volvemos a main.py y en el run añadimos la función delete_contact.
def run():
print_options()
command = input()
command = command.upper()
if command == 'C':
create_contact()
elif command == 'L':
list_contacts()
elif command == 'M':
update_contact()
elif command == 'E':
delete_contact()
elif command == 'B':
search_contact()
elif command == 'S':
os._exit(1)
else:
print('Comando inválido')
time.sleep(1)
run()
Después creamos la función delete_contact.
def delete_contact():
list_contacts()
print('Introduce el id del contacto que quieres eliminar:')
id_object = input()
try:
res = db.delete(id_object)
if res:
print('Contacto eliminado con éxito')
except Exception as err:
print(err)
time.sleep(1)
delete_contact()
Aquí mostraremos el listado de contactos y le pediremos un id de contacto al usuario, si lo encontramos lo eliminará, sino mostrará un mensaje de error y llamará de nuevo a la función delete_contact.
Conclusiones
En este tutorial hemos aprendido a manipular un csv actualizando y eliminando datos además de poder realizar búsquedas, como mejora en las búsquedas se podría hacer un cambio para que los filtros para que tenga que hacer match con todos los filtros que se envíen en vez de solo con uno de ellos pero eso ya los os lo dejo a vosotros.
En el siguiente tutorial aprenderemos a instalar Postgresql y a conectarnos a esta bbdd desde Python.
Para cualquier duda, mi código está en mi github y podéis usar la caja de comentarios.
Espero que este post te ayude y como siempre, te recomiendo seguirme en Twitter para estar al tanto de los nuevo contenido. Ahora también puedes seguirme en Instagram donde estoy subiendo tips, tutoriales en vídeo e información sobre herramientas para developers.
Por último os dejo mi guía para aprender a trabajar con APIs donde explico todo el funcionamiento de una API, el protocolo HTTP y veremos como construir una API con arquitectura REST.
Nos leemos 👋.