几个爬虫小栗子
年前水一篇,最近还是在看python
,基础看完之后就去看数据库了,数据库之后就是web
了,Django&Flask
之类的,先是html
入门,这个看完了,css
看到一半看不下去了,js
一点没看,所以我决定先跳过这一块,暂时去看看别的,再下来就是爬虫方面的东西了,个人感觉这个东西还是蛮有意思的,提起了我的兴趣,看了之后发现没点前端web
方面的基础玩起来会很吃力,之后也自己试着写了很多,像是什么登陆某些网站、爬取某些网站的数据,当然下面的几个小栗子也都是入门级的,也都是基于python3
,先来尝试登陆某个网站获取一些登录后才能拿到的数据。
登陆GitHub
具体要做什么,就是能登陆到GitHub
,登录后访问电子邮件设定页面,在页面源码里获取到当前登录的用户名、绑定的邮箱地址,还有就是访问安全日志页面,获取第一页的信息,也就是账户的登录记录,包括登陆状态、国家、IP
,日期,大概就是这样,这些东西都是登陆之后才能获取到的信息,所以先想办法登陆到github
。
先来分析一下页面,先点开浏览器的F12
,打开持续日志,打开目标网址,也就是https://github.com/login
,页面加载完成之后在开发者模式里面点开这个,也就是login
的response
,一会需要看这里面的源码
然后输入用户名密码点登陆吧,登陆成功之后就可以去看开发者工具了,找一个URI
为session
的POST
请求,也就是这个,这个就是登陆接口,看一下他都提交了哪些信息,
commit: Sign in
utf8: ✓
authenticity_token: 3/YrvJJIf3pYYs8pjn8wTeww/ElnHvLLt6MNLS50CEPeLOcgjDg2OdExK4q8nzFpqK2KRbq5YLBftXXeRWE2OQ==
ga_id: 123228193.1576807919
login: xxx
password: xxxx
webauthn-support: supported
webauthn-iuvpaa-support: unsupported
required_field_4988:
timestamp: 1578547909624
timestamp_secret: 58ecf2f5b220668cedd903552595e7efd24455840dad422a1ed05e6562b23d94
分析一下提交的数据,前两个值都是固定的,一会发送POST
请求的时候写死就可以了。
authenticity_token
,这个值是login
页面会返回给我们,一会把这个值取出来带上
ga_id
值是存在cookie
里面的,一会也把他取出来带上。
login
为用户名或邮箱地址,自行输入即可
password
为登陆密码,自己输入即可
required_field_4988
这个key
并没有值,这个key
也是动态的,也在login
页面源码里
timestamp
是时间戳,这个时间戳也是页面返回给我们的,一会把他取出来带上
timestamp_secret
也是login
页面返回给我们,一会把他取出来带上,
所以我们需要把这些数据在页面源码或cookies
里取出来,发送登陆请求的时候带上就行了,当然用户名密码除外,手动输入即可,
authenticity_token
ga_id
required_field_4988
timestamp
timestamp_secret
搞清楚这些之后就可以写代码了撒,用两个模块就够了,分别是re/requests
,requests
用来帮我们发送请求,re
帮我们拿到我们想要的东西就够了,所以代码如下,requests
自行安装撒。
import re
import requests
class Login_github(object):
def __init__(self):
# 初始化session对象
self.request_session = requests.session()
# 定义一个空列表,用于存在登陆记录
self.login_record = []
# 默认请求头
self.header = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/79.0.3945.88 Safari/537.36",
}
# github首页
self.index_url = 'https://github.com'
# 请求登陆页面
self.response = self.request_session.get(url='%s' % self.index_url + '/login', headers=self.header)
# 获取ga_id方法
def ga_id(self):
cookie = self.response.cookies['_octo']
ga_id_search = re.compile(r'(\d{2,10}\.\d{2,10})')
return re.search(ga_id_search, cookie).group(1)
# 获取authenticity_token值方法
def authenticity_token(self):
authenticity_token_search = re.compile(r'authenticity_token.\svalue="(.*?)"')
return re.search(authenticity_token_search, self.response.text).group(1)
# 获取时间戳方法
def timestamp(self):
timestamp_search = re.compile(r'name="timestamp"\svalue="(\d{13})"')
return re.search(timestamp_search, self.response.text).group(1)
# 获取required_field方法
def required_field(self):
required_field_search = re.compile(r'name="(\w+)?"\shidden')
return re.search(required_field_search, self.response.text).group(1)
# 获取timestamp_secret方法
def timestamp_secret(self):
timestamp_secret_search = re.compile(r'timestamp_secret.\svalue="(.*?)"')
return re.search(timestamp_secret_search, self.response.text).group(1)
# 登陆方法
def Login(self):
username = input('用户名或邮箱地址: ')
password = input('密码: ')
# 定义
post_data = {
"commit": "Sign in",
"utf8": "✓",
"ga_id": self.ga_id(),
"authenticity_token": self.authenticity_token(),
"login": username,
"password": password,
"webauthn-support": "supported",
"webauthn-iuvpaa-support": "unsupported",
"timestamp": self.timestamp(),
self.required_field(): '',
"timestamp_secret": self.timestamp_secret()
}
# 发送登陆POST请求
self.request_session.post(url='%s' % self.index_url + '/session', headers=self.header, data=post_data)
# 查询登陆记录,一页最多50条记录
def login_session(self):
# 正则匹配
login_search = re.compile(r'<span class="audit-type">\s+.+\s.+\s+('
r'\w+\.\w+)\s+.+\s.+\s.+\s+.+\s.+\s+.+\s+.+\s+.+?>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,'
r'3}).+\s+.+\s+(\w+.+)\s\s+.+\s.+">?(.*)<.+')
# 请求安全日志页面
login_info = re.findall(login_search, self.request_session.get(url='%s' % self.index_url +'/settings/security-log',headers=self.header).text)
for login in range(len(login_info)):
info_dict = {
'status': login_info[login][0],
'ip-add': login_info[login][1],
'country': login_info[login][2],
'date': login_info[login][3]
}
self.login_record.append(info_dict)
# 获取用户名、邮箱、最近登陆国家、日期、IP地址信息
def run(self):
# 尝试登陆
try:
self.Login()
except Exception as e:
print(e)
# 访问设置页面
response = self.request_session.get(url='%s' % self.index_url + '/settings/emails', headers=self.header).text
try:
# 正则匹配用户名
user_name_search = re.compile(r'strong\sclass="css-truncate-target">(.*?)<')
# 正则匹配邮箱
email_search = re.compile(r'option\sselected="selected"\svalue="\d+">(.+?)<')
# 打印用户名
print('你的用户名为:%s' % re.search(user_name_search, response).group(1))
# 打印邮箱
print('你的邮箱为: %s' % re.search(email_search, response).group(1))
try:
self.login_session()
except Exception as e:
print(e)
print('登陆记录如下')
for i in range(len(self.login_record)):
print(self.login_record[i])
except:
print('没有检索到用户名/邮箱地址,请确认你输入的用户名密码是否正确,也有可能出现了验证码,看看你的邮箱撒')
if __name__ == '__main__':
github = Login_github()
github.run()
登陆成功的输出如下,输出用户名、账户绑定的邮箱,和安全页面第一页的登陆记录,一页是50
条,图片我就是截了一部分,总之效果就是这样
当然也有登陆失败的时候,登陆失败输出如下,
上图登陆失败的原因就是出现了验证码,没办法继续了,之前可能就是太频繁了,还有种可能就是用户名密码错误,如果你的邮箱没有收到验证码就说明你的用户名密码有问题,还有可能就是网络超时,网络超时的话会抛出ConnectionError
的错误,还有时候会抛出证书错误,这个不知道是为什么,估计也是网络造成的,大概就是这样,这个小脚本就算完事了,下面来一个稍微复杂的。
爬取豆瓣top250
这个比上面的复杂一些,用到了re/requests/threading/lxml/pymongo
,pymongo
其实可以不用,我只是将爬取到的数据存到了mongodb
中,在不用mongodb
的情况不用安装,然后这个脚本就可以用了,如果不了解xpath/lxml
可以去了解一下,不是特别复杂,前提是你必须有点前端基础,否则看起来也很吃力,lxml
是要额外安装的,自行使用pip
命令安装一下吧。
要爬的目标页面是这个,这个是第一页,一会需要生成URL
,暂时还没用到selenium
,所以想翻页就只能自行生成URL
了,要获取到的内容分别是电影名、导演、评分、评价数、简介,首先是mongo_db.py
,根据自己的情况自行调整吧,目前就是进行了简单的插入操作,并没有做别的操作,当然不准备入库的话也不需要这个。
from pymongo import MongoClient
class Mongo_client(object):
def __init__(self):
try:
client = MongoClient(host='192.168.1.200', port=37017)
client.admin.authenticate('admin', 'Sowhat?')
my_db = client['douban']
self.collection = my_db['collection_top250']
except Exception as e:
print(e)
def insert_db(self, item):
self.collection.insert_one(item)
insert_data = Mongo_client()
然后是爬虫文件,
import re
import requests
import threading
from lxml import etree
from tools.mongo_db import insert_data
from concurrent.futures import ThreadPoolExecutor
class HandleDoubanMovieTop250(object):
def __init__(self):
# 定义请求头
self.header = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,'
'application/signed-exchange;v=b3;q=0.9',
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/79.0.3945.88 Safari/537.36 ',
'Host': 'movie.douban.com'
}
# 一个空列表,用于存放构建后的URL
self.page_url = []
# 定义线程锁
self.lock = threading.Lock()
# 定义正则匹配
self.movie_re = re.compile(r'\s+')
# 构建页面地址
def handle_page_url(self):
page = 0
while page <= 25:
self.page_url.append('https://movie.douban.com/top250?start=%d' % page)
page += 25
# 发送get请求,返回页面信息
def handle_request(self, page_url):
# 将页面实例化并返回
return etree.HTML(requests.get(url=page_url, headers=self.header).text)
# 开始处理数据
def handle_page_detail(self, page_url):
# 调用实例化函数
response = self.handle_request(page_url)
# 开始处理数据
all_div = response.xpath('//div[@class="info"]')
for html in all_div:
movie_dict = {
# 电影名
'movie_name': self.movie_re.sub('', ''.join(html.xpath('./div/a/span/text()'))),
# 导演
'actors_information': self.movie_re.sub('', "".join(
html.xpath(".//div[@class='bd']/p[@class='']/text()"))),
# 评分
'score': html.xpath('.//span[@class="rating_num"]/text()')[0],
# 评价数
'evaluate': html.xpath('.//div[@class="star"]/span[last()]/text()')[0],
# 简介
'describe': html.xpath('.//span[@class="inq"]/text()')[0],
}
# 入库,如果不入库直接打印movie_dict也可以
with self.lock:
insert_data.insert_db(movie_dict)
def run(self):
# 生成要爬取的页面
self.handle_page_url()
# 定义线程池
executor = ThreadPoolExecutor(max_workers=10)
# 开跑
for url in self.page_url:
executor.submit(self.handle_page_detail, url)
# 关闭线程池
executor.shutdown()
if __name__ == '__main__':
# 实例化
top250 = HandleDoubanMovieTop250()
# 开跑
top250.run()
然后开跑,跑完之后去看一哈mongodb
,
截了一部分,总条数250
条就对了,
由于用了多线程,所以数据插入的顺序是乱的,像是入库的第一条萤火虫之墓,他是排在151
,本该是第一位的肖申克的救赎在13
条,想避免这个问题就不要用多线程了,大概就是这样,下面用selenium
来搞点事情。
爬取51job职位
这个主要就是用到了etree/selenium/pymongo
,当然不入库一样可以不用pymongo
,目标网址就是这里,也就是前程无忧的职位搜索页面,全国范围的,一会的操作就是用代码去调用浏览器,在这之前需要先去下载浏览器驱动,目前只支持Phantomjs/Chrome/Firefox
,我这里用的Chrome
的驱动,在这里可以找到,看一下自己浏览器版本去自行下载吧,下载完成之后加入到环境变量,在任意位置打开命令行运行chromedriver --versioin
能看到如下信息就算成功了。
然后自行安装一下selenium
这个包就可以了,大概要做的就是调用浏览器打开目标页面,判断搜索框有没有出现,如果出现了输入要搜索的职位信息,输入之后点击搜索按钮获取数据,接下来就是判断有没有下一页了,如果有则点击下一页进行翻页,如果没有就退出,这个我打算扔到centos7
上面去跑了,所以在centos
也要安装Chrome
和他的驱动才行,先把这件事情搞了。
# 安装浏览器
yum -y install https://dl.google.com/linux/direct/google-chrome-stable_current_x86_64.rpm
# 下载驱动
wget http://chromedriver.storage.googleapis.com/79.0.3945.36/chromedriver_linux64.zip
unzip chromedriver_linux64.zip -d /usr/local/bin/
chmod +x /usr/local/bin/chromedriver
# 安装需要的包
pip3 install lxml pymongo selenium -i https://pypi.tuna.tsinghua.edu.cn/simple
代码如下
from lxml import etree
from selenium import webdriver
from tools.mongo_db import insert_data
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
class Search_51job(object):
def __init__(self):
chrome_options = Options()
# root用户需要加上这个
chrome_options.add_argument('--no-sandbox')
# 关闭GPU加速
chrome_options.add_argument('--disable-gpu')
# 设置无头浏览器
chrome_options.add_argument('--headless')
# 设置浏览器
self.driver = webdriver.Chrome(chrome_options=chrome_options)
# 页码处理方法
def handle_job(self):
# 定义请求地址
self.driver.get('https://search.51job.com/list/000000,000000,0000,00,9,99,%2520,2,1.html')
# 判断kwdselectid元素是否出现,这个就是搜索框
if WebDriverWait(self.driver, 5, 0.5).until(EC.presence_of_element_located((By.ID, 'kwdselectid'))):
# 如果搜索框出现了就输入要搜索的职位
input_keyword = input('请输入要查找的岗位信息: ')
# 向搜索框输入要搜索的职位
self.driver.find_element_by_id('kwdselectid').send_keys(input_keyword)
# 点击搜索按钮
self.driver.find_element_by_class_name('p_but').click()
# 开始处理数据,先判断搜索结果是否出现
if WebDriverWait(self.driver, 5, 0.5).until(EC.presence_of_element_located((By.ID, 'resultList'))):
while True:
# 如果出现则将当前页面交给网页处理方法
self.handle_parse(self.driver.page_source)
# 再进行判断这个xpath的值是不是下一页
if self.driver.find_element_by_xpath('//li[@class="bk"][2]/a').text == '下一页':
# 如果是则点击这个位置进行翻页
self.driver.find_element_by_xpath('//li[@class="bk"][2]/a').click()
else:
# 如果没有出现下一页则跳出循环
break
# 关闭浏览器
self.driver.quit()
# 网页处理方法
def handle_parse(self, page_source):
html_51job = etree.HTML(page_source)
all_div = html_51job.xpath("//*[@id='resultList']//div[@class='el']")
for item in all_div:
info_dict = {'position': item.xpath('./p/span/a/@title')[0],
'company': item.xpath(".//span[@class='t2']/a/@title")[0],
'place': item.xpath("./span[@class='t3']/text()")[0]}
# 月薪这里有的没数据,这里进行了异常处理,如果没数据则等于无数据
try:
info_dict['salary'] = item.xpath("./span[@class='t4']/text()")[0]
except IndexError:
info_dict['salary'] = '无数据'
info_dict['issue_date'] = item.xpath("./span[@class='t5']/text()")[0]
# 打印获取到的信息
print(info_dict)
# 入库
insert_data.insert_db(info_dict)
if __name__ == '__main__':
search_51job = Search_51job()
search_51job.handle_job()
服务器目录结果如下,
[root@rj-bai ~/51job]# tree .
.
├── 51_selenium.py
└── tools
└── mongo_db.py
1 directory, 2 files
mongo_db.py
内容如下,
[root@rj-bai ~/51job]# cat tools/mongo_db.py
from pymongo import MongoClient
class Mongo_client(object):
def __init__(self):
try:
client = MongoClient(host='192.168.1.200', port=37017)
client.admin.authenticate('admin', 'Sowhat?')
my_db = client['51job']
self.collection = my_db['collection_51job']
except Exception as e:
print(e)
def insert_db(self, item):
self.collection.insert_one(item)
insert_data = Mongo_client()
这次我把要入库的信息也打印到屏幕上了,准备开跑,
[root@rj-bai ~/51job]# python3 51_selenium.py
请输入要查找的岗位信息: 运维
跑了几秒钟就停掉了,获取到了五页数据,其实这个不用selenium
就可以搞定,用requests/lxml/re
就够了,总页数在页面源码里都可以看到,取到总页数后写个方法构造URL
就可以了,总之能不用selenium
就不用selenium
,比较占用资源,毕竟要调用浏览器的,规则就是你想要的东西都在页面源码里你就不需要selenium
,如果遇到动态页面就需要用这个了,譬如说P
站。
下载P站图片
这个的话暂时就用selenium
去搞了,搞个最简单的,当前我并没有登陆,所以在未登录状态去访问某个页面,把那一页的缩略图拿下来就行了,暂时不搞翻页,我是个三蹦子玩家,所以搞点三蹦子图片下来,目标页面这里。
目前要做的事情就是把通过selenium
获取到全部缩略图的地址,然后把缩略图下载到本地,因为P
站比较特殊,他的很多东西都是在网页源代码里看不到的,都是动态填充的,先不考虑直接去调用他的接口,这个接口调用之后返回的就是我们想要的数据,之后会提到这个,就先用selenium+requests
搞定这个,selenium
帮我们拿到想要的数据,requests
帮我们去下载就可以了,大概就是这样,现在用requests
去访问一下目标地址,看一下源码,
import requests
print(requests.get(url='https://www.pixiv.net/tags/%E5%B4%A9%E5%9D%8F3/illustrations').text)
可以看到是找不到任何img
标签的,所以你用requests+xpath
去取数据,不好意思取不到,所以这时候用selenium
去获取数据了,代码如下,这个写的比较随便,
import os
import time
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# 请求地址
index_url = 'https://www.pixiv.net/tags/%E5%B4%A9%E5%9D%8F3/illustrations'
pixiv = webdriver.Chrome()
# 请求目标地址
pixiv.get(url=index_url)
# 定义请求头,否则下载图片403
header = {
"referer": index_url
}
# 开始判断这个元素有没有出现
try:
if WebDriverWait(pixiv, 10, 3).until(EC.presence_of_element_located((By.XPATH, "//section[@class='sc-LzMkr jkOmhS']"))):
# 开始向下滚动页面,滚动3次,每次1024像素
for i in range(3):
pixiv.execute_script('window.scrollBy(0,1024)')
# 等待五秒,等待页面加载
time.sleep(5)
# 通过xpath来匹配图片URL
image_list = pixiv.find_elements_by_xpath("//div[@class='sc-fzXfPL fRnGwV']/img")
print('当前获取图片的总数为: %s ' % len(image_list))
for item in image_list:
# 缩略图URL
thumbnail_url = item.get_attribute('src')
print('当前下载的图片为: %s' % thumbnail_url)
# 下载图片到本地
with open('C:/Users/Administrator/Desktop/pixiv/%s' % os.path.basename(thumbnail_url), 'wb') as f:
f.write(requests.get(headers=header, url=thumbnail_url).content)
except Exception as e:
print(e)
finally:
pixiv.quit()
代码就是这样,图片应该是64
张,热门作品缩略图四张,一页搜索结果六十张,开跑
这样就把图片下载下来了撒,不过都是缩略图,这就是个小栗子,针对动态页面的,其实不太建议用这种方式去获取数据,按我目前的理解只要是动态页面一般都是ajax
渲染数据的,包括P
站,所以找到ajax
请求地址去获取数据会比这个好很多,下面开始直接去请求ajax
地址来获取数据。
正式开搞
做个简单点的,登录到P
站,输入关键字之后去搜索,分析的全过程这里不多扯了,说重点,处于登陆状态,譬如我搜索三蹦子的时候页面发送了异步请求,请求地址如下,
https://www.pixiv.net/ajax/search/artworks/%E5%B4%A9%E5%9D%8F3?word=%E5%B4%A9%E5%9D%8F3&order=date_d&mode=all&p=1&s_mode=s_tag&type=all
解码看一下这是个什么地址,
from urllib.parse import unquote
print(unquote('https://www.pixiv.net/ajax/search/artworks/%E5%B4%A9%E5%9D%8F3?word=%E5%B4%A9%E5%9D%8F3&order=date_d&mode=all&p=1&s_mode=s_tag&type=all', 'utf-8'))
那两个崩坏3
应该就是关键字,p=1
就是页码,然后访问一下这个地址,看看返回的是啥子,
import requests
print(requests.get(url='https://www.pixiv.net/ajax/search/artworks/%E5%B4%A9%E5%9D%8F3?word=%E5%B4%A9%E5%9D%8F3&order=date_d&mode=all&p=1&s_mode=s_tag&type=all').text)
返回的是json
数据,先复制到网页上解析一下看看都有啥,我直接贴需要的内容了,
这个就是第一页的搜索结果,这是第一页第一条记录的详细信息,截了一部分,一页一共是60
条记录,data
可以看到60
,信息包括什么ID
、标题、标签、userID
、插画总数都在,重点是哪个illustId
,这个就是详情页的ID
,取到这个通过构造URL
就可以去访问详情页了。
在我访问详情页的时候发现点问题,如果是单图的情况下图片的地址在页面源码里是可以看到的,这个没问题,如果是图册,点进去之后只能看到一张图片,需要点击查看全部才能看到全部的图片,而且就算点开了查看全部在页面源码里也看不到图片地址,我看了一下异步请求,果然是点击查看全部请求之后又有异步请求发出去了,也就是这个
那串数字就是illustId
,他返回的也是json
,这个详情页是有两张图,看一下这个json
的视图,如下,
body
里面两条数据,经过分析original
就是原图地址了,所以,要做的事情就是通过代码先登录,当然也可以不登录,请求搜索接口,解析返回的json
,取出illustId
,也就是详情页ID
,取出之后构造出详情页异步请求地址,然后再发请求到这个地址,解析返回的json
取出original
地址,也就是原图的地址,取到地址之后下载保存到本地就可以了。
至于页码逻辑暂时是这样写的,通过上面的方式获取到插画总数,总数除60
强制进位,就得到页数了,但是有个问题,如果登录了就可以获取所有的数据,在不登陆的情况下只能查看前十页数据,就算是访问第11
页的数据返回的也是第十页的,所以代码了进行了判断,在没有登录的情况下,如果页码大于10
就重新赋值为10
,如果不大于10
则不进行任何操作,登录状态下获取全部数据。
登录这一块我并不是通过用户名密码去登陆的,而是通过cookies
,我也试过用requests/selenium
去登陆,但是问题很大,十有八九要求我进行验证,偶尔会登陆成功,搞得我头大,还是我某些地方写的有问题,最后换了一个比较靠谱的方式,那就是用cookies
,这个cookies
需要你自己获取一下,也很简单,就是在浏览器登陆之后F12
,找到这个,
看请求头里面的cookie
值,也就是这一串,
复制出来就可以了,然后代码如下,需要自行使用pip
安装两个包,也就是colorama/requests
,装完之后就可以运行了,注意线程不要开太多,超时时间自行调整吧,代码里设置的10
秒
import os
import re
import sys
import math
import json
import requests
import threading
from colorama import Fore
from multiprocessing import Queue
# 获取图片地址类
class Get_Picture_Url(threading.Thread):
def __init__(self, thread_name, page_queue, image_queue, search_name, cookies):
super(Get_Picture_Url, self).__init__()
# 线程名
self.thread_name = thread_name
# 页码队列
self.page_queue = page_queue
# 图片队列
self.image_queue = image_queue
# 搜索名字
self.search_name = search_name
# cookies
self.cookies = cookies
# 定义请求头
self.header = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,"
"application/signed-exchange;v=b3;q=0.9",
"accept-encoding": "gzip, deflate, br",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
"cache-control": "no-cache",
"pragma": "no-cache",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "none",
"sec-fetch-user": "?1",
"upgrade-insecure-requests": "1",
'referer': 'https://www.pixiv.net',
"user-agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/79.0.3945.117 Safari/537.36",
}
# 构造URL
def run(self):
print(Fore.LIGHTBLUE_EX, '\n\t当前启动的处理页码的线程为:%s ' % self.thread_name)
while not url_flag:
try:
# block默认为True,设置成False,如果队列为空抛出异常,empty,full
page = self.page_queue.get(block=False)
# 构造URL
page_url = 'https://www.pixiv.net/ajax/search/artworks/%s?word=%s&order=date_d&mode=all&p=%d&s_mode' \
'=s_tag&type=all' % (self.search_name, self.search_name, page)
print(Fore.LIGHTBLUE_EX, "\n\t当前构造的url为:%s" % page_url)
# 调用获取图片地址方法
self.Get_Original_Picture_Url(page_url)
except:
pass
# 获取图片地址方法
def Get_Original_Picture_Url(self, url):
try:
print(Fore.LIGHTBLUE_EX, '\n\t当前获取图片地址的线程为:%s ' % self.thread_name)
# 请求搜索接口
search_response = requests.get(url=url, headers=self.header, timeout=10, cookies=self.cookies).text
# 解析返回的json取出illustId
illustId_path = json.loads(search_response)['body']['illustManga']['data']
for illustId in illustId_path:
# 构造详情页地址发送请求并解析返回json
details_json = requests.get('https://www.pixiv.net/ajax/illust/%s/pages' % illustId['illustId'],
headers=self.header, timeout=10, cookies=self.cookies).text
# 获取图片地址将地址添加到队列,因为会有图册出现,图片不止一张,只能循环了
image_path = json.loads(details_json)['body']
for image in range(len(image_path)):
# 将图片地址添加到图片队列
self.image_queue.put(image_path[image]['urls']['original'])
except Exception as e:
print(e)
# 图片下载类
class Download_Image(threading.Thread):
def __init__(self, thread_name, image_queue, dir_name):
super(Download_Image, self).__init__()
self.thread_name = thread_name
self.image_queue = image_queue
# 保存图片的文件夹名称,就是你的搜索的关键字
self.dir_name = dir_name
# 保存位置就是脚本当前目录下的搜索名字
self.path = os.path.dirname(os.path.abspath(__file__)) + '/' + self.dir_name
# 定义请求头
self.header = {
'referer': 'https://www.pixiv.net',
"user-agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/79.0.3945.117 Safari/537.36",
}
# 创建文件夹
if not os.path.exists(self.path):
os.mkdir(self.path)
# 开始下载图片
def run(self):
print(Fore.LIGHTBLUE_EX, '\n\t当前启动下载图片的线程为: %s' % self.thread_name)
while not image_flag:
try:
# 拉取图片队列中的数据
image_url = self.image_queue.get(block=False)
# 将图片保存到本地
print(Fore.LIGHTBLUE_EX, '\n\t当前执行下载的线程为: %s ' % self.thread_name)
with open(self.path + '/' + '%s' % os.path.basename(image_url), 'wb') as f:
print(Fore.LIGHTBLUE_EX, '\n\t当前下载的图片为: %s' % image_url)
f.write(requests.get(headers=self.header, url=image_url, timeout=10).content)
except:
pass
# 立flag
url_flag = False
image_flag = False
def run():
# 输入要搜索的图片
search_name = input('\n\t输入要搜索的图片: ')
cookies = input('\n\t请输入你的cookies,不输入则使用默认cookies: ')
# cookies字典
cookies_dict = {}
# 判断输入
if len(search_name) == 0:
print(Fore.LIGHTRED_EX, '\n\t请输入要搜索的内容')
sys.exit(0)
else:
# 判断cookies是否合法
if len(cookies) == 0:
print(Fore.LIGHTGREEN_EX,'\n\t未输入cookie,使用默认cookie访问P站')
cookies_dict = requests.get(url='https://www.pixiv.net', timeout=10).cookies
else:
try:
# 开始解析输入的cookies
for i in cookies.split(';'):
cookies_key, cookies_value = i.split("=", 1)
cookies_dict[cookies_key] = cookies_value
# 访问用户设置页面,获取用户ID/用户昵称
info_re = re.compile(r'userData":."id":"(?P<user_id>\d+)".+"name":"(?P<user_name>.+?)"')
user_info = re.search(info_re, requests.get(url='https://www.pixiv.net/setting_user.php',
cookies=cookies_dict, timeout=10).text)
print(Fore.LIGHTMAGENTA_EX, '\n\t当前用户ID为: %s' % user_info.groupdict()['user_id'])
print(Fore.LIGHTMAGENTA_EX, '\n\t当前用户昵称为: %s' % user_info.groupdict()['user_name'])
except Exception as e:
print(Fore.LIGHTRED_EX, '\n\t获取用户ID、用户昵称失败,请确保输入的cookies正确和网络正常')
print('\n\t%s' % e)
sys.exit(1)
# 获取图片总数
url = 'https://www.pixiv.net/ajax/search/artworks/%s?word=%s&order=date_d&mode=all&p=1&s_mode=s_tag&type=all' % (search_name, search_name)
total = json.loads(requests.get(url=url, timeout=10, cookies=cookies_dict).text)['body']['illustManga']['total']
# 如果图片数量为0则程序退出
if total == 0:
print(Fore.LIGHTRED_EX, '\n\t%s 搜索到的图片为0,换个关键字试试?,程序退出。' % search_name)
sys.exit(0)
else:
# 获取图片总数
print(Fore.LIGHTCYAN_EX, '\n\t当前获取到的作品数为 %d' % total)
# 计算出总页数
total_page = math.ceil(total / 60)
print(Fore.LIGHTCYAN_EX, '\n\t总页数为: %d' % total_page)
# 如果处于未登录状态则下载前十页数据
if type(cookies_dict) != dict and total_page > 10:
print(Fore.LIGHTCYAN_EX, '\n\t目前处于未登录状态,只能获取前10页数据')
total_page = 10
print(Fore.LIGHTCYAN_EX, '\n\t开始下载 1-%d 页的数据' % total_page)
# 页码队列
page_queue = Queue()
# 图片地址队列
image_queue = Queue()
# 将页码放入页码队列
for page in range(1, total_page + 1):
page_queue.put(page)
# 返回当前队列的长度
print(Fore.LIGHTMAGENTA_EX, '\n\t当前页码队列总数为: %s' % page_queue.qsize())
# 开启3个获取图片地址的线程
Picture_Threading_list = ['geturl_threading1', 'geturl_threading2', 'geturl_threading3']
Picture_Threading = []
for thread_name_page in Picture_Threading_list:
# 实例化对象
thread_page = Get_Picture_Url(thread_name_page, page_queue, image_queue, search_name, cookies_dict)
# 启动线程
thread_page.start()
Picture_Threading.append(thread_page)
# 开启3个下载线程
Download_Threading_list = ['download_threading1', 'download_threading2', 'download_threading3']
Download_Threading = []
for thread_name_parse in Download_Threading_list:
thread_parse = Download_Image(thread_name_parse, image_queue, search_name)
# 启动线程
thread_parse.start()
Download_Threading.append(thread_parse)
# 定义线程退出机制
global url_flag
while not page_queue.empty():
pass
url_flag = True
# 结束获取地址类线程
for thread_url in Picture_Threading:
thread_url.join()
print("\n\t%s 处理结束" % thread_url.thread_name)
global image_flag
while not image_queue.empty():
pass
image_flag = True
# 结束图片下载类线程
for thread_download in Download_Threading:
thread_download.join()
print('\n\t%s 处理结束' % thread_download.thread_name)
if __name__ == '__main__':
run()
跑一下,不登录的情况下,
开了几秒钟停掉了,下载了这么多,
登录的话就这样,提前准备好你的cookies
,
也是下了几秒钟就停掉了,
两次搜索的东西都是一样的,不难发现登陆之后比未登录的时候获取到的作品数要多,总之我想要的功能是实现了,其实还有一些需要优化的地方,其一是指定下载的范围,目前是下载一到最后一页,有点太暴力了,其二是某些作品里面可能会有多张图,这个目前还没做判断,只下载了第一张图,如果是多图就新建illustId
目录,将对应的图片下载到对应的illustId
目录内,目前所有图片都是堆到一起的,再就是GIF
图片也有问题,年前是没时间搞这个了,之后有时间会搞什么下载某用户的作品、排行榜之类的功能,根说Github
上有一堆有这种功能项目,我也没去了解过,总之还是自己写的用起来比较顺手,也就当练手了。
上面我并没有用爬虫代理,如果你要隐藏自己就需要设置代理了,设置方法其实很简单,就是买这种服务太贵了,我之前用过一个叫阿布云的,这个可以按小时去买,一小时一块钱,每秒请求数5
,而且那里也有怎么设置代理的文档,有兴趣可以去了解一下撒,所以,本文结束。
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。