在日常工作中,Python 的使用频率还是比较高的。那么,我们应该怎样学习 Python 的相关知识?平时常用的脚本又有哪些呢?下面小编给大家详细介绍一下。
前言
日常生活中常会遇到一些小任务,如果人工处理会很麻烦。
用 Python 写些小脚本来处理,能够提高不少效率;也可以把 Python 当作工具使用,辅助提高办公效率(比如我常拿 Python 当计算器,用来做计算和字符转换)。
以下总结一些个人用到的 Python 小脚本,留作备忘。
打印16进制字符串
用途:通信报文中的二进制(hex)数据不便直接查看,可以把它转换成16进制字符串打印出来。
# coding=utf-8
# name: myutil.py
"""Helpers that print byte strings as hexadecimal text.

The functions write to ``sys.stdout`` instead of using the ``print``
statement so that the module behaves the same on Python 2 and Python 3.
"""
import sys


def _byte_values(s):
    """Return the integer value of every element of *s* as a list.

    ``bytes``/``bytearray`` input (``str`` on Python 2) is read byte by byte;
    any other string is converted character by character with ``ord``.
    """
    if isinstance(s, (bytes, bytearray)):
        return list(bytearray(s))
    return [ord(c) for c in s]


def _emit(fields):
    """Write *fields* separated by single spaces, followed by a newline."""
    sys.stdout.write(' '.join(fields) + '\n')


def print_hex1(s, prev='0x'):
    """Print every byte of *s* as two hex digits, each prefixed with *prev*.

    Args:
        s: byte string to dump.
        prev: text placed in front of every byte (default ``'0x'``).
    """
    _emit(['%s%02x' % (prev, value) for value in _byte_values(s)])


def print_hex(s):
    """Print every byte of *s* as two hex digits, separated by spaces."""
    _emit(['%02x' % value for value in _byte_values(s)])


# NOTE(review): the original listing printed this marker between the function
# definitions; it is kept at module level so importing the module still
# announces itself. Confirm that this was the intended placement.
sys.stdout.write('myutil\n')


def print_hex3(s, prev='0x'):
    """Print the hex text *s* in two-character groups, each prefixed with *prev*.

    Every group is followed by a comma.  The original looped once per
    character while advancing two characters per step, so it printed empty
    groups for the second half of the input; stepping by two fixes that.

    Args:
        s: text made of hex digits, e.g. ``'aabbcc'``.
        prev: text placed in front of every group (default ``'0x'``).
    """
    _emit(['%s%s,' % (prev, s[i:i + 2]) for i in range(0, len(s), 2)])
文件合并
之前搞单片机时,生成的 hex 应用程序文件不能直接刷到单片机里,还需要先和 IAP 程序合并成一个文件才能烧写。每次手工打包很麻烦,于是写个脚本来处理:
# coding=gb18030
"""Merge an IAP .hex file and an application .hex file into one output file.

The output consists of every line of the IAP file except its last one,
followed by every line of the application file.
"""
import os
import time

try:  # Python 2
    read_line = raw_input
except NameError:  # Python 3
    read_line = input

# Name of the IAP image expected in the current working directory.
IAP_FILE_NAME = 'IAP_CZ_v204w.hex'


def combine_files(iap_path, app_path, out_path):
    """Write the merged content of *iap_path* and *app_path* to *out_path*.

    The last line of the IAP file is dropped before the application file is
    appended.  NOTE(review): presumably that line is the Intel HEX
    end-of-file record, which may only appear once - confirm.

    Both inputs are read completely before the output is opened, so a missing
    input file never leaves an empty output file behind.

    Args:
        iap_path: path of the first (IAP) file.
        app_path: path of the second (application) file.
        out_path: path of the file to create.

    Returns:
        The number of lines written.

    Raises:
        IOError/OSError: if a file cannot be opened, read or written.
    """
    with open(iap_path, 'r') as iap_file:
        iap_lines = iap_file.readlines()
    with open(app_path, 'r') as app_file:
        app_lines = app_file.readlines()
    merged = iap_lines[:-1] + app_lines
    with open(out_path, 'w') as out_file:
        out_file.writelines(merged)
    return len(merged)


def prr():
    """Ask the user for the input and output names and return the three paths.

    The original kept the paths in local variables that the rest of the
    script could not see (NameError); they are returned instead.

    Returns:
        A tuple ``(iap_path, app_path, out_path)``.
    """
    print('file combination begin..')
    base_dir = os.getcwd()
    print(base_dir)
    iap_path = base_dir + '\\' + IAP_FILE_NAME
    print(iap_path)
    app_path = read_line('enter file path:')
    print(app_path)
    out_name = read_line('enter file name:')
    # A timestamp keeps successive outputs from overwriting each other.
    out_path = base_dir + '\\' + out_name + time.strftime('_%y%m%d%H%M%S') + '.hex'
    print(out_path)
    return iap_path, app_path, out_path


def main():
    """Run the interactive merge and report success or failure."""
    iap_path, app_path, out_path = prr()
    try:
        combine_files(iap_path, app_path, out_path)
        print('combination success!')
    except (IOError, OSError) as ex:
        print('excettion occured!')
        print(ex)
    read_line('press any key to continue...')


if __name__ == '__main__':
    main()
多线程下载图集
网上好看的动漫图集,如果手工下载太费时了。简单分析一下网页地址的规律,写个多线程脚本就能搞定。
#!/usr/bin/python
# -*- coding: utf-8 -*-
# filename: paxel.py
'''It is a multi-thread downloading tool

It was developed follow axel.
    Author: volans
    E-mail: volansw[at]gmail.com
'''
import os
import sys
import time
from threading import Thread

try:  # Python 2
    from urllib2 import ProxyHandler, Request, build_opener
except ImportError:  # Python 3
    from urllib.request import ProxyHandler, Request, build_opener

local_proxies = {'http': 'http://131.139.58.200:8080'}


def _open_url(url, proxies=None, headers=None):
    """Open *url* and return the response object.

    Args:
        url: address to fetch.
        proxies: mapping of scheme to proxy URL; empty/None means no proxy.
        headers: extra request headers (used for the ``Range`` header).
    """
    opener = build_opener(ProxyHandler(proxies or {}))
    return opener.open(Request(url, headers=headers or {}))


