9주차 금요일, 45일차 Today I Learned
데이터 파이프라인 (5)
: OLTP 복사, ELT
✏️ 학습 내용
1. OLTP 테이블 복사하기
프로덕션 데이터베이스 (MySQL, OLTP)에서 데이터 웨어하우스 (Redshift, OLAP)로 데이터를 복사해오는 실습을 진행할 것이다.
- 방법 1. Full Refreh
- 방법 2. Incremental Update
- ROW_NUMBER (SQL) 활용
- UPSERT (Redshift 제공) 활용
프로덕션 데이터베이스는 OLTP 방식이며, 데이터 웨어하우스는 OLAP 방식이다. OLTP 테이블을 복사해서 OLAP 테이블을 만드는 것이 목적이다. 프로덕션 데이터베이스의 대표적인 예시인 MySQL에서 데이터 웨어하우스의 일종인 Redshift로 데이터를 복사해오는 방법은 두 가지가 존재한다.
Full Refresh 방법은 간단하게 전체를 삭제하고 다시 전체를 복사해오는 방식이며, Incremental Update는 특정 날짜나 기간을 지정하여 새롭게 추가된 레코드를 복제해오는 방식이다. 후자의 경우, 백필이 필요한데 이를 위해서 이용할 수 있는 문법이 2가지 있다. 그것이 바로 ROW_NUMBER를 사용하거나 Redshift에서 제공하는 UPSERT 문법을 사용하는 것이다.
레코드가 적은 경우, 데이터 소스에서 INSERT INTO 명령어를 통해 적재할 수 있지만, 우리는 데이터 소스를 읽어와서 파일로 저장하고, 클라우드 스토리지 (S3)에 업로드하여 벌크 복제를 할 것이다. 이 때, S3에 대해 Connection이 필요하고, 보안 세팅이 필요하다. 보안 세팅은 Airflow DAG에서 S3에 접근하기 위한 쓰기 권한, Redshift가 S3에 접근하기 위한 읽기 권한 2가지가 필요하다.
Docker Airflow에서 MySQLdb 모듈이 없다는 에러가 발생할 수 있는데, root 유저로 로그인하여 몇 가지 명령어를 실행하여 에러를 해결할 수 있다.
이미 존재하는 오퍼레이터를 이용할 것이다.
1-1. OLTP 테이블 복사하기 (Full Refresh 방법)
- DAG : MySQL_to_Redshift
- Task 1 : SqlToS3Operator - 데이터 소스를 읽어서 S3에 파일 형태로 적재한다.
- replace를 True로 설정한다. - 오버라이딩 설정하여 덮어쓴다.
- pd_kwargs에서 index, header를 False로 설정한다. - 일련번호와 헤더를 복제하지 않는다.
- Task 2 : S3ToRedshiftOperator - COPY 명령으로 Redshift에 벌크 업데이트를 한다.
- method를 REPLACE로 설정한다. - Full Refresh
- Task 1 : SqlToS3Operator - 데이터 소스를 읽어서 S3에 파일 형태로 적재한다.
1) AWS S3 Connections 설정 (IAM User 설정)
2) Redshift S3 Connections 설정 (IAM Role 설정)
3) MySQL 관련 모듈 설치 (Docker 이용)
4) MySQL_to_Redshift DAG 실행
1-2. OLTP 테이블 복사하기 (Incremental Update 방법 - UPSERT 방식)
Incremental Update 방식에서 Redshift가 제공하는 UPSERT 명령어를 활용해 볼 것이다.
created (timestamp) : Optional, modified (timestamp), deleted (boolean) : True로 설정되어 있어야 한다.
- DAG : MySQL_to_Redshift_v2
- ROW_NUMBER로 직접 구현
- 우선 임시 테이블을 만들어 복사한다.
- 원본 테이블의 레코드 중 modified 날짜가 execution_date에 해당하는 모든 레코드를 임시 테이블에 복사한다.
- 임시테이블 레코드들을 PK 기준으로 파티션하고, modified 기준으로 정렬하여 일련번호 1인 것만 다시 원래 테이블에 복사한다.
- UPSERT를 위해 S3ToRedshiftOperator 이용
- query에 DATE(modified) = DATE(execution_date) 조건을 부여한다.
- method를 UPSERT로 지정한다.
- upsert_keys 파라미터로 Primary key를 지정한다.
- ROW_NUMBER로 직접 구현
1) AWS S3 Connections 설정 (IAM User 설정)
2) Redshift S3 Connections 설정 (IAM Role 설정)
3) MySQL 관련 모듈 설치 (Docker 이용)
4) MySQL_to_Redshift_v2 DAG 실행
1) AWS S3 Connections 설정 (IAM User 설정) 방법
- AWS IAM의 Users로 접속하여 User를 생성한다.
- Full Access 권한 or 특정 버킷에 대한 권한을 부여한다. (후자의 경우가 더 안전하다)
- Users에서 해당 Account에 접속하여 Access key 2가지를 생성하여 Access key, Secret Access key 정보를 알아낸다.
(Secret Access key는 첫 생성 시에만 정보가 뜨므로 별도 저장해놓아야 한다.)
2) Redshift S3 Connections 설정 (IAM Role 설정) 방법
- AWS IAM의 Role로 접속하여 Role을 설정한다.
- Full Access 권한 or 특정 버킷에 대한 권한을 부여한다. (후자의 경우가 더 안전하다)
- Redshift 콘솔에서 역할 권한을 지정한다. (클러스터 선택 후 Manage IAM roles)
3) MySQL 관련 모듈 설치 (Docker 이용) 방법
- root 유저로 Airflow Scheduler Container 로그인
- 모듈들 업그레이드하기
# 컨테이너 ID 확인하여 root 유저로 로그인하기
docker ds
docker exec --user root -it {ID} sh
# APT 프로그램 매니저 업데이트 + MySQL 클라이언트 라이브러리 업데이트 + gcc 컴파일러 설치 + MySQL 모듈 재설치
sudo apt-get update
sudo apt-get install -y default-libmysqlclient-dev
sudo apt-get install -y gcc
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
4) Airflow 유저로 다시 로그인하여 DAG 실행 가능
docker exec -it {id} sh
airflow dags test {DAG 이름}
2. Backfill 실행
모든 DAG가 백필이 필요한 것은 아니고, Incremental Update 방식에는 백필이 필요하다. Airflow에서 추천하는 방식으로 구현했다면 백필이 쉬워진다. 여기서 백필이란 일별 혹은 시간별로 업데이트하는 경우를 의미한다. 데이터의 크기가 커지면 백필 기능을 구현해두는 것이 필수이며, 이 때 Airflow가 큰 도움이 된다. 하지만 데이터 소스의 도움 없이는 불가능하다.
백필의 실행 방법은 3가지가 있다.
- 한 번에 하나씩 여러 번 실행 - Airflow dags test {DAG 이름} {날짜} 에서 원하는 날짜를 입력하여 실행
- 한 번에 여러 날짜 동시 실행
- 단, DB 과부화, 충돌 문제가 발생할 수도 있다.
- max_active_runs로 제어 가능하다. - 1로 세팅하여 한 번씩 연달아서 실행된다.
- Backfill 커맨드를 이용한다. - 시작 날짜와 종료 날짜 지정 가능하다.
3. Summary Table 구현 (ELT 구현)
해당 내용은 강의에서 다루지 않았다.
💡 배운 점
- Full Refresh 방식 DAG 구현 방법
- Incremental Update 방식 DAG 구현 방법
- ROW_NUMBER 이용 구현
- UPSERT 이용 구현
- Back fill 적용
📝 남아있는 의문과 개선점
- 데이터 소스가 백필 방식을 지원해야 한다는 것은 무슨 말일까?
- 멱등성에 대한 이해가 부족하다.
- 스케일링 방식에 대하여
☁️ 소감
Airflow 기초적인 내용에 대한 공부를 마쳤다. (강의는 일단 다 들었다는 말이 더 정확할 것 같다.) 어제 강의에 비해서는 좀 더 쉽게 이해가 갔다. 반복의 힘일지도 모른다. DAG, 즉 데이터 파이프라인을 구현하는 방식이 2가지 있으며, 각각 어떤 장단점이 있는지, 그리고 Incremental Update 방식에서는 백필이 필요한데 이 때 어떻게 구현할 수 있는지에 대해서 배웠다.
무엇을 배웠는 지에 대해서 파악을 하고 있으니까 좀 더 고도화하는 것은 시간을 들이면 가능할 것이다. 과제나 실습 등을 통해서 한 번 더 정리하면 좋을 것 같지만, 컴퓨터 사양이 좋지 않아 고민이 된다.
같은 수강생 중 한 분께 개인서버에서 Airflow를 설치해서 사용한 방법에 대해 설명을 듣기로 했다. :)
'Data Engineering > grepp 데브코스 : TIL' 카테고리의 다른 글
[TIL_2023.12.19] Docker, K8s (2) : CI/CD와 Github Actions, Test 및 Dockerization (0) | 2023.12.19 |
---|---|
[TIL_2023.12.18] Docker, K8s (1) : 도커 소개 및 설치, 간단한 예제 (1) | 2023.12.19 |
[TIL_2023.12.14] 데이터 파이프라인 (4) : PK 고유성, Backfill (0) | 2023.12.14 |
[TIL_2023.12.13] 데이터 파이프라인 (3) : Airflow DAG 작성 (0) | 2023.12.13 |
[TIL_2023.12.12] 데이터 파이프라인 (2) : Airflow 설치 및 기본 프로그램 실행 (0) | 2023.12.12 |