Introduction

이전 포스팅에서는 Spark의 RDD Streaming을 이용하여 멀티 노드 Kafka 클러스터에서 전송된 데이터를 분석하고, 실시간으로 비트코인 관련 인기 해시태그를 파악하는 과정을 소개했습니다.

이번에는 Spark Structured Streaming을 활용하여 동일한 데이터를 처리하는 방식을 다뤄보겠습니다. Structured Streaming은 RDD 기반의 처리 방식에 비해 더 높은 추상화와 간단한 API를 제공하며, 성능 최적화와 확장성 측면에서도 유리하다고 합니다.

 

프로젝트의 주요 목표

1. Kafka로부터 실시간 트위터 데이터를 수신 및 분석

2. 상위 30개의 인기 해시태그를 실시간으로 계산 및 출력

3. Structured Streaming의 윈도우 연산과 데이터 프레임 기반 API 활용

 

Kafka Producer : 데이터 스트리밍 시뮬레이션

Kafka Producer 설정은 이전 포스팅의 코드를 그대로 활용하며, 트윗 데이터를 Kafka 클러스터로 전송합니다.

 

Spark Structured Streaming 구성

1) Kafka 데이터 읽기

kafkaStream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092") \
    .option("subscribe", "q5a") \
    .load()

 

  • Kafka 소스로부터 스트리밍 데이터를 읽어옵니다.
    • kafka.bootstrap.servers: Kafka 클러스터의 브로커 주소.
    • subscribe: 데이터가 있는 Kafka 토픽(q5a)을 지정.
  • Kafka 메시지는 key-value 형식이며, 여기서는 value와 timestamp 필드를 사용합니다.

2) 해시태그 추출 및 필터링

hashtags_df = df \
    .select(col("timestamp"), explode(split(col("value"), " ")).alias("hashtag")) \
    .where(col("hashtag").startswith("#"))
  • Kafka 메시지(value)를 단어 단위로 나누고 해시태그로 필터링.
  • explode(split(...)): 메시지를 공백으로 나누고 각 단어를 별도 행으로 확장.
  • .where(...): #로 시작하는 단어만 선택.

3) 윈도우 집계 및 정렬

windowed_counts = hashtags_df \
    .groupBy(window(col("timestamp"), "300 seconds", "120 seconds"), col("hashtag")) \
    .count().alias('count')

sorted_counts = windowed_counts.orderBy("window", desc("count"))

 

 

 

 

  • 해시태그를 5분(300초) 윈도우로 집계하고, 2분(120초) 간격으로 슬라이딩.
    • window(...): 지정된 윈도우 크기와 슬라이딩 간격으로 데이터 그룹화.
    • .count(): 각 해시태그의 빈도를 계산.
  • 정렬:
    • orderBy: 윈도우와 해시태그 빈도(count) 기준으로 정렬

4) 스트리밍 결과 저장 및 쿼리 실행 및 상위 30개 해시태그 추출

query = sorted_counts \
    .writeStream \
    .queryName("test") \
    .outputMode("complete") \
    .format("memory") \
    .start()
    
spark.sql("CREATE OR REPLACE TEMP VIEW temp_view AS SELECT * FROM test")

res = spark.sql("""
    SELECT *
    FROM (
        SELECT *,
               RANK() OVER (PARTITION BY window ORDER BY count DESC, hashtag ASC) AS rank
        FROM temp_view
    ) ranked
    WHERE ranked.rank <= 30
""")
res.show(n=res.count(), truncate=False)

 

 

  • 결과를 메모리 내 test 테이블로 저장.
  • outputMode("complete"): 전체 결과를 출력. 스트리밍 결과의 전체 상태를 지속적으로 갱신.
  • format("memory"): 결과를 메모리에 저장하여 쿼리 가능.
  • 메모리에 저장된 스트리밍 결과(test)를 SQL로 처리.
    • RANK() OVER: 각 윈도우 내에서 해시태그 빈도(count) 기준으로 순위(rank) 부여.
    • WHERE ranked.rank <= 30: 상위 30개 해시태그 선택.
  • 결과 출력:
    •  

상위 30개 결과가 담겨진 테이블 출력 형태

 

My Review

이번 과제를 진행하면서 RDD와 Spark Streaming을 비교하면서 느낀점을 정리해보았습니다.

RDD로 스트리밍 데이터를 처리했을 때는 데이터 흐름을 수동으로 설계하고, 상태 관리와 같은 부분을 명시적으로 처리해야 했습니다. 특히 시간 기반의 윈도우 처리나 상태 저장과 같은 고급 기능을 구현할 때 많은 코드가 필요했고, 이를 최적화하기도 까다로웠습니다.

반면, Spark Streaming은 RDD 위에서 동작하며, 작업들을 추상화해 제공하므로 상대적으로 간단한 코드로 동일한 기능을 구현할 수 있었습니다. 예를 들어, window() 함수를 활용해 상태를 관리하거나 시간 간격을 기준으로 데이터를 집계할 때, 코드를 작성하는 데 훨씬 적은 시간이 소요되었습니다. 그래서 확실히 Spark Streaming이 데이터를 처리하는 과정을 간소화하고 제공되는 API를 통해 쉽게 구현할 수 있어 더 직관적이고 효율적이라고 느꼈습니다. 데이터 엔지니어링 초보자의 입장에서는 실시간 데이터 처리를 처음 접할 때는 Spark Streaming을 활용하는 것이 더 적합할 것 같습니다. 

Introduction

멀티 노드 Kafka 클러스터 환경에서 트위터 데이터를 분석하여 비트코인 관련 해시태그의 인기 해시태그를 실시간으로 파악하는 과제를 진행했습니다.

이 과제에서는 Spark RDD Streaming 를 활용하여 실시간 데이터를 처리하는 방식을 시도했습니다.

 

프로젝트의 주요 목표:

1. Kafka를 통해 트위터 데이터를 스트리밍 방식으로 Spark로 전송

2. 상위 30개의 인기 해시 태그를 실시간으로 계산 및 출력

3. RDD Streaming을 활용한 윈도우 연산 및 슬라이딩 간격 기반 데이터 분석 구현

 

Kafka Producer : 데이터 ingesting simulation

실제 트위터 스트리밍 환경을 모델링하기 위해 Kafka 프로듀서를 작성하여 데이터를 Kafka 클러스터로 전송했습니다.

주요 구현 사항

  • 각 트윗은 타임스탬프 정보(timestamp)를 포함하고 있으며, 이를 기반으로 전송 간격을 설정했습니다.
  • Python으로 작성된 Kafka Producer는 time.sleep()을 사용해 트윗 간 전송 간격을 조정했으며, kafka-console-producer 명령을 통해 데이터를 Kafka로 전송했습니다.

시간 간격에 따른 데이터 전송

  1. 트윗 1(tweet1)을 전송한 후, timestamp2 - timestamp1 간격만큼 대기합니다.
  2. 이후 트윗 2(tweet2)를 전송하며, 이를 반복해 트윗 데이터를 Kafka에 스트리밍합니다.

