监控SparkStreaming程序脚本

虽然 Spark on YARN 非常稳定，一般情况下不会出问题，但我们的 SparkStreaming 程序需要一直运行，持续产出实时报表。
因此必须对 SparkStreaming 程序进行监控，在程序退出后能够及时重启。
基于此需求，我想到了通过调用 YARN 的 REST 接口来获取提交到 YARN 上的任务，并据此判断被监控的程序是否仍在运行。

思路

调用 YARN 提供的 REST 接口，获取所有正在运行（RUNNING）的任务；脚本中还会用同样的方式查询处于 ACCEPTED 状态的任务。

1
curl --compressed -H "Accept: application/json" -X GET "http://master:8088/ws/v1/cluster/apps?states=RUNNING"

如果对其他接口感兴趣，可以查阅 Hadoop 官网的 ResourceManager REST API 文档。

脚本

脚本也很简单,简单看看就明白了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# -*- coding: utf-8 -*-
'''
Created by hushiwei on 2018/1/5.
监控SparkStreaming程序
一旦挂了,执行重启,同时发送邮件和微信报警
'''
import os
import subprocess
import json
import logging
import time
import urllib2
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
# WeChat recipient id(s); sent as the "touser" field of every WeChat alert.
wechats = "HuShiwei"
# Addresses that receive the alert e-mail.
sendEmails = ['hsw_v5@163.com', 'xxxx@gm825.com']
# Shell (curl) commands that query the YARN REST API for the applications
# currently in the RUNNING and ACCEPTED states; they are executed via run_it().
urlRun = 'curl --compressed -H "Accept: application/json" -X GET "http://u007:8089/ws/v1/cluster/apps?states=RUNNING"'
urlAcc = 'curl --compressed -H "Accept: application/json" -X GET "http://u007:8089/ws/v1/cluster/apps?states=ACCEPTED"'
# Programs to watch: the key is compared with the application "name" reported
# by YARN, the value is the shell script executed to start that program again.
monitorPrograms = {
"com.xxxx.streaming.ADXStreaming": "/home/hadoop/statistics/ad/adxstreaming/start_adx_streaming_yarn.sh",
"com.xxxx.online.streaming.DSPStreaming": "/home/hadoop/statistics/ad/dsp_ad_puton/dsp_ad_puton_streaming/start_dsp_streaming_yarn_test.sh",
"com.xxxx.streaming.CPDAppStreaming": "/home/hadoop/statistics/ad/dsp_app_promotion/start_dsp_app_promotion_yarn.sh"
}
class WeChat(object):
    '''
    Helper for sending text messages through the qyapi.weixin.qq.com HTTP API.

    The access token is cached in a local file so that it is requested from
    the API only when the cache is missing, too short, or rejected by the
    server.
    '''

    # errcode values after which setMessage() drops the cached token and
    # retries with a fresh one.
    _TOKEN_ERRCODES = (42001, 40014)

    def __init__(self, corpid, corpsecret, tokenpath):
        '''
        :param corpid: corp id sent to the gettoken endpoint
        :param corpsecret: corp secret sent to the gettoken endpoint
        :param tokenpath: path of the file used to cache the access token
        '''
        self.corpid = corpid
        self.corpsecret = corpsecret
        self.tokenpath = tokenpath
        self.logger = logging.getLogger('wechat')

    def saveToken(self):
        '''
        Return an access token, preferring the one cached on disk.

        :return: the cached token when the cache file is readable and holds
                 at least 10 characters, otherwise a token freshly fetched by
                 getToken() (which also rewrites the cache file)
        '''
        # Only the file read is guarded: a network error raised by getToken()
        # must not be mistaken for a missing cache file and retried blindly.
        try:
            with open(self.tokenpath, 'r') as f:
                token = f.read().strip()
        except IOError:
            token = ''
        if len(token) >= 10:
            return token
        token = self.getToken()
        self.logger.info(
            "Can not get token from %s,prepare to get token on api which token is %s" % (self.tokenpath, token))
        return token

    def getToken(self):
        '''
        Request a new access token from the API and store it in the cache file.

        :return: the new access token
        :raises KeyError: if the API reply contains no 'access_token' field
        '''
        Url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s' % (self.corpid, self.corpsecret)
        result = urllib2.urlopen(urllib2.Request(Url))
        try:
            json_access_token = json.loads(result.read())
        finally:
            result.close()
        access_token = json_access_token['access_token']
        with open(self.tokenpath, 'w') as f:
            f.write(access_token)
        return access_token

    def setMessage(self, wechatids, text, retries=1):
        '''
        Send a text message.

        :param wechatids: recipient id(s), sent as the "touser" field
        :param text: message text; a "Call Time" line is appended to it
        :param retries: how many more times to retry with a fresh token when
                        the API answers with one of _TOKEN_ERRCODES
        :return: None
        '''
        token = self.saveToken()
        message = self.makeMessage(text)
        submiturl = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={0}'.format(token)
        data = {"touser": wechatids, "msgtype": "text", "agentid": "1000002", "text": {"content": message}, "safe": "0"}
        payload = json.dumps(data, ensure_ascii=False)
        # The request body has to be a byte string; json.dumps may hand back text.
        if not isinstance(payload, bytes):
            payload = payload.encode('utf-8')
        send_request = urllib2.Request(submiturl, payload)
        self.logger.info("Send wechat %s" % text)
        reply = urllib2.urlopen(send_request)
        try:
            response = json.loads(reply.read())
        finally:
            reply.close()
        errcode = response.get('errcode')
        if errcode not in self._TOKEN_ERRCODES:
            return
        self.logger.info("Send wechat errorcode : %s" % errcode)
        try:
            os.remove(self.tokenpath)
        except OSError:
            # The cache file is already gone; nothing left to invalidate.
            pass
        # Bounded retry: a token problem that persists must not recurse forever.
        if retries > 0:
            self.setMessage(wechatids, text, retries - 1)

    def makeMessage(self, text):
        '''
        Append the current local time to the message text.

        :param text: message text
        :return: text followed by a "Call Time:" line
        '''
        stamp = time.strftime('%m-%d %H:%M:%S', time.localtime())
        return "%s \nCall Time:%s" % (text, stamp)
