이번에는 저번에 데이터베이스에 적재한 데이터를 가져와 scikit-learn 라이브러리를 사용하여 다음주 로또번호를 미리 예측해보았다. 이때 크게 두가지 task로 나누어 프로젝트를 진행했다.
Task 1 : 매주 금요일 db에서 데이터를 가져와 예측 로또 번호 생성
Task 2 : 생성한 로또번호를 카카오와 이메일로 메세지 보내기
Task 1
import pandas as pd
from sklearn.linear_model import LinearRegression
def predict_lotto_num():
tb = pd.read_csv("/opt/airflow/files/TbLottoStatus/TbLottoStatus.csv")
tb = tb.drop(columns=['returnValue','drwNoDate'])
tb = tb[['drwtNo1','drwtNo2','drwtNo3','drwtNo4','drwtNo5','drwtNo6','bnusNo']]
tb['num'] = 0
tb = tb.sample(n=700)
tb.head()
x = tb['num']
num_1 = tb['drwtNo1']
num_2 = tb['drwtNo2']
num_3 = tb['drwtNo3']
num_4 = tb['drwtNo4']
num_5 = tb['drwtNo5']
num_6 = tb['drwtNo6']
num_bns = tb['bnusNo']
pre_num_1 = LinearRegression()
pre_num_1.fit(x.values.reshape(-1,1),num_1)
ans_1 = pre_num_1.predict([[len(tb)+1]])[0]
pre_num_2 = LinearRegression()
pre_num_2.fit(x.values.reshape(-1,1),num_2)
ans_2 = pre_num_2.predict([[len(tb)+1]])[0]
pre_num_3 = LinearRegression()
pre_num_3.fit(x.values.reshape(-1,1),num_3)
ans_3 = pre_num_3.predict([[len(tb)+1]])[0]
pre_num_4 = LinearRegression()
pre_num_4.fit(x.values.reshape(-1,1),num_4)
ans_4 = pre_num_4.predict([[len(tb)+1]])[0]
pre_num_5 = LinearRegression()
pre_num_5.fit(x.values.reshape(-1,1),num_5)
ans_5 = pre_num_5.predict([[len(tb)+1]])[0]
pre_num_6 = LinearRegression()
pre_num_6.fit(x.values.reshape(-1,1),num_6)
ans_6 = pre_num_6.predict([[len(tb)+1]])[0]
pre_num_bns = LinearRegression()
pre_num_bns.fit(x.values.reshape(-1,1),num_bns)
ans_bns = pre_num_bns.predict([[len(tb)+1]])[0]
this_week = []
this_week.append(int(round(ans_1,0)))
this_week.append(int(round(ans_2,0)))
this_week.append(int(round(ans_3,0)))
this_week.append(int(round(ans_4,0)))
this_week.append(int(round(ans_5,0)))
this_week.append(int(round(ans_6,0)))
this_week.append(int(round(ans_bns,0)))
print(this_week)
return this_week
로또 추천 방법의 경우 현재까지 있는 데이터 중 700개를 sample로 하여 LinearRegression 기법을 이용해 번호를 추천하기로했다.
Task1,2 DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from common.predict import predict_lotto_num
from airflow.operators.email import EmailOperator
from airflow.decorators import task
from airflow import Dataset
from datetime import timedelta
import pendulum
from config.on_failure_callback_to_kakao import on_failure_callback_to_kakao
from config.send_msg_to_kakao import send_success_msg_to_kakao
from hooks.custom_postgres_hook import CustomPostgresHook
dataset_dags_dataset_producer = Dataset("dags_lotto_data")
with DAG(
dag_id="dags_recommend_lotto_num",
schedule=[dataset_dags_dataset_producer],
start_date=pendulum.datetime(2023, 10, 1, tz="Asia/Seoul"),
catchup=False,
tags=["lotto","recommend"],
default_args={
'on_failure_callback':on_failure_callback_to_kakao,
'execution_timeout': timedelta(seconds=180)
}
) as dag:
@task(task_id='inner_function1')
def inner_func1(**kwargs):
print('로또 번호 추천 작업 시작')
predict_lotto_num1 = PythonOperator(
task_id = 'predict_lotto_num',
python_callable = predict_lotto_num
)
def select_postgres(postgres_conn_id, tbl_nm, **kwargs):
custom_postgres_hook = CustomPostgresHook(postgres_conn_id=postgres_conn_id)
custom_postgres_hook.select(table_name=tbl_nm)
select_postgresdb = PythonOperator(
task_id='select_postgres',
python_callable=select_postgres,
op_kwargs={'postgres_conn_id': 'conn-db-postgres-custom',
'tbl_nm':'lotto_add_table'}
)
send_num_to_email = EmailOperator(
task_id='send_email',
to='fresh0911@naver.com',
subject='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} some_logic 처리결과',
html_content='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} 이번 주 추천 번호는 <br> \
{{ti.xcom_pull(task_ids="python_t1")}} 입니다 <br>'
)
send_num_to_kakao = PythonOperator(
task_id = 'send_num_to_kakao',
python_callable = send_success_msg_to_kakao,
)
inner_func1() >> [predict_lotto_num1, select_postgresdb]
predict_lotto_num1 >> send_num_to_email
select_postgresdb >> send_num_to_kakao
DAG의 경우, 금요일에 전 주 로또 데이터가 추가되면 해당 DAG이 실행되도록 하였다. email로 전송하는 경우는 airflow 컨테이너에 저장되는 데이터를 이용하였고 카카오 메시지로 보내는 경우 postgres db에있는 데이터를 불러와 사용하였다.



'My Project' 카테고리의 다른 글
| TFT 데이터 적재를 위한 데이터 파이프라인 설계 (2) | 2024.10.22 |
|---|---|
| Teamfight Tactics(TFT) 데이터 베이스 설계하기 (1) | 2024.10.15 |
| 로또 번호 추천 자동화 시스템 만들기 (1) (4) | 2024.03.19 |
| [Project] 로또 번호 추천 자동화 시스템 만들기 (2) | 2023.10.11 |