이를 통해 Kafka 클러스터는 실제 트위터 스트리밍 환경과 유사한 방식으로 데이터를 수신하고 처리할 수 있습니다.

 

1) 타임스탬프 변환 (convert_to_seconds 함수)

def convert_to_seconds(ts): 
    timestamp = time.mktime(time.strptime(ts, "%Y-%m-%d %H:%M:%S")) 
    return timestamp

 

2) 전송 간격 계산 및 대기 (sleep_based_on_interval 함수)

def sleep_based_on_interval(current_ts, last_ts): 
    interval = current_ts - last_ts 
    if last_ts == 0: 
        time.sleep(1)  # 첫 전송은 1초 대기
    elif interval > 0 and last_ts != 0: 
        time.sleep(interval)  # 타임스탬프 간격만큼 대기

 

3) 주요 흐름 (main 함수)

for line in f: 
    # 트윗과 타임스탬프 분리
    parts = line.rstrip().rsplit(',') 
    tweet = parts[:-1] 
    ts = parts[-1]     
    ts = convert_to_seconds(ts) 

    # 대기 시간 계산
    if last_ts is not None: 
        sleep_based_on_interval(ts, last_ts) 

    # Kafka 전송 명령 실행
    cmd = 'echo "' + res + '"./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092 --topic q4a' 
    os.system(cmd) 
    last_ts = ts

 

  1. convert_to_seconds: 타임스탬프를 초 단위로 변환해 대기 시간 계산의 기초를 제공합니다.
  2. sleep_based_on_interval: 데이터를 실제 발생 시간과 동일한 간격으로 전송하여 현실적인 스트리밍 시뮬레이션을 구현합니다.
  3. Kafka 메세지 전송: Kafka 클러스터로 데이터를 스트리밍하기 위해 명령어를 동적으로 생성해 전송합니다.

Consumer: Spark RDD Streaming을 활용한 실시간 데이터 처리

Producer를 통해 Kafka로 전송된 데이터를 Spark RDD Streaming을 사용하여 실시간으로 처리를 했습니다. 이때, RDD Streaming은 Kafka에서 데이터를 읽어오고, RDD 형태로 변환하여 해쉬태그를 분석했습니다.

# Kafka에서 데이터 읽어오기
kafkaStream = KafkaUtils.createStream(
    ssc, zk_quorum="kafka1:2181,kafka2:2181", 
    groupId="spark-streaming2", 
    topics={"q4a": 2}
)

# 트윗에서 해시태그 추출
hashtags = kafkaStream.map(lambda x: re.findall(r"#\w+", x[1])).flatMap(lambda x: x)

# 해시태그 빈도 계산 (5분 윈도우, 2분 슬라이딩)
hashtag_counts = hashtags.map(lambda x: (x, 1)).reduceByKeyAndWindow(
    lambda x, y: x + y, 
    lambda x, y: x - y, 
    300, 
    120
)

# 상위 30개 해시태그 출력
hashtag_counts.foreachRDD(lambda rdd: print_top30_hashtags(rdd))

 

  • Kafka에서 데이터 읽기: KafkaUtils.createStream 메서드는 Zookeeper를 통해 Kafka 클러스터와 통신하며, q4a 토픽에서 메시지를 소비합니다.

 

  • 해시태그 추출: 
hashtags = kafkaStream.map(lambda x: re.findall(r"#\w+", x[1]).flatMap(lambda x: x))

 

  • kafkaStream은 Kafka에서 스트리밍된 데이터의 RDD입니다. x[1]은 Kafka 메시지의 값(트윗 텍스트)을 의미합니다.
  • re.findall(r"#\w+", x[1]): 트윗 텍스트에서 #로 시작하는 단어(해시태그)를 정규 표현식으로 추출합니다.
  • flatMap(lambda x: x): re.findall이 반환하는 리스트를 평탄화(flatten)하여 각 해시태그를 개별 아이템으로 만듭니다.

 

  • 해쉬태그 카운트:
hashtag_counts = hashtags.map(lambda x: (x, 1)).reduceByKeyAndWindow(
    lambda x, y: x + y, lambda x, y: x - y, 300, 120)
  • hashtags.map(lambda x: (x, 1)): 각 해시태그에 대해 (해시태그, 1) 형태의 키-값 쌍을 생성합니다. 즉, 각 해시태그를 처음 만날 때마다 1로 카운트를 시작합니다.
  • reduceByKeyAndWindow
    • 이 함수는 시간 기반의 윈도우를 적용하여 데이터를 집계합니다.
      • 첫 번째 인수 lambda x, y: x + y: 같은 키(해시태그)에 대해 값을 더합니다. 즉, 동일한 해시태그가 나타날 때마다 그 카운트를 증가시킵니다.
      • 두 번째 인수 lambda x, y: x - y: 윈도우가 이동하면서 오래된 데이터를 제거할 때 사용하는 함수입니다. 즉, 윈도우 밖의 데이터를 차감합니다.
      • 300: 윈도우 크기입니다. 이 값은 300초(5분)로 설정되어 있으므로, 5분 간격으로 해시태그 빈도를 집계합니다.
      • 120: 슬라이딩 간격입니다. 120초(2분) 간격으로 윈도우가 슬라이딩됩니다. 즉, 매 2분마다 윈도우가 이동하여 데이터를 새로 반영합니다.
  • 상위 30개 해시태그 출력:
hashtag_counts.foreachRDD(print_top30_hashtags)

def print_top30_hashtags(rdd):
    top30 = rdd.takeOrdered(30, key=lambda x: -x[1]) 
    for tag, count in top30:
        print(tag)

 

  • 이 함수는 RDD에서 상위 30개의 해시태그를 추출하는 역할을 합니다.rdd.takeOrdered(30, key=lambda x: -x[1]): rdd에서 카운트 값을 기준으로 내림차순으로 정렬하여 상위 30개의 해시태그를 가져옵니다.
  • 각 해시태그와 그 카운트를 출력합니다.

실시간으로 topic으로 데이터 ingesting 모습
2분 마다 Top 30 Hashtag 결과 (1)

 

2분 마다 Top 30 Hashtag 결과 (2)

 

My Review

이번 프로젝트에서 가장 어려웠던 점은 Kafka 프로듀서로 데이터를 실시간 간격에 맞춰 전송했을 때, Spark RDD에서 reduceByKeyAndWindow를 사용하여 데이터를 처리하는 동작 원리를 이해하는 것이었습니다. 처음에는 윈도우와 슬라이딩 간격을 어떻게 설정하고, 그에 따라 데이터가 어떻게 집계되는지 감이 오지 않았습니다.

