9주차 수요일, 43일차 Today I Learned
데이터 파이프라인 (3)
: Airflow DAG 작성
✏️ 학습 내용
Airflow DAG 작성 방법에 대해 알아본다.
1. Hello World 예제 프로그램
1. Python Operator, Task Decorator
2. DAG 파라미터
Operators에 ➀. Python Operator, ➁ Task Decorator가 있다.
➀ Python Operator : 2개의 태스크로 구성된 데이터 파이프라인
def print_hello() :
def print_goodbye() :
print_hello = PythonOperator(
task_id = 'print_hello',
python_callable = print_hello,
dag = dag)
print_goodbye = PythonOperator(
task_id = 'print_goodbye',
python_callable = print_goodbye,
dag = dag
)
➁ Task Decorator : 더 직관적으로 사용
@task
def print_hello() :
@task
def print_goodbye() :
파이썬 오퍼레이터는 함수 입력 후 하단에 따로 파이썬 오퍼레이터를 지정해주었지만, 태스크 데코레이터는 함수 위에 @task만 붙여주면 되기 때문에 프로그램이 더 직관적이다.
이 외 중요한 DAG 파라미터 :
- max_active_runs
- max_active_tasks
- catchup
DAG 파라미터와 Task 파라미터의 차이점에 대한 이해가 중요하다. 위의 파라미터는 모두 DAG 파라미터로, DAG 객체를 만들 때 지정해주어야 한다.
with DAG(
dag_id = 'id_name',
start_date = datetime(2023,12,13),
catchup = False # default : True
tags = ['example'],
schedule = '0 2 * * *' # 5개 인자 (분, 시, 일, 월, 요일)
) as dag:
💡 schedule interval 참고 : https://crontab.guru/#30_6_
2. Name Gender 예제 프로그램 포팅
Colab에서 작성한 파이썬 코드를 Airflow로 포팅할 것이다. 그리고 일부 내용을 개선한다.
- params를 통해서 변수를 넘긴다. (link, Redshift의 스키마와 테이블 이름)
- execution_data를 얻어낸다.
- Xcom 객체를 사용해서 3개의 Task로 나눈다. (E, T, L)
- Variable 이용하여 CSV parameter 넘긴다.
- Redshift Connection 사용한다.
- PythonOperator 대신에 Task Decorator 사용한다. (이 경우 Xcom 사용 필요 없음)
Connections and Variables
각각 데이터베이스 연결 및 공통 설정 값을 저장하고 관리하는 데 사용된다.
- Connections는 데이터베이스, API, FTP 등과 같은 외부 시스템과의 연결 정보를 저장하는 데 사용
- Variables는 Airflow에서 전역적으로 사용되는 변수 값을 저장하는 데 사용
Airflow DAG를 보다 동적이고 유연하게 만들어주며, 코드에서 하드코딩된 연결 정보나 변수를 피하고 재사용성과 유지 보수성을 높일 수 있다. Airflow의 세련된 스케줄링 및 실행 기능을 활용하기 위해서는 Connections와 Variables를 효과적으로 관리하는 것이 중요하다.
Xcom
Operator들 간에 데이터를 주고 받기 위한 방식으로, 보통 한 오퍼레이터의 리턴값을 다른 오퍼레이터에서 읽어가는 형태가 된다.
3. Yahoo Finance API DAG 작성
1) Full Refresh 기반
Yahoo Finance API를 호출하여 애플 주식 정보를 수집하고 (지난 30일), Redshift 상의 테이블로 레코드들을 적재한다. 매번 실행될 때마다 테이블을 삭제한 후 다시 레코드를 적재한다. 트랜잭션 형태로 구성해야 한다.
2) Incremental Update 기반
Yahoo Finance API를 호출하여 애플 주식 정보를 수집하고 (지난 30일), Redshift 상의 테이블로 레코드들을 적재하고 중복을 제거한다. 매일 하루치의 데이터씩 늘어나게 된다. Full Refresh와 달리 테이블을 삭제하지 않고, 레코드를 적재한 후에 중복 제거 후 테이블을 재로드하는 방식이다. 트랜잭션 형태로 구성해야 한다.
4. Airflow.cfg에 대해서
- DAGs 폴더는 기본적으로 Airflow가 설치된 디렉토리 밑의 dags 폴더가 되며, dags_folder 키에 저장된다.
- DAGs 폴더에 새로운 DAG를 만들면 실제로 Airflow 시스템에서 5분 (=300) 후에 알게 되며, 이 스캔 주기를 결정해주는 키의 이름은 `dag_dir_list_interval` 이다.
- 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 api 섹션의 auto_backend를 airflow.api.auth.backend.basic_auth로 변경하면 된다.
- Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 password, secret, passwd, authorization, api_key, apikey, access_token 같은 단어가 들어가야 한다.
- 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 sudo systemctl restart 명령어로 airflow-webserver, airflow-scheduler을 restart 해주어야 한다.
- Metadata DB의 내용을 암호화하기 위해서 사용되는 키는 fernet_key 이다.
☁️ 소감
실제 코딩을 해보는 시간을 가졌는데, 이해가 잘 되지 않는다. 이건 프로그래밍 언어적인 문제이다. 좀 더 많이 해보고, 공부하는 수밖에.. 파이썬과 SQL 모두 공부해야 한다. 그리고 Airflow의 기능들에 대해서도 더 알아봐야겠다.
'Data Engineering > grepp 데브코스 : TIL' 카테고리의 다른 글
[TIL_2023.12.15] 데이터 파이프라인 (5) : OLTP 복사, ELT (0) | 2023.12.15 |
---|---|
[TIL_2023.12.14] 데이터 파이프라인 (4) : PK 고유성, Backfill (0) | 2023.12.14 |
[TIL_2023.12.12] 데이터 파이프라인 (2) : Airflow 설치 및 기본 프로그램 실행 (0) | 2023.12.12 |
[TIL_2023.12.11] 데이터 파이프라인 (1) : 데이터 파이프라인 (ETL), Airflow 소개 (0) | 2023.12.12 |
[TIL_2023.12.01] 데이터 인프라, 고급 SQL, BI 대시보드 (5) : 시각화 대시보드 툴, Superset 실습 (0) | 2023.12.01 |