Step 1: Enter the OpenResty directory
[root@node03 openresty]# cd /export/servers/openresty/
[root@node03 openresty]# ll
total 356
drwxr-xr-x  2 root root   4096 Jul 26 11:33 bin
drwxrwxr-x 44 1000 1000   4096 Jul 26 11:31 build
drwxrwxr-x 43 1000 1000   4096 Nov 13  2017 bundle
-rwxrwxr-x  1 1000 1000  45908 Nov 13  2017 configure
-rw-rw-r--  1 1000 1000  22924 Nov 13  2017 COPYRIGHT
drwxr-xr-x  6 root root   4096 Jul 26 11:33 luajit
drwxr-xr-x  6 root root   4096 Aug  1 08:14 lualib
-rw-r--r--  1 root root   5413 Jul 26 11:32 Makefile
drwxr-xr-x 11 root root   4096 Jul 26 11:35 nginx
drwxrwxr-x  2 1000 1000   4096 Nov 13  2017 patches
drwxr-xr-x 44 root root   4096 Jul 26 11:33 pod
-rw-rw-r--  1 1000 1000   3689 Nov 13  2017 README.markdown
-rw-rw-r--  1 1000 1000   8690 Nov 13  2017 README-win32.txt
-rw-r--r--  1 root root 218352 Jul 26 11:33 resty.index
drwxr-xr-x  5 root root   4096 Jul 26 11:33 site
drwxr-xr-x  2 root root   4096 Aug  1 10:54 testlua
drwxrwxr-x  2 1000 1000   4096 Nov 13  2017 util
[root@node03 openresty]#
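Note: two directories matter for what follows, lualib and nginx.
  1. lualib: holds the Lua libraries bundled with OpenResty.
  2. nginx: the nginx service directory.
Next, let's go into lualib and take a look: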
[root@node03 openresty]# cd lualib/
[root@node03 lualib]# ll
total 116
-rwxr-xr-x 1 root root 101809 Jul 26 11:33 cjson.so
drwxr-xr-x 3 root root   4096 Jul 26 11:33 ngx
drwxr-xr-x 2 root root   4096 Jul 26 11:33 rds
drwxr-xr-x 2 root root   4096 Jul 26 11:33 redis
drwxr-xr-x 9 root root   4096 Aug  1 10:34 resty
The redis and ngx packages are already here, so Lua code can use the nginx API and Redis directly without installing any extra dependency.
Now let's see what resty contains:
[root@node03 lualib]# cd resty/
[root@node03 resty]# ll
total 152
-rw-r--r-- 1 root root  6409 Jul 26 11:33 aes.lua
drwxr-xr-x 2 root root  4096 Jul 26 11:33 core
-rw-r--r-- 1 root root   596 Jul 26 11:33 core.lua
drwxr-xr-x 2 root root  4096 Jul 26 11:33 dns
drwxr-xr-x 2 root root  4096 Aug  1 10:42 kafka    # imported manually
drwxr-xr-x 2 root root  4096 Jul 26 11:33 limit
-rw-r--r-- 1 root root  4616 Jul 26 11:33 lock.lua
drwxr-xr-x 2 root root  4096 Jul 26 11:33 lrucache
-rw-r--r-- 1 root root  4620 Jul 26 11:33 lrucache.lua
-rw-r--r-- 1 root root  1211 Jul 26 11:33 md5.lua
-rw-r--r-- 1 root root 14544 Jul 26 11:33 memcached.lua
-rw-r--r-- 1 root root 21577 Jul 26 11:33 mysql.lua
-rw-r--r-- 1 root root   616 Jul 26 11:33 random.lua
-rw-r--r-- 1 root root  9227 Jul 26 11:33 redis.lua
-rw-r--r-- 1 root root  1192 Jul 26 11:33 sha1.lua
-rw-r--r-- 1 root root  1045 Jul 26 11:33 sha224.lua
-rw-r--r-- 1 root root  1221 Jul 26 11:33 sha256.lua
-rw-r--r-- 1 root root  1045 Jul 26 11:33 sha384.lua
-rw-r--r-- 1 root root  1359 Jul 26 11:33 sha512.lua
-rw-r--r-- 1 root root   236 Jul 26 11:33 sha.lua
-rw-r--r-- 1 root root   698 Jul 26 11:33 string.lua
-rw-r--r-- 1 root root  5178 Jul 26 11:33 upload.lua
drwxr-xr-x 2 root root  4096 Jul 26 11:33 upstream
drwxr-xr-x 2 root root   406 Jul 26 11:33 websocket
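Here we find the familiar mysql.lua and redis.lua; the rest can be ignored for now.
Note: the kafka package is not part of a stock installation, which means OpenResty does not bundle a Kafka client. I imported the Kafka package beforehand.
Let's see which files the kafka package contains: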
[root@node03 resty]# cd kafka
[root@node03 kafka]# ll
total 48
-rw-r--r-- 1 root root  1369 Aug  1 10:42 broker.lua
-rw-r--r-- 1 root root  5537 Aug  1 10:42 client.lua
-rw-r--r-- 1 root root   710 Aug  1 10:42 errors.lua
-rw-r--r-- 1 root root 10718 Aug  1 10:42 producer.lua
-rw-r--r-- 1 root root  4072 Aug  1 10:42 request.lua
-rw-r--r-- 1 root root  2118 Aug  1 10:42 response.lua
-rw-r--r-- 1 root root  1494 Aug  1 10:42 ringbuffer.lua
-rw-r--r-- 1 root root  4845 Aug  1 10:42 sendbuffer.lua
Attached is the Kafka integration package: kafka.rar
Step 2: Create the Kafka test Lua file
1. Go back to the openresty directory
[root@node03 kafka]# cd /export/servers/openresty/
2. Create the test directory
[root@node03 openresty]# mkdir -p testlua
You can choose any directory name and location, but remember the path: nginx.conf refers to it later.
3. Enter the directory you just created and create the kafkalua.lua script file
Create the file with vim kafkalua.lua or touch kafkalua.lua
[root@node03 openresty]# cd testlua/
[root@node03 testlua]# ll
total 8
-rw-r--r-- 1 root root 3288 Aug  1 10:54 kafkalua.lua
kafkalua.lua:
-- Test output; optional, can be removed
ngx.say('hello kafka file configuration successful!!!!!!')

-- Collection threshold: when the number of active connections exceeds it, skip collection
local DEFAULT_THRESHOLD = 100000
-- Number of Kafka partitions; must match the partition count of the topic
-- (the topic in step 6 is created with 3 partitions)
local PARTITION_NUM = 3
-- Kafka topic name
local TOPIC = 'B2CDATA_COLLECTION1'
-- Key of the round-robin counter in the shared dictionary
local POLLING_KEY = "POLLING_KEY"

-- Partitioner: the key passed to send() is already the target partition number
local function partitioner(key, num, correlation_id)
    return tonumber(key)
end

-- Kafka broker list (the IPs must match the host.name setting of each broker)
local BROKER_LIST = {
    {host="192.168.52.100", port=9092},
    {host="192.168.52.110", port=9092},
    {host="192.168.52.120", port=9092}
}

-- Kafka producer parameters
local CONNECT_PARAMS = {
    producer_type = "async",
    socket_timeout = 30000,
    flush_time = 10000,
    request_timeout = 20000,
    partitioner = partitioner
}

-- Shared-memory counter used for round-robin over the partitions
local shared_data = ngx.shared.shared_data
local pollingVal = shared_data:get(POLLING_KEY)
if not pollingVal then
    pollingVal = 1
    shared_data:set(POLLING_KEY, pollingVal)
end

-- Take the per-message counter modulo PARTITION_NUM to balance the partitions
local partitions = '' .. (tonumber(pollingVal) % PARTITION_NUM)
shared_data:incr(POLLING_KEY, 1)

-- Concurrency control
local isGone = true
-- Overload protection: read ngx.var.connections_active and stop collecting
-- when the number of active connections exceeds the threshold
if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESHOLD) then
    isGone = false
end

-- Data collection
if isGone then
    -- Missing variables are nil; fall back to an empty string
    local time_local = ngx.var.time_local or ""
    local request = ngx.var.request or ""
    local request_method = ngx.var.request_method or ""
    local content_type = ngx.var.content_type or ""
    ngx.req.read_body()
    local request_body = ngx.var.request_body or ""
    local http_referer = ngx.var.http_referer or ""
    local remote_addr = ngx.var.remote_addr or ""
    local http_user_agent = ngx.var.http_user_agent or ""
    local time_iso8601 = ngx.var.time_iso8601 or ""
    local server_addr = ngx.var.server_addr or ""
    local http_cookie = ngx.var.http_cookie or ""

    -- Assemble the message; fields are separated by #CS#
    local message = time_local .. "#CS#" .. request .. "#CS#" .. request_method
        .. "#CS#" .. content_type .. "#CS#" .. request_body .. "#CS#" .. http_referer
        .. "#CS#" .. remote_addr .. "#CS#" .. http_user_agent .. "#CS#" .. time_iso8601
        .. "#CS#" .. server_addr .. "#CS#" .. http_cookie

    -- Load the Kafka producer module
    local producer = require "resty.kafka.producer"
    -- Create the producer
    local bp = producer:new(BROKER_LIST, CONNECT_PARAMS)
    -- Send the message
    local ok, err = bp:send(TOPIC, partitions, message)
    -- Log the error if sending failed
    if not ok then
        ngx.log(ngx.ERR, "kafka send err:", err)
        return
    end
end
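The round-robin partition choice and the overload guard in the script can be tried outside nginx. The following Java sketch is illustrative only: the class RoundRobinSketch and its methods are hypothetical names, an AtomicLong stands in for the shared-dictionary counter, and three partitions are assumed to match the topic created in step 6.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the counter logic in kafkalua.lua. */
final class RoundRobinSketch {
    // Plays the role of shared_data[POLLING_KEY]; starts at 1 like the script.
    private final AtomicLong counter = new AtomicLong(1);
    private final int partitionCount;

    RoundRobinSketch(int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        this.partitionCount = partitionCount;
    }

    /** Returns the partition for the next message and advances the counter. */
    int nextPartition() {
        long current = counter.getAndIncrement();
        return (int) (current % partitionCount);
    }

    /** Overload guard: collect only while active connections do not exceed the threshold. */
    static boolean shouldCollect(long activeConnections, long threshold) {
        return activeConnections <= threshold;
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch(3);
        int[] perPartition = new int[3];
        for (int i = 0; i < 9; i++) {
            perPartition[rr.nextPartition()]++;
        }
        System.out.println(Arrays.toString(perPartition));    // [3, 3, 3]
        System.out.println(shouldCollect(120_000, 100_000));  // false
    }
}
```

Because every message advances the counter by one, consecutive messages land on consecutive partitions, which is what keeps the per-partition counts close to each other.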
Step 3: Modify the nginx configuration file nginx.conf
1. Enter the nginx/conf directory
[root@node03 openresty]# cd /export/servers/openresty/nginx/conf/
[root@node03 conf]# ll
total 76
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf.default
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params.default
-rw-r--r-- 1 root root 2837 Jul 26 11:33 koi-utf
-rw-r--r-- 1 root root 2223 Jul 26 11:33 koi-win
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types.default
-rw-r--r-- 1 root root 3191 Aug  1 10:52 nginx.conf
-rw-r--r-- 1 root root 2656 Jul 26 11:33 nginx.conf.default
-rw-r--r-- 1 root root  636 Jul 26 11:33 scgi_params
-rw-r--r-- 1 root root  636 Jul 26 11:33 scgi_params.default
-rw-r--r-- 1 root root  664 Jul 26 11:33 uwsgi_params
-rw-r--r-- 1 root root  664 Jul 26 11:33 uwsgi_params.default
-rw-r--r-- 1 root root 3610 Jul 26 11:33 win-utf
2. Edit nginx.conf
[root@node03 conf]# vim nginx.conf
# 1. Find the first server block
# 2. Add the two directives below just above that server block
# 3. Add the Kafka-related location inside the server block

#------------------ added code ------------------
# Declare a 10 MB shared dictionary, visible to every nginx worker process
lua_shared_dict shared_data 10m;
# Configure the local DNS resolver
resolver 127.0.0.1;
#------------------ added code ------------------
server {
    listen       80;
    server_name  localhost;

    #charset koi8-r;

    #access_log  logs/host.access.log  main;

    location / {
        root   html;
        index  index.html index.htm;
    }

    #------------------ added code ------------------
    location /kafkalua {    # kafkalua is the project name (URL path); it may be left empty
        # Enable nginx status monitoring
        stub_status on;
        # Load the Lua file
        default_type text/html;
        # Path of the Kafka Lua script, i.e. the kafkalua.lua created above (the path you were asked to remember)
        content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
    }
    #------------------ added code ------------------
}
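Note: in location /kafkalua {...}, kafkalua is the project name. You can choose any name or leave it out, but you must remember it.
The configuration above contains two locations, location / {...} and location /kafkalua {...}. What is the difference between them? Read on and it will become clear.
Step 4: Start nginx
1. Enter nginx/sbin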
[root@node03 sbin]# cd /export/servers/openresty/nginx/sbin/
[root@node03 sbin]# ll
total 16356
-rwxr-xr-x 1 root root 16745834 Jul 26 11:33 nginx
2. Test whether the configuration file is valid
[root@node03 sbin]# nginx -t
nginx: the configuration file /export/servers/openresty/nginx/conf/nginx.conf syntax is ok
nginx: configuration file /export/servers/openresty/nginx/conf/nginx.conf test is successful
# the test passed
3. Start nginx
[root@node03 sbin]# nginx
# no output usually means nginx started successfully
4. Check whether nginx started successfully
[root@node03 sbin]# ps -ef | grep nginx
root      3730     1  0 09:24 ?        00:00:00 nginx: master process nginx
nobody    3731  3730  0 09:24 ?        00:00:20 nginx: worker process is shutting down
nobody    5766  3730  0 12:17 ?        00:00:00 nginx: worker process
root      5824  3708  0 12:24 pts/1    00:00:00 grep nginx
# two nginx processes (master and worker) are running, so startup succeeded
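5. Access nginx from a browser
In the browser, enter: node03/kafkalua
Note: if hosts is not configured, use the address of the machine running OpenResty instead, e.g. 192.168.52.120/kafkalua
In the browser, enter: node03/ or 192.168.52.120/
Then try node03:80/kafkalua and node03:80/ as well.
In node03:80/kafkalua, node03 is the server's host alias (you can also write the server's IP address directly) and 80 is the port set by "listen 80;". Port 80 may be omitted. If the directive were "listen 8088;", you would have to enter node03:8088/kafkalua (8088 cannot be omitted). kafkalua is the project name.
Here is the relevant part of nginx.conf again: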
server {
    listen       80;
    server_name  localhost;

    #charset koi8-r;

    #access_log  logs/host.access.log  main;

    location / {
        root   html;
        index  index.html index.htm;
    }

    #------------------ added code ------------------
    location /kafkalua {    # kafkalua is the project name (URL path); it may be left empty
        # Enable nginx status monitoring
        stub_status on;
        # Load the Lua file
        default_type text/html;
        # Path of the Kafka Lua script, i.e. the kafkalua.lua created above (the path you were asked to remember)
        content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
    }
}
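The difference between the two locations comes down to prefix matching: among plain prefix locations, nginx selects the longest prefix that the request path starts with. The Java sketch below imitates only that rule; regex locations and the = and ^~ modifiers are deliberately left out, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of longest-prefix location selection. */
final class LocationMatchSketch {

    /** Returns the longest configured prefix that the path starts with, or null if none matches. */
    static String match(List<String> prefixes, String path) {
        String best = null;
        for (String prefix : prefixes) {
            if (path.startsWith(prefix) && (best == null || prefix.length() > best.length())) {
                best = prefix;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // The two locations from the server block above
        List<String> locations = Arrays.asList("/", "/kafkalua");

        System.out.println(match(locations, "/"));            // /
        System.out.println(match(locations, "/index.html"));  // /
        System.out.println(match(locations, "/kafkalua"));    // /kafkalua
        System.out.println(match(locations, "/kafkalua/B2C40/dist/main/css/flight.css")); // /kafkalua
    }
}
```

This is why node03/ serves the static page from html, while every path under node03/kafkalua runs kafkalua.lua and sends a record to Kafka.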
Step 5: Create a test crawler program
1. Create a Maven project and add the dependencies
<dependencies>
    <dependency>
        <groupId>org.jsoup</groupId>
        <artifactId>jsoup</artifactId>
        <version>1.11.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.4</version>
    </dependency>
</dependencies>
2. Mock crawler program
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Locale;

import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;

public class SpiderGoAirCN {

    private static String basePath = "http://node03/kafkalua";

    // Values shared by all six requests
    private static final String USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36";
    private static final String REFERER_BASE = "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html";
    private static final String REFERER_FIXED = REFERER_BASE + "?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0";
    private static final String QUERY_JSON = "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}";
    // The cookie is COOKIE_HEAD + date + COOKIE_MIDDLE + date + ")"
    private static final String COOKIE_HEAD = "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D";
    private static final String COOKIE_MIDDLE = "%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(";

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 50000; i++) {
            spiderQueryao(); // query request
            spiderHtml();    // html request
            spiderJs();      // js request
            spiderCss();     // css request
            spiderPng();     // png request
            spiderJpg();     // jpg request
            Thread.sleep(100);
        }
    }

    // Target pattern: ^.*/B2C40/query/jaxb/direct/query.ao.*$
    public static void spiderQueryao() throws Exception {
        post("/B2C40/query/jaxb/direct/query.ao",
                REFERER_BASE + "?t=S&c1=CAN&c2=WUH&d1=" + getGoTime() + "&at=1&ct=0&it=0",
                "192.168.56.80", "243.45.78.132", getGoTime());
    }

    // Target pattern: ^.*html.*$
    public static void spiderHtml() throws Exception {
        post("/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=CTU&d1=2018-01-17&at=1&ct=0&it=0",
                REFERER_FIXED, "192.168.56.1", "192.168.56.80", "2018-01-17");
    }

    public static void spiderJs() throws Exception {
        post("/B2C40/dist/main/modules/common/requireConfig.js",
                REFERER_FIXED, "192.168.56.1", "192.168.56.80", "2018-01-17");
    }

    public static void spiderCss() throws Exception {
        post("/B2C40/dist/main/css/flight.css",
                REFERER_BASE, "192.168.56.1", "192.168.56.80", "2018-01-17");
    }

    public static void spiderPng() throws Exception {
        post("/B2C40/dist/main/images/common.png",
                REFERER_FIXED, "192.168.56.1", "192.168.56.80", "2018-01-17");
    }

    public static void spiderJpg() throws Exception {
        post("/B2C40/dist/main/images/loadingimg.jpg",
                REFERER_FIXED, "192.168.56.1", "192.168.56.80", "2018-01-17");
    }

    /** Sends one POST to basePath + path with the mock headers and form body. */
    private static void post(String path, String referer, String remoteAddress,
                             String serverAddress, String cookieDate) throws Exception {
        // 1. Build the request for the target URL
        HttpPost httpPost = new HttpPost(basePath + path);
        // 2. Set the request headers
        httpPost.setHeader("Time-Local", getLocalDateTime());
        httpPost.setHeader("Requst", "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
        httpPost.setHeader("Request Method", "POST");
        httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8");
        httpPost.setHeader("Referer", referer);
        httpPost.setHeader("Remote Address", remoteAddress);
        httpPost.setHeader("User-Agent", USER_AGENT);
        httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
        httpPost.setHeader("Server Address", serverAddress);
        httpPost.setHeader("Cookie", COOKIE_HEAD + cookieDate + COOKIE_MIDDLE + cookieDate + ")");
        // 3. Set the form parameters
        List<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
        parameters.add(new BasicNameValuePair("json", QUERY_JSON));
        httpPost.setEntity(new UrlEncodedFormEntity(parameters));
        // 4. Send the request; try-with-resources closes the client and the response
        try (CloseableHttpClient httpClient = HttpClients.createDefault();
             CloseableHttpResponse response = httpClient.execute(httpPost)) {
            // 5. Print whether a response came back
            System.out.println(response != null);
        }
    }

    public static String getLocalDateTime() {
        DateFormat df = new SimpleDateFormat("dd/MMM/yyyy'T'HH:mm:ss +08:00", Locale.ENGLISH);
        return df.format(new Date());
    }

    public static String getISO8601Timestamp() {
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss+08:00");
        return df.format(new Date());
    }

    public static String getGoTime() {
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
        return df.format(new Date());
    }

    public static String getBackTime() {
        Calendar calendar = new GregorianCalendar();
        calendar.setTime(new Date());
        // Move the date forward by one day; use a negative number to move it back
        calendar.add(Calendar.DATE, 1);
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
        return formatter.format(calendar.getTime());
    }
}
Step 6: Start Kafka
1. Create the topic
[root@node01 bin]# kafka-topics.sh --zookeeper node01:2181 --partitions 3 --replication-factor 3 --create --topic B2CDATA_COLLECTION1
2. Start a Kafka console consumer
[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic B2CDATA_COLLECTION1
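Step 7: Run the crawler and observe the result
1. Start the crawler program
2. Watch the consumer window, which shows the collected messages: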
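Each record shown by the consumer is a single string whose fields are joined with #CS#, in the order assembled by kafkalua.lua. A downstream program could split it back into named fields roughly as follows. This is a sketch under assumptions: the class name is hypothetical, the sample record in main is made up for illustration, and a field that itself contains #CS# would break the split.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical parser for the #CS#-delimited records produced by kafkalua.lua. */
final class CollectedMessageSketch {
    static final String DELIMITER = "#CS#";

    // Field order as concatenated in kafkalua.lua
    static final String[] FIELDS = {
        "time_local", "request", "request_method", "content_type", "request_body",
        "http_referer", "remote_addr", "http_user_agent", "time_iso8601",
        "server_addr", "http_cookie"
    };

    static Map<String, String> parse(String message) {
        // Limit -1 keeps trailing empty fields; the script writes "" for missing values
        String[] parts = message.split(Pattern.quote(DELIMITER), -1);
        if (parts.length != FIELDS.length) {
            throw new IllegalArgumentException(
                "expected " + FIELDS.length + " fields but got " + parts.length);
        }
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            record.put(FIELDS[i], parts[i]);
        }
        return record;
    }

    public static void main(String[] args) {
        // Made-up sample record, not real consumer output
        String sample = String.join(DELIMITER,
            "01/Aug/2019:12:30:00 +0800",
            "POST /kafkalua/B2C40/query/jaxb/direct/query.ao HTTP/1.1",
            "POST",
            "application/x-www-form-urlencoded; charset=UTF-8",
            "json=%7B%7D",
            "",
            "192.168.52.1",
            "Apache-HttpClient/4.5.4",
            "2019-08-01T12:30:00+08:00",
            "192.168.52.120",
            "");
        Map<String, String> record = parse(sample);
        System.out.println(record.get("request_method")); // POST
        System.out.println(record.get("remote_addr"));    // 192.168.52.1
    }
}
```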
Step 8: Start kafka-manager and observe
1. Start kafka-manager
[root@node01 conf]# cd /export/servers/kafka-manager-1.3.3.23/bin/
[root@node01 bin]# ll
total 36
-rwxr-xr-x 1 root root 13747 May  1 06:27 kafka-manager
-rw-r--r-- 1 root root  9975 May  1 06:27 kafka-manager.bat
-rwxr-xr-x 1 root root  1383 May  1 06:27 log-config
-rw-r--r-- 1 root root   105 May  1 06:27 log-config.bat
[root@node01 bin]#
# start
[root@node01 bin]# ./kafka-manager
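The window after startup:
2. Access it from a browser
In the browser, enter: node01:9000
Using kafka-manager is not covered here. Observe how messages are distributed across the partitions of the B2CDATA_COLLECTION1 topic:
  - There are three partitions; if each partition holds roughly the same number of messages, the setup works.
  - If the counts differ noticeably, no partitioning strategy was configured in kafkalua.lua. The default partitioning can cause data skew, so you need to configure your own partitioning strategy.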
Original link: https://www.cnblogs.com/-xiaoyu-/p/11294905.html