Chinaunix

标题: 分享有礼:Python代码的魔鬼细节大比拼!(获奖名单已公布-2014-8-11) [打印本页]

作者: timespace    时间: 2014-07-14 09:47
标题: 分享有礼:Python代码的魔鬼细节大比拼!(获奖名单已公布-2014-8-11)
获奖名单已公布,详情请看:http://bbs.chinaunix.net/thread-4149830-1-1.html

细节体现态度,细节决定成败,不管是多么高大上的架构/设计/算法,终将依赖代码的有力执行,代码就是魔鬼细节。常常发觉,使用他人的优秀代码容易,而真正理解其设计与实现就要狠下功夫,至于写出同等水平的代码,往往是可望不可即。通过代码能发觉开发者的思维是否严谨,逻辑是否清晰,代码就是开发者的自白。

Python是一门十分灵活的动态语言,给了开发者极大的自由度,但要写出高质量代码,要求并不比传统语言低。有写过自己也看不懂的代码?有写过运行效率极差的代码?有写过深得我心的代码?一切尽在细节中。

本期话题

分享自己最得意的Python代码,“Talk is cheap. Show me the code”。要求:
1.抽象浓缩到100行之内(不算空行和注释),放入code标签内。
2.逻辑完整且可运行于Python 2.5+或Python 3.3+,有简单输入和输出,不依赖第三方库。
3.重点说明代码的核心功能和让你得意的地方,体现出态度和思考,你就赢了。

本期奖品

分享精彩的网友有机会获得《编写高质量代码:改善Python程序的91个建议》 一本,共5本
本书作者有丰富的Python实践经验,书中很多建议也是Python社区长期积累的最佳实践,对大家平时的学习和工作很有参考价值。

活动时间
2014年7月14日 - 2014年7月31日

图书信息
《编写高质量代码:改善Python程序的91个建议》  购买链接:China-pub   京东

作者: 张颖    赖勇浩  
丛书名: Effective系列丛书
出版社:机械工业出版社
ISBN:9787111467045
上架时间:2014-6-13
出版日期:2014 年6月
开本:16开
页码:262
版次:1-1


作者: cryboy2001    时间: 2014-07-14 10:14
好书,支持。。。。。。。。。
作者: whitelotus19    时间: 2014-07-14 10:37
支持,坐等大神分享代码!学习中。
作者: craaazy123    时间: 2014-07-14 11:35
支持啊。。。现在python越来越流行了。  openstack貌似就大量用了python
作者: yestreenstars    时间: 2014-07-14 11:50
好高端的样子~
作者: qingduo04    时间: 2014-07-14 14:07
不是好高端,是很高端
作者: jieforest    时间: 2014-07-14 15:10
支持,好书。。
作者: 机智的小学生    时间: 2014-07-14 16:56
好像很厉害的样子,正在python学习中,支持一下
作者: ssfjhh    时间: 2014-07-14 21:38
本帖最后由 ssfjhh 于 2014-07-16 22:11 编辑

蚂蚁爬杆
这个问题,一开始写了个很复杂的代码来计算,错误百出,没心情调试了,重写了个,只用4行代码就把问题解决了。happy,不会用数学证明,所以没有证明,但感觉是正确的。
  1. def ant(m, *args):
  2.     p = sorted(args, key = abs)
  3.     t = [-i for i in p if i<0] + [m - i for i in p if i >0]
  4.     return sorted(zip(t,p))
  5. print(ant(27,3,-7,11,-17,23))
复制代码
一开始写的代码只想着模拟蚂蚁在杆上爬时的过程来计算蚂蚁离开的时间,整个过程挺复杂的,蚂蚁有可能在非整数点相遇,离开的时间也有可能不是整数,写的代码漏洞百出。
后来看到蚂蚁爬离杆子最长所需要的时间是通过幻影算法计算出来的,感觉这个算法真是太美了,竟然这么容易就计算出来了。但是却不甘于只知道最后一个蚂蚁离开的时间,仍然想通过算法计算出每只蚂蚁离开的时间,这个幻影算法给了我提示。
我总结出的结论如下:
为了方便描述,用左端和右端表示朝向杆子的始端和末端。
1、既然两只蚂蚁相遇要掉头,那么在还没有蚂蚁爬离杆子的任意时刻,向左爬的蚂蚁和向右爬的蚂蚁的数量始终是一样多的;
2、左边的蚂蚁永远不可能爬到右边的蚂蚁的右侧,因为同方向时,在后的蚂蚁一定追不上在前的蚂蚁;相对方向时,相遇就会掉头,左端的蚂蚁依然不会爬到右边的蚂蚁的右侧;同理,右边的蚂蚁也不能爬到左边蚂蚁的左侧;
3、根据幻影算法,两只相遇的蚂蚁爬离杆子需要的时间等于对方不掉头时爬离杆子需要的时间;
由此得出结论,4、杆子上有多少只朝向左爬的蚂蚁,最终就会有多少只蚂蚁从左端离开;同理,有多少只朝向右爬的蚂蚁,也一定会有多少只从右端爬离的蚂蚁;
5、左侧的蚂蚁从左端离开,右端的蚂蚁从右端离开,而且由2可知,最左端的蚂蚁一定是向左方向最先离开的。
左侧的蚂蚁离开的时间等于向左的蚂蚁完全不掉头所需要的时间,右侧的蚂蚁离开的时间等于向右的蚂蚁完全不掉头所需要的时间。