class AxelPython(Thread):
    '''Download one byte range of a URL into its own part file.

    The original class also inherited from the Python 2-only
    ``urllib.FancyURLopener``; the request is now made through
    ``_open_url`` so the module loads on Python 2 and 3 alike.
    '''

    def __init__(self, threadname, url, filename, ranges=0, proxies=None):
        '''Store the job description.

        Args:
            threadname: name given to the thread.
            url: address of the file to download.
            filename: part file this thread appends to.
            ranges: ``(first_byte, last_byte)`` pair, both inclusive.
            proxies: scheme -> proxy URL mapping, or None for no proxy.
        '''
        Thread.__init__(self, name=threadname)
        self.url = url
        self.filename = filename
        self.ranges = ranges
        self.proxies = proxies or {}
        self.downloaded = 0

    def run(self):
        '''Download the assigned range, resuming from an existing part file.'''
        try:
            self.downloaded = os.path.getsize(self.filename)
        except OSError:
            # The part file does not exist yet: nothing downloaded so far.
            self.downloaded = 0

        # Resume behind the bytes that are already on disk.
        self.startpoint = self.ranges[0] + self.downloaded

        # ranges[1] is the last byte (inclusive), so the part is complete
        # only once the start point has moved past it.  The original used
        # ">=" and skipped the final byte when exactly one was missing.
        if self.startpoint > self.ranges[1]:
            print('Part%s has been downloaded over.' % self.filename)
            return

        self.oneTimeSize = 16384  # bytes requested per read
        print('task%s will download from%d to%d'
              % (self.name, self.startpoint, self.ranges[1]))
        headers = {"Range": "bytes=%d-%d" % (self.startpoint, self.ranges[1])}
        self.urlhandle = _open_url(self.url, self.proxies, headers)

        with open(self.filename, 'ab') as filehandle:
            while True:
                data = self.urlhandle.read(self.oneTimeSize)
                if not data:
                    break
                filehandle.write(data)
                # Flush per chunk so an interrupted run can resume from disk.
                filehandle.flush()
                self.downloaded += len(data)


def GetUrlFileSize(url, proxies=None):
    '''Return the Content-Length reported for *url*, or 0 when it is absent.'''
    response = _open_url(url, proxies)
    try:
        length = response.info().get('Content-Length')
    finally:
        response.close()
    return int(length.strip()) if length else 0


def SpliteBlocks(totalsize, blocknumber):
    '''Split ``totalsize`` bytes into ``blocknumber`` inclusive byte ranges.

    The last range absorbs the remainder.  Integer division keeps the
    boundaries integral on Python 3 as well.

    Returns:
        A list of ``(first_byte, last_byte)`` tuples.
    '''
    blocksize = totalsize // blocknumber
    ranges = [(i * blocksize, i * blocksize + blocksize - 1)
              for i in range(blocknumber - 1)]
    ranges.append((blocksize * (blocknumber - 1), totalsize - 1))
    return ranges


def islive(tasks):
    '''Return True while at least one thread in *tasks* is still running.'''
    return any(task.is_alive() for task in tasks)


def paxel(url, output, blocks=6, proxies=local_proxies):
    '''Download *url* into *output* using *blocks* parallel range requests.

    Args:
        url: address of the file to download.
        output: path of the final file.
        blocks: number of parts / threads.
        proxies: scheme -> proxy URL mapping used for every request.
    '''
    size = GetUrlFileSize(url, proxies)
    ranges = SpliteBlocks(size, blocks)

    threadname = ["thread_%d" % i for i in range(blocks)]
    filename = ["tmpfile_%d" % i for i in range(blocks)]

    tasks = []
    for i in range(blocks):
        # The scraped listing had "threadname<i>" here (mangled indexing)
        # and did not forward the proxies to the worker threads.
        task = AxelPython(threadname[i], url, filename[i], ranges[i], proxies)
        task.daemon = True
        task.start()
        tasks.append(task)

    time.sleep(2)
    while islive(tasks):
        downloaded = sum(task.downloaded for task in tasks)
        # Guard against a missing Content-Length (size == 0).
        process = downloaded / float(size) * 100 if size else 0.0
        show = u'\rFilesize:%d Downloaded:%d Completed:%.2f%%' % (size, downloaded, process)
        sys.stdout.write(show)
        sys.stdout.flush()
        time.sleep(0.5)

    # Concatenate the part files in order, removing each one afterwards.
    with open(output, 'wb') as filehandle:
        for part in filename:
            with open(part, 'rb') as f:
                filehandle.write(f.read())
            try:
                os.remove(part)
            except OSError:
                # Best effort: a leftover part file is harmless.
                pass


if __name__ == '__main__':
    url = "http://xz1.mm667.com/xz84/images/001.jpg"
    output = '001.jpg'
    paxel(url, output, blocks=4, proxies={})
多线程下载图片
多线程下载图片并存储到指定目录中,若目录不存在则自动创建。
# -*- coding: UTF-8 -*-
# NOTE: the sections below are independent Python 2.7 scripts taken from one
# article.  Each section is meant to be saved and run as its own file; they
# reuse names such as MyThread, get_page, main and ComThread.

# ---------------------------------------------------------------------------
# Multi-threaded image download
# ---------------------------------------------------------------------------
import os
import re
import socket
import threading
import time
import urllib

socket.setdefaulttimeout(30)

# Build the 80 album base URLs: albums xz01..xz80, four consecutive albums per
# host, hosts cycling through xz1..xz5.
urls = []
j = 0
for i in xrange(1, 81):
    if (i - 1) % 4 == 0:
        j += 1
    if (j - 1) % 5 == 0:
        j = 1
    site = 'http://xz%d.mm667.com/xz%02d/images/' % (j, i)
    urls.append(site)
    print(urls[i - 1])


def mkdir(path):
    """Create directory *path* (with parents) unless it already exists.

    Returns:
        True when the directory was created, False when it already existed.
    """
    # Drop surrounding blanks and a trailing backslash.
    path = path.strip().rstrip("\\")
    if os.path.exists(path):
        print(path + u'目录已存在')
        return False
    os.makedirs(path)
    # Report success only after the directory really exists.
    print(path + u'创建成功')
    return True


def cbk(a, b, c):
    """Progress callback for urlretrieve: print the percentage completed.

    Args:
        a: number of blocks transferred so far.
        b: block size in bytes.
        c: total size of the remote file.
    """
    if c <= 0:
        # Size unknown: a percentage cannot be computed.
        return
    per = min(100.0 * a * b / c, 100)
    print('%.2f%%' % per)


local = 'd:\\mysite\\pic1\\'
d = 0
mutex = threading.Lock()

# The original looped over urls[84:], which is empty for the 80 generated
# URLs, so nothing was ever downloaded.  Raise this to skip leading albums.
START_INDEX = 0


class MyThread(threading.Thread):
    """Thread that downloads one URL into one local file."""

    def __init__(self, url, name):
        threading.Thread.__init__(self)
        self.url = url
        self.name = name  # local target path (also becomes the thread name)

    def run(self):
        # The announcement and the one-second pause happen under the lock,
        # so threads start their downloads one second apart.
        with mutex:
            print('')
            print('down from%s' % self.url)
            time.sleep(1)
        try:
            urllib.urlretrieve(self.url, self.name)
        except Exception as e:
            print(e)
            time.sleep(1)
            # One retry; report instead of dying with a traceback.
            try:
                urllib.urlretrieve(self.url, self.name)
            except Exception as retry_error:
                print(retry_error)


threads = []
for u in urls[START_INDEX:]:
    d += 1
    local = 'd:\\mysite\\pic1\\%d\\' % d
    mkdir(local)
    print('download begin...')
    for i in xrange(40):
        th = MyThread(u + '%03d.jpg' % i, local + '%03d.jpg' % i)
        threads.append(th)
        th.start()

# Wait for the downloads so the final message is truthful.
for t in threads:
    t.join()
print('over!download finished')


# ---------------------------------------------------------------------------
# Crawler: collect company information
# ---------------------------------------------------------------------------
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Python crawler that collects company listings from company.yktworld.com.

