Skip to main content

Airflow + MinIO

Generator

El objetivo de esta sección es utilizar MinIO como nuestro almacén de objetos central en lugar de utilizar directorios y/o carpetas para almacenar nuestras fuentes de datos que utilizaremos en el proyecto.

En nuestra sección anterior, aprendimos cómo usar Airflow para crear un DAG que nos permitió descargar un archivo CSV y moverlo a una ubicación dentro del mismo host, es decir, a otra carpeta o directorio.

Ahora, la idea principal en esta sección del proyecto es utilizar Airflow para crear un DAG que nos permita descargar el archivo CSV, pero esta vez queremos almacenarlo en MinIO en lugar de en una carpeta o directorio tradicional.

Lo primero que debemos hacer es ajustar el DAG de la sección anterior para que los datos se almacenen en MinIO en lugar de en una carpeta local, necesitaremos integrar la funcionalidad de carga de datos de MinIO en nuestro flujo de trabajo de Airflow. Podemos lograr esto utilizando la biblioteca minio-py, que proporciona una interfaz de Python para interactuar con un servidor MinIO.

Primero, necesitaremos instalar la biblioteca minio en nuestro entorno Airflow. Puedes hacerlo ejecutando pip install minio en tu contenedor Airflow o incluyendo la instalación en el Dockerfile si estás utilizando Docker para tu entorno de desarrollo.

Por esto, debemos crear un nuevo Dockerfile para proceder con la instalación del paquete, ya que nuestro contenedor de Airflow no viene preconfigurado con minio

note

El contenedor Airflow que usamos en nuestra Mini-Data-Platform viene configurado con los paquetes básicos, y lo hemos dejado así a propósito para que aprendas a usar Docker, configurar tu propio Dockerfile y tener algunas bases para crear tus propios contenedores

Dockerfile

En primer lugar, debemos conectarnos a nuestra maquina virtual Machine-0. Una vez hecho esto, no olvides navegar desde la terminal de VSCode al directorio repos/mini-data-platform.

cd repos/mini-data-plarform

Una vez que estés en el directorio, vamos a crear un archivo al que llamaremos Dockerfile y agregaremos las instrucciones. Ahora, desde la terminal, usaremos el comando:

touch Dockerfile

Esto creará un archivo llamado Dockerfile, el cual podrás visualizar desde el panel izquierdo de VSCode. Puedes abrirlo desde este panel haciendo doble clic en el archivo.

Ahora solo debes introducir el siguiente código en el Dockerfile:

FROM z2hx/airflow

USER airflow

RUN pip install minio

Ahora desglosemos el Dockerfile:

  1. FROM z2hx/airflow:  Esta línea indica que estamos utilizando la imagen z2hx/Airflow como la base para nuestra propia imagen. Esta es una imagen modificada a partir de la imagen oficial donde se agregaron algunas dependencias. Esto significa que nuestra imagen contendrá todo lo que está en la imagen z2hx/Airflow, y luego agregaremos la instalación de minio utilizando pip.

  2. USER airflow: Con esta instrucción, se cambia el usuario que ejecutará los siguientes comandos en el Dockerfile a airflow. Esto es útil por razones de seguridad, ya que evita ejecutar el contenedor como el usuario root, lo que puede ser riesgoso.

  3. RUN pip install minio: Esta línea ejecuta un comando en la imagen que está construyendo. Aquí, se utiliza pip, que es el gestor de paquetes de Python, para instalar la biblioteca minio. Esta biblioteca permite interactuar con el servidor MinIO, que es una solución de almacenamiento compatible con S3.

Minio Python

La biblioteca MinIO para Python es una biblioteca de cliente de Python para interactuar con el servidor de almacenamiento MinIO. Al instalar esta biblioteca dentro del contenedor de Airflow, puedes aprovechar todas las funcionalidades que ofrece MinIO para interactuar con objetos de almacenamiento en la nube desde Airflow.

Algunas de las cosas que puedes hacer con la biblioteca MinIO dentro del contenedor de Airflow son:

  1. Interactuar con buckets y objetos: Puedes crear, eliminar y listar buckets, así como subir, descargar y eliminar objetos dentro de esos buckets.

  2. Gestionar políticas de acceso: Puedes configurar políticas de acceso para controlar quién puede acceder a tus buckets y objetos, lo que es útil para asegurar tu almacenamiento de objetos.

  3. Gestionar versiones de objetos: MinIO admite el versionado de objetos, lo que te permite mantener un historial de versiones de tus objetos y restaurar versiones anteriores si es necesario.

  4. Utilizar características avanzadas: MinIO ofrece una amplia gama de características avanzadas, como cifrado de extremo a extremo, compresión de objetos y notificaciones de eventos, que puedes aprovechar en tu aplicación.

