531 lines
18 KiB
Python
531 lines
18 KiB
Python
"""Docker Socket 客户端封装 - 使用 httpx 直接通过 Unix socket 通信"""
|
||
import httpx
|
||
import json
|
||
import urllib.parse
|
||
from typing import Optional
|
||
|
||
# Filesystem path of the Unix domain socket that _do_request connects to
# when talking to the Docker Engine API.
SOCKET_PATH = "/var/run/docker.sock"
|
||
|
||
|
||
def _do_request(method: str, path: str, params: dict | None = None, data: str | None = None, raw: bool = False) -> dict | list | bytes:
    """Send one HTTP request to the Docker API over the Unix socket.

    Args:
        method: HTTP verb; only "GET", "POST" and "DELETE" are supported.
        path: API path appended to ``http://localhost``, e.g. "/containers/json".
        params: Optional query-string parameters.
        data: Optional request body; it is only sent with POST requests.
        raw: When True, return the response body as bytes instead of
            decoding it as JSON.

    Returns:
        The decoded JSON body, the raw bytes when ``raw`` is True, or an
        empty dict when the response has no body.

    Raises:
        ValueError: If ``method`` is not one of the supported verbs.
        Exception: If the API answers with a status code >= 400. The message
            is the ``message`` field of the JSON error body when present,
            otherwise the raw response text.
    """
    url = f"http://localhost{path}"
    headers = {}
    if data:
        headers["Content-Type"] = "application/x-www-form-urlencoded"

    # httpx can route HTTP traffic through a Unix domain socket.
    transport = httpx.HTTPTransport(uds=SOCKET_PATH)
    with httpx.Client(transport=transport, timeout=30) as client:
        if method == "GET":
            r = client.get(url, params=params)
        elif method == "POST":
            r = client.post(url, params=params, content=data, headers=headers)
        elif method == "DELETE":
            r = client.delete(url, params=params)
        else:
            raise ValueError(f"Unsupported method: {method}")

        if r.status_code >= 400:
            # Parse the error body *outside* of the raise: previously the
            # raise lived inside the try block, so the broad except caught
            # it and the parsed message was always replaced by r.text.
            try:
                err = r.json()
            except ValueError:
                err = None
            message = err.get("message") if isinstance(err, dict) else None
            raise Exception(message or r.text)

        if r.content:
            if raw:
                return r.content
            return r.json()
        return {}
|
||
|
||
|
||
def list_containers(all: bool = True) -> list[dict]:
    """Query ``/containers/json`` and return each entry in normalized form.

    When ``all`` is true the request carries ``all=true``; otherwise no
    query parameters are sent.
    """
    query = {}
    if all:
        query["all"] = "true"
    raw_containers = _do_request("GET", "/containers/json", query)
    return list(map(_normalize_container, raw_containers))
|
||
|
||
|
||
def get_container(container_id: str) -> dict:
    """Fetch ``/containers/<id>/json`` for one container and normalize the reply."""
    inspect_path = f"/containers/{container_id}/json"
    return _normalize_container(_do_request("GET", inspect_path))
|
||
|
||
|
||
def _normalize_container(c: dict) -> dict:
|
||
"""标准化容器数据结构"""
|
||
ports = {}
|
||
for p in c.get("Ports", []) or []:
|
||
if p.get("PublicPort"):
|
||
ports[f"{p['PrivatePort']}/{p.get('Type', 'tcp')}"] = [{"HostIp": p.get("IP", "0.0.0.0"), "HostPort": str(p["PublicPort"])}]
|
||
else:
|
||
ports[f"{p['PrivatePort']}/{p.get('Type', 'tcp')}"] = []
|
||
return {
|
||
"id": c["Id"][:12],
|
||
"name": (c.get("Names") or ["/"])[0].lstrip("/"),
|
||
"image": c.get("Image", ""),
|
||
"state": c.get("State", "unknown"),
|
||
"status": c.get("Status", ""),
|
||
"ports": ports,
|
||
"created": c.get("Created", 0),
|
||
}
|
||
|
||
|
||
def start_container(container_id: str) -> dict:
    """POST to ``/containers/<id>/start`` and wrap the reply in a success envelope."""
    endpoint = f"/containers/{container_id}/start"
    return {"success": True, "data": _do_request("POST", endpoint)}
