11주차 수요일, 53일차 Today I Learned
Airflow 고급기능 (3)
: 기타 기능 - Dag Dependencies, Task Grouping, Dynamic Dags
✏️ 학습 내용
1. Dag Dependencies
DAG간 (혹은 Task간) 실행 순서 (혹은 수행 여부)를 정하려면 어떻게 해야 하는지 알아보자.
DAG를 실행하는 방법, 즉 스케줄되도록 하는 방법은 2가지가 있다. 이번에는 2가지 방법 중 '다른 Dag에 의해 트리거'하는 방법에 대해 알아볼 예정이다. 이 방법은 또다시 2가지 방식으로 나뉜다. 이 방식은 트리거에도 똑같이 적용될 수 있다.
- Schedule로 지정하여 주기적 실행하기
- 다른 Dag에 의해 트리거하기
- Explicit Trigger : Dag A가 분명하게 Dag B를 트리거 (TriggerDagRunOperator)
- Reactive Trigger : Dag B가 Dag A가 끝나기를 대기 (ExternalTaskSensor)
ExternalTaskSensor
특정 DAG의 task로, 다른 DAG의 특정 task가 끝났는지를 체크한다. 먼저 동일한 schedule_interval을 사용하며, 이 경우 두 task들의 Execution Date가 동일해야 하고, 그렇지 않으면 매칭이 안 된다. 이처럼 조건이 까다로워서 잘 사용하지 않을 수도 있다.
BranchPythonOperator
상황에 따라 뒤에 실행되어야 할 task를 동적으로 결정해주는 오퍼레이터로, 미리 정해준 오퍼레이터들 중에 선택하는 형태로 돌아간다. TriggerDagOperator 앞에 이 오퍼레이터를 사용하는 경우도 있다.
LatestOnlyOperator
Time-sensitive한 task들이 과거 데이터의 backfill 시에 실행되는 것을 막기 위한 오퍼레이터이다. 현재 시간이 지금 task가 처리하는 execution_date보다 미래이고 다음 execution_date보다 과거인 경우에만 뒤로 실행을 이어가고 아니면 중단되도록 한다.
Trigger Rules
Upstream task의 성공실패 상황에 따라 뒷단 task의 실행여부를 결정하고 싶을 때 사용할 수 있다. Operator에 trigger_rule이라는 파라미터로 결정 가능하다. 여기에 입력될 수 있는 값은 다양하며, 기본적으로 all_success가 적용되어 모두 성공해야 실행된다. 이 외 all_failed, all_done, one_failed, one_success, none_failed, none_failed_min_one_success 등 다양하다.
(참고) Jinaja Template
Python에서 널리 사용되는 템플릿 엔진으로, 장고 템플릿 엔진에서 영감을 받아 개발되었으며, Flask에서 사용된다. Jinja 이용 시 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML을 생성한다.
- 변수 : 이중 괄호 {{ }} 로 감사서 사용
- 제어문 : 퍼센트 기호 {% %}로 표시
Airflow에서 Jinja 템플릿을 사용하면 작업 이름, 파라미터, SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의 가능하고, 이를 통해 재사용 가능하며 사용자 정의 가능한 워크플로우를 생성한다. 따라서 {{ ds }} 형식으로 execution_date을 코드 내에서 쉽게 사용하거나 파라미터 등으로 넘어온 변수를 쉽게 사용 가능하다.
(참고) BashOperator 레퍼런스 :
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/bash/index.html
마지막 부분에 (template) 표시가 있다면 Jinja Template 사용 가능하다는 뜻이다.
(참고) Sensor
Sensor는 특정 조건이 충졸될 때까지 대기하는 Operator로, 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용하다. Airflow는 몇 가지 내장 센서를 제공하며, 기본적으로 주기적으로 poke 한다. poke를 할지 결정해주는 파라미터는 mode로, mode의 값은 reschedule 혹은 poke가 되는데, poke가 기본값이며 이는 woker 하나를 붙잡고 계속 체크한다.
- FileSensor: 지정된 위치에 파일이 생길 때까지 대기
- HttpSensor: HTTP 요청을 수행하고 지정된 응답이 대기
- SqlSensor: SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
- TimeSensor: 특정 시간에 도달할 때까지 워크플로우를 일시 중지
- ExternalTaskSensor: 다른 Airflow DAG의 특정 작업 완료를 대기
2. Task Grouping
task의 수가 많은 DAG라면 task들의 성격에 따라 관리하고 싶은 니즈가 존재한다. 이 때에 태스크 그룹핑이 필요해진다.
원래는 SubDAG가 사용되다가 Airflow 2.0에서 나온 Task Grouping으로 넘어가는 추세이다. 다수의 파일 처리를 하는 DAG라면 (1) 파일 다운로드 태스크들과 (2) 파일 체크 태스크와 (3) 데이터 처리 태스크들로 구성할 수 있다.
3. Dynamic Dags
Jinja Template, YAML을 기반으로 Dynamic Dags를 사용하면 코드 재사용을 최대화 할 수 있다. Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고 YAML을 통해 앞서 만든 템플릿에 파라미터를 제공해서 DAG를 동적으로 만든다. 이를 통해 비슷한 DAG를 계속해서 매뉴얼하게 개발하는 것을 방지할 수 있다. 오너가 다르거나 태스크의 수가 너무 커지는 경우 DAG를 복제해나가는 것이 더 좋다. DAG를 계속해서 만드는 것과 한 DAG 안에서 태스크를 늘리는 것 사이의 밸런스는 필요하다.
💡 배운 점
- Dag Dependencies, DAG를 트리거하여 실행시키는 방법 2가지에 대해 배웠다.
- 이를 통해 다양한 오퍼레이터, 파라미터에 대해 배웠다.
- 추가로 Sensor에 대해 배웠다.
- 추가로 Jinja Template에 대해 배웠다.
- Task Grouping에 대해 배웠다.
- Dynamic DAGs에 대해 배웠다.
📝 남아있는 의문과 개선점
- Dag Dependencies에 대해서
☁️ 소감
기타 다른 기능이 무엇이 있는지에 대해 배웠다. 유명한 툴이다보니까 역시 기능이 많고, 유용하다. 3가지 기능에 대해서 개요 정도로 배웠는데 한 번 더 깊게 살펴보고 실습하여 적용해볼 수 있으면 좋을 것 같다. 그리고 이 외에 다른 기능에 대해서도 찾아보면 도움이 될 것 같다.
Jinja Template은 장고를 할 때 처음 알았는데, 너무 혁신적이었다. Airflow에서도 자주 사용될까?
'Data Engineering > grepp 데브코스 : TIL' 카테고리의 다른 글
[TIL_2024.01.05] dot & 데이터 카탈로그 (1) | 2024.01.05 |
---|---|
[TIL_2024.01.04] Airflow 운영과 대안 & dbt (1) | 2024.01.05 |
[TIL_2024.01.02] Airflow 고급기능 (2) : 구글시트 연동, API 및 Airflow 모니터링 (1) | 2024.01.03 |
[TIL_2024.01.01] Airflow 고급기능 (1) : ETL 구현 및 슬랙 (Slack)연동 (0) | 2024.01.03 |
[TIL_2023.12.22] Docker, K8s (5) : 서버 관리의 어려움, Container Orchestration, 쿠버네틱스 (0) | 2023.12.22 |