하지만, reduceByKeyAndWindow는 주어진 시간 동안 데이터를 집계하고, 슬라이딩 간격에 맞춰 새로운 데이터를 계속 반영하는 방식이라는 점을 이해하고 나니 점차 해결할 수 있었습니다. 데이터를 5분 간격으로 집계하고, 2분마다 윈도우가 슬라이딩되는 구조가 어떻게 동작하는지 시각적으로 이해하면서 구현을 마칠 수 있었습니다. 이 과정에서 실시간 데이터 스트리밍 처리 방식에 대해 더 깊이 이해하게 되었습니다.

Introduction

Kafka는 분산 스트리밍 플랫폼으로, 실시간 데이터 스트리밍을 위한 시스템입니다. 이 글에서는 GCP Dataproc을 사용하여 3개의 노드로 구성된 Kafka 클러스터를 설정하고, 이를 테스트하는 과정을 설명하겠습니다. 또한 Kafka와 함께 사용하는 Zookeeper의 역할에 대해서도 간략히 다룹니다.

 

Kafka는 데이터 파이프라인을 만들고, 실시간 이벤트 처리를 지원하며, 대규모 분산 시스템에서 데이터를 효율적으로 처리할 수 있습니다.

Kafka의 주요 구성 요소:

  • Producer: 데이터를 생성하여 Kafka 토픽에 전송하는 역할을 합니다.
  • Consumer: Kafka 토픽으로부터 데이터를 읽어와 처리하는 역할을 합니다.
  • Broker: Kafka의 서버로, 데이터를 수신하고 저장하며, Producer와 Consumer 간의 메시지를 전달하는 역할을 합니다.
  • Topic: Kafka에서 데이터를 구분하는 논리적인 단위입니다. 데이터를 특정 토픽에 보내면, 그 토픽을 구독한 Consumer가 데이터를 처리합니다.
  • Partition: 각 Topic은 여러 Partition으로 나눠져 데이터를 분산 저장하고 병렬 처리를 가능하게 합니다.

Zookeeper는 분산 시스템을 위한 중앙 집중식 서비스로, 데이터의 동기화와 관리, 분산 시스템의 상태를 관리하는 역할을 합니다.

Zookeeper의 주요 기능:

  • 클러스터 관리: Zookeeper는 Kafka 클러스터의 각 브로커(서버)의 상태를 추적하고, 브로커가 장애를 일으켰을 때 자동으로 복구하는 기능을 제공합니다.
  • 리더 선출: Kafka는 각 Partition에 대해 하나의 리더를 선출하는데, 이 과정에서 Zookeeper가 리더 선출을 관리합니다.
  • 메타데이터 관리: Zookeeper는 Kafka의 메타데이터(토픽, 파티션, 리더 등)를 관리하여 클러스터의 일관성을 유지합니다.

How to install multi node kafka cluster

1. GCP Dataproc을 이용하여 3개의 Mutli node kafka cluster 생성

2. Kafka 및 Zookeeper 설치

 

wget https://downloads.apache.org/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xvzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0

 

Start zookeeper (Each node)
Zookeeper 설정 파일 수정

3. Kafka 브로커 설정

Kafka 브로커가 Zookeeper와 통신하도록 설정합니다.

Kafka 브로커 설정 파일 수정

각 노드에서 Kafka를 시작합니다.

Start Kafka (Each node)

 

 

 

4. 클러스터 테스트 및 검증

    1) Create topic

Kafka에서 토픽을 생성할 때, kafka-topics.sh 명령어를 사용합니다. 이 명령어로 my-test-topic과 같은 새로운 토픽을 생성할 수 있습니다. 

my-test-topic 이라는 topic 생성 cmd

 

Kafka의 기본 동작은 Producer가 메시지를 Topic에 전송하고, Consumer는 해당 Topic을 구독하여 메시지를 받아서 처리하는 방식입니다. 메시지가 실제로 Producer에서 Consumer로 전달되는지 확인하려면 다음과 같은 절차를 따릅니다.

 

   2) Produce message to topic

사용자가 입력한 메시지는 my-test-topic으로 전송됩니다. 예를 들어, "hello"와 같은 메시지를 입력하면 그 메시지가 Kafka의 my-test-topic에 저장됩니다.

topic으로 message 생성

    3) Consume message from topic

Consumer는 아래 명령어로 실행하여, my-test-topic에서 메시지를 읽어옵니다. Producer에서 메시지를 입력하고 Consumer가 이를 받으면, Consumer 터미널에서 해당 메시지가 출력됩니다. 예를 들어, Producer에서 "Hello Kafka"라는 메시지를 보냈다면, Consumer에서는 "Hello Kafka"라는 메시지가 출력됩니다.

consumer에서 생성된 메세지 receive

 

 

 

My Review

 

멀티 노드 Kafka 클러스터의 장점으로는 분산 시스템의 확장성, 고가용성, 내결함성을 크게 향상 시킬 수 있다고합니다.

  1. 확장성 (Scalability): 여러 노드에서 데이터를 분산 처리하므로, 시스템 부하가 커져도 손쉽게 클러스터를 확장할 수 있습니다. 필요에 따라 새로운 노드를 추가하여 처리 성능을 향상시킬 수 있습니다.
  2. 고가용성 (High Availability): 복제 기능을 통해 데이터 손실을 방지하고, 장애가 발생한 노드를 자동으로 복구할 수 있습니다. 이는 실시간 데이터 스트리밍을 요구하는 환경에서 매우 중요한 요소입니다.
  3. 내결함성 (Fault Tolerance): Kafka는 데이터를 여러 노드에 복제하여 저장하므로, 특정 노드가 실패하더라도 시스템은 정상적으로 동작할 수 있습니다. Zookeeper는 클러스터 상태를 모니터링하고, 장애 발생 시 빠르게 복구 작업을 수행합니다.

이러한 장점 덕분에 Kafka는 대규모 데이터 스트리밍 시스템에서 필수적인 역할을 합니다. 예를 들어, 금융 서비스에서는 실시간 거래 데이터를 처리하고, 소셜 미디어 플랫폼에서는 사용자의 활동 데이터를 실시간으로 분석하는 데 유용합니다. Kafka는 이러한 환경에서 실시간 데이터 처리와 안정적인 메시징 시스템을 제공하며, 멀티 노드 클러스터 구성을 통해 더욱 효율적인 데이터 파이프라인을 구축할 수 있습니다.

 

1. 소개

이 글에서는 Teamfight Tactics(TFT) 사용자 데이터를 수집하고 처리하기 위한 데이터 파이프라인 설계에 대해 다루겠습니다. 데이터 파이프라인은 사용자 정보를 PostgreSQL 데이터베이스에서 S3로 전송하고, 이후 Lambda 함수를 호출하여 API를 통해 게임 데이터를 가져오는 과정을 포함합니다. 

 

2. 데이터 파이프라인 아키텍처

 

파이프라인의 기본 구조는 PostgreSQL DB, AWS S3, Lambda, Airflow로 구성됩니다.

 