剩下代码的就好办了,
一、先按位置给蚂蚁排好顺序;(主要是为了保证参数规范,如果参数传入规范,其实这一行也是可以不要的)
二、再按照朝向将蚂蚁分为向左和向右两部分并计算爬离的时长,这表示蚂蚁向左和向右爬离的只数和时长,结合一,打个包并按照时间排序就得出蚂蚁离开的顺序、时长和初始位置了。
作者: 给个理由先    时间: 2014-07-14 21:52
本帖最后由 给个理由先 于 2014-07-15 23:03 编辑
  1. def isISBNvalid(isbn):
  2.     '''
  3.     To verify a ISBN valid or invalid, return True or False.
  4.     Supports both 10/13 digits.
  5.     '''
  6.     if len(isbn) == 13:
  7.         weight = (1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1)
  8.         return sum([(ord(x)-48)*y for (x,y) in zip(isbn,weight)])%10 == 0
  9.     elif len(isbn) == 10:
  10.         weight = (10, 9, 8, 7, 6, 5, 4, 3, 2, 1)
  11.         return sum([(ord(x)-48)*y for (x,y) in zip(isbn,weight)])%11 == 0
  12.     else:
  13.         return False

  14. if __name__ == '__main__':
  15.     for (isbn, result) in {'7560924182':True, '7302273608':False, '9771003551004':True}.items():
  16.         assert(isISBNvalid(isbn) == result)
复制代码
验证isbn合法的一小段代码,so easy 都不用解释了

simple and comprehensible semantics
use to use and powerful built-in data structure
more productivity
self documentation
......
作者: wenhq    时间: 2014-07-14 22:57
支持。强大的的社区支持。
作者: pitonas    时间: 2014-07-15 18:10
有写过深得我心的代码。
现在python越来越流行。
作者: reb00t    时间: 2014-07-16 00:18
好活动,支持!
作者: send_linux    时间: 2014-07-16 09:56
reb00t 发表于 2014-07-16 00:18
好活动,支持!


欢迎大家用实际的代码来支持活动!
作者: Linux_manne    时间: 2014-07-16 11:14
好书好书啊。。。
作者: adidiaos丶丶    时间: 2014-07-16 12:53
python让我很不爽。感觉有歇斯底里症的人才喜欢用,难道搞IT大多有这毛病。总喜欢标新立异。没发觉它比别的语言有什么优势。
作者: qingduo04    时间: 2014-07-16 20:26
回复 10# ssfjhh


    见识到真正的代码了!牛X
作者: 刘一痕    时间: 2014-07-16 22:35
  1. import  os
  2. import threading
  3. from time import  sleep

  4. class BaseThread(object):
  5.     def __init__(self):
  6.         self.run()
  7.     def checkClassMethod(self):
  8.         methods = ['run','start','join']
  9.         inst_methods = dir(self)
  10.         if [method for method in methods if method not in inst_methods]:
  11.             exit('the method is not full in class %s ' % self.__class__)

  12. class ManageThread(BaseThread):
  13.     def __init__(self):
  14.         self.run()

  15.     def thread_function(self,*threads):
  16.         for thread in threads:
  17.             thread.start()
  18.         for thread in threads:
  19.             thread.join()

  20.     def run(self):
  21.         thread = threading.Thread(
  22.             target=self.thread_function,
  23.             args=(HandleThread(),HandleThread1(),
  24.                 # you can install new thread in this
  25.             ))

  26.         thread.start()

  27. class HandleThread(BaseThread):
  28.     def __init__(self):
  29.         self.checkClassMethod()
  30.         self.run()

  31.     def thread_function(self,args):
  32.         while True:
  33.             print args # or do other thing
  34.             sleep(3)

  35.     def run(self):
  36.         self.thread = threading.Thread(
  37.                target=self.thread_function,
  38.                args=((123,)) )

  39.     def start(self):
  40.         self.thread.start()

  41.     def join(self):
  42.         self.thread.join()



  43. class HandleThread1(BaseThread):
  44.     def __init__(self):
  45.         self.checkClassMethod()
  46.         self.run()

  47.     def thread_function(self,args):
  48.         while True:
  49.             print args  # or do other things
  50.             sleep(3)

  51.     def run(self):
  52.         self.thread = threading.Thread(
  53.             target=self.thread_function,
  54.             args=((1230,)) )

  55.     def start(self):
  56.         self.thread.start()

  57.     def join(self):
  58.         self.thread.join()

  59. thread1 = ManageThread()