Author: yangyongzhen
Version: 0.0.2
Date: 2014-12-14
Language: Python 2.7.5
Editor: Sublime Text2
"""
import urllib2, re, string
import threading, Queue, time
import sys
import os
from bs4 import BeautifulSoup

# NOTE: Python 2-only hack that changes the implicit str/unicode codec.
reload(sys)
sys.setdefaultencoding('utf8')

_DATA = []
FILE_LOCK = threading.Lock()
SHARE_Q = Queue.Queue()  # unbounded task queue
_WORKER_THREAD_NUM = 3   # number of worker threads
_Num = 0                 # total number of records found


class MyThread(threading.Thread):
    """Thread that runs the function handed to it."""

    def __init__(self, func, num):
        super(MyThread, self).__init__()
        self.func = func        # thread body
        self.thread_num = num   # index of this worker

    def run(self):
        self.func()


def worker():
    """Process URLs from SHARE_Q until the queue is empty."""
    while True:
        # get_nowait avoids the race between empty() and a blocking get()
        # when another worker takes the last item in between.
        try:
            url = SHARE_Q.get_nowait()
        except Queue.Empty:
            return
        try:
            find_data(get_page(url))
            time.sleep(1)
        finally:
            # Always acknowledge the task, otherwise SHARE_Q.join() hangs.
            SHARE_Q.task_done()


def get_page(url):
    """Fetch *url* and return its HTML decoded from GBK.

    Returns:
        The page as unicode, or an empty unicode string when the request
        fails with a URLError (the error is printed).
    """
    try:
        html = urllib2.urlopen(url).read()
    except urllib2.URLError as e:
        if hasattr(e, "code"):
            print("The server couldn't fulfill the request.")
            print("Error code:%s" % e.code)
        elif hasattr(e, "reason"):
            print("We failed to reach a server.Please check your url and read the Reason")
            print("Reason:%s" % e.reason)
        # The original returned an unbound variable here.
        return u''
    return html.decode("gbk", 'ignore')


def find_data(my_page):
    """Extract name, details and link of every company block in *my_page*.

    The values found on the page are appended to _DATA as one list, and
    _Num is increased by the number of company blocks.
    """
    global _Num
    if not my_page:
        return
    temp_data = []
    items = BeautifulSoup(my_page).find_all("div", style="width:96%;margin:10px;border-bottom:1px#CCC dashed;padding-bottom:10px;")
    href_pattern = re.compile(r'href="([^"]*)"')
    for item in items:
        # Company name (skipped when the link has no plain string content).
        if item.a is not None and item.a.string is not None:
            data = item.a.string.encode("gbk", "ignore")
            print(data)
            temp_data.append(data)
        # Products and contact details.
        for detail in item.find_all("div", style="font-size:12px;"):
            data = detail.get_text().encode("gbk", "ignore")
            temp_data.append(data)
            print(data)
        # Link address.
        match = href_pattern.search(str(item))
        if match:
            print(match.group(1))
            temp_data.append(match.group(1))
        _Num += 1
    _DATA.append(temp_data)


def main():
    """Crawl 20 result pages with the worker threads and write down.txt."""
    threads = []
    # time.time() measures wall-clock time; time.clock() is CPU time on
    # non-Windows platforms and would ignore the time spent waiting.
    start = time.time()
    douban_url = "http://company.yktworld.com/comapny_search.asp?page={page}"
    # Fill the queue up front; a long-running crawler would feed it
    # continuously instead.
    for index in xrange(20):
        SHARE_Q.put(douban_url.format(page=index))
    for i in xrange(_WORKER_THREAD_NUM):
        thread = MyThread(worker, i)
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    SHARE_Q.join()
    i = 0
    with open("down.txt", "w+") as my_file:
        for page in _DATA:
            i += 1
            for name in page:
                my_file.write(name + "\n")
    print("Spider Successful!!!")
    end = time.time()
    print(u'抓取完成!')
    print(u'总页数: %s' % i)
    print(u'总条数: %s' % _Num)
    print(u'一共用时: %s 秒' % (end - start))


if __name__ == '__main__':
    main()


# ---------------------------------------------------------------------------
# Crawler: multi-threaded download of movie names
# ---------------------------------------------------------------------------
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Python crawler.

Author: yangyongzhen
Version: 0.0.2
Date: 2014-12-14
Language: Python 2.7.8
Editor: Sublime Text2
"""
import urllib2, re, string
import threading, Queue, time
import sys
import os
from bs4 import BeautifulSoup

# NOTE: Python 2-only hack that changes the implicit str/unicode codec.
reload(sys)
sys.setdefaultencoding('utf8')

_DATA = []
FILE_LOCK = threading.Lock()
SHARE_Q = Queue.Queue()  # unbounded task queue
_WORKER_THREAD_NUM = 3   # number of worker threads
rootpath = os.getcwd() + u'/抓取的内容/'


def makedir(path):
    """Create directory *path* (with parents) when it does not exist."""
    if not os.path.isdir(path):
        os.makedirs(path)


def Schedule(a, b, c):
    """Progress callback for urlretrieve: print the percentage completed.

    Args:
        a: number of blocks transferred so far.
        b: block size in bytes.
        c: total size of the remote file.
    """
    if c <= 0:
        # Size unknown: a percentage cannot be computed.
        return
    per = min(100.0 * a * b / c, 100)
    print('%.2f%%' % per)


class MyThread(threading.Thread):
    """Thread that runs the function handed to it."""

    def __init__(self, func):
        super(MyThread, self).__init__()
        self.func = func  # thread body

    def run(self):
        self.func()


def worker():
    """Process URLs from SHARE_Q until the queue is empty."""
    print('work thread start...\n')
    while True:
        try:
            url = SHARE_Q.get_nowait()
        except Queue.Empty:
            return
        try:
            find_title(get_page(url))
            time.sleep(1)
        finally:
            # Always acknowledge the task, otherwise SHARE_Q.join() hangs.
            SHARE_Q.task_done()


def get_page(url):
    """Fetch *url* and return its HTML decoded from UTF-8.

    Returns:
        The page as unicode, or an empty unicode string when the request
        fails with a URLError (the error is printed).
    """
    try:
        html = urllib2.urlopen(url).read()
    except urllib2.URLError as e:
        if hasattr(e, "code"):
            print("The server couldn't fulfill the request.")
            print("Error code:%s" % e.code)
        elif hasattr(e, "reason"):
            print("We failed to reach a server.Please check your url and read the Reason")
            print("Reason:%s" % e.reason)
        # The original returned an unbound variable here.
        return u''
    return html.decode("utf8")


def find_title(my_page):
    """Collect link and title of every <h1> entry of *my_page* into _DATA."""
    if not my_page:
        return
    temp_data = []
    href_pattern = re.compile(r'href="([^"]*)"')
    for item in BeautifulSoup(my_page).findAll('h1'):
        match = href_pattern.search(str(item))
        if match:
            print(match.group(1))
            temp_data.append(match.group(1))
        # A link without plain string content has .string == None, which
        # could not be written to the output file later.
        if item.a is not None and item.a.string is not None:
            temp_data.append(item.a.string)
    _DATA.append(temp_data)


