-
Notifications
You must be signed in to change notification settings - Fork 0
/
Crt_Subdomain_Fetcher.py
127 lines (89 loc) · 3.95 KB
/
Crt_Subdomain_Fetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import requests
import argparse
import os
import concurrent.futures
def fetch_subdomains(domain, result_file):
    """Query crt.sh certificate-transparency logs for *domain* and append
    every distinct certificate name (except the domain itself) to *result_file*.

    Network/HTTP errors are reported and swallowed so one failing domain
    does not abort a batch run driven by the thread pool in main().
    """
    url = f"https://crt.sh/?q={domain}&exclude=expired&group=none&output=json"
    try:
        print(f"正在处理:{domain}")
        response = requests.get(url, timeout=20)
        response.raise_for_status()
        json_data = response.json()
        subdomains = set()
        for entry in json_data:
            # "name_value" may hold several newline-separated names; use
            # .get() so a record missing the key cannot raise KeyError.
            for name in entry.get("name_value", "").splitlines():
                # Skip blank lines and the queried domain itself.
                if name and name != domain:
                    subdomains.add(name)
        if subdomains:
            # Emit everything in ONE write call: several worker threads
            # append to the same file, and per-line writes could interleave
            # partial lines; a single buffered write keeps lines intact.
            with open(result_file, mode='a', encoding='utf-8') as file:
                file.write("".join(f"{subdomain}\n" for subdomain in subdomains))
    except requests.exceptions.RequestException as e:
        print(f"Error fetching {domain}: {e}")
def separate_subdomains(filename, wildcard_file='wildcard_subdomains.txt',
                        non_wildcard_file='non_wildcard_subdomains.txt'):
    """Split the entries of *filename* into two sorted, de-duplicated files:
    wildcard entries (those containing '*') and plain entries.

    *filename* is resolved against the current working directory; the two
    output files are (re)written relative to it as well.
    """
    source_path = os.path.join(os.getcwd(), filename)
    with open(source_path, 'r', encoding='utf-8') as src:
        entries = sorted({raw.strip() for raw in src})
    # Route each unique entry to the matching output file.
    with open(wildcard_file, mode='w', encoding='utf-8') as wc_out, \
            open(non_wildcard_file, mode='w', encoding='utf-8') as plain_out:
        for entry in entries:
            target = wc_out if '*' in entry else plain_out
            target.write(f"{entry}\n")
    print(f"包含通配符的子域名已写入 {os.path.join(os.getcwd(), wildcard_file)}")
    print(f"不包含通配符的子域名已写入 {os.path.join(os.getcwd(), non_wildcard_file)}")
def remove_duplicates(filename):
    """De-duplicate and sort the lines of *filename* in place, then hand the
    cleaned file to separate_subdomains() for wildcard classification.
    """
    target_path = os.path.join(os.getcwd(), filename)
    with open(target_path, 'r', encoding='utf-8') as fh:
        unique_entries = sorted({entry.strip() for entry in fh})
    # Rewrite the same file with the sorted, unique entries.
    with open(target_path, mode='w', encoding='utf-8') as fh:
        fh.writelines(f"{entry}\n" for entry in unique_entries)
    print(f"去重后的数据已写入 {target_path}")
    # Split the cleaned result into wildcard / non-wildcard files.
    separate_subdomains(filename)
def main():
    """Parse CLI flags, fetch subdomains for every target concurrently via
    crt.sh, then de-duplicate and classify the combined results.

    Flags:
        -t        single target domain
        -f        file containing one domain per line
        -threads  worker-thread count (default: 5)
    """
    parser = argparse.ArgumentParser(description='Fetch and save subdomains data from crt.sh for given domains.')
    parser.add_argument('-t', type=str, help='Single domain target')
    parser.add_argument('-f', type=str, help='File containing list of domains, one per line')
    parser.add_argument('-threads', type=int, default=5, help='Number of threads to use (default: 5)')
    args = parser.parse_args()
    # No target at all: show usage instead of doing nothing silently.
    if args.t is None and args.f is None:
        parser.print_help()
        return
    domains = set()
    if args.t:
        domains.add(args.t)
    if args.f:
        try:
            with open(args.f, 'r') as file:
                domains.update(line.strip() for line in file if line.strip())
        except FileNotFoundError:
            print(f"文件未找到: {args.f}")
            return
    result_file = "out.txt"
    # Workers append to this file, so stale results from a previous run
    # must be removed first.
    if os.path.exists(result_file):
        os.remove(result_file)
    total_domains = len(domains)
    completed_domains = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
        future_to_domain = {executor.submit(fetch_subdomains, domain, result_file): domain
                            for domain in domains}
        for future in concurrent.futures.as_completed(future_to_domain):
            domain = future_to_domain[future]
            try:
                future.result()
            except Exception as exc:
                print(f"{domain} 生成异常: {exc}")
            completed_domains += 1
            percent_complete = (completed_domains / total_domains) * 100
            print(f"进度: {percent_complete:.2f}%", end='\r')
    # Bug fix: if every fetch failed (or the domain list was empty), out.txt
    # was never created and remove_duplicates() raised FileNotFoundError.
    if os.path.exists(result_file):
        remove_duplicates(result_file)
    else:
        print("未获取到任何子域名,跳过后续处理")
if __name__ == "__main__":
    main()