1. 소개

이 글에서는 Teamfight Tactics(TFT) 사용자 데이터를 수집하고 처리하기 위한 데이터 파이프라인 설계에 대해 다루겠습니다. 데이터 파이프라인은 사용자 정보를 PostgreSQL 데이터베이스에서 S3로 전송하고, 이후 Lambda 함수를 호출하여 API를 통해 게임 데이터를 가져오는 과정을 포함합니다. 

 

2. 데이터 파이프라인 아키텍처

 

파이프라인의 기본 구조는 PostgreSQL DB, AWS S3, Lambda, Airflow로 구성됩니다.

 

1) TFT API를 호출하여 사용자 정보 수집 후, 이를 PostgreSQL DB에 설계한 스키마에 맞게 저장

2) 저장된 유저 데이터를 여러 파일로 분할하여 S3 버킷에 적재

3) Airflow를 통하여 Lambda 함수 트리거 (매일 새벽 1시마다)

    => Lambda 함수는 각 사용자의 게임 결과를 TFT API에서 추가로 가져와 다시 S3 로 적재하여 게임 데이터를 지속적으로 확보

 

이러한 구조로 데이터 흐름을 자동화하여 주기적으로 유저들의 정보를 토대로 유저들의 경기 결과를 가져오기 위한 데이터 파이프라인을 만들었습니다. 이번 글에서는 어떻게 유저 정보를 가져왔는지, 어떻게 S3까지의 ETL 데이터 파이프라인을 DAG으로 구성했는지에 대해 구체적으로 설명할 것입니다. 또한, 유저 데이터가 많아질 경우 고려해봐야 할 것에 대해서도 설명해 볼 것입니다.

 

2. TFT User Data 수집

TFT API는 다음과 같이 load_data 클래스를 만들어 API를 이용할 수 있는 환경을 만들었습니다. extract_sky를 이용하여 천상계(챌린저, 그랜드 마스터, 마스터) 유저들의 정보를 얻어 올 수 있습니다. 하지만 이 정보에서는 user_id (puuid) 정보가 누락되어 있기에 extract_game_by_summoner를 이용하여 puuid가 포함된 유저 정보 데이터를 가져와야 합니다. API를 이용할 때 제한 사항이 있었는데 호출 시 1초에 20회, 2분에 100회라는 제한이 있어서, time.sleep(2)를 이용하여 API 접근 제한이 안 생기도록 하였습니다.

class load_data:
	def __init__(self, api_key):
        self.api_key = api_key
        self.request_header  = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36",
    "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
    "Accept-Charset": "application/x-www-form-urlencoded; charset=UTF-8",
    "Origin": "https://developer.riotgames.com",
    "X-Riot-Token": "RGAPI-3d7be9b6-c10f-4648-ae67-42f9f89d1de2" #매일 갱신해야댐
    }
        self.base_url = "https://kr.api.riotgames.com/tft/"

    def extract_sky(self, tier):
        code_name = f"league/v1/{tier}"
        account_id=requests.get(f"{self.base_url}{code_name}", headers=self.request_header).json()
        return account_id
        
    def extract_game_by_summoner(self, idname):
        id_code_name = self.base_url + "league/v1/entries/by-summoner/" +idname
        my_id = requests.get(id_code_name, headers = self.request_header).json()
        time.sleep(2)
        return my_id

 

3. DAG 구성

데이터 크롤링 주기 : 매일 새벽 1시 ( DAG Schedule )

=> 매일 새벽 12시에 유저 랭킹 정보가 갱신되기에 다음과 같이 설정

DAG 구성 및 순서

 

유저 데이터를 S3 까지 적재하는 과정은 크게 4개의 task로 구성하였습니다.

 

1) Task 1: PostgreSQL에 사용자 데이터 적재 (insrt_postgres)
=> 이 태스크는 CSV 파일에서 PostgreSQL DB로 사용자 데이터를 적재하는 과정입니다. CustomPostgresHook을 활용하여 데이터 수집 프로세스를 최적화하였습니다. 이때 추출한 User 정보에서 추후에 데이터 분석에 필요없을 데이터들은 제거할 필요가 느껴졌습니다. 그래서 그러한 칼럼들은 제거하여 PostgreSQL DB에 적재를 하였습니다.

 

