While collecting Spark job logs with log4j's kafka-appender, we found that jobs submitted to YARN stayed in the ACCEPTED state, never reached RUNNING, and timed out after retrying twice. We first suspected that YARN was short of resources, but the problem persisted even after confirming that YARN had plenty to spare, and it reproduced almost every time.
Initially, Spark logging to Kafka was configured like this:
```properties
log4j.rootCategory=INFO, console, kafka
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m%n

# Kafka appender
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Set Kafka topic and brokerList
log4j.appender.kafka.topic=yarn_spark_log
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=10
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m
```
Here, org.apache.kafka.log4jappender.KafkaLog4jAppender is attached to log4j.rootCategory, so by default every log goes to Kafka. This appender is maintained by the Kafka project itself, so its stability should be reliable.
Locating the problem
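After spotting the problem, I tried removing the rule that sends logs to Kafka, and the problem went away. That narrowed it down to logging to Kafka. Other tests confirmed that the target Kafka cluster was actually healthy, which made this very strange.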
The YARN ResourceManager log showed the following timeout:
```text
2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.util.AbstractLivelinessMonitor: Expired:appattempt_1578970174552_3204_000002 Timed out after 600 secs
2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: Updating application attempt appattempt_1578970174552_3204_000002 with final state: FAILED, and exit status: -1000
2020-05-07 21:49:48,231 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1578970174552_3204_000002 State change from LAUNCHED to FINAL_SAVING on event = EXPIRE
```
This shows that YARN did accept the job, but the launched attempt never got going and expired after 600 seconds. In Spark's case this means that only the driver started and no executors were ever launched.
The driver log, meanwhile, stopped at a certain point and never went further. Comparing a successful run with a stuck one showed that the log stalls here:
```text
2020/05/07 19:37:10.324 INFO SecurityManager: Changing view acls to: yarn,root
2020/05/07 19:37:10.344 INFO Metadata: Cluster ID: 6iG6WHA2SoK7FfgGgWHt_A
```
In the stuck case, only the SecurityManager line is printed; the Metadata line never appears.
My guess was that the Metadata line came from kafka-client itself, since in this context only YARN, Spark and kafka-client could plausibly print it.
In kafka-client 2.2.0, this log line is emitted here:
```java
public synchronized void update(MetadataResponse metadataResponse, long now) {
    ...
    String newClusterId = cache.cluster().clusterResource().clusterId();
    if (!Objects.equals(previousClusterId, newClusterId)) {
        log.info("Cluster ID: {}", newClusterId);
    }
    ...
}
```
Seeing synchronized, I strongly suspected a deadlock and decided to analyze it with jstack.
When a Spark job runs on YARN, the driver process is called ApplicationMaster and the executor processes are called CoarseGrainedExecutorBackend. While reproducing the problem, I first located the node where the driver ended up running, then quickly ran jstack -F <pid> to dump its stacks.
Sure enough, jstack reported a deadlock. Here is the output, kept fairly complete:
```text
[root@node1 ~]# jstack 20136
20136: Unable to open socket file: target process not responding or HotSpot VM not loaded
The -F option can be used when the target process is not responding
[root@node1 ~]# jstack -F 20136
Attaching to process ID 20136, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.231-b11
Deadlock Detection:

Found one Java-level deadlock:
=============================

"kafka-producer-network-thread | producer-1":
  waiting to lock Monitor@0x00000000025fcc48 (Object@0x00000000ed680b60, a org/apache/kafka/log4jappender/KafkaLog4jAppender),
  which is held by "main"
"main":
  waiting to lock Monitor@0x00007fec9dbde038 (Object@0x00000000ee44de38, a org/apache/kafka/clients/Metadata),
  which is held by "kafka-producer-network-thread | producer-1"

Found a total of 1 deadlock.

Thread 20157: (state = BLOCKED)
 - org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=0, line=231 (Interpreted frame)
 - org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame)
 - org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String, java.lang.Object) @bci=34, line=324 (Interpreted frame)
 - org.apache.kafka.clients.Metadata.update(org.apache.kafka.common.requests.MetadataResponse, long) @bci=317, line=365 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(org.apache.kafka.common.requests.RequestHeader, long, org.apache.kafka.common.requests.MetadataResponse) @bci=184, line=1031 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient.handleCompletedReceives(java.util.List, long) @bci=215, line=822 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient.poll(long, long) @bci=132, line=544 (Interpreted frame)
 - org.apache.kafka.clients.producer.internals.Sender.run(long) @bci=227, line=311 (Interpreted frame)
 - org.apache.kafka.clients.producer.internals.Sender.run() @bci=28, line=235 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)

Thread 20150: (state = BLOCKED)

Thread 20149: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.ref.ReferenceQueue.remove(long) @bci=59, line=144 (Interpreted frame)
 - java.lang.ref.ReferenceQueue.remove() @bci=2, line=165 (Interpreted frame)
 - java.lang.ref.Finalizer$FinalizerThread.run() @bci=36, line=216 (Interpreted frame)

Thread 20148: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=502 (Interpreted frame)
 - java.lang.ref.Reference.tryHandlePending(boolean) @bci=54, line=191 (Interpreted frame)
 - java.lang.ref.Reference$ReferenceHandler.run() @bci=1, line=153 (Interpreted frame)

Thread 20137: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - org.apache.kafka.clients.Metadata.awaitUpdate(int, long) @bci=63, line=261 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(java.lang.String, java.lang.Integer, long) @bci=160, line=983 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.doSend(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=19, line=860 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=12, line=840 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord) @bci=3, line=727 (Interpreted frame)
 - org.apache.kafka.log4jappender.KafkaLog4jAppender.append(org.apache.log4j.spi.LoggingEvent) @bci=69, line=283 (Interpreted frame)
 - org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=106, line=251 (Interpreted frame)
 - org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame)
 - org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String) @bci=12, line=305 (Interpreted frame)
 - org.apache.spark.internal.Logging$class.logInfo(org.apache.spark.internal.Logging, scala.Function0) @bci=29, line=54 (Interpreted frame)
 - org.apache.spark.SecurityManager.logInfo(scala.Function0) @bci=2, line=44 (Interpreted frame)
 - org.apache.spark.SecurityManager.setViewAcls(scala.collection.immutable.Set, java.lang.String) @bci=36, line=139 (Interpreted frame)
 - org.apache.spark.SecurityManager.<init>(org.apache.spark.SparkConf, scala.Option) @bci=158, line=81 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster.<init>(org.apache.spark.deploy.yarn.ApplicationMasterArguments) @bci=85, line=70 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster$.main(java.lang.String[]) @bci=25, line=802 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster.main(java.lang.String[]) @bci=4 (Interpreted frame)
```
At this point the deadlock was confirmed: it stalls the driver right at startup, so naturally the driver never gets as far as launching executors.
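jstack -F inspected the driver from outside the process. The JDK also exposes a Java-level deadlock check inside the JVM through java.lang.management.ThreadMXBean, which can be handy when attaching jstack to a short-lived YARN container is awkward. The sketch below is not part of the original investigation; DeadlockProbe and findDeadlocks are hypothetical names. It returns one jstack-style line per deadlocked thread.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

public class DeadlockProbe {
    /**
     * Returns one jstack-like line per deadlocked thread, or an empty list.
     * findDeadlockedThreads() covers monitors (synchronized) as well as
     * java.util.concurrent ownable synchronizers.
     */
    public static List<String> findDeadlocks() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long[] ids = mx.findDeadlockedThreads(); // null when no deadlock is found
        List<String> report = new ArrayList<>();
        if (ids == null) {
            return report;
        }
        for (ThreadInfo info : mx.getThreadInfo(ids, true, true)) {
            if (info == null) {
                continue; // defensive: no info available for this id
            }
            report.add("\"" + info.getThreadName() + "\": waiting to lock " + info.getLockName()
                    + ", which is held by \"" + info.getLockOwnerName() + "\"");
        }
        return report;
    }

    public static void main(String[] args) {
        List<String> report = findDeadlocks();
        if (report.isEmpty()) {
            System.out.println("No Java-level deadlock found.");
        } else {
            report.forEach(System.out::println);
        }
    }
}
```

If such a probe runs as a watchdog inside the driver, it should write to System.err directly rather than through log4j: in this incident, any thread that logs through the root category would also block on the deadlocked appender.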
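I will analyze the deadlock in detail below; first, how to work around it. Intuitively, it should be enough to stop kafka-client's own logs from also going to Kafka. Experiments confirmed this: when only org.apache.spark logs are sent to Kafka, the job runs normally.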
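The exact configuration used for this workaround is not shown. Assuming log4j 1.x properties syntax, one way to express it is to keep log4j.rootCategory=INFO, console and attach the Kafka appender only to Spark's loggers with log4j.logger.org.apache.spark=INFO, kafka. Why that helps follows from how log4j 1.x dispatches an event: Category.callAppenders (visible in both stacks above) walks from the event's logger up through its ancestors and calls every appender attached along the way. The Java sketch below is a hypothetical model of that walk using plain collections (AppenderRouting and appendersFor are illustrative names, and additivity is assumed to be left at its default of true); it is not log4j code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class AppenderRouting {

    /**
     * Hypothetical model of log4j 1.x Category.callAppenders(): starting at the
     * event's logger, walk up through ancestor loggers to the root ("") and
     * collect every appender attached along the way (additivity assumed true).
     */
    public static List<String> appendersFor(String loggerName, Map<String, List<String>> attached) {
        List<String> result = new ArrayList<>();
        String name = loggerName;
        while (true) {
            result.addAll(attached.getOrDefault(name, List.of()));
            if (name.isEmpty()) {
                return result; // reached the root logger
            }
            int dot = name.lastIndexOf('.');
            name = dot < 0 ? "" : name.substring(0, dot); // parent logger name
        }
    }

    public static void main(String[] args) {
        // Original setup: log4j.rootCategory=INFO, console, kafka
        Map<String, List<String>> before = Map.of("", List.of("console", "kafka"));
        // Assumed workaround: root keeps console; kafka attached to org.apache.spark only
        Map<String, List<String>> after = Map.of(
                "", List.of("console"),
                "org.apache.spark", List.of("kafka"));

        String kafkaLogger = "org.apache.kafka.clients.Metadata";
        String sparkLogger = "org.apache.spark.SecurityManager";
        System.out.println(appendersFor(kafkaLogger, before)); // [console, kafka]
        System.out.println(appendersFor(kafkaLogger, after));  // [console]
        System.out.println(appendersFor(sparkLogger, after));  // [kafka, console]
    }
}
```

With the Kafka appender off the root, events from org.apache.kafka.clients.Metadata never reach it, so the network thread never needs the appender's lock. A log4j Filter on the Kafka appender would not achieve the same thing: as the doAppend source in the next section shows, filters are evaluated inside the synchronized method, after the lock has already been taken.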
Root cause analysis
According to the jstack output, the deadlock involves these two threads:
- the network thread inside kafka-client (kafka-producer-network-thread | producer-1)
- Spark's main entry thread (main)
Both threads are in fact stuck while logging. The stacks show that both threads contend for the same log object, which is actually the kafka-appender; the kafka-appender in turn holds a kafka-client producer and that client's internal Metadata object. For thread safety, log4j's doAppend is also declared synchronized:
```java
public synchronized void doAppend(LoggingEvent event) {
    if (closed) {
        LogLog.error("Attempted to append to closed appender named [" + name + "].");
        return;
    }

    if (!isAsSevereAsThreshold(event.level)) {
        return;
    }

    Filter f = this.headFilter;

    FILTER_LOOP:
    while (f != null) {
        switch (f.decide(event)) {
            case Filter.DENY:
                return;
            case Filter.ACCEPT:
                break FILTER_LOOP;
            case Filter.NEUTRAL:
                f = f.next;
        }
    }

    this.append(event);
}
```
Here is how it unfolds:
- The main thread tries to log and first enters the synchronized doAppend, i.e. it acquires the kafka-appender's lock.
- Inside, the kafka-appender calls kafka-client to send the log to Kafka, which eventually reaches what Thread 20137 shows: Metadata.awaitUpdate, also a synchronized method. The wait() inside it releases the Metadata lock while waiting and must re-acquire it before it can return. (See https://github.com/apache/kaf...)
- At that moment, kafka-producer-network-thread (Thread 20157) happens to be in the stage mentioned above where it logs the Cluster ID (update is also synchronized), which means kafka-producer-network-thread holds the Metadata object's lock.
- To print that log, kafka-producer-network-thread must also go through the synchronized doAppend, i.e. it tries to acquire the kafka-appender's lock, which main already holds.
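In short, the main thread holds the log object's (kafka-appender's) lock and needs the Metadata object's lock, while kafka-producer-network-thread holds the Metadata object's lock and needs the log object's lock. Neither can proceed, hence the deadlock.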
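To make the lock ordering concrete, here is a minimal Java model of the two threads. Metadata and Appender below are hypothetical stand-ins, not the real kafka-client or log4j classes: each exposes synchronized methods that call into the other object, as described in the steps above, and two CountDownLatch objects force the unlucky interleaving so the deadlock happens on every run. The model simplifies awaitUpdate: the real method calls wait(), which releases the Metadata monitor and then blocks when it tries to re-acquire it, whereas here the simulated main thread simply blocks on entering the synchronized method. ThreadMXBean.findMonitorDeadlockedThreads is used to confirm the cycle.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.CountDownLatch;

/**
 * Minimal model of the lock-order inversion. Metadata and Appender are
 * hypothetical stand-ins for kafka-client's Metadata and log4j's appender.
 */
public class LockOrderInversionDemo {

    static final class Metadata {
        Appender appender;

        // Like Metadata.update(): synchronized, and it logs "Cluster ID" while holding the lock.
        synchronized void update(Runnable whileHoldingMetadata) {
            whileHoldingMetadata.run();
            appender.doAppend(() -> { }); // needs the appender's monitor
        }

        // Like Metadata.awaitUpdate(): synchronized on the same Metadata instance.
        synchronized void awaitUpdate() { }
    }

    static final class Appender {
        Metadata metadata;

        // Like AppenderSkeleton.doAppend() -> KafkaLog4jAppender.append() -> KafkaProducer.send().
        synchronized void doAppend(Runnable whileHoldingAppender) {
            whileHoldingAppender.run();
            metadata.awaitUpdate(); // needs the Metadata monitor
        }
    }

    /** Returns true if both modelled threads are reported as monitor-deadlocked within timeoutMs. */
    public static boolean reproduce(long timeoutMs) {
        Metadata metadata = new Metadata();
        Appender appender = new Appender();
        metadata.appender = appender;
        appender.metadata = metadata;

        CountDownLatch appenderHeld = new CountDownLatch(1);
        CountDownLatch metadataHeld = new CountDownLatch(1);

        // Plays "main": logs SecurityManager's message, so it takes the appender lock first.
        Thread mainThread = new Thread(
                () -> appender.doAppend(() -> handOff(appenderHeld, metadataHeld)), "main (simulated)");
        // Plays the network thread: takes the Metadata lock first, then tries to log.
        Thread networkThread = new Thread(
                () -> metadata.update(() -> handOff(metadataHeld, appenderHeld)),
                "kafka-producer-network-thread (simulated)");
        for (Thread t : new Thread[] {mainThread, networkThread}) {
            t.setDaemon(true); // the threads never finish, so do not keep the JVM alive
            t.start();
        }

        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            long[] ids = ManagementFactory.getThreadMXBean().findMonitorDeadlockedThreads();
            if (ids != null && contains(ids, mainThread.getId()) && contains(ids, networkThread.getId())) {
                return true;
            }
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    // Signal that this thread now holds its first lock, then wait until the other thread holds its own.
    private static void handOff(CountDownLatch mine, CountDownLatch theirs) {
        mine.countDown();
        try {
            theirs.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static boolean contains(long[] ids, long id) {
        for (long x : ids) {
            if (x == id) return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(reproduce(5_000) ? "deadlock reproduced" : "no deadlock within timeout");
    }
}
```

Removing either edge of the cycle, for example by not calling appender.doAppend from Metadata.update (the model's counterpart of not routing kafka-client's own logs to the Kafka appender), lets both threads finish.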
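Summary
Routing every log, including kafka-client's own, through KafkaLog4jAppender let the driver's main thread and kafka-producer-network-thread take the appender lock and the Metadata lock in opposite orders. That deadlocked the driver at startup and left the YARN application stuck in ACCEPTED. Sending only org.apache.spark logs to Kafka avoided the problem.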
Original article: https://segmentfault.com/a/1190000022577776