Distributed crawling of Zhihu with Python 3
The code below uses Redis as an intermediate store for the work to be done: a worker pulls tasks from Redis, processes them, and pushes the newly discovered work back into Redis, repeating the cycle. Because the pending tasks persist in Redis, the crawl can be stopped and resumed at any point. The code organisation can still be improved.
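The whole design hinges on a single Redis list used as the work queue: a worker blocks on BLPOP, and every batch of newly discovered users is pushed back with LPUSH. Since the queue lives in Redis rather than in process memory, restarting the workers loses nothing that is still queued. A minimal sketch of that round trip (the queue name 'num' and the comma-joined payload format match the code below):

import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

# producer side: push a batch of url_tokens as one comma-joined task
r.lpush('num', ','.join(['user-a', 'user-b']))

# consumer side: block until a task arrives, then split it back into tokens
_, payload = r.blpop('num', 0)
url_tokens = payload.decode('utf-8').split(',')
print(url_tokens)  # ['user-a', 'user-b']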
The code is split into three parts: common holds the shared functions, master seeds Redis with the initial data (this part can be improved further), and slave can be run in multiple copies at the same time to crawl the data and push new tasks back to Redis.
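The code also assumes a MySQL database named onescrn with two tables that the snippets never create: zhihu for the scraped profiles and zhurl_token for the tokens that have already been dispatched. The column names come from the INSERT statements in common; the types and lengths below are assumptions, so adjust them as needed:

import pymysql

conn = pymysql.connect(host='localhost', user='root', passwd='root', db='onescrn', charset='utf8')
cur = conn.cursor()

# scraped profiles (column names taken from the insert in common.insertData)
cur.execute("""
    create table if not exists zhihu (
        id int auto_increment primary key,
        headline varchar(255),
        name varchar(64),
        gender int,
        url_token varchar(64)
    ) default charset=utf8
""")

# url_tokens that have already been pushed to the queue (see common.insertUrlToken)
cur.execute("""
    create table if not exists zhurl_token (
        id int auto_increment primary key,
        url_token varchar(64)
    ) default charset=utf8
""")

conn.commit()
cur.close()
conn.close()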
# common.py
import redis
import requests
import pymysql


class commonP(object):
    def __init__(self):
        # people this user follows
        self.followees = 'https://www.zhihu.com/api/v4/members/{}/followees?include=data%5B*%5D.answer_count%2Carticles_count%2Cgender%2Cfollower_count%2Cis_followed%2Cis_following%2Cbadge%5B%3F(type%3Dbest_answerer)%5D.topics&offset=0&limit=20'
        # people who follow this user
        self.followers = 'https://www.zhihu.com/api/v4/members/{}/followers?include=data%5B*%5D.answer_count%2Carticles_count%2Cgender%2Cfollower_count%2Cis_followed%2Cis_following%2Cbadge%5B%3F(type%3Dbest_answerer)%5D.topics&offset=0&limit=20'
        # request header (authorization and Cookie come from a logged-in browser session)
        self.header = {
            'Host': 'www.zhihu.com',
            'authorization': 'Bearer 2|1:0|10:1517567905|4:z_c0|92:Mi4xaEEwekJBQUFBQUFBUUFKT3RYLTREQ1lBQUFCZ0FsVk5vWTFoV3dCMHBqdmVvS3NtNG5ZRWpicFJzU0xpNGo1NDZR|b55eabdc78b21257101449ab18b1b440f6f54280f9480103b36010ebe85cb5de',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36',
            'Referer': 'https://www.zhihu.com/',
            'Cookie': '_zap=b0192650-2f45-4bf6-981f-d8803d2759c0; d_c0="AEACTrV_uAyPTkEI_9xF956jRFKzNEp9Iqg=|1511319939"; __utma=51854390.497160466.1511319903.1513058823.1515050597.5; __utmz=51854390.1515050597.5.5.utmcsr=zhihu.com|utmccn=(referral)|utmcmd=referral|utmcct=/; __utmv=51854390.100--|2=registration_date=20170222=1^3=entry_date=20170222=1; q_c1=eb2a32a585c746889bea4245cf397a22|1515743186000|1510216112000; aliyungf_tc=AQAAAD8Hi2GM4AEA04QOt67iPGm9Lpec; _xsrf=aac1c9a9-d2c0-41b4-b859-a5c67c30f0ec; l_cap_id="MjhhMTAwZDlmN2RkNDYwMDg4ZWUzM2FmN2EzMWVjZTQ=|1517558959|82401362ea74f73afe1204fb7d57d795dca7c001"; r_cap_id="YTY0NjFhNjg2NmU2NDU5OTlhMGJhMzQ3NTVlYjY0MzQ=|1517558959|a7da4dff9c478f7a61407f3bb807790cf060d497"; cap_id="ODEzN2I0NDViMjg3NDRmZWEwMGQ0OTEyNjg4Yzg5NzY=|1517558959|154679a53f496fbf09cabfbc8c7136af7c3493ab"; capsion_ticket="2|1:0|10:1517567898|14:capsion_ticket|44:MjVkZGNiNzQ4OTNjNDQxOGJkYzNhNDJlMDBiZmFhOWM=|9296233d810eb54632ab14b7e2b47aa5ce053252d11fae5f695e3ebda76a6d14"; z_c0="2|1:0|10:1517567905|4:z_c0|92:Mi4xaEEwekJBQUFBQUFBUUFKT3RYLTREQ1lBQUFCZ0FsVk5vWTFoV3dCMHBqdmVvS3NtNG5ZRWpicFJzU0xpNGo1NDZR|b55eabdc78b21257101449ab18b1b440f6f54280f9480103b36010ebe85cb5de"'
        }
        # shared Redis connection, used as the distributed work queue
        self.pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
        self.r = redis.StrictRedis(connection_pool=self.pool)

    # open a MySQL connection
    def connctMysql(self):
        conn = pymysql.connect(host='localhost', user='root', passwd='root', db='onescrn', charset='utf8')
        cur = conn.cursor()
        return conn, cur

    # GET request, returning the parsed JSON body
    def requestGet(self, url):
        return requests.get(url, headers=self.header).json()

    # convert the bytes payload from Redis back into a list of url_tokens
    def bytesToStr(self, raw):
        return raw.decode(encoding="utf-8").split(',')

    # insert scraped profiles into MySQL
    def insertData(self, data):
        data = self.delData(data)
        conn, cur = self.connctMysql()
        sql = "insert into zhihu (headline,name,gender,url_token) values(%s,%s,%s,%s)"
        try:
            cur.executemany(sql, data)
            conn.commit()
            cur.close()
            conn.close()
        except:
            raise

    # drop records whose url_token is already stored in the zhihu table
    def delData(self, data):
        conn, cur = self.connctMysql()
        tokens = ''
        for dd in data:
            tokens += "'" + dd[-1] + "',"
        sql = "select url_token from zhihu where url_token in({})".format(tokens[0:-1])
        cur.execute(sql)
        rows = cur.fetchall()
        inUrl = []
        for dd in rows:
            inUrl.append(dd[0])
        result = []
        for dd in data:
            if dd[-1] not in inUrl:
                result.append(dd)
        return result

    # push a batch of new url_tokens onto the Redis queue
    def lpush(self, url_token):
        url_token = self.mysqlIsExit(url_token)
        print(url_token)
        # only queue the batch if at least one token is genuinely new
        if url_token:
            self.r.lpush('num', ','.join(url_token))

    # keep only the url_tokens that have not been dispatched yet
    def mysqlIsExit(self, data):
        conn, cur = self.connctMysql()
        dd = []
        for i in range(len(data)):
            sql = "select count(1) as a from zhurl_token where url_token='{}'".format(data[i])
            cur.execute(sql)
            rows = cur.fetchone()
            if rows[0] == 0:
                dd.append(data[i])
        return dd

    # record dispatched url_tokens in MySQL to avoid processing them twice
    def insertUrlToken(self, url_tokens):
        conn, cur = self.connctMysql()
        url_tokens = self.oneToTwo(url_tokens)
        sql = "insert into zhurl_token (url_token) values(%s)"
        try:
            cur.executemany(sql, url_tokens)
            conn.commit()
            cur.close()
            conn.close()
        except:
            raise

    # wrap each token in its own list so executemany() gets one row per token
    def oneToTwo(self, url_tokens):
        data = []
        for d in url_tokens:
            data.append([d])
        return data

    # crawl the followers of every user in url_tokens, page by page
    def formFollowers(self, url_tokens):
        for name in url_tokens:
            url = self.followers.format(name)
            while True:
                data = self.requestGet(url)
                # stop on an inaccessible account or an empty page
                if 'data' not in data or len(data['data']) == 0:
                    break
                self.formData(data)
                try:
                    url = data['paging']['next']
                except KeyError:
                    break

    # crawl the users that every user in url_tokens is following, page by page
    def formFollowees(self, url_tokens):
        for name in url_tokens:
            url = self.followees.format(name)
            while True:
                data = self.requestGet(url)
                if 'data' not in data or len(data['data']) == 0:
                    break
                self.formData(data)
                try:
                    url = data['paging']['next']
                except KeyError:
                    break

    # flatten one API page: save the profiles to MySQL and queue the new url_tokens
    def formData(self, data):
        result = []
        url_token = []
        for dd in data['data']:
            temp = [dd['headline'], dd['name'], dd['gender'], dd['url_token']]
            url_token.append(dd['url_token'])
            result.append(temp)
        self.insertData(result)
        self.lpush(url_token)
# master.py: crawl one starting user's followers to seed the Redis queue
from common import commonP


class spider(commonP):
    def getData(self, name):
        self.oneStep(name)

    # walk the follower pages of the starting user; formData() pushes the new url_tokens to Redis
    def oneStep(self, name):
        url = self.followers.format(name)
        while True:
            data = self.requestGet(url)
            if 'data' not in data or len(data['data']) == 0:
                break
            self.formData(data)
            try:
                url = data['paging']['next']
            except KeyError:
                break


if __name__ == '__main__':
    spider = spider()
    spider.getData('da-shi-ta-peng-you')
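master only exists to get a first batch of url_tokens into the queue; it crawls the followers of one hard-coded starting user ('da-shi-ta-peng-you'). If you just want to seed the queue without hitting the API at all, a quicker alternative is to push a starting token by hand (a sketch, not part of the original code):

import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)
# seed the 'num' queue with one starting user; the slaves expand the graph from there
r.lpush('num', 'da-shi-ta-peng-you')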
# slave.py: run as many copies as you like, on one or several machines
import threading
from common import commonP


class spider(commonP):
    def getData(self):
        while True:
            # block until a batch of url_tokens is available on the queue
            task = self.r.blpop('num', 0)
            url_tokens = self.bytesToStr(task[1])
            # mark the batch as dispatched so other slaves will skip it
            self.insertUrlToken(url_tokens)
            # note: start() followed immediately by join() runs the two crawls one after the other
            thread1 = threading.Thread(target=self.formFollowers, args=(url_tokens,))
            thread1.start()
            thread1.join()
            thread1 = threading.Thread(target=self.formFollowees, args=(url_tokens,))
            thread1.start()
            thread1.join()
            print('over:', url_tokens)


if __name__ == '__main__':
    spider = spider()
    spider.getData()
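One caveat about the stop/resume behaviour: blpop removes a task from Redis before the slave starts working on it, so killing a slave mid-batch loses that one in-flight batch. If that matters, the standard Redis fix is a reliable-queue variant of the loop above, where the task is parked on a backup list until it has been fully processed. A sketch (the 'num:processing' list name is my own addition; everything else mirrors the slave loop):

import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

# move the task to a backup list while it is being worked on
payload = r.brpoplpush('num', 'num:processing', 0)
url_tokens = payload.decode('utf-8').split(',')
try:
    # ... process url_tokens exactly as slave.getData() does ...
    # only forget the task once the whole batch has been handled
    r.lrem('num:processing', 1, payload)
except Exception:
    # leave the payload on num:processing so it can be re-queued after a restart
    raise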