Files
docker-manage/backend/docker_client.py

531 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Docker Socket 客户端封装 - 使用 httpx 直接通过 Unix socket 通信"""
import httpx
import json
import urllib.parse
from typing import Optional
SOCKET_PATH = "/var/run/docker.sock"
def _do_request(method: str, path: str, params: dict = None, data: str = None, raw: bool = False) -> dict | bytes:
"""通过 Unix socket 发送请求到 Docker API"""
url = f"http://localhost{path}"
headers = {}
if data:
headers["Content-Type"] = "application/x-www-form-urlencoded"
# 使用 httpx 的 Unix socket 支持
transport = httpx.HTTPTransport(uds=SOCKET_PATH)
with httpx.Client(transport=transport, timeout=30) as client:
if method == "GET":
r = client.get(url, params=params)
elif method == "POST":
r = client.post(url, params=params, content=data, headers=headers)
elif method == "DELETE":
r = client.delete(url, params=params)
else:
raise ValueError(f"Unsupported method: {method}")
if r.status_code >= 400:
try:
err = r.json()
raise Exception(err.get("message", r.text))
except Exception:
raise Exception(r.text)
if r.content:
if raw:
return r.content
return r.json()
return {}
def list_containers(all: bool = True) -> list[dict]:
"""列出所有容器"""
params = {"all": "true"} if all else {}
containers = _do_request("GET", "/containers/json", params)
return [_normalize_container(c) for c in containers]
def get_container(container_id: str) -> dict:
"""获取单个容器详情"""
data = _do_request("GET", f"/containers/{container_id}/json")
return _normalize_container(data)
def _normalize_container(c: dict) -> dict:
"""标准化容器数据结构"""
ports = {}
for p in c.get("Ports", []) or []:
if p.get("PublicPort"):
ports[f"{p['PrivatePort']}/{p.get('Type', 'tcp')}"] = [{"HostIp": p.get("IP", "0.0.0.0"), "HostPort": str(p["PublicPort"])}]
else:
ports[f"{p['PrivatePort']}/{p.get('Type', 'tcp')}"] = []
return {
"id": c["Id"][:12],
"name": (c.get("Names") or ["/"])[0].lstrip("/"),
"image": c.get("Image", ""),
"state": c.get("State", "unknown"),
"status": c.get("Status", ""),
"ports": ports,
"created": c.get("Created", 0),
}
def start_container(container_id: str) -> dict:
result = _do_request("POST", f"/containers/{container_id}/start")
return {"success": True, "data": result}
def stop_container(container_id: str) -> dict:
result = _do_request("POST", f"/containers/{container_id}/stop")
return {"success": True, "data": result}
def restart_container(container_id: str) -> dict:
result = _do_request("POST", f"/containers/{container_id}/restart")
return {"success": True, "data": result}
def remove_container(container_id: str, force: bool = False) -> dict:
result = _do_request("DELETE", f"/containers/{container_id}", {"force": "true"} if force else None)
return {"success": True, "data": result}
def get_container_logs(container_id: str, tail: int = 100, timestamps: bool = False) -> dict:
"""获取容器日志,格式化输出更易读"""
params = {"stderr": "true", "stdout": "true", "tail": str(tail)}
if timestamps:
params["timestamps"] = "true"
raw_content = _do_request("GET", f"/containers/{container_id}/logs", params, raw=True)
# Docker 日志是二进制流,每条消息前面有 8 字节头
# 格式: [stream_type(1), 0, 0, 0, size(4 bytes big-endian), data...]
raw_lines = []
i = 0
while i < len(raw_content):
if i + 8 > len(raw_content):
break
stream_type = raw_content[i] # 1=stdout, 2=stderr
size = int.from_bytes(raw_content[i+4:i+8], byteorder='big')
i += 8
if i + size > len(raw_content):
raw_lines.append((1, raw_content[i:].decode('utf-8', errors='replace')))
break
raw_lines.append((stream_type, raw_content[i:i+size].decode('utf-8', errors='replace')))
i += size
# 解析并格式化每行日志
import re
formatted_lines = []
log_counter = 0
# 日志级别颜色标记 (终端显示)
LOG_LEVELS = {
'error': '🔴 ERROR',
'warn': '🟡 WARN',
'warning': '🟡 WARN',
'info': '🔵 INFO',
'debug': '⚪ DEBUG',
'trace': '⚪ TRACE',
}
# 常见时间戳格式模式
TIMESTAMP_PATTERNS = [
r'^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}', # ISO-like: 2024-01-01T12:00:00
r'^\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2}', # Apache: 01/Jan/2024:12:00:00
r'^\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}', # Syslog-like: Jan 1 12:00:00
]
for stream_type, raw_text in raw_lines:
# 分割多行日志
lines = raw_text.split('\n')
for line in lines:
line = line.rstrip()
if not line:
continue
log_counter += 1
# 判断日志来源
source = "out" if stream_type == 1 else "err"
# 尝试解析日志级别
level_tag = ""
for level, tag in LOG_LEVELS.items():
if level in line.lower():
level_tag = f"[{tag}] "
break
# 检测并标注时间戳
has_timestamp = False
for pattern in TIMESTAMP_PATTERNS:
if re.match(pattern, line):
has_timestamp = True
break
# 格式化输出
if has_timestamp:
# 有时间戳的行,保持原格式加行号
formatted_lines.append(f"{log_counter:>4}{level_tag}{line}")
else:
# 无时间戳的行,添加来源和级别标记
formatted_lines.append(f"{log_counter:>4} │ [{source}] {level_tag}{line}")
if not formatted_lines:
return {"logs": "(无日志)"}
return {"logs": "\n".join(formatted_lines)}
def get_container_stats(container_id: str) -> dict:
"""获取容器资源使用"""
stats = _do_request("GET", f"/containers/{container_id}/stats", {"stream": "false"})
# 计算 CPU 百分比
cpu_delta = stats.get("cpu_stats", {}).get("cpu_usage", {}).get("total_usage", 0) - \
stats.get("precpu_stats", {}).get("cpu_usage", {}).get("total_usage", 0)
system_delta = stats.get("cpu_stats", {}).get("system_cpu_usage", 0) - \
stats.get("precpu_stats", {}).get("system_cpu_usage", 0)
cpu_count = stats.get("cpu_stats", {}).get("online_cpus", 1)
cpu_percent = (cpu_delta / system_delta * cpu_count * 100.0) if system_delta > 0 else 0
# 内存
mem_usage = stats.get("memory_stats", {}).get("usage", 0)
mem_limit = stats.get("memory_stats", {}).get("limit", 1)
mem_percent = (mem_usage / mem_limit * 100.0) if mem_limit > 0 else 0
# 网络
networks = stats.get("networks", {}) or {}
rx = sum(n.get("rx_bytes", 0) for n in networks.values())
tx = sum(n.get("tx_bytes", 0) for n in networks.values())
return {
"cpu_percent": round(cpu_percent, 1),
"memory_usage": mem_usage,
"memory_limit": mem_limit,
"memory_percent": round(mem_percent, 1),
"network_rx": rx,
"network_tx": tx,
}
def get_system_info() -> dict:
"""获取系统信息"""
info = _do_request("GET", "/info")
version_info = _do_request("GET", "/version")
# 获取 CPU 和内存信息
try:
cpu_info = _do_request("GET", "/system/df") # 尝试用 /system/df
except Exception:
cpu_info = {}
return {
"containers": info.get("Containers", 0),
"running": info.get("ContainersRunning", 0),
"images": info.get("Images", 0),
"version": version_info.get("Version", "unknown"),
"cpu_count": info.get("NCPU", 0),
"memory_total": info.get("MemTotal", 0),
}
def discover_compose_files(search_path: str) -> list[dict]:
"""发现目录下的 docker-compose.yml 文件"""
import os
results = []
for root, dirs, files in os.walk(search_path):
if "docker-compose.yml" in files or "docker-compose.yaml" in files:
compose_file = os.path.join(root, "docker-compose.yml")
if not os.path.exists(compose_file):
compose_file = os.path.join(root, "docker-compose.yaml")
results.append({
"name": os.path.basename(root),
"path": compose_file,
})
return results
def compose_action(action: str, project_dir: str) -> dict:
"""对 Compose 项目执行动作up/down/restart"""
# 使用 docker compose 命令
import subprocess
cmd = ["docker", action.replace("restart", "restart" if action == "restart" else "restart")]
if action == "up":
cmd = ["docker", "compose", "-f", f"{project_dir}/docker-compose.yml", "up", "-d"]
elif action == "down":
cmd = ["docker", "compose", "-f", f"{project_dir}/docker-compose.yml", "down"]
elif action == "restart":
cmd = ["docker", "compose", "-f", f"{project_dir}/docker-compose.yml", "restart"]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
return {"success": result.returncode == 0, "data": result.stdout, "error": result.stderr}
except Exception as e:
return {"success": False, "error": str(e)}
def read_compose_file(path: str) -> dict:
"""读取 Compose 文件内容"""
import aiofiles
import asyncio
async def _read():
async with aiofiles.open(path, "r") as f:
return await f.read()
try:
content = asyncio.run(_read())
return {"success": True, "content": content}
except FileNotFoundError:
return {"success": False, "error": "文件不存在"}
except Exception as e:
return {"success": False, "error": str(e)}
def write_compose_file(path: str, content: str) -> dict:
"""写入 Compose 文件内容"""
import aiofiles
import asyncio
async def _write():
async with aiofiles.open(path, "w") as f:
await f.write(content)
try:
asyncio.run(_write())
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
# 兼容别名的包装函数
def list_compose_projects(search_path: str) -> list[dict]:
return discover_compose_files(search_path)
def compose_up(project_dir: str) -> dict:
return compose_action("up", project_dir)
def compose_down(project_dir: str) -> dict:
return compose_action("down", project_dir)
def compose_restart(project_dir: str) -> dict:
return compose_action("restart", project_dir)
# ========== 镜像管理 ==========
# 默认代理/镜像服务器列表 (中国常用镜像)
DEFAULT_MIRRORS = [
{"name": "Docker Hub (官方)", "prefix": ""},
{"name": "Azure 中国", "prefix": "dockerhub.azk8s.cn/"},
{"name": "中科大 USTC", "prefix": "docker.mirrors.ustc.edu.cn/"},
{"name": "百度 BJ", "prefix": "mirror.baidubce.com/"},
{"name": "腾讯云", "prefix": "ccr.ccs.tencentyun.com/"},
]
def list_images() -> list[dict]:
"""列出本地镜像"""
images = _do_request("GET", "/images/json")
result = []
for img in images:
result.append({
"id": img.get("Id", "")[:12] if img.get("Id") else "",
"full_id": img.get("Id", ""),
"repo_tags": img.get("RepoTags") or [],
"size": img.get("Size", 0),
"created": img.get("Created", 0),
"virtual_size": img.get("VirtualSize", 0),
})
return result
def search_images(query: str, limit: int = 20) -> list[dict]:
"""从 Docker Hub 搜索镜像"""
import httpx
results = []
try:
# 使用 Docker Hub API v2 搜索
with httpx.Client(timeout=15) as client:
r = client.get(
"https://hub.docker.com/v2/search/repositories/",
params={"query": query, "page_size": limit}
)
if r.status_code == 200:
data = r.json()
for item in data.get("results", []):
results.append({
"name": item.get("repo_name", ""),
"description": item.get("short_description", ""),
"star_count": item.get("star_count", 0),
"pull_count": item.get("pull_count", 0),
"official": item.get("is_official", False),
})
except Exception as e:
# 如果 API 失败,尝试使用 docker search 命令
import subprocess
try:
result = subprocess.run(
["docker", "search", "--limit", str(limit), query],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
lines = result.stdout.strip().split("\n")
if len(lines) > 1:
headers = lines[0].split()
for line in lines[1:]:
parts = line.split(maxsplit=4)
if len(parts) >= 4:
results.append({
"name": parts[0],
"description": parts[3] if len(parts) > 3 else "",
"star_count": int(parts[1]) if parts[1].isdigit() else 0,
"official": "[OK]" in parts[2] if len(parts) > 2 else False,
})
except Exception:
pass
return results
def pull_image(image_name: str, mirror_prefix: str = "") -> dict:
"""拉取镜像,支持指定镜像代理前缀"""
import subprocess
# 如果指定了镜像前缀,转换镜像名
full_image = image_name
if mirror_prefix:
# 例如: nginx -> dockerhub.azk8s.cn/library/nginx
if "/" not in image_name:
# 官方镜像,添加 library
full_image = f"{mirror_prefix}library/{image_name}"
else:
# 用户/组织镜像
full_image = f"{mirror_prefix}{image_name}"
try:
# 使用 docker pull 命令
result = subprocess.run(
["docker", "pull", full_image],
capture_output=True, text=True, timeout=600
)
if result.returncode == 0:
return {"success": True, "image": image_name, "pulled_as": full_image, "output": result.stdout}
else:
return {"success": False, "image": image_name, "error": result.stderr}
except Exception as e:
return {"success": False, "image": image_name, "error": str(e)}
def remove_image(image_id: str, force: bool = False) -> dict:
"""删除本地镜像"""
import subprocess
try:
cmd = ["docker", "rmi"]
if force:
cmd.append("-f")
cmd.append(image_id)
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
return {"success": True}
else:
return {"success": False, "error": result.stderr}
except Exception as e:
return {"success": False, "error": str(e)}
# ========== 文件浏览 ==========
def list_directory(path: str) -> dict:
"""列出目录内容"""
import os
try:
entries = os.listdir(path)
items = []
for name in entries:
full_path = os.path.join(path, name)
try:
stat = os.stat(full_path)
is_dir = os.path.isdir(full_path)
items.append({
"name": name,
"path": full_path,
"is_dir": is_dir,
"size": stat.st_size if not is_dir else 0,
"modified": stat.st_mtime,
})
except Exception:
continue
# 目录在前,文件在后,按名称排序
items.sort(key=lambda x: (not x["is_dir"], x["name"].lower()))
return {"success": True, "path": path, "items": items}
except PermissionError:
return {"success": False, "error": "权限不足"}
except FileNotFoundError:
return {"success": False, "error": "目录不存在"}
except Exception as e:
return {"success": False, "error": str(e)}
def read_file(path: str, encoding: str = "utf-8") -> dict:
"""读取文件内容"""
import aiofiles
import asyncio
async def _read():
async with aiofiles.open(path, "r", encoding=encoding) as f:
return await f.read()
try:
content = asyncio.run(_read())
return {"success": True, "content": content, "path": path}
except UnicodeDecodeError:
# 尝试二进制读取
try:
with open(path, "rb") as f:
content = f.read()
return {"success": False, "error": "文件为二进制格式,无法文本预览", "binary": True, "path": path}
except Exception as e:
return {"success": False, "error": str(e)}
except FileNotFoundError:
return {"success": False, "error": "文件不存在"}
except Exception as e:
return {"success": False, "error": str(e)}
def write_file(path: str, content: str, encoding: str = "utf-8") -> dict:
"""写入文件内容"""
import aiofiles
import asyncio
async def _write():
async with aiofiles.open(path, "w", encoding=encoding) as f:
await f.write(content)
try:
asyncio.run(_write())
return {"success": True, "path": path}
except PermissionError:
return {"success": False, "error": "权限不足"}
except Exception as e:
return {"success": False, "error": str(e)}
def get_container_mounts() -> list[dict]:
"""获取容器的挂载点信息"""
containers = list_containers(all=True)
mounts = []
for c in containers:
try:
info = _do_request("GET", f"/containers/{c['id']}/json")
mount_info = info.get("Mounts") or []
for m in mount_info:
mounts.append({
"container_id": c["id"],
"container_name": c["name"],
"source": m.get("Source", ""),
"destination": m.get("Destination", ""),
"mode": m.get("Mode", ""),
"rw": m.get("RW", False),
})
except Exception:
continue
return mounts