- 论坛徽章:
- 11
|
回复 1# yestreenstars
1.不多
2.不多
3.没有感觉
4.代码
#工作流程说明:
# .主线程
# 控制配置文件刷新和其它线程的启停
# .socket主线程:
# 接收client连接,放到Context.vClients
# .客户端线程
# 解包client数据,产生dEvent,检验后放到 Context.qIndb去入event表
# .入库线程
# 对Context.qIndb中的数据进行分类处理,对告警事件,如果入库成功
# 添加INSTID,然后放到 Context.qArData
# .告警规则线程
# 对Context.qArData进行压缩和延迟
# 待通知的结果放到 Context.qAlarm
# .告警通知
# 对Context.qAlarm按规则进行mail或sms,发送结果放到Context.qIndb去入send表
#, 并将告警发送关系放到Context.qIndb去入cdr表
# .告警重发
# 根据send表,cdr表和时间设置进行重发
#
# 每个线程起来后不需要停止
#
###############################################################################
import sys,os,time
import select,socket,string,cPickle as pickle
import threading,thread,logging,Queue
import signal,StringIO,traceback
###############################################################################
from rules import engine as _engine
from rules.entity import Utils as _utils, InitUtils
import SocketServerPlus as SocketServer,select
###############################################################################
__version__ = 'Server 2.0'
# Context:上下文
#
class Context:
pass # 太长省了....
###############################################################################
#主线程
#检测配置文件变化和停止标志.
#控制其它线程
#
class MainThread:
pass # 太长省了....
###############################################################################
#可控的安全线程(可控,能控并且是必控)
#start,stop
#
class Subroutine(threading.Thread):
pass # 太长省了....
class MainSock(Subroutine):
def __init__(self):
Subroutine.__init__(self)
return
def _init(self):
SocketServer.ThreadingTCPServer.allow_reuse_address = True
if context.sServIP or context.nServPort > 0:
sAddress = (context.sServIP,context.nServPort)
_INFO('%s Start Server ... [%s]',_cOdepOs(self), str(sAddress))
context.server = SocketServer.ThreadingTCPServer(sAddress,RequestHandlerClass)
pass
else:
_ERROR('%s Server=[%s] Paramter Error.',_cOdepOs(self), str(sAddress))
pass
return True
def _refresh(self):
if not context.server:
return
sAddress = (context.sServIP,context.nServPort)
if sAddress == context.server.server_address:
return
self._finish()
self._init()
return
def _run(self):
if not context.server:
return False
r, w, e = select.select([context.server], [], [], 0.5)
if r:
context.server._handle_request_noblock()
pass
return True
def _finish(self):
if context.server:
sAddress = (context.sServIP,context.nServPort)
_INFO('%s Stop Server ... [%s]',_cOdepOs(self), str(sAddress))
context.server.server_close()
del context.server
pass
context.server = None
while len(context.vClients):
time.sleep(1)
_INFO('%s wait client ... [%d]',_cOdepOs(self), len(context.vClients))
continue
return True
#告警规则的调用流程
class AlarmRuleThread2(Subroutine):
pass # 太长省了....
class ForwardThread(Subroutine):
def __init__(self):
Subroutine.__init__(self)
self.engine = _engine.RuleEngine(context,_cOdepOs)
pass
def _init(self):
try:
service = context.plat.GetServiceByServer()
root = service.getElementsByTagName('FORWARD_RULE_SERVICE')[0]
self.engine.Load(root)
self.engine.init()
pass
except:
fp = StringIO.StringIO()
traceback.print_exc(file=fp)
message = fp.getvalue()
_ERROR('%s except=>>>%s<<<', _cOdepOs(self),message)
pass
return True
#
def _refresh(self):
self._finish()
self._init()
return
#
def _run(self):
bEmpty = context.qAlarm.empty()
#_INFO('%s ??????', _cOdepOs(self))
while not context.qAlarm.empty():
item = context.qAlarm.get()
_INFO('%s forward disp alarm %s', _cOdepOs(self),_utils.getkey(item))
#拿去转发
self.engine.rc.vInput.append(item)
setattr(self,'输入',getattr(self,'输入',0)+1)
continue
#根据规则转发
self.engine.run()
#更新发送状态表
for item in self.engine.rc.vOutput:
self.__indbq(item)
setattr(self,'输出',getattr(self,'输出',0)+1)
pass
del self.engine.rc.vOutput[:]
if bEmpty:
time.sleep(1)
pass
return True
def _hb(self):
_INFO('Forward process [%d] alarm, and output [%d] alarm',
getattr(self,'输入',0),getattr(self,'输出',0))
#告警发送状态数据入库
def __indbq(self,dEvent):
#数据正确性,完整性由forward rule保证,这里不做校验
sDataKey = _utils.getkey(dEvent)
#先验证一下
sFuncOld = dEvent['_func']
assert(sFuncOld in ('InsertAlarmCdr','InsertAlarmSend'))
_INFO('%s put to indb quequ for [%s], key=(%s)',
_cOdepOs(self),sFuncOld,sDataKey)
context.qIndb.put(dEvent)
return
#
def _finish(self):
self.engine.end()
return True
class ResendThread(Subroutine):
def __init__(self):
Subroutine.__init__(self)
self.hModule = None
self.hInst = None
pass
def _init(self):
try:
self.hModule = __import__(context.sResend, globals(), locals(), ['*'])
self.hModule = reload(self.hModule)
self.hInst = self.hModule.TReSendMessage()
self.hInst.Init(context.db)
self.hInst.Run()
pass
except:
self.hModule = None
self.hInst = None
fp = StringIO.StringIO()
traceback.print_exc(file=fp)
message = fp.getvalue()
_ERROR('%s except=>>>%s<<<', _cOdepOs(self),message)
pass
return True
#
def _refresh(self):
self._finish()
self._init()
return
#
def _run(self):
time.sleep(1)
return True
#
def _finish(self):
if self.hInst:
del self.hInst
self.hInst = None
return True
#专门入库的线程
class Save2DB(Subroutine):
pass # 太长省了....
#处理传过来的命令,
#解析出包放到 context.qDB 上
#指定要执行的DB操作
#必要的回包
#注:CER/CEA:心跳消息 CAR/CAA:告警消息 CSR/CSA:统计消息..
class PackageHandler:
def __init__(self):
_DEBUG('%s create package handler %d ', _cOdepOs(self), id(self))
pass
def __del__(self):
_DEBUG('%s delete package handler %d.', _cOdepOs(self), id(self))
pass
def __repr__(self):
cc = getattr(self,'context for client',None)#here maybe not set
return '<PackageHandler(%d) for %s>'%(id(self),cc)
def __ReplyPackage(self,nIndex,sNeId,sCmdCode,BodySize):
cc = getattr(self,'context for client')
sData = string.zfill(BodySize,7)
sBuffer = cc.pkg.GetBodyStringByInfo(nIndex,sNeId,sCmdCode[:2]+'A',sData)
_DEBUG('%s Send %s Data Package. INDEX=[%d],SIZE=[%d]', _cOdepOs(self), cc, nIndex, len(sData))
if True:#cc.hSocket:
cc.qSend.put(sBuffer)
sBuffer = cc.sSend + sBuffer
pass
return
def __SavePackage(self,PackageVer,CreateTime,NeId,CmdCode,DataList):
cc = getattr(self,'context for client')
_DEBUG('%s save package[%s] to indb queue for %s, key=(%s)',_cOdepOs(self), CmdCode, cc,''.join(DataList[:3]))
Result = 1
#
_INFO('put !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
try:
#保存告警数据
if CmdCode == 'CAR' and len(DataList) > 6:
#AlarmType,NeId,TaskId,AlarmTime,AlarmCode,ResetFlag,DateList
#Result = DBInst.InsertAlarmEvent(DataList[0],DataList[1],DataList[2],DataList[3],DataList[4],DataList[5],DataList[6:])
context.qIndb.put(['InsertAlarmEvent',(CmdCode,DataList),(DataList[0],DataList[1],DataList[2],DataList[3],DataList[4],DataList[5],DataList[6:])])
pass
#保存统计数据
elif CmdCode == 'CSR' and len(DataList) > 6:
#StatType,NeId,TaskId,KpiTabId,StatTime,RowIndex,KpiValues
#STAT_FUNCTION|78|12|41|2010-04-11 17:39:52|44|5080|Liu.QiQuan|taskhost|0|10448896|"taskhost.exe"
if DataList[0].find('ATTR')>=0: sOpt = 'InsertStatAttrCDR'
else: sOpt = 'InsertStatCDR'
context.qIndb.put([sOpt,(CmdCode,DataList),(DataList[0],DataList[1],DataList[2],DataList[3],DataList[4],DataList[5],DataList[6:])])
pass
#保存心跳数据
elif CmdCode == 'CHR' and len(DataList) >= 3:
#Result = DBInst.UpdateNeHeart(DataList[1],DataList[2],CreateTime)
context.qIndb.put(['UpdateNeHeart',(CmdCode,DataList),(DataList[1],DataList[2],CreateTime)])
#_INFO('%s %s UpdateNeHeart(%s,%s,%s) =>',_cOdepOs(self), cc, DataList[1],DataList[2],CreateTime)
pass
else:
Result = 0
_WARN('%s find %s unkown CmdCode[%s]',_cOdepOs(self), cc, CmdCode)
pass
return Result
except:
fp = StringIO.StringIO()
traceback.print_exc(file=fp)
message = fp.getvalue()
_ERROR('%s Save %s Database Error! ParamText=%s, Message=[%s]', _cOdepOs(self),cc,str(DataList),message)
return 0
def __ProcessOnePackage(self,BodyIndex,PackageVer,CreateTime,NeID,CmdCode,BodyMsg):
# #InstId = -1
# #vData = BodyMsg.split('|')
# #vData[-1] = vData[-1].replace('#13#10',os.linesep)
if CmdCode in ['CAR','CSR','CHR','CER']:
#完成一次包的入库工作
# #nCount = self.__SavePackage(PackageVer,CreateTime,NeID,CmdCode,vData)
bOK,dEvent = _utils.set(BodyIndex,PackageVer,CreateTime,NeID,CmdCode,BodyMsg)
if bOK:
_LDEBUG('%s recv NE[%s] PKG[%s]',_cOdepOs(self),NeID,CmdCode)
context.qIndb.put(dEvent)
pass
else:
_WARN('%s error data : %s',_cOdepOs(self), dEvent)
pass
pass
if CmdCode in ['CAR','CSR','CHR']:
#发送回复包
self.__ReplyPackage(BodyIndex,NeID,CmdCode,len(BodyMsg))
pass
return 1
def process(self):
cc = getattr(self,'context for client')
if cc.qRecv.qsize() == 0:
return
sBuffer = cc.qRecv.get()
cc.nTotal += self.__ProcessPackage(sBuffer)
if cc.nTotal>=1000:
time.sleep(0.01)
cc.nTotal = 0
pass
return
def __ProcessPackage(self, sBuffer):
cc = getattr(self,'context for client')
try:
nPackage = 0
#合并上次的剩余包信息
sBuffer = cc.sLeft + sBuffer
cc.sLeft = ''
#初步判断是否合法
nBegin = sBuffer.find(cc.pkg.GetPackageName())
if nBegin > 0:
_WARN('%s Skip %s Package Head [%d] Char. PACKAGE_SIZE=[%d],PACKAGE_TEXT=[%s]',
_cOdepOs(self),cc,nBegin,len(sBuffer),sBuffer)
sBuffer = sBuffer[nBegin:]
pass
elif nBegin==-1:
_WARN('%s Skip %s Package Buffer. PACKAGE_SIZE=[%d],PACKAGE_TEXT=[%s]',
_cOdepOs(self),cc,len(sBuffer),sBuffer)
return nPackage
#如果不到一个包头,则等下一次的包
if cc.pkg.GetPackageHeadSize() >= len(sBuffer):
cc.sLeft = sBuffer
_DEBUG('%s Skip %s Package. PACKAGE_SIZE=%d,BUFFER_SIZE=%d,PACKAGE_TEXT=[%s]',
_cOdepOs(self),cc,cc.pkg.GetPackageHeadSize(),len(sBuffer),sBuffer)
return nPackage
#解包信息
BodyIndex,BodySize,NeID,CmdCode,PackageVer,CreateTime,BodyMsg = cc.pkg.GetBodyInfoByString(sBuffer)
#如果最后一个包是半个包(有可能只有包头)
if BodySize > len(BodyMsg):
cc.sLeft = sBuffer
_DEBUG('%s Skip %s Package. PACKAGE_SIZE=%d,BODY_SIZE=%d,BODY_READ_SIZE=%d,PACKAGE_TEXT=[%s]',
_cOdepOs(self),cc,len(sBuffer),BodySize,len(BodyMsg),cc.sLeft)
return nPackage
_DEBUG('%s %s PACKAGE RECV: %s',_cOdepOs(self),cc,cc.pkg)
#处理一个完整的包
if self.__ProcessOnePackage(BodyIndex,PackageVer,CreateTime,NeID,CmdCode,BodyMsg):
nPackage += 1
pass
if len(sBuffer)==cc.pkg.GetPackageHeadSize()+BodySize:
return nPackage
#计算出下一次包的内容
NextPackageBuffer = sBuffer[cc.pkg.GetPackageHeadSize()+BodySize:]
_DEBUG('%s %s Next Package. SIZE=%d,TEXT=[%s]',_cOdepOs(self),cc,len(NextPackageBuffer),NextPackageBuffer)
#如果下一次的包还可以解一次,则递归调用
if len(NextPackageBuffer) > cc.pkg.GetPackageHeadSize():
nPackage += self.__ProcessPackage(NextPackageBuffer)
pass
else:
cc.sLeft = NextPackageBuffer
pass
pass
except:
fp = StringIO.StringIO()
traceback.print_exc(file=fp)
message = fp.getvalue()
_ERROR('%s Parse %s Package Error. Message = %s',_cOdepOs(self),cc,message)
pass
return nPackage
#客户端socket请求收发处理
class RequestHandlerClass(SocketServer.BaseRequestHandler):
def __init__(self, request, client_address, server):
SocketServer.BaseRequestHandler.__init__(self, request, client_address, server)
_DEBUG('%s create request handler=[%s:%d]',_cOdepOs(self),client_address[0],client_address[1])
pass
def __del__(self):
_DEBUG('%s delete request handler=%d',_cOdepOs(self),id(self))
pass
def __repr__(self):
try:
return '<RequestHandlerClass(%d) for (%s:%d>'%(self.client_address[0],self.client_address[1],id(self))
except:
return '<RequestHandlerClass(%d)>'%id(self)
return
def __handle(self,cc):
hSock = self.request
if not cc.qSend.empty() or cc.sSend:
w = [hSock]
pass
else:
w = []
pass
ready_to_read, ready_to_write, in_error = select.select([hSock], w, [hSock], 1)
if in_error:
raise RuntimeError, 'socket error'
if ready_to_read:
#读取客户端口的数据包.
sBuffer = hSock.recv(8*1024)
#没有数据,则客户端退出.
if not sBuffer:
_INFO('%s %s Is Disconnect.', _cOdepOs(self),cc)
return False
_DEBUG('%s Read %s Message. SIZE=%d',_cOdepOs(self), cc,len(sBuffer))
#发送消息给队列..
cc.qRecv.put(sBuffer)
setattr(self,'dead time',time.time()+10*60)
pass
if ready_to_write:
if not cc.qSend.empty():
cc.sSend += cc.qSend.get()
pass
nCount = hSock.send(cc.sSend)
cc.sSend = cc.sSend[nCount:]
_DEBUG('%s Write %s Message. SIZE=%d',_cOdepOs(self), cc,nCount)
pass
if getattr(self,'dead time',time.time()) < time.time():
_INFO('%s check socket dead %s',_cOdepOs(self), self)
return False
getattr(self,'package hander').process()
return True
def setup(self):
#创建client的上下文,和报文处理工具.
cc = ClientContext(self.request,self.client_address)
setattr(self,'context for client',cc)
ph = PackageHandler()
setattr(self,'package hander',ph)
#交叉引用
setattr(ph,'context for client',cc)
setattr(cc,'package hander', ph)
#加到全局上下文
context.vClients.append(cc)
_INFO('%s %s Connect. Total %d Clients.', _cOdepOs(self), cc, len(context.vClients))
setattr(self,'dead time',time.time()+10*60)
pass
def handle(self):
cc = getattr(self,'context for client')
_DEBUG('%s begin request handler %s',_cOdepOs(self),cc)
try:
while True:
if context.bStopThread:
break
if context.server is None:
_INFO('%s find server quit,so quit %s',_cOdepOs(self),cc)
break
if not self.__handle(cc):
break
continue
pass
except:
fp = StringIO.StringIO()
traceback.print_exc(file=fp)
message = fp.getvalue()
_ERROR('%s Read Message Error. Message=%s',_cOdepOs(self),message)
pass
_DEBUG('%s quit request handler',_cOdepOs(self))
return
def finish(self):
assert(self.request is not None)
#移除交叉引用列表.
cc = getattr(self,'context for client')
ph = getattr(cc,'package hander')
setattr(cc,'package hander', None)
setattr(ph,'context for client', None)
for cc in context.vClients:
if cc.hSocket != self.request:
continue
context.vClients.remove(cc)
break
try:
self.request.shutdown(2)
pass
except:
pass
try:
self.request.close()
pass
except:
pass
_DEBUG('%s Destory request handler for [%s:%s]', _cOdepOs(self),self.client_address[0],self.client_address[1])
return
#==============================================================
#Function Name:Main
#
#==============================================================
#唯一的全局的实例
context = Context()
if __name__ == '__main__':
try:
AgentServer = MainThread()
if AgentServer.Init(): AgentServer.Run()
del AgentServer
except SystemExit:
pass
except:
fp = StringIO.StringIO()
traceback.print_exc(file=fp)
message = fp.getvalue()
print message
else:
pass
|
|