Distributed crawling of Zhihu with Python 3
The code below uses Redis as an intermediate store for the work queue: each worker pops a batch of pending user tokens from Redis, processes them, and pushes the newly discovered tokens back into Redis, and the cycle repeats. Because the pending work lives in Redis, the crawl can be stopped and resumed at any time. The encapsulation of the code can still be improved.
The code has three parts: common holds the shared functions; master seeds Redis with the initial data (this part can be optimized further); slave can be copied and run as several processes at once to crawl data and push new work back into Redis. The MySQL tables the code expects are sketched right below.
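The scripts assume a local MySQL database named onescrn with two tables: zhihu for the scraped profiles and zhurl_token for the url_tokens that have already been queued. The original post does not give the schema, so the column types in this setup sketch are assumptions; only the database, table, and column names come from the code below.
# Setup sketch: creates the MySQL tables the crawler expects.
# Column types are assumed; table and column names match the code below.
import pymysql

conn = pymysql.connect(host='localhost', user='root', passwd='root', charset='utf8')
cur = conn.cursor()
cur.execute("CREATE DATABASE IF NOT EXISTS onescrn DEFAULT CHARACTER SET utf8")
cur.execute("USE onescrn")
# scraped user profiles
cur.execute("""CREATE TABLE IF NOT EXISTS zhihu (
    id INT AUTO_INCREMENT PRIMARY KEY,
    headline TEXT,
    name VARCHAR(255),
    gender INT,
    url_token VARCHAR(255))""")
# url_tokens that have already been pushed to Redis, used for de-duplication
cur.execute("""CREATE TABLE IF NOT EXISTS zhurl_token (
    id INT AUTO_INCREMENT PRIMARY KEY,
    url_token VARCHAR(255))""")
conn.commit()
cur.close()
conn.close()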
#common:
import redis
import requests
import pymysql

class commonP(object):
    def __init__(self):
        # API for the people this user follows (followees)
        self.followees = 'https://www.zhihu.com/api/v4/members/{}/followees?include=data%5B*%5D.answer_count%2Carticles_count%2Cgender%2Cfollower_count%2Cis_followed%2Cis_following%2Cbadge%5B%3F(type%3Dbest_answerer)%5D.topics&offset=0&limit=20'
        # API for the people who follow this user (followers)
        self.followers = 'https://www.zhihu.com/api/v4/members/{}/followers?include=data%5B*%5D.answer_count%2Carticles_count%2Cgender%2Cfollower_count%2Cis_followed%2Cis_following%2Cbadge%5B%3F(type%3Dbest_answerer)%5D.topics&offset=0&limit=20'
        # request headers; authorization and Cookie come from a logged-in session
        self.header = {
            'Host': 'www.zhihu.com',
            'authorization': 'Bearer 2|1:0|10:1517567905|4:z_c0|92:Mi4xaEEwekJBQUFBQUFBUUFKT3RYLTREQ1lBQUFCZ0FsVk5vWTFoV3dCMHBqdmVvS3NtNG5ZRWpicFJzU0xpNGo1NDZR|b55eabdc78b21257101449ab18b1b440f6f54280f9480103b36010ebe85cb5de',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36',
            'Referer': 'https://www.zhihu.com/',
            'Cookie': '_zap=b0192650-2f45-4bf6-981f-d8803d2759c0; d_c0="AEACTrV_uAyPTkEI_9xF956jRFKzNEp9Iqg=|1511319939"; __utma=51854390.497160466.1511319903.1513058823.1515050597.5; __utmz=51854390.1515050597.5.5.utmcsr=zhihu.com|utmccn=(referral)|utmcmd=referral|utmcct=/; __utmv=51854390.100--|2=registration_date=20170222=1^3=entry_date=20170222=1; q_c1=eb2a32a585c746889bea4245cf397a22|1515743186000|1510216112000; aliyungf_tc=AQAAAD8Hi2GM4AEA04QOt67iPGm9Lpec; _xsrf=aac1c9a9-d2c0-41b4-b859-a5c67c30f0ec; l_cap_id="MjhhMTAwZDlmN2RkNDYwMDg4ZWUzM2FmN2EzMWVjZTQ=|1517558959|82401362ea74f73afe1204fb7d57d795dca7c001"; r_cap_id="YTY0NjFhNjg2NmU2NDU5OTlhMGJhMzQ3NTVlYjY0MzQ=|1517558959|a7da4dff9c478f7a61407f3bb807790cf060d497"; cap_id="ODEzN2I0NDViMjg3NDRmZWEwMGQ0OTEyNjg4Yzg5NzY=|1517558959|154679a53f496fbf09cabfbc8c7136af7c3493ab"; capsion_ticket="2|1:0|10:1517567898|14:capsion_ticket|44:MjVkZGNiNzQ4OTNjNDQxOGJkYzNhNDJlMDBiZmFhOWM=|9296233d810eb54632ab14b7e2b47aa5ce053252d11fae5f695e3ebda76a6d14"; z_c0="2|1:0|10:1517567905|4:z_c0|92:Mi4xaEEwekJBQUFBQUFBUUFKT3RYLTREQ1lBQUFCZ0FsVk5vWTFoV3dCMHBqdmVvS3NtNG5ZRWpicFJzU0xpNGo1NDZR|b55eabdc78b21257101449ab18b1b440f6f54280f9480103b36010ebe85cb5de"'
        }
        self.pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
        self.r = redis.StrictRedis(connection_pool=self.pool)

    # open a MySQL connection and cursor
    def connctMysql(self):
        conn = pymysql.connect(host='localhost', user='root', passwd='root',
                               db='onescrn', charset='utf8')
        cur = conn.cursor()
        return conn, cur

    # GET request, returns the parsed JSON body
    def requestGet(self, url):
        return requests.get(url, headers=self.header).json()

    # decode a bytes value popped from Redis into a list of url_tokens
    def bytesToStr(self, raw):
        return raw.decode(encoding="utf-8").split(',')

    # insert scraped profiles into the zhihu table
    def insertData(self, data):
        data = self.delData(data)
        if not data:
            return
        conn, cur = self.connctMysql()
        sql = "insert into zhihu (headline,name,gender,url_token) values(%s,%s,%s,%s)"
        try:
            cur.executemany(sql, data)
            conn.commit()
            cur.close()
            conn.close()
        except:
            raise

    # drop profiles whose url_token is already stored in the zhihu table
    def delData(self, data):
        conn, cur = self.connctMysql()
        tokens = ''
        for dd in data:
            tokens += "'" + dd[-1] + "',"
        # tokens[0:-1] strips the trailing comma
        sql = "select url_token from zhihu where url_token in({})".format(tokens[0:-1])
        cur.execute(sql)
        rows = cur.fetchall()
        cur.close()
        conn.close()
        inUrl = []
        for dd in rows:
            inUrl.append(dd[0])
        result = []
        for dd in data:
            if dd[-1] not in inUrl:
                result.append(dd)
        return result

    # push a batch of url_tokens onto the Redis queue
    def lpush(self, url_token):
        print(url_token)
        url_token = self.mysqlIsExit(url_token)
        print(url_token)
        if url_token:
            self.r.lpush('num', ','.join(url_token))

    # keep only the tokens that have not been queued before
    def mysqlIsExit(self, data):
        conn, cur = self.connctMysql()
        dd = []
        for i in range(len(data)):
            sql = "select count(1) as a from zhurl_token where url_token='{}'".format(data[i])
            cur.execute(sql)
            rows = cur.fetchone()
            if rows[0] == 0:
                dd.append(data[i])
        cur.close()
        conn.close()
        return dd

    # record the url_tokens being processed so they are not handled twice
    def insertUrlToken(self, url_tokens):
        conn, cur = self.connctMysql()
        url_tokens = self.oneToTwo(url_tokens)
        sql = "insert into zhurl_token (url_token) values(%s)"
        try:
            cur.executemany(sql, url_tokens)
            conn.commit()
            cur.close()
            conn.close()
        except:
            raise

    # wrap each token in its own list so executemany accepts it
    def oneToTwo(self, url_tokens):
        data = []
        for d in url_tokens:
            data.append([d])
        return data

    # check whether a token already exists in the database
    # (disabled alternative to mysqlIsExit)
    # def isIn(self, url_token):
    #     conn, cur = self.connctMysql()
    #     sql = "select count(1) as a from zhurl_token where url_token='{}'".format(url_token)
    #     cur.execute(sql)
    #     rows = cur.fetchone()
    #     return rows[0]

    # crawl everyone who follows each of the given users
    def formFollowers(self, url_tokens):
        for name in url_tokens:
            # isIn = self.isIn(name)   # disabled per-user dedup check
            # if isIn > 0:
            #     continue
            isFirstPage = 0
            while True:
                if isFirstPage == 0:
                    url = self.followers.format(name)
                    isFirstPage = 1
                data = self.requestGet(url)
                try:
                    # follow the paging cursor on the next iteration
                    url = data['paging']['next']
                except:
                    break
                if len(data['data']) == 0:
                    break
                else:
                    self.formData(data)

    # crawl everyone each of the given users follows
    def formFollowees(self, url_tokens):
        for name in url_tokens:
            # isIn = self.isIn(name)   # disabled per-user dedup check
            # if isIn > 0:
            #     continue
            isFirstPage = 0
            while True:
                if isFirstPage == 0:
                    url = self.followees.format(name)
                    isFirstPage = 1
                data = self.requestGet(url)
                try:
                    url = data['paging']['next']
                except:
                    break
                if len(data['data']) == 0:
                    break
                else:
                    self.formData(data)

    # store one page of results and queue the newly discovered url_tokens
    def formData(self, data):
        result = []
        url_token = []
        for dd in data['data']:
            temp = []
            temp.append(dd['headline'])
            temp.append(dd['name'])
            temp.append(dd['gender'])
            temp.append(dd['url_token'])
            url_token.append(dd['url_token'])
            result.append(temp)
        self.insertData(result)
        self.lpush(url_token)
#master:
from common import commonP

class spider(commonP):
    def getData(self, name):
        self.oneStep(name)

    # crawl the followers of a single seed user; formData stores the profiles
    # and pushes the discovered url_tokens into Redis for the slaves
    def oneStep(self, name):
        isFirstPage = 0
        while True:
            if isFirstPage == 0:
                url = self.followers.format(name)
                isFirstPage = 1
            data = self.requestGet(url)
            try:
                url = data['paging']['next']
            except:
                break
            if len(data['data']) == 0:
                break
            else:
                self.formData(data)

if __name__ == '__main__':
    spider = spider()
    spider.getData('da-shi-ta-peng-you')
#slave:
import threading
from common import commonP

class spider(commonP):
    def getData(self):
        while True:
            # block until a batch of url_tokens appears on the 'num' list
            task = self.r.blpop('num', 0)
            url_tokens = self.bytesToStr(task[1])
            # record the batch so other slaves will not queue these users again
            self.insertUrlToken(url_tokens)
            # crawl followers, then followees; the immediate join() means the
            # two threads run one after the other
            thread1 = threading.Thread(target=self.formFollowers, args=(url_tokens,))
            thread1.start()
            thread1.join()
            thread1 = threading.Thread(target=self.formFollowees, args=(url_tokens,))
            thread1.start()
            thread1.join()
            print('over:', url_tokens)

if __name__ == '__main__':
    spider = spider()
    spider.getData()
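Because blpop is called with a timeout of 0, an idle slave simply blocks until new work arrives, and any batches that were pushed but never popped stay in the Redis list; that is what makes it possible to stop the crawl and restart it later. A quick way to check how much work is still queued, as a small sketch assuming the same local Redis instance and the 'num' key used above:
# Sketch: inspect the pending queue on the same local Redis instance.
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)
print('pending batches:', r.llen('num'))  # each entry is a comma-joined batch of url_tokens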