Task 1 : 지금까지 존재하는 역대 로또 당첨 내역 db에 저장하는 DAG 짜기

Task 2 : 주마다 나오는 새로운 로또 당첨 내역 새롭게 db에 추가하는 DAG 짜기

 

이때, 로또 데이터들이 주마다 url 형태로 저장되어있는 것을 발견해 이것을 이용하여 db에 추출하는 것으로 결정했다.

 

기존 테이블

 

원래 데이터는 이렇게 구성되어 있어서 여기서 내가 필요한 값들만 추출하여 TbLottoStatus 테이블을 만들어 현재까지 있는 데이터들을 모두 저장킬 수 있는 DAG을 구성했다.

 

Task 1 DAG

from operators.lotto_api_to_csv_operator import LottoApiToCsvOperator
from airflow import DAG
import pendulum


# 누적 로또 데이터 가져오기
with DAG(
    dag_id='dags_lotto_api',
    schedule=None,
    start_date=pendulum.datetime(2024,3,17, tz='Asia/Seoul'),
    catchup=False
) as dag:
    tb_lotto_status = LottoApiToCsvOperator(
        task_id='tb_lotto_status',
        dataset_nm='TblottoStatus',
        path='/opt/airflow/files/TbLottoStatus/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}',
        file_name='TbLottoStatus.csv'
    )

    tb_lotto_status

간소화시킨 테이블

 

위에 DAG은 누적된 데이터만 한번만 가져오면 되기때문에 따로 schedule을 설정하지 않았다. 그리고 Url 형태의 매 회차 데이터를 가져오기 위해 Custom operator를 만들어 TbLottoStatus.csv에 저장시키게 하였다.

 

Task 2 DAG

from operators.lotto_api_add_csv_operator import LottoApiAddCsvOperator
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from hooks.custom_postgres_hook_lotto import CustomPostgresHookLotto
from datetime import timedelta
from airflow import Dataset
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from config.on_failure_callback_to_kakao import on_failure_callback_to_kakao

dataset_dags_dataset_producer = Dataset("dags_lotto_data")

with DAG(
    dag_id='dags_lotto_data',
    schedule='0 0 * * 6',
    start_date=pendulum.datetime(2023,10,1, tz='Asia/Seoul'),
    catchup=False,
    default_args={
        'on_failure_callback':on_failure_callback_to_kakao,
        'execution_timeout': timedelta(seconds=180)
    }
) as dag:
    
    start_task = BashOperator(
        task_id='start_task',
        bash_command='echo "전 주 데이터 추가 작업 시작"'
    )

    tb_lotto_add = LottoApiAddCsvOperator(
        task_id='tb_lotto_add',
        outlets=[dataset_dags_dataset_producer],
        path='/opt/airflow/files/TbLottoAdd/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}',
        file_name='TbLottoStatus.csv',
        file = '/opt/airflow/files/TbLottoStatus/TbLottoStatus.csv',
        time = '{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}'

    )

    def insrt_postgres(postgres_conn_id, tbl_nm, file_nm, **kwargs):
        custom_postgres_hook = CustomPostgresHookLotto(postgres_conn_id=postgres_conn_id)
        custom_postgres_hook.bulk_load(table_name=tbl_nm, file_name=file_nm, delimiter=',', is_header=True, is_replace=True)

    insrt_postgresdb = PythonOperator(
        task_id='insrt_postgres',
        outlets=[dataset_dags_dataset_producer],
        python_callable=insrt_postgres,
        op_kwargs={'postgres_conn_id': 'conn-db-postgres-custom',
                   'tbl_nm':'lotto_add_table',
                   'file_nm':'/opt/airflow/files/TbLottoAdd/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash}}/TbLottoStatus.csv'}
    )

    def upload_to_s3(filename, key, bucket_name):
        hook = S3Hook('aws_default')
        hook.load_file(filename=filename,
                       key = key,
                       bucket_name=bucket_name,
                       replace=True)
    
    upload_s3 = PythonOperator(
        task_id = 'upload_s3',
        python_callable=upload_to_s3,
        op_kwargs={
            'filename' : '/opt/airflow/files/TbLottoAdd/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash}}/TbLottoStatus.csv',
            'key' : 'lotto/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash}}/TbLottoStatus.csv',
            'bucket_name' : 'morzibucket'
        })

    finish_task = BashOperator(
        task_id='finish_task',
        outlets=[dataset_dags_dataset_producer],
        bash_command='echo "전 주 데이터 추가 작업 완료"'
    )

    start_task >> tb_lotto_add >> [insrt_postgresdb, upload_s3] >> finish_task

 

해당 DAG의 경우는 조금 더 많은 Task 를 만들었다.

tb_lotto_add : 매주 금요일마다 새로운 로또 당첨 데이터 가져오기

insrt_postgresdb : 가져온 새로운 로또 데이터를 postgres db에 저장시키기

upload_s3 : 새로운 로또 데이터를 s3에 저장시키기

또한 여기서 혹시 어떠한 task에서 실패가 생길 경우 알 수 있게 on failure call back을 만들어 kakao로 알람오게 만들었다.

 

Task 2 DAG graph 구성

 

+ Recent posts