1) TFT API를 호출하여 사용자 정보 수집 후, 이를 PostgreSQL DB에 설계한 스키마에 맞게 저장

2) 저장된 유저 데이터를 여러 파일로 분할하여 S3 버킷에 적재

3) Airflow를 통하여 Lambda 함수 트리거 (매일 새벽 1시마다)

    => Lambda 함수는 각 사용자의 게임 결과를 TFT API에서 추가로 가져와 다시 S3 로 적재하여 게임 데이터를 지속적으로 확보

 

이러한 구조로 데이터 흐름을 자동화하여 주기적으로 유저들의 정보를 토대로 유저들의 경기 결과를 가져오기 위한 데이터 파이프라인을 만들었습니다. 이번 글에서는 어떻게 유저 정보를 가져왔는지, 어떻게 S3까지의 ETL 데이터 파이프라인을 DAG으로 구성했는지에 대해 구체적으로 설명할 것입니다. 또한, 유저 데이터가 많아질 경우 고려해봐야 할 것에 대해서도 설명해 볼 것입니다.

 

2. TFT User Data 수집

TFT API는 다음과 같이 load_data 클래스를 만들어 API를 이용할 수 있는 환경을 만들었습니다. extract_sky를 이용하여 천상계(챌린저, 그랜드 마스터, 마스터) 유저들의 정보를 얻어 올 수 있습니다. 하지만 이 정보에서는 user_id (puuid) 정보가 누락되어 있기에 extract_game_by_summoner를 이용하여 puuid가 포함된 유저 정보 데이터를 가져와야 합니다. API를 이용할 때 제한 사항이 있었는데 호출 시 1초에 20회, 2분에 100회라는 제한이 있어서, time.sleep(2)를 이용하여 API 접근 제한이 안 생기도록 하였습니다.

class load_data:
	def __init__(self, api_key):
        self.api_key = api_key
        self.request_header  = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36",
    "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
    "Accept-Charset": "application/x-www-form-urlencoded; charset=UTF-8",
    "Origin": "https://developer.riotgames.com",
    "X-Riot-Token": "RGAPI-3d7be9b6-c10f-4648-ae67-42f9f89d1de2" #매일 갱신해야댐
    }
        self.base_url = "https://kr.api.riotgames.com/tft/"

    def extract_sky(self, tier):
        code_name = f"league/v1/{tier}"
        account_id=requests.get(f"{self.base_url}{code_name}", headers=self.request_header).json()
        return account_id
        
    def extract_game_by_summoner(self, idname):
        id_code_name = self.base_url + "league/v1/entries/by-summoner/" +idname
        my_id = requests.get(id_code_name, headers = self.request_header).json()
        time.sleep(2)
        return my_id

 

3. DAG 구성

데이터 크롤링 주기 : 매일 새벽 1시 ( DAG Schedule )

=> 매일 새벽 12시에 유저 랭킹 정보가 갱신되기에 다음과 같이 설정

DAG 구성 및 순서

 

유저 데이터를 S3 까지 적재하는 과정은 크게 4개의 task로 구성하였습니다.

 

1) Task 1: PostgreSQL에 사용자 데이터 적재 (insrt_postgres)
=> 이 태스크는 CSV 파일에서 PostgreSQL DB로 사용자 데이터를 적재하는 과정입니다. CustomPostgresHook을 활용하여 데이터 수집 프로세스를 최적화하였습니다. 이때 추출한 User 정보에서 추후에 데이터 분석에 필요없을 데이터들은 제거할 필요가 느껴졌습니다. 그래서 그러한 칼럼들은 제거하여 PostgreSQL DB에 적재를 하였습니다.

 

2) Task 2: 사용자 데이터 처리 ( process_user_data )
=> PythonOperator를 사용하여 이 태스크는 PostgreSQL DB에서 사용자 데이터를 관리 가능한 배치로 가져옵니다. 페이지네이션(LIMIT 및 OFFSET)을 활용하여 성능 저하를 방지하며, 대량의 데이터를 처리할 때 데이터베이스에 부담을 주지 않도록 구현하였습니다. 이를 통해 데이터 파이프라인의 안정성과 효율성을 올려보았습니다.

 

3) Task 3: S3에 배치 저장
=> 이 태스크는 처리된 사용자 데이터를 AWS S3에 저장합니다. 실행 날짜 기반으로 파일 경로를 동적으로 생성하고, 배치 파일 개수만큼 데이터 파일을 업로드합니다. 이를 통해 데이터의 접근성과 관리 용이성을 높여 줍니다.

 

4) Task 4 : 알림 이메일 발송

    => 데이터 처리 및 업로드 작업이 성공적으로 완료된 후, 사용자 데이터가 S3에 성공적으로 저장되었음을 확인하는 이메일 알림을 발송했습니다. 

 

with DAG(
        dag_id='dags_postgres_to_S3_LocalDB',
        start_date=pendulum.datetime(2024, 10, 1, tz='Asia/Seoul'),
        schedule='0 1 * * *', # 매일 새벽 1시
        catchup=False
) as dag:
    start = EmptyOperator(
    task_id='start'
    )
    
    insrt_postgres = PythonOperator(
        task_id='insrt_postgres',
        python_callable=insrt_postgres,
        op_kwargs={'postgres_conn_id': 'conn-db-postgres-custom',
                   'tbl_nm':'user_info',
                   'file_nm':'/opt/airflow/files/challenger_user_data.csv'}
    )

    process_user = PythonOperator(
    task_id="process_user_data",
    python_callable=process_user_data,
    op_kwargs={
        'postgres_conn_id': 'conn-db-postgres-custom',
        'query': 'SELECT * FROM user_info',
        'batch_size': 100,  # 한 번에 처리할 배치 크기 설정
        'file_prefix': '/opt/airflow/files/{{data_interval_end.in_timezone("Asia/Seoul") | ds_nodash }}/batch_user_data_'  # 저장 파일 경로 설정
    },
    provide_context=True
)
 
    save_batches = PythonOperator(
        task_id='save_batches_to_s3',
        python_callable=save_batches_to_s3,
        provide_context=True
    )
    
    send_email_task = EmailOperator(
        task_id='send_email_task',
        to='fresh0911@naver.com',
        subject='유저 정보 S3 적재 성공',
        html_content='S3 적재 성공하였습니다.'
    )

    start >> insrt_postgres >> process_user >>save_batches >> send_email_task

4. 대용량 유저 데이터일 경우

프로젝트를 진행하면서는 300명의 챌린저 유저들을 파악하기 위해 유저 데이터의 증가를 고려할 필요는 없었습니다. 그래서 처음에는 유저 데이터를 Airflow의 메타 DB에 저장하여 이를 S3로 적재하는 방식을 사용했습니다.

하지만 만약 유저 데이터가 증가한다면, 메타 DB에 저장하는 데이터 처리 방식에 대한 고려가 필요하다고 생각했습니다.

 