|
||
|
||
|
||
def stop_container(container_id: str) -> dict:
    """POST to ``/containers/<id>/stop`` and wrap the reply in a success envelope."""
    endpoint = f"/containers/{container_id}/stop"
    return {"success": True, "data": _do_request("POST", endpoint)}
|
||
|
||
|
||
def restart_container(container_id: str) -> dict:
    """POST to ``/containers/<id>/restart`` and wrap the reply in a success envelope."""
    endpoint = f"/containers/{container_id}/restart"
    return {"success": True, "data": _do_request("POST", endpoint)}
|
||
|
||
|
||
def remove_container(container_id: str, force: bool = False) -> dict:
    """Issue DELETE ``/containers/<id>`` and wrap the reply in a success envelope.

    ``force=true`` is added to the query string only when ``force`` is set.
    """
    query = None
    if force:
        query = {"force": "true"}
    reply = _do_request("DELETE", f"/containers/{container_id}", query)
    return {"success": True, "data": reply}
|
||
|
||
|
||
def _demux_docker_stream(raw: bytes) -> list[tuple[int, str]]:
    """Split a Docker log payload into ``(stream_type, text)`` frames.

    A multiplexed payload is a sequence of frames, each introduced by an
    8-byte header: ``[stream_type, 0, 0, 0, size (4 bytes, big-endian)]``
    where stream_type is 0 (stdin), 1 (stdout) or 2 (stderr). Containers
    running with a TTY send plain text without headers; whatever does not
    start with a plausible header is therefore returned unchanged as one
    stdout frame instead of being cut apart as if it were framed.
    """
    frames = []
    pos, total = 0, len(raw)
    while pos < total:
        header = raw[pos:pos + 8]
        if len(header) < 8 or header[0] > 2 or header[1:4] != b"\x00\x00\x00":
            frames.append((1, raw[pos:].decode("utf-8", errors="replace")))
            break
        size = int.from_bytes(header[4:8], byteorder="big")
        pos += 8
        # Slicing clamps to the available data, so a truncated final frame
        # still yields its partial payload under its own stream type.
        frames.append((header[0], raw[pos:pos + size].decode("utf-8", errors="replace")))
        pos += size
    return frames