2) Task 2: 사용자 데이터 처리 ( process_user_data )
=> PythonOperator를 사용하여 이 태스크는 PostgreSQL DB에서 사용자 데이터를 관리 가능한 배치로 가져옵니다. 페이지네이션(LIMIT 및 OFFSET)을 활용하여 성능 저하를 방지하며, 대량의 데이터를 처리할 때 데이터베이스에 부담을 주지 않도록 구현하였습니다. 이를 통해 데이터 파이프라인의 안정성과 효율성을 올려보았습니다.

 

3) Task 3: S3에 배치 저장
=> 이 태스크는 처리된 사용자 데이터를 AWS S3에 저장합니다. 실행 날짜 기반으로 파일 경로를 동적으로 생성하고, 배치 파일 개수만큼 데이터 파일을 업로드합니다. 이를 통해 데이터의 접근성과 관리 용이성을 높여 줍니다.

 

4) Task 4 : 알림 이메일 발송

    => 데이터 처리 및 업로드 작업이 성공적으로 완료된 후, 사용자 데이터가 S3에 성공적으로 저장되었음을 확인하는 이메일 알림을 발송했습니다. 

 

with DAG(
        dag_id='dags_postgres_to_S3_LocalDB',
        start_date=pendulum.datetime(2024, 10, 1, tz='Asia/Seoul'),
        schedule='0 1 * * *', # 매일 새벽 1시
        catchup=False
) as dag:
    start = EmptyOperator(
    task_id='start'
    )
    
    insrt_postgres = PythonOperator(
        task_id='insrt_postgres',
        python_callable=insrt_postgres,
        op_kwargs={'postgres_conn_id': 'conn-db-postgres-custom',
                   'tbl_nm':'user_info',
                   'file_nm':'/opt/airflow/files/challenger_user_data.csv'}
    )

    process_user = PythonOperator(
    task_id="process_user_data",
    python_callable=process_user_data,
    op_kwargs={
        'postgres_conn_id': 'conn-db-postgres-custom',
        'query': 'SELECT * FROM user_info',
        'batch_size': 100,  # 한 번에 처리할 배치 크기 설정
        'file_prefix': '/opt/airflow/files/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}/batch_user_data_'  # 저장 파일 경로 설정
    },
    provide_context=True
)
 
    save_batches = PythonOperator(
        task_id='save_batches_to_s3',
        python_callable=save_batches_to_s3,
        provide_context=True
    )
    
    send_email_task = EmailOperator(
        task_id='send_email_task',
        to='fresh0911@naver.com',
        subject='유저 정보 S3 적재 성공',
        html_content='S3 적재 성공하였습니다.'
    )

    start >> insrt_postgres >> process_user >>save_batches >> send_email_task

4. 대용량 유저 데이터일 경우

프로젝트를 진행하면서는 300명의 챌린저 유저들을 파악하기 위해 유저 데이터의 증가를 고려할 필요는 없었습니다. 그래서 처음에는 유저 데이터를 Airflow의 메타 DB에 저장하여 이를 S3로 적재하는 방식을 사용했습니다.

하지만 만약 유저 데이터가 증가한다면, 메타 DB에 저장하는 데이터 처리 방식에 대한 고려가 필요하다고 생각했습니다.

 

Case : DB의 유저 데이터 증가한다면 어떠한 방식으로 데이터를 가져와야 할까?

 

