生成一个查询 .im 三字母域名的python程序
记录已经注册的域名
记录未注册的域名
记录查询失败的域名
```
import socket
import time
import string
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# 全局变量和锁
queried_domains = set()
lock = threading.Lock()
QUERIED_FILE = "queried_domains.txt"
UNREGISTERED_FILE = "unregistered_3letter_im.txt"
FAILED_FILE = "failed_3letter_im.txt"
def test_network():
"""测试网络连通性"""
print("正在测试网络连通性...")
try:
# 测试ping
result = subprocess.run(["ping", "-c", "1", "whois.nic.im"],
capture_output=True, text=True, timeout=5)
if "0% packet loss" in result.stdout:
print("✅ Ping测试通过")
else:
print("⚠️ Ping测试异常")
# 测试端口连通性
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(10)
s.connect(("whois.nic.im", 43))
s.close()
print("✅ 端口43可访问")
return True
except Exception as e:
print(f"❌ 网络测试失败: {e}")
return False
def load_queried_domains():
"""加载已查询的域名列表"""
if os.path.exists(QUERIED_FILE):
with open(QUERIED_FILE, "r") as f:
for line in f:
queried_domains.add(line.strip())
print(f"已加载 {len(queried_domains)} 个已查询域名")
def save_queried_domain(domain):
"""保存已查询的域名到文件"""
with lock:
with open(QUERIED_FILE, "a") as f:
f.write(domain + "\n")
queried_domains.add(domain)
def generate_domains():
"""生成所有三字母域名(仅a-z)"""
chars = string.ascii_lowercase
all_domains = [f"{a}{b}{c}.im" for a in chars for b in chars for c in chars]
# 过滤掉已查询的域名
return [domain for domain in all_domains if domain not in queried_domains]
def query_whois(domain, timeout=30, max_retries=3):
"""查询WHOIS状态,返回是否未注册,支持重试"""
retry_count = 0
last_error = None
while retry_count < max_retries:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
s.connect(("whois.nic.im", 43))
s.send((domain + "\r\n").encode())
response = b""
while True:
data = s.recv(1024)
if not data:
break
response += data
s.close()
# 检查未注册关键词(根据实际WHOIS返回调整)
if b"not found" in response.lower():
return True
elif b"no match" in response.lower():
return True
elif b"available" in response.lower():
return True
else:
return False
except socket.timeout:
last_error = "超时"
print(f"⏱️ 查询超时: {domain} (重试 {retry_count+1}/{max_retries})")
except Exception as e:
last_error = str(e)
print(f"❌ 查询错误: {domain} - {last_error} (重试 {retry_count+1}/{max_retries})")
retry_count += 1
if retry_count < max_retries:
# 指数退避策略
wait_time = min(2 ** retry_count, 10) # 最多等待10秒
print(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
print(f"❌ 查询失败: {domain} - 最后错误: {last_error}")
return None
def main():
# 测试网络连通性
if not test_network():
print("网络测试失败,请检查网络连接后重试")
return
# 加载已查询域名
load_queried_domains()
# 生成待查询域名
domains = generate_domains()
print(f"待查询域名数量: {len(domains)}")
if not domains:
print("所有域名已查询完成")
return
unregistered = []
failed = []
# 多线程查询(保守设置)
max_workers = 1 # 改为单线程避免触发限制
delay = 2.0 # 增加到2秒间隔
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(query_whois, domain): domain for domain in domains}
for future in as_completed(futures):
domain = futures[future]
result = future.result()
# 保存已查询域名(无论成功或失败)
save_queried_domain(domain)
if result is True:
unregistered.append(domain)
print(f"✅ 未注册: {domain}")
# 立即保存未注册域名
with open(UNREGISTERED_FILE, "a") as f:
f.write(domain + "\n")
elif result is False:
print(f"❌ 已注册: {domain}")
else:
failed.append(domain)
print(f"⚠️ 查询失败: {domain}")
# 立即保存失败域名
with open(FAILED_FILE, "a") as f:
f.write(domain + "\n")
# 控制频率
time.sleep(delay)
print(f"\n本次查询完成!未注册域名: {len(unregistered)} | 查询失败: {len(failed)}")
if __name__ == "__main__":
main()
```