1. SON Algorithm이란 ? 

SON Algorithm은 baskets을 작은 chunk로 나누어 모든 frequent itemset을 찾는 것을 도와주는 Algorithm이다. 이때 이 Algorithm은 distributed/ parallel computing에 적합해 MapReduce를 활용하여 쓰이기 좋은 구조이다. 그래서 이 알고리즘을 두 MapReduce job으로 설명해 볼 것이다.

 

1st MapReduce

먼저 1st MapReduce job의 목표는 Candidate itemsets을 생성하는 것이 목표이다.

 

Mapper for Job 1

    - A-priori Algorithm을 chunk에 적용해 s(support) 값을 넘는지 check하기

    - 각 chunk에서 frequent itemsets을 만들어 (F, c) 형태로 output하기 (F => key (itemset) c => value (count) )

 

Reducer for Job 1

    - 2nd MapReduce에서 활용할 candidate itemset을 output하기

    - 이때 받아온 (F, c) 형태에서 count 값은 버리고 오직 itemset인 F만 output

 

 

2nd MapReduce

2nd MapReduce job의 목표는 true frequent itemsets을 찾는 것이 목표이다.

 

Mapper for Job 2

    -  1st MapReduce에서 만든 candidate itemset을 이용하여 각 chunk의 frequency count하기

 

Reducer for Job 2

    - Mapper of job 2에서 생성한 output들을 종합하여 count를 집계하기

    - 그후 support(s) 값보다 작은 값들은 제외시켜 output하기

 

2. How to use SON Algorithm ?

 

1st Mapper

for line in sys.stdin:
    line = line.strip()
    word_list = line.split(' ')
    total_basket += 1
    word_list = list(set(word_list))
    basket_list.append(word_list)
    for word in word_list:
        if word in item:
            item[word] += 1
        else:
            item[word] = 1

s = ths * total_basket

for word, cnt in item.items():
    if cnt >= s:
        check_item[word] = cnt

check_pairs = {}

for basket in basket_list:
    for i in range(len(basket)-1):
        for j in range(i+1, len(basket)):
            if (basket[i] in check_item) and (basket[j] in check_item):
                if (basket[i] <= basket[j]):
                    pair = basket[i] + " " + basket[j]
                else : pair = basket[j] + " " + basket[i]
                if pair in check_pairs:
                    check_pairs[pair] += 1
                else:
                    check_pairs[pair] = 1

for pair, cnt in check_pairs.items():
    if (cnt >= s):
        print("%s\t%s" %(pair, cnt))

 

1st Reducer

cur_pair = None
for line in sys.stdin:
    line = line.strip()
    pair, count = line.split('\t')
    if pair != cur_pair:
        if cur_pair is not None:
            print ("%s" % (cur_pair))
        cur_pair = pair

print("%s" % (cur_pair))

 

Fig. Result of 1st MapReduce Job

 

2nd Mapper

check_pairs = {}
file = open("pair_list", "r")
for line in file:
    pair = line.strip()
    check_pairs[pair] = 0

for line1 in sys.stdin:
    line1 = line1.strip()
    word_list = line1.split(' ')
    word_list = list(set(word_list))
    #print(word_list)
    for i in range(len(word_list) - 1):
        for j in range(i+1, len(word_list)):
            if (word_list[i] <= word_list[j]):
                pair = word_list[i] + " " + word_list[j]
            else : pair = word_list[j] + " " + word_list[i]
            #print(pair)
            if pair in check_pairs:
                check_pairs[pair] +=1
                

for pair, cnt in check_pairs.items():
    print("%s\t%s" % (pair, cnt))

2nd Reducer

cur_pair = None
total = 0
ths = 0.01
total_basket = 4984636
s = ths * total_basket

for line in sys.stdin:
    line = line.strip()
    pair, cnt = line.split("\t")
    if pair != cur_pair:
        if cur_pair is not None:
            if total >= s:
                print("%s\t%s" % (cur_pair, total))
        cur_pair = pair
        total = int(cnt)
    else: total += int(cnt)

print("%s\t%s" % (cur_pair, total))

 

Fig2. Result of 2nd MapReduce Job

+ Recent posts