대용량 데이터이라고 가정했기에 우선 메타 DB에 저장시킬 수 없다고 생각하여 DB의 데이터를 csv파일로 디스크에 저장시킨 후 S3로 전송시키는 방식을 선택했습니다. 이때, DB에서 유저 정보를 가져올때 두 가지 방식을 고려 했습니다.

  • 첫 번째 방법은 DB 전체 데이터를 한꺼번에 가져와 로컬 디스크에 저장하는 방식입니다.
  • 두 번째 방법은 배치 처리를 통해 데이터를 나누어 디스크에 저장하는 방식입니다. 예를 들어, 한 번에 500개 또는 1000개의 레코드를 가져와 이를 파일로 저장한 후 다음 배치를 처리하는 방식을 고려했습니다.

아래 코드의 경우 배치 처리 방식을 적용하여 DB의 데이터를 나누어 저장했습니다. 배치 처리 방식을 통해 데이터를 나누어 전송했을 때, 메모리 사용량이 감소하고 안정적인 성능을 유지할 수 있었습니다. 최종적으로 한 번에 1000개의 레코드를 처리하는 것이 시간과 메모리 측면에서 가장 효과적이라는 결론을 도출했습니다.

def process_user_data(postgres_conn_id, query, batch_size=1000, file_prefix=None, **kwargs):
    conn = PostgresHook(postgres_conn_id=postgres_conn_id).get_conn()
    cursor = conn.cursor()

    offset = 0
    batch_number = 1
    while True:
        # LIMIT과 OFFSET을 사용해 배치 처리
        batch_query = f"{query} LIMIT {batch_size} OFFSET {offset}"
        cursor.execute(batch_query)
        batch_data = cursor.fetchall()

        if not batch_data:
            break  # 더 이상 데이터가 없으면 루프 종료

        # 배치 데이터를 CSV로 저장
        file_path = f"{file_prefix}{batch_number}.csv"
        with open(file_path, mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerows(batch_data)  # 데이터 저장

        # 다음 배치로 넘어감
        offset += batch_size
        batch_number += 1

    cursor.close()
    conn.close()

 

process_user_data 함수를 통해 DB의 데이터를 1000개씩 나누어 배치 처리하였고, 16,000명의 유저 데이터를 이용해 실험한 결과, 다음과 같은 메모리 사용량과 처리 속도를 기록했습니다. 단일 처리의 경우, 전체 SELECT 방식이므로 처리 속도는 빨랐지만 메모리 사용량이 많았습니다. 반면에 배치 처리 경우, 1000개씩 유저 데이터를 나눠 가져왔기에 메모리 사용량이 적었고 배치 크기에 따라 최적의 시간을 찾을 수 있었습니다.

 

 

배치 처리 결과
단일 처리 결과

 

아직 시도해보지 않았지만 S3로의 데이터 전송 최적화 또한 해볼 수 있을 것 같습니다. 로컬 디스크에 저장된 데이터는 S3로 전송되어야 하며, 이 과정에서 S3 Multipart Upload와 Parallel Uploads를 사용할 수 있을 것 같습니다. Multipart Upload를 사용하면 대용량 데이터를 여러 조각으로 나누어 업로드함으로써 네트워크 중단 시에도 재전송이 가능하여 효율성이 높아집니다. 이 방법은 단일 처리를 했을 때 함께 사용하기 좋을 것 같다고 생각합니다. 반면 Parallel Uploads를 활용하면 배치 처리로 나눈 여러 파일을 동시에 전송하여 전체 전송 속도를 증가시킬 수 있습니다.

5. Conclusion

이번 글에서는 Teamfight Tactics(TFT) 사용자 데이터를 수집하고 처리하는 데이터 파이프라인을 설계하고 구현하는 과정을 설명했습니다. PostgreSQL 데이터베이스에서 사용자 데이터를 저장한 후 AWS S3와 Lambda를 이용하여 자동화된 데이터 수집 파이프라인을 구축하였습니다. 특히, 대용량 데이터를 효율적으로 처리하기 위해 배치 처리 방식과 S3로의 전송 최적화 방법을 고려했습니다. 다음 글에서는 수집한 유저 데이터를 토대로 TFT 유저들의 게임 결과를 가져오기 위한 데이터 파이프 라인에 대해 설명해보겠습니다.

 

 

 

+ Recent posts