#!/usr/bin/env python3
"""
优化的CSV处理脚本：
1. 使用pandas高效加载CSV
2. 并发下载图片
3. 根据大小分组（>10K标记为不可见）
"""

import pandas as pd
import os
import asyncio
import aiohttp
import aiofiles
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import hashlib
import time
from urllib.parse import urlparse

# ---- Paths -----------------------------------------------------------------
CSV_FILE = "/root/180M.csv"              # input CSV
OUTPUT_DIR = "/root/images"              # root directory for downloaded images
VISIBLE_DIR = os.path.join(OUTPUT_DIR, "visible")      # images <= SIZE_THRESHOLD bytes
INVISIBLE_DIR = os.path.join(OUTPUT_DIR, "invisible")  # images > SIZE_THRESHOLD bytes
RESULT_CSV = "/root/180M_processed.csv"  # input rows plus download-result columns

# ---- Classification --------------------------------------------------------
SIZE_THRESHOLD = 10 * 1024  # 10 KiB; larger images are classified as "invisible"

# ---- Download / batching ---------------------------------------------------
MAX_CONCURRENT = 100    # cap on simultaneous downloads (semaphore and connector limit)
TIMEOUT = 30            # total timeout per request, in seconds
BATCH_SIZE = 2000       # number of rows handed to process_batch at a time
SAMPLE_SIZE = 10_000    # only the first SAMPLE_SIZE rows are processed

def setup_dirs():
    """Make sure the output root and both size-bucket directories exist.

    Missing parent directories are created too; existing directories are
    left untouched.
    """
    for directory in (OUTPUT_DIR, VISIBLE_DIR, INVISIBLE_DIR):
        os.makedirs(directory, exist_ok=True)

def get_filename_from_url(url, mobile):
    """Build the local file name for an image as ``<mobile><ext>``.

    The extension is taken from the URL path when it is a known image
    extension (compared case-insensitively and normalized to lower case);
    otherwise ``.jpg`` is used.

    Args:
        url: Image URL; only its path component is inspected.
        mobile: Mobile number used as the file-name stem. Any path separator
            in it is replaced with ``_`` so that a CSV value cannot make the
            file land outside the target directory.

    Returns:
        The file name, with no directory part.
    """
    known_exts = {'.jpg', '.jpeg', '.png', '.gif', '.webp'}

    # splitext('') returns ('', ''), so an empty path falls through to the default.
    path_ext = os.path.splitext(urlparse(url).path)[1].lower()
    ext = path_ext if path_ext in known_exts else ".jpg"

    stem = f"{mobile}"
    for sep in (os.sep, os.altsep):
        if sep:
            stem = stem.replace(sep, "_")
    return f"{stem}{ext}"

def _failure_result(mobile, error):
    """Return the result record for a download that did not produce a file.

    The keys match a successful record plus ``'error'``. ``size`` is 0, and
    ``is_visible`` and ``local_path`` are None.
    """
    return {
        'mobile': mobile,
        'success': False,
        'error': error,
        'size': 0,
        'is_visible': None,
        'local_path': None,
    }

async def download_image(session, url, mobile, semaphore):
    """Download one image, save it to a size bucket, and describe the outcome.

    Args:
        session: aiohttp client session used to issue the GET request.
        url: Image URL to fetch.
        mobile: Mobile number. It is used as the file-name stem and as the
            ``'mobile'`` key of the returned record.
        semaphore: asyncio.Semaphore that bounds concurrent downloads.

    Returns:
        On HTTP 200, ``{'mobile', 'success': True, 'size', 'is_visible',
        'local_path'}``. The body is written to VISIBLE_DIR when
        ``size <= SIZE_THRESHOLD`` (``is_visible`` True) and to
        INVISIBLE_DIR otherwise.

        On failure, a record built by ``_failure_result`` whose ``'error'``
        is one of: ``"HTTP <status>"``, ``"Timeout"``, or an exception
        message truncated to 50 characters. Exceptions derived from
        ``Exception`` are converted to such a record rather than raised.
    """
    async with semaphore:
        try:
            filename = get_filename_from_url(url, mobile)

            timeout = aiohttp.ClientTimeout(total=TIMEOUT)
            async with session.get(url, timeout=timeout) as response:
                if response.status != 200:
                    return _failure_result(mobile, f"HTTP {response.status}")
                content = await response.read()

            # The body is fully buffered, so the response is released before
            # the (slower) disk write instead of being held during it.
            size = len(content)
            is_visible = size <= SIZE_THRESHOLD
            save_dir = VISIBLE_DIR if is_visible else INVISIBLE_DIR
            filepath = os.path.join(save_dir, filename)
            async with aiofiles.open(filepath, 'wb') as f:
                await f.write(content)

            return {
                'mobile': mobile,
                'success': True,
                'size': size,
                'is_visible': is_visible,
                'local_path': filepath,
            }
        except asyncio.TimeoutError:
            return _failure_result(mobile, "Timeout")
        except Exception as e:
            # Some exceptions (e.g. bare connection errors) have an empty
            # str(). Fall back to the class name so the reason is never blank.
            return _failure_result(mobile, (str(e) or type(e).__name__)[:50])

async def process_batch(batch_df):
    """Concurrently download the images referenced by one batch of rows.

    A row is scheduled only if its 'Profile Picture Link' is non-null,
    non-empty and starts with 'http'. Its 'Mobile Number' is passed as a
    string. A fresh client session, connector and semaphore are created for
    each batch.

    Returns:
        The ``download_image`` result records, in the order the rows were
        scheduled. Returns an empty list when no row qualifies.
    """
    limiter = asyncio.Semaphore(MAX_CONCURRENT)
    conn = aiohttp.TCPConnector(limit=MAX_CONCURRENT, limit_per_host=20)

    async with aiohttp.ClientSession(connector=conn) as session:
        jobs = []
        for _, record in batch_df.iterrows():
            link = record.get('Profile Picture Link', '')
            phone = str(record.get('Mobile Number', ''))
            # Skip missing / empty / non-HTTP links.
            if not (pd.notna(link) and link and link.startswith('http')):
                continue
            jobs.append(download_image(session, link, phone, limiter))

        if not jobs:
            return []
        return await asyncio.gather(*jobs)

