一、摘要
本篇闡述基于tcp通信協議的異步實現。
二、實驗平臺
visual studio 2010
三、異步通信實現原理及常用方法
3.1 建立連接
在同步模式中,在服務器上使用accept方法接入連接請求,而在客戶端則使用connect方法來連接服務器。相對地,在異步模式下,服務器可以使用beginaccept方法和endaccept方法來完成連接到客戶端的任務,在客戶端則通過beginconnect方法和endconnect方法來實現與服務器的連接。
beginaccept在異步方式下傳入的連接嘗試,它允許其他動作而不必等待連接建立才繼續執行后面程序。在調用beginaccept之前,必須使用listen方法來偵聽是否有連接請求,beginaccept的函數原型為:
1
|
beginaccept(asynccallback asynccallback, ojbect state) |
參數:
asynccallback:代表回調函數
state:表示狀態信息,必須保證state中包含socket的句柄
使用beginaccept的基本流程是:
(1)創建本地終節點,并新建套接字與本地終節點進行綁定;
(2)在端口上偵聽是否有新的連接請求;
(3)請求開始接入新的連接,傳入socket的實例或者stateojbect的實例。
參考代碼:
1
2
3
4
5
6
7
8
|
//定義ip地址 ipaddress local = ipaddress.parse( "127.0,0,1" ); ipendpoint iep = new ipendpoint(local,13000); //創建服務器的socket對象 socket server = new socket(addressfamily.internetwork,sockettype.stream,protocoltype.tcp); server.bind(iep); server.listen(20); server.beginaccecpt( new asynccallback(accept),server); |
當beginaccept()方法調用結束后,一旦新的連接發生,將調用回調函數,而該回調函數必須包括用來結束接入連接操作的endaccept()方法。
該方法參數列表為 socket endaccept(iasyncresult iar)
下面為回調函數的實例:
1
2
3
4
5
6
7
|
void accept(iasyncresult iar) { //還原傳入的原始套接字 socket myserver = (socket)iar.asyncstate; //在原始套接字上調用endaccept方法,返回新的套接字 socket service = myserver.endaccept(iar); } |
至此,服務器端已經準備好了。客戶端應通過beginconnect方法和endconnect來遠程連接主機。在調用beginconnect方法時必須注冊相應的回調函數并且至少傳遞一個socket的實例給state參數,以保證endconnect方法中能使用原始的套接字。下面是一段是beginconnect的調用:
1
2
3
4
|
socket socket= new socket(addressfamily.internetwork,sockettype.stream,protocoltype.tcp) ipaddress ip=ipaddress.parse( "127.0.0.1" ); ipendpoint iep= new ipendpoint(ip,13000); socket.beginconnect(iep, new asynccallback(connect),socket); |
endconnect是一種阻塞方法,用于完成beginconnect方法的異步連接誒遠程主機的請求。在注冊了回調函數后必須接收beginconnect方法返回的iasynccreuslt作為參數。下面為代碼演示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
void connect(iasyncresult iar) { socket client=(socket)iar.asyncstate; try { client.endconnect(iar); } catch (exception e) { console.writeline(e.tostring()); } finally { } } |
除了采用上述方法建立連接之后,也可以采用tcplistener類里面的方法進行連接建立。下面是服務器端對關于tcplistener類使用beginaccetptcpclient方法處理一個傳入的連接嘗試。以下是使用beginaccetptcpclient方法和endaccetptcpclient方法的代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public static void dobeginaccept(tcplistener listner) { //開始從客戶端監聽連接 console.writeline( "waitting for a connection" ); //接收連接 //開始準備接入新的連接,一旦有新連接嘗試則調用回調函數doaccepttcpcliet listner.beginaccepttcpclient( new asynccallback(doaccepttcpcliet), listner); } //處理客戶端的連接 public static void doaccepttcpcliet(iasyncresult iar) { //還原原始的tcplistner對象 tcplistener listener = (tcplistener)iar.asyncstate; //完成連接的動作,并返回新的tcpclient tcpclient client = listener.endaccepttcpclient(iar); console.writeline( "連接成功" ); } |
代碼的處理邏輯為:
(1)調用beginaccetptcpclient方法開開始連接新的連接,當連接視圖發生時,回調函數被調用以完成連接操作;
(2)上面doaccepttcpcliet方法通過asyncstate屬性獲得由beginaccepttcpclient傳入的listner實例;
(3)在得到listener對象后,用它調用endaccepttcpclient方法,該方法返回新的包含客戶端信息的tcpclient。
beginconnect方法和endconnect方法可用于客戶端嘗試建立與服務端的連接,這里和第一種方法并無區別。下面看實例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public void dobeginconnect(iasyncresult iar) { socket client=(socket)iar.asyncstate; //開始與遠程主機進行連接 client.beginconnect(serverip[0],13000,requestcallback,client); console.writeline( "開始與服務器進行連接" ); } private void requestcallback(iasyncresult iar) { try { //還原原始的tcpclient對象 tcpclient client=(tcpclient)iar.asyncstate; // client.endconnect(iar); console.writeline( "與服務器{0}連接成功" ,client.client.remoteendpoint); } catch (exception e) { console.writeline(e.tostring()); } finally { } } |
以上是建立連接的兩種方法。可根據需要選擇使用。
3.2 發送與接受數據
在建立了套接字的連接后,就可以服務器端和客戶端之間進行數據通信了。異步套接字用beginsend和endsend方法來負責數據的發送。注意在調用beginsend方法前要確保雙方都已經建立連接,否則會出異常。下面演示代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
private static void send(socket handler, string data) { // convert the string data to byte data using ascii encoding. byte [] bytedata = encoding.ascii.getbytes(data); // begin sending the data to the remote device. handler.beginsend(bytedata, 0, bytedata.length, 0, new asynccallback(sendcallback), handler); } private static void sendcallback(iasyncresult ar) { try { // retrieve the socket from the state object. socket handler = (socket)ar.asyncstate; // complete sending the data to the remote device. int bytessent = handler.endsend(ar); console.writeline( "sent {0} bytes to client." , bytessent); handler.shutdown(socketshutdown.both); handler.close(); } catch (exception e) { console.writeline(e.tostring()); } } |
接收數據是通過beginreceive和endreceive方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
private static void receive(socket client) { try { // create the state object. stateobject state = new stateobject(); state.worksocket = client; // begin receiving the data from the remote device. client.beginreceive(state.buffer, 0, stateobject.buffersize, 0, new asynccallback(receivecallback), state); } catch (exception e) { console.writeline(e.tostring()); } } private static void receivecallback(iasyncresult ar) { try { // retrieve the state object and the client socket // from the asynchronous state object. stateobject state = (stateobject)ar.asyncstate; socket client = state.worksocket; // read data from the remote device. int bytesread = client.endreceive(ar); if (bytesread > 0) { // there might be more data, so store the data received so far. state.sb.append(encoding.ascii.getstring(state.buffer, 0, bytesread)); // get the rest of the data. client.beginreceive(state.buffer, 0, stateobject.buffersize, 0, new asynccallback(receivecallback), state); } else { // all the data has arrived; put it in response. if (state.sb.length > 1) { response = state.sb.tostring(); } // signal that all bytes have been received. receivedone. set (); } } catch (exception e) { console.writeline(e.tostring()); } } |
上述代碼的處理邏輯為:
(1)首先處理連接的回調函數里得到的通訊套接字client,接著開始接收數據;
(2)當數據發送到緩沖區中,beginreceive方法試圖從buffer數組中讀取長度為buffer.length的數據塊,并返回接收到的數據量bytesread。最后接收并打印數據。
除了上述方法外,還可以使用基于networkstream相關的異步發送和接收方法,下面是基于networkstream相關的異步發送和接收方法的使用介紹。
networkstream使用beginread和endread方法進行讀操作,使用beginwreite和endwrete方法進行寫操作,下面看實例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
static void datahandle(tcpclient client) { tcpclient tcpclient = client; //使用tcpclient的getstream方法獲取網絡流 networkstream ns = tcpclient.getstream(); //檢查網絡流是否可讀 if (ns.canread) { //定義緩沖區 byte [] read = new byte [1024]; ns.beginread(read,0,read.length, new asynccallback(myreadcallback),ns); } else { console.writeline( "無法從網絡中讀取流數據" ); } } public static void myreadcallback(iasyncresult iar) { networkstream ns = (networkstream)iar.asyncstate; byte [] read = new byte [1024]; string data = "" ; int recv; recv = ns.endread(iar); data = string .concat(data, encoding.ascii.getstring(read, 0, recv)); //接收到的消息長度可能大于緩沖區總大小,反復循環直到讀完為止 while (ns.dataavailable) { ns.beginread(read, 0, read.length, new asynccallback(myreadcallback), ns); } //打印 console.writeline( "您收到的信息是" + data); } |
3.3 程序阻塞與異步中的同步問題
.net里提供了eventwaithandle類來表示一個線程的同步事件。eventwaithandle即事件等待句柄,他允許線程通過操作系統互發信號和等待彼此的信號來達到線程同步的目的。這個類有2個子類,分別為autoresteevnt(自動重置)和manualrestevent(手動重置)。下面是線程同步的幾個方法:
(1)rset方法:將事件狀態設為非終止狀態,導致線程阻塞。這里的線程阻塞是指允許其他需要等待的線程進行阻塞即讓含waitone()方法的線程阻塞;
(2)set方法:將事件狀態設為終止狀態,允許一個或多個等待線程繼續。該方法發送一個信號給操作系統,讓處于等待的某個線程從阻塞狀態轉換為繼續運行,即waitone方法的線程不在阻塞;
(3)waitone方法:阻塞當前線程,直到當前的等待句柄收到信號。此方法將一直使本線程處于阻塞狀態直到收到信號為止,即當其他非阻塞進程調用set方法時可以繼續執行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
public static void startlistening() { // data buffer for incoming data. byte [] bytes = new byte [1024]; // establish the local endpoint for the socket. // the dns name of the computer // running the listener is "host.contoso.com". //iphostentry iphostinfo = dns.resolve(dns.gethostname()); //ipaddress ipaddress = iphostinfo.addresslist[0]; ipaddress ipaddress = ipaddress.parse( "127.0.0.1" ); ipendpoint localendpoint = new ipendpoint(ipaddress, 11000); // create a tcp/ip socket. socket listener = new socket(addressfamily.internetwork,sockettype.stream, protocoltype.tcp); // bind the socket to the local //endpoint and listen for incoming connections. try { listener.bind(localendpoint); listener.listen(100); while ( true ) { // set the event to nonsignaled state. alldone.reset(); // start an asynchronous socket to listen for connections. console.writeline( "waiting for a connection..." ); listener.beginaccept( new asynccallback(acceptcallback),listener); // wait until a connection is made before continuing. alldone.waitone(); } } catch (exception e) { console.writeline(e.tostring()); } console.writeline( "\npress enter to continue..." ); console.read(); } |
上述代碼的邏輯為:
(1)試用了manualrestevent對象創建一個等待句柄,在調用beginaccept方法前使用rest方法允許其他線程阻塞;
(2)為了防止在連接完成之前對套接字進行讀寫操作,務必要在beginaccept方法后調用waitone來讓線程進入阻塞狀態。
當有連接接入后系統會自動調用會調用回調函數,所以當代碼執行到回調函數時說明連接已經成功,并在函數的第一句就調用set方法讓處于等待的線程可以繼續執行。
四、實例
下面是一個實例,客戶端請求連接,服務器端偵聽端口,當連接建立之后,服務器發送字符串給客戶端,客戶端收到后并回發給服務器端。
服務器端代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
using system; using system.net; using system.net.sockets; using system.text; using system.threading; // state object for reading client data asynchronously public class stateobject { // client socket. public socket worksocket = null ; // size of receive buffer. public const int buffersize = 1024; // receive buffer. public byte [] buffer = new byte [buffersize]; // received data string. public stringbuilder sb = new stringbuilder(); } public class asynchronoussocketlistener { // thread signal. public static manualresetevent alldone = new manualresetevent( false ); public asynchronoussocketlistener() { } public static void startlistening() { // data buffer for incoming data. byte [] bytes = new byte [1024]; // establish the local endpoint for the socket. // the dns name of the computer // running the listener is "host.contoso.com". //iphostentry iphostinfo = dns.resolve(dns.gethostname()); //ipaddress ipaddress = iphostinfo.addresslist[0]; ipaddress ipaddress = ipaddress.parse( "127.0.0.1" ); ipendpoint localendpoint = new ipendpoint(ipaddress, 11000); // create a tcp/ip socket. socket listener = new socket(addressfamily.internetwork,sockettype.stream, protocoltype.tcp); // bind the socket to the local //endpoint and listen for incoming connections. try { listener.bind(localendpoint); listener.listen(100); while ( true ) { // set the event to nonsignaled state. alldone.reset(); // start an asynchronous socket to listen for connections. console.writeline( "waiting for a connection..." ); listener.beginaccept( new asynccallback(acceptcallback),listener); // wait until a connection is made before continuing. alldone.waitone(); } } catch (exception e) { console.writeline(e.tostring()); } console.writeline( "\npress enter to continue..." ); console.read(); } public static void acceptcallback(iasyncresult ar) { // signal the main thread to continue. alldone. set (); // get the socket that handles the client request. socket listener = (socket)ar.asyncstate; socket handler = listener.endaccept(ar); // create the state object. stateobject state = new stateobject(); state.worksocket = handler; handler.beginreceive(state.buffer, 0, stateobject.buffersize, 0, new asynccallback(readcallback), state); } public static void readcallback(iasyncresult ar) { string content = string .empty; // retrieve the state object and the handler socket // from the asynchronous state object. stateobject state = (stateobject)ar.asyncstate; socket handler = state.worksocket; // read data from the client socket. int bytesread = handler.endreceive(ar); if (bytesread > 0) { // there might be more data, so store the data received so far. state.sb.append(encoding.ascii.getstring(state.buffer, 0, bytesread)); // check for end-of-file tag. if it is not there, read // more data. content = state.sb.tostring(); if (content.indexof( "<eof>" ) > -1) { // all the data has been read from the // client. display it on the console. console.writeline( "read {0} bytes from socket. \n data : {1}" , content.length, content); // echo the data back to the client. send(handler, content); } else { // not all data received. get more. handler.beginreceive(state.buffer, 0, stateobject.buffersize, 0, new asynccallback(readcallback), state); } } } private static void send(socket handler, string data) { // convert the string data to byte data using ascii encoding. byte [] bytedata = encoding.ascii.getbytes(data); // begin sending the data to the remote device. handler.beginsend(bytedata, 0, bytedata.length, 0, new asynccallback(sendcallback), handler); } private static void sendcallback(iasyncresult ar) { try { // retrieve the socket from the state object. socket handler = (socket)ar.asyncstate; // complete sending the data to the remote device. int bytessent = handler.endsend(ar); console.writeline( "sent {0} bytes to client." , bytessent); handler.shutdown(socketshutdown.both); handler.close(); } catch (exception e) { console.writeline(e.tostring()); } } public static int main( string [] args) { startlistening(); return 0; } } |
客戶端代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
using system; using system.net; using system.net.sockets; using system.threading; using system.text; // state object for receiving data from remote device. public class stateobject { // client socket. public socket worksocket = null ; // size of receive buffer. public const int buffersize = 256; // receive buffer. public byte [] buffer = new byte [buffersize]; // received data string. public stringbuilder sb = new stringbuilder(); } public class asynchronousclient { // the port number for the remote device. private const int port = 11000; // manualresetevent instances signal completion. private static manualresetevent connectdone = new manualresetevent( false ); private static manualresetevent senddone = new manualresetevent( false ); private static manualresetevent receivedone = new manualresetevent( false ); // the response from the remote device. private static string response = string .empty; private static void startclient() { // connect to a remote device. try { // establish the remote endpoint for the socket. // the name of the // remote device is "host.contoso.com". //iphostentry iphostinfo = dns.resolve("user"); //ipaddress ipaddress = iphostinfo.addresslist[0]; ipaddress ipaddress = ipaddress.parse( "127.0.0.1" ); ipendpoint remoteep = new ipendpoint(ipaddress, port); // create a tcp/ip socket. socket client = new socket(addressfamily.internetwork, sockettype.stream, protocoltype.tcp); // connect to the remote endpoint. client.beginconnect(remoteep, new asynccallback(connectcallback), client); connectdone.waitone(); // send test data to the remote device. send(client, "this is a test<eof>" ); senddone.waitone(); // receive the response from the remote device. receive(client); receivedone.waitone(); // write the response to the console. console.writeline( "response received : {0}" , response); // release the socket. client.shutdown(socketshutdown.both); client.close(); console.readline(); } catch (exception e) { console.writeline(e.tostring()); } } private static void connectcallback(iasyncresult ar) { try { // retrieve the socket from the state object. socket client = (socket)ar.asyncstate; // complete the connection. client.endconnect(ar); console.writeline( "socket connected to {0}" , client.remoteendpoint.tostring()); // signal that the connection has been made. connectdone. set (); } catch (exception e) { console.writeline(e.tostring()); } } private static void receive(socket client) { try { // create the state object. stateobject state = new stateobject(); state.worksocket = client; // begin receiving the data from the remote device. client.beginreceive(state.buffer, 0, stateobject.buffersize, 0, new asynccallback(receivecallback), state); } catch (exception e) { console.writeline(e.tostring()); } } private static void receivecallback(iasyncresult ar) { try { // retrieve the state object and the client socket // from the asynchronous state object. stateobject state = (stateobject)ar.asyncstate; socket client = state.worksocket; // read data from the remote device. int bytesread = client.endreceive(ar); if (bytesread > 0) { // there might be more data, so store the data received so far. state.sb.append(encoding.ascii.getstring(state.buffer, 0, bytesread)); // get the rest of the data. client.beginreceive(state.buffer, 0, stateobject.buffersize, 0, new asynccallback(receivecallback), state); } else { // all the data has arrived; put it in response. if (state.sb.length > 1) { response = state.sb.tostring(); } // signal that all bytes have been received. receivedone. set (); } } catch (exception e) { console.writeline(e.tostring()); } } private static void send(socket client, string data) { // convert the string data to byte data using ascii encoding. byte [] bytedata = encoding.ascii.getbytes(data); // begin sending the data to the remote device. client.beginsend(bytedata, 0, bytedata.length, 0, new asynccallback(sendcallback), client); } private static void sendcallback(iasyncresult ar) { try { // retrieve the socket from the state object. socket client = (socket)ar.asyncstate; // complete sending the data to the remote device. int bytessent = client.endsend(ar); console.writeline( "sent {0} bytes to server." , bytessent); // signal that all bytes have been sent. senddone. set (); } catch (exception e) { console.writeline(e.tostring()); } } public static int main( string [] args) { startclient(); return 0; } } |
五、實驗結果
圖1 服務器端界面
圖2 客戶端界面