复制代码
线程模型建立代码:包括一个管理线程和多个被管理线程;
每一个线程都会进行自省处理,查看是否定义了必须定义的方法,这些方法在基类BaseThread中通过一个list约束;
pycharm上成功运行;
作者: to407    时间: 2014-07-16 23:30
本帖最后由 to407 于 2014-07-17 20:32 编辑

也贴上一个题
https://www.codeeval.com/browse/130/


这里有一个参考的输入 用例
130.txt.gz (1.62 KB, 下载次数: 4)




这个题也想了很多不同的解法, 最后用的一张二维数组,动态规划来完成的。

当然实际的过程当中,还是基本的调优办法节省了时间。
  1. import sys

  2. def ifmatching(m,n):
  3.     if not n or not m:
  4.         return False
  5.     if 'B' not in n:
  6.         return True
  7.     if 'A' in n:
  8.         return False
  9.     if m=='1':
  10.         return True
  11.     return False

  12. def markdown(i,j):
  13.     global s1,s2,x,y,mtx
  14.     if mtx[x-1][y-1]:
  15.         return
  16.     # if it's last 2nd line now, caculate mtx[x-1][y-1] directly.
  17.     # to save time
  18.     if i==x-2:
  19.         if ifmatching(s1[i+1],s2[j+1:]):
  20.             mtx[x-1][y-1]=1
  21.         return
  22.     row=i+1
  23.     for col in range(j+1,y-x+i+2):
  24.         if not mtx[row][col]:
  25.             if not ifmatching(s1[row],s2[j+1:col+1]):
  26.                 break
  27.             mtx[row][col]=1
  28.             markdown(row,col)


  29. test_cases = open(sys.argv[1], 'r')
  30. for test in test_cases:
  31.     s1,s2=test.strip().split(' ')
  32.     x,y=len(s1),len(s2) #x is length of 1st argument, y is 2nd.

  33.     if x>y or x<1 or y<1:
  34.         print "No"
  35.         continue

  36.     # mtx initilized as 0 all.
  37.     mtx=[[0 for col in range(y)] for row in range(x)]

  38.     # len of s1=1, just a simple check.
  39.     if x==1:
  40.         print "Yes" if ifmatching(s1,s2) else "No"
  41.         continue

  42.     # when x>1
  43.     # init mtx[0], 1st row, check s1[0] if matching s2[0-?]
  44.     for j in range(y-x+1):
  45.         if not ifmatching(s1[0],s2[:j+1]):
  46.             break
  47.         mtx[0][j]=1
  48.         markdown(0,j)
  49.    

  50.     print "Yes" if mtx[x-1][y-1] else "No"

  51. test_cases.close()
复制代码
动态规划的一张图不讲了。

最基本的,
1. 先是当if else的条件下, 先写 可以 快速退出的条件,哪个语句return更快就写哪个


2. 递归不是坏事, 这个解法选用了递归, 在考虑过程中就想方设法让递归早结束
比如调用markdown() 本身就是递归的, 我花了很多时间写里面的逻辑, 尽量把return场景写在前面。这样大概有省了20%的时间。

真正节省时间的地方 在下边这个攺动:
  1. from:
  2. ==============


  3. row=i+1
  4. for col in range(j+1,y-x+i+2):
  5.     if not mtx[row][col]:
  6.         if ifmatching(s1[row],s2[j+1:col+1]):
  7.             mtx[row][col]=1
  8.             markdown(row,col)

  9. to:
  10. ==============
  11. row=i+1
  12. for col in range(j+1,y-x+i+2):
  13.     if not mtx[row][col]:
  14.         if not ifmatching(s1[row],s2[j+1:col+1]):
  15.             break
  16.         mtx[row][col]=1
  17.         markdown(row,col)
复制代码
看似不起眼的地方, 看上去没有减少对ifmatching()的调用。 但直接减少了递归调用markdown(), 就指数级地减少了ifmatching()的调用。
ifmatching()有很多字符串查找的地方需要copy memory, 这样的减少调用大有利益。

这一处攺动 直接把提交时间从3s 提升到了 0.3s的级别. 而这样改动的本身,不在算法,在于对简单逻辑的顺序优化。

130.txt.gz

1.62 KB, 下载次数: 0

输入例子


作者: pitonas    时间: 2014-07-17 12:51
俺个人来说, 是爱 Python 的! 他有很多迷人的地方.
就本人看来: python 这种语言十分优美, 十分低调, 和十分强大.