def main():
    """Crawl five list pages with the worker threads and write movie.txt."""
    threads = []
    start = time.time()  # wall-clock time, see the previous script
    douban_url = "http://movie.misszm.com/page/{page}"
    for index in xrange(5):
        SHARE_Q.put(douban_url.format(page=index))
    for i in xrange(_WORKER_THREAD_NUM):
        thread = MyThread(worker)
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    SHARE_Q.join()
    with open("movie.txt", "w+") as my_file:
        for page in _DATA:
            for movie_name in page:
                my_file.write(movie_name + "\n")
    print("Spider Successful!!!")
    end = time.time()
    print(u'抓取完成!')
    print(u'一共用时: %s 秒' % (end - start))


if __name__ == '__main__':
    main()


# ---------------------------------------------------------------------------
# Serial port <-> TCP bridge (TCP client side)
# ---------------------------------------------------------------------------
# coding=utf-8
# author: yangyongzhen
# QQ: 534117529
# 'CardTest TcpServer-Simple Test Card Tool 1.00'
import sys, threading, time
import serial
import binascii, encodings
import re
import os
import socket
import struct

mylock = threading.RLock()
Server_IP = ''
Server_Port = ''  # was misspelled "Srever_Port" in the original listing


def print_hex1(s, prev='0x'):
    """Print every byte of *s* as two hex digits prefixed with *prev*."""
    print(' '.join('%s%02x' % (prev, ord(c)) for c in s))


def print_hex(s):
    """Print every byte of *s* as two hex digits separated by spaces."""
    print(' '.join('%02x' % ord(c) for c in s))


def hexto_str(s):
    """Return the bytes of *s* as one lowercase hex string."""
    return ''.join('%02x' % ord(c) for c in s)


def strto_hex(s):
    """Return the byte string encoded by the hex text *s*."""
    return s.decode('hex')


class ComThread:
    """Bridge between a serial port and a TCP connection to a server.

    Data read from the serial port is handed to the TCP sender through
    ``snddata``; data received from TCP is handed to the serial writer
    through ``rcvdata``.  Both buffers are exchanged under ``mylock``.
    """

    def __init__(self, Port=0):
        self.l_serial = None
        self.alive = False
        self.waitEnd = None
        self.port = Port
        # TCP part
        self.connection = None
        # Data buffers
        self.snddata = ''
        self.rcvdata = ''

    def waiting(self):
        """Block until one of the worker threads signals the end."""
        if self.waitEnd is not None:
            self.waitEnd.wait()

    def SetStopEvent(self):
        """Signal the end and stop the bridge."""
        if self.waitEnd is not None:
            self.waitEnd.set()
        self.alive = False
        self.stop()

    def _spawn(self, target):
        """Start *target* in a daemon thread and return the thread."""
        worker_thread = threading.Thread(target=target)
        worker_thread.setDaemon(1)
        worker_thread.start()
        return worker_thread

    def start(self):
        """Open the serial port at 115200 baud and start the four workers.

        Returns:
            True when the port is open and the threads run, else False.
        """
        self.l_serial = serial.Serial()
        self.l_serial.port = self.port
        self.l_serial.baudrate = 115200
        self.l_serial.timeout = 2  # seconds
        self.l_serial.open()
        if not self.l_serial.isOpen():
            return False
        self.waitEnd = threading.Event()
        self.alive = True
        print('open serial port%d ok!\n' % (self.port + 1))
        print('baudrate:115200\n')
        self.thread_read = self._spawn(self.FirstReader)
        self.thread_write = self._spawn(self.FirstWriter)
        # TCP part
        self.thread_TcpClient = self._spawn(self.TcpClient)
        self.thread_TcpSend = self._spawn(self.TcpSend)
        return True

    def FirstReader(self):
        """Poll the serial port and queue what arrives for the TCP sender."""
        # NOTE(review): nesting reconstructed from a one-line listing; the
        # end signal is assumed to follow the loop - confirm.
        while self.alive:
            time.sleep(0.1)  # polling interval
            try:
                n = self.l_serial.inWaiting()
                if n:
                    data = self.l_serial.read(n)
                    print(u'->请求:')
                    print(data)
                    with mylock:
                        self.snddata = data
            except Exception as ex:
                print(str(ex))
        self.waitEnd.set()
        self.alive = False

    def FirstWriter(self):
        """Write data received from TCP to the serial port."""
        while self.alive:
            time.sleep(0.1)  # polling interval
            try:
                # Take and clear the buffer in one locked step so data that
                # arrives while writing is not wiped afterwards.
                with mylock:
                    pending, self.rcvdata = self.rcvdata, ''
                if pending != '':
                    self.l_serial.write(pending)
                    print(u'-<应答:')
                    print(pending)
            except Exception as ex:
                print(str(ex))
        self.waitEnd.set()
        self.alive = False

    def TcpClient(self):
        """Connect to the server and queue what it sends for the serial port.

        When the connection cannot be made or ends, the end is signalled so
        the main thread does not wait forever.
        """
        time.sleep(0.1)
        try:
            self.connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.connection.connect((Server_IP, int(Server_Port)))
            print('Connect to Server OK!')
            with mylock:
                self.snddata = ''
                self.rcvdata = ''
            while True:
                data = self.connection.recv(1024)
                if not data:
                    # The peer closed the connection.
                    break
                with mylock:
                    self.snddata = ''
                    self.rcvdata = data
        except (socket.error, ValueError) as ex:
            print(str(ex))
        finally:
            if self.connection is not None:
                self.connection.close()
            self.waitEnd.set()
            self.alive = False

    def TcpSend(self):
        """Send data read from the serial port to the TCP peer."""
        while True:
            time.sleep(0.1)  # polling interval
            if self.connection is None:
                continue
            with mylock:
                pending, self.snddata = self.snddata, ''
                if pending != '':
                    # As in the original, a new request discards any
                    # response that has not been written yet.
                    self.rcvdata = ''
            if pending == '':
                continue
            try:
                self.connection.sendall(pending)
            except socket.error as ex:
                # Report instead of silently dropping the error.
                print(str(ex))

    def stop(self):
        """Stop the reader thread and close the serial port."""
        self.alive = False
        self.thread_read.join()
        if self.l_serial.isOpen():
            self.l_serial.close()


# Test / entry-point section
if __name__ == '__main__':
    print('Serial to Tcp Tool 1.00\n')
    print('Author:yangyongzhen\n')
    print('QQ:534117529\n')
    print('Copyright(c)**cap 2015-2016.\n')
    Server_IP = raw_input('please enter ServerIP:')
    print('Server_IP:%s' % (Server_IP))
    Server_Port = raw_input('please enter ServerPort:')
    print('Server_Port:%s' % (Server_Port))
    com = raw_input('please enter com port(1-9):')
    rt = ComThread(int(com) - 1)
    try:
        if rt.start():
            rt.waiting()
            rt.stop()
    except Exception as se:
        print(str(se))
    if rt.alive:
        rt.stop()
    os.system("pause")
    print('')
    print('End OK.')
    del rt