MinIO dentro de tu contenedor de Airflow, puedes acceder a todas las funcionalidades de MinIO para almacenar y gestionar objetos en la nube directamente desde tus flujos de trabajo de Airflow.

Docker Build

Ahora debemos crear la imagen de Docker a partir del Dockerfile. Para ello, simplemente debemos ejecutar el siguiente comando desde la terminal:

docker build -t myairflow .

Descompongamos el comando docker build --network=host -t myairflow .

  1. docker build: Este es el comando principal de Docker utilizado para construir imágenes a partir de un Dockerfile y un contexto de construcción.

  2. -t myairflow: Esta opción asigna un nombre y una etiqueta (-t) a la imagen que se está construyendo. En este caso, el nombre de la imagen será myairflow.

  3. .Esto especifica el contexto de construcción, es decir, el directorio actual. Docker buscará un archivo llamado Dockerfile en este directorio y utilizará ese archivo para construir la imagen.

DAG + MinIO

Una vez creado el nuevo contenedor myairflow, podemos crear un nuevo DAG al que podemos llamar download_csv_dag_minio.py. Este DAG lo crearemos en el mismo directorio de DAGs en Airflow. Una vez creado el archivo, procederemos a copiar el siguiente código:

import os
from datetime import datetime, timedelta
from minio import Minio

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

# Configuración de Minio
def get_minio_client():
minio_client = Minio(
endpoint="minio:9000",
access_key=os.environ["MINIO_ROOT_USER"],
secret_key=os.environ["MINIO_ROOT_PASSWORD"],
secure=False
)
return minio_client

def upload_to_minio(**kwargs):
minio_client = get_minio_client()
bucket = 'csv-bucket'
file_path = '/opt/airflow/notebooks/Churn_Modelling-1.csv'
object_name = 'Churn_Modelling-1.csv'

# Verificar si el bucket existe, si no, crearlo
if not minio_client.bucket_exists(bucket):
minio_client.make_bucket(bucket)

# Subir el archivo
minio_client.fput_object(bucket, object_name, file_path, content_type='application/csv')

# Configuración por defecto del DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 5, 13),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

# Definición del DAG
dag = DAG(
'download_csv_dag_minio',
default_args=default_args,
description='DAG to download, move, and upload CSV file to Minio',
schedule_interval=None,
)

# Tarea para descargar el archivo CSV
download_csv_task = BashOperator(
task_id='download_csv',
bash_command='wget -O /opt/airflow/Proyectos/input/Churn_Modelling-1.csv https://raw.githubusercontent.com/xploiterx/datasets/master/Proyect-0/CSV/Churn_Modelling-1.csv',
dag=dag,
)

# Tarea para copiar el archivo CSV a otro directorio
cp_csv_task = BashOperator(
task_id='cp_csv',
bash_command='cp /opt/airflow/Proyectos/input/Churn_Modelling-1.csv /opt/airflow/notebooks',
dag=dag,
)

# Tarea para subir el archivo CSV a Minio
upload_csv_task = PythonOperator(
task_id='upload_csv_to_minio',
provide_context=True,
python_callable=upload_to_minio,
dag=dag,
)

# Definir las dependencias entre las tareas
download_csv_task >> cp_csv_task >> upload_csv_task

Modificando el docker-compose.yml

Para que podamos usar el nuevo contenedor creado, debemos modificar el archivo docker-compose.yml para indicarle que utilice nuestra nueva imagen myairflow. Para lograr esto, vamos a editar el docker-compose.yml desde VSCode. Una vez abierto el archivo docker-compose.yml, editaremos la siguiente línea donde dice z2hx/airflow, la cambiaremos por myairflow, de modo que quede así:

Antes

version: '3.5'

x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-z2hx/airflow}

Después

version: '3.5'

x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-myairflow}

Una vez realizado todos estos pasos solo debemos correr de nuevo el docker-compose.yml

**Si los contenedores están iniciados:

Generator

Para iniciar de nuevo los contenedores:

docker compose -f airflow.docker-compose.yml up

Ejecutando el DAG

danger

Antes de proceder con este ejercicio, debemos validar que en la terminal de VSCode estén activos los puertos de Minio (9000, 9001) y Airflow (7000). En caso contrario, debes activarlos utilizando la opción Add Port.

Generator

Ahora, dirígete al navegador y validemos que tengas acceso a MinIO. Simplemente escribe localhost:9001 e ingresa las credenciales que están almacenadas en la carpeta config/minio.env.

Generator

Este archivo contiene las credenciales para poder iniciar sesión en el portal web de MinIO.

MINIO_ROOT_USER=minio-access-key
MINIO_ROOT_PASSWORD=minio-secret-key

Una vez hayas iniciado sesión, verás una pantalla similar a la siguiente imagen donde podrás validar que no contiene nada. Pero una vez ejecutemos el DAG, el script creará una carpeta donde se almacenará el CSV.

Generator

Ahora, solo nos resta ejecutar el DAG. Antes de hacerlo, debemos asegurarnos de eliminar los dos archivos CSV creados en el proceso anterior, ubicados en Proyectos/input y notebooks. Una vez hecho esto, puedes proceder a ejecutar el DAG desde el panel de Airflow, tal como se realizó en la sección anterior. No olvides ejecutar el DAG correcto download_csv_dag_minio.py, ya que en el panel de Airflow debes tener dos DAGs: el del proceso anterior llamado download_csv_dag.py y el actual llamado download_csv_dag_minio.py.

Generator

Ahora, solo debes ejecutar el DAG download_csv_dag_minio.py EL proceso es exactamente el mismo que el anterior, solo que ahora se ejecutan tres tareas como se puede validar en el panel:

Generator

  1. Descarga un archivo CSV desde una URL pública.
  2. Copia el archivo descargado a otro directorio en el sistema de archivos local.
  3. Sube el archivo copiado a un bucket en Minio.

Puedes validar que efectivamente en Proyectos/input y notebooks están los dos Churn_Modelling-1.csv. Ahora, solo resta validar nuestro S3 Minio y para validar esto revisemos el portal web de minio y validemos que efectivamente el CSV este almacenado en MinIO

Generator

Para revisar el CSV descargado solo haz clic en Browse y verás la siguiente pantalla:

Generator

Y como puedes validar el CSV efectivamente ha sido descargado y almacenado en MinIO

