Airflow + MinIO
El objetivo de esta sección es utilizar MinIO
como nuestro almacén de objetos central en lugar de utilizar directorios y/o carpetas para almacenar nuestras fuentes de datos que utilizaremos en el proyecto.
En nuestra sección anterior, aprendimos cómo usar Airflow
para crear un DAG
que nos permitió descargar un archivo CSV
y moverlo a una ubicación dentro del mismo host, es decir, a otra carpeta o directorio.
Ahora, la idea principal en esta sección del proyecto es utilizar Airflow
para crear un DAG
que nos permita descargar el archivo CSV, pero esta vez queremos almacenarlo en MinIO en lugar de en una carpeta o directorio tradicional.
Lo primero que debemos hacer es ajustar el DAG
de la sección anterior para que los datos se almacenen en MinIO en lugar de en una carpeta local, necesitaremos integrar la funcionalidad de carga de datos de MinIO en nuestro flujo de trabajo de Airflow. Podemos lograr esto utilizando la biblioteca minio-py
, que proporciona una interfaz de Python para interactuar con un servidor MinIO.
Primero, necesitaremos instalar la biblioteca minio
en nuestro entorno Airflow
. Puedes hacerlo ejecutando pip install minio
en tu contenedor Airflow o incluyendo la instalación en el Dockerfile si estás utilizando Docker para tu entorno de desarrollo.
Por esto, debemos crear un nuevo Dockerfile para proceder con la instalación del paquete, ya que nuestro contenedor de Airflow
no viene preconfigurado con minio
El contenedor Airflow
que usamos en nuestra Mini-Data-Platform
viene configurado con los paquetes básicos, y lo hemos dejado así a propósito para que aprendas a usar Docker, configurar tu propio Dockerfile y tener algunas bases para crear tus propios contenedores
Dockerfile
En primer lugar, debemos conectarnos a nuestra maquina virtual Machine-0
. Una vez hecho esto, no olvides navegar desde la terminal de VSCode
al directorio repos/mini-data-platform
.
cd repos/mini-data-plarform
Una vez que estés en el directorio, vamos a crear un archivo al que llamaremos Dockerfile
y agregaremos las instrucciones. Ahora, desde la terminal, usaremos el comando:
touch Dockerfile
Esto creará un archivo llamado Dockerfile
, el cual podrás visualizar desde el panel izquierdo de VSCode
. Puedes abrirlo desde este panel haciendo doble clic en el archivo.
Ahora solo debes introducir el siguiente código en el Dockerfile:
FROM z2hx/airflow
USER airflow
RUN pip install minio
Ahora desglosemos el Dockerfile:
-
FROM z2hx/airflow
: Esta línea indica que estamos utilizando la imagenz2hx/Airflow
como la base para nuestra propia imagen. Esta es una imagen modificada a partir de la imagen oficial donde se agregaron algunas dependencias. Esto significa que nuestra imagen contendrá todo lo que está en la imagenz2hx/Airflow
, y luego agregaremos la instalación deminio
utilizando pip. -
USER airflow
: Con esta instrucción, se cambia el usuario que ejecutará los siguientes comandos en el Dockerfile aairflow
. Esto es útil por razones de seguridad, ya que evita ejecutar el contenedor como el usuario root, lo que puede ser riesgoso. -
RUN pip install minio
: Esta línea ejecuta un comando en la imagen que está construyendo. Aquí, se utilizapip
, que es el gestor de paquetes de Python, para instalar la bibliotecaminio
. Esta biblioteca permite interactuar con el servidor MinIO, que es una solución de almacenamiento compatible con S3.
Minio Python
La biblioteca MinIO
para Python
es una biblioteca de cliente de Python
para interactuar con el servidor de almacenamiento MinIO
. Al instalar esta biblioteca dentro del contenedor de Airflow
, puedes aprovechar todas las funcionalidades que ofrece MinIO para interactuar con objetos de almacenamiento en la nube desde Airflow.
Algunas de las cosas que puedes hacer con la biblioteca MinIO dentro del contenedor de Airflow son:
-
Interactuar con buckets y objetos: Puedes crear, eliminar y listar buckets, así como subir, descargar y eliminar objetos dentro de esos buckets.
-
Gestionar políticas de acceso: Puedes configurar políticas de acceso para controlar quién puede acceder a tus buckets y objetos, lo que es útil para asegurar tu almacenamiento de objetos.
-
Gestionar versiones de objetos: MinIO admite el versionado de objetos, lo que te permite mantener un historial de versiones de tus objetos y restaurar versiones anteriores si es necesario.
-
Utilizar características avanzadas: MinIO ofrece una amplia gama de características avanzadas, como cifrado de extremo a extremo, compresión de objetos y notificaciones de eventos, que puedes aprovechar en tu aplicación.
MinIO dentro de tu contenedor de Airflow, puedes acceder a todas las funcionalidades de MinIO para almacenar y gestionar objetos en la nube directamente desde tus flujos de trabajo de Airflow.
Docker Build
Ahora debemos crear la imagen de Docker
a partir del Dockerfile
. Para ello, simplemente debemos ejecutar el siguiente comando desde la terminal:
docker build -t myairflow .
Descompongamos el comando docker build --network=host -t myairflow .
-
docker build
: Este es el comando principal de Docker utilizado para construir imágenes a partir de un Dockerfile y un contexto de construcción. -
-t myairflow
: Esta opción asigna un nombre y una etiqueta (-t
) a la imagen que se está construyendo. En este caso, el nombre de la imagen serámyairflow
. -
.
Esto especifica el contexto de construcción, es decir, el directorio actual. Docker buscará un archivo llamadoDockerfile
en este directorio y utilizará ese archivo para construir la imagen.
DAG + MinIO
Una vez creado el nuevo contenedor myairflow
, podemos crear un nuevo DAG al que podemos llamar download_csv_dag_minio.py
. Este DAG lo crearemos en el mismo directorio de DAGs en Airflow. Una vez creado el archivo, procederemos a copiar el siguiente código:
import os
from datetime import datetime, timedelta
from minio import Minio
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
# Configuración de Minio
def get_minio_client():
minio_client = Minio(
endpoint="minio:9000",
access_key=os.environ["MINIO_ROOT_USER"],
secret_key=os.environ["MINIO_ROOT_PASSWORD"],
secure=False
)
return minio_client
def upload_to_minio(**kwargs):
minio_client = get_minio_client()
bucket = 'csv-bucket'
file_path = '/opt/airflow/notebooks/Churn_Modelling-1.csv'
object_name = 'Churn_Modelling-1.csv'
# Verificar si el bucket existe, si no, crearlo
if not minio_client.bucket_exists(bucket):
minio_client.make_bucket(bucket)
# Subir el archivo
minio_client.fput_object(bucket, object_name, file_path, content_type='application/csv')
# Configuración por defecto del DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 5, 13),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Definición del DAG
dag = DAG(
'download_csv_dag_minio',
default_args=default_args,
description='DAG to download, move, and upload CSV file to Minio',
schedule_interval=None,
)
# Tarea para descargar el archivo CSV
download_csv_task = BashOperator(
task_id='download_csv',
bash_command='wget -O /opt/airflow/Proyectos/input/Churn_Modelling-1.csv https://raw.githubusercontent.com/xploiterx/datasets/master/Proyect-0/CSV/Churn_Modelling-1.csv',
dag=dag,
)
# Tarea para copiar el archivo CSV a otro directorio
cp_csv_task = BashOperator(
task_id='cp_csv',
bash_command='cp /opt/airflow/Proyectos/input/Churn_Modelling-1.csv /opt/airflow/notebooks',
dag=dag,
)
# Tarea para subir el archivo CSV a Minio
upload_csv_task = PythonOperator(
task_id='upload_csv_to_minio',
provide_context=True,
python_callable=upload_to_minio,
dag=dag,
)
# Definir las dependencias entre las tareas
download_csv_task >> cp_csv_task >> upload_csv_task
Modificando el docker-compose.yml
Para que podamos usar el nuevo contenedor creado, debemos modificar el archivo docker-compose.yml
para indicarle que utilice nuestra nueva imagen myairflow
. Para lograr esto, vamos a editar el docker-compose.yml
desde VSCode. Una vez abierto el archivo docker-compose.yml
, editaremos la siguiente línea donde dice z2hx/airflow
, la cambiaremos por myairflow
, de modo que quede así:
Antes
version: '3.5'
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-z2hx/airflow}
Después
version: '3.5'
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-myairflow}
Una vez realizado todos estos pasos solo debemos correr de nuevo el docker-compose.yml
**Si los contenedores están iniciados:
Para iniciar de nuevo los contenedores:
docker compose -f airflow.docker-compose.yml up
Ejecutando el DAG
Antes de proceder con este ejercicio, debemos validar que en la terminal de VSCode estén activos los puertos de Minio (9000, 9001) y Airflow (7000). En caso contrario, debes activarlos utilizando la opción Add Port
.
Ahora, dirígete al navegador y validemos que tengas acceso a MinIO. Simplemente escribe localhost:9001
e ingresa las credenciales que están almacenadas en la carpeta config/minio.env
.
Este archivo contiene las credenciales para poder iniciar sesión en el portal web de MinIO.
MINIO_ROOT_USER=minio-access-key
MINIO_ROOT_PASSWORD=minio-secret-key
Una vez hayas iniciado sesión, verás una pantalla similar a la siguiente imagen donde podrás validar que no contiene nada. Pero una vez ejecutemos el DAG, el script creará una carpeta donde se almacenará el CSV.
Ahora, solo nos resta ejecutar el DAG. Antes de hacerlo, debemos asegurarnos de eliminar los dos archivos CSV
creados en el proceso anterior, ubicados en Proyectos/input
y notebooks
. Una vez hecho esto, puedes proceder a ejecutar el DAG desde el panel de Airflow, tal como se realizó en la sección anterior. No olvides ejecutar el DAG correcto download_csv_dag_minio.py
, ya que en el panel de Airflow debes tener dos DAGs: el del proceso anterior llamado download_csv_dag.py
y el actual llamado download_csv_dag_minio.py
.
Ahora, solo debes ejecutar el DAG download_csv_dag_minio.py
EL proceso es exactamente el mismo que el anterior, solo que ahora se ejecutan tres tareas como se puede validar en el panel:
- Descarga un archivo CSV desde una URL pública.
- Copia el archivo descargado a otro directorio en el sistema de archivos local.
- Sube el archivo copiado a un bucket en Minio.
Puedes validar que efectivamente en Proyectos/input
y notebooks
están los dos Churn_Modelling-1.csv
. Ahora, solo resta validar nuestro S3 Minio
y para validar esto revisemos el portal web de minio y validemos que efectivamente el CSV
este almacenado en MinIO
Para revisar el CSV
descargado solo haz clic en Browse
y verás la siguiente pantalla:
Y como puedes validar el CSV
efectivamente ha sido descargado y almacenado en MinIO
DAG Explicación detallada
-
Importaciones y configuración inicial:
import os
from datetime import datetime, timedelta
from minio import Minio
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperatorSe importan las bibliotecas necesarias, incluyendo
os
para variables de entorno,datetime
ytimedelta
para manejar fechas,Minio
para interactuar con el almacenamiento Minio, yDAG
,PythonOperator
yBashOperator
de Airflow para definir y gestionar las tareas del DAG. -
Configuración del cliente de Minio:
def get_minio_client():
minio_client = Minio(
endpoint="minio:9000",
access_key=os.environ["MINIO_ROOT_USER"],
secret_key=os.environ["MINIO_ROOT_PASSWORD"],
secure=False
)
return minio_clientEsta función configura y devuelve un cliente de Minio utilizando las credenciales almacenadas en las variables de entorno:
-
def get_minio_client():
: Esto define una función llamadaget_minio_client
. -
minio_client = Minio(
: Aquí se crea una instancia de Minio, la cual es una clase proporcionada por la bibliotecaminio
. Esta clase permite interactuar con un servidor Minio. -
endpoint="minio:9000",
: El parámetroendpoint
especifica la dirección y el puerto del servidor Minio al que se desea conectar. En este caso, se usaminio:9000
, lo que implica que el servidor Minio está disponible en el hostminio
en el puerto9000
. -
access_key=os.environ["MINIO_ROOT_USER"],
: Este parámetro especifica la clave de acceso que se utilizará para autenticarse en el servidor Minio. La clave de acceso se lee de una variable de entorno llamadaMINIO_ROOT_USER
. -
secret_key=os.environ["MINIO_ROOT_PASSWORD"],
: Similar al parámetro anterior, este especifica la clave secreta que se utilizará para autenticarse en el servidor Minio. La clave secreta se lee de una variable de entorno llamadaMINIO_ROOT_PASSWORD
. -
secure=False
: Este parámetro indica si se debe usar una conexión segura (HTTPS) al comunicarse con el servidor Minio. Al establecerlo enFalse
, se indica que se usará una conexión no segura (HTTP). -
return minio_client
: Una vez que se ha creado el cliente Minio y se han configurado las credenciales y la configuración del servidor, se devuelve la instancia del cliente Minio creado.
-
-
Función para subir archivos a Minio:
def upload_to_minio(**kwargs):
minio_client = get_minio_client()
bucket = 'csv-bucket'
file_path = '/opt/airflow/notebooks/Churn_Modelling-1.csv'
object_name = 'Churn_Modelling-1.csv'
if not minio_client.bucket_exists(bucket):
minio_client.make_bucket(bucket)
minio_client.fput_object(bucket, object_name, file_path, content_type='application/csv')Esta función utiliza el cliente de Minio para subir un archivo CSV a un bucket específico. Si el bucket no existe, lo crea primero:
-
def upload_to_minio(**kwargs):
: Esto define una función llamadaupload_to_minio
que acepta argumentos clave-valor adicionales (**kwargs
). Estos argumentos pueden contener información adicional que podría ser útil en la función, aunque en este caso no se están utilizando. -
minio_client = get_minio_client()
: Aquí se llama a la funciónget_minio_client()
para obtener una instancia del cliente Minio. Esta función se supone que ya ha sido definida en otro lugar y devuelve un cliente Minio configurado correctamente. -
bucket = 'csv-bucket'
: Se define el nombre del bucket al que se subirá el archivo. En este caso, el nombre del bucket escsv-bucket
. -
file_path = '/opt/airflow/notebooks/Churn_Modelling-1.csv'
: Se especifica la ruta completa del archivo que se va a subir al servidor Minio. En este caso, el archivo se encuentra en/opt/airflow/notebooks/Churn_Modelling-1.csv
. -
object_name = 'Churn_Modelling-1.csv'
: Este es el nombre que tendrá el objeto en el bucket de Minio una vez que se haya subido. En este caso, el nombre del objeto es el mismo que el nombre del archivo. -
if not minio_client.bucket_exists(bucket):
: Aquí se verifica si el bucket especificado ya existe en el servidor Minio. Si el bucket no existe, se crea utilizando el métodomake_bucket()
del cliente Minio. -
minio_client.fput_object(bucket, object_name, file_path, content_type='application/csv')
: Finalmente, se utiliza el métodofput_object()
del cliente Minio para subir el archivo al bucket especificado. Este método toma como argumentos el nombre del bucket, el nombre del objeto, la ruta del archivo local y opcionalmente el tipo de contenido del archivo. En este caso, se especifica que el tipo de contenido del archivo esapplication/csv
.
-
-
Configuración por defecto del DAG:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 5, 13),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}Aquí se configuran los parámetros por defecto para el DAG, incluyendo el propietario, la fecha de inicio, y las políticas de reintento:
-
'owner': 'airflow'
: Este argumento indica quién es el propietario del DAG. Es útil para identificar quién es responsable del mantenimiento y la gestión del DAG. -
'depends_on_past': False
: Este argumento especifica si las tareas del DAG deben esperar a que las tareas del día anterior se completen antes de ejecutarse. En este caso, se establece enFalse
, lo que significa que las tareas no dependerán del estado de las tareas anteriores. -
'start_date': datetime(2024, 5, 13)
: Este argumento define la fecha y hora de inicio del DAG. Es la fecha en la que se comenzará a programar la ejecución del DAG. En este caso, el DAG está programado para comenzar el 13 de mayo de 2024. -
'email_on_failure': False
: Este argumento especifica si se enviará un correo electrónico cuando una tarea del DAG falle. En este caso, se establece enFalse
, lo que significa que no se enviarán correos electrónicos en caso de fallo. -
'email_on_retry': False
: Este argumento especifica si se enviará un correo electrónico cuando se vuelva a intentar una tarea después de un fallo. En este caso, se establece enFalse
, lo que significa que no se enviarán correos electrónicos en caso de reintentos. -
'retries': 1
: Este argumento especifica el número de intentos que se realizarán al ejecutar una tarea si esta falla. En este caso, se establece en1
, lo que significa que habrá un solo reintentos en caso de fallo. -
'retry_delay': timedelta(minutes=5)
: Este argumento especifica el tiempo de espera entre reintentos en caso de fallo de una tarea. En este caso, se establece en5
minutos, lo que significa que se esperará 5 minutos antes de intentar nuevamente después de un fallo.
-
-
Definición del DAG:
dag = DAG(
'download_csv_dag',
default_args=default_args,
description='DAG to download, move, and upload CSV file to Minio',
schedule_interval=None,
)Se define el DAG con un identificador (
download_csv_dag
), los argumentos por defecto, una descripción y el intervalo de ejecución (en este caso, no se ejecuta en un horario regular).-
'download_csv_dag'
: Es el nombre del DAG. Este nombre se utiliza para identificar el DAG dentro de Airflow y debe ser único dentro del entorno de Airflow. -
default_args=default_args
: Este argumento indica que se utilizarán los argumentos por defecto definidos en el diccionariodefault_args
que se proporcionó anteriormente. Estos argumentos especifican configuraciones como el propietario del DAG, la fecha de inicio, la configuración de reintentos, etc. -
description='DAG to download, move, and upload CSV file to Minio'
: Esta es una descripción opcional del DAG. Proporciona una breve explicación del propósito o la funcionalidad del DAG. En este caso, describe que el DAG está destinado a descargar, mover y cargar un archivo CSV en Minio. -
schedule_interval=None
: Este argumento especifica el intervalo de programación para la ejecución del DAG. Al establecerlo enNone
, se indica que el DAG no se ejecutará automáticamente en un horario programado, sino que se activará manualmente o a través de desencadenadores externos.
-
-
Definición de las tareas:
-
Tarea para descargar el archivo CSV:
download_csv_task = BashOperator(
task_id='download_csv',
bash_command='wget -O /opt/airflow/Proyectos/input/Churn_Modelling-1.csv https://raw.githubusercontent.com/xploiterx/datasets/master/Proyect-0/CSV/Churn_Modelling-1.csv',
dag=dag,
)Esta tarea utiliza
BashOperator
para descargar un archivo CSV desde una URL específica y guardarlo en un directorio local. -
Tarea para copiar el archivo CSV a otro directorio:
cp_csv_task = BashOperator(
task_id='cp_csv',
bash_command='cp /opt/airflow/Proyectos/input/Churn_Modelling-1.csv /opt/airflow/notebooks',
dag=dag,
)Esta tarea copia el archivo CSV descargado a otro directorio dentro del sistema de archivos local.
-
Tarea para subir el archivo CSV a Minio:
upload_csv_task = PythonOperator(
task_id='upload_csv_to_minio',
provide_context=True,
python_callable=upload_to_minio,
dag=dag,
)Esta tarea utiliza
PythonOperator
para ejecutar la funciónupload_to_minio
, subiendo el archivo CSV a Minio:-
task_id='upload_csv_to_minio'
: Este argumento especifica el identificador único de la tarea. Es el nombre por el cual la tarea será conocida dentro del DAG. En este caso, el identificador es'upload_csv_to_minio'
. -
provide_context=True
: Este argumento indica si se debe proporcionar el contexto de ejecución de Airflow a la función llamadaupload_to_minio
. Cuando está establecido enTrue
, el contexto de ejecución de Airflow se pasa como argumento a la función, lo que permite acceder a información como la fecha de ejecución, los parámetros del DAG, etc. -
python_callable=upload_to_minio
: Este argumento especifica la función que se ejecutará como parte de la tarea. En este caso, se proporciona la funciónupload_to_minio
que definimos anteriormente. Esta función realizará la carga del archivo CSV a Minio. -
dag=dag
: Este argumento indica a qué DAG pertenece la tarea. Aquí,dag=dag
significa que esta tarea pertenece al DAGdag
que se definió previamente.
-
-
-
Definición de las dependencias entre las tareas:
download_csv_task >> cp_csv_task >> upload_csv_task
Se establecen las dependencias entre las tareas, indicando que primero se debe descargar el archivo, luego copiarlo y finalmente subirlo a Minio.
Resumen
Hemos credo un DAG configurado para realizar las siguientes acciones secuenciales:
- Descarga un archivo CSV desde una URL pública.
- Copia el archivo descargado a otro directorio en el sistema de archivos local.
- Sube el archivo copiado a un bucket en Minio.
Este proceso se define utilizando operadores de Python y Bash en Airflow, con una configuración específica para el cliente de Minio y las tareas necesarias para completar el flujo de trabajo.