test.py (forked from xianhu/PSpider)
# _*_ coding: utf-8 _*_
"""
test.py by xianhu
"""

import re
import spider
import random
import datetime
import requests
from bs4 import BeautifulSoup

black_patterns = (spider.CONFIG_URL_ILLEGAL_PATTERN, r"binding", r"download", )
white_patterns = (r"^http[s]{0,1}://(www\.){0,1}(zhushou\.360)\.(com|cn)", )
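
# Note: black_patterns / white_patterns are handed to spider.UrlFilter in test_spider() below;
# presumably URLs matching a black pattern are dropped and only URLs matching a white pattern
# are followed, which keeps this test crawl on zhushou.360.cn.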


class MyFetcher(spider.Fetcher):
    """
    fetcher module, only rewrite url_fetch()
    """

    def url_fetch(self, priority: int, url: str, keys: dict, deep: int, repeat: int, proxies=None):
        # timeout=(3.05, 10) is requests' (connect timeout, read timeout) in seconds
        response = requests.get(url, params=None, headers={}, data=None, proxies=proxies, timeout=(3.05, 10))
        result = (response.status_code, response.url, response.text)

        # test error-logging
        assert random.randint(0, 100) != 8, "error-in-fetcher"
        return 1, result, 1


class MyParser(spider.Parser):
    """
    parser module, only rewrite htm_parse()
    """

    def htm_parse(self, priority: int, url: str, keys: dict, deep: int, content: object):
        status_code, url_now, html_text = content

        # test multi-processing
        [BeautifulSoup(html_text, "lxml") for _ in range(10)]

        url_list = []
        if (self._max_deep < 0) or (deep < self._max_deep):
            re_group = re.findall(r"<a.+?href=\"(?P<url>.{5,}?)\".*?>", html_text, flags=re.IGNORECASE)
            url_list = [(spider.get_url_legal(_url, base_url=url), keys, priority+1) for _url in re_group]

        title = re.search(r"<title>(?P<title>.+?)</title>", html_text, flags=re.IGNORECASE)
        save_list = [(url, title.group("title").strip(), datetime.datetime.now()), ] if title else []

        # test error-logging
        assert random.randint(0, 100) != 8, "error-in-parser"
        return 1, url_list, save_list
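
# Note: url_list holds (url, keys, priority) tuples which presumably go back into the fetch
# queue, and save_list holds the items passed on to the saver's item_save(); a hypothetical
# save_list entry here would look like (url, page_title, timestamp).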


class MySaver(spider.Saver):
    """
    saver module, only rewrite item_save()
    """

    def item_save(self, url: str, keys: dict, item: (list, tuple)):
        self._save_pipe.write("\t".join([str(col) for col in item] + [url, ]) + "\n")
        self._save_pipe.flush()
        return 1, []


class MyProxies(spider.Proxieser):
    """
    proxies module, only rewrite proxies_get()
    """

    def proxies_get(self):
        response = requests.get("http://xxxx.com/proxies")
        proxies_result = [{"http": "http://%s" % ipport, "https": "https://%s" % ipport} for ipport in response.text.split("\n")]
        return 1, proxies_result
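
# Note: "http://xxxx.com/proxies" is a placeholder endpoint; proxies_get() assumes the response
# body is plain text with one "ip:port" entry per line, producing dicts such as
# {"http": "http://1.2.3.4:8080", "https": "https://1.2.3.4:8080"} (hypothetical value).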


def test_spider():
    """
    test spider
    """
    # initialize fetcher / parser / saver / proxieser
    fetcher = MyFetcher(sleep_time=1, max_repeat=0)
    parser = MyParser(max_deep=2)
    saver = MySaver(save_pipe=open("out_thread.txt", "w"))
    # proxieser = MyProxies(sleep_time=5)

    # define url_filter
    url_filter = spider.UrlFilter(black_patterns=black_patterns, white_patterns=white_patterns, capacity=None)

    # initialize web_spider
    # web_spider = spider.WebSpider(fetcher, parser, saver, proxieser=None, url_filter=url_filter, queue_parse_size=-1)
    web_spider = spider.WebSpider(fetcher, parser, saver, proxieser=None, url_filter=url_filter, queue_parse_size=100, queue_proxies_size=100)

    # add start url
    web_spider.set_start_url("http://zhushou.360.cn/", priority=0, keys={"type": "360"}, deep=0)

    # start web_spider
    web_spider.start_working(fetcher_num=20)

    # wait until finished
    web_spider.wait_for_finished()
    return
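
# A minimal sketch of running the same test with the proxies module enabled, assuming the
# MyProxies endpoint above actually returns usable proxies (untested here):
#
#   proxieser = MyProxies(sleep_time=5)
#   web_spider = spider.WebSpider(fetcher, parser, saver, proxieser=proxieser,
#                                 url_filter=url_filter, queue_parse_size=100, queue_proxies_size=100)
#   web_spider.set_start_url("http://zhushou.360.cn/", priority=0, keys={"type": "360"}, deep=0)
#   web_spider.start_working(fetcher_num=20)
#   web_spider.wait_for_finished()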


if __name__ == "__main__":
    test_spider()
    exit()