- 论坛徽章:
- 11
|
本帖最后由 bskay 于 2014-08-18 15:10 编辑
贴一个类似的
# coding: utf-8
'''
模拟人使用ftp和telnet的工具,把telnet当成是接收“行”的工具(会超时)
'''
__author__ = 'xxx@nj'
__date__ = '$__DATE__$'
import sys, os, time
import telnetlib, re, ftplib, socket
from log_ui import setmodulog,INFO,DEBUG
setmodulog(INFO)
if __name__ == '__main__' :
setmodulog(DEBUG)
pass
def setdebugmode(debug=True):
if debug:
setmodulog(DEBUG)
else:
setmodulog(INFO)
return
#封装对telnetlib库的使用
class TelnetUI:
'''模拟一个人(正常的人)在使用telnet登录的场景:
先连接到主机
等看到错误提示(超时,不能连接)则返回
或者看到提示输入登录名,输入用户
等看到提示输入密码,输入密码
如果看到提示输入终端类型,输入vt100 (可选)
看到登录失败,)则返回
看到成功,敲几次回车,看看提示符(PS1)是什么
输入命令,等待执行完成(看看提示符是不是出现了)
*** 注意,在脚本模式里面,尽量不要使用可能改变PS1的命令
那样会导致匹配实效(可以通过调用比较慢的_p_ps1来重新检测来是匹配重新生效)
使用样例
e=TelnetUI('user','pwd','10.45.1.1')
ofs = file('E:\\aa.txt','wb')
ofs.write(e.exec_cmd('cd bin;cmd1'))
ofs.flush()
ofs.write(e.exec_cmd('cmd2'))
ofs.flush()
ofs.write(e.exec_cmd('cmd3'))
ofs.flush()
while True:
text = e.ex_result(20)
if not text:
break
ofs.write(text)
ofs.flush()
continue
ofs.close()
'''
def __init__(self, user=None, passwd=None, hostname=None, timeout=0.5):
self.user, self.passwd, self.host = user, passwd, hostname
self.port = 23
self.timeout = timeout
self.handle = None
self.lines = []
self.ps1 = None
if self.user and self.passwd and self.host:
self._telnet()
pass
return
def connect(self, host=None, port=None, timeout=None):
if self.handle is not None:
return
self.host = host or self.host
self.port = port or self.port
self.timeout = host or self.timeout
self.handle = telnetlib.Telnet(self.host,self.port)
self.lines = []
self.ps1 = None
return
def login(self, user, passwd, timeout=None):
self.user = user or self.user
self.passwd = passwd or self.passwd
self.timeout = timeout or self.timeout
return self._p_login() and self._p_ps1()
#
def reset_buf(self):
'''清除以前的输出结果
'''
self.lines = []
self._recv_line(self.timeout)
if self.lines:
_INFO('rich text >>>%s<<<',''.join(self.lines))
self.lines = []
pass
return
#
def send_cmd(self,cmd):
self.handle.write(cmd.strip()+'\r\n')
return
#
def _recv_line(self, idle):
'''接收一行一行的数据,如果不是行,在idle之后没有数据就返回
'''
if len(self.lines) > 0 and self.lines[-1][-1:] != '\n':
part_line = self.lines[-1]
else:
part_line = ''
last_time = time.time()+idle
while time.time() < last_time:
i,m,text = self.handle.expect(['.*\n'],0.1)
if text:
_DEBUG('%s i=%d, t=>>>%s<<<',_oo(self),i,text)
pass
if i == -1:
if text != '':
part_line += text
last_time = time.time()+idle
continue
continue
assert(i == 0 and text != '')
self.lines.append(part_line+text)
part_line = ''
continue
if part_line:
self.lines.append(part_line)
return
def _match(self, pattern):
'''从接收到的行里面匹配指定掩码的行,返回掩码索引和匹配到的行的索引'''
if isinstance(pattern, list):
pl = pattern[:]
else:
pl = [pattern]
for i in range(len(pl)):
if not hasattr(pl, "search"):
pl = re.compile(pl)
for l in range(len(self.lines)):
for p in range(len(pl)):
if pl[p].search(self.lines[l]):
return p,l
return -1,-1
def wait_e(self,pattern,timeout):
last_time = time.time()+timeout
while time.time() < last_time:
self._recv_line(self.timeout)
p,l = self._match(pattern)
if p != 0:
continue
#if l != len(self.lines)-1:
# continue
return p,l
return -1,-1
def _p_login(self):
#用户名
p,l = self.wait_e(['[Ll]ogin:',],5)
if p != 0:
_INFO('ERROR, connect host[%s] timeout\n>>>%s<<<',self.host,''.join(self.lines))
return False
_DEBUG('%s USER p=%d, l=>>>%s<<<',_oo(self),p,self.lines[l])
self.handle.write(self.user+'\r\n')
#口令
p,l = self.wait_e(['[Pp]assword:',],1)
_DEBUG('%s PASS p=%d, l=>>>%s<<<',_oo(self),p,self.lines[l])
assert(p == 0)
self.lines = []
self.handle.write(self.passwd+'\r\n')
#终端类型
el = [
'tset: unknown terminal type', #有的linux要设置终端
'Terminal type',
]
p,l = self.wait_e(el,5)
if p != -1:
_DEBUG('%s TERM: %s',_oo(self),self.lines[l])
self.handle.write('vt100\r\n')
self.lines = []
pass
#检查登录出错
el = ['Login incorrect']
p,l = self.wait_e(el,1)
if p != -1:
_INFO('%s ERROR, login incorrect:\n>>%s<<',_oo(self),''.join(self.lines))
return False
_DEBUG('%s FINISH',_oo(self))
self.lines = []
return True
def _p_ps1(self):
'''通过连续输入3次回车,快速读取结果,找到PS1的值'''
r = []
self.lines = []
for i in range(3):
self.handle.write('\r\n')
self._recv_line(self.timeout)
r.append(''.join(self.lines))
self.lines = []
if r[1] != r[2]:
_INFO('%s ERROR: can not cals PS1 r=[%s]',_oo(self),','.join(r))
return False
self.ps1 = r[2]
if r[2][:2] == '\r\n':
self.ps1 = r[2][2:]
_DEBUG('%s PS1=[%s], r=[%s]',_oo(self),self.ps1,','.join(r))
return True
def exec_cmd(self, cmd):
self.reset_buf()
self.send_cmd(cmd)
text = ''
bend, text = self.ex_result(30)
if not bend:
_DEBUG('%s cmd [%s] not end',_oo(self),cmd)
pass
return text
#返回附加的结果
def ex_result(self,timeout=20):
bFinish = False
lasttime = time.time()+timeout
while time.time()<lasttime:
self._recv_line(0.1)
if len(self.lines) == 0:
time.sleep(1)
_DEBUG('recv empty....')
continue
# #print 'break'
#包含PS1不算完成,PS1应该在最后才算完成
if self.lines[-1].count(self.ps1) == 0:
_DEBUG('recv line ....')
continue
if not self.lines[-1].endswith(self.ps1):
_DEBUG('last line [%s] ps1=[%s]',repr(self.lines[-1]),repr(self.ps1))
continue
bFinish = True
break
return bFinish, ''.join(self.lines)
#初始化
def _telnet(self):
self.connect()
if not self.login(self.user,self.passwd):
#ftp连不上不会抛异常,so这里抛一下
raise RuntimeError,'connect error'
return
#print v[-1]
_INFO('TDO>>> self.ps1 = >>>%s<<<',self.ps1)
return
def is_dir(self, rpath):
#TODO more...
cmd = 'test -d "%(rpath)s" && echo Y\e\s'
if self.exec_cmd(cmd).count('Yes') != 0:
return True
return
def expand_exp(self, exp):
cmd = 'echo __BB%sEE__' % exp
text = self.exec_cmd(cmd)
_DEBUG('text=%s',text)
text = text.split('__BB')[-1].split('EE__')[0]
return text
pass
|
|