作者yauhh (哟)
看板Cloud
标题Re: MapReduce (Re: [情报] 利用智慧型手机建构云端
时间Mon Aug 30 20:51:00 2010
※ 引述《yauhh (哟)》之铭言:
: 我想MapReduce是个model,不一定有个完整的底层平台.
: 既然是programming model,有可能从一开始写程式就可以顺着这个心法做.
: 从档案文件处理出发,首先要从档案系统取得档案清单. 这是map:
: TakeFiles(folder, nil):
当时我谈MapReduce framework,同时在另一方面想,各种程式的写法如何表达成
map-reduce的组合,也是个问题. 由於目前还没把自己的framework做好,
现在有个想法只能用嘴上提一提.
在此要谈的是map-reduce版本的sort. 我用Erlang语言来说明.
一般Quick sort是最令人满意的排序了. Qsort递回定义为以下程式:
qsort([]) ->
[];
qsort([X|Xs]) ->
qsort([ Y || Y <- Xs, Y < X ]) ++ [X] ++ qsort([ Z || Z <- Xs, Z >= X]).
是在资料结构上,有个分解->递回->结合的处理过程.
在多运算单元的场合,分散式排序dsort是两个map. 第一个map是
distribute(List, List):
for all Number in List,
emit(Number, to-be-Nth(number, List)).
拿到一列可容许长度的资料,对每个资料项目number取它应该在排序後的第几个位置.
to-be-Nth函数找法很简单,在预定排序是由小到大的前提之後,只要求List中有多少个
小於number的数字,将个数加一即为结果.
第二个map是
write(Skip, List<Number, Nth>):
for all Number, Nth in List<Number, Nth>
write Number at position #Nth
no_emit()
这map稍微改一下map的规格,使map可能送出一对资料,或者可能不送出资料.
这个map, write函数,是拿到一些可容许数量的数对,分别代表资料与排序位置,
它就把资料写到指定的位置. 在此是引用档案系统可以随机写入的假设.
Dsort程式,对应为以上第一个map ( distribute(List,List) ), 具体如下:
dsort(Xs) ->
L = length(Xs),
[ spawn(?MODULE, sort_node, [self(), Nth, Xs])
|| Nth <- lists:seq(1, length(Xs)) ],
wait([], L).
主要程式先对每个资料项目,做map到 sort_node 函数的动作. Erlang语言是内建
分散式系统讯息传递定义的语言,同样一个spawn内建函数除了可以分生出程序之外,
也可以用以对其他计算点(别台电脑)指定要分生出程序. 以上dsort函数先分出很多
程序,分出程序同时给了所需的资料参数,接着就等待每个程序传讯息回来.
等待L个讯息回来的wait函数定义如下:
wait(State, 0) ->
State;
wait(State, N) ->
receive
{Kth, Y} -> State1 = [{Kth,Y}|State]
end,
wait(State1, N-1).
然後sort_node定义为上述的第二个map ( write(Skip,List<Number,Nth>) ),
每一个sort_node的instance只送回一项数对.
kth_sort(Nth, Xs) ->
length([ Y || Y <- Xs, Y < lists:nth(Nth, Xs) ]) + 1.
sort_node(FromID, Nth, Xs) ->
FromID ! { lists:nth(Xs), kth_sort(Nth, Xs) }.
测试环境 Erlang R13B02, CPU Via 700 (大概吧,HP mini-note 2133/Xubuntu 10.4)
使用timer内建模组的 tc/3 函数做执行时间测试:
> timer:tc(sort, qsort, [lists:seq(1,200)]).
{31428, ... } 31428微秒
> timer:tc(sort, dsort, [lists:seq(1,200)]).
{12691, ... } 12691微秒
> timer:tc(sort, qsort, [lists:seq(1,200)]).
{25120, ... } 25120微秒
> timer:tc(sort, dsort, [lists:seq(1,200)]).
{7582, ... } 7582微秒
另外在四核心平台测的结果可能大不相同,不过,将dsort/1最後的wait([], L)删除,
执行时间也是大幅删减到比qsort/1远远较小. 而sort_node/3是在许多个分出来的
程序中同时进行,这种平行本身就节省了大量的计算时间.
这表示,我可以将程式写成:第一个map送讯息叫第二个map分别处理每个单元,
而第二个map不必将结果带回给第一个map. 而是由第二个map按照资料的新位置写到
档案系统中. 如此以完成分散式排序.
这种排序方法其实跟Quick sort比较,没有很强的意义. 应该可以跟同是外部排序的
Merge sort好好比比看.
--
※ 发信站: 批踢踢实业坊(ptt.cc)
◆ From: 218.160.115.187