- 论坛徽章:
- 0
|
- ###########################################################################
- #
- # This program is part of Zenoss Core, an open source monitoring platform.
- # Copyright (C) 2007, Zenoss Inc.
- #
- # This program is free software; you can redistribute it and/or modify it
- # under the terms of the GNU General Public License version 2 as published by
- # the Free Software Foundation.
- #
- # For complete information please visit: http://www.zenoss.com/oss/
- #
- ###########################################################################
- import sys
- import os
- import time
- import socket
- import ip
- import icmp
- import errno
- import logging
- log = logging.getLogger("zen.Ping")
- from twisted.internet import reactor, defer
- from twisted.spread import pb
- class PermissionError(Exception):
- """Not permitted to access resource."""
- class IpConflict(Exception):
- """Pinging two jobs simultaneously with different hostnames but the same IP"""
- class PingJob(pb.Copyable, pb.RemoteCopy):
- """
- Class representing a single target to be pinged.
- """
- def __init__(self, ipaddr, hostname="", status=0, unused_cycle=60):
- self.parent = False
- self.ipaddr = ipaddr
- self.hostname = hostname
- self.status = status
- self.reset()
- def reset(self):
- self.deferred = defer.Deferred()
- self.rrt = 0
- self.start = 0
- self.sent = 0
- self.message = ""
- self.severity = 5
- self.inprocess = False
- self.pathcheck = 0
- self.eventState = 0
- def checkpath(self):
- if self.parent:
- return self.parent.checkpath()
- def routerpj(self):
- if self.parent:
- return self.parent.routerpj()
- pb.setUnjellyableForClass(PingJob, PingJob)
- plog = logging.getLogger("zen.Ping")
- class Ping(object):
- """
- Class that provides asyncronous icmp ping.
- """
-
- def __init__(self, tries=2, timeout=2, sock=None):
- self.reconfigure(tries, timeout)
- self.procId = os.getpid()
- self.jobqueue = {}
- self.pktdata = 'zenping %s %s' % (socket.getfqdn(), self.procId)
- self.createPingSocket(sock)
- def reconfigure(self, tries=2, timeout=2):
- self.tries = tries
- self.timeout = timeout
- def createPingSocket(self, sock):
- """make an ICMP socket to use for sending and receiving pings"""
- socketargs = socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP
- if sock is None:
- try:
- s = socket
- self.pingsocket = s.socket(*socketargs)
- except socket.error, e:
- err, msg = e.args
- if err == errno.EACCES:
- raise PermissionError("must be root to send icmp.")
- raise e
- else:
- self.pingsocket = socket.fromfd(sock, *socketargs)
- os.close(sock)
- self.pingsocket.setblocking(0)
- reactor.addReader(self)
- def fileno(self):
- return self.pingsocket.fileno()
- def doRead(self):
- self.recvPackets()
- def connectionLost(self, unused):
- reactor.removeReader(self)
- self.pingsocket.close()
- def logPrefix(self):
- return None
- def sendPacket(self, pingJob):
- """Take a pingjob and send an ICMP packet for it"""
- #### sockets with bad addresses fail
- try:
- pkt = icmp.Echo(self.procId, pingJob.sent, self.pktdata)
- buf = icmp.assemble(pkt)
- pingJob.start = time.time()
- plog.debug("send icmp to '%s'", pingJob.ipaddr)
- self.pingsocket.sendto(buf, (pingJob.ipaddr, 0))
- reactor.callLater(self.timeout, self.checkTimeout, pingJob)
- pingJob.sent += 1
- current = self.jobqueue.get(pingJob.ipaddr, None)
- if current:
- if pingJob.hostname != current.hostname:
- raise IpConflict("Host %s and %s are both using ip %s" %
- (pingJob.hostname,
- current.hostname,
- pingJob.ipaddr))
- self.jobqueue[pingJob.ipaddr] = pingJob
- except (SystemExit, KeyboardInterrupt): raise
- except Exception, e:
- pingJob.rtt = -1
- pingJob.message = "%s sendto error %s" % (pingJob.ipaddr, e)
- self.reportPingJob(pingJob)
- def recvPackets(self):
- """receive a packet and decode its header"""
- while reactor.running:
- try:
- data, (host, port) = self.pingsocket.recvfrom(1024)
- if not data: return
- ipreply = ip.disassemble(data)
- try:
- icmppkt = icmp.disassemble(ipreply.data)
- except ValueError:
- plog.debug("checksum failure on packet %r", ipreply.data)
- try:
- icmppkt = icmp.disassemble(ipreply.data, 0)
- except ValueError:
- continue # probably Unknown type
- except Exception, ex:
- plog.debug("Unable to decode reply packet payload %s", ex)
- continue
- sip = ipreply.src
- if (icmppkt.get_type() == icmp.ICMP_ECHOREPLY and
- icmppkt.get_id() == self.procId and
- self.jobqueue.has_key(sip)):
- plog.debug("echo reply pkt %s %s", sip, icmppkt)
- self.pingJobSucceed(self.jobqueue[sip])
- elif icmppkt.get_type() == icmp.ICMP_UNREACH:
- try:
- origpkt = icmppkt.get_embedded_ip()
- dip = origpkt.dst
- if (origpkt.data.find(self.pktdata) > -1
- and self.jobqueue.has_key(dip)):
- self.pingJobFail(self.jobqueue[dip])
- except ValueError, ex:
- plog.warn("failed to parse host unreachable packet")
- else:
- plog.debug("unexpected pkt %s %s", sip, icmppkt)
- except (SystemExit, KeyboardInterrupt): raise
- except socket.error, err:
- errnum, errmsg = err.args
- if errnum == errno.EAGAIN:
- return
- raise err
- except Exception, ex:
- log.exception("receiving packet error: %s" % ex)
- def pingJobSucceed(self, pj):
- """PingJob completed successfully.
- """
- plog.debug("pj succeed for %s", pj.ipaddr)
- pj.rtt = time.time() - pj.start
- pj.message = "ip %s is up" % (pj.ipaddr)
- self.reportPingJob(pj)
- def pingJobFail(self, pj):
- """PingJob has failed remove from jobqueue.
- """
- plog.debug("pj fail for %s", pj.ipaddr)
- pj.rtt = -1
- pj.message = "ip %s is down" % (pj.ipaddr)
- self.reportPingJob(pj)
- def reportPingJob(self, pj):
- if str(pj.message).endswith("is down"):
- cmdPing="ping "+pj.ipaddr+" -c 1"
- flag=os.system(cmdPing)
- if int(flag)==0:
- pj.rtt = time.time() - pj.start
- pj.message="ip "+pj.ipaddr+" is up"
- try:
- del self.jobqueue[pj.ipaddr]
- except KeyError:
- pass
- # also free the deferred from further reporting
- if pj.rtt < 0:
- pj.deferred.errback(pj)
- else:
- pj.deferred.callback(pj)
- def checkTimeout(self, pj):
- if self.jobqueue.has_key(pj.ipaddr):
- now = time.time()
- if now - pj.start > self.timeout:
- if pj.sent >= self.tries:
- plog.debug("pj timeout for %s", pj.ipaddr)
- self.pingJobFail(pj)
- else:
- self.sendPacket(pj)
- else:
- plog.debug("calling checkTimeout needlessly for %s", pj.ipaddr)
- def jobCount(self):
- return len(self.jobqueue)
- def ping(self, ip):
- "Ping the ip and return the result in a deferred"
- pj = PingJob(ip)
- self.sendPacket(pj)
- return pj.deferred
- def _printResults(results, start):
- good = [pj for s, pj in results if s and pj.rtt >= 0]
- bad = [pj for s, pj in results if s and pj.rtt < 0]
- if good: print "Good ips: %s" % " ".join([g.ipaddr for g in good])
- if bad: print "Bad ips: %s" % " ".join([b.ipaddr for b in bad])
- print "Tested %d ips in %.1f seconds" % (len(results), time.time() - start)
- reactor.stop()
- if __name__ == "__main__":
- ping = Ping()
- logging.basicConfig()
- log = logging.getLogger()
- log.setLevel(10)
- if len(sys.argv) > 1: targets = sys.argv[1:]
- else: targets = ("127.0.0.1",)
- lst = defer.DeferredList(map(ping.ping, targets), consumeErrors=True)
- lst.addCallback(_printResults, time.time())
- reactor.run()
复制代码 zenoss-api_2.5.0\api\Products.ZenStatus.AsyncPing-module.html
ip 和icmp模块在哪里可以找得到啊 |
|