# ---------------------------------------------------------------------------
# Remote card reader, server side
#
# A remote card-reader tool written a long time ago.  The idea: a TCP server
# written in Python runs on the customer-service PC on site and drives the
# card reader there.  A client inside the company connects to it, which makes
# it possible to debug the on-site card business from the office.
# This is the implementation of the server-side tool:
# ---------------------------------------------------------------------------
# coding=utf-8
# author: yangyongzhen
# QQ: 534117529
# 'CardTest TcpServer-Simple Test Card Tool 1.00'
import sys, threading, time
import serial
import binascii, encodings
import re
import os
import socket
import struct

mylock = threading.RLock()


def print_hex1(s, prev='0x'):
    """Print every byte of *s* as two hex digits prefixed with *prev*."""
    print(' '.join('%s%02x' % (prev, ord(c)) for c in s))


def print_hex(s):
    """Print every byte of *s* as two hex digits separated by spaces."""
    print(' '.join('%02x' % ord(c) for c in s))


def hexto_str(s):
    """Return the bytes of *s* as one lowercase hex string."""
    return ''.join('%02x' % ord(c) for c in s)


def strto_hex(s):
    """Return the byte string encoded by the hex text *s*."""
    return s.decode('hex')


class ComThread:
    """Bridge between a serial port and one TCP client at a time.

    The server listens on all interfaces (host '') on port 5050.  Data read
    from the serial port is handed to the TCP sender through ``snddata``;
    data received from TCP is handed to the serial writer through
    ``rcvdata``.  Both buffers are exchanged under ``mylock``.
    """

    def __init__(self, Port=0):
        self.l_serial = None
        self.alive = False
        self.waitEnd = None
        self.port = Port
        # TCP part: '' binds to every local interface.
        self.myHost = ''
        self.myPort = 5050
        self.sockobj = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connection = None
        # Data buffers
        self.snddata = ''
        self.rcvdata = ''

    def waiting(self):
        """Block until one of the worker threads signals the end."""
        if self.waitEnd is not None:
            self.waitEnd.wait()

    def SetStopEvent(self):
        """Signal the end and stop the bridge."""
        if self.waitEnd is not None:
            self.waitEnd.set()
        self.alive = False
        self.stop()

    def _spawn(self, target):
        """Start *target* in a daemon thread and return the thread."""
        worker_thread = threading.Thread(target=target)
        worker_thread.setDaemon(1)
        worker_thread.start()
        return worker_thread

    def start(self):
        """Open the serial port at 115200 baud and start the four workers.

        Returns:
            True when the port is open and the threads run, else False.
        """
        self.l_serial = serial.Serial()
        self.l_serial.port = self.port
        self.l_serial.baudrate = 115200
        self.l_serial.timeout = 2  # seconds
        self.l_serial.open()
        if not self.l_serial.isOpen():
            return False
        self.waitEnd = threading.Event()
        self.alive = True
        print('open serial port%d ok!\n' % (self.port + 1))
        print('baudrate:115200\n')
        self.thread_read = self._spawn(self.FirstReader)
        self.thread_write = self._spawn(self.FirstWriter)
        # TCP part
        self.thread_TcpServer = self._spawn(self.TcpServer)
        self.thread_TcpSend = self._spawn(self.TcpSend)
        return True

    def FirstReader(self):
        """Poll the serial port and queue what arrives for the TCP sender."""
        # NOTE(review): nesting reconstructed from a one-line listing; the
        # end signal is assumed to follow the loop - confirm.
        while self.alive:
            time.sleep(0.1)  # polling interval
            try:
                n = self.l_serial.inWaiting()
                if n:
                    data = self.l_serial.read(n)
                    print('serial recv:')
                    print(data)
                    with mylock:
                        self.snddata = data
            except Exception as ex:
                print(str(ex))
        self.waitEnd.set()
        self.alive = False

    def FirstWriter(self):
        """Write data received from TCP to the serial port."""
        while self.alive:
            time.sleep(0.1)  # polling interval
            try:
                # Take and clear the buffer in one locked step so data that
                # arrives while writing is not wiped afterwards.
                with mylock:
                    pending, self.rcvdata = self.rcvdata, ''
                if pending != '':
                    self.l_serial.write(pending)
                    print('serial send:')
                    print(pending)
            except Exception as ex:
                print(str(ex))
        self.waitEnd.set()
        self.alive = False

    def TcpServer(self):
        """Accept clients one after another and queue what they send."""
        self.sockobj.bind((self.myHost, self.myPort))
        self.sockobj.listen(10)
        print('TcpServer listen at 5050 oK!\n')
        print('Waiting for connect...\n')
        while True:
            time.sleep(0.1)
            self.connection, address = self.sockobj.accept()
            print('Server connected by %s' % (address,))
            with mylock:
                self.snddata = ''
                self.rcvdata = ''
            try:
                while True:
                    data = self.connection.recv(1024)
                    if not data:
                        # The client closed the connection.
                        break
                    with mylock:
                        self.snddata = ''
                        self.rcvdata = data
                self.connection.close()
            except Exception as ex:
                # NOTE(review): the end signal is assumed to belong to the
                # error path only, so a normal disconnect keeps the server
                # accepting - confirm against the original source.
                print(str(ex))
                self.connection.close()
                self.waitEnd.set()
                self.alive = False

    def TcpSend(self):
        """Send data read from the serial port to the TCP peer."""
        while True:
            time.sleep(0.1)  # polling interval
            if self.connection is None:
                continue
            with mylock:
                pending, self.snddata = self.snddata, ''
                if pending != '':
                    # As in the original, a new request discards any
                    # response that has not been written yet.
                    self.rcvdata = ''
            if pending == '':
                continue
            try:
                self.connection.sendall(pending)
            except socket.error as ex:
                # Report instead of silently dropping the error.
                print(str(ex))

    def stop(self):
        """Stop the reader thread and close the serial port."""
        self.alive = False
        self.thread_read.join()
        if self.l_serial.isOpen():
            self.l_serial.close()
# Test / entry-point section of the remote card-reader server listing.
# NOTE: the sections below are independent Python 2.7 scripts taken from one
# article; each one is meant to be saved and run as its own file.
if __name__ == '__main__':
    print('CardTest TcpServer-Simple Test Card Tool 1.00\n')
    print('Author:yangyongzhen\n')
    print('QQ:534117529\n')
    print('Copyright(c)****2015-2016.\n')
    com = raw_input('please enter com port(1-9):')
    rt = ComThread(int(com) - 1)
    try:
        if rt.start():
            rt.waiting()
            rt.stop()
    except Exception as se:
        print(str(se))
    if rt.alive:
        rt.stop()
    os.system("pause")
    print('')
    print('End OK.')
    del rt


# ---------------------------------------------------------------------------
# rtcp: reverse TCP connection / port forwarding
# ---------------------------------------------------------------------------
# -*- coding: utf-8 -*-
'''
filename: rtcp.py
desc:
    Socket port forwarding in Python, intended for remote maintenance.
    When the remote end cannot be reached the tool sleeps 36 s and retries,
    at most 200 times (i.e. two hours).
usage:
    ./rtcp.py stream1 stream2
    A stream is either l:port or c:host:port
    l:port       listen on the given local port
    c:host:port  connect to the given remote host and port
author: watercloud, zd, knownsec team
web: www.knownsec.com, blog.knownsec.com
date: 2009-7
'''
import socket
import sys
import threading
import time