Case : DB의 유저 데이터 증가한다면 어떠한 방식으로 데이터를 가져와야 할까?

 

대용량 데이터이라고 가정했기에 우선 메타 DB에 저장시킬 수 없다고 생각하여 DB의 데이터를 csv파일로 디스크에 저장시킨 후 S3로 전송시키는 방식을 선택했습니다. 이때, DB에서 유저 정보를 가져올때 두 가지 방식을 고려 했습니다.

  • 첫 번째 방법은 DB 전체 데이터를 한꺼번에 가져와 로컬 디스크에 저장하는 방식입니다.
  • 두 번째 방법은 배치 처리를 통해 데이터를 나누어 디스크에 저장하는 방식입니다. 예를 들어, 한 번에 500개 또는 1000개의 레코드를 가져와 이를 파일로 저장한 후 다음 배치를 처리하는 방식을 고려했습니다.

아래 코드의 경우 배치 처리 방식을 적용하여 DB의 데이터를 나누어 저장했습니다. 배치 처리 방식을 통해 데이터를 나누어 전송했을 때, 메모리 사용량이 감소하고 안정적인 성능을 유지할 수 있었습니다. 최종적으로 한 번에 1000개의 레코드를 처리하는 것이 시간과 메모리 측면에서 가장 효과적이라는 결론을 도출했습니다.

def process_user_data(postgres_conn_id, query, batch_size=1000, file_prefix=None, **kwargs):
    conn = PostgresHook(postgres_conn_id=postgres_conn_id).get_conn()
    cursor = conn.cursor()

    offset = 0
    batch_number = 1
    while True:
        # LIMIT과 OFFSET을 사용해 배치 처리
        batch_query = f"{query} LIMIT {batch_size} OFFSET {offset}"
        cursor.execute(batch_query)
        batch_data = cursor.fetchall()

        if not batch_data:
            break  # 더 이상 데이터가 없으면 루프 종료

        # 배치 데이터를 CSV로 저장
        file_path = f"{file_prefix}{batch_number}.csv"
        with open(file_path, mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerows(batch_data)  # 데이터 저장

        # 다음 배치로 넘어감
        offset += batch_size
        batch_number += 1

    cursor.close()
    conn.close()

 

process_user_data 함수를 통해 DB의 데이터를 1000개씩 나누어 배치 처리하였고, 16,000명의 유저 데이터를 이용해 실험한 결과, 다음과 같은 메모리 사용량과 처리 속도를 기록했습니다. 단일 처리의 경우, 전체 SELECT 방식이므로 처리 속도는 빨랐지만 메모리 사용량이 많았습니다. 반면에 배치 처리 경우, 1000개씩 유저 데이터를 나눠 가져왔기에 메모리 사용량이 적었고 배치 크기에 따라 최적의 시간을 찾을 수 있었습니다.

 

 

배치 처리 결과
단일 처리 결과

 

아직 시도해보지 않았지만 S3로의 데이터 전송 최적화 또한 해볼 수 있을 것 같습니다. 로컬 디스크에 저장된 데이터는 S3로 전송되어야 하며, 이 과정에서 S3 Multipart Upload와 Parallel Uploads를 사용할 수 있을 것 같습니다. Multipart Upload를 사용하면 대용량 데이터를 여러 조각으로 나누어 업로드함으로써 네트워크 중단 시에도 재전송이 가능하여 효율성이 높아집니다. 이 방법은 단일 처리를 했을 때 함께 사용하기 좋을 것 같다고 생각합니다. 반면 Parallel Uploads를 활용하면 배치 처리로 나눈 여러 파일을 동시에 전송하여 전체 전송 속도를 증가시킬 수 있습니다.

5. Conclusion

이번 글에서는 Teamfight Tactics(TFT) 사용자 데이터를 수집하고 처리하는 데이터 파이프라인을 설계하고 구현하는 과정을 설명했습니다. PostgreSQL 데이터베이스에서 사용자 데이터를 저장한 후 AWS S3와 Lambda를 이용하여 자동화된 데이터 수집 파이프라인을 구축하였습니다. 특히, 대용량 데이터를 효율적으로 처리하기 위해 배치 처리 방식과 S3로의 전송 최적화 방법을 고려했습니다. 다음 글에서는 수집한 유저 데이터를 토대로 TFT 유저들의 게임 결과를 가져오기 위한 데이터 파이프 라인에 대해 설명해보겠습니다.

 

 

 

 

Intro

이 글에서는 Teamfight Tactics 게임 데이터를 다뤄보기 위해 효율적인 데이터 베이스 설계를 고민한 내용을 다루려고 합니다. 이 설계는 TFT API로 얻을 수 있는 데이터를 체계적으로 저장하고, 분석 작업을 수월하게 하기 위한 데이터 베이스 구조를 목표로 합니다. 특히 중복 데이터를 최소화하고 유연성을 높이는데 중점을 두고 있습니다.

 

1. TFT API 데이터 구조

 

TFT API 에서 가져오는 주요 데이터는 크게 세 가지로 나눠 집니다.

  • 유저 정보 : 각 플레이어의 고유한 ID(puuid), 랭크 정보, 승리/패배 통계 등이 포함
  • 게임 정보 : 각 게임의 고유 ID(match id), 버전 정보, 플레이 한 유저 id
  • 게임 결과 : 각 게임의 고유 ID, 게임 시간, 유저가 사용한 유닛, 특성, 증강체 등 세부적인 결과 데이터

TFT API로 가져올 수 있는 정보는 이와 같고 추후에 어떻게 API 데이터를 가져왔는 지에 대해서도 써볼 예정입니다.

 

2. 초기 스키마 설계

처음에는 데이터 저장을 단순하게 설계하려 했습니다. 각 테이블에 필요한 모든 정보를 저장하고, 서로 관계를 연결하는 방식이었습니다. 하지만 여기서 게임 결과 데이터에 다양한 속성을 포함되어 있는데, 이 중에 특성, 유닛, 증강체 데이터의 경우 list로 포함되어 있어 추후에 데이터 분석을 하기에는 활용하기 어렵고 한 속성에 여러 값이 존재하는 것이 좋지 않기에 각각의 엔터티로 분리하기로 했다. 이러한 설계로 우선 다음과 같은 ER 다이어그램을 만들어 보았다.

 

TFT 분석 데이터 ER 구조

 

하지만 이러한 구조는 시간이 지날 수록 몇 가지 문제를 야기할 수 있음을 깨달았습니다.

  • 데이터 중복 : 한 유저가 여러 게임에서 다양한 유닛, 특성, 증강체를 사용하는데 매번 동일한 user id, game id를 반복해서 저장하게 되면 데이터 중복이 심해집니다.
  • 확장성 문제 : 더 많은 데이터가 추가될 수록 테이블이 복잡해지고 데이터 분석에 쓸 데이터를 관리하기 어려워 질 것이라 생각했습니다.

Point : TFT 게임 특성 상 매 판마다 유저들이 다양한 유닛과 특성을 이용하기에 user id, game id를 일일히 반복해서 저장하면 비효율적일 것!

 

 

3. 개선된 데이터베이스 구조

이 문제를 해결하기 위해 테이블을 나누고, 관계형 데이터베이스의 장점을 살려 중복 데이터를 최소화하는 방향으로 설계를 변경했습니다.  이 과정에서는 또한 관계성을 명확히 하기 위해 UserGame 테이블을 중심으로 설계를 개선했습니다.

 

연결 관계 

1) 유저 ↔ UserGame : 한 사용자는 여러 게임 세션을 가질 수 있습니다.