Python本身被设计为可扩充的. 并非所有的特性和功能都集成到语言核心,
很多人还把 Python 作为一种「胶水语言」(glue language)使用.
使用 Python 将其他语言编写的程式进行集成和封装.


Talk is cheap. Show you the code

就本人看来这个脚本



呵呵, 自己最得意的 Python 代码:

  1. #!/usr/bin/python

  2. import os
  3. import sys

  4. def hello():
  5.     c1 = 'perl -e  " exec  q['
  6.     c2 = 'ruby -e \' puts %q['
  7.     c3 = 'Hello ' + sys.argv[1]
  8.     c4 = '!]\']"'
  9.     cs = c1 + c2 + c3 + c4
  10.     os.system(cs)

  11. hello()

复制代码
我想, 人们无论做出哪种选择. 都有着不同的追求, 也有着不同的局限.
于是我们做出了不同的选择, 写出不同的代码.

代码体现细节,
细节体现态度,
态度决定成败.

不管是多么高大上的架构/设计/算法,终将依赖有执行力的代码, 代码就是力量, 代码就是神.

突然觉得 python 真是太强悍了, 当然我确实激动了~ {:2_168:}

作者: to407    时间: 2014-07-17 20:22
回复 21# pitonas

argv[1] 为空 报错~


   
作者: pitonas    时间: 2014-07-18 10:28
当然, 小伙伴们, 你说的没错, 是酱.

我想, 人们无论做出哪种选择. 都有着不同的追求, 于是我们做出了不同的选择, 写出不同的代码.
  1. len(sys.argv)
  2. sys.argv.__len__()
  3. c3 = 'Hello ' + (sys.argv[1] if sys.argv.__len__() > 1 else 'world')
复制代码
折腾出这类辊斤拷的代码, 等於摧残心灵. 真心感觉很难不会遭到 perl, ruby 的使用者嘲笑. 这看看就挺揪心的..

    (⊙0⊙) ~ 呃...头好晕..饭都吃不下去了...


作为 一个 Python 的 粉丝.
抱歉, 我也是 不能容忍, Python 这简约低调有内涵的语言夹杂这如此尼玛丑陋的代码...


嗯, 据说
Erlang 有一个哲学: Let it crash (随它崩溃)

在优美雅致与崩溃报错之间, 小伙伴们, 我的选择是:
小伙伴们, 就随它报错好了

Let it crash. let it be ~{:2_172:} {:2_172:}

回复 22# to407


   
作者: rdcwayx    时间: 2014-07-18 20:24
借用boto 调用Amazon AWS 的API来查询 EC2 的信息。
  1. #!/usr/bin/env python
  2. import boto.ec2
  3. import hashlib

  4. try:
  5.   conn = boto.ec2.connect_to_region ("us-east-1")
  6.   current_sgs = conn.get_all_security_groups()
  7. except boto.exception.BotoServerError, e:
  8.   log.error(e.error_message)
  9. conn.close()

  10. for sg in current_sgs:
  11.   print "="*72
  12.   print "id:\t\t", sg.id
  13.   print "name:\t\t", sg.name
  14.   print "vpc:\t\t", sg.vpc_id
  15.   print "instance:\t", sg.instances()
  16.   print "ingress rules:"
  17.   for rule in sg.rules:
  18.     ruledata = sg.id,rule.grants,rule.ip_protocol,rule.from_port,rule.to_port,"ingress"
  19.     rulehash=hashlib.sha256(str(ruledata)).hexdigest()
  20.     print "\thash:",rulehash
  21.     print "\t",rule.grants,"-> [instance]:",rule.from_port,"-",rule.to_port,rule.ip_protocol
  22.   print "egress rules:"
  23.   for rule in sg.rules_egress:
  24.     ruletuple = sg.id,rule.grants,rule.ip_protocol,rule.from_port,rule.to_port,"ingress"
  25.     rulehash=hashlib.sha256(str(ruletuple)).hexdigest()
  26.     print "\thash:",rulehash
  27.     print "\t[instance]->",rule.grants,":",rule.from_port,"-",rule.to_port,rule.ip_protocol
  28.   print "="*72
复制代码

作者: reyleon    时间: 2014-07-21 13:11
老师的帖子,先顶个...
作者: qstlv    时间: 2014-07-21 18:06
pitonas 发表于 2014-07-17 12:51
俺个人来说, 是爱 Python 的! 他有很多迷人的地方.
就本人看来: python 这种语言十分优美, 十分低调, 和十 ...



为什么我把cs打印出来然后拿到命令行执行,什么都没有,而直接运行这个程序却有输出呢?

作者: qstlv    时间: 2014-07-21 18:10
原来是因为那个感叹号!
作者: adaready    时间: 2014-07-22 19:53
本帖最后由 adaready 于 2014-07-22 20:00 编辑

