Introduction
멀티 노드 Kafka 클러스터 환경에서 트위터 데이터를 분석하여 비트코인 관련 해시태그의 인기 해시태그를 실시간으로 파악하는 과제를 진행했습니다.
이 과제에서는 Spark RDD Streaming 를 활용하여 실시간 데이터를 처리하는 방식을 시도했습니다.
프로젝트의 주요 목표:
1. Kafka를 통해 트위터 데이터를 스트리밍 방식으로 Spark로 전송
2. 상위 30개의 인기 해시 태그를 실시간으로 계산 및 출력
3. RDD Streaming을 활용한 윈도우 연산 및 슬라이딩 간격 기반 데이터 분석 구현
Kafka Producer : 데이터 ingesting simulation
실제 트위터 스트리밍 환경을 모델링하기 위해 Kafka 프로듀서를 작성하여 데이터를 Kafka 클러스터로 전송했습니다.
주요 구현 사항
- 각 트윗은 타임스탬프 정보(timestamp)를 포함하고 있으며, 이를 기반으로 전송 간격을 설정했습니다.
- Python으로 작성된 Kafka Producer는 time.sleep()을 사용해 트윗 간 전송 간격을 조정했으며, kafka-console-producer 명령을 통해 데이터를 Kafka로 전송했습니다.
시간 간격에 따른 데이터 전송
- 트윗 1(tweet1)을 전송한 후, timestamp2 - timestamp1 간격만큼 대기합니다.
- 이후 트윗 2(tweet2)를 전송하며, 이를 반복해 트윗 데이터를 Kafka에 스트리밍합니다.
이를 통해 Kafka 클러스터는 실제 트위터 스트리밍 환경과 유사한 방식으로 데이터를 수신하고 처리할 수 있습니다.
1) 타임스탬프 변환 (convert_to_seconds 함수)
def convert_to_seconds(ts):
timestamp = time.mktime(time.strptime(ts, "%Y-%m-%d %H:%M:%S"))
return timestamp
2) 전송 간격 계산 및 대기 (sleep_based_on_interval 함수)
def sleep_based_on_interval(current_ts, last_ts):
interval = current_ts - last_ts
if last_ts == 0:
time.sleep(1) # 첫 전송은 1초 대기
elif interval > 0 and last_ts != 0:
time.sleep(interval) # 타임스탬프 간격만큼 대기
3) 주요 흐름 (main 함수)
for line in f:
# 트윗과 타임스탬프 분리
parts = line.rstrip().rsplit(',')
tweet = parts[:-1]
ts = parts[-1]
ts = convert_to_seconds(ts)
# 대기 시간 계산
if last_ts is not None:
sleep_based_on_interval(ts, last_ts)
# Kafka 전송 명령 실행
cmd = 'echo "' + res + '"./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092 --topic q4a'
os.system(cmd)
last_ts = ts
- convert_to_seconds: 타임스탬프를 초 단위로 변환해 대기 시간 계산의 기초를 제공합니다.
- sleep_based_on_interval: 데이터를 실제 발생 시간과 동일한 간격으로 전송하여 현실적인 스트리밍 시뮬레이션을 구현합니다.
- Kafka 메세지 전송: Kafka 클러스터로 데이터를 스트리밍하기 위해 명령어를 동적으로 생성해 전송합니다.
Consumer: Spark RDD Streaming을 활용한 실시간 데이터 처리
Producer를 통해 Kafka로 전송된 데이터를 Spark RDD Streaming을 사용하여 실시간으로 처리를 했습니다. 이때, RDD Streaming은 Kafka에서 데이터를 읽어오고, RDD 형태로 변환하여 해쉬태그를 분석했습니다.
# Kafka에서 데이터 읽어오기
kafkaStream = KafkaUtils.createStream(
ssc, zk_quorum="kafka1:2181,kafka2:2181",
groupId="spark-streaming2",
topics={"q4a": 2}
)
# 트윗에서 해시태그 추출
hashtags = kafkaStream.map(lambda x: re.findall(r"#\w+", x[1])).flatMap(lambda x: x)
# 해시태그 빈도 계산 (5분 윈도우, 2분 슬라이딩)
hashtag_counts = hashtags.map(lambda x: (x, 1)).reduceByKeyAndWindow(
lambda x, y: x + y,
lambda x, y: x - y,
300,
120
)
# 상위 30개 해시태그 출력
hashtag_counts.foreachRDD(lambda rdd: print_top30_hashtags(rdd))
- Kafka에서 데이터 읽기: KafkaUtils.createStream 메서드는 Zookeeper를 통해 Kafka 클러스터와 통신하며, q4a 토픽에서 메시지를 소비합니다.
- 해시태그 추출:
hashtags = kafkaStream.map(lambda x: re.findall(r"#\w+", x[1]).flatMap(lambda x: x))
- kafkaStream은 Kafka에서 스트리밍된 데이터의 RDD입니다. x[1]은 Kafka 메시지의 값(트윗 텍스트)을 의미합니다.
- re.findall(r"#\w+", x[1]): 트윗 텍스트에서 #로 시작하는 단어(해시태그)를 정규 표현식으로 추출합니다.
- flatMap(lambda x: x): re.findall이 반환하는 리스트를 평탄화(flatten)하여 각 해시태그를 개별 아이템으로 만듭니다.
- 해쉬태그 카운트:
hashtag_counts = hashtags.map(lambda x: (x, 1)).reduceByKeyAndWindow(
lambda x, y: x + y, lambda x, y: x - y, 300, 120)
- hashtags.map(lambda x: (x, 1)): 각 해시태그에 대해 (해시태그, 1) 형태의 키-값 쌍을 생성합니다. 즉, 각 해시태그를 처음 만날 때마다 1로 카운트를 시작합니다.
- reduceByKeyAndWindow
- 이 함수는 시간 기반의 윈도우를 적용하여 데이터를 집계합니다.
- 첫 번째 인수 lambda x, y: x + y: 같은 키(해시태그)에 대해 값을 더합니다. 즉, 동일한 해시태그가 나타날 때마다 그 카운트를 증가시킵니다.
- 두 번째 인수 lambda x, y: x - y: 윈도우가 이동하면서 오래된 데이터를 제거할 때 사용하는 함수입니다. 즉, 윈도우 밖의 데이터를 차감합니다.
- 300: 윈도우 크기입니다. 이 값은 300초(5분)로 설정되어 있으므로, 5분 간격으로 해시태그 빈도를 집계합니다.
- 120: 슬라이딩 간격입니다. 120초(2분) 간격으로 윈도우가 슬라이딩됩니다. 즉, 매 2분마다 윈도우가 이동하여 데이터를 새로 반영합니다.
- 이 함수는 시간 기반의 윈도우를 적용하여 데이터를 집계합니다.
- 상위 30개 해시태그 출력:
hashtag_counts.foreachRDD(print_top30_hashtags)
def print_top30_hashtags(rdd):
top30 = rdd.takeOrdered(30, key=lambda x: -x[1])
for tag, count in top30:
print(tag)
- 이 함수는 RDD에서 상위 30개의 해시태그를 추출하는 역할을 합니다.rdd.takeOrdered(30, key=lambda x: -x[1]): rdd에서 카운트 값을 기준으로 내림차순으로 정렬하여 상위 30개의 해시태그를 가져옵니다.
- 각 해시태그와 그 카운트를 출력합니다.



