AIO Sandbox 提供了全面的 REST API 用于文件系统操作,支持在整个沙盒环境中进行程序化文件管理。
所有文件操作都可通过 /v1/file/* 的 RESTful 端点访问。
读取文件内容,可选择指定行范围:
file_res = client.file.read_file(
file="/home/gem/.bashrc",
start_line=0,
end_line=10,
sudo=False,
)

响应:
{
"success": true,
"message": "File read successfully",
"data": {
"content": "File contents here...",
"line_count": 42,
"file": "/path/to/file.txt"
}
}

使用各种选项将内容写入文件:
curl -X POST http://localhost:8080/v1/file/write \
-H "Content-Type: application/json" \
-d '{
"file": "/path/to/output.txt",
"content": "Hello, World!",
"append": false,
"leading_newline": false,
"trailing_newline": true,
"sudo": false
}'

响应:
{
"success": true,
"message": "File written successfully",
"data": {
"file": "/path/to/output.txt",
"bytes_written": 13
}
}

在文件中查找和替换文本:
curl -X POST http://localhost:8080/v1/file/replace \
-H "Content-Type: application/json" \
-d '{
"file": "/path/to/file.txt",
"old_str": "old text",
"new_str": "new text",
"sudo": false
}'

响应:
{
"success": true,
"message": "Replacement completed, replaced 3 occurrences",
"data": {
"file": "/path/to/file.txt",
"replaced_count": 3
}
}

使用正则表达式搜索文件内容:
curl -X POST http://localhost:8080/v1/file/search \
-H "Content-Type: application/json" \
-d '{
"file": "/path/to/file.txt",
"regex": "function\\s+\\w+",
"sudo": false
}'

响应:
{
"success": true,
"message": "Search completed, found 5 matches",
"data": {
"file": "/path/to/file.txt",
"matches": [
{
"line_number": 10,
"line": "function myFunction() {",
"match": "function myFunction"
}
]
}
}

使用 glob 模式搜索文件:
curl -X POST http://localhost:8080/v1/file/find \
-H "Content-Type: application/json" \
-d '{
"path": "/home/gem",
"glob": "*.js"
}'

响应:
{
"success": true,
"message": "Search completed, found 12 files",
"data": {
"files": [
"/home/gem/app.js",
"/home/gem/config.js",
"/home/gem/utils.js"
]
}
}

import requests
import json
class SandboxFileAPI:
    """Minimal client for the sandbox file endpoints under ``/v1/file/``.

    Every public method POSTs a JSON payload to one endpoint and returns the
    decoded JSON body of the response.
    """

    def __init__(self, base_url="http://localhost:8080", timeout=30):
        """Store the connection settings.

        Args:
            base_url: Scheme, host and port of the sandbox (no trailing path).
            timeout: Seconds handed to ``requests.post`` for each call. Pass
                ``None`` to wait indefinitely, which was the old behaviour.
        """
        self.base_url = base_url
        self.timeout = timeout

    def _post(self, endpoint, payload):
        """POST *payload* as JSON to ``/v1/file/<endpoint>`` and decode the reply."""
        response = requests.post(
            f"{self.base_url}/v1/file/{endpoint}",
            json=payload,
            timeout=self.timeout,
        )
        return response.json()

    def read_file(self, file_path, start_line=None, end_line=None, *, sudo=False):
        """Read a file, optionally restricted to a line range.

        Args:
            file_path: Path of the file inside the sandbox.
            start_line: First line to return; omitted from the request when None.
            end_line: End of the line range; omitted from the request when None.
            sudo: When true, ``"sudo": true`` is added to the request.

        Returns:
            The decoded JSON response.
        """
        payload = {"file": file_path}
        if start_line is not None:
            payload["start_line"] = start_line
        if end_line is not None:
            payload["end_line"] = end_line
        # Only send the flag when asked for, so default requests are unchanged.
        if sudo:
            payload["sudo"] = True
        return self._post("read", payload)

    def write_file(self, file_path, content, append=False, *, sudo=False):
        """Write *content* to a file.

        Args:
            file_path: Path of the file inside the sandbox.
            content: Text to write.
            append: Sent as the ``append`` field of the request.
            sudo: When true, ``"sudo": true`` is added to the request.

        Returns:
            The decoded JSON response.
        """
        payload = {"file": file_path, "content": content, "append": append}
        if sudo:
            payload["sudo"] = True
        return self._post("write", payload)

    def search_files(self, pattern, directory="/"):
        """Find files under *directory* whose names match the glob *pattern*.

        Returns:
            The decoded JSON response.
        """
        return self._post("find", {"path": directory, "glob": pattern})
# 使用示例
api = SandboxFileAPI()
# 读取配置
config = api.read_file("/app/config.json")
print(config["data"]["content"])
# 写入日志条目
api.write_file("/var/log/app.log", "Process started\n", append=True)
# 查找 Python 文件
files = api.search_files("*.py", "/app")
for file_path in files["data"]["files"]:
print(f"Found: {file_path}")

class SandboxFileAPI {
/**
 * Create a client for the sandbox file API.
 *
 * @param {string} [baseUrl='http://localhost:8080'] Value stored on
 *   `this.baseUrl`; the request methods prefix their endpoint paths with it.
 */