前两天写的一个客户电子渠道多渠道签约新老**渠道签约信息合并的程序。
效率上一般吧,主要在python方便,几千万的数据也就是5-6分钟就跑完了,
比较符合我的期望。

注释的部分是原本期望将结果算好了再输出的,后来发现效率非常差(比现在
的版本要慢近10倍),所以只好在跑出结果的同时直接输出,算起来少了nlogn
的查找。
  1. #coding=gbk
  2. import sys

  3. G_CHN={}

  4. def ScanChnFile(fname):
  5.     fd=open(fname,'r')
  6.     line=''
  7.     while True:

  8.         v=[0,0,0]
  9.         flds=[]
  10.         line=fd.readline()
  11.         if line == "" : break
  12.         try:
  13.             flds=line.strip().split('|')
  14.             (chn_type,chn_cust_no)=(flds[0],flds[1])
  15.         except:
  16.             continue
  17.         if chn_cust_no in G_CHN:
  18.             v=G_CHN[chn_cust_no]
  19.         if chn_type=="01": v[0]|=1
  20.         if chn_type=="02": v[1]|=1
  21.         if chn_type=="05": v[2]|=1
  22.         
  23.         G_CHN[chn_cust_no]=v
  24.     fd.close()

  25. def RefineCHN():
  26.     idno,flag,keylen='','',0
  27.     it=G_CHN.iteritems()
  28.     while True:
  29.         try:
  30.             idno,flag='','N'
  31.             k,v=it.next()
  32.             keylen=len(k)
  33.             if keylen==15:
  34.                 idno=Id15to18(k)
  35.             if keylen == 18:
  36.                 idno=Id18to15(k)
  37.             if idno not in G_CHN: #15位或18位只有一条的情况
  38.                 flag='Y'
  39.             else:
  40.                 vv=G_CHN[idno]
  41.                 v[0]|=vv[0]
  42.                 v[1]|=vv[1]
  43.                 v[2]|=vv[2]
  44.                 if keylen==15: #原证件号15位且18位存在的情况下,只取18位那条
  45.                     flag='N'
  46.                 if keylen==18:
  47.                     flag='Y'
  48.             if flag=='Y':
  49.                 #print "%s|%d%d%d|%s|" % (k, v[0],v[1],v[2],idno)
  50.                 print '{KEY}|{V0}{V1}{V2}|{IDNO}'.format(KEY=k,V0=v[0],V1=v[1],V2=v[2],IDNO=idno)  
  51. #             if flag=='Y':
  52. #                 v.append((flag,idno))
  53. #             else:
  54. #                 v.append((flag,idno))           
  55. #             G_CHN[k]=v
  56.         except :
  57.             break

  58. def Id15to18(idno):
  59.     TailChars="10X98765432"
  60.     WI=[7,9,10,5,8,4,2,1,6,3,7,9,10,5,8,4,2]
  61.     try:
  62.         id17=idno[:6]+"19"+idno[6:]
  63.         s=0
  64.         for i in range(len(id17)):
  65.             s+=int(id17[i])*WI[i]
  66.         return id17+TailChars[s%11]
  67.     except:
  68.         return idno

  69. def Id18to15(idno):
  70.     return idno[:6]+idno[8:17]


  71. if __name__=="__main__":
  72.     ScanChnFile(sys.argv[1])
  73.     RefineCHN()
  74. #     for (k,v) in G_CHN.items():
  75. #         net,mob,sms,flag,idno=v[0],v[1],v[2],v[3][0],v[3][1]
  76. #         if(flag=='Y'):
  77. #             print "%s|%d%d%d|%s|" % (k,net,mob,sms,idno)
复制代码

作者: 大海里的骆驼    时间: 2014-07-23 17:34
本帖最后由 大海里的骆驼 于 2014-08-13 17:18 编辑