DAG Explicación detallada

  1. Importaciones y configuración inicial:

    import os
    from datetime import datetime, timedelta
    from minio import Minio
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.bash_operator import BashOperator

    Se importan las bibliotecas necesarias, incluyendo os para variables de entorno, datetime y timedelta para manejar fechas, Minio para interactuar con el almacenamiento Minio, y DAG, PythonOperator y BashOperator de Airflow para definir y gestionar las tareas del DAG.

  2. Configuración del cliente de Minio:

    def get_minio_client():
    minio_client = Minio(
    endpoint="minio:9000",
    access_key=os.environ["MINIO_ROOT_USER"],
    secret_key=os.environ["MINIO_ROOT_PASSWORD"],
    secure=False
    )
    return minio_client

    Esta función configura y devuelve un cliente de Minio utilizando las credenciales almacenadas en las variables de entorno:

    • def get_minio_client():: Esto define una función llamada get_minio_client.

    • minio_client = Minio(: Aquí se crea una instancia de Minio, la cual es una clase proporcionada por la biblioteca minio. Esta clase permite interactuar con un servidor Minio.

    • endpoint="minio:9000",: El parámetro endpoint especifica la dirección y el puerto del servidor Minio al que se desea conectar. En este caso, se usa minio:9000, lo que implica que el servidor Minio está disponible en el host minio en el puerto 9000.

    • access_key=os.environ["MINIO_ROOT_USER"],: Este parámetro especifica la clave de acceso que se utilizará para autenticarse en el servidor Minio. La clave de acceso se lee de una variable de entorno llamada MINIO_ROOT_USER.

    • secret_key=os.environ["MINIO_ROOT_PASSWORD"],: Similar al parámetro anterior, este especifica la clave secreta que se utilizará para autenticarse en el servidor Minio. La clave secreta se lee de una variable de entorno llamada MINIO_ROOT_PASSWORD.

    • secure=False: Este parámetro indica si se debe usar una conexión segura (HTTPS) al comunicarse con el servidor Minio. Al establecerlo en False, se indica que se usará una conexión no segura (HTTP).

    • return minio_client: Una vez que se ha creado el cliente Minio y se han configurado las credenciales y la configuración del servidor, se devuelve la instancia del cliente Minio creado.

  3. Función para subir archivos a Minio:

    def upload_to_minio(**kwargs):
    minio_client = get_minio_client()
    bucket = 'csv-bucket'
    file_path = '/opt/airflow/notebooks/Churn_Modelling-1.csv'
    object_name = 'Churn_Modelling-1.csv'

    if not minio_client.bucket_exists(bucket):
    minio_client.make_bucket(bucket)

    minio_client.fput_object(bucket, object_name, file_path, content_type='application/csv')

    Esta función utiliza el cliente de Minio para subir un archivo CSV a un bucket específico. Si el bucket no existe, lo crea primero:

    • def upload_to_minio(**kwargs):: Esto define una función llamada upload_to_minio que acepta argumentos clave-valor adicionales (**kwargs). Estos argumentos pueden contener información adicional que podría ser útil en la función, aunque en este caso no se están utilizando.

    • minio_client = get_minio_client(): Aquí se llama a la función get_minio_client() para obtener una instancia del cliente Minio. Esta función se supone que ya ha sido definida en otro lugar y devuelve un cliente Minio configurado correctamente.

    • bucket = 'csv-bucket': Se define el nombre del bucket al que se subirá el archivo. En este caso, el nombre del bucket es csv-bucket.

    • file_path = '/opt/airflow/notebooks/Churn_Modelling-1.csv': Se especifica la ruta completa del archivo que se va a subir al servidor Minio. En este caso, el archivo se encuentra en /opt/airflow/notebooks/Churn_Modelling-1.csv.

    • object_name = 'Churn_Modelling-1.csv': Este es el nombre que tendrá el objeto en el bucket de Minio una vez que se haya subido. En este caso, el nombre del objeto es el mismo que el nombre del archivo.

    • if not minio_client.bucket_exists(bucket):: Aquí se verifica si el bucket especificado ya existe en el servidor Minio. Si el bucket no existe, se crea utilizando el método make_bucket() del cliente Minio.

    • minio_client.fput_object(bucket, object_name, file_path, content_type='application/csv'): Finalmente, se utiliza el método fput_object() del cliente Minio para subir el archivo al bucket especificado. Este método toma como argumentos el nombre del bucket, el nombre del objeto, la ruta del archivo local y opcionalmente el tipo de contenido del archivo. En este caso, se especifica que el tipo de contenido del archivo es application/csv.

  4. Configuración por defecto del DAG:

    default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 5, 13),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    }

    Aquí se configuran los parámetros por defecto para el DAG, incluyendo el propietario, la fecha de inicio, y las políticas de reintento:

    • 'owner': 'airflow': Este argumento indica quién es el propietario del DAG. Es útil para identificar quién es responsable del mantenimiento y la gestión del DAG.

    • 'depends_on_past': False: Este argumento especifica si las tareas del DAG deben esperar a que las tareas del día anterior se completen antes de ejecutarse. En este caso, se establece en False, lo que significa que las tareas no dependerán del estado de las tareas anteriores.

    • 'start_date': datetime(2024, 5, 13): Este argumento define la fecha y hora de inicio del DAG. Es la fecha en la que se comenzará a programar la ejecución del DAG. En este caso, el DAG está programado para comenzar el 13 de mayo de 2024.

    • 'email_on_failure': False: Este argumento especifica si se enviará un correo electrónico cuando una tarea del DAG falle. En este caso, se establece en False, lo que significa que no se enviarán correos electrónicos en caso de fallo.

    • 'email_on_retry': False: Este argumento especifica si se enviará un correo electrónico cuando se vuelva a intentar una tarea después de un fallo. En este caso, se establece en False, lo que significa que no se enviarán correos electrónicos en caso de reintentos.

    • 'retries': 1: Este argumento especifica el número de intentos que se realizarán al ejecutar una tarea si esta falla. En este caso, se establece en 1, lo que significa que habrá un solo reintentos en caso de fallo.

    • 'retry_delay': timedelta(minutes=5): Este argumento especifica el tiempo de espera entre reintentos en caso de fallo de una tarea. En este caso, se establece en 5 minutos, lo que significa que se esperará 5 minutos antes de intentar nuevamente después de un fallo.

  5. Definición del DAG:

    dag = DAG(
    'download_csv_dag',
    default_args=default_args,
    description='DAG to download, move, and upload CSV file to Minio',
    schedule_interval=None,
    )

    Se define el DAG con un identificador (download_csv_dag), los argumentos por defecto, una descripción y el intervalo de ejecución (en este caso, no se ejecuta en un horario regular).

    • 'download_csv_dag': Es el nombre del DAG. Este nombre se utiliza para identificar el DAG dentro de Airflow y debe ser único dentro del entorno de Airflow.

    • default_args=default_args: Este argumento indica que se utilizarán los argumentos por defecto definidos en el diccionario default_args que se proporcionó anteriormente. Estos argumentos especifican configuraciones como el propietario del DAG, la fecha de inicio, la configuración de reintentos, etc.

    • description='DAG to download, move, and upload CSV file to Minio': Esta es una descripción opcional del DAG. Proporciona una breve explicación del propósito o la funcionalidad del DAG. En este caso, describe que el DAG está destinado a descargar, mover y cargar un archivo CSV en Minio.

    • schedule_interval=None: Este argumento especifica el intervalo de programación para la ejecución del DAG. Al establecerlo en None, se indica que el DAG no se ejecutará automáticamente en un horario programado, sino que se activará manualmente o a través de desencadenadores externos.

  6. Definición de las tareas:

    • Tarea para descargar el archivo CSV:

      download_csv_task = BashOperator(
      task_id='download_csv',
      bash_command='wget -O /opt/airflow/Proyectos/input/Churn_Modelling-1.csv https://raw.githubusercontent.com/xploiterx/datasets/master/Proyect-0/CSV/Churn_Modelling-1.csv',
      dag=dag,
      )

      Esta tarea utiliza BashOperator para descargar un archivo CSV desde una URL específica y guardarlo en un directorio local.

    • Tarea para copiar el archivo CSV a otro directorio:

      cp_csv_task = BashOperator(
      task_id='cp_csv',
      bash_command='cp /opt/airflow/Proyectos/input/Churn_Modelling-1.csv /opt/airflow/notebooks',
      dag=dag,
      )

      Esta tarea copia el archivo CSV descargado a otro directorio dentro del sistema de archivos local.

    • Tarea para subir el archivo CSV a Minio:

      upload_csv_task = PythonOperator(
      task_id='upload_csv_to_minio',
      provide_context=True,
      python_callable=upload_to_minio,
      dag=dag,
      )

      Esta tarea utiliza PythonOperator para ejecutar la función upload_to_minio, subiendo el archivo CSV a Minio:

      • task_id='upload_csv_to_minio': Este argumento especifica el identificador único de la tarea. Es el nombre por el cual la tarea será conocida dentro del DAG. En este caso, el identificador es 'upload_csv_to_minio'.

      • provide_context=True: Este argumento indica si se debe proporcionar el contexto de ejecución de Airflow a la función llamada upload_to_minio. Cuando está establecido en True, el contexto de ejecución de Airflow se pasa como argumento a la función, lo que permite acceder a información como la fecha de ejecución, los parámetros del DAG, etc.

      • python_callable=upload_to_minio: Este argumento especifica la función que se ejecutará como parte de la tarea. En este caso, se proporciona la función upload_to_minio que definimos anteriormente. Esta función realizará la carga del archivo CSV a Minio.

      • dag=dag: Este argumento indica a qué DAG pertenece la tarea. Aquí, dag=dag significa que esta tarea pertenece al DAG dag que se definió previamente.

  7. Definición de las dependencias entre las tareas:

    download_csv_task >> cp_csv_task >> upload_csv_task

    Se establecen las dependencias entre las tareas, indicando que primero se debe descargar el archivo, luego copiarlo y finalmente subirlo a Minio.

Resumen

Hemos credo un DAG configurado para realizar las siguientes acciones secuenciales:

  1. Descarga un archivo CSV desde una URL pública.
  2. Copia el archivo descargado a otro directorio en el sistema de archivos local.
  3. Sube el archivo copiado a un bucket en Minio.

Este proceso se define utilizando operadores de Python y Bash en Airflow, con una configuración específica para el cliente de Minio y las tareas necesarias para completar el flujo de trabajo.