v2 签到脚本

4 条回复
131 次浏览

cookie 只需要 A2 和 A20 相关部分,其他的都可以删除

每天八点以后签到,八点之前无法签到。

代理部分根据自己来,如果本地需要用到代理,就更改相关代理设置,如果不需要代理,注释掉就行,

推送部分可以根据自己调整,我用的企微推送的,这是一个简单的签到脚本,要想获取相关签到返回信息可以更改脚本。

使用青龙或者 Linux 定时任务运行即可。
@JoeJoeJoe

复制
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import sys
import re
import json
import time
import random
import gzip
import zlib
from io import BytesIO
import http.cookiejar
import urllib.request
import urllib.parse
from datetime import datetime

# ==================== 配置区域 ====================
# V2EX 认证信息(登录后从浏览器获取)
V2EX_COOKIE = ''

# 企业微信机器人配置(可选)
WECHAT_WEBHOOK_URL = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的 KEY' # 替换为你的机器人 webhook 地址

# 代理配置(如果需要)
PROXY = 'http://10.188.6.97:7890' # HTTP 代理
# PROXY = None # 不使用代理设为 None
# =================================================

V2EX_DOMAIN = 'v2ex.com'
V2EX_URL_START = 'https://' + V2EX_DOMAIN
V2EX_MISSION = V2EX_URL_START + '/mission/daily'

def send_wechat_message(content, message_type='markdown'):
    """发送消息到企业微信机器人"""
    if WECHAT_WEBHOOK_URL == 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=1111':
        print("提示:未配置企业微信机器人,跳过推送")
        return False
    
    try:
        if message_type == 'markdown':
            data = {
                "msgtype": "markdown",
                "markdown": {"content": content}
            }
        else:
            data = {
                "msgtype": "text",
                "text": {"content": content}
            }
        
        json_data = json.dumps(data).encode('utf-8')
        req = urllib.request.Request(WECHAT_WEBHOOK_URL, 
                                    data=json_data,
                                    headers={'Content-Type': 'application/json'})
        
        response = urllib.request.urlopen(req, timeout=10)
        result = json.loads(response.read().decode('utf-8'))
        
        if result.get('errcode') == 0:
            print("✅ 企业微信消息发送成功")
            return True
        else:
            print(f"❌ 企业微信消息发送失败: {result}")
            return False
            
    except Exception as e:
        print(f"❌ 发送企业微信消息时发生错误: {e}")
        return False

def format_sign_result(success, message, days=None, coins=None):
    """格式化签到结果"""
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    status_icon = "✅" if success else "❌"
    status_text = "成功" if success else "失败"
    
    md = f"""## V2EX 签到报告 {status_icon}

**签到状态**:{status_text}
**签到时间**:{current_time}
**签到结果**:{message}
"""
    
    if days:
        md += f"**连续签到**:{days} 天\n"
    if coins:
        md += f"**当前铜币**:{coins}\n"
    
    md += f"\n---\n[👉 前往 V2EX]({V2EX_URL_START})"
    return md

def random_delay(min_seconds=1, max_seconds=3):
    """随机延迟,模拟人类行为"""
    delay = random.uniform(min_seconds, max_seconds)
    time.sleep(delay)

def decompress_data(data, headers):
    """解压缩数据"""
    content_encoding = headers.get('Content-Encoding', '').lower()
    
    if not content_encoding:
        return data
    
    try:
        if content_encoding == 'gzip':
            buf = BytesIO(data)
            with gzip.GzipFile(fileobj=buf) as f:
                return f.read()
        elif content_encoding == 'deflate':
            return zlib.decompress(data)
        elif content_encoding == 'br':
            # 如果不支持 br,返回原始数据
            print("⚠️ Brotli 压缩不支持,返回原始数据")
            return data
        else:
            print(f"⚠️ 未知压缩格式: {content_encoding}")
            return data
    except Exception as e:
        print(f"⚠️ 解压失败 ({content_encoding}): {e}")
        return data

def read_response(response):
    """读取响应并返回字符串,自动处理压缩"""
    # 获取响应头
    headers = dict(response.getheaders())
    data = response.read()
    
    # 解压缩数据
    data = decompress_data(data, headers)
    
    # 尝试多种编码解码
    if isinstance(data, bytes):
        # 尝试常用编码
        for encoding in ['utf-8', 'gbk', 'gb2312', 'latin-1']:
            try:
                return data.decode(encoding)
            except UnicodeDecodeError:
                continue
        # 如果都不行,使用 ignore 模式
        return data.decode('utf-8', errors='ignore')
    
    return str(data)