def get_container_logs(container_id: str, tail: int = 100, timestamps: bool = False) -> dict:
    """Fetch a container's logs and format them for easier reading.

    Every non-empty log line is numbered. A line is prefixed with a level
    tag when one of the known level words occurs anywhere in it (plain
    substring match, case-insensitive). Lines that do not start with a
    recognised timestamp are additionally prefixed with their source,
    ``[out]`` for stdout and ``[err]`` for anything else.

    Args:
        container_id: Container ID or name used in the request path.
        tail: Number of trailing log lines requested from the daemon.
        timestamps: Ask the daemon to prepend timestamps to each line.

    Returns:
        ``{"logs": text}`` where ``text`` is the formatted lines joined with
        newlines, or the placeholder "(无日志)" when there is no output.
    """
    import re

    params = {"stderr": "true", "stdout": "true", "tail": str(tail)}
    if timestamps:
        params["timestamps"] = "true"
    raw_content = _do_request("GET", f"/containers/{container_id}/logs", params, raw=True)
    if not raw_content:
        # _do_request returns an empty dict when the response has no body.
        return {"logs": "(无日志)"}

    # Level word -> tag shown in the output. Insertion order is the match
    # priority: the first word found in a line wins.
    level_tags = {
        'error': '🔴 ERROR',
        'warn': '🟡 WARN',
        'warning': '🟡 WARN',
        'info': '🔵 INFO',
        'debug': '⚪ DEBUG',
        'trace': '⚪ TRACE',
    }

    # Common leading-timestamp layouts, compiled once per call.
    timestamp_patterns = tuple(
        re.compile(pattern)
        for pattern in (
            r'^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}',  # ISO-like: 2024-01-01T12:00:00
            r'^\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2}',  # Apache: 01/Jan/2024:12:00:00
            r'^\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}',  # Syslog-like: Jan 1 12:00:00
        )
    )

    formatted_lines = []
    log_counter = 0
    for stream_type, raw_text in _demux_docker_stream(raw_content):
        source = "out" if stream_type == 1 else "err"
        # One frame may carry several lines.
        for line in raw_text.split('\n'):
            line = line.rstrip()
            if not line:
                continue
            log_counter += 1

            lowered = line.lower()
            level_tag = next(
                (f"[{tag}] " for level, tag in level_tags.items() if level in lowered),
                "",
            )

            if any(pattern.match(line) for pattern in timestamp_patterns):
                # Timestamped line: keep it as is, only add the line number.
                formatted_lines.append(f"{log_counter:>4} │ {level_tag}{line}")
            else:
                # No timestamp: also show which stream the line came from.
                formatted_lines.append(f"{log_counter:>4} │ [{source}] {level_tag}{line}")

    if not formatted_lines:
        return {"logs": "(无日志)"}

    return {"logs": "\n".join(formatted_lines)}
|
||
|
||
|
||
def get_container_stats(container_id: str) -> dict:
    """Take one stats sample of a container and derive usage figures from it.

    Returns:
        A dict with ``cpu_percent`` and ``memory_percent`` (rounded to one
        decimal), ``memory_usage`` and ``memory_limit`` as reported, and
        ``network_rx`` / ``network_tx`` summed over all reported networks.
    """
    stats = _do_request("GET", f"/containers/{container_id}/stats", {"stream": "false"})

    # CPU: share of the system-wide CPU time delta consumed by the
    # container between the previous and the current sample.
    cpu_now = stats.get("cpu_stats", {})
    cpu_prev = stats.get("precpu_stats", {})
    cpu_delta = (
        cpu_now.get("cpu_usage", {}).get("total_usage", 0)
        - cpu_prev.get("cpu_usage", {}).get("total_usage", 0)
    )
    system_delta = cpu_now.get("system_cpu_usage", 0) - cpu_prev.get("system_cpu_usage", 0)
    cpu_count = cpu_now.get("online_cpus", 1)
    if system_delta > 0:
        cpu_percent = cpu_delta / system_delta * cpu_count * 100.0
    else:
        cpu_percent = 0

    # Memory
    memory = stats.get("memory_stats", {})
    mem_usage = memory.get("usage", 0)
    mem_limit = memory.get("limit", 1)
    if mem_limit > 0:
        mem_percent = mem_usage / mem_limit * 100.0
    else:
        mem_percent = 0

    # Network: totals across every interface in the sample.
    interfaces = (stats.get("networks", {}) or {}).values()
    rx = sum(iface.get("rx_bytes", 0) for iface in interfaces)
    tx = sum(iface.get("tx_bytes", 0) for iface in interfaces)

    return {
        "cpu_percent": round(cpu_percent, 1),
        "memory_usage": mem_usage,
        "memory_limit": mem_limit,
        "memory_percent": round(mem_percent, 1),
        "network_rx": rx,
        "network_tx": tx,
    }