My Review
이번 프로젝트에서 가장 어려웠던 점은 Kafka 프로듀서로 데이터를 실시간 간격에 맞춰 전송했을 때, Spark RDD에서 reduceByKeyAndWindow를 사용하여 데이터를 처리하는 동작 원리를 이해하는 것이었습니다. 처음에는 윈도우와 슬라이딩 간격을 어떻게 설정하고, 그에 따라 데이터가 어떻게 집계되는지 감이 오지 않았습니다.
하지만, reduceByKeyAndWindow는 주어진 시간 동안 데이터를 집계하고, 슬라이딩 간격에 맞춰 새로운 데이터를 계속 반영하는 방식이라는 점을 이해하고 나니 점차 해결할 수 있었습니다. 데이터를 5분 간격으로 집계하고, 2분마다 윈도우가 슬라이딩되는 구조가 어떻게 동작하는지 시각적으로 이해하면서 구현을 마칠 수 있었습니다. 이 과정에서 실시간 데이터 스트리밍 처리 방식에 대해 더 깊이 이해하게 되었습니다.
'과제' 카테고리의 다른 글
| [Kafka] Spark와 Kafka로 실시간 비트코인 해시태그 분석 : Spark Structured Streaming (2) | 2024.12.27 |
|---|---|
| [Kafka] GCP Dataproc을 활용한 멀티 노드 Kafka 클러스터 구축 (2) | 2024.12.02 |
| [Spark] RDD 를 이용한 Page Rank 구현 (2) | 2024.07.01 |
| [Project] Make K-means Clustering Program (0) | 2023.09.02 |
| [Project] Finding frequent item sets (1) | 2023.07.31 |