streams = [None, None]  # the two socket objects whose data is exchanged
debug = 1               # debug output: 0 or 1


def print_hex(s):
    """Print every byte of *s* as two hex digits separated by spaces."""
    print(' '.join('%02x' % ord(c) for c in s))


def _usage():
    """Print the command-line usage."""
    print('Usage:./rtcp.py stream1 stream2\nstream:L:port or C:host:port')


def _get_another_stream(num):
    """Return the stream opposite to stream *num*, waiting until it exists.

    Raises:
        ValueError: if *num* is neither 0 nor 1 (the original raised a
            plain string, which is not a valid exception).
    """
    if num == 0:
        num = 1
    elif num == 1:
        num = 0
    else:
        raise ValueError("ERROR")

    while True:
        if streams[num] == 'quit':
            print("can't connect to the target,quit now!")
            sys.exit(1)
        if streams[num] is not None:
            return streams[num]
        time.sleep(1)


def _close_quietly(sock):
    """Shut down and close *sock*, ignoring socket errors."""
    try:
        sock.shutdown(socket.SHUT_RDWR)
    except socket.error:
        pass
    finally:
        # Close even when shutdown fails, so the descriptor is released.
        try:
            sock.close()
        except socket.error:
            pass


def _xstream(num, s1, s2):
    """Copy everything received on *s1* to *s2* until one side closes.

    *num* is the number of the current stream and only labels the debug
    output.  Both sockets are closed and both stream slots are cleared when
    the copying ends.
    """
    try:
        while True:
            # recv blocks until data arrives or the peer has closed.
            buff = s1.recv(1024)
            if debug > 0:
                print('%s recv' % num)
            if len(buff) == 0:
                # The peer closed the connection.
                print('%s one closed' % num)
                break
            s2.sendall(buff)
            if debug > 0:
                print('%s sendall' % num)
                print_hex(buff)
    except Exception:
        print('%s one connect closed.' % num)

    _close_quietly(s1)
    _close_quietly(s2)

    streams[0] = None
    streams[1] = None
    print('%s CLOSED' % num)