2) 게임 정보 ↔ UserGame : 하나의 게임 정보는 여러 사용자 게임 세션에 사용될 수 있습니다.

3) UserGame ↔ 게임 결과 : 각 사용자 게임 세션은 하나의 게임 결과를 가집니다.

4) UserGame ↔ 사용 유닛, 사용 증강체, 사용 특성 : 각 사용자 게임 세션은 각각의 사용된 유닛,증강체,특성을 가집니다.

TFT 분석 데이터 ER 개선된 구조

 

UserGame은 각 유저가 게임에 참여한 정보를 저장하며, 각 유닛, 특성, 증강체는 user_game_id로 연결됩니다. 이를 통해 user_id, game_id의 중복을 줄일 수 있고, 확장성을 높였습니다. 이 테이블로 인하여 데이터의 중복을 최소화하고, 유연하고 확장 가능한 구조를 만드는 것이 가능했습니다. 이렇게 분리된 테이블 구조는 몇 가지 이점을 제공합니다.

  • 중복 제거 : UserGame 테이블을 통해 유저와 게임 정보를 한 번만 저장하고, 나머지 사용 유닛, 사용 증강체, 사용 특성 테이블에서 이를 참조함으로 써 중복을 줄일 수 있습니다.
  • 확장성 : 추후에 유닛, 증강체, 특성에 관한 속성이나 데이터를 추가 할 경우, 기존 테이블에 간단히 추가 할 수 있어서 확장성이 좋을 것입니다.
  • 쿼리 효율성 : 각 테이블이 명확한 관계를 가지고 있어 정보를 쉽게 검색 할 수 있습니다. user_game_id를 통해 각 테이블을 연결해 데이터를 빠르게 조회할 수 있습니다. 

이 구조는 특히 데이터를 직관적으로 관리 할 수 있을 것이고 각 데이터에 대한 분석을 위해 효율적으로 작동할 수 있도록 설계해보았습니다.

 

Conclusion

이번 데이터베이스 설계에서는 TFT API 데이터를 효과적으로 저장하고, 나중에 분석할 때도 성능과 유지보수를 고려한 구조를 목표로 했습니다. 이 과정에서 중복 최소화, 유연한 테이블 구조를 통해 효율적인 데이터를 관리할 수 있는 방법을 찾으려 노력했고, 추후에 추가될 데이터들(유닛, 증강체, 특성 정보 관련)들을 고려하였습니다. 데이터 베이스 설계를 직접 함으로써 데이터의 특성과 요구 사항에 맞게 데이터 구조를 적절하게 설계하는 것은 중요하다고 느꼈고, 이 구조를 기반으로 앞으로 어떻게 데이터를 데이터베이스 까지 적재하는 지 구체적인 과정에 대해서도 설명할 것입니다.

 

공통점

  • Hive와 PIG는 둘 다 Hadoop 기반의 데이터 처리 도구입니다. 기본적으로 두 처리 도구 전부 내부에서 맵리듀스 프레임워크를 사용하여 데이터 처리 작업을 수행

차이점

  • Hive는 SQL과 비슷한 HiveQL 쿼리 언어를 사용하여 대화형으로 데이터를 처리하는 데 중점을 두며, SQL에 익숙한 사용자나 대화형으로 데이터를 처리하려는 경우 유용
  • PIG는 데이터 처리 작업을 위해 스크립트 언어를 사용하며, 스크립트는 데이터 흐름을 정의하고, 중간 처리 결과를 다양한 방식으로 조작하고, 최종 결과를 생성하는 방식을 동작
  • PIG는 복잡한 데이터 처리 작업을 위해 구성 가능하며, 유연성과 확장성이 뛰어남

두 도구를 함께 사용하는 것도 가능!

=>  PIG로 처리한 중간 결과를 Hive로 불러와서 SQL 쿼리로 분석 or Hive에서 처리된 결과를 PIG 스크립트로 조작

 

PIG is good at !

  • JOIN 기능에서 성능적으로 좋음 : PIG의 실행계획과 함께 작업을 실행하면 여러개의 테이블을 조인하여 셔플링 작업이 진행될 때 실행계획으로 인해 여러 대안 중 하나를 선택하여 최적화 시켜줌 

       =>  즉, 옵티마이저를 실행 하기에 가장 효율적은 방안을 제시

  • 구조화 되지 않은 데이터 처리에 적합
  • 성능 최적화 가능성 : PIG는 개발자가 직접 작업의 최적화 단계를 제어할 수 있는 기능을 제공하며, 이를 통해 HIVE보다 더 세밀하게 성능을 튜닝할 수 있는 여지를 줌. HIVE는 SQL 기반으로 자동 최적화를 수행하지만, 모든 경우에 최적화가 이상적으로 작동하지 않을 수 있음

HIVE is good at !

  • 구조화된 데이터 분석에 적합 :대규모 테이블을 SQL 방식으로 쉽게 처리 가능
  • 쿼리 실행 시 자동으로 최적화 단계를 거침
  • BI 통합 가능성 : SQL 기반으로 다양한 BI 툴과 쉽게 연동 가능
  • 데이터 관리 편의성 : 테이블 기반의 스키마 관리로, 데이터의 일관성 및 구조적 관리 가능

'Hadoop' 카테고리의 다른 글

[Hadoop] What is Hive?  (0) 2024.10.05
[Hadoop] What is Pig?  (4) 2024.09.30
YARN vs Multi Node Kubernetes  (3) 2024.06.27
[Hadoop] Hadoop vs Spark  (3) 2024.06.08
[Hadoop] What is HDFS ?  (0) 2024.06.01

1. What is Hive?

Hive는 Hadoop에서 데이터를 처리하기 위한 데이터 웨어하우징 솔루션입니다.

Hive는 SQL을 사용하여 대규모 데이터 집합을 분석할 수 있도록 해줍니다.

HiveQL이라는 SQL과 비슷한 언어를 사용하여 데이터에 대한 쿼리 및 분석을 수행합니다. 맵리듀스와 같은 하둡의 다른 기술과 함께 사용될 수 있으며, 대규모 데이터 처리 및 분석 작업을 수행하는데 매우 유용합니다.

 

2. Hive 특징

1) SQL 같은 쿼리 언어로 MapReduce 코드를 만듭니다