def setup_opener():
    """设置 opener,包含 cookie 和代理"""
    # 创建 cookie 处理器
    cj = http.cookiejar.CookieJar()
    
    # 解析 cookie 字符串
    for cookie_item in V2EX_COOKIE.split('; '):
        if '=' in cookie_item:
            name, value = cookie_item.split('=', 1)
            # 清理值中的引号
            value = value.strip('"').strip("'")
            
            cookie_obj = http.cookiejar.Cookie(
                version=0,
                name=name.strip(),
                value=value,
                port=None,
                port_specified=False,
                domain=V2EX_DOMAIN,
                domain_specified=True,
                domain_initial_dot=False,
                path='/',
                path_specified=True,
                secure=False,
                expires=None,
                discard=False,
                comment=None,
                comment_url=None,
                rest={'HttpOnly': None},
                rfc2109=False
            )
            cj.set_cookie(cookie_obj)
    
    # 构建处理器
    handlers = [urllib.request.HTTPCookieProcessor(cj)]
    
    # 添加代理
    if PROXY:
        proxy_handler = urllib.request.ProxyHandler({
            'http': PROXY,
            'https': PROXY
        })
        handlers.append(proxy_handler)
    
    # 创建 opener
    opener = urllib.request.build_opener(*handlers)
    
    # 完善请求头,模拟真实浏览器(注意:去掉 Accept-Encoding 避免自动压缩)
    opener.addheaders = [
        ('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'),
        ('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'),
        ('Accept-Language', 'zh-CN,zh;q=0.9,en;q=0.8'),
        # 移除 Accept-Encoding,让服务器返回未压缩的内容
        # ('Accept-Encoding', 'gzip, deflate'),
        ('Connection', 'keep-alive'),
        ('Upgrade-Insecure-Requests', '1'),
        ('Cache-Control', 'max-age=0'),
        ('Referer', V2EX_MISSION)
    ]
    
    return opener

def extract_once_param(html):
    """从 HTML 中提取 once 参数"""
    # 方法 1: 查找签到按钮的 onclick 属性
    match = re.search(r"onclick=\"location\.href\s*=\s*'([^']+)'\"", html)
    if match:
        return match.group(1)
    
    # 方法 2: 直接匹配 URL 模式
    match = re.search(r'/mission/daily/redeem\?once=(\d+)', html)
    if match:
        return match.group(0)
    
    # 方法 3: 查找 input 按钮的值
    match = re.search(r'<input[^>]*value="领取[^"]*"[^>]*onclick="[^"]*location\.href\s*=\s*''([^'']+)''"', html)
    if match:
        return match.group(1)
    
    return None

def extract_user_info(html):
    """提取用户信息(连续天数、铜币等)"""
    info = {}
    
    # 提取连续签到天数
    days_match = re.search(r'已连续登录\s*(\d+)\s*天', html)
    if days_match:
        info['days'] = days_match.group(1)
    
    # 提取铜币信息
    coins_match = re.search(r'(\d+)\s*<img src="/static/img/silver@2x\.png"', html)
    if coins_match:
        info['coins'] = coins_match.group(1)
    
    return info

def test_proxy():
    """测试代理是否可用"""
    if not PROXY:
        print("📡 未使用代理")
        return True
    
    print(f"📡 测试代理: {PROXY}")
    
    try:
        # 创建测试 opener
        proxy_handler = urllib.request.ProxyHandler({
            'http': PROXY,
            'https': PROXY
        })
        opener = urllib.request.build_opener(proxy_handler)
        opener.addheaders = [('User-Agent', 'Mozilla/5.0')]
        
        # 测试访问
        test_resp = opener.open('http://httpbin.org/ip', timeout=10)
        test_data = test_resp.read().decode('utf-8')
        print(f"✅ 代理工作正常,IP 信息: {test_data}")
        return True
    except Exception as e:
        print(f"❌ 代理测试失败: {e}")
        return False

def save_debug_file(content, filename):
    """保存调试文件,确保正确编码"""
    try:
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(content)
        print(f"💾 已保存调试信息到 {filename}")
    except Exception as e:
        print(f"⚠️ 保存调试文件失败: {e}")