constructor(baseUrl = 'http://localhost:8080') {
this.baseUrl = baseUrl;
}
async readFile(filePath, options = {}) {
const payload = { file: filePath, ...options };
const response = await fetch(`${this.baseUrl}/v1/file/read`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload)
});
return response.json();
}
async writeFile(filePath, content, options = {}) {
const payload = { file: filePath, content, ...options };
const response = await fetch(`${this.baseUrl}/v1/file/write`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload)
});
return response.json();
}
async replaceInFile(filePath, oldStr, newStr) {
const payload = { file: filePath, old_str: oldStr, new_str: newStr };
const response = await fetch(`${this.baseUrl}/v1/file/replace`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload)
});
return response.json();
}
}
// 使用示例
const api = new SandboxFileAPI();
// 读取和处理文件
const result = await api.readFile('/app/data.txt');
if (result.success) {
console.log('File content:', result.data.content);
// 更新配置
await api.replaceInFile('/app/config.json', '"debug": false', '"debug": true');
}

文件在所有沙盒组件之间共享:
# 通过 API 创建文件
curl -X POST http://localhost:8080/v1/file/write \
-d '{"file": "/tmp/shared.txt", "content": "Shared content"}'
# 在终端中访问
# ws://localhost:8080/v1/shell/ws
# > cat /tmp/shared.txt
# Shared content
# 在 Code Server 中编辑
# http://localhost:8080/code-server/
# 打开 /tmp/shared.txt
# 在浏览器中处理
# 通过 VNC 下载或查看

完整的文件处理工作流:
// 文件出现在 /home/gem/Downloads/

# 读取下载的文件
content = client.file.read_file(file="/home/gem/Downloads/data.csv").data.content
# 处理并保存结果
processed = process_csv(content)
client.file.write_file(file="/tmp/results.json", content=json.dumps(processed))

# 运行分析脚本
python /app/analyze.py /tmp/results.json
# 生成报告
pandoc /tmp/results.json -o /tmp/report.pdf

# 在 VSCode 中打开结果进行优化
# /tmp/report.pdf 可供预览

高效处理多个文件:
def batch_process_files(api, directory, pattern):
    """Upper-case every matching file and write the result next to it.

    Each file found by ``api.search_files(pattern, directory)`` is read with
    ``api.read_file``; files whose read reports ``success`` as false are
    skipped. The upper-cased content is written with ``api.write_file`` to a
    sibling path with ``_processed`` inserted before the extension
    (``a.txt`` -> ``a_processed.txt``).

    Args:
        api: Object providing ``search_files``, ``read_file`` and ``write_file``.
        directory: Directory to search in.
        pattern: Glob pattern passed to ``api.search_files``.
    """
    import os

    # Find every matching file.
    files_result = api.search_files(pattern, directory)
    for file_path in files_result["data"]["files"]:
        # Read each file; skip the ones that could not be read.
        content_result = api.read_file(file_path)
        if not content_result["success"]:
            continue
        # Transform the content.
        processed = content_result["data"]["content"].upper()
        # Split off only the real extension: a plain str.replace(".txt", ...)
        # would also rewrite ".txt" inside directory names, and would leave
        # non-.txt paths unchanged so the source file got overwritten.
        root, ext = os.path.splitext(file_path)
        api.write_file(f"{root}_processed{ext}", processed)
# 处理目录中的所有文本文件
batch_process_files(api, "/app/data", "*.txt")

文件操作的稳健错误处理:
def safe_file_operation(api, operation, **kwargs):
    """Run a file operation and return its ``data``, or None on any failure.

    Args:
        api: Unused by this function; kept so existing callers keep working.
        operation: Callable invoked as ``operation(**kwargs)``; expected to
            return the decoded JSON response as a dict.
        **kwargs: Keyword arguments forwarded to *operation*.

    Returns:
        The ``data`` field of the response when ``success`` is truthy;
        otherwise None, after printing the reason.
    """
    # Keep the try body down to the one call that can raise these errors.
    try:
        result = operation(**kwargs)
    except requests.exceptions.RequestException as e:
        print(f"Network error: {e}")
        return None
    except json.JSONDecodeError as e:
        print(f"JSON decode error: {e}")
        return None

    # Use .get so a response missing a field is reported as a failure
    # instead of a KeyError escaping from the "safe" wrapper.
    if not result.get("success"):
        print(f"Operation failed: {result.get('message', 'unknown error')}")
        return None
    return result.get("data")
