Files
orico-official-website/scripts/image_migrate.py
jsasg 33929b8284
Some checks failed
Gitea Actions Official-website / deploy-dev (push) Failing after 3s
refactor: 图片迁移python脚本
2025-12-22 10:09:39 +08:00

913 lines
33 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
图片迁移脚本 - 通过SSH或本地目录迁移图片到远程服务器
保持相对路径结构一致
"""
import argparse
import json
import os
import shutil
import sys
import tempfile
import time
from typing import Any, Dict, List, Optional, Tuple
# paramiko 导入被延迟到实际需要时
paramiko: Any = None
def ensure_paramiko() -> None:
"""确保paramiko已导入如果未安装则报错"""
global paramiko
if paramiko is None:
try:
import paramiko as paramiko_module
paramiko = paramiko_module
except ImportError:
print("错误: 需要安装paramiko库")
print("请运行: pip install paramiko")
sys.exit(1)
class ServerConfig:
"""服务器配置支持SSH或本地目录"""
def __init__(
self,
config_type: str = "ssh", # "ssh" 或 "local"
host: str = "",
port: int = 22,
username: Optional[str] = None,
password: Optional[str] = None,
key_file: Optional[str] = None,
base_dir: Optional[str] = None,
):
"""
初始化服务器配置
Args:
config_type: 配置类型,'ssh' 表示SSH服务器'local' 表示本地目录
host: 服务器地址对于local类型可为None
port: 服务器端口仅SSH类型
username: 用户名仅SSH类型
password: 密码仅SSH类型
key_file: 私钥文件路径仅SSH类型
base_dir: 基础目录路径
"""
self.config_type = config_type
self.host = host
self.port = port
self.username = username
self.password = password
self.key_file = key_file
self.base_dir = base_dir.rstrip("/") if base_dir else ""
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"type": self.config_type,
"host": self.host,
"port": self.port,
"username": self.username,
"password": self.password,
"key_file": self.key_file,
"base_dir": self.base_dir,
}
@classmethod
def from_dict(cls, data: Dict) -> "ServerConfig":
"""从字典创建"""
ssh_data = data.get("ssh", {})
return cls(
config_type=data.get("type", "ssh"),
host=ssh_data.get("host", "localhost"),
port=ssh_data.get("port", 22),
username=ssh_data.get("username", ""),
password=ssh_data.get("password", ""),
key_file=ssh_data.get("key_file", ""),
base_dir=data.get("base_dir", ""),
)
def is_local(self) -> bool:
"""是否是本地目录配置"""
return self.config_type == "local"
def __str__(self) -> str:
if self.is_local():
return f"LocalDirectory(base_dir={self.base_dir})"
else:
return f"SSHServer(host={self.host}:{self.port}, user={self.username}, base_dir={self.base_dir})"
class ImageMigrator:
"""图片迁移器"""
def __init__(
self,
source_config: ServerConfig,
target_config: ServerConfig,
verbose: bool = False,
overwrite: bool = False,
) -> None:
"""
初始化迁移器
Args:
source_config: 源服务器配置可以是本地目录或SSH服务器
target_config: 目标服务器配置必须为SSH服务器
verbose: 是否显示详细输出
overwrite: 是否覆盖已存在的文件
"""
self.source_config = source_config
self.target_config = target_config
self.verbose = verbose
self.overwrite = overwrite
# SSH客户端连接仅用于SSH类型的服务器
self.source_client: Any = None
self.target_client: Any = None
self.source_sftp: Any = None
self.target_sftp: Any = None
# 统计信息
self.stats = {
"total": 0,
"success": 0,
"failed": 0,
"skipped": 0,
"bytes_transferred": 0,
}
def connect(self) -> bool:
"""连接到服务器"""
try:
# 确保paramiko已导入
ensure_paramiko()
# 连接目标服务器必须为SSH
if self.target_config.is_local():
print("错误: 目标服务器必须为SSH服务器不能是本地目录")
return False
self.target_client = paramiko.SSHClient()
self.target_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.verbose:
print(
f"连接到目标服务器: {self.target_config.host}:{self.target_config.port}"
)
if self.target_config.key_file:
key = paramiko.RSAKey.from_private_key_file(self.target_config.key_file)
self.target_client.connect(
hostname=self.target_config.host,
port=self.target_config.port,
username=self.target_config.username,
pkey=key,
)
else:
self.target_client.connect(
hostname=self.target_config.host,
port=self.target_config.port,
username=self.target_config.username,
password=self.target_config.password,
)
self.target_sftp = self.target_client.open_sftp()
# 检查SFTP服务器的工作目录
if self.verbose:
try:
cwd = self.target_sftp.getcwd()
print(f"DEBUG: 目标SFTP服务器当前工作目录: {cwd}")
except Exception as e:
print(f"DEBUG: 无法获取目标SFTP服务器工作目录: {e}")
# 连接源服务器如果是SSH类型
if not self.source_config.is_local():
self.source_client = paramiko.SSHClient()
self.source_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if self.verbose:
print(
f"连接到源服务器: {self.source_config.host}:{self.source_config.port}"
)
if self.source_config.key_file:
key = paramiko.RSAKey.from_private_key_file(
self.source_config.key_file
)
self.source_client.connect(
hostname=self.source_config.host,
port=self.source_config.port,
username=self.source_config.username,
pkey=key,
)
else:
self.source_client.connect(
hostname=self.source_config.host,
port=self.source_config.port,
username=self.source_config.username,
password=self.source_config.password,
)
self.source_sftp = self.source_client.open_sftp()
if self.verbose:
source_type = (
"本地目录" if self.source_config.is_local() else "SSH服务器"
)
print(f"连接成功! 源: {source_type}, 目标: SSH服务器")
return True
except Exception as e:
print(f"连接失败: {e}")
self.close()
return False
def close(self) -> None:
"""关闭所有连接"""
if self.source_sftp:
self.source_sftp.close()
if self.source_client:
self.source_client.close()
if self.target_sftp:
self.target_sftp.close()
if self.target_client:
self.target_client.close()
if self.verbose:
print("连接已关闭")
def ensure_target_directory(self, remote_dir: str) -> bool:
"""确保目标目录存在(递归创建)"""
try:
if self.verbose:
print(f"DEBUG ensure_target_directory: 检查/创建目录: {remote_dir}")
# 确保路径以斜杠开头(绝对路径)
if not remote_dir.startswith("/"):
remote_dir = "/" + remote_dir
if self.verbose:
print(
f"DEBUG ensure_target_directory: 转换为绝对路径: {remote_dir}"
)
# 首先检查目录是否已经存在
try:
# 尝试列出目录内容
self.target_sftp.listdir(remote_dir)
if self.verbose:
print(f"DEBUG ensure_target_directory: 目录已存在: {remote_dir}")
return True
except (FileNotFoundError, IOError):
# 目录不存在,需要递归创建
if self.verbose:
print(
f"DEBUG ensure_target_directory: 目录不存在,开始递归创建: {remote_dir}"
)
# 分割路径为各个组件,保留空字符串以处理根目录
parts = [p for p in remote_dir.split("/") if p]
# 从根目录开始逐级创建
current_path = ""
for i, part in enumerate(parts):
# 构建当前路径(确保以斜杠开头)
if current_path:
current_path = f"{current_path}/{part}"
else:
current_path = f"/{part}"
# 检查当前路径是否存在
try:
self.target_sftp.listdir(current_path)
if self.verbose:
print(
f"DEBUG ensure_target_directory: 路径已存在: {current_path}"
)
except (FileNotFoundError, IOError):
# 当前路径不存在,尝试创建
if self.verbose:
print(
f"DEBUG ensure_target_directory: 创建目录: {current_path}"
)
try:
self.target_sftp.mkdir(current_path)
if self.verbose:
print(
f"DEBUG ensure_target_directory: 目录创建成功: {current_path}"
)
except Exception as mkdir_e:
# 如果创建失败,可能是权限问题或目录已存在
if self.verbose:
print(
f"DEBUG ensure_target_directory: 创建目录时出错 {current_path}: {mkdir_e}"
)
# 尝试检查目录是否真的不存在
try:
self.target_sftp.listdir(current_path)
if self.verbose:
print(
f"DEBUG ensure_target_directory: 目录实际上已存在: {current_path}"
)
except Exception as listdir_e:
# 目录确实不存在且创建失败
print(f"错误: 无法创建目录 {current_path}: {listdir_e}")
return False
# 最终确认目录是否创建成功
try:
self.target_sftp.listdir(remote_dir)
if self.verbose:
print(
f"DEBUG ensure_target_directory: 目录创建完成: {remote_dir}"
)
return True
except Exception as final_check_e:
if self.verbose:
print(
f"DEBUG ensure_target_directory: 最终检查失败 {remote_dir}: {final_check_e}"
)
return False
except Exception as e:
print(f"创建目录失败 {remote_dir}: {e}")
if self.verbose:
import traceback
traceback.print_exc()
return False
def get_source_file_info(self, source_path: str) -> Tuple[bool, Optional[int]]:
"""
获取源文件信息
Returns:
Tuple[exists: bool, size: Optional[int]]
"""
try:
if self.source_config.is_local():
# 本地文件
if self.verbose:
print(f"DEBUG: 检查本地文件路径: {source_path}")
print(f"DEBUG: 当前工作目录: {os.getcwd()}")
print(f"DEBUG: 绝对路径: {os.path.abspath(source_path)}")
print(f"DEBUG: 文件是否存在: {os.path.exists(source_path)}")
if os.path.exists(source_path):
if self.verbose:
print(f"DEBUG: 文件大小: {os.path.getsize(source_path)} bytes")
return True, os.path.getsize(source_path)
else:
if self.verbose:
print("DEBUG: 文件不存在!")
return False, 0
else:
# SSH远程文件
source_attr = self.source_sftp.stat(source_path)
return True, source_attr.st_size
except Exception as e:
if self.verbose:
print(f"DEBUG: 获取文件信息时出错: {e}")
return False, 0
def read_source_file(self, source_path: str, temp_path: str) -> bool:
"""
读取源文件到临时文件
Returns:
bool: 是否成功
"""
try:
if self.source_config.is_local():
# 从本地文件复制
if self.verbose:
print("DEBUG read_source_file: 开始复制文件")
print(f"DEBUG read_source_file: 源路径: {source_path}")
print(f"DEBUG read_source_file: 临时路径: {temp_path}")
print(
f"DEBUG read_source_file: 源文件是否存在: {os.path.exists(source_path)}"
)
print(
f"DEBUG read_source_file: 源文件绝对路径: {os.path.abspath(source_path)}"
)
shutil.copy2(source_path, temp_path)
if self.verbose:
print("DEBUG read_source_file: 复制完成")
print(
f"DEBUG read_source_file: 临时文件是否存在: {os.path.exists(temp_path)}"
)
if os.path.exists(temp_path):
print(
f"DEBUG read_source_file: 临时文件大小: {os.path.getsize(temp_path)} bytes"
)
return True
else:
# 从SSH服务器下载
self.source_sftp.get(source_path, temp_path)
return True
except Exception as e:
with open("fails.txt", "a", encoding="utf-8") as file:
file.write(f"{source_path}\n")
print(f"读取源文件失败 {source_path}: {e}")
import traceback
traceback.print_exc()
return False
def transfer_file(self, relative_path: str) -> bool:
"""
传输单个文件
Args:
relative_path: 相对路径(相对于基础目录)
Returns:
bool: 是否成功
"""
try:
# 构建完整路径
source_path = f"{self.source_config.base_dir}/{relative_path}"
target_path = f"{self.target_config.base_dir}/{relative_path}"
if self.verbose:
source_type = "本地" if self.source_config.is_local() else "远程"
print(f"传输: {relative_path}")
print(f" 源({source_type}): {source_path}")
print(f" 目标(远程): {target_path}")
# 检查源文件是否存在并获取大小
source_exists, file_size = self.get_source_file_info(source_path)
if not source_exists:
print(f"错误: 源文件不存在: {source_path}")
self.stats["failed"] += 1
return False
# 检查目标文件是否已存在根据overwrite选项处理
try:
self.target_sftp.stat(target_path)
if not self.overwrite:
if self.verbose:
print(f"跳过: 目标文件已存在: {target_path}")
self.stats["skipped"] += 1
return True
else:
if self.verbose:
print(f"覆盖: 目标文件已存在: {target_path}")
except FileNotFoundError:
if self.verbose:
print(f"DEBUG: 目标文件不存在,将创建新文件: {target_path}")
except Exception as e:
if self.verbose:
print(f"DEBUG: 检查目标文件时出错: {e}")
# 确保目标目录存在
target_dir = os.path.dirname(target_path)
if self.verbose:
print(f"DEBUG: 目标目录: {target_dir}")
print(f"DEBUG: 目标路径: {target_path}")
if target_dir and not self.ensure_target_directory(target_dir):
print(f"错误: 无法创建目标目录: {target_dir}")
self.stats["failed"] += 1
return False
# 传输文件
start_time = time.time()
temp_path = None
try:
# 创建临时文件
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_path = temp_file.name
if self.verbose:
print(f"DEBUG: 创建临时文件: {temp_path}")
# 读取源文件到临时文件
if not self.read_source_file(source_path, temp_path):
self.stats["failed"] += 1
return False
# 上传到目标服务器
if self.verbose:
print(f"DEBUG: 开始上传到目标服务器: {temp_path} -> {target_path}")
print(f"DEBUG: 临时文件大小: {os.path.getsize(temp_path)} bytes")
print(f"DEBUG: 临时文件是否存在: {os.path.exists(temp_path)}")
# 尝试列出目标目录内容
try:
target_dir = os.path.dirname(target_path)
dir_list = self.target_sftp.listdir(target_dir)
print(f"DEBUG: 目标目录内容: {dir_list}")
except Exception as e:
print(f"DEBUG: 无法列出目标目录内容 {target_dir}: {e}")
# 尝试检查目录是否存在
try:
self.target_sftp.stat(target_dir)
print(f"DEBUG: 但目录stat成功: {target_dir}")
except Exception as stat_e:
print(f"DEBUG: 目录stat也失败: {stat_e}")
self.target_sftp.put(temp_path, target_path)
if self.verbose:
print("DEBUG: 上传完成")
except Exception as e:
if self.verbose:
print(f"DEBUG: 上传过程中出错: {e}")
import traceback
traceback.print_exc()
raise
finally:
# 清理临时文件
if temp_path and os.path.exists(temp_path):
try:
if self.verbose:
print(f"DEBUG: 清理临时文件: {temp_path}")
os.remove(temp_path)
except Exception as e:
if self.verbose:
print(f"DEBUG: 清理临时文件时出错: {e}")
# 记录统计
transfer_time = time.time() - start_time
speed = (file_size or 0) / transfer_time / 1024 if transfer_time > 0 else 0
if self.verbose:
print(
f" 完成: {file_size} bytes, 耗时: {transfer_time:.2f}s, 速度: {speed:.2f} KB/s"
)
self.stats["success"] += 1
self.stats["bytes_transferred"] += file_size or 0
return True
except Exception as e:
print(f"传输失败 {relative_path}: {e}")
if self.verbose:
import traceback
traceback.print_exc()
self.stats["failed"] += 1
return False
def migrate_images(self, image_paths: List[str]) -> Dict[str, int]:
"""
迁移图片列表
Args:
image_paths: 相对路径列表
Returns:
Dict: 统计信息
"""
if not self.connect():
return self.stats
try:
self.stats["total"] = len(image_paths)
source_type = "本地目录" if self.source_config.is_local() else "SSH服务器"
print(f"开始从{source_type}迁移 {len(image_paths)} 个图片到SSH服务器...")
for i, relative_path in enumerate(image_paths, 1):
# 清理路径
relative_path = relative_path.strip()
if not relative_path:
continue
# 显示进度
print(f"[{i}/{len(image_paths)}] {relative_path}", end="")
# 传输文件
success = self.transfer_file(relative_path)
if success:
print("")
else:
print("")
# 打印统计信息
print("\n" + "=" * 50)
print("迁移完成!")
print(f"总计: {self.stats['total']}")
print(f"成功: {self.stats['success']}")
print(f"失败: {self.stats['failed']}")
print(f"跳过: {self.stats['skipped']}")
print(f"传输字节: {self.stats['bytes_transferred']:,}")
return self.stats
finally:
self.close()
def load_config(config_file: str) -> Dict:
"""加载配置文件"""
try:
with open(config_file, "r", encoding="utf-8") as f:
config = json.load(f)
return config
except Exception as e:
print(f"加载配置文件失败 {config_file}: {e}")
return {}
def save_config(config_file: str, config: Dict):
"""保存配置文件"""
try:
with open(config_file, "w", encoding="utf-8") as f:
json.dump(config, f, indent=2, ensure_ascii=False)
print(f"配置文件已保存: {config_file}")
except Exception as e:
print(f"保存配置文件失败 {config_file}: {e}")
def create_example_config(config_file: str):
"""创建示例配置文件"""
example_config = {
"source": {
"type": "local", # 可以是 "local" 或 "ssh"
"base_dir": "/path/to/local/images",
# 如果是SSH类型需要以下配置
"ssh": {
"host": "source-server.example.com",
"port": 22,
"username": "username",
"password": "your_password_here",
"key_file": "/path/to/private/key",
},
},
"target": {
"type": "ssh", # 目标必须是SSH类型
"base_dir": "/var/www/html/images",
"ssh": {
"host": "target-server.example.com",
"port": 22,
"username": "username",
"password": "your_password_here",
"key_file": "/path/to/private/key",
},
},
}
save_config(config_file, example_config)
print("已创建示例配置文件")
print("注意: 源服务器可以是本地目录(local)或SSH服务器(ssh)")
print(" 目标服务器必须是SSH服务器(ssh)")
def read_image_paths(image_list_file: str) -> List[str]:
"""从文件读取图片路径列表"""
paths = []
try:
with open(image_list_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"):
paths.append(line)
print(f"从文件读取 {len(paths)} 个图片路径")
except Exception as e:
print(f"读取图片路径文件失败 {image_list_file}: {e}")
return paths
def main():
parser = argparse.ArgumentParser(
description="从本地目录或SSH服务器迁移图片到远程SSH服务器保持相对路径结构",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
# 使用配置文件迁移(源为本地目录)
python image_migrate.py --config config.json --input images.txt
# 使用配置文件迁移源为SSH服务器
# 配置文件中 source.type="ssh"
# 直接指定参数迁移(源为本地目录)
python image_migrate.py \\
--source-type local --source-dir /path/to/local/images \\
--target-host dst.example.com --target-user user2 --target-dir /home/user/images \\
--input images.txt
# 直接指定参数迁移源为SSH服务器
python image_migrate.py \\
--source-type ssh --source-host src.example.com --source-user user1 --source-dir /var/www/images \\
--target-host dst.example.com --target-user user2 --target-dir /home/user/images \\
--input images.txt
# 创建示例配置文件
python image_migrate.py --create-config config.json
# 从命令行直接指定图片路径
python image_migrate.py --config config.json images/product1.jpg images/product2.jpg
""",
)
# 配置文件选项
parser.add_argument("--config", help="配置文件路径")
parser.add_argument("--create-config", metavar="FILE", help="创建示例配置文件")
# 源服务器选项
parser.add_argument(
"--source-type",
choices=["local", "ssh"],
default="local",
help="源服务器类型: local(本地目录) 或 ssh(SSH服务器),默认: local",
)
parser.add_argument("--source-host", help="源服务器地址仅SSH类型需要")
parser.add_argument(
"--source-port",
type=int,
default=22,
help="源服务器端口仅SSH类型默认: 22",
)
parser.add_argument("--source-user", help="源服务器用户名仅SSH类型需要")
parser.add_argument("--source-password", help="源服务器密码仅SSH类型需要")
parser.add_argument("--source-key", help="源服务器私钥文件路径仅SSH类型需要")
parser.add_argument("--source-dir", help="源服务器图片基础目录")
# 目标服务器选项
parser.add_argument("--target-host", help="目标服务器地址")
parser.add_argument(
"--target-port", type=int, default=22, help="目标服务器端口(默认: 22"
)
parser.add_argument("--target-user", help="目标服务器用户名")
parser.add_argument("--target-password", help="目标服务器密码")
parser.add_argument("--target-key", help="目标服务器私钥文件路径")
parser.add_argument("--target-dir", help="目标服务器图片基础目录")
# 输入选项
parser.add_argument(
"--input", "-i", help="包含图片路径列表的文件(每行一个相对路径)"
)
parser.add_argument("--verbose", "-v", action="store_true", help="显示详细输出")
parser.add_argument("--overwrite", action="store_true", help="覆盖已存在的文件")
# 直接传递图片路径
parser.add_argument("image_paths", nargs="*", help="直接指定图片相对路径")
args = parser.parse_args()
# 创建示例配置文件
if args.create_config:
create_example_config(args.create_config)
return
# 检查必要参数(如果不是创建配置文件)
if not args.config and not (
args.source_dir and args.target_host and args.target_user and args.target_dir
):
print("错误: 必须提供配置文件或指定必要的服务器参数")
print("必要参数: --source-dir, --target-host, --target-user, --target-dir")
parser.print_help()
sys.exit(1)
# 加载配置
config = {}
if args.config:
config = load_config(args.config)
if not config:
print("错误: 无法加载配置文件")
sys.exit(1)
# 构建源服务器配置
source_config_data = config.get("source", {})
# 命令行参数覆盖配置文件
if args.source_type:
source_config_data["type"] = args.source_type
# 如果是SSH类型需要SSH配置
if args.source_type == "ssh" or source_config_data.get("type") == "ssh":
ssh_config = source_config_data.get("ssh", {})
if args.source_host:
ssh_config["host"] = args.source_host
if args.source_port:
ssh_config["port"] = args.source_port
if args.source_user:
ssh_config["username"] = args.source_user
if args.source_password:
ssh_config["password"] = args.source_password
if args.source_key:
ssh_config["key_file"] = args.source_key
source_config_data["ssh"] = ssh_config
# 检查必要的SSH参数
if not ssh_config.get("host"):
print("错误: 源服务器为SSH类型必须指定 --source-host")
sys.exit(1)
if not ssh_config.get("username"):
print("错误: 源服务器为SSH类型必须指定 --source-user")
sys.exit(1)
else:
# 本地类型不需要SSH配置
source_config_data.pop("ssh", None)
# 设置基础目录
if args.source_dir:
source_config_data["base_dir"] = args.source_dir
elif "base_dir" not in source_config_data:
print("错误: 必须指定源服务器图片基础目录 (--source-dir)")
sys.exit(1)
# 构建目标服务器配置
target_config_data = config.get("target", {})
target_config_data["type"] = "ssh" # 目标必须是SSH
ssh_config = target_config_data.get("ssh", {})
# 命令行参数覆盖配置文件
if args.target_host:
ssh_config["host"] = args.target_host
if args.target_port:
ssh_config["port"] = args.target_port
if args.target_user:
ssh_config["username"] = args.target_user
if args.target_password:
ssh_config["password"] = args.target_password
if args.target_key:
ssh_config["key_file"] = args.target_key
target_config_data["ssh"] = ssh_config
# 设置基础目录
if args.target_dir:
target_config_data["base_dir"] = args.target_dir
elif "base_dir" not in target_config_data:
print("错误: 必须指定目标服务器图片基础目录 (--target-dir)")
sys.exit(1)
# 检查必要的目标SSH参数
if not ssh_config.get("host"):
print("错误: 必须指定目标服务器地址 (--target-host)")
sys.exit(1)
if not ssh_config.get("username"):
print("错误: 必须指定目标服务器用户名 (--target-user)")
sys.exit(1)
# 创建服务器配置对象
try:
source_config = ServerConfig.from_dict(source_config_data)
target_config = ServerConfig.from_dict(target_config_data)
if args.verbose:
print(f"源服务器配置: {source_config}")
print(f"目标服务器配置: {target_config}")
except Exception as e:
print(f"创建服务器配置失败: {e}")
sys.exit(1)
# 获取图片路径列表
image_paths = []
# 从文件读取
if args.input:
file_paths = read_image_paths(args.input)
image_paths.extend(file_paths)
# 从命令行参数添加
if args.image_paths:
image_paths.extend(args.image_paths)
if not image_paths:
print("错误: 没有指定要迁移的图片路径")
print("请使用 --input 指定文件或直接在命令行提供路径")
sys.exit(1)
# 去重
image_paths = list(set(image_paths))
# 创建并运行迁移器
migrator = ImageMigrator(
source_config=source_config,
target_config=target_config,
verbose=args.verbose,
overwrite=args.overwrite,
)
# 执行迁移
stats = migrator.migrate_images(image_paths)
# 如果有失败,返回错误代码
if stats["failed"] > 0:
sys.exit(1)
if __name__ == "__main__":
main()