2025年8月23日星期六

使用AI写一个查询域名的python代码

 生成一个查询 .im 三字母域名的python程序 

记录已经注册的域名

记录未注册的域名

记录查询失败的域名

```

import socket

import time

import string

import os

import subprocess

from concurrent.futures import ThreadPoolExecutor, as_completed

import threading


# 全局变量和锁

queried_domains = set()

lock = threading.Lock()

QUERIED_FILE = "queried_domains.txt"

UNREGISTERED_FILE = "unregistered_3letter_im.txt"

FAILED_FILE = "failed_3letter_im.txt"


def test_network():

    """测试网络连通性"""

    print("正在测试网络连通性...")

    try:

        # 测试ping

        result = subprocess.run(["ping", "-c", "1", "whois.nic.im"], 

                              capture_output=True, text=True, timeout=5)

        if "0% packet loss" in result.stdout:

            print("✅ Ping测试通过")

        else:

            print("⚠️ Ping测试异常")

        

        # 测试端口连通性

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        s.settimeout(10)

        s.connect(("whois.nic.im", 43))

        s.close()

        print("✅ 端口43可访问")

        return True

    except Exception as e:

        print(f"❌ 网络测试失败: {e}")

        return False


def load_queried_domains():

    """加载已查询的域名列表"""

    if os.path.exists(QUERIED_FILE):

        with open(QUERIED_FILE, "r") as f:

            for line in f:

                queried_domains.add(line.strip())

    print(f"已加载 {len(queried_domains)} 个已查询域名")


def save_queried_domain(domain):

    """保存已查询的域名到文件"""

    with lock:

        with open(QUERIED_FILE, "a") as f:

            f.write(domain + "\n")

        queried_domains.add(domain)


def generate_domains():

    """生成所有三字母域名(仅a-z)"""

    chars = string.ascii_lowercase

    all_domains = [f"{a}{b}{c}.im" for a in chars for b in chars for c in chars]

    # 过滤掉已查询的域名

    return [domain for domain in all_domains if domain not in queried_domains]


def query_whois(domain, timeout=30, max_retries=3):

    """查询WHOIS状态,返回是否未注册,支持重试"""

    retry_count = 0

    last_error = None

    

    while retry_count < max_retries:

        try:

            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            s.settimeout(timeout)

            s.connect(("whois.nic.im", 43))

            s.send((domain + "\r\n").encode())

            

            response = b""

            while True:

                data = s.recv(1024)

                if not data:

                    break

                response += data

            s.close()

            

            # 检查未注册关键词(根据实际WHOIS返回调整)

            if b"not found" in response.lower():

                return True

            elif b"no match" in response.lower():

                return True

            elif b"available" in response.lower():

                return True

            else:

                return False

                

        except socket.timeout:

            last_error = "超时"

            print(f"⏱️ 查询超时: {domain} (重试 {retry_count+1}/{max_retries})")

        except Exception as e:

            last_error = str(e)

            print(f"❌ 查询错误: {domain} - {last_error} (重试 {retry_count+1}/{max_retries})")

        

        retry_count += 1

        if retry_count < max_retries:

            # 指数退避策略

            wait_time = min(2 ** retry_count, 10)  # 最多等待10秒

            print(f"等待 {wait_time} 秒后重试...")

            time.sleep(wait_time)

    

    print(f"❌ 查询失败: {domain} - 最后错误: {last_error}")

    return None


def main():

    # 测试网络连通性

    if not test_network():

        print("网络测试失败,请检查网络连接后重试")

        return

    

    # 加载已查询域名

    load_queried_domains()

    

    # 生成待查询域名

    domains = generate_domains()

    print(f"待查询域名数量: {len(domains)}")

    

    if not domains:

        print("所有域名已查询完成")

        return

    

    unregistered = []

    failed = []

    

    # 多线程查询(保守设置)

    max_workers = 1      # 改为单线程避免触发限制

    delay = 2.0          # 增加到2秒间隔

    

    with ThreadPoolExecutor(max_workers=max_workers) as executor:

        futures = {executor.submit(query_whois, domain): domain for domain in domains}

        for future in as_completed(futures):

            domain = futures[future]

            result = future.result()

            

            # 保存已查询域名(无论成功或失败)

            save_queried_domain(domain)

            

            if result is True:

                unregistered.append(domain)

                print(f"✅ 未注册: {domain}")

                # 立即保存未注册域名

                with open(UNREGISTERED_FILE, "a") as f:

                    f.write(domain + "\n")

            elif result is False:

                print(f"❌ 已注册: {domain}")

            else:

                failed.append(domain)

                print(f"⚠️ 查询失败: {domain}")

                # 立即保存失败域名

                with open(FAILED_FILE, "a") as f:

                    f.write(domain + "\n")

            

            # 控制频率

            time.sleep(delay)

    

    print(f"\n本次查询完成!未注册域名: {len(unregistered)} | 查询失败: {len(failed)}")


if __name__ == "__main__":

    main()

```


使用AI写一个查询域名的python代码

 生成一个查询 .im 三字母域名的python程序  记录已经注册的域名 记录未注册的域名 记录查询失败的域名 ``` import socket import time import string import os import subprocess from concur...