이번에는 저번에 데이터베이스에 적재한 데이터를 가져와 scikit-learn 라이브러리를 사용하여 다음주 로또번호를 미리 예측해보았다. 이때 크게 두가지 task로 나누어 프로젝트를 진행했다. 

 

Task 1 : 매주 금요일 db에서 데이터를 가져와 예측 로또 번호 생성

Task 2 : 생성한 로또번호를 카카오와 이메일로 메세지 보내기

 

Task 1

import pandas as pd
from sklearn.linear_model import LinearRegression


def predict_lotto_num():
    tb = pd.read_csv("/opt/airflow/files/TbLottoStatus/TbLottoStatus.csv")
    tb = tb.drop(columns=['returnValue','drwNoDate'])
    tb = tb[['drwtNo1','drwtNo2','drwtNo3','drwtNo4','drwtNo5','drwtNo6','bnusNo']]
    tb['num'] = 0
    tb = tb.sample(n=700)
    tb.head()


    x = tb['num']
    num_1 = tb['drwtNo1']
    num_2 = tb['drwtNo2']
    num_3 = tb['drwtNo3']
    num_4 = tb['drwtNo4']
    num_5 = tb['drwtNo5']
    num_6 = tb['drwtNo6']
    num_bns = tb['bnusNo']

    pre_num_1 = LinearRegression()
    pre_num_1.fit(x.values.reshape(-1,1),num_1)
    ans_1 = pre_num_1.predict([[len(tb)+1]])[0]

    pre_num_2 = LinearRegression()
    pre_num_2.fit(x.values.reshape(-1,1),num_2)
    ans_2 = pre_num_2.predict([[len(tb)+1]])[0]

    pre_num_3 = LinearRegression()
    pre_num_3.fit(x.values.reshape(-1,1),num_3)
    ans_3 = pre_num_3.predict([[len(tb)+1]])[0]

    pre_num_4 = LinearRegression()
    pre_num_4.fit(x.values.reshape(-1,1),num_4)
    ans_4 = pre_num_4.predict([[len(tb)+1]])[0]

    pre_num_5 = LinearRegression()
    pre_num_5.fit(x.values.reshape(-1,1),num_5)
    ans_5 = pre_num_5.predict([[len(tb)+1]])[0]
    pre_num_6 = LinearRegression()
    pre_num_6.fit(x.values.reshape(-1,1),num_6)
    ans_6 = pre_num_6.predict([[len(tb)+1]])[0]

    pre_num_bns = LinearRegression()
    pre_num_bns.fit(x.values.reshape(-1,1),num_bns)
    ans_bns = pre_num_bns.predict([[len(tb)+1]])[0]

    this_week = []
    this_week.append(int(round(ans_1,0)))
    this_week.append(int(round(ans_2,0)))
    this_week.append(int(round(ans_3,0)))
    this_week.append(int(round(ans_4,0)))
    this_week.append(int(round(ans_5,0)))
    this_week.append(int(round(ans_6,0)))
    this_week.append(int(round(ans_bns,0)))
    print(this_week)
    return this_week

 

로또 추천 방법의 경우 현재까지 있는 데이터 중 700개를 sample로 하여 LinearRegression 기법을 이용해 번호를 추천하기로했다. 

 

Task1,2 DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from common.predict import predict_lotto_num
from airflow.operators.email import EmailOperator
from airflow.decorators import task
from airflow import Dataset
from datetime import timedelta
import pendulum
from config.on_failure_callback_to_kakao import on_failure_callback_to_kakao
from config.send_msg_to_kakao import send_success_msg_to_kakao
from hooks.custom_postgres_hook import CustomPostgresHook

dataset_dags_dataset_producer = Dataset("dags_lotto_data")

with DAG(
    dag_id="dags_recommend_lotto_num",
    schedule=[dataset_dags_dataset_producer],
    start_date=pendulum.datetime(2023, 10, 1, tz="Asia/Seoul"),
    catchup=False,
    tags=["lotto","recommend"],
    default_args={
        'on_failure_callback':on_failure_callback_to_kakao,
        'execution_timeout': timedelta(seconds=180)
    }

) as dag:
    @task(task_id='inner_function1')
    def inner_func1(**kwargs):
        print('로또 번호 추천 작업 시작')

    predict_lotto_num1 = PythonOperator(
        task_id = 'predict_lotto_num',
        python_callable = predict_lotto_num
            
    )
    

    def select_postgres(postgres_conn_id, tbl_nm, **kwargs):
        custom_postgres_hook = CustomPostgresHook(postgres_conn_id=postgres_conn_id)
        custom_postgres_hook.select(table_name=tbl_nm)

    select_postgresdb = PythonOperator(
        task_id='select_postgres',
        python_callable=select_postgres,
        op_kwargs={'postgres_conn_id': 'conn-db-postgres-custom',
                   'tbl_nm':'lotto_add_table'}
    )
    send_num_to_email = EmailOperator(
            task_id='send_email',
            to='fresh0911@naver.com',
            subject='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} some_logic 처리결과',
            html_content='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} 이번 주 추천 번호는 <br> \
            {{ti.xcom_pull(task_ids="python_t1")}} 입니다 <br>'
    )

    send_num_to_kakao = PythonOperator(
        task_id = 'send_num_to_kakao',
        python_callable = send_success_msg_to_kakao,
        
    )
            

    inner_func1() >> [predict_lotto_num1, select_postgresdb]
    predict_lotto_num1 >> send_num_to_email
    select_postgresdb >> send_num_to_kakao

 

DAG의 경우, 금요일에 전 주 로또 데이터가 추가되면 해당 DAG이 실행되도록 하였다. email로 전송하는 경우는 airflow 컨테이너에 저장되는 데이터를 이용하였고 카카오 메시지로 보내는 경우 postgres db에있는 데이터를 불러와 사용하였다. 

 

Task1,2 DAG 그래프 구성

 

메일로 오는 추천번호
카카오톡 메세지로 오는 추천 번호

 

+ Recent posts