@shinyaz

AWS Security Agent Verification — Code Remediation Quality for Auto-Generated PRs

Introduction

Part 1 verified REST API penetration testing (4/5 vulnerabilities detected), Part 2 examined the impact of providing source code, Part 3 tested GraphQL API detection, and Part 4 explored authentication flow support.

Parts 1–4 focused entirely on detection. AWS Security Agent also has a Code Remediation feature that automatically generates code fixes for pentest findings and creates pull requests in GitHub repositories. This article runs a pentest against the same Flask app from Part 4 and evaluates the quality of auto-generated fixes.

Official docs: Remediate a penetration test finding

Prerequisites and Constraints

Code Remediation has several constraints:

  • us-east-1 only — Code Remediation is currently available only in us-east-1
  • GitHub integration required — Install the AWS Security Agent GitHub App and connect repositories to the Agent Space. For private repos, PRs are created directly in GitHub. For public repos, fixes are provided as downloadable diff files instead of PRs, to avoid exposing security details publicly
  • VPC-to-GitHub access required — The agent's test environment needs internet access (via NAT Gateway) to pull source code from GitHub

Parts 1–4 used ap-northeast-1. This article uses us-east-1.

Test Environment

Same Flask + pyotp app from Part 4 (5 planted vulnerabilities, TOTP 2FA enabled).

  • App: Flask + pyotp + SQLite, TOTP 2FA enabled
  • GitHub repo: shinyaz/auth-vuln-app (Private)
  • Region: us-east-1
  • Auth: testuser credentials via Secrets Manager

Full source code (app.py)

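Built with Flask + pyotp. Set TOTP_ENABLED=true to enable 2FA. The TOTP secret is JBSWY3DPEHPK3PXP, a sample secret commonly seen in TOTP library documentation (its base32-decoded bytes begin with "Hello!"; it is not one of the RFC 6238 test vectors), and it is shared across all accounts.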
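
To make the risk of a shared secret concrete, here is a minimal standard-library sketch of RFC 6238 TOTP using SHA-1, a 30-second step, and 6 digits (pyotp's defaults). The helper name totp is hypothetical and is not part of app.py. Anyone who knows the shared secret can compute a valid code for every account this way.

```python
import base64
import hashlib
import hmac
import struct
import time

SHARED_SECRET = "JBSWY3DPEHPK3PXP"  # same value as TOTP_SECRET in app.py


def totp(secret_b32, for_time=None, digits=6, step=30):
    """Compute an RFC 6238 TOTP code (HMAC-SHA1) for a base32 secret."""
    key = base64.b32decode(secret_b32, casefold=True)
    now = time.time() if for_time is None else for_time
    counter = int(now // step)  # number of elapsed time steps
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F  # dynamic truncation (RFC 4226)
    value = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(value % 10 ** digits).zfill(digits)


# Raw key bytes behind the base32 string, then the code for the current step
print(base64.b32decode(SHARED_SECRET))
print(totp(SHARED_SECRET))
```

Because app.py calls verify with valid_window=1, a code from the previous or next 30-second step should also be accepted.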

app.py
from flask import Flask, request, jsonify, session, redirect, render_template_string
import sqlite3, os, pyotp
 
app = Flask(__name__)
app.secret_key = "insecure-secret-key-for-testing"
DB_PATH = "/tmp/auth_vuln.db"
TOTP_SECRET = "JBSWY3DPEHPK3PXP"
 
def get_db():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn
 
def init_db():
    conn = get_db()
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY, username TEXT UNIQUE,
            password TEXT, role TEXT DEFAULT 'user', email TEXT, phone TEXT);
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY, name TEXT, category TEXT, price REAL);
        CREATE TABLE IF NOT EXISTS comments (
            id INTEGER PRIMARY KEY, user_id INTEGER,
            content TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
        DELETE FROM users; DELETE FROM products; DELETE FROM comments;
        INSERT INTO users (id, username, password, role, email, phone) VALUES
            (1, 'admin', 'admin123', 'admin', 'admin@example.com', '555-0100'),
            (2, 'testuser', 'testpass123', 'user', 'test@example.com', '555-0200'),
            (3, 'alice', 'alice123', 'user', 'alice@example.com', '555-0300');
        INSERT INTO products (id, name, category, price) VALUES
            (1, 'Laptop', 'electronics', 999.99), (2, 'Phone', 'electronics', 699.99),
            (3, 'Desk', 'furniture', 299.99), (4, 'Chair', 'furniture', 199.99);
        INSERT INTO comments (user_id, content) VALUES
            (1, 'Welcome to the dashboard!'), (2, 'This is a test comment.');
    """)
    conn.commit()
    conn.close()
 
LAYOUT_HEAD = """<!DOCTYPE html><html><head><title>Auth Vuln App</title></head><body>
<nav>
  <a href="/">Home</a> | <a href="/search">Search</a> |
  <a href="/api/products">Products API</a> |
  {% if session.get('user_id') %}
    <a href="/dashboard">Dashboard</a> |
    <a href="/profile/{{ session['user_id'] }}">Profile</a> |
    {% if session.get('role') == 'admin' %}<a href="/admin/users">Admin</a> |{% endif %}
    <a href="/logout">Logout ({{ session['username'] }})</a>
  {% else %}<a href="/login">Login</a>{% endif %}
</nav><hr>"""
LAYOUT_FOOT = "</body></html>"
 
HOME_PAGE = LAYOUT_HEAD + """
<h1>Auth Vuln App</h1>
<h2>Public Pages</h2>
<ul>
  <li><a href="/search">Search Products</a></li>
  <li><a href="/api/products">Products API</a></li>
  <li><a href="/api/products?category=electronics">Electronics</a></li>
</ul>
<h2>Authenticated Pages</h2>
<ul>
  <li><a href="/dashboard">Dashboard</a> (requires login)</li>
  <li><a href="/profile/1">User Profile</a> (requires login)</li>
  <li><a href="/admin/users">Admin Panel</a> (requires admin)</li>
</ul>""" + LAYOUT_FOOT
 
LOGIN_PAGE = LAYOUT_HEAD + """
<h1>Login</h1>
{% if error %}<p style="color:red">{{ error }}</p>{% endif %}
<form method="POST" action="/login">
  <label>Username: <input type="text" name="username"></label><br>
  <label>Password: <input type="password" name="password"></label><br>
  {% if totp_required %}
    <label>2FA Code: <input type="text" name="totp_code"></label><br>
  {% endif %}
  <button type="submit">Login</button>
</form>""" + LAYOUT_FOOT
 
SEARCH_PAGE = LAYOUT_HEAD + """
<h1>Search Products</h1>
<form method="GET" action="/search">
  <input type="text" name="q" value="{{ query }}">
  <button type="submit">Search</button>
</form>
{% if query %}
  <h2>Results for: {{ query | safe }}</h2>
  <ul>{% for p in results %}<li>{{ p['name'] }} - ${{ p['price'] }}</li>{% endfor %}</ul>
{% endif %}""" + LAYOUT_FOOT
 
DASHBOARD_PAGE = LAYOUT_HEAD + """
<h1>Dashboard</h1><p>Welcome, {{ session['username'] }}!</p>
<h2>Comments</h2>
<form method="POST" action="/dashboard/comment">
  <textarea name="content" rows="3" cols="40"></textarea><br>
  <button type="submit">Post Comment</button>
</form>
<ul>{% for c in comments %}<li>{{ c['content'] | safe }}</li>{% endfor %}</ul>
""" + LAYOUT_FOOT
 
PROFILE_PAGE = LAYOUT_HEAD + """
<h1>User Profile</h1>
<table>
  <tr><td>ID</td><td>{{ user['id'] }}</td></tr>
  <tr><td>Username</td><td>{{ user['username'] }}</td></tr>
  <tr><td>Email</td><td>{{ user['email'] }}</td></tr>
  <tr><td>Phone</td><td>{{ user['phone'] }}</td></tr>
  <tr><td>Role</td><td>{{ user['role'] }}</td></tr>
</table>""" + LAYOUT_FOOT
 
ADMIN_PAGE = LAYOUT_HEAD + """
<h1>Admin - User Management</h1>
<table border="1">
  <tr><th>ID</th><th>Username</th><th>Email</th><th>Role</th></tr>
  {% for u in users %}
  <tr><td>{{ u['id'] }}</td><td><a href="/profile/{{ u['id'] }}">{{ u['username'] }}</a></td>
      <td>{{ u['email'] }}</td><td>{{ u['role'] }}</td></tr>
  {% endfor %}
</table>""" + LAYOUT_FOOT
 
@app.route("/")
def index():
    return render_template_string(HOME_PAGE)
 
@app.route("/search")
def search():
    query = request.args.get("q", "")
    results = []
    if query:
        conn = get_db()
        results = conn.execute("SELECT * FROM products WHERE name LIKE ?", (f"%{query}%",)).fetchall()
        conn.close()
    return render_template_string(SEARCH_PAGE, query=query, results=results)
 
@app.route("/api/products")
def api_products():
    category = request.args.get("category", "")
    conn = get_db()
    if category:
        query = f"SELECT * FROM products WHERE category = '{category}'"
        try:
            results = conn.execute(query).fetchall()
        except Exception as e:
            conn.close()
            return jsonify({"error": str(e)}), 500
    else:
        results = conn.execute("SELECT * FROM products").fetchall()
    conn.close()
    return jsonify([dict(r) for r in results])
 
TOTP_ENABLED = os.environ.get("TOTP_ENABLED", "false").lower() == "true"
 
@app.route("/login", methods=["GET", "POST"])
def login():
    if request.method == "GET":
        return render_template_string(LOGIN_PAGE, error=None, totp_required=TOTP_ENABLED)
    username = request.form.get("username", "")
    password = request.form.get("password", "")
    conn = get_db()
    user = conn.execute("SELECT * FROM users WHERE username = ? AND password = ?",
                        (username, password)).fetchone()
    conn.close()
    if not user:
        return render_template_string(LOGIN_PAGE, error="Invalid credentials",
                                      totp_required=TOTP_ENABLED), 401
    if TOTP_ENABLED:
        totp_code = request.form.get("totp_code", "")
        totp = pyotp.TOTP(TOTP_SECRET)
        if not totp.verify(totp_code, valid_window=1):
            return render_template_string(LOGIN_PAGE, error="Invalid 2FA code",
                                          totp_required=True), 401
    session["user_id"] = user["id"]
    session["username"] = user["username"]
    session["role"] = user["role"]
    return redirect("/dashboard")
 
@app.route("/logout")
def logout():
    session.clear()
    return redirect("/")
 
@app.route("/profile/<int:user_id>")
def profile(user_id):
    if "user_id" not in session:
        return redirect("/login")
    conn = get_db()
    user = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
    conn.close()
    if not user:
        return "User not found", 404
    return render_template_string(PROFILE_PAGE, user=dict(user))
 
@app.route("/dashboard")
def dashboard():
    if "user_id" not in session:
        return redirect("/login")
    conn = get_db()
    comments = conn.execute("SELECT * FROM comments ORDER BY created_at DESC").fetchall()
    conn.close()
    return render_template_string(DASHBOARD_PAGE, comments=comments)
 
@app.route("/dashboard/comment", methods=["POST"])
def post_comment():
    if "user_id" not in session:
        return redirect("/login")
    content = request.form.get("content", "")
    conn = get_db()
    conn.execute("INSERT INTO comments (user_id, content) VALUES (?, ?)",
                 (session["user_id"], content))
    conn.commit()
    conn.close()
    return redirect("/dashboard")
 
@app.route("/admin/users")
def admin_users():
    if "user_id" not in session:
        return redirect("/login")
    conn = get_db()
    users = conn.execute("SELECT * FROM users").fetchall()
    conn.close()
    return render_template_string(ADMIN_PAGE, users=users)
 
@app.route("/.well-known/aws/securityagent-domain-verification.json")
def verify():
    return jsonify({"token": "<verification-token>"})
 
if __name__ == "__main__":
    init_db()
    app.run(host="0.0.0.0", port=5000, debug=False)

Setup steps (EC2 + GitHub integration + NAT Gateway)
Terminal
REGION=us-east-1
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
AGENT_SPACE_ID=<your-agent-space-id>
 
# 1. Create and push GitHub repo
gh repo create auth-vuln-app --private
cd /tmp && mkdir auth-vuln-app-repo && cd auth-vuln-app-repo
git init -b main  # name the initial branch main so the push below succeeds
# Place app.py and requirements.txt
git add . && git commit -m "Initial commit"
git remote add origin git@github.com:<your-org>/auth-vuln-app.git
git push -u origin main
 
# 2. Launch EC2 (public subnet)
AMI_ID=$(aws ec2 describe-images --region $REGION --owners amazon \
  --filters "Name=name,Values=al2023-ami-2023*-x86_64" "Name=state,Values=available" \
  --query "sort_by(Images, &CreationDate)[-1].ImageId" --output text)
 
SG_ID=$(aws ec2 create-security-group --region $REGION \
  --group-name "remediation-test-sg" --description "Remediation test" \
  --vpc-id <your-vpc-id> --query "GroupId" --output text)
 
aws ec2 authorize-security-group-ingress --region $REGION \
  --group-id "$SG_ID" --protocol tcp --port 80 --cidr <vpc-cidr>
 
# Self-referencing SG rule (agent container → EC2)
aws ec2 authorize-security-group-ingress --region $REGION \
  --group-id "$SG_ID" --protocol tcp --port 80 --source-group "$SG_ID"
 
INSTANCE_ID=$(aws ec2 run-instances --region $REGION \
  --image-id "$AMI_ID" --instance-type t3.small \
  --subnet-id <your-public-subnet-id> --security-group-ids "$SG_ID" \
  --iam-instance-profile Name=<your-ssm-profile> \
  --associate-public-ip-address \
  --tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value=remediation-test-target}]' \
  --query "Instances[0].InstanceId" --output text)
 
# 3. Deploy app via SSM
aws ssm send-command --region $REGION --instance-ids "$INSTANCE_ID" \
  --document-name "AWS-RunShellScript" \
  --parameters commands='["dnf install -y python3-pip",
    "pip3 install flask pyotp gunicorn",
    "mkdir -p /opt/app",
    "# Deploy app.py via base64",
    "cd /opt/app && TOTP_ENABLED=true python3 -c \"from app import init_db; init_db()\"",
    "cd /opt/app && TOTP_ENABLED=true gunicorn -w 4 -b 0.0.0.0:80 --timeout 120 -e TOTP_ENABLED=true --daemon app:app"]'
 
# 4. Create NAT Gateway (for agent container → GitHub access)
EIP_ALLOC=$(aws ec2 allocate-address --region $REGION --domain vpc --query "AllocationId" --output text)
NAT_GW=$(aws ec2 create-nat-gateway --region $REGION \
  --subnet-id <your-public-subnet-id> --allocation-id "$EIP_ALLOC" \
  --query "NatGateway.NatGatewayId" --output text)
aws ec2 wait nat-gateway-available --region $REGION --nat-gateway-ids "$NAT_GW"
 
# 5. Create private subnet (for agent container)
PRIVATE_SUBNET=$(aws ec2 create-subnet --region $REGION \
  --vpc-id <your-vpc-id> --cidr-block "172.31.128.0/24" \
  --availability-zone <your-az> \
  --query "Subnet.SubnetId" --output text)
 
PRIVATE_RT=$(aws ec2 create-route-table --region $REGION \
  --vpc-id <your-vpc-id> --query "RouteTable.RouteTableId" --output text)
aws ec2 create-route --region $REGION \
  --route-table-id "$PRIVATE_RT" --destination-cidr-block "0.0.0.0/0" \
  --nat-gateway-id "$NAT_GW"
aws ec2 associate-route-table --region $REGION \
  --route-table-id "$PRIVATE_RT" --subnet-id "$PRIVATE_SUBNET"
 
# 6. Create and verify Target Domain
PRIVATE_DNS=$(aws ec2 describe-instances --region $REGION \
  --instance-ids "$INSTANCE_ID" \
  --query "Reservations[0].Instances[0].PrivateDnsName" --output text)
aws securityagent create-target-domain --region $REGION \
  --target-domain-name "$PRIVATE_DNS" --verification-method HTTP_ROUTE
# For private VPC domains, UNREACHABLE is the expected verification status
 
# 7. Secrets Manager
aws secretsmanager create-secret --region $REGION \
  --name "security-agent/remediation-test/testuser" \
  --secret-string '{"username":"testuser","password":"testpass123","totpSecret":"JBSWY3DPEHPK3PXP"}'
 
# 8. Update Agent Space (private subnet + secret + target domain)
aws securityagent update-agent-space --region $REGION \
  --agent-space-id "$AGENT_SPACE_ID" \
  --target-domain-ids <target-domain-id> \
  --aws-resources '{
    "vpcs": [{"vpcArn":"<vpc-id>","securityGroupArns":["'$SG_ID'"],"subnetArns":["'$PRIVATE_SUBNET'"]}],
    "secretArns": ["<secret-arn>"],
    "iamRoles": ["<role-arn>"]
  }'
 
# 9. GitHub integration (via AWS Console)
# - Security Agent → Agent Space → Integrations → Connect GitHub
# - Install GitHub App → select repository
# - Penetration test tab → enable Pentest remediation
 
# 10. Create pentest (with GitHub repo + AUTOMATIC remediation)
aws securityagent create-pentest --region $REGION \
  --agent-space-id "$AGENT_SPACE_ID" \
  --title "remediation-test-auth" \
  --code-remediation-strategy AUTOMATIC \
  --assets '{
    "endpoints": [{"uri": "http://'"$PRIVATE_DNS"'"}],
    "actors": [{"identifier":"testuser","uris":["http://'"$PRIVATE_DNS"'"],
      "authentication":{"providerType":"SECRETS_MANAGER","value":"<secret-arn>"},
      "description":"Navigate to /login. Enter username and password. Enter TOTP code. Click Login."}],
    "integratedRepositories": [{"integrationId":"<integration-id>","providerResourceId":"<github-repo-id>"}]
  }' \
  --service-role "<role-arn>" \
  --vpc-config '{"vpcArn":"<vpc-id>","securityGroupArns":["'$SG_ID'"],"subnetArns":["'$PRIVATE_SUBNET'"]}'
 
# 11. Start pentest job
aws securityagent start-pentest-job --region $REGION \
  --agent-space-id "$AGENT_SPACE_ID" --pentest-id <pentest-id>

Setup Issues Encountered

Three issues came up during environment setup:

  1. DB not initialized — gunicorn doesn't run init_db(). Must manually run python3 -c "from app import init_db; init_db()". Without this, all DB-dependent endpoints return 500 and the pentest FAILs
  2. VPC-to-GitHub access — The agent's test environment launches in the subnet configured in the Agent Space. If that subnet can't reach GitHub, source code pull fails with Unable to reach GitHub repository. I created a private subnet with a NAT Gateway for internet access
  3. Repository must be linked at pentest creation — Specify repos in create-pentest via assets.integratedRepositories. Adding them later via update-pentest doesn't apply to already-running jobs
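
The first issue arises because gunicorn imports app:app as a module, so the if __name__ == "__main__" block that calls init_db() never runs. A pre-flight check along the following lines could catch that before a pentest starts. This is a sketch only: missing_tables is a hypothetical helper, and the table names are taken from init_db() in app.py.

```python
import sqlite3

REQUIRED_TABLES = {"users", "products", "comments"}  # tables created by init_db()


def missing_tables(db_path):
    """Return the required tables that do not exist yet in the SQLite file."""
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table'"
        ).fetchall()
    finally:
        conn.close()
    return REQUIRED_TABLES - {row[0] for row in rows}
```

Calling missing_tables("/tmp/auth_vuln.db") on the target host should return an empty set once init_db() has run. A non-empty set means DB-dependent endpoints would return 500.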

Results

7 findings detected, all received auto-generated GitHub PRs. With code-remediation-strategy set to AUTOMATIC, PRs were generated incrementally as each finding was confirmed. No need to wait for the pentest job to complete — each PR appeared in the GitHub repository within minutes of its corresponding finding.

| # | Finding | Risk Type | Severity | Confidence | PR |
|---|---------|-----------|----------|------------|----|
| 1 | SQL Injection in /api/products | SQL_INJECTION | CRITICAL | HIGH | #1 |
| 2 | Reflected XSS in /search | CROSS_SITE_SCRIPTING | MEDIUM | LOW | #2 |
| 3 | Hardcoded Flask Secret Key | SESSION_TOKEN_VULNERABILITIES | CRITICAL | HIGH | #3 |
| 4 | Default Credentials + Plaintext Passwords | DEFAULT_CREDENTIALS | CRITICAL | HIGH | #4 |
| 5 | IDOR on /profile | INSECURE_DIRECT_OBJECT_REFERENCE | MEDIUM | HIGH | #5 |
| 6 | Privilege Escalation /admin/users | PRIVILEGE_ESCALATION | MEDIUM | HIGH | #6 |
| 7 | Stored XSS in Comments | CROSS_SITE_SCRIPTING | MEDIUM | LOW | #7 |

Part 4 (Condition B) detected 6 findings. This run detected 7 — Hardcoded Secret Key (#3) and Default Credentials (#4) were newly detected. The GitHub repository provided source code context, which may have enabled code-level analysis for these findings.

PR Fix Analysis

PR #1: SQL Injection (CRITICAL) — f-string → parameterized query (+4/-4)

Before
query = f"SELECT * FROM products WHERE category = '{category}'"
results = conn.execute(query).fetchall()
After
results = conn.execute(
    "SELECT * FROM products WHERE category = ?", (category,)
).fetchall()

Replaces f-string concatenation with SQLite parameterized queries (? placeholder). Also changes the exception handler from str(e) to a generic message, preventing error detail leakage. Textbook fix, directly applicable.
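
The difference between the two query styles can be reproduced with an in-memory SQLite database. This is an illustrative sketch, not the agent's test: the two-row products table and the tautology payload are assumptions chosen for the demo.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, category TEXT, price REAL);
    INSERT INTO products (name, category, price) VALUES
        ('Laptop', 'electronics', 999.99), ('Desk', 'furniture', 299.99);
""")

payload = "x' OR '1'='1"  # classic tautology payload, used here for illustration

# Vulnerable: the payload becomes part of the SQL text and matches every row.
vulnerable = conn.execute(
    f"SELECT name FROM products WHERE category = '{payload}'"
).fetchall()

# Fixed: the payload is bound as a value, so it only matches a literal category.
fixed = conn.execute(
    "SELECT name FROM products WHERE category = ?", (payload,)
).fetchall()

print(len(vulnerable), len(fixed))  # prints "2 0"
```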

Each PR description includes structured information: Pentest ID, Finding ID, CWE number, root cause explanation, fix details, and impact scope. This means code reviewers can assess the fix without separately looking up the finding — the PR is self-contained.

PR #2: Reflected XSS (MEDIUM) — remove |safe filter (+1/-1)

Before
<h2>Results for: {{ query | safe }}</h2>
After
<h2>Results for: {{ query }}</h2>

Removes the Jinja2 |safe filter, re-enabling default auto-escaping. One-line change that precisely eliminates the vulnerability.
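
The effect of escaping on a reflected payload can be sketched without Flask. Note the substitution: html.escape from the standard library stands in for Jinja2's auto-escaping here, so the exact entity spellings may differ from what the app emits, and the payload is an assumed example.

```python
import html

payload = "<script>alert(1)</script>"  # illustrative reflected-XSS payload

# With `| safe`, the template emits the query string unchanged.
rendered_unsafe = f"<h2>Results for: {payload}</h2>"

# Without `| safe`, auto-escaping converts markup characters to entities.
# html.escape is a standard-library stand-in for Jinja2's escaping here.
rendered_escaped = f"<h2>Results for: {html.escape(payload)}</h2>"

print(rendered_unsafe)
print(rendered_escaped)
```

In the escaped output the browser displays the tag as text instead of executing it.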

PR #3: Hardcoded Secret Key (CRITICAL) — env var / random generation (+5/-1)

Replaces hardcoded app.secret_key = "insecure-secret-key-for-testing" with os.environ.get("FLASK_SECRET_KEY"), falling back to os.urandom(32) with a warning. Prevents session forgery.
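
The described pattern might look roughly like the sketch below. It is a reconstruction from the PR summary, not the PR's actual diff: the function name load_secret_key and the warning text are hypothetical.

```python
import os
import warnings


def load_secret_key(environ=os.environ):
    """Return the Flask secret key from FLASK_SECRET_KEY, or a random fallback."""
    key = environ.get("FLASK_SECRET_KEY")
    if key:
        return key
    warnings.warn(
        "FLASK_SECRET_KEY is not set; using a random key, "
        "so sessions will not survive a restart",
        RuntimeWarning,
    )
    return os.urandom(32)  # 32 random bytes, regenerated on every process start
```

One caveat with the fallback: with several gunicorn workers (the setup here uses -w 4) and no preloading, each worker would likely generate its own key, so setting FLASK_SECRET_KEY explicitly is the safer choice.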

PR #4: Default Credentials + Plaintext (CRITICAL) — comprehensive refactor (+70/-30)

The largest fix. Changes include:

  • Plaintext passwords → werkzeug.security.generate_password_hash
  • Shared TOTP secret → pyotp.random_base32() per-user TOTP secrets
  • Re-seed on every startup → seed only when tables are empty
  • Login logic updated to check_password_hash
  • Added totp_secret column to users table

Goes beyond a single vulnerability fix to refactor the entire authentication foundation.
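
Two of these changes, hashed passwords and seeding only when the table is empty, can be sketched with the standard library alone. Note the substitution: the PR uses werkzeug.security, while this sketch uses PBKDF2 from hashlib as a stand-in. All function names, the storage format, and the iteration count are assumptions for illustration.

```python
import hashlib
import hmac
import os
import sqlite3

ITERATIONS = 100_000  # illustrative work factor, not taken from the PR


def hash_password(password, salt=None):
    """Return 'salt_hex$digest_hex' using PBKDF2-HMAC-SHA256."""
    salt = os.urandom(16) if salt is None else salt
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def check_password(stored, candidate):
    """Recompute the hash with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", candidate.encode(), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


def seed_users_if_empty(conn, users):
    """Insert seed users only when the users table has no rows."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS users ("
        "id INTEGER PRIMARY KEY, username TEXT UNIQUE, password TEXT)"
    )
    if conn.execute("SELECT COUNT(*) FROM users").fetchone()[0] == 0:
        conn.executemany(
            "INSERT INTO users (username, password) VALUES (?, ?)",
            [(name, hash_password(pw)) for name, pw in users],
        )
    conn.commit()
```

Calling seed_users_if_empty a second time leaves existing rows untouched, unlike the original init_db(), which deletes and reinserts all users on every run.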

PR #5: IDOR (MEDIUM) — ownership check (+2/-1)

After (added)
if session["user_id"] != user_id and session.get("role") != "admin":
    return "Forbidden", 403

Adds authorization check allowing access only to own profile or admin role. The admin exception is a practical consideration.
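
The same rule can be written as a pure function, which makes the allowed and denied cases easy to enumerate. The helper name can_view_profile and the sample sessions are hypothetical. The user IDs match the seed data in app.py.

```python
def can_view_profile(session, requested_user_id):
    """Allow access to one's own profile, or to any profile for admins."""
    if "user_id" not in session:
        return False  # not logged in
    return session["user_id"] == requested_user_id or session.get("role") == "admin"


alice = {"user_id": 3, "role": "user"}
admin = {"user_id": 1, "role": "admin"}

print(can_view_profile(alice, 3))  # own profile: True
print(can_view_profile(alice, 1))  # someone else's profile: False
print(can_view_profile(admin, 3))  # admin viewing another user: True
```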

PR #6: Privilege Escalation (MEDIUM) — role-based authorization (+3/-1)

After (added)
if session.get("role") != "admin":
    return "Forbidden", 403

Adds admin role check to /admin/users. Simple but accurate.

PR #7: Stored XSS (MEDIUM) — remove |safe filter (+1/-1)

Same pattern as PR #2. Removes |safe from comment rendering.

Fix Quality Assessment

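| # | Vulnerability | Severity | Fix Accuracy | Directly Applicable? | Changes |
|---|---------------|----------|--------------|----------------------|---------|
| 1 | SQL Injection | CRITICAL | ✅ Accurate | Yes | +4/-4 |
| 2 | Reflected XSS | MEDIUM | ✅ Accurate | Yes | +1/-1 |
| 3 | Hardcoded Secret Key | CRITICAL | ✅ Accurate | Yes | +5/-1 |
| 4 | Default Credentials | CRITICAL | ✅ Accurate | ⚠️ Needs testing | +70/-30 |
| 5 | IDOR | MEDIUM | ✅ Accurate | Yes | +2/-1 |
| 6 | Privilege Escalation | MEDIUM | ✅ Accurate | Yes | +3/-1 |
| 7 | Stored XSS | MEDIUM | ✅ Accurate | Yes | +1/-1 |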

All 7 fixes were technically accurate. Only PR #4 (Default Credentials, +70/-30) includes schema changes requiring testing before application, but the fix direction is sound.

Summary

  • All findings received auto-generated PRs — 7 vulnerabilities, 7 PRs, all technically accurate. Fixes ranged from one-line XSS |safe removal to a 70-line authentication refactor, scaling with vulnerability complexity
  • Fixes extend detection naturally — PR descriptions include finding title, CWE number, root cause, fix details, and impact scope — everything needed for code review. More efficient than reading findings and fixing manually
  • NAT Gateway required — The agent's test environment runs in a private subnet, requiring a NAT Gateway for GitHub access. This adds cost (NAT Gateway hourly + data transfer)
  • us-east-1 limitation is a practical constraint — If your Agent Space is in another region, you need to rebuild the environment in us-east-1 just for Remediation

Cleanup

Resource deletion
Terminal
REGION=us-east-1
 
# EC2
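aws ec2 terminate-instances --region $REGION --instance-ids <instance-id>
# Wait for termination so the security group can be deleted later
aws ec2 wait instance-terminated --region $REGION --instance-ids <instance-id>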
 
# NAT Gateway + EIP
aws ec2 delete-nat-gateway --region $REGION --nat-gateway-id <nat-gw-id>
# Wait for NAT Gateway deletion before releasing EIP
aws ec2 wait nat-gateway-deleted --region $REGION --nat-gateway-ids <nat-gw-id>
aws ec2 release-address --region $REGION --allocation-id <eip-alloc-id>
 
# Private subnet + route table
aws ec2 delete-subnet --region $REGION --subnet-id <private-subnet-id>
aws ec2 delete-route-table --region $REGION --route-table-id <private-rt-id>
 
# Security Group
aws ec2 delete-security-group --region $REGION --group-id <sg-id>
 
# Secrets Manager
aws secretsmanager delete-secret --region $REGION \
  --secret-id <secret-name> --force-delete-without-recovery
 
# Target Domain
aws securityagent delete-target-domain --region $REGION \
  --target-domain-id <target-domain-id>
 
# GitHub repo
gh repo delete <your-org>/auth-vuln-app --yes

Shinya Tahara

Solutions Architect @ AWS

I'm a Solutions Architect at AWS, providing technical guidance primarily to financial industry customers. I share learnings about cloud architecture and AI/ML on this site. The views and opinions expressed on this site are my own and do not represent the official positions of my employer.
