#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 图片迁移脚本 - 通过SSH或本地目录迁移图片到远程服务器 保持相对路径结构一致 """ import argparse import json import os import shutil import sys import tempfile import time from typing import Any, Dict, List, Optional, Tuple # paramiko 导入被延迟到实际需要时 paramiko: Any = None def ensure_paramiko() -> None: """确保paramiko已导入,如果未安装则报错""" global paramiko if paramiko is None: try: import paramiko as paramiko_module paramiko = paramiko_module except ImportError: print("错误: 需要安装paramiko库") print("请运行: pip install paramiko") sys.exit(1) class ServerConfig: """服务器配置(支持SSH或本地目录)""" def __init__( self, config_type: str = "ssh", # "ssh" 或 "local" host: str = "", port: int = 22, username: Optional[str] = None, password: Optional[str] = None, key_file: Optional[str] = None, base_dir: Optional[str] = None, ): """ 初始化服务器配置 Args: config_type: 配置类型,'ssh' 表示SSH服务器,'local' 表示本地目录 host: 服务器地址(对于local类型可为None) port: 服务器端口(仅SSH类型) username: 用户名(仅SSH类型) password: 密码(仅SSH类型) key_file: 私钥文件路径(仅SSH类型) base_dir: 基础目录路径 """ self.config_type = config_type self.host = host self.port = port self.username = username self.password = password self.key_file = key_file self.base_dir = base_dir.rstrip("/") if base_dir else "" def to_dict(self) -> Dict: """转换为字典""" return { "type": self.config_type, "host": self.host, "port": self.port, "username": self.username, "password": self.password, "key_file": self.key_file, "base_dir": self.base_dir, } @classmethod def from_dict(cls, data: Dict) -> "ServerConfig": """从字典创建""" ssh_data = data.get("ssh", {}) return cls( config_type=data.get("type", "ssh"), host=ssh_data.get("host", "localhost"), port=ssh_data.get("port", 22), username=ssh_data.get("username", ""), password=ssh_data.get("password", ""), key_file=ssh_data.get("key_file", ""), base_dir=data.get("base_dir", ""), ) def is_local(self) -> bool: """是否是本地目录配置""" return self.config_type == "local" def __str__(self) -> str: if self.is_local(): return f"LocalDirectory(base_dir={self.base_dir})" else: return f"SSHServer(host={self.host}:{self.port}, user={self.username}, base_dir={self.base_dir})" class ImageMigrator: """图片迁移器""" def __init__( self, source_config: ServerConfig, target_config: ServerConfig, verbose: bool = False, overwrite: bool = False, ) -> None: """ 初始化迁移器 Args: source_config: 源服务器配置(可以是本地目录或SSH服务器) target_config: 目标服务器配置(必须为SSH服务器) verbose: 是否显示详细输出 overwrite: 是否覆盖已存在的文件 """ self.source_config = source_config self.target_config = target_config self.verbose = verbose self.overwrite = overwrite # SSH客户端连接(仅用于SSH类型的服务器) self.source_client: Any = None self.target_client: Any = None self.source_sftp: Any = None self.target_sftp: Any = None # 统计信息 self.stats = { "total": 0, "success": 0, "failed": 0, "skipped": 0, "bytes_transferred": 0, } def connect(self) -> bool: """连接到服务器""" try: # 确保paramiko已导入 ensure_paramiko() # 连接目标服务器(必须为SSH) if self.target_config.is_local(): print("错误: 目标服务器必须为SSH服务器,不能是本地目录") return False self.target_client = paramiko.SSHClient() self.target_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) if self.verbose: print( f"连接到目标服务器: {self.target_config.host}:{self.target_config.port}" ) if self.target_config.key_file: key = paramiko.RSAKey.from_private_key_file(self.target_config.key_file) self.target_client.connect( hostname=self.target_config.host, port=self.target_config.port, username=self.target_config.username, pkey=key, ) else: self.target_client.connect( hostname=self.target_config.host, port=self.target_config.port, username=self.target_config.username, password=self.target_config.password, ) self.target_sftp = self.target_client.open_sftp() # 检查SFTP服务器的工作目录 if self.verbose: try: cwd = self.target_sftp.getcwd() print(f"DEBUG: 目标SFTP服务器当前工作目录: {cwd}") except Exception as e: print(f"DEBUG: 无法获取目标SFTP服务器工作目录: {e}") # 连接源服务器(如果是SSH类型) if not self.source_config.is_local(): self.source_client = paramiko.SSHClient() self.source_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) if self.verbose: print( f"连接到源服务器: {self.source_config.host}:{self.source_config.port}" ) if self.source_config.key_file: key = paramiko.RSAKey.from_private_key_file( self.source_config.key_file ) self.source_client.connect( hostname=self.source_config.host, port=self.source_config.port, username=self.source_config.username, pkey=key, ) else: self.source_client.connect( hostname=self.source_config.host, port=self.source_config.port, username=self.source_config.username, password=self.source_config.password, ) self.source_sftp = self.source_client.open_sftp() if self.verbose: source_type = ( "本地目录" if self.source_config.is_local() else "SSH服务器" ) print(f"连接成功! 源: {source_type}, 目标: SSH服务器") return True except Exception as e: print(f"连接失败: {e}") self.close() return False def close(self) -> None: """关闭所有连接""" if self.source_sftp: self.source_sftp.close() if self.source_client: self.source_client.close() if self.target_sftp: self.target_sftp.close() if self.target_client: self.target_client.close() if self.verbose: print("连接已关闭") def ensure_target_directory(self, remote_dir: str) -> bool: """确保目标目录存在(递归创建)""" try: if self.verbose: print(f"DEBUG ensure_target_directory: 检查/创建目录: {remote_dir}") # 确保路径以斜杠开头(绝对路径) if not remote_dir.startswith("/"): remote_dir = "/" + remote_dir if self.verbose: print( f"DEBUG ensure_target_directory: 转换为绝对路径: {remote_dir}" ) # 首先检查目录是否已经存在 try: # 尝试列出目录内容 self.target_sftp.listdir(remote_dir) if self.verbose: print(f"DEBUG ensure_target_directory: 目录已存在: {remote_dir}") return True except (FileNotFoundError, IOError): # 目录不存在,需要递归创建 if self.verbose: print( f"DEBUG ensure_target_directory: 目录不存在,开始递归创建: {remote_dir}" ) # 分割路径为各个组件,保留空字符串以处理根目录 parts = [p for p in remote_dir.split("/") if p] # 从根目录开始逐级创建 current_path = "" for i, part in enumerate(parts): # 构建当前路径(确保以斜杠开头) if current_path: current_path = f"{current_path}/{part}" else: current_path = f"/{part}" # 检查当前路径是否存在 try: self.target_sftp.listdir(current_path) if self.verbose: print( f"DEBUG ensure_target_directory: 路径已存在: {current_path}" ) except (FileNotFoundError, IOError): # 当前路径不存在,尝试创建 if self.verbose: print( f"DEBUG ensure_target_directory: 创建目录: {current_path}" ) try: self.target_sftp.mkdir(current_path) if self.verbose: print( f"DEBUG ensure_target_directory: 目录创建成功: {current_path}" ) except Exception as mkdir_e: # 如果创建失败,可能是权限问题或目录已存在 if self.verbose: print( f"DEBUG ensure_target_directory: 创建目录时出错 {current_path}: {mkdir_e}" ) # 尝试检查目录是否真的不存在 try: self.target_sftp.listdir(current_path) if self.verbose: print( f"DEBUG ensure_target_directory: 目录实际上已存在: {current_path}" ) except Exception as listdir_e: # 目录确实不存在且创建失败 print(f"错误: 无法创建目录 {current_path}: {listdir_e}") return False # 最终确认目录是否创建成功 try: self.target_sftp.listdir(remote_dir) if self.verbose: print( f"DEBUG ensure_target_directory: 目录创建完成: {remote_dir}" ) return True except Exception as final_check_e: if self.verbose: print( f"DEBUG ensure_target_directory: 最终检查失败 {remote_dir}: {final_check_e}" ) return False except Exception as e: print(f"创建目录失败 {remote_dir}: {e}") if self.verbose: import traceback traceback.print_exc() return False def get_source_file_info(self, source_path: str) -> Tuple[bool, Optional[int]]: """ 获取源文件信息 Returns: Tuple[exists: bool, size: Optional[int]] """ try: if self.source_config.is_local(): # 本地文件 if self.verbose: print(f"DEBUG: 检查本地文件路径: {source_path}") print(f"DEBUG: 当前工作目录: {os.getcwd()}") print(f"DEBUG: 绝对路径: {os.path.abspath(source_path)}") print(f"DEBUG: 文件是否存在: {os.path.exists(source_path)}") if os.path.exists(source_path): if self.verbose: print(f"DEBUG: 文件大小: {os.path.getsize(source_path)} bytes") return True, os.path.getsize(source_path) else: if self.verbose: print("DEBUG: 文件不存在!") return False, 0 else: # SSH远程文件 source_attr = self.source_sftp.stat(source_path) return True, source_attr.st_size except Exception as e: if self.verbose: print(f"DEBUG: 获取文件信息时出错: {e}") return False, 0 def read_source_file(self, source_path: str, temp_path: str) -> bool: """ 读取源文件到临时文件 Returns: bool: 是否成功 """ try: if self.source_config.is_local(): # 从本地文件复制 if self.verbose: print("DEBUG read_source_file: 开始复制文件") print(f"DEBUG read_source_file: 源路径: {source_path}") print(f"DEBUG read_source_file: 临时路径: {temp_path}") print( f"DEBUG read_source_file: 源文件是否存在: {os.path.exists(source_path)}" ) print( f"DEBUG read_source_file: 源文件绝对路径: {os.path.abspath(source_path)}" ) shutil.copy2(source_path, temp_path) if self.verbose: print("DEBUG read_source_file: 复制完成") print( f"DEBUG read_source_file: 临时文件是否存在: {os.path.exists(temp_path)}" ) if os.path.exists(temp_path): print( f"DEBUG read_source_file: 临时文件大小: {os.path.getsize(temp_path)} bytes" ) return True else: # 从SSH服务器下载 self.source_sftp.get(source_path, temp_path) return True except Exception as e: with open("fails.txt", "a", encoding="utf-8") as file: file.write(f"{source_path}\n") print(f"读取源文件失败 {source_path}: {e}") import traceback traceback.print_exc() return False def transfer_file(self, relative_path: str) -> bool: """ 传输单个文件 Args: relative_path: 相对路径(相对于基础目录) Returns: bool: 是否成功 """ try: # 构建完整路径 source_path = f"{self.source_config.base_dir}/{relative_path}" target_path = f"{self.target_config.base_dir}/{relative_path}" if self.verbose: source_type = "本地" if self.source_config.is_local() else "远程" print(f"传输: {relative_path}") print(f" 源({source_type}): {source_path}") print(f" 目标(远程): {target_path}") # 检查源文件是否存在并获取大小 source_exists, file_size = self.get_source_file_info(source_path) if not source_exists: print(f"错误: 源文件不存在: {source_path}") self.stats["failed"] += 1 return False # 检查目标文件是否已存在(根据overwrite选项处理) try: self.target_sftp.stat(target_path) if not self.overwrite: if self.verbose: print(f"跳过: 目标文件已存在: {target_path}") self.stats["skipped"] += 1 return True else: if self.verbose: print(f"覆盖: 目标文件已存在: {target_path}") except FileNotFoundError: if self.verbose: print(f"DEBUG: 目标文件不存在,将创建新文件: {target_path}") except Exception as e: if self.verbose: print(f"DEBUG: 检查目标文件时出错: {e}") # 确保目标目录存在 target_dir = os.path.dirname(target_path) if self.verbose: print(f"DEBUG: 目标目录: {target_dir}") print(f"DEBUG: 目标路径: {target_path}") if target_dir and not self.ensure_target_directory(target_dir): print(f"错误: 无法创建目标目录: {target_dir}") self.stats["failed"] += 1 return False # 传输文件 start_time = time.time() temp_path = None try: # 创建临时文件 with tempfile.NamedTemporaryFile(delete=False) as temp_file: temp_path = temp_file.name if self.verbose: print(f"DEBUG: 创建临时文件: {temp_path}") # 读取源文件到临时文件 if not self.read_source_file(source_path, temp_path): self.stats["failed"] += 1 return False # 上传到目标服务器 if self.verbose: print(f"DEBUG: 开始上传到目标服务器: {temp_path} -> {target_path}") print(f"DEBUG: 临时文件大小: {os.path.getsize(temp_path)} bytes") print(f"DEBUG: 临时文件是否存在: {os.path.exists(temp_path)}") # 尝试列出目标目录内容 try: target_dir = os.path.dirname(target_path) dir_list = self.target_sftp.listdir(target_dir) print(f"DEBUG: 目标目录内容: {dir_list}") except Exception as e: print(f"DEBUG: 无法列出目标目录内容 {target_dir}: {e}") # 尝试检查目录是否存在 try: self.target_sftp.stat(target_dir) print(f"DEBUG: 但目录stat成功: {target_dir}") except Exception as stat_e: print(f"DEBUG: 目录stat也失败: {stat_e}") self.target_sftp.put(temp_path, target_path) if self.verbose: print("DEBUG: 上传完成") except Exception as e: if self.verbose: print(f"DEBUG: 上传过程中出错: {e}") import traceback traceback.print_exc() raise finally: # 清理临时文件 if temp_path and os.path.exists(temp_path): try: if self.verbose: print(f"DEBUG: 清理临时文件: {temp_path}") os.remove(temp_path) except Exception as e: if self.verbose: print(f"DEBUG: 清理临时文件时出错: {e}") # 记录统计 transfer_time = time.time() - start_time speed = (file_size or 0) / transfer_time / 1024 if transfer_time > 0 else 0 if self.verbose: print( f" 完成: {file_size} bytes, 耗时: {transfer_time:.2f}s, 速度: {speed:.2f} KB/s" ) self.stats["success"] += 1 self.stats["bytes_transferred"] += file_size or 0 return True except Exception as e: print(f"传输失败 {relative_path}: {e}") if self.verbose: import traceback traceback.print_exc() self.stats["failed"] += 1 return False def migrate_images(self, image_paths: List[str]) -> Dict[str, int]: """ 迁移图片列表 Args: image_paths: 相对路径列表 Returns: Dict: 统计信息 """ if not self.connect(): return self.stats try: self.stats["total"] = len(image_paths) source_type = "本地目录" if self.source_config.is_local() else "SSH服务器" print(f"开始从{source_type}迁移 {len(image_paths)} 个图片到SSH服务器...") for i, relative_path in enumerate(image_paths, 1): # 清理路径 relative_path = relative_path.strip() if not relative_path: continue # 显示进度 print(f"[{i}/{len(image_paths)}] {relative_path}", end="") # 传输文件 success = self.transfer_file(relative_path) if success: print(" ✓") else: print(" ✗") # 打印统计信息 print("\n" + "=" * 50) print("迁移完成!") print(f"总计: {self.stats['total']}") print(f"成功: {self.stats['success']}") print(f"失败: {self.stats['failed']}") print(f"跳过: {self.stats['skipped']}") print(f"传输字节: {self.stats['bytes_transferred']:,}") return self.stats finally: self.close() def load_config(config_file: str) -> Dict: """加载配置文件""" try: with open(config_file, "r", encoding="utf-8") as f: config = json.load(f) return config except Exception as e: print(f"加载配置文件失败 {config_file}: {e}") return {} def save_config(config_file: str, config: Dict): """保存配置文件""" try: with open(config_file, "w", encoding="utf-8") as f: json.dump(config, f, indent=2, ensure_ascii=False) print(f"配置文件已保存: {config_file}") except Exception as e: print(f"保存配置文件失败 {config_file}: {e}") def create_example_config(config_file: str): """创建示例配置文件""" example_config = { "source": { "type": "local", # 可以是 "local" 或 "ssh" "base_dir": "/path/to/local/images", # 如果是SSH类型,需要以下配置 "ssh": { "host": "source-server.example.com", "port": 22, "username": "username", "password": "your_password_here", "key_file": "/path/to/private/key", }, }, "target": { "type": "ssh", # 目标必须是SSH类型 "base_dir": "/var/www/html/images", "ssh": { "host": "target-server.example.com", "port": 22, "username": "username", "password": "your_password_here", "key_file": "/path/to/private/key", }, }, } save_config(config_file, example_config) print("已创建示例配置文件") print("注意: 源服务器可以是本地目录(local)或SSH服务器(ssh)") print(" 目标服务器必须是SSH服务器(ssh)") def read_image_paths(image_list_file: str) -> List[str]: """从文件读取图片路径列表""" paths = [] not_images = [] try: with open(image_list_file, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line and not line.startswith("#"): if not line.endswith((".png", ".jpeg", ".jpg", ".gif", ".webp")): not_images.append(line) else: paths.append(line) print(f"从文件中读取到 {len(paths)} 个图片路径数据行") print(f"从文件中读取到 {len(not_images)} 个非图片路径数据行") except Exception as e: print(f"读取图片路径文件失败 {image_list_file}: {e}") return paths def main(): parser = argparse.ArgumentParser( description="从本地目录或SSH服务器迁移图片到远程SSH服务器,保持相对路径结构", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: # 使用配置文件迁移(源为本地目录) python image_migrate.py --config config.json --input images.txt # 使用配置文件迁移(源为SSH服务器) # 配置文件中 source.type="ssh" # 直接指定参数迁移(源为本地目录) python image_migrate.py \\ --source-type local --source-dir /path/to/local/images \\ --target-host dst.example.com --target-user user2 --target-dir /home/user/images \\ --input images.txt # 直接指定参数迁移(源为SSH服务器) python image_migrate.py \\ --source-type ssh --source-host src.example.com --source-user user1 --source-dir /var/www/images \\ --target-host dst.example.com --target-user user2 --target-dir /home/user/images \\ --input images.txt # 创建示例配置文件 python image_migrate.py --create-config config.json # 从命令行直接指定图片路径 python image_migrate.py --config config.json images/product1.jpg images/product2.jpg """, ) # 配置文件选项 parser.add_argument("--config", help="配置文件路径") parser.add_argument("--create-config", metavar="FILE", help="创建示例配置文件") # 源服务器选项 parser.add_argument( "--source-type", choices=["local", "ssh"], default="local", help="源服务器类型: local(本地目录) 或 ssh(SSH服务器),默认: local", ) parser.add_argument("--source-host", help="源服务器地址(仅SSH类型需要)") parser.add_argument( "--source-port", type=int, default=22, help="源服务器端口(仅SSH类型,默认: 22)", ) parser.add_argument("--source-user", help="源服务器用户名(仅SSH类型需要)") parser.add_argument("--source-password", help="源服务器密码(仅SSH类型需要)") parser.add_argument("--source-key", help="源服务器私钥文件路径(仅SSH类型需要)") parser.add_argument("--source-dir", help="源服务器图片基础目录") # 目标服务器选项 parser.add_argument("--target-host", help="目标服务器地址") parser.add_argument( "--target-port", type=int, default=22, help="目标服务器端口(默认: 22)" ) parser.add_argument("--target-user", help="目标服务器用户名") parser.add_argument("--target-password", help="目标服务器密码") parser.add_argument("--target-key", help="目标服务器私钥文件路径") parser.add_argument("--target-dir", help="目标服务器图片基础目录") # 输入选项 parser.add_argument( "--input", "-i", help="包含图片路径列表的文件(每行一个相对路径)" ) parser.add_argument("--verbose", "-v", action="store_true", help="显示详细输出") parser.add_argument("--overwrite", action="store_true", help="覆盖已存在的文件") # 直接传递图片路径 parser.add_argument("image_paths", nargs="*", help="直接指定图片相对路径") args = parser.parse_args() # 创建示例配置文件 if args.create_config: create_example_config(args.create_config) return # 检查必要参数(如果不是创建配置文件) if not args.config and not ( args.source_dir and args.target_host and args.target_user and args.target_dir ): print("错误: 必须提供配置文件或指定必要的服务器参数") print("必要参数: --source-dir, --target-host, --target-user, --target-dir") parser.print_help() sys.exit(1) # 加载配置 config = {} if args.config: config = load_config(args.config) if not config: print("错误: 无法加载配置文件") sys.exit(1) # 构建源服务器配置 source_config_data = config.get("source", {}) # 命令行参数覆盖配置文件 if args.source_type: source_config_data["type"] = args.source_type # 如果是SSH类型,需要SSH配置 if args.source_type == "ssh" or source_config_data.get("type") == "ssh": ssh_config = source_config_data.get("ssh", {}) if args.source_host: ssh_config["host"] = args.source_host if args.source_port: ssh_config["port"] = args.source_port if args.source_user: ssh_config["username"] = args.source_user if args.source_password: ssh_config["password"] = args.source_password if args.source_key: ssh_config["key_file"] = args.source_key source_config_data["ssh"] = ssh_config # 检查必要的SSH参数 if not ssh_config.get("host"): print("错误: 源服务器为SSH类型,必须指定 --source-host") sys.exit(1) if not ssh_config.get("username"): print("错误: 源服务器为SSH类型,必须指定 --source-user") sys.exit(1) else: # 本地类型,不需要SSH配置 source_config_data.pop("ssh", None) # 设置基础目录 if args.source_dir: source_config_data["base_dir"] = args.source_dir elif "base_dir" not in source_config_data: print("错误: 必须指定源服务器图片基础目录 (--source-dir)") sys.exit(1) # 构建目标服务器配置 target_config_data = config.get("target", {}) target_config_data["type"] = "ssh" # 目标必须是SSH ssh_config = target_config_data.get("ssh", {}) # 命令行参数覆盖配置文件 if args.target_host: ssh_config["host"] = args.target_host if args.target_port: ssh_config["port"] = args.target_port if args.target_user: ssh_config["username"] = args.target_user if args.target_password: ssh_config["password"] = args.target_password if args.target_key: ssh_config["key_file"] = args.target_key target_config_data["ssh"] = ssh_config # 设置基础目录 if args.target_dir: target_config_data["base_dir"] = args.target_dir elif "base_dir" not in target_config_data: print("错误: 必须指定目标服务器图片基础目录 (--target-dir)") sys.exit(1) # 检查必要的目标SSH参数 if not ssh_config.get("host"): print("错误: 必须指定目标服务器地址 (--target-host)") sys.exit(1) if not ssh_config.get("username"): print("错误: 必须指定目标服务器用户名 (--target-user)") sys.exit(1) # 创建服务器配置对象 try: source_config = ServerConfig.from_dict(source_config_data) target_config = ServerConfig.from_dict(target_config_data) if args.verbose: print(f"源服务器配置: {source_config}") print(f"目标服务器配置: {target_config}") except Exception as e: print(f"创建服务器配置失败: {e}") sys.exit(1) # 获取图片路径列表 image_paths = [] # 从文件读取 if args.input: file_paths = read_image_paths(args.input) image_paths.extend(file_paths) # 从命令行参数添加 if args.image_paths: image_paths.extend(args.image_paths) if not image_paths: print("错误: 没有指定要迁移的图片路径") print("请使用 --input 指定文件或直接在命令行提供路径") sys.exit(1) origin_paths = image_paths # 去重 image_paths = list(set(image_paths)) repeats = len(origin_paths) - len(image_paths) if repeats > 0: print(f"从文件中读取到 {repeats} 个重复图片路径数据行") # 创建并运行迁移器 migrator = ImageMigrator( source_config=source_config, target_config=target_config, verbose=args.verbose, overwrite=args.overwrite, ) # 执行迁移 stats = migrator.migrate_images(image_paths) # 如果有失败,返回错误代码 if stats["failed"] > 0: sys.exit(1) if __name__ == "__main__": main()