2) HiveQL 은 SQL문 과 유사하지만 다른점은 아래와 같습니다.

  • Hive에 사용된 데이터는 HDFS에 저장되기에 UPDATE, DELETE문을 사용할 수 없음
  • FROM절에서만 sub 쿼리 사용
  • SELECT 문을 사용할 때 HAVING 절을 사용할 수 없음
  • EQ 조인(=)만 사용 가능
  • FROM절에 테이블 하나만 지정 가능

3) 효율적인 쿼리 수행을 위해 파티션과 버킷을 제공

 

    파티션 : PARTITION BY (컬럼명 컬럼타입)을 통해 파티션 설정 가능

- 파티션별로 디렉토리를 나눠서 자장하고 파티션 키는 테이블 새로운 컬럼으로 추가

 

    버킷 : CLUSTERED BY (컬럼명) INTO 버킷개수 BUCKETS 을 통해 설정 가능

- 조인키로 버킷을 생성하면 필요한 버킷만 조회하면 되기 때문에 풀스캔을 하지 않아 빠르게 작업을 처리할 수 있습니다. 또한 TABLESAMPLE 절을 이용하여 데이터 샘플링을 할 수 있습니다.

4) 성능적인 측면

  • Pig와 마찬가지로 작은 데이터일 경우 쿼리 응답속도 낮음
  • 트랜잭션 미지원
  • JOIN 기능이 생각보다 자원 낭비를 할 수 있음
  • 통계정보 및 오류 정보 바로 확인 불가능

 

 

 

3. HIVE 사용 예시

문제 : 연도별 평균 발생 횟수가 가장 많은 20개의 바이그램을 해당 평균값과 구하기.
데이터 : Google book bigram (1-gram)

 

CREATE TABLE data ( 
bigram STRING, 
year INT, 
match_count INT, 
volume_count INT 
) 
PARTITIONED BY (year INT)
CLUSTERED BY (bigram) INTO 10 BUCKETS
STORED AS ORC;

LOAD DATA IN PATH 'hdfs://cluster-d37a-m/test/googlebooks-eng-all-1gram-20120701-b INTO TABLE data; 
LOAD DATA IN PATH 'hdfs://cluster-d37a-m/test/googlebooks-eng-all-1gram-20120701-a' INTO TABLE data; 

CREATE TABLE joined_data AS SELECT bigram, SUM(match_count) AS total_match_count, COUNT(year) AS num_years FROM data GROUP BY bigram; 
CREATE TABLE res_data AS SELECT bigram, total_match_count / num_years AS avg_match_per_year FROM joined_data; 
CREATE TABLE top_20_bigrams AS SELECT * FROM res_data DISTRIBUTE BY avg_match_per_year SORT BY avg_match_per_year DESC LIMIT 20;
INSERT OVERWRITE DIRECTORY 'hdfs://cluster-d37a-m/test/output3' STORED AS ORC SELECT * FROM res_data; 
INSERT OVERWRITE DIRECTORY 'hdfs://cluster-d37a-m/test/output4' STORED AS ORC SELECT * FROM top_20_bigrams;

 

각 단계의 수행방식과 Hadoop 분산 시스템에서의 동작 과정

 

1. CREATE TABLE data : data라는 테이블을 생성하고 ORC 열 지향적 포맷 으로 저장

=> ORC 또는 Parquet 같은 열 지향적 포맷은 Hive의 성능을 극대화 할 수 있어 대용량 데이터 처리에 효율적

파티셔닝 : year 컬럼 기준으로 파티션을 나누어 특정 기준으로 물리적으로 나눠 저장

버킷팅 : bigram을 기준으로 데이터를 버킷에 나누어 저장하므로 GROUP BY bigram과 같은 연산 성능 개선

 

2. LOAD DATA : data 테이블 스키마에 맞게 HDFS에 있는 데이터를 파싱하여 Hive 테이블에 삽입, 이때 데이터를 여러 노드에서 병렬로 로드 할 수 있어 대규모 데이터 처리에 적합

 

3. CREATE TABLE joined_data : data 테이블의 데이터를 bigram 별로 그룹화 한후 각 bigram에 대해 match count를 합산하고 연도의 개수도 계산하여 joined_data 테이블에 저장

=> 이 단계에서 MapReduce가 핵심 역할, bigram키로 Map 작업 수행, Reduce 단계에서 각 키별로 SUM과 COUNT 계산

 

4. CREATE TABLE res_data : joined_data 테이블에서 각 bigram의 match_count 평균을 구해 새로운 res_data에 저장

=> 이 단계는 Hadoop에서 분산처리하여 작업 실행

 

5. CREATE TABLE top_20_bigrams : res_data 테이블에서 평균 값이 가장 높은 상위 20개 bigram을 추출하여 top_20_bigram에 저장

=> Hive에서 분산된 정렬을 위해 DISTRIBUTE BY와 SORT BY 구문 사용, SORT BY는 각 리듀서 에서 부분적으로 정렬할 수 있어 성능 향상, 이로 단순한 ORDER BY보다 성능이 좋아 질 수 있음

 

6. INSERT OVERWRITE DIRECTORY : res_data와 top_20_bigram 테이블의 내용을 각각 지정된 HDFS 경로에 텍스트 파일로 저장

=> ORC 포맷을 저장하여 향후 데이터 조회 성능을 높일 수 있음

연간 평균 발생 횟수가 가장 많은 20개의 bigram 출력

 

같은 Bigram 프로그램을 Pig와 Hive에서 실행해본 결과 Pig의 경우 9분이 소요되었고, Hive의 경우 5분이 소요되었습니다.

제 생각에 이러한 차이가 나는 이유는 아마도 데이터 처리 방식의 차이가 있다고 생각합니다.

Pig의 경우 절차적 데이터 흐름 방식을 사용하기에 개발자가 데이터가 어떻게 처리될지에 대한 구체적인 단계를 직접 정의해야 합니다. 이 과정에서 각 단계마다 추가적인 오버헤드가 발생하여 실행 시간이 더 오래 걸린 것 같습니다.

반면에 Hive는 선언적 프로그래밍 방식을 사용하여 사용자가 무엇을 할지 명시만 할 뿐 어떻게 할지는 시스템이 자동으로 처리하여 실행 계획을 시스템이 알아서 최적화 해주어 실행 시간이 더 빨랐던 것 같습니다.

 

'Hadoop' 카테고리의 다른 글

[Hadoop] PIG vs HIVE  (1) 2024.10.11
[Hadoop] What is Pig?  (4) 2024.09.30
YARN vs Multi Node Kubernetes  (3) 2024.06.27
[Hadoop] Hadoop vs Spark  (3) 2024.06.08
[Hadoop] What is HDFS ?  (0) 2024.06.01

1. What is Pig ?

Apache Pig는 대규모 비정형 및 반정형 데이터를 분석하기 위한 프레임워크이다.

