PipedWriter和PipedReader源碼分析
1. PipedWriter 源碼(基于jdk1.7.40)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
package java.io; public class PipedWriter extends Writer { // 與PipedWriter通信的PipedReader對象 private PipedReader sink; // PipedWriter的關閉標記 private boolean closed = false ; // 構造函數(shù),指定配對的PipedReader public PipedWriter(PipedReader snk) throws IOException { connect(snk); } // 構造函數(shù) public PipedWriter() { } // 將“PipedWriter” 和 “PipedReader”連接。 public synchronized void connect(PipedReader snk) throws IOException { if (snk == null ) { throw new NullPointerException(); } else if (sink != null || snk.connected) { throw new IOException( "Already connected" ); } else if (snk.closedByReader || closed) { throw new IOException( "Pipe closed" ); } sink = snk; snk.in = - 1 ; snk.out = 0 ; // 設置“PipedReader”和“PipedWriter”為已連接狀態(tài) // connected是PipedReader中定義的,用于表示“PipedReader和PipedWriter”是否已經連接 snk.connected = true ; } // 將一個字符c寫入“PipedWriter”中。 // 將c寫入“PipedWriter”之后,它會將c傳輸給“PipedReader” public void write( int c) throws IOException { if (sink == null ) { throw new IOException( "Pipe not connected" ); } sink.receive(c); } // 將字符數(shù)組b寫入“PipedWriter”中。 // 將數(shù)組b寫入“PipedWriter”之后,它會將其傳輸給“PipedReader” public void write( char cbuf[], int off, int len) throws IOException { if (sink == null ) { throw new IOException( "Pipe not connected" ); } else if ((off | len | (off + len) | (cbuf.length - (off + len))) < ) { throw new IndexOutOfBoundsException(); } sink.receive(cbuf, off, len); } // 清空“PipedWriter”。 // 這里會調用“PipedReader”的notifyAll(); // 目的是讓“PipedReader”放棄對當前資源的占有,讓其它的等待線程(等待讀取PipedWriter的線程)讀取“PipedWriter”的值。 public synchronized void flush() throws IOException { if (sink != null ) { if (sink.closedByReader || closed) { throw new IOException( "Pipe closed" ); } synchronized (sink) { sink.notifyAll(); } } } // 關閉“PipedWriter”。 // 關閉之后,會調用receivedLast()通知“PipedReader”它已經關閉。 public void close() throws IOException { closed = true ; if (sink != null ) { sink.receivedLast(); } } } |
2. PipedReader 源碼(基于jdk1.7.40)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
|
package java.io; public class PipedReader extends Reader { // “PipedWriter”是否關閉的標記 boolean closedByWriter = false ; // “PipedReader”是否關閉的標記 boolean closedByReader = false ; // “PipedReader”與“PipedWriter”是否連接的標記 // 它在PipedWriter的connect()連接函數(shù)中被設置為true boolean connected = false ; Thread readSide; // 讀取“管道”數(shù)據的線程 Thread writeSide; // 向“管道”寫入數(shù)據的線程 // “管道”的默認大小 private static final int DEFAULT_PIPE_SIZE = 1024 ; // 緩沖區(qū) char buffer[]; //下一個寫入字符的位置。in==out代表滿,說明“寫入的數(shù)據”全部被讀取了。 int in = -; //下一個讀取字符的位置。in==out代表滿,說明“寫入的數(shù)據”全部被讀取了。 int out = ; // 構造函數(shù):指定與“PipedReader”關聯(lián)的“PipedWriter” public PipedReader(PipedWriter src) throws IOException { this (src, DEFAULT_PIPE_SIZE); } // 構造函數(shù):指定與“PipedReader”關聯(lián)的“PipedWriter”,以及“緩沖區(qū)大小” public PipedReader(PipedWriter src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); } // 構造函數(shù):默認緩沖區(qū)大小是1024字符 public PipedReader() { initPipe(DEFAULT_PIPE_SIZE); } // 構造函數(shù):指定緩沖區(qū)大小是pipeSize public PipedReader( int pipeSize) { initPipe(pipeSize); } // 初始化“管道”:新建緩沖區(qū)大小 private void initPipe( int pipeSize) { if (pipeSize <= 0 ) { throw new IllegalArgumentException( "Pipe size <= 0" ); } buffer = new char [pipeSize]; } // 將“PipedReader”和“PipedWriter”綁定。 // 實際上,這里調用的是PipedWriter的connect()函數(shù) public void connect(PipedWriter src) throws IOException { src.connect( this ); } // 接收int類型的數(shù)據b。 // 它只會在PipedWriter的write(int b)中會被調用 synchronized void receive( int c) throws IOException { // 檢查管道狀態(tài) if (!connected) { throw new IOException( "Pipe not connected" ); } else if (closedByWriter || closedByReader) { throw new IOException( "Pipe closed" ); } else if (readSide != null && !readSide.isAlive()) { throw new IOException( "Read end dead" ); } // 獲取“寫入管道”的線程 writeSide = Thread.currentThread(); // 如果“管道中被讀取的數(shù)據,等于寫入管道的數(shù)據”時, // 則每隔1000ms檢查“管道狀態(tài)”,并喚醒管道操作:若有“讀取管道數(shù)據線程被阻塞”,則喚醒該線程。 while (in == out) { if ((readSide != null ) && !readSide.isAlive()) { throw new IOException( "Pipe broken" ); } /* full: kick any waiting readers */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } if (in < 0) { in = 0; out = 0; } buffer[in++] = (char) c; if (in >= buffer.length) { in = 0; } } // 接收字符數(shù)組b。 synchronized void receive(char c[], int off, int len) throws IOException { while (--len >= ) { receive(c[off++]); } } // 當PipedWriter被關閉時,被調用 synchronized void receivedLast() { closedByWriter = true; notifyAll(); } // 從管道(的緩沖)中讀取一個字符,并將其轉換成int類型 public synchronized int read() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < )) { throw new IOException("Write end dead"); } readSide = Thread.currentThread(); int trials = 2; while (in < 0) { if (closedByWriter) { /* closed by writer, return EOF */ return -1; } if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < )) { throw new IOException("Pipe broken"); } /* might be a writer waiting */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } int ret = buffer[out++]; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } return ret; } // 從管道(的緩沖)中讀取數(shù)據,并將其存入到數(shù)組b中 public synchronized int read(char cbuf[], int off, int len) throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } if ((off < 0) || (off > cbuf.length) || (len < 0) || ((off + len) > cbuf.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* possibly wait on the first character */ int c = read(); if (c < 0) { return -1; } cbuf[off] = (char)c; int rlen = 1; while ((in >= 0) && (--len > 0)) { cbuf[off + rlen] = buffer[out++]; rlen++; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -; } } return rlen; } // 是否能從管道中讀取下一個數(shù)據 public synchronized boolean ready() throws IOException { if (!connected) { throw new IOException( "Pipe not connected" ); } else if (closedByReader) { throw new IOException( "Pipe closed" ); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < )) { throw new IOException( "Write end dead" ); } if (in < 0 ) { return false ; } else { return true ; } } // 關閉PipedReader public void close() throws IOException { in = -; closedByReader = true ; } } |
示例
下面,我們看看多線程中通過PipedWriter和PipedReader通信的例子。例子中包括3個類:Receiver.java, Sender.java 和 PipeTest.java
Receiver.java的代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
import java.io.IOException; import java.io.PipedReader; @SuppressWarnings ( "all" ) /** * 接收者線程 */ public class Receiver extends Thread { // 管道輸入流對象。 // 它和“管道輸出流(PipedWriter)”對象綁定, // 從而可以接收“管道輸出流”的數(shù)據,再讓用戶讀取。 private PipedReader in = new PipedReader(); // 獲得“管道輸入流對象” public PipedReader getReader(){ return in; } @Override public void run(){ readMessageOnce() ; //readMessageContinued() ; } // 從“管道輸入流”中讀取次數(shù)據 public void readMessageOnce(){ // 雖然buf的大小是2048個字符,但最多只會從“管道輸入流”中讀取1024個字符。 // 因為,“管道輸入流”的緩沖區(qū)大小默認只有1024個字符。 char [] buf = new char [ 2048 ]; try { int len = in.read(buf); System.out.println( new String(buf, 0 ,len)); in.close(); } catch (IOException e) { e.printStackTrace(); } } // 從“管道輸入流”讀取>1024個字符時,就停止讀取 public void readMessageContinued(){ int total= 0 ; while ( true ) { char [] buf = new char []; try { int len = in.read(buf); total += len; System.out.println( new String(buf,,len)); // 若讀取的字符總數(shù)>1024,則退出循環(huán)。 if (total > 1024 ) break ; } catch (IOException e) { e.printStackTrace(); } } try { in.close(); } catch (IOException e) { e.printStackTrace(); } } } Sender.java的代碼如下: import java.io.IOException; import java.io.PipedWriter; @SuppressWarnings ( "all" ) /** * 發(fā)送者線程 */ public class Sender extends Thread { // 管道輸出流對象。 // 它和“管道輸入流(PipedReader)”對象綁定, // 從而可以將數(shù)據發(fā)送給“管道輸入流”的數(shù)據,然后用戶可以從“管道輸入流”讀取數(shù)據。 private PipedWriter out = new PipedWriter(); // 獲得“管道輸出流”對象 public PipedWriter getWriter(){ return out; } @Override public void run(){ writeShortMessage(); //writeLongMessage(); } // 向“管道輸出流”中寫入一則較簡短的消息:"this is a short message" private void writeShortMessage() { String strInfo = "this is a short message" ; try { out.write(strInfo.toCharArray()); out.close(); } catch (IOException e) { e.printStackTrace(); } } // 向“管道輸出流”中寫入一則較長的消息 private void writeLongMessage() { StringBuilder sb = new StringBuilder(); // 通過for循環(huán)寫入1020個字符 for ( int i= 0 ; i< 102 ; i++) sb.append( "0123456789" ); // 再寫入26個字符。 sb.append( "abcdefghijklmnopqrstuvwxyz" ); // str的總長度是1020+26=1046個字符 String str = sb.toString(); try { // 將1046個字符寫入到“管道輸出流”中 out.write(str); out.close(); } catch (IOException e) { e.printStackTrace(); } } } |
PipeTest.java的代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
import java.io.PipedReader; import java.io.PipedWriter; import java.io.IOException; @SuppressWarnings ( "all" ) /** * 管道輸入流和管道輸出流的交互程序 */ public class PipeTest { public static void main(String[] args) { Sender t1 = new Sender(); Receiver t2 = new Receiver(); PipedWriter out = t1.getWriter(); PipedReader in = t2.getReader(); try { //管道連接。下面句話的本質是一樣。 //out.connect(in); in.connect(out); /** * Thread類的START方法: * 使該線程開始執(zhí)行;Java 虛擬機調用該線程的 run 方法。 * 結果是兩個線程并發(fā)地運行;當前線程(從調用返回給 start 方法)和另一個線程(執(zhí)行其 run 方法)。 * 多次啟動一個線程是非法的。特別是當線程已經結束執(zhí)行后,不能再重新啟動。 */ t.start(); t.start(); } catch (IOException e) { e.printStackTrace(); } } } |
運行結果:
1
|
this is a short message |
結果說明:
(01) in.connect(out);
它的作用是將“管道輸入流”和“管道輸出流”關聯(lián)起來。查看PipedWriter.java和PipedReader.java中connect()的源碼;我們知道 out.connect(in); 等價于 in.connect(out);
(02)
t1.start(); // 啟動“Sender”線程
t2.start(); // 啟動“Receiver”線程
先查看Sender.java的源碼,線程啟動后執(zhí)行run()函數(shù);在Sender.java的run()中,調用writeShortMessage();
writeShortMessage();的作用就是向“管道輸出流”中寫入數(shù)據"this is a short message" ;這條數(shù)據會被“管道輸入流”接收到。下面看看這是如何實現(xiàn)的。
先看write(char char的源碼。PipedWriter.java繼承于Writer.java;Writer.java中write(char c[])的源碼如下:
1
2
3
|
public void write( char cbuf[]) throws IOException { write(cbuf, 0 , cbuf.length); } |
實際上write(char c[])是調用的PipedWriter.java中的write(char c[], int off, int len)函數(shù)。查看write(char c[], int off, int len)的源碼,我們發(fā)現(xiàn):它會調用 sink.receive(cbuf, off, len); 進一步查看receive(char c[], int off, int len)的定義,我們知道sink.receive(cbuf, off, len)的作用就是:將“管道輸出流”中的數(shù)據保存到“管道輸入流”的緩沖中。而“管道輸入流”的緩沖區(qū)buffer的默認大小是1024個字符。
至此,我們知道:t1.start()啟動Sender線程,而Sender線程會將數(shù)據"this is a short message"寫入到“管道輸出流”;而“管道輸出流”又會將該數(shù)據傳輸給“管道輸入流”,即而保存在“管道輸入流”的緩沖中。
接下來,我們看看“用戶如何從‘管道輸入流'的緩沖中讀取數(shù)據”。這實際上就是Receiver線程的動作。
t2.start() 會啟動Receiver線程,從而執(zhí)行Receiver.java的run()函數(shù)。查看Receiver.java的源碼,我們知道run()調用了readMessageOnce()。
而readMessageOnce()就是調用in.read(buf)從“管道輸入流in”中讀取數(shù)據,并保存到buf中。
通過上面的分析,我們已經知道“管道輸入流in”的緩沖中的數(shù)據是"this is a short message";因此,buf的數(shù)據就是"this is a short message"。
為了加深對管道的理解。我們接著進行下面兩個小試驗。
試驗一:修改Sender.java
將
1
2
3
4
|
public void run(){ writeShortMessage(); //writeLongMessage(); } |
修改為
1
2
3
4
|
public void run(){ //writeShortMessage(); writeLongMessage(); } |
運行程序。運行結果如下:
從中,我們看出,程序運行出錯!拋出異常 java.io.IOException: Pipe closed
為什么會這樣呢?
我分析一下程序流程。
(01) 在PipeTest中,通過in.connect(out)將輸入和輸出管道連接起來;然后,啟動兩個線程。t1.start()啟動了線程Sender,t2.start()啟動了線程Receiver。
(02) Sender線程啟動后,通過writeLongMessage()寫入數(shù)據到“輸出管道”,out.write(str.toCharArray())共寫入了1046個字符。而根據PipedWriter的源碼,PipedWriter的write()函數(shù)會調用PipedReader的receive()函數(shù)。而觀察PipedReader的receive()函數(shù),我們知道,PipedReader會將接受的數(shù)據存儲緩沖區(qū)。仔細觀察receive()函數(shù),有如下代碼:
1
2
3
4
5
6
7
8
9
10
11
12
|
while (in == out) { if ((readSide != null ) && !readSide.isAlive()) { throw new IOException( "Pipe broken" ); } /* full: kick any waiting readers */ notifyAll(); try { wait( 1000 ); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } |
而in和out的初始值分別是in=-1, out=0;結合上面的while(in==out)。我們知道,它的含義就是,每往管道中寫入一個字符,就達到了in==out這個條件。然后,就調用notifyAll(),喚醒“讀取管道的線程”。
也就是,每往管道中寫入一個字符,都會阻塞式的等待其它線程讀取。
然而,PipedReader的緩沖區(qū)的默認大小是1024!但是,此時要寫入的數(shù)據卻有1046!所以,一次性最多只能寫入1024個字符。
(03) Receiver線程啟動后,會調用readMessageOnce()讀取管道輸入流。讀取1024個字符會,會調用close()關閉,管道。
由(02)和(03)的分析可知,Sender要往管道寫入1046個字符。其中,前1024個字符(緩沖區(qū)容量是1024)能正常寫入,并且每寫入一個就讀取一個。當寫入1025個字符時,依然是依次的調用PipedWriter.java中的write();然后,write()中調用PipedReader.java中的receive();在PipedReader.java中,最終又會調用到receive(int c)函數(shù)。 而此時,管道輸入流已經被關閉,也就是closedByReader為true,所以拋出throw new IOException("Pipe closed")。
我們對“試驗一”繼續(xù)進行修改,解決該問題。
試驗二: 在“試驗一”的基礎上繼續(xù)修改Receiver.java
將
1
2
3
4
|
public void run(){ readMessageOnce() ; //readMessageContinued() ; } |
修改為
1
2
3
4
|
public void run(){ //readMessageOnce() ; readMessageContinued() ; } |
此時,程序能正常運行。運行結果為:
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
012345678901234567890123456789abcd
efghijklmnopqrstuvwxyz
以上所述是小編給大家介紹的PipedWriter和PipedReader源碼分析,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對服務器之家網站的支持!