|
||
|
||
|
||
def get_system_info() -> dict:
    """Return a summary of the Docker host built from ``/info`` and ``/version``.

    Returns:
        A dict with ``containers``, ``running``, ``images``, ``cpu_count``
        and ``memory_total`` taken from ``/info`` (each defaulting to 0) and
        ``version`` taken from ``/version`` (default "unknown").
    """
    info = _do_request("GET", "/info")
    version_info = _do_request("GET", "/version")

    # The former best-effort "/system/df" request was dropped: its result
    # was never read, so it only added an extra round trip to every call.
    return {
        "containers": info.get("Containers", 0),
        "running": info.get("ContainersRunning", 0),
        "images": info.get("Images", 0),
        "version": version_info.get("Version", "unknown"),
        "cpu_count": info.get("NCPU", 0),
        "memory_total": info.get("MemTotal", 0),
    }
|
||
|
||
|
||
def discover_compose_files(search_path: str) -> list[dict]:
    """Walk ``search_path`` and report every directory holding a Compose file.

    A directory qualifies when it lists ``docker-compose.yml`` or
    ``docker-compose.yaml``. The ``.yml`` path is reported when it exists on
    disk, otherwise the ``.yaml`` path.

    Returns:
        One ``{"name": <directory name>, "path": <compose file path>}`` dict
        per matching directory.
    """
    import os

    candidates = ("docker-compose.yml", "docker-compose.yaml")
    found = []
    for folder, _subdirs, filenames in os.walk(search_path):
        if not any(candidate in filenames for candidate in candidates):
            continue
        yml_path = os.path.join(folder, candidates[0])
        if os.path.exists(yml_path):
            chosen = yml_path
        else:
            chosen = os.path.join(folder, candidates[1])
        found.append({"name": os.path.basename(folder), "path": chosen})
    return found
|
||
|
||
|
||
def compose_action(action: str, project_dir: str) -> dict:
    """Run ``docker compose`` up/down/restart for the project in ``project_dir``.

    Args:
        action: One of "up" (started detached with ``-d``), "down" or
            "restart". Any other value is rejected without running a command.
        project_dir: Directory containing ``docker-compose.yml`` (or
            ``docker-compose.yaml``, used when the ``.yml`` file is absent).

    Returns:
        ``{"success": bool, "data": stdout, "error": stderr}`` once the
        command has run, or ``{"success": False, "error": message}`` when
        the action is unsupported or the command could not be run (docker
        missing, 60 second timeout, ...).
    """
    import os
    import subprocess

    compose_args = {
        "up": ["up", "-d"],
        "down": ["down"],
        "restart": ["restart"],
    }
    if action not in compose_args:
        # Previously an unknown action fell through to `docker <action>`,
        # i.e. it executed an arbitrary docker subcommand.
        return {"success": False, "error": f"Unsupported compose action: {action}"}

    compose_file = f"{project_dir}/docker-compose.yml"
    yaml_variant = f"{project_dir}/docker-compose.yaml"
    if not os.path.exists(compose_file) and os.path.exists(yaml_variant):
        compose_file = yaml_variant

    cmd = ["docker", "compose", "-f", compose_file, *compose_args[action]]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        return {"success": result.returncode == 0, "data": result.stdout, "error": result.stderr}
    except Exception as e:
        return {"success": False, "error": str(e)}