def sign_in():
    """执行签到"""
    if not V2EX_COOKIE or V2EX_COOKIE == 'auth=你的 auth 值':
        error_msg = "错误:请先设置正确的 V2EX_COOKIE"
        print(error_msg)
        send_wechat_message(format_sign_result(False, error_msg))
        return False
    
    try:
        print("=" * 50)
        print("🚀 开始 V2EX 签到...")
        print("=" * 50)
        
        # 测试代理
        if PROXY and not test_proxy():
            print("⚠️ 代理测试失败,尝试继续执行...")
        
        # 设置 opener
        opener = setup_opener()
        
        # 1. 先访问主页
        print("\n📄 第 1 步: 访问主页...")
        random_delay()
        home_resp = opener.open(V2EX_URL_START, timeout=15)
        home_html = read_response(home_resp)
        print(f"✅ 主页访问成功,页面大小: {len(home_html)} 字符")
        
        # 2. 访问签到页面
        print("\n📄 第 2 步: 访问签到页面...")
        random_delay()
        resp = opener.open(V2EX_MISSION, timeout=15)
        html = read_response(resp)
        print(f"✅ 签到页面访问成功,页面大小: {len(html)} 字符")
        
        # 保存原始 HTML 用于调试
        save_debug_file(html, 'v2ex_debug.html')
        
        # 检查反爬提示
        if '奇奇怪怪的设置' in html:
            print("⚠️ 检测到反爬提示,尝试继续...")
        
        # 检查是否已签到
        if '每日登录奖励已领取' in html:
            msg = "今日已签到"
            print(f"✅ {msg}")
            user_info = extract_user_info(html)
            send_wechat_message(format_sign_result(True, msg, 
                                                   user_info.get('days'), 
                                                   user_info.get('coins')))
            return True
        
        # 3. 提取 once 参数
        print("\n🔍 第 3 步: 提取签到参数...")
        once_path = extract_once_param(html)
        if not once_path:
            msg = "未找到签到参数,可能页面结构已变"
            print(f"❌ {msg}")
            send_wechat_message(format_sign_result(False, msg))
            return False
        
        print(f"✅ 找到签到参数: {once_path}")
        
        # 4. 构建签到 URL
        if once_path.startswith('http'):
            sign_url = once_path
        else:
            sign_url = V2EX_URL_START + once_path
        
        print(f"🔗 第 4 步: 签到 URL: {sign_url}")
        
        # 5. 执行签到
        print("\n🎯 第 5 步: 执行签到...")
        random_delay(2, 4) # 签到前稍长延迟
        sign_resp = opener.open(sign_url, timeout=15)
        sign_html = read_response(sign_resp)
        
        # 保存签到结果
        save_debug_file(sign_html, 'v2ex_result.html')
        
        # 6. 解析结果
        print("\n📊 第 6 步: 解析签到结果...")
        user_info = extract_user_info(sign_html)
        success = False
        message = ""
        
        if '每日登录奖励已领取' in sign_html:
            success = True
            message = "每日登录奖励已领取"
            print(f"✅ {message}")
        elif '你已连续' in sign_html:
            success = True
            days_match = re.search(r'你已连续(\d+)天', sign_html)
            days = days_match.group(1) if days_match else '?'
            message = f"已连续签到 {days} 天"
            print(f"✅ {message}")
        elif '领取' in sign_html and '铜币' in sign_html:
            success = True
            message = "签到成功"
            print(f"✅ {message}")
        else:
            # 检查页面中是否包含成功信息
            if '已领取' in sign_html or '成功' in sign_html:
                success = True
                message = "签到成功"
                print(f"✅ {message}")
            else:
                message = "签到完成,请手动检查结果"
                print(f"⚠️ {message}")
        
        # 7. 推送结果
        print("\n📱 第 7 步: 推送签到结果...")
        push_result = send_wechat_message(format_sign_result(success, message, 
                                                           user_info.get('days'), 
                                                           user_info.get('coins')))
        if push_result:
            print("✅ 结果推送成功")
        
        print("\n" + "=" * 50)
        print(f"🏁 签到{'成功' if success else '完成'}")
        print("=" * 50)
        
        return success
        
    except urllib.error.URLError as e:
        error_msg = f"网络错误: {e.reason}"
        print(f"\n❌ {error_msg}")
        if hasattr(e, 'code'):
            error_msg += f" (HTTP {e.code})"
        send_wechat_message(format_sign_result(False, error_msg))
        return False
    except Exception as e:
        error_msg = f"{type(e).__name__}: {e}"
        print(f"\n❌ 发生错误: {error_msg}")
        import traceback
        traceback.print_exc()
        send_wechat_message(format_sign_result(False, error_msg))
        return False

if __name__ == '__main__':
    print("=" * 50)
    print("V2EX 自动签到脚本 v2.1")
    print("=" * 50)
    
    # 执行签到
    success = sign_in()
    
    print("\n 脚本执行完毕")
    sys.exit(0 if success else 1)

发表一个评论

R保持