1) RDD in Spark
- Spark 의 데이터 구조는 크게 RDD, Dataframe, Dataset 세 종류로 구분
- 이 중 RDD는 Spark 1.0부터 도입되었던 가장 기초적인 데이터 구조
- RDD 는 Resillient Distributed Data 로 "회복력 있는 분산 데이터"라고 해석 될 수 있다.

RDD는 Read Only라는 특성이 있어 특정 동작을 위해서는 새로운 RDD를 생성하여 Spark 내의 연산에 있어 수많은 RDD들이 생성됩니다. 이때 생성되는 RDD들은 DAG의 형태를 가집니다. DAG 형태를 가지고 있어서 RDD 관련 정보가 메모리에서 유실되었을 경우, 그래프를 복기하여 다시 자동으로 복구할 수 있습니다. 이러한 특성 때문에 Fault-tolerant를 보장하는 강력한 기능을 가지고 있습니다.
또한 RDD의 동작 원리 중 핵심은 Lazy Evaluation 입니다. 즉, 즉시 실행하지 않고 Action 연산자를 만나기 전 까지는, Transformation 연산자가 아무리 쌓여도 처리하지 않습니다. 이는 Hadoop의 Map Reduce 동작과 대조적이기 때문에 Spark는 간단한 operation들에 대한 성능적 이슈를 고려하지 않아도 된다는 장점을 가지고 있습니다.
=> Spark는 연산을 지연시키기 때문에 중복 연산을 제거하거나, 최적의 실행 계획을 수립 가능 반면에 Map Reduce의 경우 작업이 즉시 실행 되어 최적화가 어렵습니다.
2) Page Rank 가 어떤 식으로 Spark RDD로 구현되는가 ?
rdd = sc.textFile("hdfs://cluster-d37a-m/test/web-Google.txt")
links_with_rank = rdd.map(lambda line: tuple(line.split("\t"))).distinct()
links_grouped = links_with_rank.map(lambda x: (x[0], [x[1]])).reduceByKey(lambda x, y: x + y)
ranks = links_grouped.map(lambda x: (x[0], 1.0))
joined_rdd = links_grouped.join(ranks)
itr = 10
for _ in range(itr):
contribs = joined_rdd.values().flatMap(lambda x: [(url, x[1] / len(x[0])) for url in x[0]])
ranks = contribs.reduceByKey(lambda x, y: x + y).mapValues(lambda x: 0.15 + 0.85 * x)
joined_rdd = links_grouped.join(ranks)
top_100_nodes = ranks.takeOrdered(100, key=lambda x: -x[1])
for node in top_100_nodes:
print(node)
1. 데이터 로드
sc.textFile을 사용하여 HDFS에 저장된 텍스트 파일을 RDD로 로드합니다.
이 RDD는 파일의 각 줄을 하나의 요소로 포함합니다.
2. 링크 파싱 및 중복 제거
links_with_rank = rdd.map(lambda line: tuple(line.split("\t"))).distinct()
=> 각 줄을 탭 문자로 분리하여 (출발 노드, 도착 노드) 형태의 튜플로 변환합니다. 그리고 중복된 링크를 제거합니다.
3. 링크 그룹화
links_grouped = links_with_rank.map(lambda x: (x[0], [x[1]])).reduceByKey(lambda x, y: x + y)
=> 각 출발 노드를 키로 하여 연결된 도착 노드 리스트를 값으로 갖는 RDD를 생성합니다.
4. 링크와 랭크 조인
joined_rdd = links_grouped.join(ranks)
=> 링크 구조와 초기 랭크를 조인하여 (출발 노드, (도착 노드 리스트, 랭크)) 형태의 RDD를 생성합니다.
5. PageRank 계산
각 반복(itr)에서:
- 기여도 계산: 각 페이지의 랭크를 해당 페이지로부터 연결된 모든 페이지에 균등하게 분배하여 기여도를 계산합니다.
- 랭크 업데이트: 기여도를 합산하여 새로운 랭크를 계산합니다. 이때 damping factor(0.85)를 적용하여 새로운 랭크를 산출합니다.
- 랭크와 링크 조인: 업데이트된 랭크와 기존 링크 구조를 다시 조인합니다.
6. 최종 랭크 정렬
top_100_nodes = ranks.takeOrdered(100, key=lambda x: -x[1])
=> takeOrdered는 행동 연산(action)으로, 이 시점에서 모든 이전의 변환 연산이 실행됩니다.
3) Pre-partition mechanism 을 사용하여 셔플링 오버헤드 줄이기
rdd = sc.textFile("hdfs://cluster-d37a-m/test/web-Google.txt", minPartitions = 2)
로 설정하면 미리 파티 션 수를 조정 할 수 있다. 이때 나는 파티션을 2,5,10으로 설정해보았고 2로 설정했을 때 7분, 5로 설정했을 때 4분 30초, 10으로 설정했을 때 6분이 걸렸다.
- 적은 파티션 (2개) : 파티션 수가 적으면 각 파티션당 데이터 양이 많아질 수 있습니다. 이는 셔플링 오버헤드가 줄어들지만, 파티션당 작업 부하가 많아질 수 있어 처리 시간이 길어질 수 있다.
- 중간 파티션 (5개) : 중간 정도의 파티션 수는 작업 부하와 셔플링 오버헤드를 적절히 균형잡을 수 있습니다. 보통 이런 경우가 가장 좋은 성능을 보였다.
- 많은 파티션 (10개) : 파티션 수가 많으면 각 파티션당 데이터 양이 줄어들어 작업 부하가 감소할 수 있습니다. 하지만 많은 셔플링 데이터가 발생할 수 있어 이로 인한 오버헤드가 발생할 수 있습니다.
이러한 결과를 통해 특정 데이터셋과 클러스터 환경에서 가장 적합한 파티션 수를 결정할 수 있다. 파티션 수가 클 수록 일반적으로 성능이 좋아지지만, 과도한 셔플링 오버헤드가 발생할 수 있음에 유의해야 겠다..
4) What is Page Rank?
https://guswns00123.tistory.com/18
[Algorithm] What is the page rank?
1. Page rank 란? Page rank란 페이지의 중요도를 웹페이지 간 연결 관계에 기반을 두고 측정한 지표이다. Figure1 은 웹페이지들을 노드로 취급하고 화살표가 가르키는 방향은 해당 웹페이지에서 가르
guswns00123.tistory.com
'과제' 카테고리의 다른 글
| [Kafka] Spark와 Kafka로 실시간 비트코인 해시태그 분석 : RDD 활용 (3) | 2024.12.08 |
|---|---|
| [Kafka] GCP Dataproc을 활용한 멀티 노드 Kafka 클러스터 구축 (2) | 2024.12.02 |
| [Project] Make K-means Clustering Program (0) | 2023.09.02 |
| [Project] Finding frequent item sets (1) | 2023.07.31 |
| [Project] Make Community Detection in Online Social Networks (0) | 2023.07.24 |