|
||
|
||
|
||
def read_compose_file(path: str) -> dict:
    """Read a Compose file and return its text.

    The file is read with the built-in ``open``. The earlier implementation
    wrapped aiofiles in ``asyncio.run``, which raises when the function is
    called from code that already runs inside an event loop.

    Args:
        path: Path of the Compose file; it is decoded as UTF-8.

    Returns:
        ``{"success": True, "content": text}`` on success, otherwise
        ``{"success": False, "error": message}``; the message is
        "文件不存在" when the file does not exist.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            content = f.read()
    except FileNotFoundError:
        return {"success": False, "error": "文件不存在"}
    except Exception as e:
        return {"success": False, "error": str(e)}
    return {"success": True, "content": content}
|
||
|
||
|
||
def write_compose_file(path: str, content: str) -> dict:
    """Write ``content`` to a Compose file, replacing any existing file.

    The file is written with the built-in ``open``. The earlier
    implementation wrapped aiofiles in ``asyncio.run``, which raises when
    the function is called from code that already runs inside an event loop.

    Args:
        path: Destination path; the text is encoded as UTF-8.
        content: Full text to write.

    Returns:
        ``{"success": True}`` on success, otherwise
        ``{"success": False, "error": message}``.
    """
    try:
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
    except Exception as e:
        return {"success": False, "error": str(e)}
    return {"success": True}
|
||
|
||
|
||
# 兼容别名的包装函数
|
||
def list_compose_projects(search_path: str) -> list[dict]:
    """Compatibility alias that forwards to discover_compose_files."""
    projects = discover_compose_files(search_path)
    return projects
|
||
|
||
|
||
def compose_up(project_dir: str) -> dict:
    """Compatibility alias for ``compose_action("up", project_dir)``."""
    outcome = compose_action("up", project_dir)
    return outcome
|
||
|
||
|
||
def compose_down(project_dir: str) -> dict:
    """Compatibility alias for ``compose_action("down", project_dir)``."""
    outcome = compose_action("down", project_dir)
    return outcome
|
||
|
||
|
||
def compose_restart(project_dir: str) -> dict:
    """Compatibility alias for ``compose_action("restart", project_dir)``."""
    outcome = compose_action("restart", project_dir)
    return outcome
|
||
|
||
|
||
# ========== 镜像管理 ==========
|
||
|
||
# Default list of registry proxies/mirrors (mirrors commonly used in China).
# Each entry has a display "name" and a registry "prefix"; the empty prefix
# stands for the official Docker Hub.
# NOTE(review): the prefixes look like values for pull_image's
# ``mirror_prefix`` argument - confirm against the callers of this list.
DEFAULT_MIRRORS = [
    {"name": "Docker Hub (官方)", "prefix": ""},
    {"name": "Azure 中国", "prefix": "dockerhub.azk8s.cn/"},
    {"name": "中科大 USTC", "prefix": "docker.mirrors.ustc.edu.cn/"},
    {"name": "百度 BJ", "prefix": "mirror.baidubce.com/"},
    {"name": "腾讯云", "prefix": "ccr.ccs.tencentyun.com/"},
]
|
||
|
||
|
||
def list_images() -> list[dict]:
    """Return the local images reported by ``/images/json``.

    Each entry carries the shortened ``id`` (first 12 characters of ``Id``),
    the ``full_id``, ``repo_tags``, ``size``, ``created`` and
    ``virtual_size``.
    """
    return [
        {
            "id": (image.get("Id") or "")[:12],
            "full_id": image.get("Id", ""),
            "repo_tags": image.get("RepoTags") or [],
            "size": image.get("Size", 0),
            "created": image.get("Created", 0),
            "virtual_size": image.get("VirtualSize", 0),
        }
        for image in _do_request("GET", "/images/json")
    ]
|
||
|
||
|
||
def search_images(query: str, limit: int = 20) -> list[dict]:
    """Search Docker Hub for images matching ``query``.

    The Docker Hub v2 search API is tried first. When that request fails or
    does not answer with status 200, the ``docker search`` command is used
    instead. The search is best effort: if both attempts fail, an empty
    list is returned rather than an exception raised.

    Args:
        query: Search term.
        limit: Maximum number of results to request.

    Returns:
        A list of dicts with ``name``, ``description``, ``star_count``,
        ``pull_count`` and ``official``. ``pull_count`` is 0 for results
        obtained through the CLI fallback, which does not report it.
    """
    import subprocess

    import httpx

    try:
        with httpx.Client(timeout=15) as client:
            r = client.get(
                "https://hub.docker.com/v2/search/repositories/",
                params={"query": query, "page_size": limit},
            )
            if r.status_code == 200:
                return [
                    {
                        "name": item.get("repo_name", ""),
                        "description": item.get("short_description", ""),
                        "star_count": item.get("star_count", 0),
                        "pull_count": item.get("pull_count", 0),
                        "official": item.get("is_official", False),
                    }
                    for item in r.json().get("results", [])
                ]
    except Exception:
        # Deliberate best effort: any API failure falls through to the CLI.
        pass

    # Ask the CLI for tab-separated fields rather than parsing its
    # human-readable table: descriptions contain spaces, so splitting the
    # table on whitespace puts values in the wrong columns.
    row_format = "\t".join(
        ("{{.Name}}", "{{.Description}}", "{{.StarCount}}", "{{.IsOfficial}}")
    )
    results = []
    try:
        proc = subprocess.run(
            ["docker", "search", "--no-trunc", "--limit", str(limit), "--format", row_format, query],
            capture_output=True, text=True, timeout=30,
        )
    except Exception:
        return results
    if proc.returncode != 0:
        return results

    for line in proc.stdout.splitlines():
        fields = line.split("\t")
        if not fields[0]:
            continue
        # Pad in case trailing empty fields were dropped from the line.
        fields += [""] * (4 - len(fields))
        name, description, stars, official = fields[:4]
        results.append({
            "name": name,
            "description": description,
            "star_count": int(stars) if stars.strip().isdigit() else 0,
            "pull_count": 0,
            "official": "[OK]" in official or official.strip().lower() == "true",
        })
    return results
|
||
|
||
|
||
def pull_image(image_name: str, mirror_prefix: str = "") -> dict:
    """Pull an image with ``docker pull``, optionally through a registry mirror.

    Args:
        image_name: Image reference such as "nginx" or "user/app:tag".
        mirror_prefix: Registry prefix to pull through, e.g.
            "docker.mirrors.ustc.edu.cn/". A missing trailing "/" is added.
            When empty, ``image_name`` is pulled unchanged.

    Returns:
        On success ``{"success": True, "image": image_name, "pulled_as":
        <reference actually pulled>, "output": stdout}``; otherwise
        ``{"success": False, "image": image_name, "error": message}``.
    """
    import subprocess

    full_image = image_name
    if mirror_prefix:
        # Without the separator the host and the repository would run
        # together ("mirror.example.com" + "library/nginx").
        prefix = mirror_prefix if mirror_prefix.endswith("/") else f"{mirror_prefix}/"
        if "/" not in image_name:
            # Official image: it lives in the "library" namespace,
            # e.g. nginx -> <mirror>/library/nginx
            full_image = f"{prefix}library/{image_name}"
        else:
            # User or organisation image
            full_image = f"{prefix}{image_name}"

    try:
        result = subprocess.run(
            ["docker", "pull", full_image],
            capture_output=True, text=True, timeout=600
        )
        if result.returncode == 0:
            return {"success": True, "image": image_name, "pulled_as": full_image, "output": result.stdout}
        return {"success": False, "image": image_name, "error": result.stderr}
    except Exception as e:
        return {"success": False, "image": image_name, "error": str(e)}
|
||
|
||
|
||
def remove_image(image_id: str, force: bool = False) -> dict:
    """Delete a local image by running ``docker rmi`` (with ``-f`` when forced).

    Returns:
        ``{"success": True}`` when the command exits with status 0,
        otherwise ``{"success": False, "error": message}`` carrying stderr
        or the text of the exception that prevented the command from running.
    """
    import subprocess

    flags = ["-f"] if force else []
    try:
        proc = subprocess.run(
            ["docker", "rmi", *flags, image_id],
            capture_output=True, text=True, timeout=60,
        )
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    if proc.returncode != 0:
        return {"success": False, "error": proc.stderr}
    return {"success": True}
|
||
|
||
|
||
# ========== 文件浏览 ==========
|
||
|
||
def list_directory(path: str) -> dict:
    """List the entries of a directory, directories first.

    Entries that cannot be stat'ed are skipped. The result is ordered with
    directories before files, each group sorted case-insensitively by name.

    Returns:
        ``{"success": True, "path": path, "items": [...]}`` where every item
        has ``name``, ``path``, ``is_dir``, ``size`` (0 for directories) and
        ``modified`` (the entry's ``st_mtime``); or
        ``{"success": False, "error": message}`` on failure.
    """
    import os

    def _sort_key(item):
        return (not item["is_dir"], item["name"].lower())

    try:
        collected = []
        for entry_name in os.listdir(path):
            entry_path = os.path.join(path, entry_name)
            try:
                info = os.stat(entry_path)
                directory = os.path.isdir(entry_path)
            except Exception:
                continue
            collected.append({
                "name": entry_name,
                "path": entry_path,
                "is_dir": directory,
                "size": 0 if directory else info.st_size,
                "modified": info.st_mtime,
            })
        return {"success": True, "path": path, "items": sorted(collected, key=_sort_key)}
    except PermissionError:
        return {"success": False, "error": "权限不足"}
    except FileNotFoundError:
        return {"success": False, "error": "目录不存在"}
    except Exception as e:
        return {"success": False, "error": str(e)}
|
||
|
||
|
||
def read_file(path: str, encoding: str = "utf-8") -> dict:
    """Read a text file and return its content.

    The file is read with the built-in ``open``. The earlier implementation
    wrapped aiofiles in ``asyncio.run``, which raises when the function is
    called from code that already runs inside an event loop.

    Args:
        path: File to read.
        encoding: Text encoding used to decode the file.

    Returns:
        ``{"success": True, "content": text, "path": path}`` on success.
        When the bytes cannot be decoded with ``encoding`` the file is
        reported as binary: ``{"success": False, "error": ..., "binary":
        True, "path": path}``. Any other failure yields
        ``{"success": False, "error": message}``.
    """
    try:
        with open(path, "r", encoding=encoding) as f:
            content = f.read()
    except UnicodeDecodeError:
        # Undecodable content is treated as binary. The file is not read
        # again: the binary payload was never part of the result.
        return {"success": False, "error": "文件为二进制格式,无法文本预览", "binary": True, "path": path}
    except FileNotFoundError:
        return {"success": False, "error": "文件不存在"}
    except Exception as e:
        return {"success": False, "error": str(e)}
    return {"success": True, "content": content, "path": path}
|
||
|
||
|
||
def write_file(path: str, content: str, encoding: str = "utf-8") -> dict:
    """Write text to a file, replacing any existing content.

    The file is written with the built-in ``open``. The earlier
    implementation wrapped aiofiles in ``asyncio.run``, which raises when
    the function is called from code that already runs inside an event loop.

    Args:
        path: Destination file.
        content: Text to write.
        encoding: Text encoding used to encode ``content``.

    Returns:
        ``{"success": True, "path": path}`` on success, otherwise
        ``{"success": False, "error": message}``; the message is "权限不足"
        when permission is denied.
    """
    try:
        with open(path, "w", encoding=encoding) as f:
            f.write(content)
    except PermissionError:
        return {"success": False, "error": "权限不足"}
    except Exception as e:
        return {"success": False, "error": str(e)}
    return {"success": True, "path": path}
|
||
|
||
|
||
def get_container_mounts() -> list[dict]:
    """Collect the mount points of every container.

    Each container returned by ``list_containers(all=True)`` is inspected
    individually; a container whose inspection fails is skipped.

    Returns:
        One dict per mount with ``container_id``, ``container_name``,
        ``source``, ``destination``, ``mode`` and ``rw``.
    """
    collected = []
    for container in list_containers(all=True):
        try:
            details = _do_request("GET", f"/containers/{container['id']}/json")
            for mount in details.get("Mounts") or []:
                entry = {
                    "container_id": container["id"],
                    "container_name": container["name"],
                    "source": mount.get("Source", ""),
                    "destination": mount.get("Destination", ""),
                    "mode": mount.get("Mode", ""),
                    "rw": mount.get("RW", False),
                }
                collected.append(entry)
        except Exception:
            continue
    return collected
|