9주차 목요일, 44일차 Today I Learned
데이터 파이프라인 (4)
: PK 고유성, Backfill
✏️ 학습 내용
1. Open Weathermap DAG 구현하기 (API를 사용하여 DAG 만들기)
위도/경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스에서 API key를 받아와서 사용하려고 한다. API Key를 open_weather_api_key라는 Variable로 저장할 것이다. 서울의 8일간 낮/최소/최대 온도를 읽는 DAG를 만들어 보겠다.
Open Weathermap의 one call API를 사용해서 정보를 읽어와서 각자 스키마 밑의 weather_forecast라는 테이블로 저장한다.
CREATE TABLE weather_forecast(
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE());
temp, min_temp, max_temp는 각각 낮 온도, 최소 온도, 최대 온도를 뜻한다. 날짜를 우선 PK로 지정했고, created_date는 레코드 생성 시간으로 자동 채워지는 필드인데, 중복 처리를 할 용도로 만들었다.
One-Call API는 결과를 JSON 형태로 리턴해준다. 이를 읽기 위해서 requests.get 결과의 text를 JSON으로 변환하거나 결과 오브젝트가 제공해주는 .json() 함수를 사용하면 된다. 결과 JSON에서 daily라는 필드에 앞으로 8일간 날씨 정보가 들어가 있다.
Airflow Connections를 통해 Redshift connection을 만들고, Full Refresh 기반으로 DAG를 구현하면 된다.
2. Primary key Uniqueness
테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드를 지정할 수 있다. (하나의 필드가 일반적이지만 다수의 필드를 지정할 수도 있긴 하다) 이는 CREATE TABLE 사용 시에 지정하는데, 관계형 데이터베이스 시스템이 PK의 값이 중복 존재하는 것을 막아준다 하지만 빅데이터 기반의 데이터 웨어하우스에서는 PK의 고유성(유일성)을 보장하지 않는다.
이를 보장하는 것은 데이터 인력의 책임이며, 보장하기에 메모리와 시간이 더 들기 때문에 대용량 데이터의 적재가 걸림돌이 되기 때문에 유일성을 보장해주지 않는다. 따라서 PK를 유지하는 방법이 중요해진다.
결론만 먼저 말하자면, ROW_NUMBER 함수를 통해서 유지할 수 있다.
ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_tate DESC) seq
date별로 레코드를 모으고, 그 안에서 created_date의 역순으로 소팅한 후, 일련 번호를 부여하고, seq가 1인 것만 가져오면 된다.
그 전에 임시 테이블을 만들고 거기로 현재 모든 레코드를 복사한 다음, 임시 테이블에 새로 데이터 소스에서 읽어들인 레코드들을 복사해야 한다. 그리고 중복을 걸러주는 SQL을 ROW_NUMBER 이용하여 작성하고, 이를 바탕으로 최종 원본 테이블로 복사하면 된다.
- 원래 테이블의 내용을 임시 테이블로 복사
- 임시 테이블에 레코드를 추가 (중복 데이터 존재 가능) - SQL ROW_NUMBER 이용
- 원본 테이블에서 레코드들을 삭제
- 중복을 없앤 형태로 새로운 테이블 생성
Auto Commit이 True인 경우 3-4번을 묶어야 데이터 정합성이 유지될 수 있다.
3. Backfill
백필이란 실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터들의 문제로 다시 다 읽어와야 하는 경우를 의미한다.
관리하는 데이터 파이프라인의 수가 늘어나면 이 중의 몇은 항상 실패하게 된다. 이를 어떻게 관리하느냐가 데이터 엔지니어의 삶에 큰 영향을 주는데, Airflow에서는 이러한 Backfill을 쉽게 할 수 있다. Backfill은 Full Refresh에서는 필요 없고, Incremental Update에서만 필요하다. 따라서 가능하다면 Full Refresh를 사용하는 것이 더 좋다. Incremental Update는 효율성은 더 좋아도 운영 및 유지보수 난이도가 올라간다.
Backfill과 관련된 Airflow 변수들
- start_date : DAG가 처음 읽어와야 하는 데이터의 날짜와 시간 (첫 실행 날짜는 start_date + DAG의 실행주기)
- execution_date : DAG가 읽어와야 하는 데이터의 날짜와 시간
- catchup : DAG가 처음 활성화된 시점이 start_date보다 미래라면 그 사이에 실행이 안 된 것들을 어떻게 할 것인지 결정해주는 파라미터 (기본값 : True - 실행 안 된 것들을 모두 따라잡으려고 함)
- end_date : 백필을 날짜 범위에 대해 하는 경우에만 필요하며 보통 필요하지 않음
4. Airflow의 Backfill 관련 동작 (퀴즈)
- Airflow에서 하나의 DAG는 다수의 Task로 구성된다.
- 매일 동작하는 DAG의 start date가 2023-12-13이라면 이 DAG의 첫 실행 날짜는 2023-12-14이다.
- 위 DAG의 경우 execution_date로 들어오는 날짜는 2023-12-13이다.
- Schedule interval이 "30 * * * *"으로 설정된 DAG에 대한 설명은 매시 30분마다 한 번씩 실행된다는 것이다
- Schedule interval이 "0 * * * *"으로 설정된 DAG의 start date가 "2023-12-13 00:00:00"이라면, DAG의 첫 번째 실행 날짜와 시간은 2023-12-13 01:00:00 이다.
- Airflow의 DAG가 처음 ON되었을 때 start_date와 현재 날짜 사이에 실행이 안 된 run들이 있을 경우 catchup 파라미터에 의해 결정된다.
- Redshift에서 큰 데이터를 테이블로 복사하는 방식은 복사할 레코드들을 파일로 저장해서 S3에 올린 후, Redshift로 벌크 복사하는 방식이다.
💡 배운 점
- Full Refresh 방식으로 DAG를 구현해보았다.
- Incremental Update 방식으로 DAG를 구현해보았다.
- PK 유일성을 보장하는 방법에 대해 배웠다. - ROW_NUMBER 이용, 혹은 UPSERT 이용
- 백필에 대해서 배웠다.
- 백필과 관련된 변수들에 대해 배웠다. - start_date, execution_date, catchup, end_date
☁️ 소감
아직 DAG를 작성하는 방법에 대해서는 이해가 쉽지는 않다. 퀴즈 내용이 일부 헷갈리는 것을 봐서는 더 상세히 작동 방식에 대해서 익혀야 할 것 같다.
아무래도 빅데이터를 다루어야 의미가 있는 직업이다 보니까, 한 번 자칫 실수로 코드를 만졌다가는 큰 실수로 이어지겠다는 아찔한 생각이 든다. 실수 한 번에 시간이나 금전적인 측면에서의 비용이 많이 들 것 같다. 조심 또 조심 해야 할 것 같은데, 그러기 위해서는 아는 것이 힘일 것이다!
'Data Engineering > grepp 데브코스 : TIL' 카테고리의 다른 글
[TIL_2023.12.18] Docker, K8s (1) : 도커 소개 및 설치, 간단한 예제 (1) | 2023.12.19 |
---|---|
[TIL_2023.12.15] 데이터 파이프라인 (5) : OLTP 복사, ELT (0) | 2023.12.15 |
[TIL_2023.12.13] 데이터 파이프라인 (3) : Airflow DAG 작성 (0) | 2023.12.13 |
[TIL_2023.12.12] 데이터 파이프라인 (2) : Airflow 설치 및 기본 프로그램 실행 (0) | 2023.12.12 |
[TIL_2023.12.11] 데이터 파이프라인 (1) : 데이터 파이프라인 (ETL), Airflow 소개 (0) | 2023.12.12 |