class Message(object):
    '''
    Build the content of an e-mail: a plain-text body plus optional attachments.
    '''

    def format_str(self, strs):
        '''
        Return strs as a text (unicode) string.

        Byte strings are decoded as UTF-8 so that non-ASCII values do not
        raise; any other object is converted with the text type constructor.

        :param strs: value to convert
        :return: text string
        '''
        text_type = type(u'')
        if isinstance(strs, text_type):
            return strs
        if isinstance(strs, bytes):
            return strs.decode('utf-8')
        return text_type(strs)

    def __init__(self, from_user, to_user, subject, content, with_attach=False):
        '''
        :param from_user: sender shown in the "From" header
        :param to_user: recipient shown in the "To" header
        :param subject: mail subject
        :param content: mail body (plain text)
        :param with_attach: whether the mail may carry attachments
        '''
        if with_attach:
            self._message = MIMEMultipart()
            self._message.attach(MIMEText(content, 'plain', 'utf-8'))
        else:
            self._message = MIMEText(content, 'plain', 'utf-8')
        self._message['Subject'] = Header(subject, 'utf-8')
        self._message['From'] = Header(self.format_str(from_user), 'utf-8')
        self._message['To'] = Header(self.format_str(to_user), 'utf-8')
        self._with_attach = with_attach

    def attach(self, file_path):
        '''
        Attach a file to the mail.

        Prints a message and exits with status 1 when the Message was not
        created with with_attach=True or when the file does not exist.

        :param file_path: path of the file to attach
        '''
        # Local import: the module-level imports do not include this class.
        from email.mime.application import MIMEApplication
        if not self._with_attach:
            print("Please init the Message with attr 'with_attach = True'")
            raise SystemExit(1)
        if not os.path.isfile(file_path):
            print("The file doesn`t exist!")
            raise SystemExit(1)
        # Read through a context manager so the handle is always closed.
        with open(file_path, 'rb') as source:
            payload = source.read()
        # MIMEApplication yields one Content-Type header and a base64 body.
        atta = MIMEApplication(payload, 'octet-stream')
        atta['Content-Disposition'] = 'attachment; filename="%s"' % Header(
            os.path.basename(file_path), 'utf-8').encode()
        self._message.attach(atta)

    def getMessage(self):
        '''
        :return: the whole mail serialized as a string
        '''
        return self._message.as_string()
class SMTPClient(object):
    '''
    Helper for sending mail through an SMTP server over SSL.
    '''

    def __init__(self, hostname, port, user, passwd):
        '''
        Store the connection parameters.

        :param hostname: SMTP host, e.g. smtp.qq.com for QQ mail
        :param port: SSL port of the SMTP server, e.g. 465 for QQ mail
        :param user: mail account; also used as the envelope sender
        :param passwd: password / authorization key of the account
        '''
        self._HOST = hostname
        self._PORT = port
        self._USER = user
        self._PASS = passwd

    def send(self, receivers, msg):
        '''
        Send a mail.

        Prints a message and exits with status 1 when msg is not a Message.

        :param receivers: list of recipient addresses
        :param msg: Message instance holding the mail content
        :return: (1, text) on success, (0, error text) on failure
        '''
        if not isinstance(msg, Message):
            print("Error Message Instance!")
            raise SystemExit(1)
        smtpObj = None
        try:
            # SMTP_SSL(host, port) already connects. A second connect(host)
            # would reconnect on the default port and ignore self._PORT.
            smtpObj = smtplib.SMTP_SSL(self._HOST, self._PORT)
            smtpObj.login(self._USER, self._PASS)
            smtpObj.sendmail(self._USER, receivers, msg.getMessage())
            return (1, "邮件发送成功")
        except (smtplib.SMTPException, IOError) as e:
            # IOError also covers socket-level failures (refused, timeout).
            return (0, "Error: 无法发送邮件%s" % e)
        finally:
            if smtpObj is not None:
                try:
                    smtpObj.quit()
                except (smtplib.SMTPException, IOError):
                    # Best effort: the session may already be broken.
                    smtpObj.close()