def _server(port, num):
    """Listen on *port* and forward each accepted connection as stream *num*."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind(('0.0.0.0', port))
    srv.listen(1)
    while True:
        conn, addr = srv.accept()
        print('connected from: %s' % (addr,))
        streams[num] = conn                  # publish this end
        s2 = _get_another_stream(num)        # wait for the other end
        _xstream(num, conn, s2)


def _connect(host, port, num):
    """Connect to *host*:*port* and forward the connection as stream *num*.

    A failed attempt is retried after 36 s; after 200 failed attempts the
    stream is marked 'quit' and the function returns None.
    """
    not_connet_time = 0
    wait_time = 36
    try_cnt = 199
    while True:
        if not_connet_time > try_cnt:
            streams[num] = 'quit'
            print('not connected')
            return None

        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            conn.connect((host, port))
        except Exception:
            print('can not connect%s:%s!' % (host, port))
            not_connet_time += 1
            time.sleep(wait_time)
            continue

        print("connected to%s:%i" % (host, port))
        streams[num] = conn                  # publish this end
        s2 = _get_another_stream(num)        # wait for the other end
        _xstream(num, conn, s2)


if __name__ == '__main__':
    print('Tcp to Tcp Tool 1.00\n')
    print('Author:yangyongzhen\n')
    print('QQ:534117529\n')
    print('Copyright(c)Newcapec 2015-2016.\n')
    Server_IP = raw_input('please enter Server IP:')
    print('Server_IP:%s' % (Server_IP))
    Server_Port = raw_input('please enter Server Port:')
    print('Server_Port:%s' % (Server_Port))
    com = raw_input('please enter Local Port:')
    # One thread listens locally, the other connects to the server.
    tlist = [
        threading.Thread(target=_server, args=(int(com), 0)),
        threading.Thread(target=_connect, args=(Server_IP, int(Server_Port), 1)),
    ]
    for t in tlist:
        t.start()
    for t in tlist:
        t.join()
    sys.exit(0)


# ---------------------------------------------------------------------------
# Example: calling a C dynamic library
# ---------------------------------------------------------------------------
# -*- coding: utf8 -*-
from ctypes import cdll, c_char_p, create_string_buffer
from binascii import unhexlify as unhex
import os

dll = cdll.LoadLibrary('mydll.dll')
print('begin load mydll..')
# key
str1 = unhex('0000556677222238')
# data
str2 = unhex('002d2000000100015566772222383CD881604D0D286A556677222238000020141214181427')
# output
str3 = '\x12\x34\x56\x78\x12\x34\x56\x78'

pstr1 = c_char_p(str1)
pstr2 = c_char_p(str2)
# The DLL writes its result into the last argument.  A c_char_p points at an
# immutable Python string, so a mutable buffer is used for the output.
pstr3 = create_string_buffer(str3, len(str3))

dll.CurCalc_DES_MAC64(805306481, pstr1, 0, pstr2, 13, pstr3)
print(pstr1)
print(pstr2)
print(pstr3)
# .raw returns all bytes; .value would stop at the first NUL byte.
stro = pstr3.raw
print(stro)
strtemp = ''
for c in stro:
    print("%02x" % (ord(c)))
    strtemp += "{0:02x}".format(ord(c))
print(strtemp)

# NOTE(review): os.execlp replaces the current process, so the prompt below
# is only reached when launching the program fails - confirm the intent.
os.execlp("E:\\RSA.exe", '')
s = raw_input('press any key to continue...')


# ---------------------------------------------------------------------------
# TCP socket message test tool
# ---------------------------------------------------------------------------
# -*- coding: utf-8 -*-
import socket
from myutil import *
from binascii import unhexlify as unhex
from ctypes import *

dll = cdll.LoadLibrary('mydll.dll')
print('begin load mydll..')

HOST, PORT = "192.168.51.28", 5800
sd = "1234567812345678"
# The original listing used sd2 without defining it (NameError).  Set it to
# a hex string to enable the second exchange.
sd2 = None

# Create a socket (SOCK_STREAM means a TCP socket)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    # Connect to server and send data.  The original was missing a closing
    # parenthesis here and never sent the first packet.
    sock.connect((HOST, int(PORT)))
    sock.sendall(sd.decode('hex'))
    print("Sent1 OK:")
    print(sd)
    # Receive data from the server
    received = sock.recv(1024)
    print("Received:")
    print_hex(received)
    print('received len is 0x%02x' % (len(received)))
    print('received data analysis...')
    # Dump the leading fields of the reply: bytes 0-3, 4-5, 6-9, 10-15.
    for begin, end in ((0, 4), (4, 6), (6, 10), (10, 16)):
        print_hex(received[begin:end])
    if sd2 is not None:
        # pack2 send
        sock.sendall(sd2.decode('hex'))
        print("Sent2 OK:")
        print(sd2)
        received1 = sock.recv(1024)
        print("Received1:")
        print_hex(received1)
        print('received1 len is 0x%02x' % (len(received1)))
finally:
    sock.close()
s = raw_input('press any key to continue...')


# ---------------------------------------------------------------------------
# Message assembly and encryption / MAC test
# ---------------------------------------------------------------------------
# -*- coding: gb2312 -*-
import socket
from myutil import *
from binascii import unhexlify as unhex
from ctypes import *

dll = cdll.LoadLibrary('mydll.dll')
print('begin load mydll..')
# key
key = '\xF1\xE2\xD3\xC4\xF1\xE2\xD3\xC4'
pkey = c_char_p(key)


def calc_mac(payload, length):
    """Return the MAC the DLL computes over *payload* as 16 hex digits.

    Args:
        payload: message bytes without the MAC.
        length: number of payload bytes passed to the DLL.

    The result is written by the DLL into a mutable 8-byte buffer and read
    back with ``.raw``; the original wrote through a c_char_p that pointed
    at an immutable string and read ``.value``, which stops at the first
    NUL byte of the MAC.
    """
    pdata = c_char_p(payload)
    pmac = create_string_buffer(8)
    dll.CurCalc_DES_MAC64(805306481, pkey, 0, pdata, length, pmac)
    return ''.join("{0:02x}".format(ord(c)) for c in pmac.raw)


# pack1
class pack:
    """Plain attribute container for the fields of packet 1."""
    pass


pk = pack()
pk.len = '00000032'
pk.ID = '0001'
pk.slnum = '00000004'
pk.poscode = '123456781234'
pk.rand = '1122334455667788'
pk.psam = '313233343536'
pk.kind = '0000'
pk.ver = '000001'
pk.time = '20140805135601'
pk.mac = '06cc571e6d96e12d'
body1 = pk.len + pk.ID + pk.slnum + pk.poscode + pk.rand + pk.psam + pk.kind + pk.ver + pk.time
# calc MAC over the 42 payload bytes
pk.mac = calc_mac(unhex(body1), 42)
# data to send
sd = body1 + pk.mac
print('send1 len is 0x%02x' % (len(sd) / 2))
print(sd)


# pack2
class pack2:
    """Plain attribute container for the fields of packet 2."""
    pass


pk2 = pack2()
pk2.len = '0000006E'
pk2.ID = '0012'
pk2.slnum = '00000005'
pk2.fatCode = '00'
pk2.cardASN = '0000000000000000'
pk2.cardType = '00'
pk2.userNO = '0000000000000000'
pk2.fileName1 = '00000000000000000000000000000015'
pk2.dataLen1 = '00'
pk2.dataArea1 = '00000000000000319999990800FB2014080620240806FFFFFFFFFFFFFFFFFFFF'
pk2.fileName2 = '00000000000000000000000000000016'
pk2.dataLen2 = '00'
pk2.dataArea2 = '000003E800FFFF16'
pk2.mac = '06cc571e6d96e12d'
body2 = (pk2.len + pk2.ID + pk2.slnum + pk2.fatCode + pk2.cardASN + pk2.cardType
         + pk2.userNO + pk2.fileName1 + pk2.dataLen1 + pk2.dataArea1
         + pk2.fileName2 + pk2.dataLen2 + pk2.dataArea2)
# calc MAC over the 102 payload bytes
pk2.mac = calc_mac(unhex(body2), 102)
# data to send
sd2 = body2 + pk2.mac
print('send2 len is 0x%02x' % (len(sd2) / 2))
print(sd2)

HOST, PORT = "192.168.51.28", 5800
# Create a socket (SOCK_STREAM means a TCP socket)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    # Connect to server and send data (the original was missing a closing
    # parenthesis on this call).
    sock.connect((HOST, int(PORT)))
    sock.sendall(sd.decode('hex'))
    print("Sent1 OK:")
    print(sd)
    # Receive data from the server
    received = sock.recv(1024)
    print("Received:")
    print_hex(received)
    print('received len is 0x%02x' % (len(received)))
    print('received data analysis...')
    # Dump the leading fields of the reply: bytes 0-3, 4-5, 6-9, 10-15.
    for begin, end in ((0, 4), (4, 6), (6, 10), (10, 16)):
        print_hex(received[begin:end])
    # pack2 send
    sock.sendall(sd2.decode('hex'))
    print("Sent2 OK:")
    print(sd2)
    received1 = sock.recv(1024)
    print("Received1:")
    print_hex(received1)
    print('received1 len is 0x%02x' % (len(received1)))
finally:
    sock.close()
s = raw_input('press any key to continue...')


# ---------------------------------------------------------------------------
# Binary file parsing tool
# ---------------------------------------------------------------------------
# -*- coding: utf-8 -*-
from myutil import *
from binascii import unhexlify as unhex
import os

path = os.getcwd()
path += '\\rec04.bin'
print("begin ans......")

recstatadd = 187
# (label, offset, length) of every field printed for a record.
RECORD_FIELDS = (
    ("终端编号:", recstatadd, 10),
    ("卡号长度:", 10, 1),
    ("卡号:", 11, 10),
    ("持卡序号1+所属地城市代码2+交易地城市代码2", recstatadd + 22, 5),
    ("应用交易计数器", 92, 2),
    ("交易前余额4,交易金额3", recstatadd + 29, 7),
    ("交易日期:", 99, 3),
    ("交易时间:", 44, 3),
    ("终端编号", 21, 8),
    ("商户编号", 21 + 8, 15),
    ("批次号", 5, 3),
    ("应用密文", 47, 8),
    ("授权金额", 103, 6),
    ("其他金额", 115, 6),
    ("终端验证结果", 94, 5),
    ("应用交易计数器", 92, 4),
    ("卡片验证结果", 56, 32),
    ("卡片序列号:", 131, 1),
)

# "with" closes the file even when reading or printing fails.
with open(path, 'rb') as f1:
    s = ''
    # Dump 34 records of 280 bytes each.
    for i in range(1, 35):
        s = f1.read(280)
        print("data: %s" % i)
        print_hex(s)
    # NOTE(review): the one-line listing does not show whether the field
    # dump below ran for every record or only for the last one read; it is
    # applied to the last record here - confirm.
    print('read data is:')
    print_hex(s)
    for label, offset, length in RECORD_FIELDS:
        print(label)
        # Slicing (instead of indexing) tolerates a short final record.
        print_hex(s[offset:offset + length])


# ---------------------------------------------------------------------------
# Crawler: download comic images
# ---------------------------------------------------------------------------
# -*- coding: utf8 -*-
# Downloads the images listed on baozoumanhua.com.
from bs4 import BeautifulSoup
import os, sys, urllib2, time, random

# Create the target folder inside the current working directory.
path = os.getcwd()
new_path = os.path.join(path, u'暴走漫画')
if not os.path.isdir(new_path):
    os.mkdir(new_path)


def page_loop(page=1):
    """Download the images of list page *page* and of every following page.

    The original recursed once per page without an end condition; this
    version loops and stops at the first page that lists no images.
    """
    page = int(page)
    while True:
        url = 'http://baozoumanhua.com/all/hot/page/%s?sv=1389537379' % page
        content = urllib2.urlopen(url)
        soup = BeautifulSoup(content)

        my_girl = soup.find_all('div', class_='img-wrap')
        if not my_girl:
            break
        for girl in my_girl:
            jokes = girl.find('img')
            if jokes is None:
                continue
            flink = jokes.get('src')
            if not flink:
                continue
            print(flink)
            content2 = urllib2.urlopen(flink).read()
            # The last 11 characters of the URL serve as the file name; a
            # slash inside them must not be taken as a directory separator.
            file_name = flink[-11:].replace('/', '_')
            with open(os.path.join(new_path, file_name), 'wb') as code:
                code.write(content2)
        page += 1
        print(u'开始抓取下一页')
        print('the%s page' % page)


page_loop()


# ---------------------------------------------------------------------------
# Crawler: download a website template
# ---------------------------------------------------------------------------
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# by yangyongzhen
# 2016-12-06
from bs4 import BeautifulSoup
import urllib, urllib2, os, time
import re
import urlparse

rootpath = os.getcwd() + u'/抓取的模板/'


def makedir(path):
    """Create directory *path* (with parents) when it does not exist."""
    if not os.path.isdir(path):
        os.makedirs(path)


# Create the root directory for the downloaded files.
makedir(rootpath)


def Schedule(a, b, c):
    """Progress callback for urlretrieve: print the percentage completed.

    Args:
        a: number of blocks transferred so far.
        b: block size in bytes.
        c: total size of the remote file.
    """
    if c <= 0:
        # Size unknown: a percentage cannot be computed.
        return
    per = min(100.0 * a * b / c, 100)
    print('%.2f%%' % per)


def _collect_links(soup, tag, attribute, base_url, listhref, out_file):
    """Record the *attribute* URL of every *tag* element found in *soup*.

    Each URL is resolved against *base_url*, appended to *listhref*, written
    to *out_file* and printed.  Elements without the attribute are skipped.
    """
    pattern = re.compile(r'%s="([^"]*)"' % attribute)
    for item in soup.findAll(tag):
        match = pattern.search(str(item))
        if not match:
            continue
        # urljoin keeps absolute URLs and resolves relative ones.
        ans = urlparse.urljoin(base_url, match.group(1))
        listhref.append(ans)
        out_file.write(ans)
        out_file.write('\r\n')
        print(ans)


def grabHref(url, listhref, localfile):
    """Collect the link, script and anchor URLs of the page at *url*.

    Args:
        url: address of the page to analyse.
        listhref: list that receives every URL found.
        localfile: name of the text file the URLs are also written to.
    """
    html = urllib2.urlopen(url).read()
    html = unicode(html, 'gb2312', 'ignore').encode('utf-8', 'ignore')
    soup = BeautifulSoup(html)
    with open(localfile, 'w') as myfile:
        # The original crashed on <link> elements without an href.
        _collect_links(soup, 'link', 'href', url, listhref, myfile)
        _collect_links(soup, 'script', 'src', url, listhref, myfile)
        _collect_links(soup, 'a', 'href', url, listhref, myfile)


def main():
    """Download every resource referenced by the configured page."""
    url = "http://192.168.72.140/qdkj/"  # page to collect from
    listhref = []                         # URLs found on the page
    localfile = 'ahref.txt'               # file the URLs are saved to
    grabHref(url, listhref, localfile)
    listhref = list(set(listhref))        # drop duplicate URLs
    start = time.time()  # wall-clock time; time.clock() is CPU time on Unix
    for item in listhref:
        parts = item.split('/')
        name = parts[-1]
        # Mirror the URL path (after scheme and host) below rootpath.
        curpath = rootpath + ''.join(part + '/' for part in parts[3:-1])
        print(curpath)
        makedir(curpath)
        if not name:
            # A URL ending in "/" names a directory, not a file.
            continue
        try:
            urllib.urlretrieve(item, curpath + name, Schedule)
        except IOError as ex:
            # One broken link must not abort the whole download.
            print('%s: %s' % (item, ex))
    end = time.time()
    print(u'模板抓取完成!')
    print(u'一共用时: %s 秒' % (end - start))


if __name__ == "__main__":
    main()
到此为止,关于这篇文章的内容,小编就给大家介绍到这里了,希望可以给大家带来更多帮助。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/128240.html
摘要:做这一领域的工作,有很多网站能够起到辅助性的作用。再加上爬虫相对于其他热门方向来说,更容易学。也促使更多人会优先选择学习爬虫。能够代替手工完成手工无法完成的测试任务,并且可以记录相关数据及报告。 ...
摘要:简单介绍自带库,使用调试程序还是很方便的。比如下图就是展示断点进入到内部之后,打印的参数,打印某个变量退出调试,直接退出调试或者使用的方式退出最后说一句上面展示的使用调试的过程其实是很简单的,文章中主要通过截图展示运行的效果。 简单介绍 Python自带 Pdb库,使用 Pdb调试 Python程序还是很方便的。但是远程调试、多线程,Pdb是搞不定的 本文参考的相关文章如下: 《指针...
摘要:内存池机制提供了对内存的垃圾收集机制,但是它将不用的内存放到内存池而不是返回给操作系统。为了加速的执行效率,引入了一个内存池机制,用于管理对小块内存的申请和释放。 注:答案一般在网上都能够找到。1.对if __name__ == main的理解陈述2.python是如何进行内存管理的?3.请写出一段Python代码实现删除一个list里面的重复元素4.Python里面如何拷贝一个对象?...
摘要:入门,第一个这是一门很新的语言,年前后正式公布,算起来是比较年轻的编程语言了,更重要的是它是面向程序员的函数式编程语言,它的代码运行在之上。它通过编辑类工具,带来了先进的编辑体验,增强了语言服务。 showImg(https://segmentfault.com/img/bV1xdq?w=900&h=385); 新的一年不知不觉已经到来了,总结过去的 2017,相信小伙们一定有很多收获...
摘要:入门,第一个这是一门很新的语言,年前后正式公布,算起来是比较年轻的编程语言了,更重要的是它是面向程序员的函数式编程语言,它的代码运行在之上。它通过编辑类工具,带来了先进的编辑体验,增强了语言服务。 showImg(https://segmentfault.com/img/bV1xdq?w=900&h=385); 新的一年不知不觉已经到来了,总结过去的 2017,相信小伙们一定有很多收获...
摘要:入门,第一个这是一门很新的语言,年前后正式公布,算起来是比较年轻的编程语言了,更重要的是它是面向程序员的函数式编程语言,它的代码运行在之上。它通过编辑类工具,带来了先进的编辑体验,增强了语言服务。 showImg(https://segmentfault.com/img/bV1xdq?w=900&h=385); 新的一年不知不觉已经到来了,总结过去的 2017,相信小伙们一定有很多收获...
阅读 889·2023-01-14 11:38
阅读 833·2023-01-14 11:04
阅读 685·2023-01-14 10:48
阅读 1888·2023-01-14 10:34
阅读 892·2023-01-14 10:24
阅读 750·2023-01-14 10:18
阅读 479·2023-01-14 10:09
阅读 519·2023-01-14 10:02