Skip to main content

Data Engineering End-to-End

Stack Tecnológico

  • Apache Airflow
  • PostgreSQL
  • Pandas
  • Python
  • Docker
  • SQL

Resumen del Proyecto

En este proyecto, vamos a obtener un archivo CSV desde un repositorio remoto, descargarlo al directorio de trabajo local, crear una tabla local en PostgreSQL y escribir estos datos del CSV en la tabla de PostgreSQL con el script write_csv_to_postgres.py.

Luego, obtendremos los datos de la tabla. Después de algunas modificaciones y prácticas con pandas, crearemos 3 Dataframes separados con el script create_df_and_modify.py

Al final, obtendremos estos 3 dataframes, crearemos tablas relacionadas en la base de datos de PostgreSQL e insertaremos los dataframes en estas tablas con el script write_df_to_postgres.py

Todos estos scripts se ejecutarán como tareas de DAGs en Airflow con el script de DAG.

Pasos

write_csv_to_postgres.py -> obtiene un archivo CSV desde una URL. Lo guarda en el directorio de trabajo local como churn_modelling.csv. Después de leer el archivo CSV, lo escribe en una tabla local de PostgreSQL.

create_df_and_modify.py -> lee la misma tabla de PostgreSQL y crea un dataframe de pandas a partir de ella, modificándolo. Luego, crea 3 dataframes separados.

write_df_to_postgres.py -> escribe estos 3 dataframes en 3 tablas separadas ubicadas en el servidor de PostgreSQL.

Obtener Datos CSV Remotos

Primero debemos conectarnos a PgAdmin para visualizar las tablas creadas y ejecutar consultas SQL. Usar DBeaver es otra opción, también podemos conectar nuestra instancia de PostgreSQL a DBeaver. Una vez en la página de configuración, debemos definir:

  • Host
  • Nombre de la Base de Datos
  • Nombre de Usuario
  • Contraseña
  • Puerto

Necesitaremos todos estos parámetros al conectar a la base de datos. A continuación para poder crear una nueva base de datos, usuario y contraseña. Vamos a usar localhost y el puerto será 5432 (el puerto predeterminado para Postgres). También podemos conectar a DBeaver usando estos parámetros.

Cómo Crear Base de Datos, Usuario y Acceso a PostgreSQL

Ahora, explicaré cómo acceder a nuestro servidor PostgreSQL usando psql a través del terminal.

Necesitamos instalar todas las bibliotecas y paquetes necesarios en el archivo requirements.txt:

psycopg
pandas
datetime
requests
pandasql
urllib3
traceback2
pip install -r requirements.txt

Necesitamos importar las bibliotecas necesarias y definir las variables ambientales.

  1. Después de todo, aquí viene la parte principal:
import psycopg2
import os
import traceback
import logging
import pandas as pd
import urllib.request

logging.basicConfig(level=logging.INFO,
format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s')

postgres_host = os.environ.get('postgres_host')
postgres_database = os.environ.get('postgres_database')
postgres_user = os.environ.get('postgres_user')
postgres_password = os.environ.get('postgres_password')
postgres_port = os.environ.get('postgres_port')
dest_folder = os.environ.get('dest_folder')

url = "https://raw.githubusercontent.com/xploiterx/datasets/master/Proyect-0/CSV/Churn_Modelling-1.csv"
destination_path = f'{dest_folder}/churn_modelling.csv'
  1. Tenemos que conectarnos al servidor Postgres.
try:
conn = psycopg2.connect(
host=postgres_host,
database=postgres_database,
user=postgres_user,
password=postgres_password,
port=postgres_port
)
cur = conn.cursor()
logging.info('Conexión exitosa al servidor Postgres')
except Exception as e:
traceback.print_exc()
logging.error("No se pudo crear la conexión a Postgres")
  1. Después de conectarnos al servidor Postgres, descargaremos el archivo remoto en nuestro directorio de trabajo.
def download_file_from_url(url: str, dest_folder: str):
"""
Descarga un archivo desde una URL específica y lo descarga en el directorio local
"""
if not os.path.exists(str(dest_folder)):
os.makedirs(str(dest_folder)) # crear carpeta si no existe

try:
urllib.request.urlretrieve(url, destination_path)
logging.info('Archivo CSV descargado exitosamente al directorio de trabajo')
except Exception as e:
logging.error(f'Error al descargar el archivo CSV debido a: {e}')
traceback.print_exc()
  1. Dado que el segundo objetivo es subir los datos CSV a una tabla de Postgres, debemos crear la tabla.
def create_postgres_table():
"""
Crea la tabla de Postgres con un esquema deseado
"""
try:
cur.execute("""CREATE TABLE IF NOT EXISTS churn_modelling (RowNumber INTEGER PRIMARY KEY, CustomerId INTEGER,
Surname VARCHAR(50), CreditScore INTEGER, Geography VARCHAR(50), Gender VARCHAR(20), Age INTEGER,
Tenure INTEGER, Balance FLOAT, NumOfProducts INTEGER, HasCrCard INTEGER, IsActiveMember INTEGER, EstimatedSalary FLOAT, Exited INTEGER)""")

logging.info(' Nueva tabla churn_modelling creada exitosamente en el servidor de postgres')
except:
logging.warning(' Verifica si la tabla churn_modelling existe')
  1. Después de ejecutar la función anterior, podemos verificar si la tabla se creó con éxito mediante pgAdmin o DBeaver. Al final, tenemos que insertar todos los datos (dataframes de pandas) en la tabla recién creada fila por fila.
def write_to_postgres():
"""
Crea el dataframe y escribe en la tabla de Postgres si aún no existe
"""
df = pd.read_csv(f'{dest_folder}/churn_modelling.csv')
inserted_row_count = 0

for _, row in df.iterrows():
count_query = f"""SELECT COUNT(*) FROM churn_modelling WHERE RowNumber = {row['RowNumber']}"""
cur.execute(count_query)
result = cur.fetchone()

if result[0] == 0:
inserted_row_count += 1
cur.execute("""INSERT INTO churn_modelling (RowNumber, CustomerId, Surname, CreditScore, Geography, Gender, Age,
Tenure, Balance, NumOfProducts, HasCrCard, IsActiveMember, EstimatedSalary, Exited) VALUES (%s, %s, %s,%s, %s, %s,%s, %s, %s,%s, %s, %s,%s, %s)""",
(int(row[0]), int(row[1]), str(row[2]), int(row[3]), str(row[4]), str(row[5]), int(row[6]), int(row[7]), float(row[8]), int(row[9]), int(row[10]), int(row[11]), float(row[12]), int(row[13])))

logging.info(f' {inserted_row_count} filas del archivo CSV insertadas en la tabla churn_modelling con éxito')