def run_it(cmd):
    '''
    Run a shell command and return what it wrote to standard output.

    A non-zero exit status is reported on standard output together with the
    captured standard error; the captured output is returned either way.

    :param cmd: command line, executed through the shell
    :return: captured standard output of the command
    '''
    proc = subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    stdout_data, stderr_data = proc.communicate()
    exit_code = proc.returncode
    if exit_code != 0:
        print ("Non zero exit code:%s executing: %s \nerr course ---> %s" % (exit_code, cmd, stderr_data))
    return stdout_data
def reStartSparkScript(scriptPath):
    '''
    Execute a Spark start script from within its own directory.

    1. change into the directory that contains the script (skipped when the
       path has no directory part)
    2. run the script with sh
    3. change back to the previous working directory

    :param scriptPath: path of the start script
    :return: None
    '''
    logger = logging.getLogger("Main")
    scriptDir, script = os.path.split(scriptPath)
    previousDir = os.getcwd()
    try:
        # os.chdir('') raises, so a bare file name runs in the current directory.
        if scriptDir:
            os.chdir(scriptDir)
        run_it("sh %s" % script)
        logger.info("exec [ %s ] on [ %s ] " % (script, scriptDir or previousDir))
    finally:
        # Do not leave the process in the script's directory: later relative
        # paths would otherwise resolve differently.
        os.chdir(previousDir)
def collectMonitorStatus(yarnRestApi):
    '''
    Query YARN and pick out the state of the programs we monitor.

    The command output is parsed as JSON, so output that is not valid JSON
    makes json.loads raise.

    :param yarnRestApi: shell command that queries the YARN running or accepted interface
    :return: list of (name, state) tuples for every reported application
             whose name is a key of monitorPrograms; an empty list when the
             reply's 'apps' value is None
    '''
    payload = json.loads(run_it(yarnRestApi))
    appsSection = payload['apps']
    if appsSection is None:
        return []
    watched = []
    for entry in appsSection['app']:
        if entry['name'] in monitorPrograms:
            watched.append((entry['name'], entry['state']))
    return watched
def checkMonitorApps():
    '''
    Check that every monitored Spark program is known to YARN.

    Queries the YARN running and accepted interfaces. For every key of
    monitorPrograms found in neither result, an alert is sent by e-mail and
    WeChat and the program's start script is executed. A failure in one of
    those steps is logged and does not stop the remaining steps or programs.

    :return: None
    '''
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    logger = logging.getLogger("Main")
    # NOTE(review): mail and WeChat credentials are hard-coded here; consider
    # loading them from a config file or the environment.
    smpt_client = SMTPClient('smtp.qq.com', 465, '694244330@qq.com', 'xxxxxx')
    wechat_client = WeChat('xxxxxxxxxxxxxx', 'xxxxxxxxxxxxxxxxxxxxxxxxx', '/tmp/token.txt')
    runningStatus = collectMonitorStatus(urlRun)
    acceptStatus = collectMonitorStatus(urlAcc)
    runningAcceptApps = dict(runningStatus + acceptStatus)
    logger.info("SparkStreaming ON Yarn Running And Accept ===>%s " % str(runningAcceptApps))
    for monitor in monitorPrograms:
        if monitor in runningAcceptApps:
            continue
        logger.info("[ %s ] is not running or accept,prepare to restart!" % monitor)
        # Alerts are best effort: a broken mail or WeChat channel must never
        # prevent the restart below.
        try:
            msg = Message("694244330@qq.com", "hushwiei", monitor, '%s is failed, prepare to resart! -- %s' % (
                monitor, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())))
            sent, detail = smpt_client.send(sendEmails, msg)
            if not sent:
                logger.warning(detail)
        except Exception:
            logger.exception("send email alert for [ %s ] failed" % monitor)
        try:
            wechat_client.setMessage(wechats, "%s is not running or accept,prepare to restart!" % monitor)
        except Exception:
            logger.exception("send wechat alert for [ %s ] failed" % monitor)
        # A failed restart of one program must not skip the others.
        try:
            reStartSparkScript(monitorPrograms[monitor])
        except Exception:
            logger.exception("restart of [ %s ] failed" % monitor)
def main():
    '''
    Script entry point: run one monitoring pass.
    '''
    checkMonitorApps()


if __name__ == '__main__':
    main()
Donate comment here