11주차 월요일, 51일차 Today I Learned
Airflow 고급기능 (1)
: ETL 구현 및 슬랙 (Slack)연동
✏️ 학습 내용
1. ETL 구현
1) Airflow Docker 환경 설정
① learn-airflow Github repo PULL or CLONE
② docker-compose.yaml 수정
③ docker compose up -d 수행
④ 웹 UI 로그인
실습에 앞서 Docker 환경 설정 세팅하기로 한다. 기존에 진행했으면 추가 업데이트 (PULL)하거나 새로 시작하면 복사 (CLONE)하면 된다. 위 4단계를 터미널에서 진행하고나면 세팅이 완료된다.
단, 여기에서 Airflow 실행환경 관리방안에 대해 생각해볼 수 있다.
- 기타 환경설정값들을 어떻게 관리하고 배포할까?
- 보통 docker-compose.yaml 파일에서 AIRFLOW_VAR, AIRFLOW_CONN 정의해서 관리하거나,
- 환경변수가 아닌 별도 credentials 전용 Secrets 백엔드라는 것을 사용한다.
- 어디까지 Airflow Image로 관리하고, 무엇을 docker-compose.yaml 에서 관리할까?
- 회사마다 다르지만 Airflow 자체 이미지를 만들고, 여기에 환경변수를 넣고, 이를 docker-compose.yaml 파일에서 사용
- 또는 docker-compose.yaml에서 환경변수를 직접 설정
- DAG 코드 또한 마찬가지로, 이미지로 코드를 복사하여 만드는 것이 더 깔끔하고, 아니면 개발/테스트용일 때는 docker-compose에서 host volume 형태로 설정
2) Summary 테이블 구현 (Airflow + Redshift)
CTAS로 ELT (Airflow의 DAG) 구현하여 Redshift의 Table로 만들어보자.
간단한 DAG를 구현할건데, PythonOperator를 만들고 params 파라미터를 설정하여 사용자별 채널 정보를 요약해볼 수 있다. 이 때 코드에서 CTAS로 구현하는데, CTAS 부분을 아예 별도의 환경설정 파일로 떼어낼 수도 있다. 이 경우 환경 설정 중심의 접근 방식으로, 비개발자들이 사용할 때 어려움을 덜 느끼고 그러면서 더 다양한 테스트를 추가할 수 있다. 물론 궁극적으로는 dbt가 가장 좋다.
2. 슬랙 Slack 연동
DAG 실행 중에 Task에서 에러가 발생하면 그것을 지정된 슬랙 워크스페이스 채널로 보내는 기능을 구현해보자.
① 슬랙 Workspace에 App 설정하고, 특정 채널과 연동하기
- 'A' workspace 밑에 'B' 라는 App 생성
- 'B'가 'C' 채널에 메세지를 보낼 수 있게 설정
- Incoming Webhooks App 생성하기 (api.slack.com/messaging/webhooks 참고)
- 테스트
② 연동을 위한 함수 제작 후, 이를 태스크에 적용되는 default_args의 on_failure_callback에 지정하기
- webhook url의 엔드포인트를 Variable로 저장하기
- 슬랙에 에러 메시지 보내는 별도 모듈 : slack.py
- 이를 DAG 인스턴스를 만들 때 에러 콜백 : NameGenderCSVtoRedshift_v4.py
💡 배운 점
- Airflow Docker 환경 설정을 복습했다.
- Airflow 실행환경 관리방안에 대한 여러 고민사항에 대해 알게 되었다.
- CTAS로 ELT (Airflow의 DAG) 구현하여 Redshift의 Table로 만드는 실습을 진행했다.
- DAG 실행 중 Task에서 에러가 발생하면 슬랙 메시지를 보내도록, 슬랙 연동 실습을 하였다.
📝 남아있는 의문과 개선점
- config 자세히 공부하기
☁️ 소감
강의를 들을 때는 오래 걸렸는데 TIL을 작성해보니까 간단하게 끝났다. (약간 허무) 다음에는 무엇을 하는 것인지 대충 훑어보고 강의를 들으면 머리 속에서 청사진이 그려지지 않을까 생각했다.