作者lihgong (人生,是一句引用句)
看板MATLAB
標題[心得] Distributed Computing Wrapper
時間Sat Jul 14 10:55:48 2007
一開始寫分散式處理程式的時候, 常常覺得會把 code 弄得很亂
因為要連線到 job-manager, 要指定 job 和裡面的 tasks
工作丟進去算, 算完還不一定對, 有錯也不會自動重算
整體來講, Matlab提供的 Distributed Computing Toolbox
直接要拿來用, 真的不怎麼好用 ^^"
經過一陣子嘗試, 我把要丟給 Distributed Computing 的任務
用一個 wrapper (包裝器) 包裝起來, 上面第一段提到的細節全部都寫在 wrapper
程式碼也變得比較精簡
wrapper的內容, 對於需要跟 Dist-Computing toolbox 打交道的朋友
應該也是有幫助
----
要丟到 wrapper 的程式, 必須寫成特定的形式
一般有三個步驟.
1. 準備傳給 function 的變數
2. 呼叫 wrapper 計算
3. 處理計算結果
首先, 假如我要運算的 function 是 ret = fun(a, b);
那麼 fun() 有兩個輸入和一個輸出, 假設我希望計算下列的結果...
ret(1) = fun(1, 10);
ret(2) = fun(2, 10);
...
ret(10) = fun(10, 10);
----
step.1 準備傳給 function 的變數
把要傳入 fun() 的傳入值用 cell-array 包裝起來
input = { {[1] [10]}, {[2] [10]}, ... {10, 10} };
* input 本身是一個大的 cell array
* input 裡面有很多個小的 cell-array, eg. { [1] [10] }
* 小 cell-array 裡, 每個元素, 分別對應到 function 的輸入參數
* 以 { [1] [10] } 來說, 到時候會算 fun([1], [10])
概念上, 先把所有的 input 全部都準備好
用 Matlab 的術語來說, input 變數在描述 "job"
input 變數裡每個小 cell, 在描述每個 "task"
step2. 呼叫 wrapper 計算
fhandle = @fun; % 告訴 wrapper 要算的 function-name
num_of_outputs = 1; % 告訴 wrapper 函數輸出的數目
is_destroy = 1; % 要不要清除在 jobmanager 上的運算結果
ret = dist_computing_wrapper_jobs( ...
fhandle, num_of_outputs, input, is_destroy);
step3. 處理計算結果
下面舉例, 把10次算完的結果畫成 10 張圖, 其他 case 依樣畫葫蘆即可
for ret_index=1:10
figure; plot( ret{ret_index} );
end
----
已知問題.
1. 這個 wrapper 會自動把算錯的 task 重新執行, 如果原本程式就寫錯
執行過程中本來就會錯誤, 那套用這個 wrapper 會引發無窮迴圈
2. 下面標示藍色的程式區塊, 請按照需要設定
----
function ret = dist_computing_wrapper_jobs(fhandle, num_of_outputs, input, is_destroy)
%% 連線到 job manager
jobmanager_name = 'jobmanager_xeon';
jobmanager_host = 'xeon';
jm = findResource( ...
'scheduler', 'type', 'jobmanager', ...
'name',jobmanager_name, ...
'LookupURL',jobmanager_host);
%% 建立 jobs 和 tasks
[len_job len_task] = size(input);
jobs = [];
for job_index=1:len_job
% 準備 jobs 並且丟去給他算
%% 建立 job 和裡面的 task
j = createJob(jm);
% 描述 job 裡的 tasks
for index_task = 1:len_task
createTask(j, fhandle, num_of_outputs, input(job_index, index_task));
end
% 設定檔案分享
set(j, 'FileDependencies', {'.' '../delsig'});
set(j, 'PathDependencies', {'.' '../delsig'});
% 設定 worker 在第一次執行 job 裡的 task 時, reset
set(j, 'RestartWorker', true);
% DEBUG 看 錯誤訊息
alltasks = get(j, 'Tasks');
set(alltasks, 'CaptureCommandWindowOutput', true);
% 提交 job 並執行
submit(j);
jobs = [jobs; j];
fprintf('Job #%d submitted\n', job_index);
end
%% 等所有jobs執行結束 & 取回執行結果
ret = [];
for job_index=1:len_job
waitForState(jobs(job_index));
% 檢查是不是所有 job 都正確
tasks = findTask(jobs(job_index));
error_task = [];
for task_index=1:length(tasks)
if(isempty(tasks(task_index).ErrorIdentifier) == 0) % Error task
error_task = [error_task task_index];
end
end
ret_currjob = getAllOutputArguments(jobs(job_index)); % 取回執行結果
if(isempty(error_task) == 0) % 有錯誤的結果
fprintf('執行有碰到錯誤, 自動重算\n');
input_redo = input(error_task);
ret_currjob_redo = dist_computing_wrapper_jobs(fhandle, num_of_outputs, input_redo, is_destroy);
for error_task_index=1:length(error_task)
ret_currjob(error_task(error_task_index)) = ret_currjob_redo(error_task_index);
end
end
fprintf('Job #%d completed ^+++^\n', job_index);
ret = [ret; ret_currjob];
end
if(is_destroy)
destroy(jobs)
end
--
※ 發信站: 批踢踢實業坊(ptt.cc)
◆ From: 140.113.236.3
※ 編輯: lihgong 來自: 140.113.236.3 (07/14 10:57)