🔍 ¿Qué es Airflow?🤔💻
Airflow es una plataforma de código abierto que se utiliza para organizar flujos de trabajo de datos complejos. Está diseñada en torno al concepto de gráficos acíclicos dirigidos (DAG), que definen una serie de tareas y sus dependencias. Airflow se compone de varios microservicios que trabajan juntos para ejecutar estas tareas. A continuación, se ofrece una explicación sencilla de los componentes clave:
🌐 𝗦𝗲𝗿𝘃𝗲𝗿𝘀𝗶𝗼́𝗻: Esta es la interfaz de usuario de Airflow, donde puedes crear, monitorear y administrar DAG. Proporciona un panel de control fácil de usar que te ayuda a visualizar tus flujos de trabajo de datos, verificar su progreso y solucionar cualquier problema.
🕰️ 𝗦𝗰𝗵𝗲𝗱𝘂𝗹𝗲𝗿: este componente es responsable de administrar la ejecución de las tareas. Monitorea constantemente los DAG que has creado y programa las tareas para que se ejecuten según sus dependencias y configuraciones de tiempo. El Scheduler se asegura de que las tareas se ejecuten en el orden correcto y en el momento correcto.
🔧 𝗘𝘅𝗲𝗰𝘂𝘁𝗼𝗿: El ejecutor es responsable de ejecutar las tareas. Se comunica con el programador para recibir información sobre qué tareas ejecutar y luego lanza los procesos o contenedores necesarios para ejecutar las tareas. Hay diferentes tipos de ejecutores en Airflow, como LocalExecutor, CeleryExecutor y KubernetesExecutor, según su infraestructura y sus requisitos.
👷 𝗪𝗼𝗿𝗸𝗲𝗿: El Worker es un componente que realiza las tareas asignadas por el Executor. Puede ser un proceso o contenedor independiente, según el Executor elegido. Los Workers son responsables de ejecutar el código o los scripts reales definidos en sus tareas y de informar su estado al Executor.
💾 𝗠𝗲𝘁𝗮𝗱𝗮𝘁𝗮 𝗗𝗮𝘁𝗮𝗯𝗮𝘀𝗲: Este es el repositorio central donde Airflow almacena información sobre los DAG, las tareas y su historial de ejecución. Ayuda a mantener el estado de sus flujos de trabajo y proporciona datos valiosos para la supervisión y la resolución de problemas. Airflow admite varias bases de datos como PostgreSQL, MySQL y SQLite para este propósito.
📨 𝗠𝗲𝘀𝘀𝗮𝗴𝗲 𝗕𝗿𝗼𝗸𝗲𝗿 (𝗼𝗽𝘁𝗶𝗼𝗻𝗮𝗹): En configuraciones distribuidas, donde se utiliza CeleryExecutor, se necesita un agente de mensajes para gestionar la comunicación entre el Scheduler y los Workers. El agente de mensajes, como RabbitMQ o Redis, ayuda a pasar información de tareas del Scheduler a los Workers y garantiza la ejecución confiable y eficiente de las tareas en un entorno distribuido.
Airflow es una herramienta poderosa para administrar flujos de trabajo de datos, y comprender su arquitectura es clave para garantizar su uso eficaz en su organización. Por lo tanto, si está buscando una plataforma confiable para administrar sus tareas de ingeniería de datos, ¡definitivamente vale la pena considerar Airflow!
Instalación
Tengo algunas preguntas sobre el inicio de Airflow.
Para instalar Airflow tenemos 3 maneras diferentes de lograr esto:
-
Instalar Airflow con el comando
pip install apache-airflow
, inicializo la base de datos conairflow initdb
y arrancar el servidor web con el comandoairflow webserver -p 8080
con esto puedes ir alocalhost:8080
para acceder a la interfaz de usuario. -
Iniciar Airflow con el comando
airflow standalone
. -
Descarga el siguiente archivo
docker-compose.yaml
concurl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.0/docker-compose.yaml'
. Creo los directoriosdags
,logs
,plugins
y el archivo.env
con los siguientes comandos:
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
Luego ejecuta el comando docker-compose up airflow-init
y luego Inicia los servicios con docker-compose up
.
Consideraciones
Diferencia entre airflow webserver -p 8080
y airflow standalone
-
airflow webserver -p 8080
: Este comando inicia solo el servidor web de Airflow en el puerto 8080. Pero aun necesitas ejecutar otros comandos para iniciar elscheduler
y otros componentes necesarios para que Airflow funcione completamente. -
airflow standalone
: Este comando es una forma simplificada de iniciar Airflow en un solo proceso. Inicia tanto el servidor web como elscheduler
en un entorno de prueba. Es una forma rápida de tener Airflow funcionando sin necesidad de múltiples comandos, pero no es recomendado para entornos de producción.
Inicialización y dependencia de servicios en docker-compose
El comando docker-compose up airflow-init
inicializa la base de datos y otros requisitos iniciales de Airflow. Luego, se debe ejecutar docker-compose up
para iniciar los demás servicios.
No se puede lanzar directamente docker-compose up
sin antes lanzar docker-compose up airflow-init
porque varios servicios en docker-compose.yaml
dependen de la finalización exitosa del servicio airflow-init
Revisemos:
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
Esto asegura que el servicio airflow-init
se complete antes de iniciar otros servicios, garantizando que la base de datos y otras configuraciones iniciales estén listas.
Instalación dependencias en contenedores Docker
Puedes instalar dependencias adicionales con el comandopip install 'tu dependiencia'
, o con el comando pip install -r requeriments.txt
con esto se instala en el entorno local donde ejecutaste el comando. Para que esto también esté disponible en el entorno Docker, debes incluir tu instalación en el Dockerfile
o en el docker-compose.yaml
.
Aquí hay dos métodos para hacerlo:
Método 1: Modificar el Dockerfile
Si tienes un Dockerfile
para Airflow, añade la instalación del plugin:
FROM apache/airflow:2.5.0
USER root
RUN pip install 'apache-airflow-providers-apache-spark'
RUN copy requeriments.txt .
RUN pip install requeriments.txt
USER airflow
Método 2: Modificar docker-compose.yaml
Añade un comando de instalación en el docker-compose.yaml
:
services:
airflow-webserver:
image: apache/airflow:2.5.0
...
command: >
bash -c "pip install 'apache-airflow-providers-apache-spark' &&
airflow webserver"
...
airflow-scheduler:
image: apache/airflow:2.5.0
...
command: >
bash -c "pip install 'apache-airflow-providers-apache-spark' &&
airflow scheduler"
...
# Añade el comando de instalación para otros servicios si es necesario
Después de realizar estos cambios, reconstruye y reinicia los servicios Docker:
docker-compose down
docker-compose up --build
Esto asegurará que el plugin apache-airflow-providers-apache-spark
esté instalado en todos los contenedores necesarios.
DAGs
En Apache Airflow, un DAG (Directed Acyclic Graph, por sus siglas en inglés) es una estructura que representa un flujo de trabajo. Un DAG define las tareas que se deben ejecutar y la secuencia en la que deben realizarse. Cada nodo en el DAG representa una tarea, y las flechas (aristas) muestran las dependencias entre estas tareas.
Ejemplo Sencillo de un DAG en Airflow
Supongamos que tienes un proceso simple que consta de tres tareas:
- Extraer Datos: Descargar datos de una API.
- Transformar Datos: Limpiar y transformar los datos descargados.
- Cargar Datos: Subir los datos transformados a una base de datos.
En este ejemplo, la tarea de transformar datos depende de que se complete la tarea de extracción de datos, y la tarea de cargar datos depende de que se complete la tarea de transformación de datos. El DAG en este caso sería:
Extraer Datos -> Transformar Datos -> Cargar Datos
Creando un DAG en Airflow
A continuación, se muestra un código simple para definir este DAG en Python usando Airflow:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def extraer_datos():
# Código para extraer datos de la API
print("Datos extraídos")
def transformar_datos():
# Código para transformar los datos
print("Datos transformados")
def cargar_datos():
# Código para cargar los datos en la base de datos
print("Datos cargados")
# Definir el DAG
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1
}
dag = DAG(
'simple_etl',
default_args=default_args,
description='Un DAG simple para ETL',
schedule_interval='@daily',
)
# Definir las tareas
tarea_extraer = PythonOperator(
task_id='extraer_datos',
python_callable=extraer_datos,
dag=dag,
)
tarea_transformar = PythonOperator(
task_id='transformar_datos',
python_callable=transformar_datos,
dag=dag,
)
tarea_cargar = PythonOperator(
task_id='cargar_datos',
python_callable=cargar_datos,
dag=dag,
)
# Establecer dependencias
tarea_extraer >> tarea_transformar >> tarea_cargar
Explicación:
- Importaciones: Se importan las clases necesarias de Airflow y Python.
- Funciones: Se definen tres funciones que representan las tareas de extracción, transformación y carga de datos.
- default_args: Se definen los argumentos predeterminados para el DAG, como el propietario, la fecha de inicio y el número de reintentos.
- dag: Se crea una instancia del DAG con un nombre, argumentos predeterminados, una descripción y un intervalo de programación (en este caso, diario).
- PythonOperator: Se crean operadores Python para cada tarea, asignando una función a cada uno.
- Dependencias: Se establecen las dependencias entre las tareas usando el operador
>>
, indicando quetarea_extraer
debe completarse antes de que comiencetarea_transformar
, y quetarea_transformar
debe completarse antes de que comiencetarea_cargar
.
Beneficios de Usar DAGs en Airflow
- Modularidad: Cada tarea se define por separado, facilitando su mantenimiento y actualización.
- Visualización: Airflow proporciona una interfaz gráfica que muestra el DAG y su estado, lo que facilita el monitoreo.
- Programación: Los DAGs pueden programarse para ejecutarse en intervalos regulares (diariamente, semanalmente, etc.).
- Manejo de Dependencias: Airflow maneja automáticamente las dependencias entre tareas, asegurando que se ejecuten en el orden correcto.
Airflow With Docker Compose
Utilizar Apache Airflow con Docker Compose ofrece ventajas significativas para el desarrollo y despliegue de workflows. Docker Compose simplifica la configuración y gestión al encapsular Airflow y sus dependencias en contenedores, asegurando portabilidad, consistencia y facilitando la gestión de versiones. El aislamiento de dependencias permite un despliegue más controlado y escalable de los servicios de Airflow, optimizando tanto el desarrollo como la operación de workflows complejos en entornos de desarrollo y producción.
Mini Data Platform
A partir de esta sección usaremos nuestra mini plataforma de datos preconfigurada para poder ejecutar tanto Airflow como otros servicios que veremos más adelante.
Ahora clonaremos el siguiente repositorio en la la Machine-0 en la carpeta repos https://github.com/xploiterx/mini-data-platform y desde la terminal nos ubicaremos en el repositorio clonado y ejecutaremos el siguiente comando:
docker compose up
Con este comando se desplegarán los contenedores Airflow, Minio, Jupyter Notebook, PostgreSQL y PgAdmin.
¿Qué es docker compose?
Docker Compose es una herramienta que te permite definir y gestionar aplicaciones Docker multi-contenedor de manera más sencilla que ejecutar los contenedores individuales. Además te permite especificar las dependencias entre contenedores, configurar sus servicios, y luego ejecutar y escalar toda la aplicación con un solo comando. Esto es super eficiente para desplegar en nuestro caso una plataforma de datos que replica los entornos de desarrollo empresariales.
Ahora revisemos algunos puntos clave sobre Docker Compose:
-
Definición Declarativa: Utiliza un archivo YAML para definir la configuración de la aplicación, incluyendo los servicios, volúmenes, redes y variables de entorno necesarios.
-
Multi-Contenedor: Permite definir y gestionar aplicaciones que constan de múltiples contenedores Docker que trabajan juntos, por ejemplo, una aplicación web con su base de datos y un servidor de backend.
-
Gestión de Dependencias: Docker Compose gestiona las dependencias entre los contenedores, asegurando que se inicien y se conecten correctamente según la configuración especificada.
-
Configuración Simplificada: Facilita la configuración de la red y los volúmenes entre los contenedores, lo que simplifica el despliegue y la gestión de aplicaciones complejas.
-
Comandos Unificados: Proporciona comandos simples para realizar tareas comunes como la creación, inicio, detención y eliminación de los servicios definidos en el archivo Compose.
Como viste, Docker Compose es una herramienta poderosa para gestionar aplicaciones Docker complejas, permitiendo a los desarrolladores definir, gestionar y desplegar aplicaciones multi-contenedor de manera más eficiente y consistente.
Docker Jupyter Notebook
Este es un contenedor especial desarrollado por Z2h que te permite ejecutar cargas de trabajo Python, Spark y R.