作者lihgong (人生,是一句引用句)
看板MATLAB
标题[心得] Distributed Computing Wrapper
时间Sat Jul 14 10:55:48 2007
一开始写分散式处理程式的时候, 常常觉得会把 code 弄得很乱
因为要连线到 job-manager, 要指定 job 和里面的 tasks
工作丢进去算, 算完还不一定对, 有错也不会自动重算
整体来讲, Matlab提供的 Distributed Computing Toolbox
直接要拿来用, 真的不怎麽好用 ^^"
经过一阵子尝试, 我把要丢给 Distributed Computing 的任务
用一个 wrapper (包装器) 包装起来, 上面第一段提到的细节全部都写在 wrapper
程式码也变得比较精简
wrapper的内容, 对於需要跟 Dist-Computing toolbox 打交道的朋友
应该也是有帮助
----
要丢到 wrapper 的程式, 必须写成特定的形式
一般有三个步骤.
1. 准备传给 function 的变数
2. 呼叫 wrapper 计算
3. 处理计算结果
首先, 假如我要运算的 function 是 ret = fun(a, b);
那麽 fun() 有两个输入和一个输出, 假设我希望计算下列的结果...
ret(1) = fun(1, 10);
ret(2) = fun(2, 10);
...
ret(10) = fun(10, 10);
----
step.1 准备传给 function 的变数
把要传入 fun() 的传入值用 cell-array 包装起来
input = { {[1] [10]}, {[2] [10]}, ... {10, 10} };
* input 本身是一个大的 cell array
* input 里面有很多个小的 cell-array, eg. { [1] [10] }
* 小 cell-array 里, 每个元素, 分别对应到 function 的输入参数
* 以 { [1] [10] } 来说, 到时候会算 fun([1], [10])
概念上, 先把所有的 input 全部都准备好
用 Matlab 的术语来说, input 变数在描述 "job"
input 变数里每个小 cell, 在描述每个 "task"
step2. 呼叫 wrapper 计算
fhandle = @fun; % 告诉 wrapper 要算的 function-name
num_of_outputs = 1; % 告诉 wrapper 函数输出的数目
is_destroy = 1; % 要不要清除在 jobmanager 上的运算结果
ret = dist_computing_wrapper_jobs( ...
fhandle, num_of_outputs, input, is_destroy);
step3. 处理计算结果
下面举例, 把10次算完的结果画成 10 张图, 其他 case 依样画葫芦即可
for ret_index=1:10
figure; plot( ret{ret_index} );
end
----
已知问题.
1. 这个 wrapper 会自动把算错的 task 重新执行, 如果原本程式就写错
执行过程中本来就会错误, 那套用这个 wrapper 会引发无穷回圈
2. 下面标示蓝色的程式区块, 请按照需要设定
----
function ret = dist_computing_wrapper_jobs(fhandle, num_of_outputs, input, is_destroy)
%% 连线到 job manager
jobmanager_name = 'jobmanager_xeon';
jobmanager_host = 'xeon';
jm = findResource( ...
'scheduler', 'type', 'jobmanager', ...
'name',jobmanager_name, ...
'LookupURL',jobmanager_host);
%% 建立 jobs 和 tasks
[len_job len_task] = size(input);
jobs = [];
for job_index=1:len_job
% 准备 jobs 并且丢去给他算
%% 建立 job 和里面的 task
j = createJob(jm);
% 描述 job 里的 tasks
for index_task = 1:len_task
createTask(j, fhandle, num_of_outputs, input(job_index, index_task));
end
% 设定档案分享
set(j, 'FileDependencies', {'.' '../delsig'});
set(j, 'PathDependencies', {'.' '../delsig'});
% 设定 worker 在第一次执行 job 里的 task 时, reset
set(j, 'RestartWorker', true);
% DEBUG 看 错误讯息
alltasks = get(j, 'Tasks');
set(alltasks, 'CaptureCommandWindowOutput', true);
% 提交 job 并执行
submit(j);
jobs = [jobs; j];
fprintf('Job #%d submitted\n', job_index);
end
%% 等所有jobs执行结束 & 取回执行结果
ret = [];
for job_index=1:len_job
waitForState(jobs(job_index));
% 检查是不是所有 job 都正确
tasks = findTask(jobs(job_index));
error_task = [];
for task_index=1:length(tasks)
if(isempty(tasks(task_index).ErrorIdentifier) == 0) % Error task
error_task = [error_task task_index];
end
end
ret_currjob = getAllOutputArguments(jobs(job_index)); % 取回执行结果
if(isempty(error_task) == 0) % 有错误的结果
fprintf('执行有碰到错误, 自动重算\n');
input_redo = input(error_task);
ret_currjob_redo = dist_computing_wrapper_jobs(fhandle, num_of_outputs, input_redo, is_destroy);
for error_task_index=1:length(error_task)
ret_currjob(error_task(error_task_index)) = ret_currjob_redo(error_task_index);
end
end
fprintf('Job #%d completed ^+++^\n', job_index);
ret = [ret; ret_currjob];
end
if(is_destroy)
destroy(jobs)
end
--
※ 发信站: 批踢踢实业坊(ptt.cc)
◆ From: 140.113.236.3
※ 编辑: lihgong 来自: 140.113.236.3 (07/14 10:57)