用Python对MySQL同步状态进行监控使用Python对MySQL数据库服务器是否可访问,及主从同步是否中断进行监控,是一件非常简单的事情。感谢Python给我们带来了如此简单,强大,快捷的开发环境。 本文使用到的Python模块 使用telnetlib校验服务器是否可被访问 使用SMTP向管理员发送通知邮件 使用MySQL官方的驱动对数据库进行访问 使用optparse实现命令行参数的提取 实现原理 使用optparse模块获取命令行参数。读取defaults-file设置文件内容(如果存在),使用参数覆盖defaults-file的值(如果传递参数,如:–host, –user, –to之类)。 直接去连MySQL等待是否能进行访问的返回结果太慢了,所以使用telnet对服务器的连通性进行验证。可以设置等待时间,可控性高一些。 当服务器工作正常,使用MySQL与服务器进行连接,获取主从同步的状态。 将获取服务器的异常状态信息(服务器无法访问,主从同步的状态中断),使用SMTP发送给管理员,并把造成中断同步的异常信息一同发送到管理员的邮箱中。
slavecheckpoint.py coding=utf-8"""数据库同步状态侦测MySQL数据库同步复制状态监测脚本。可配合Linux下的crond进行定时监测。如果同步状态异常,侧使用邮件通知管理员,并将造成同步中断的错误信息也包含到邮件当中,管理员可即时通过错误信息直接定位异常。实例:python slavecheckpoint.py --defaults-file=/etc/slave.cnf --to=xxxx@abc.com===FILE:slave.cnf===========[config]smtp_host=smtp.163.comfrom=消息中心host=localhost"""import mysql.connectorfrom mysql.connector import errorcodeimport telnetlibimport smtplibfrom email.mime.text import MIMETextimport optparsefrom ConfigParser import ConfigParserimport os,time,sysclass SlaveStatu: __instance__ = None __error__ = [] def __init__(self,*args,**kwargs): self.__config__ = { "host":"localhsot", "user":"root", "password":"", "port":3306, "smtp_host":"localhost", "smtp_user":"", "smtp_password":"", "from":"admin@localhost", "to":"" } #优先读取设置文件中的值 if not kwargs["defaults_file"] is None: defaults_file = self.__read_defaults_file__( kwargs["defaults_file"] ) del kwargs["defaults_file"] #使用参数的设置去覆盖设置文件的值 for key,val in kwargs.items(): if not val is None and len(val) > 0: self.__config__[key] = val def __configParseMySQL__(self): """ 提取数据库的设置 :return: dict """ return { "host" : self.__config__["host"], "port" : self.__config__["port"], "user" : self.__config__["user"], "password" : self.__config__["password"] } def __configParseSMTP__(self): """ 提取SMTP邮件设置 :return: dict """ return { "smtp_host": self.__config__["smtp_host"], "smtp_user": self.__config__["smtp_user"], "smtp_password": self.__config__["smtp_password"], "from": self.__config__["from"], "to": self.__config__["to"] } def __read_defaults_file__( self, filePath ): """ 加载设置文件设置的值 :param filePath: 设置文件路径 :return: """ section = "config" if os.path.exists( filePath ): cnf = ConfigParser() cnf.read( filePath ) options = cnf.options( section ) for key in options: self.__config__[key] = cnf.get( section, key ) def telnet( self, host, port, timeout=5 ): """ 测试服务器地址和端口是否畅通 :param host: 服务器地址 :param port: 服务器端口 :param timeout: 测试超时时间 :return: Boolean """ try: tel = telnetlib.Telnet( host, port, timeout ) tel.close() return True except: return False def connect(self): """ 创建数据库链接 """ try: config = self.__configParseMySQL__() if self.telnet( config["host"],config["port"]): self.__instance__ = mysql.connector.connect( **config ) return True else: raise Exception("unable connect") except: self.__error__.append( "无法连接服务器主机: {host}:{port}".format( host=config[ "host"], port=config["port"]) ) return False def isSlave(self): """ 数据库同步是否正常 :return: None同步未开启,False同步中断,True同步正常 """ cur = self.__instance__.cursor(dictionary=True) cur.execute("SHOW SLAVE STATUS") result = cur.fetchone() cur.close() if result: if result["Slave_SQL_Running"] == "Yes" and result["Slave_IO_Running"] == "Yes": return True else: if result["Slave_SQL_Running"] == "No": self.__error__.append( result["Last_SQL_Error"] ) else: self.__error__.append( result["Last_IO_Error"] ) return False def get_last_error(self): """ 获取第一个错误信息 :return: String """ if self.__error__: return self.__error__.pop(0) def notify(self,title,message): """ 发送消息提醒 :param title: 消息的标题 :param message: 消息的内容 :return: """ msg = [title,message] pool = [] notify = notify_email( self.__configParseSMTP__() ) pool.append( notify ) for item in pool: item.ring( msg ) def close(self): """ 关闭数据库链接 """ if self.__instance__: self.__instance__.close()class notify_email(object): def __init__(self,config): self.config = config def ring(self, message=[]): subject = message.pop(0) messageBody = "".join( message ) mailList = self.config["to"].split(";") datetime = time.strftime("%Y-%m-%d %H:%M:%S") for to in mailList: body = """ 管理员{admin},你好: 收到这封邮件说明你的数据库同步出现异常,请您及时进行处理。 异常信息: {body} {date} """.format( admin=to, body=messageBody, date=datetime ) msg = MIMEText( body, "html", "utf-8" ) msg["From"] = self.config["from"] msg["To"] = to msg["Subject"] = subject smtp = smtplib.SMTP() smtp.connect( self.config["smtp_host"] ) if self.config.has_key("smtp_user"): smtp.login( self.config["smtp_user"], self.config["smtp_password"] ) smtp.sendmail( self.config["from"], to, msg.as_string() ) smtp.quit()if __name__ == "__main__": #命令行参数列表 usage = """usage: MySQLStat [options]""" opt = optparse.OptionParser(usage=usage) opt.add_option("-H","--host",dest="host",help="MySQL host (default: localhost)") opt.add_option("-u","--user",dest="user",help="MySQL user") opt.add_option("-p","--password",dest="password",help="MySQL password") opt.add_option("-P","--port",dest="port",help="MySQL port (default: 3306)") opt.add_option("","--smtp_host",dest="smtp_host",help="SMTP host (default: localhost)") opt.add_option("","--smtp_user",dest="smtp_user",help="SMTP user") opt.add_option("","--smtp_password",dest="smtp_password",help="SMTP password") opt.add_option("","--from",dest="from",help="Email from") opt.add_option("","--to",dest="to",help="Email to") opt.add_option("","--defaults-file",dest="defaults_file",help="config file path") (options,args) = opt.parse_args() options = options.__dict__ Statu = SlaveStatu( **options ) subject = "服务中心异常信息提醒" if Statu.connect() is False or Statu.isSlave() is False: Statu.notify( subject, Statu.get_last_error() ) Statu.close()
server1.cnf 设置文件内容 [config]smtp_host=smtp.aliyun.comsmtp_user=xxxx@aliyun.comsmtp_password=xxxxxxfrom=管理中心host=xxx.xxx.xxx.xxxuser=rootpassword=123456
完成了以上的配置之后,我们在定时任务里添加一条任务,就可以让程序为我们监控MySQL的服务器状态了。 crontab设置 */2 * * * * python slavecheckpoint.py --defaults-file=server1.cnf --to=dba@abc.com
github项目地址: https://github.com/yagas/checkpoint.git |