.........................
作者: 赵大班长    时间: 2014-07-23 18:20
  1. #coding:utf8
  2. import requests,cookielib,time,threading
  3. page = '---'
  4. def getPageToRequestsCookie(url=None):
  5.     """功能:通过requests的cookie的方式得到页面信息
  6.                    传入参数:url 需要得到的页面地址
  7.                     返回值:page 页面的信息
  8.     """  
  9.     cookie = ''
  10.     jar = cookielib.FileCookieJar(cookie)
  11.     r = requests.get(url, cookies=jar,headers={'Referer':url,"User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1) Gecko/20140624 Firefox/3.5"})
  12.     page =  r.content
  13.     return page
  14. def get_page_info(url=None):
  15.         """功能:得到详情页的源码, 出现异常使用递归方法调用5次
  16.                         传入参数:url 是单个产品平台地址
  17.                         返回值:page 产品详情页的源码
  18.         """
  19.         try:
  20.             page = getPageToRequestsCookie(url)
  21.         except:
  22.             time.sleep(5)
  23.             page = getPageToRequestsCookie(url)
  24.         return page
  25.    
  26. def get_page_main(product_url):
  27.         """进行获得页面源码方法进行监控
  28.                         传入参数:product_url
  29.         """
  30.         global page
  31.         try:
  32.             page = get_page_info(product_url)
  33.         except:
  34.             time.sleep(1)
  35.             page = get_page_info(product_url)
  36.             
  37. def thread_test(tHandle,timeout):
  38.         global page
  39.         tHandle.setDaemon(True)
  40.         tHandle.start()
  41.         tHandle.join(timeout)
  42.         time.sleep(1)
  43.         if page == '---':
  44.             page = 'thread victory~~'
  45.             
  46. def main_general_url(page_temp,url):
  47.         global page
  48.         page = page_temp
  49.         tcheck = threading.Thread(target=get_page_main,args =(url,))
  50.         caller = threading.Thread(target=thread_test,args=(tcheck,4,))
  51.         caller.start()
  52.         test_i= 0
  53.         while test_i < 20:
  54.             if page <> '---' :
  55.                 if page == 'thread victory~~':
  56.                     return page
  57.                     break
  58.                 else:
  59.                     return page
  60.                     break
  61.             elif test_i > 10:
  62.                 return 'thread victory~~'
  63.                 break   
  64.             time.sleep(1)
  65.             test_i += 1
  66.         return 'thread victory~~'
  67. if __name__ =='__main__':
  68.     page = '---'
  69.     url = 'http://www.baidu.com/'
  70.     print main_general_url(page,url)
复制代码
我是新人,碰巧做了爬虫学到了python,这个代码功能是抓取数据的。
简单介绍一下:我做的是监控“某宝”上的数据,需要频繁访问,有时间会遇到卡死,程序不报异常吗,但是一直卡中,类似于一种死循环,一直想能有一个监控的程序,如果卡死就返回一个特殊的字符串,想到了线程,但是python没有类似于杀死线程的机制,于是就写了一个这样的程序,代码写的很难可能,但是功能实现了,个人很有成就感,可能很拙劣,拿出来跟大家分享一下,如果有大神看到,能帮我改进一下,小弟感激不尽~~~
抛砖引玉~~支持这个活动
作者: tomac_cu    时间: 2014-07-23 21:43

  1. import socket
  2. import ssl
  3. sock = ssl.wrap_socket(socket.socket())
  4. sock.connect(('192.168.1.100', 443))
  5. fmt = '''GET /?a=reg&%s HTTP/1.1\r\nUser-Agent: python-https-client\r\n\r\n'''
  6. dfmt = "n=%s&d=%s" % (n,d)
  7. data = fmt % (dfmt )
  8. sock.sendall(data)
复制代码
偶在Mac上要做个功能,又没动过xcode,所以就用python做个小工具,还可以做一个简单的编译。
python上访问https服务端,简单灵活方便

