作者dasea (植栽鸡肉饭)
看板ncyu_phyedu
标题[讨论] 串线与同步处理
时间Sat Jan 30 14:59:44 2010
Program,Process,Thread
在介绍Thread之前,我们必须先把Program和Process这两个观念作一个厘清。
Program:一群程式码的集合,用以解决特定的问题。以物件导向的观念来类比,相当於
Class。
Process:由Program所产生的执行个体,一个Program可以同时执行多次,产生多个Process
。以物件导向的观念来类比,相当於Object。每一个Process又由以下两个东西组成
一个Memory Space。相当於Object的variable,不同Process的Memory Space也不同,彼此
看不到对方的Memory Space。
一个以上的Thread。Thread代表从某个起始点开始(例如main),到目前为止所有函数的呼
叫路径,以及这些呼叫路径上所用到的区域变数。当然程式的执行状态,除了纪录在主记忆
体外,CPU内部的暂存器(如Program Counter, Stack Pointer, Program Status Word等)
也需要一起纪录。所以Thread又由下面两项组成
Stack:纪录函数呼叫路径,以及这些函数所用到的区域变数
目前CPU的状态
由上面的描述中,我们在归纳Thread的重点如下
一个Process可以有多个Thread。
同一个Process内的Thread使用相同的Memory Space,但这些Thread各自拥有其Stack。换
句话说,Thread能透过reference存取到相同的Object,但是local variable却是各自独立
的。
作业系统会根据Thread的优先权以及已经用掉的CPU时间,在不同的Thread作切换,以让各
个Thread都有机会执行。
如何产生Thread
Java以java.lang.Thread这个类别来表示Thread。Class Thread有两个Constructor:
Thread()
Thread(Runnable)
第一个Constrctor没有参数,第二个需要一个Runnable物件当参数。Runnable是一个
interface,定义於java.lang内,其宣告为
public interface Runnable {
public void run();
}
使用Thread()产生的Thread,其进入点为Thread里的run();使用Thread(Runnable)产生的
Thread,其进入点为Runnable物件里的run()。当run()结束时,这个Thread也就结束了;这
和main()结束有相同的效果。其用法以下面范例说明:
public class ThreadExample1 extends Thread {
public void run() { // overwrite Thread's run()
System.out.println("Here is the starting point of Thread.");
for (;;) { // infinite loop to print message
System.out.println("User Created Thread");
}
}
public static void main(String[] argv) {
Thread t = new ThreadExample1(); // 产生Thread物件
t.start(); // 开始执行t.run()
for (;;) {
System.out.println("Main Thread");
}
}
}
以上程式执行後,萤幕上会持续印出"User Created Thread"或"Main Thread"的字样。利
用Runnable的写法如下
public class ThreadExample2 implements Runnable {
public void run() { // implements Runnable run()
System.out.println("Here is the starting point of Thread.");
for (;;) { // infinite loop to print message
System.out.println("User Created Thread");
}
}
public static void main(String[] argv) {
Thread t = new Thread(new ThreadExample2()); // 产生Thread物件
t.start(); // 开始执行Runnable.run();
for (;;) {
System.out.println("Main Thread");
}
}
}
Thread的优先权与影响资源的相关方法
Thread.setPriority(int)可以设定Thread的优先权,数字越大优先权越高。Thread定义了
3个相关的static final variable
public static final int MAX_PRIORITY 10
public static final int MIN_PRIORITY 1
public static final int NORM_PRIORITY 5
要提醒读者的是,优先权高的Thread其占有CPU的机会比较高,但优先权低的也都会有机会
执行到。其他有关Thread执行的方法有:
yield():先让给别的Thread执行
sleep(int time):休息time mini second(1/1000秒)
join():呼叫ThreadA.join()的执行绪会等到ThreadA结束後,才能继续执行
你可以执行下面的程式,看看yield()的效果
public class ThreadExample1 extends Thread {
public void run() { // overwrite Thread's run()
System.out.println("Here is the starting point of Thread.");
for (;;) { // infinite loop to print message
System.out.println("User Created Thread");
yield();
}
}
public static void main(String[] argv) {
Thread t = new ThreadExample1(); // 产生Thread物件
t.start(); // 开始执行t.run()
for (;;) {
System.out.println("Main Thread");
yield();
}
}
}
观看join的效果
public class JoinExample extends Thread {
String myId;
public JoinExample(String id) {
myId = id;
}
public void run() { // overwrite Thread's run()
for (int i=0; i < 500; i++) {
System.out.println(myId+" Thread");
}
}
public static void main(String[] argv) {
Thread t1 = new JoinExample("T1"); // 产生Thread物件
Thread t2 = new JoinExample("T2"); // 产生Thread物件
t1.start(); // 开始执行t1.run()
t2.start();
try {
t1.join(); // 等待t1结束
t2.join(); // 等待t2结束
} catch (InterruptedException e) {}
for (int i=0;i < 5; i++) {
System.out.println("Main Thread");
}
}
}
观看sleep的效果
public class SleepExample extends Thread {
String myId;
public SleepExample(String id) {
myId = id;
}
public void run() { // overwrite Thread's run()
for (int i=0; i < 500; i++) {
System.out.println(myId+" Thread");
try {
sleep(100);
} catch (InterruptedException e) {}
}
}
public static void main(String[] argv) {
Thread t1 = new SleepExample("T1"); // 产生Thread物件
Thread t2 = new SleepExample("T2"); // 产生Thread物件
t1.start(); // 开始执行t1.run()
t2.start();
}
}
Critical Section(关键时刻)的保护措施
如果设计者没有提供保护机制的话,Thread取得和失去CPU控制权的时机是由作业系统来决
定。也就是说Thread可能在执行任何一个机器指令时,被作业系统取走CPU控制权,并交给
另一个Thread。由於某些真实世界的动作是不可分割的,例如跨行转帐X圆由A帐户到B帐户
,转帐前後这两个帐户的总金额必须相同,但以程式来实作时,却无法用一个指令就完成,如
转帐可能要写成下面的这一段程式码
if (A >= X) {
A = A - X; // 翻译成3个机器指令LOAD A, SUB X, STORE A
B = B +X;
}
如果两个Thread同时要存取A,B两帐户进行转帐,假设当Thread one执行到SUBX後被中断
,Threadtwo接手执行完成另一个转帐要求,然後Threadone继续执行未完成的动作,请问这
两个转帐动作正确吗?我们以A=1000,B=0,分别转帐100,200圆来说明此结果
LOAD A // Thread 1, 现在A还是1000
SUB 100 // Thread 1
LOAD A // 假设此时Thread 1被中断,Thread 2接手, 因为Thread 1 还没有执行
STORE A, 所以变数A还是1000
SUB 200 // Thread 2
STORE A // Thread 2, A = 800
LOAD B // Thread 2, B现在是0
ADD 200 // Thread 2
STORE B // B=200
STORE A // Thread 1拿回控制权, A = 900
LOAD B // Thread 1, B = 200
ADD 100 // Thread 1
STORE B // B = 300
你会发现执行完成後A=900,B=300,也就是说银行平白损失了200圆。当然另外的执行顺序
可能造成其他不正确的结果。我们把这问题再整理一下:
写程式时假设指令会循序执行
某些不可分割的动作,需要以多个机器指令来完成
Thread执行时可能在某个机器指令被中断
两个Thread可能执行同一段程式码,存取同一个资料结构
这样就破坏了第1点的假设
因此在撰写多执行绪的程式时,必须特别考虑这种状况(又称为race condition)。Java的
解决办法是,JVM会在每个物件上摆一把锁(lock),然後程式设计者可以宣告执行某一段程
式(通常是用来存取共同资料结构的程式码, 又称为Critical Section)时,必须拿到某物
件的锁才行,这个锁同时间最多只有一个执行绪可以拥有它。
public class Transfer extends Thread {
public static Object lock = new Object();
public static int A = 1000;
public static int B = 0;
private int amount;
public Transfer(int x) {
amount = x;
}
public void run() {
synchronized(lock) { // 取得lock,如果别的thread A已取得,则目前这个
thread会等到thread A释放该lock
if (A >= amount) {
A = A - amount;
B = B + amount;
}
} // 离开synchronized区块後,此thread会自动释放lock
}
public static void main(String[] argv) {
Thread t1 = new Transfer(100);
Thread t2 = new Transfer(200);
t1.start();
t2.start();
}
}
除了synchronized(ref)的语法可以锁定ref指到的物件外,synchronized也可以用在
object method前面,表示要锁定this物件才能执行该方法。以下是Queue结构的范例
public class Queue {
private Object[] data;
private int size;
private int head;
private int tail;
public Queue(int maxLen) {
data = new Object[maxLen];
}
public synchronized Object deQueue() {
Object tmp = data[head];
data[head] = null;
head = (head+1)%data.length;
size--;
return tmp;
}
public synchronized void enQueue(Object c) {
data[tail++] = c;
tail %= data.length;
size++;
}
}
虽然上面的程式正确无误,但并未考虑资源不足时该如何处理。例如Queue已经没有资料了
,却还想拿出来;或是Queue里已经塞满了资料,使用者却还要放进去?我们当然可以使用
Exception Handling的机制:
public class Queue {
private Object[] data;
private int size;
private int head;
private int tail;
public Queue(int maxLen) {
data = new Object[maxLen];
}
public synchronized Object deQueue() throws Exception {
if (size == 0) {
throw new Exception();
}
Object tmp = data[head];
data[head] = null;
head = (head+1)%data.length;
size--;
return tmp;
}
public synchronized void enQueue(Object c) throws Exception {
if (size >= maxLen) {
throw new Exception();
}
data[tail++] = c;
tail %= data.length;
size++;
}
}
但假设我们的执行环境是,某些Thread专门负责读取使用者的需求,并把工作放到Queue里
面,某些Thread则专门由Queue里抓取工作需求做进一步处理。这种架构的好处是,可以把
慢速或不定速的输入(如透过网路读资料,连线速度可能差很多),和快速的处理分开,可使
系统的反应速度更快,更节省资源。那麽以Exceptoin来处理Queue空掉或爆掉的情况并不
合适,因为使用Queue的人必须处理例外状况,并不断的消耗CPU资源:
public class Getter extends Thread {
Queue q;
public Getter(Queue q) {
this.q = q;
}
public void run() {
for (;;) {
try {
Object data = q.deQueue();
// processing
} catch(Exception e) {
// if we try to sleep here, user may feel slow response
// if we do not sleep, CPU will be wasted
}
}
}
}
public class Putter extends Thread {
Queue q;
public Putter(Queue q) {
this.q = q;
}
public void run() {
for (;;) {
try {
Object data = null;
// get user request
q.enQueue(data);
} catch(Exception e) {
// if we try to sleep here, user may feel slow response
// if we do not sleep, CPU will be wasted
}
}
}
}
public class Main {
public static void main(String[] argv) {
Queue q = new Queue(10);
Getter r1 = new Getter(q);
Getter r2 = new Getter(q);
Putter w1 = new Putter(q);
Putter w2 = new Putter(q);
r1.start();
r2.start();
w1.start();
w2.start();
}
}
为了解决这类资源分配的问题,Java Object提供了下面三个method:
wait():使呼叫此方法的Thread进入Blocking Mode,并设为等待该Object, 呼叫wait()时
, 该Thread必须拥有该物件的lock。Blocking Mode下的Thread必须释放所有手中的lock,
并且无法使用CPU。
notifyAll():让等待该Object的所有Thread进入Runnable Mode。
notify():让等待该Object的某一个Thread进入Runnable Mode。
所谓Runnable Mode是指该Thread随时可由作业系统分配CPU资源。Blocking Mode表示该
Thread正在等待某个事件发生,作业系统不会让这种Thread取得CPU资源。前一个Queue的
范例就可以写成:
public class Queue {
private Object[] data;
private int size;
private int head;
private int tail;
public Queue(int maxLen) {
data = new Object[maxLen];
}
public synchronized Object deQueue() {
while (size==0) { // When executing here, Thread must have got lock
and be in running mode
// Let current Thread wait this object(to sleeping mode)
try {
wait(); // to sleeping mode, and release all lock
} catch(Exception ex) {};
}
Object tmp = data[head];
data[head] = null;
head = (head+1)%data.length;
if (size==data.length) {
// wake up all Threads waiting this object
notifyAll();
}
size--;
return tmp;
} // release lock
public synchronized void enQueue(Object c) {
while (size==data.length) { // When executing here, Thread must have
got lock and be in running mode
// Let current thread wait this object(to sleeping mode)
try {
wait(); // to sleeping mode, and release all lock
} catch(Exception ex) {};
}
data[tail++] = c;
tail %= data.length;
size++;
if (size==1) {
// wake up all Threads waiting this object
notifyAll();
}
}
}
public class ReaderWriter extends Thread {
public static final int READER = 1;
public static final int WRITER = 2;
private Queue q;
private int mode;
public void run() {
for (int i=0; i < 1000; i++) {
if (mode==READER) {
q.deQueue();
} else if (mode==WRITER) {
q.enQueue(new Integer(i));
}
}
}
public ReaderWriter(Queue q, int mode) {
this.q = q;
this.mode = mode;
}
public static void main(String[] args) {
Queue q = new Queue(5);
ReaderWriter r1, r2, w1, w2;
(w1 = new ReaderWriter(q, WRITER)).start();
(w2 = new ReaderWriter(q, WRITER)).start();
(r1 = new ReaderWriter(q, READER)).start();
(r2 = new ReaderWriter(q, READER)).start();
try {
w1.join(); // wait until w1 complete
w2.join(); // wait until w2 complete
r1.join(); // wait until r1 complete
r2.join(); // wait until r2 complete
} catch(InterruptedException epp) {
}
}
}
Multiple Reader-Writer Monitors
上一节的Queue资料结构,不论是enQueue()或deQueue()都会更动到Queue的内容。而在许
多应用里,资料结构可以允许同时多个读一个写。本节举出几个不同的例子,说明多个
Reader-Writer时的可能排程法。
Single Reader-Writer, 只同时允许一个执行绪存取
public class SingleReaderWriter {
int n; // number of reader and write, 0 or 1
public synchronized void startReading() throws InterruptedException {
while (n != 0) {
wait();
}
n = 1;
}
public synchronized void stopReading() {
n = 0;
notify();
}
public synchronized void startWriting() throws InterruptedException {
while (n != 0) {
wait();
}
n = 1;
}
public synchronized void stopWriting() {
n = 0;
notify();
}
}
// 这是一个使用范例, 程式能否正确执行要靠呼叫正确的start和stop
public class WriterThread extends Thread {
SingleReaderWriter srw;
public WriterThread(SingleReaderWriter srw) {
this.srw = srw;
}
public void run() {
startWring();
// insert real job here
stopWriting();
}
}
public class ReaderThread extends Thread {
SingleReaderWriter srw;
public ReaderThread(SingleReaderWriter srw) {
this.srw = srw;
}
public void run() {
startReading();
// insert real job here
stopReading();
}
}
public class Test {
public static void main(String[] argv) {
SingleReaderWriter srw = new SingleReaderWriter;
// create four threads
(new WriterThread(srw)).start();
(new WriterThread(srw)).start();
(new ReaderThread(srw)).start();
(new ReaderThread(srw)).start();
}
}
其他可能的策略实作如下:
Reader优先:
public class ReadersPreferredMonitor {
int nr; // The number of threads currently reading, nr > = 0
int nw; // The number of threads currently writing, 0 or 1
int nrtotal; // The number of threads either reading or waiting to read,
nrtotal > = nr
int nwtotal; // The number of threads either writing or waiting to write
public synchronized void startReading() throws InterruptedException {
nrtotal++; // 想要read的thread又多了一个
while (nw != 0) { // 还有write thread正在write
wait();
}
nr++; // 正在读的thread多了一个
}
public synchronized void startWriting() throws InterruptedException {
nwtotal++; // 想要写的thread又多了一个
while (nrtotal+nw != 0) { // 只要有thread想要读,或是有thread正在写,礼
让
wait();
}
nw = 1;
}
public synchronized void stopReading() {
nr--; // 正在读的少一个
nrtotal--; // 想要读的少一个
if (nrtotal == 0) { // 如果没有要读的,叫醒想写的
notify();
}
}
public synchronized void stopWriting() {
nw = 0; // 没有thread正在写
nwtotal--; // 想写的少一个
notifyAll(); // 叫醒所有想读和想写的
}
}
Writer优先:
public class WritersPreferredMonitor {
int nr; // The number of threads currently reading, nr > = 0
int nw; // The number of threads currently writing, 0 or 1
int nrtotal; // The number of threads either reading or waiting to read,
nrtotal > = nr
int nwtotal; // The number of threads either writing or waiting to write
public synchronized void startReading() throws InterruptedException {
nrtotal++; // 想要read的thread又多了一个
while (nwtotal != 0) { // 还有thread想要write
wait();
}
nr++; // 正在读的thread多了一个
}
public synchronized void startWriting() throws InterruptedException {
nwtotal++; // 想要写的thread又多了一个
while (nr+nw != 0) { // 有thread正在读,或是有thread正在写
wait();
}
nw = 1;
}
public synchronized void stopReading() {
nr--; // 正在读的少一个
nrtotal--; // 想要读的少一个
if (nr == 0) { // 如果没有正在读的,叫醒所有的(包括想写的)
notifyAll();
}
}
public synchronized void stopWriting() {
nw = 0; // 没有thread正在写
nwtotal--; // 想写的少一个
notifyAll(); // 叫醒所有想读和想写的
}
}
Reader和Writer交互执行:
public class AlternatingReadersWritersMonitor {
int[] nr = new int[2]; // The number of threads currently reading
int thisBatch; // Index in nr of the batch of readers currently reading(0
or 1)
int nextBatch = 1; // Index in nr of the batch of readers waitin to
read(always 1-thisBatch)
int nw; // The number of threads currently writing(0 or 1)
int nwtotal; // The number of threads either writing or waiting to write
public synchronized void startReading() throws InterruptedException {
if (nwtotal == 0) { // 没有thread要write, 将reader都放到目前要处理的这
一批
nr[thisBatch]++;
} else {
nr[nextBatch]++;
int myBatch = nextBatch;
while (thisBatch != myBatch) {
wait();
}
}
}
public synchronized void stopReading() {
nr[thisBatch]--;
if (nr[thisBatch] == 0) { // 目前这批的reader都读完了,找下一个writer
notifyAll();
}
}
public synchronized void startWriting() throws InterruptedException {
nwtotal++;
while (nr[thisBatch]+nw != 0) { // 目前这批还没完,或有thread正在写
wait();
}
nw = 1;
}
public synchronized void stopWriting() {
nw = 0;
nwtotal--;
int tmp = thisBatch; // 交换下一批要读的
thisBatch = nextBatch;
nextBatch = tmp;
notifyAll();
}
}
给号依序执行
public class TakeANumberMonitor {
int nr; // The number of threads currently reading
int nextNumber; // The number to be taken by the next thread to arrive
int nowServing; // The number of the thread to be served next
public synchronized void startReading() throws InterruptedException {
int myNumber = nextNumber++;
while (nowServing != myNumber) { // 还没轮到我
wait();
}
nr++; // 多了一个Reader
nowServing++; // 准备检查下一个
notifyAll();
}
public synchronized void startWriting() throws InterruptedException {
int myNumber = nextNumber++;
while (nowServing != myNumber) { // 还没轮到我
wait();
}
while (nr > 0) { // 要等所有的Reader结束
wait();
}
}
public synchronized void stopReading() {
nr--; // 少了一个Reader
if (nr == 0) {
notifyAll();
}
}
public synchronized void stopWriting() {
nowServing++; // 准备检查下一个
notifyAll();
}
}
--
※ 发信站: 批踢踢实业坊(ptt.cc)
◆ From: 61.58.22.74