# 安全文件读取
content = safe_file_operation(
api,
api.read_file,
file_path="/path/to/file.txt"
)

处理文件权限和 sudo 操作:
# 常规文件操作
result = api.read_file("/home/user/file.txt")
# 系统文件的 sudo 操作
result = api.read_file("/etc/nginx/nginx.conf", sudo=True)
# 写入受保护的位置
api.write_file(
"/etc/cron.d/backup",
"0 2 * * * root /backup.sh",
sudo=True
)

import os
import re
def secure_file_operation(file_path, base_directory="/home/user"):
    """Validate a path and return its normalised form.

    Args:
        file_path: Path to check. Absolute paths must lie inside
            *base_directory*; relative paths must not climb above their
            starting point.
        base_directory: Directory that absolute paths are confined to.

    Returns:
        ``os.path.normpath(file_path)``.

    Raises:
        ValueError: If the path escapes *base_directory* or contains
            characters outside ``[a-zA-Z0-9._/-]``.
    """
    # Normalise both paths so "." and ".." segments are collapsed.
    normalized = os.path.normpath(file_path)
    base = os.path.normpath(base_directory)

    if os.path.isabs(normalized):
        # Compare on a directory boundary: a bare startswith(base) would
        # accept siblings such as "/home/username" for base "/home/user".
        inside = normalized == base or normalized.startswith(
            base.rstrip(os.sep) + os.sep
        )
        if not inside:
            raise ValueError("Path traversal detected")
    elif normalized.split(os.sep)[0] == "..":
        # After normpath, a relative path can only escape through leading
        # ".." components. Checking components (not substrings) keeps names
        # like "a..b" valid.
        raise ValueError("Path traversal detected")

    # fullmatch, not match: "$" would otherwise accept a trailing newline.
    if not re.fullmatch(r'^[a-zA-Z0-9._/-]+$', normalized):
        raise ValueError("Invalid characters in filename")
    return normalized
# 安全使用
try:
safe_path = secure_file_operation("../../../etc/passwd")
except ValueError as e:
print(f"Security violation: {e}")

# 分块读取大文件
def read_large_file(api, file_path, chunk_size=1000):
    """Read a file in windows of *chunk_size* lines and join the pieces.

    Calls ``api.read_file(file_path, start_line=..., end_line=...)`` with
    consecutive windows starting at line 0 and stops at the first response
    that is unsuccessful, empty, or shorter than a full window. A failed
    later chunk therefore yields the content read so far, not an error.

    Args:
        api: Object providing ``read_file(path, start_line=, end_line=)``.
        file_path: Path of the file to read.
        chunk_size: Number of lines requested per call; must be positive.

    Returns:
        The chunk contents joined with ``"\\n"``.

    Raises:
        ValueError: If *chunk_size* is not positive.
    """
    # A non-positive size never advances the window, so the loop below
    # could spin forever.
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    content_parts = []
    next_line = 0
    while True:
        # NOTE(review): assumes each window [next_line, next_line + chunk_size)
        # is half-open and chunks carry no trailing newline - confirm against
        # the API reference.
        result = api.read_file(
            file_path,
            start_line=next_line,
            end_line=next_line + chunk_size,
        )
        if not result["success"]:
            break
        chunk = result["data"]["content"]
        if not chunk:
            break
        content_parts.append(chunk)
        next_line += chunk_size
        # A short chunk means the end of the file was reached.
        if len(chunk.splitlines()) < chunk_size:
            break
    return "\n".join(content_parts)


import asyncio
import aiohttp
async def parallel_file_operations(files):
    """Read every path in *files* concurrently over one shared session.

    Args:
        files: Iterable of file paths, each passed to ``read_file_async``.

    Returns:
        The list produced by ``asyncio.gather``: one result per path, in the
        same order as *files*.
    """
    async with aiohttp.ClientSession() as session:
        pending = [read_file_async(session, path) for path in files]
        return await asyncio.gather(*pending)
async def read_file_async(session, file_path):
payload = {"file": file_path}
async with session.post(
"http://localhost:8080/v1/file/read",
json=payload
) as response:
return await response.json()

准备集成文件操作?查看我们的 API 参考 获取完整的端点文档。