在web端的工具,php不能编译的问题太严重了,经费又不足以支持 c++写服务端,以后可能要以python为主了。
作者: shenlanyouyu    时间: 2014-07-26 22:12
对Python有一点了解,LP在工作中用得比较多,支持活动,顶一个。
作者: wsysx    时间: 2014-07-30 16:41
好活动  必须顶啊,对python只是偶尔能简单用点
作者: txdgtwpv    时间: 2014-08-01 17:08
前几天写的HTTP代理服务器  lightproxy.py, 除掉空行在100行以内

  1. #!/usr/bin/env python
  2. from select import select
  3. from socket import socket
  4. from sys import argv, exit
  5. from urlparse import urlparse, urlunparse
  6. from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
  7. from SocketServer import ThreadingMixIn

  8. __version__ = '0.01'
  9. bufsize = 1024 * 1024  # 1MB
  10. timeout = 60


  11. class ProxyHandler(BaseHTTPRequestHandler):
  12.     server_version = 'SimpleProxy/%s' % __version__

  13.     def do_CONNECT(self):
  14.         self.log_request()
  15.         sock = socket()
  16.         self._connect_remote(sock, self.path)
  17.         self.wfile.write('%s 200 Connection Established\r\n'
  18.                          % self.protocol_version)
  19.         self.wfile.write('Proxy-agent: %s\r\n' % self.version_string())
  20.         self.wfile.write('\r\n')
  21.         self._do_proxy(sock)
  22.         self.connection.close()
  23.         sock.close()

  24.     def do_GET(self):
  25.         import time
  26.         print time.ctime()
  27.         self.log_request()
  28.         res = urlparse(self.path)
  29.         if res.scheme != 'http' or res.fragment or not res.netloc:
  30.             self.send_error(400, 'bad url %s' % self.path)
  31.             raise RuntimeError('bad url %s' % self.path)
  32.         sock = socket()
  33.         self._connect_remote(sock, res.netloc)
  34.         path = urlunparse(('', '', res.path, res.params, res.query, '')) or '/'
  35.         sock.send('%s %s %s\r\n' % (self.command, path, self.request_version))
  36.         self.headers['Connection'] = 'close'
  37.         del self.headers['Proxy-Connection']
  38.         for header in self.headers:
  39.             sock.send('%s: %s\r\n' % (header, self.headers[header]))
  40.         sock.send('\r\n')
  41.         self._do_proxy(sock)
  42.         self.connection.close()
  43.         sock.close()

  44.     def _connect_remote(self, sock, path):
  45.         if ':' in path:
  46.             host, port = path.split(':')
  47.             port = int(port)
  48.             remote = host, port
  49.         else:
  50.             remote = (path, 80)
  51.         sock.connect(remote)

  52.     def _do_proxy(self, sock, timeout=timeout):
  53.         client, server = self.connection, sock
  54.         cs = [client, server]
  55.         while 1:
  56.             rlist, wlist, xlist = select(cs, [], cs, timeout)
  57.             if xlist:
  58.                 who = 'client' if client in xlist else 'server'
  59.                 raise RuntimeError('exceptional condition from %s' % who)
  60.             if rlist:
  61.                 for reader in rlist:
  62.                     writer = client if reader is server else server
  63.                     data = reader.recv(bufsize)
  64.                     if not data:
  65.                         return
  66.                     writer.send(data)
  67.             else:
  68.                 raise RuntimeError('timeout')


  69. class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
  70.     pass


  71. def serve(addr, server=ThreadingHTTPServer):
  72.     print 'Serving HTTP Proxy on %s' % str(addr)
  73.     server(addr, ProxyHandler).serve_forever()


  74. def parse_argv(argv=argv):
  75.     if len(argv) == 1:
  76.         addr = ('0.0.0.0', 8000)
  77.     else:
  78.         arg = argv[1]
  79.         if arg in ('-h', '--help', '-help', 'help'):
  80.             show_help(argv[0])
  81.             exit(1)
  82.         if ':' in arg:
  83.             host, port = arg.split(':')
  84.             addr = (host, int(port))
  85.         else:
  86.             addr = ('0.0.0.0', int(arg))
  87.     return addr


  88. def show_help(progname=None):
  89.     progname = progname or argv[0]
  90.     print 'Usage: %s [[host:]port]'

  91. if __name__ == '__main__':
  92.     addr = parse_argv()
  93.     serve(addr)
复制代码
其实我是为了把ssh dynamic forwarding的socks5代理转换成更为通用的http代理,这样可以直接指定http_proxy环境变量来控制程序使用代理的行为

于是又写了个socks2http.py


  1. #!/usr/bin/env python

  2. from sys import argv

  3. import lightproxy
  4. import socks


  5. def init_proxy(socks_host, socks_port):
  6.     socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS4, socks_host, socks_port)
  7.     lightproxy.socket = socks.socksocket


  8. def parse_argv(argv=argv):
  9.     if len(argv) == 1 or argv[1] in ('-h', '--help', '-help', 'help'):
  10.         show_help(argv[0])
  11.         exit(1)
  12.     arg = argv[1]
  13.     socks_host, socks_port = arg.split(':')
  14.     socks_port = int(socks_port)
  15.     addr = lightproxy.parse_argv([argv[0]] + argv[2:])
  16.     return (socks_host, socks_port), addr


  17. def show_help(progname=None):
  18.     progname = progname or argv[0]
  19.     print 'Usage: %s socks_host:socks_port [[host:]port]' % progname


  20. def main():
  21.     socks_proxy, addr = parse_argv()
  22.     init_proxy(*socks_proxy)
  23.     lightproxy.serve(addr)

  24. if __name__ == '__main__':
  25.     main()

复制代码
后来我把lightproxy.py增加了gevent支持,代码约增加了40行
作者: txdgtwpv    时间: 2014-08-01 17:30
本帖最后由 txdgtwpv 于 2014-08-01 17:31 编辑

再补一个好玩的,两年前有一天午休时睡不着写的

写的原因在这里https://github.com/lilinux/weida ... /tree/master/fuxing

主要是为了练习命令行progress bar的玩法