Pig는 구조화되지 않은 데이터들을 처리하는데 적합한 언어로, 테이블 구조가 아니더라도 데이터 처리 가능 하며 PigLatin이라는 언어를 사용하여 SQL과 유사한 방식으로 데이터를 처리할 수 있습니다.

Hadoop 클러스터에서 실행되며, 데이터 처리를 위해 MapReduce를 사용합니다.

Pig는 대규모 데이터 처리를 단순화하고 생산성을 높이는 데 효과적입니다.

 

2. Pig 특징

1) Pig Latin 언어 with 실행 계획

  • Pig는 Pig Latin이라는 고급 스크립팅 언어를 사용하여 데이터 변환, 필터링, 집계 등의 작업을 쉽게 수행할 수 있습니다. SQL과 유사한 문법을 제공해 데이터 작업을 직관적으로 처리할 수 있습니다.

    => Pig Latin으로 작성한 데이터 처리 프로그램은 논리적인 실행 계획으로 변환되고, 이것은 최종적으로 MapReduce 실행 계획으로 변환

Pig의 동작 과정 예시

2) 추상화된 데이터 처리

  • Pig는 데이터를 로드하고, 변형하고, 저장하는 등의 작업을 고수준에서 처리할 수 있도록 해줍니다. 이를 통해 사용자는 더 적은 노력으로 복잡한 데이터 분석 작업을 수행할 수 있습니다. 즉, 스크립트가 내부적으로 MapReduce를 수행하기에 사용자는 더 직관적으로 데이터를 다루기 쉬움

3) Pig는 Hive에 비해 JOIN 기능에 있어서 성능적으로 좋다. 그 이유는 여러 개의 테이블을 조인하여 MapReduce하게 된다면 셔플링 작업에 성능적인 제한사항이 있다. 실행계획과 함께 작업을 실행하면 해결 할 수 있지만 Hive에서는 약하다. 그에 비해 Pig에서는 실행 계획이 있기에 여러 대안 중 하나를 선택하여 최적화를 진행해준다. 즉, 옵티마이저를 실행 해주기에 장점도 있지만 잘못 실행하면 반복을 많이하고 CPU를 많이 사용하게 된다. 

 

4) Pig는 소량의 데이터에는 비효율적입니다.

 

  • MapReduce의 초기화 및 오버헤드 => 즉, 작은 데이터에 대해서도 Pig는 Hadoop 클러스터에서 작업을 분배하고, 데이터 분할, 맵 작업, 리듀스 작업 등의 단계를 거치기 때문에, 처리 시간이 상대적으로 오래 걸립니다.
  • 배치 처리(batch processing) 방식 =>. 배치 처리는 데이터를 모아서 한 번에 처리하는 방식으로, 대량 데이터를 효율적으로 처리하는 데 적합합니다. 하지만 소량의 데이터의 경우 이러한 배치 처리 방식은 지연 시간(latency)을 증가시킬 수 있습니다.

5) Pig 사용하는 기업 : Yahoo, Zoom

 

3. Pig 사용 예시

문제 : 연도별 평균 발생 횟수가 가장 많은 20개의 바이그램을 해당 평균값과 구하기.
데이터 : Google book bigram (1-gram)
# Load a-1gram, b-1gram 파일 LOAD
data = LOAD 'hdfs://cluster-d37a-m/test/googlebooks-eng-all-1gram-20120701-b' USING PigStorage('\t') AS (bigram, year, match_count, volume_count); 
data2 = LOAD 'hdfs://cluster-d37a-m/test/googlebooks-eng-all-1gram-20120701-a' USING PigStorage('\t') AS (bigram, year, match_count, volume_count); 

# 두 파일 JOIN 기능 수행
join_data = UNION data, data2; 

# bigram 에 따라 GROUP 기능 수행
grouped_data = GROUP join_data BY bigram; 

# SUM 및 COUNT 기능 수행
bigram_status = FOREACH grouped_data GENERATE group AS bigram,  
SUM(join_data.match_count) AS total_match_count,  
COUNT(join_data.year) AS num_years; 

# avg_match_per_year 값 계산
res_data = FOREACH bigram_status GENERATE bigram,  
(double)total_match_count / num_years AS avg_match_per_year; 

# ORDER BY 기능 수행
sorted_data = ORDER res_data BY avg_match_per_year DESC; 
top_20_bigrams = LIMIT sorted_data 20; 

# STORE 기능 수행
STORE top_20_bigrams INTO 'hdfs://cluster-d37a-m/test/output2' USING PigStorage('\t');

 

각 단계의 수행 방식과 Hadoop 분산 시스템에서의 동작 과정

  1. LOAD => HDFS에서 데이터를 로드하며, Pig는 이를 처리하기 위해 내부적으로 MapReduce 작업을 준비
  2. JOIN => Pig는 결합된 데이터를 기반으로 다음 단계의 작업을 준비하며, 이 단계에서 두 파일의 데이터를 하나로 병합하는 MapReduce 작업이 수행됩니다.
  3. GROUP => bigram 값을 기준으로 데이터가 분산되고, 같은 bigram 값을 가진 데이터가 하나의 그룹으로 모입니다. 이 과정에서 Pig는 데이터를 정렬하고 그룹화하는 MapReduce 작업을 실행합니다.
  4. SUM / COUNT => FOREACH를 사용해 각 bigram 그룹에서 계산을 수행합니다. 각 그룹의 match_count 값을 모두 더해 total_match_count를 계산하고, 해당 bigram이 등장한 연도의 개수(year)를 세어 num_years를 계산하기에 MapReduce 작업을 사용하여 병렬 처리를 수행
  5. 새로운 필드 값 생성 => 각 bigram 그룹에 대해, 등장 횟수(total_match_count)를 연도 개수(num_years)로 나누어 avg_match_per_year(연도별 평균 등장 횟수)를 계산, 이 또한 MapReduce 작업을 통해 처리됩니다.
  6. ORDER BY / LIMIT => 계산된 결과를 avg_match_per_year 기준으로 내림차순으로 정렬한 후, 상위 20개의 bigram만 추출합니다.
  7. STORE => STORE는 최종적으로 데이터를 지정된 경로에 저장하며, HDFS에 데이터를 기록하는 작업을 수행합니다. 이때도 분산 파일 시스템에 데이터를 기록하기 위해 MapReduce 작업이 진행됩니다.

 

Pig 실행 결과

 

 

연간 평균 발생 횟수가 가장 많은 20개의 bigram 출력

 

'Hadoop' 카테고리의 다른 글

[Hadoop] PIG vs HIVE  (1) 2024.10.11
[Hadoop] What is Hive?  (0) 2024.10.05
YARN vs Multi Node Kubernetes  (3) 2024.06.27
[Hadoop] Hadoop vs Spark  (3) 2024.06.08
[Hadoop] What is HDFS ?  (0) 2024.06.01

+ Recent posts