def load_csv_optimized(csv_file=None, nrows=None):
    """Load the source CSV into a DataFrame, reading known columns as strings.

    Args:
        csv_file: Path of the CSV to read. Defaults to the module-level
            ``CSV_FILE`` when None.
        nrows: If given, read at most this many data rows. When only a prefix
            of the file will be processed, this avoids parsing the whole
            (potentially very large) file. None reads everything, which was
            the previous behaviour.

    Returns:
        The loaded DataFrame. Malformed lines are skipped rather than raising.
    """
    if csv_file is None:
        csv_file = CSV_FILE

    print("正在加载CSV文件...")
    start = time.time()

    # Force these columns to str so they are not parsed as numbers (e.g. a
    # mobile number keeps its leading zeros). All columns are still read;
    # no ``usecols`` restriction is applied.
    dtype = {
        'Mobile Number': str,
        'Name': str,
        'Gender': str,
        'Network': str,
        'Province': str,
        'City': str,
        'Town': str,
        'Bio': str,
        'Profile Picture Link': str
    }

    df = pd.read_csv(
        csv_file,
        dtype=dtype,
        engine='c',            # C parser: the fastest engine
        low_memory=False,      # parse in one pass to avoid mixed-type chunks
        on_bad_lines='skip',   # drop malformed rows instead of failing
        nrows=nrows,
    )

    elapsed = time.time() - start
    print(f"CSV加载完成: {len(df)} 行, 耗时 {elapsed:.2f} 秒")
    return df

async def main():
    """Run the whole pipeline: load, download and classify, then save and summarize.

    Steps:
      1. Load the CSV and keep only the first SAMPLE_SIZE rows.
      2. Download and size-classify images in batches of BATCH_SIZE rows,
         printing cumulative progress after each batch.
      3. Join the download results back onto the rows by mobile number and
         write RESULT_CSV.
      4. Print a summary.

    NOTE(review): results are keyed by mobile number, so rows sharing a
    mobile number all receive the last result recorded for that number.
    """
    setup_dirs()

    # 1. Load the CSV
    df = load_csv_optimized()

    # Only process the first SAMPLE_SIZE records
    if len(df) > SAMPLE_SIZE:
        print(f"\n限制处理前 {SAMPLE_SIZE} 条记录 (总共 {len(df)} 条)")
        df = df.head(SAMPLE_SIZE).copy()

    # 2. Download and classify images
    print("\n开始下载图片...")
    start = time.time()

    all_results = []
    total_rows = len(df)
    total_batches = (total_rows + BATCH_SIZE - 1) // BATCH_SIZE  # ceiling division
    # Running totals: each progress line costs O(batch) instead of
    # re-scanning every result gathered so far.
    success_count = visible_count = invisible_count = 0

    for batch_num, i in enumerate(range(0, total_rows, BATCH_SIZE), start=1):
        batch = df.iloc[i:i + BATCH_SIZE]

        print(f"处理批次 {batch_num}/{total_batches} (行 {i+1}-{min(i+BATCH_SIZE, total_rows)})")

        results = await process_batch(batch)
        all_results.extend(results)

        # is_visible is True/False for saved images and None for failures.
        success_count += sum(1 for r in results if r['success'])
        visible_count += sum(1 for r in results if r.get('is_visible') is True)
        invisible_count += sum(1 for r in results if r.get('is_visible') is False)
        print(f"  已下载: {success_count}, 可见: {visible_count}, 不可见(>10K): {invisible_count}")

    elapsed = time.time() - start
    print(f"\n下载完成，耗时 {elapsed:.2f} 秒")

    # 3. Map mobile number -> result record
    results_map = {r['mobile']: r for r in all_results}

    # 4. Attach result columns and save
    print("\n正在保存结果...")
    # Look up each row's record once. Rows without an attempted download get {}
    # and therefore take the per-column defaults below.
    matched = [results_map.get(str(m), {}) for m in df['Mobile Number']]
    for column, key, default in (
        ('download_success', 'success', False),
        ('image_size', 'size', 0),
        ('is_visible', 'is_visible', None),
        ('local_path', 'local_path', None),
    ):
        df[column] = [r.get(key, default) for r in matched]

    df.to_csv(RESULT_CSV, index=False)
    print(f"结果已保存到: {RESULT_CSV}")

    # 5. Summary. `== True` / `== False` are deliberate: they are elementwise
    # pandas comparisons over an object column that also holds None.
    success_total = df['download_success'].sum()
    visible_total = (df['is_visible'] == True).sum()  # noqa: E712
    invisible_total = (df['is_visible'] == False).sum()  # noqa: E712

    print("\n========== 处理结果统计 ==========")
    print(f"总记录数: {len(df)}")
    print(f"下载成功: {success_total}")
    # NOTE: this also counts rows that had no usable image link (never attempted).
    print(f"下载失败: {len(df) - success_total}")
    print(f"可见图片 (<=10K): {visible_total}")
    print(f"不可见图片 (>10K): {invisible_total}")
    print(f"可见图片目录: {VISIBLE_DIR}")
    print(f"不可见图片目录: {INVISIBLE_DIR}")

if __name__ == "__main__":
    asyncio.run(main())