偷懒用了datetime的total_seconds,需要使用python2.7
run.py

  1. #!/usr/bin/env python
  2. #-*- coding: utf-8 -*-

  3. import datetime
  4. import time
  5. import sys

  6. try:
  7.     from praise import head, tail, interupt, error
  8. except Exception:
  9.     head = tail = interupt = error = ''

  10. begin = datetime.datetime(1949, 10, 1)
  11. end = datetime.datetime(2049, 10, 1)
  12. total_seconds = (end - begin).total_seconds()

  13. def rate(now=None):
  14.     if not now:
  15.         now = datetime.datetime.today()
  16.     return (now - begin).total_seconds() / total_seconds

  17. def progress():
  18.     while True:
  19.         current = rate()
  20.         if current >= 1:
  21.             yield 1
  22.             break
  23.         yield current

  24. def bar(length=40, dotted=4, step=2, blank=' ', dot='#'):
  25.     if not hasattr(bar, 'pos'):
  26.         bar.pos = 0
  27.     else:
  28.         bar.pos = (bar.pos + step) % length
  29.     return '[%s%s%s]' % (blank*bar.pos, dot*dotted, blank*(length-bar.pos-step))

  30. def show(interval=0.3, with_bar=bar):
  31.     print head
  32.     for r in progress():
  33.         sys.stdout.write('%s %.10f%%\r'%(bar(), r*100))
  34.         sys.stdout.flush()
  35.         time.sleep(interval)
  36.     sys.stdout.write('%s %.10f%%\r'%(bar(blank='#'), r*100))
  37.     print '\n', tail

  38. if __name__ == '__main__':
  39.     try:
  40.         show()
  41.     except KeyboardInterrupt, e:
  42.         print '\n', interupt
  43.     except Exception, e:
  44.         print '\n', error

复制代码
praise.py

  1. from base64 import decodestring
  2. import locale

  3. head = '5Lit5Y2O5rCR5peP5q2j5Zyo6YKj5ZWlLi4u'
  4. tail = (
  5. '5Lit5Y2O5rCR5peP5bey6aG65Yip5a6M5oiQ5pmu6YCa5aSN5YW05ZKM5paH6Im65aSN5YW05Lul'
  6. '5aSW55qE56ys5LiJ56eN5aSN5YW077yM5oSf6LCi6L+Z5Lus5aSa5bm05p2l5L2g5Lus55qE5b+N'
  7. '6ICQ')
  8. interupt = '5oKo5oiW6K645LiN57uP5oSP55qE5Li+5Yqo5ouv5pWR5LqG5Lit5Y2O5rCR5peP'
  9. error = '5aaC6Iul5YaN54qv77yM5Yu/6LCT6KiA5LmL5LiN6aKE5Lmf77yB'

  10. head = decodestring(head)
  11. tail = decodestring(tail)
  12. interupt = decodestring(interupt)
  13. error = decodestring(error)

  14. transcode = lambda s: s.decode('utf-8').encode('gbk')

  15. if locale.getpreferredencoding() != 'UTF-8':
  16.     head = transcode(head)
  17.     tail = transcode(tail)
  18.     interupt = transcode(interupt)
  19.     error = transcode(error)
复制代码
后来我把这个脚本嵌进了tmux的状态栏,让命令行的右下角无时无刻地提醒我的伟大使命
作者: ssfjhh    时间: 2014-08-15 11:34
已收到书,待我下班以后细看。:wink:


除了年底的年终奖,已经多少年没有收到过奖品了,激动。
作者: Gubuntu    时间: 2014-08-15 15:18
  1. # -*- coding: utf-8 -*-
  2. import xml.dom.minidom
  3. ELEMENT_NODE = xml.dom.Node.ELEMENT_NODE

  4. class SimpleXmlGetter(object):
  5.     def __init__(self, data):
  6.         if type(data) == str:
  7.             self.root = xml.dom.minidom.parse(data)
  8.         else:
  9.             self.root = data

  10.     def __getattr__(self, name):        #support . operation
  11.         if name == 'data':
  12.             return self.root.firstChild.data
  13.         for c in self.root.childNodes:
  14.             if c.nodeType == ELEMENT_NODE and c.tagName == name:
  15.                 return SimpleXmlGetter(c)

  16.     def __getitem__(self, index):       #support [] operation
  17.         eNodes = [ e for e in self.root.parentNode.childNodes
  18.                     if e.nodeType == ELEMENT_NODE and e.tagName == self.root.tagName]
  19.         return SimpleXmlGetter(eNodes[index])

  20.     def __call__(self, *args, **kwargs):  #support () openration, for query conditions
  21.         for e in self.root.parentNode.childNodes:
  22.             if e.nodeType == ELEMENT_NODE:
  23.                 for key in kwargs.keys():
  24.                      if e.getAttribute(key) != kwargs[key]:
  25.                         break
  26.                 else:
  27.                     return SimpleXmlGetter(e)

  28. if __name__ == "__main__":
  29.     x = SimpleXmlGetter("test.xml")
  30.     print x.sysd.sysagent.param[2].data
  31.     print x.sysd.sysagent.param(name="querytimeout", type="second").data
复制代码
获取xml中的某一节点的值,支持通过.取子节点,[]索引,()条件查询




欢迎光临 Chinaunix (http://bbs.chinaunix.net/) Powered by Discuz! X3.2