AWS Security Agent Verification — Code Remediation Quality for Auto-Generated PRs
Introduction
In Part 1 I verified REST API penetration testing (4/5 vulnerabilities detected), Part 2 examined the impact of providing source code, Part 3 tested GraphQL API detection, and Part 4 explored authentication flow support.
Parts 1–4 focused entirely on detection. AWS Security Agent also has a Code Remediation feature that automatically generates code fixes for pentest findings and creates pull requests in GitHub repositories. This article runs a pentest against the same Flask app from Part 4 and evaluates the quality of auto-generated fixes.
Official docs: Remediate a penetration test finding
Prerequisites and Constraints
Code Remediation has several constraints:
- us-east-1 only — Code Remediation is currently available only in us-east-1
- GitHub integration required — Install the AWS Security Agent GitHub App and connect repositories to the Agent Space. For private repos, PRs are created directly in GitHub. For public repos, fixes are provided as downloadable diff files instead of PRs, to avoid exposing security details publicly
- VPC-to-GitHub access required — The agent's test environment needs internet access (via NAT Gateway) to pull source code from GitHub
Parts 1–4 used ap-northeast-1. This article uses us-east-1.
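The GitHub access constraint can be checked before starting a pentest. The following is a minimal sketch, assuming it is run from a host placed in the same subnet the agent will use; `can_reach` and its default timeout are illustrative names and values, not part of AWS Security Agent.

```python
import socket


def can_reach(host: str, port: int = 443, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False
```

Calling `can_reach("github.com")` from that subnet should return True once the NAT Gateway route is in place. A False result suggests the source code pull would fail as well, although it does not say why.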
Test Environment
Same Flask + pyotp app from Part 4 (5 planted vulnerabilities, TOTP 2FA enabled).
- App: Flask + pyotp + SQLite, TOTP 2FA enabled
- GitHub repo: shinyaz/auth-vuln-app (Private)
- Region: us-east-1
- Auth: testuser credentials via Secrets Manager
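For context on what the agent does with the TOTP secret stored next to the credentials: a time-based one-time password is an HMAC over the current 30-second time step, truncated to a short decimal code (RFC 6238 on top of RFC 4226). The sketch below uses only the standard library and is not the pyotp implementation the app relies on; `totp_code` is a hypothetical helper name.

```python
import base64
import hashlib
import hmac
import struct
import time


def totp_code(secret_b32: str, for_time=None, step: int = 30, digits: int = 6) -> str:
    """Compute an RFC 6238 TOTP code (HMAC-SHA1) for a base32-encoded secret."""
    key = base64.b32decode(secret_b32, casefold=True)
    now = time.time() if for_time is None else for_time
    counter = int(now // step)  # number of time steps since the Unix epoch
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F  # dynamic truncation, as defined in RFC 4226
    value = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(value % 10 ** digits).zfill(digits)
```

Because the app uses one secret for every account, the code this produces for a given time step is valid for admin, testuser, and alice alike.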
Full source code (app.py)
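Built with Flask + pyotp. Set TOTP_ENABLED=true to enable 2FA. The TOTP secret is JBSWY3DPEHPK3PXP, a sample value widely used in TOTP examples (it base32-decodes to "Hello!" followed by the bytes 0xDE 0xAD 0xBE 0xEF, and is not one of the RFC 6238 test vectors). It is shared across all accounts.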
from flask import Flask, request, jsonify, session, redirect, render_template_string
import sqlite3, os, pyotp
app = Flask(__name__)
app.secret_key = "insecure-secret-key-for-testing"
DB_PATH = "/tmp/auth_vuln.db"
TOTP_SECRET = "JBSWY3DPEHPK3PXP"
def get_db():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn

def init_db():
    conn = get_db()
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY, username TEXT UNIQUE,
            password TEXT, role TEXT DEFAULT 'user', email TEXT, phone TEXT);
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY, name TEXT, category TEXT, price REAL);
        CREATE TABLE IF NOT EXISTS comments (
            id INTEGER PRIMARY KEY, user_id INTEGER,
            content TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
        DELETE FROM users; DELETE FROM products; DELETE FROM comments;
        INSERT INTO users (id, username, password, role, email, phone) VALUES
            (1, 'admin', 'admin123', 'admin', 'admin@example.com', '555-0100'),
            (2, 'testuser', 'testpass123', 'user', 'test@example.com', '555-0200'),
            (3, 'alice', 'alice123', 'user', 'alice@example.com', '555-0300');
        INSERT INTO products (id, name, category, price) VALUES
            (1, 'Laptop', 'electronics', 999.99), (2, 'Phone', 'electronics', 699.99),
            (3, 'Desk', 'furniture', 299.99), (4, 'Chair', 'furniture', 199.99);
        INSERT INTO comments (user_id, content) VALUES
            (1, 'Welcome to the dashboard!'), (2, 'This is a test comment.');
    """)
    conn.commit()
    conn.close()

LAYOUT_HEAD = """<!DOCTYPE html><html><head><title>Auth Vuln App</title></head><body>
<nav>
<a href="/">Home</a> | <a href="/search">Search</a> |
<a href="/api/products">Products API</a> |
{% if session.get('user_id') %}
<a href="/dashboard">Dashboard</a> |
<a href="/profile/{{ session['user_id'] }}">Profile</a> |
{% if session.get('role') == 'admin' %}<a href="/admin/users">Admin</a> |{% endif %}
<a href="/logout">Logout ({{ session['username'] }})</a>
{% else %}<a href="/login">Login</a>{% endif %}
</nav><hr>"""
LAYOUT_FOOT = "</body></html>"
HOME_PAGE = LAYOUT_HEAD + """
<h1>Auth Vuln App</h1>
<h2>Public Pages</h2>
<ul>
<li><a href="/search">Search Products</a></li>
<li><a href="/api/products">Products API</a></li>
<li><a href="/api/products?category=electronics">Electronics</a></li>
</ul>
<h2>Authenticated Pages</h2>
<ul>
<li><a href="/dashboard">Dashboard</a> (requires login)</li>
<li><a href="/profile/1">User Profile</a> (requires login)</li>
<li><a href="/admin/users">Admin Panel</a> (requires admin)</li>
</ul>""" + LAYOUT_FOOT
LOGIN_PAGE = LAYOUT_HEAD + """
<h1>Login</h1>
{% if error %}<p style="color:red">{{ error }}</p>{% endif %}
<form method="POST" action="/login">
<label>Username: <input type="text" name="username"></label><br>
<label>Password: <input type="password" name="password"></label><br>
{% if totp_required %}
<label>2FA Code: <input type="text" name="totp_code"></label><br>
{% endif %}
<button type="submit">Login</button>
</form>""" + LAYOUT_FOOT
SEARCH_PAGE = LAYOUT_HEAD + """
<h1>Search Products</h1>
<form method="GET" action="/search">
<input type="text" name="q" value="{{ query }}">
<button type="submit">Search</button>
</form>
{% if query %}
<h2>Results for: {{ query | safe }}</h2>
<ul>{% for p in results %}<li>{{ p['name'] }} - ${{ p['price'] }}</li>{% endfor %}</ul>
{% endif %}""" + LAYOUT_FOOT
DASHBOARD_PAGE = LAYOUT_HEAD + """
<h1>Dashboard</h1><p>Welcome, {{ session['username'] }}!</p>
<h2>Comments</h2>
<form method="POST" action="/dashboard/comment">
<textarea name="content" rows="3" cols="40"></textarea><br>
<button type="submit">Post Comment</button>
</form>
<ul>{% for c in comments %}<li>{{ c['content'] | safe }}</li>{% endfor %}</ul>
""" + LAYOUT_FOOT
PROFILE_PAGE = LAYOUT_HEAD + """
<h1>User Profile</h1>
<table>
<tr><td>ID</td><td>{{ user['id'] }}</td></tr>
<tr><td>Username</td><td>{{ user['username'] }}</td></tr>
<tr><td>Email</td><td>{{ user['email'] }}</td></tr>
<tr><td>Phone</td><td>{{ user['phone'] }}</td></tr>
<tr><td>Role</td><td>{{ user['role'] }}</td></tr>
</table>""" + LAYOUT_FOOT
ADMIN_PAGE = LAYOUT_HEAD + """
<h1>Admin - User Management</h1>
<table border="1">
<tr><th>ID</th><th>Username</th><th>Email</th><th>Role</th></tr>
{% for u in users %}
<tr><td>{{ u['id'] }}</td><td><a href="/profile/{{ u['id'] }}">{{ u['username'] }}</a></td>
<td>{{ u['email'] }}</td><td>{{ u['role'] }}</td></tr>
{% endfor %}
</table>""" + LAYOUT_FOOT

@app.route("/")
def index():
    return render_template_string(HOME_PAGE)

@app.route("/search")
def search():
    query = request.args.get("q", "")
    results = []
    if query:
        conn = get_db()
        results = conn.execute("SELECT * FROM products WHERE name LIKE ?", (f"%{query}%",)).fetchall()
        conn.close()
    return render_template_string(SEARCH_PAGE, query=query, results=results)

@app.route("/api/products")
def api_products():
    category = request.args.get("category", "")
    conn = get_db()
    if category:
        # Intentionally vulnerable: user input is interpolated into the SQL text
        query = f"SELECT * FROM products WHERE category = '{category}'"
        try:
            results = conn.execute(query).fetchall()
        except Exception as e:
            conn.close()
            return jsonify({"error": str(e)}), 500
    else:
        results = conn.execute("SELECT * FROM products").fetchall()
    conn.close()
    return jsonify([dict(r) for r in results])

TOTP_ENABLED = os.environ.get("TOTP_ENABLED", "false").lower() == "true"

@app.route("/login", methods=["GET", "POST"])
def login():
    if request.method == "GET":
        return render_template_string(LOGIN_PAGE, error=None, totp_required=TOTP_ENABLED)
    username = request.form.get("username", "")
    password = request.form.get("password", "")
    conn = get_db()
    user = conn.execute("SELECT * FROM users WHERE username = ? AND password = ?",
                        (username, password)).fetchone()
    conn.close()
    if not user:
        return render_template_string(LOGIN_PAGE, error="Invalid credentials",
                                      totp_required=TOTP_ENABLED), 401
    if TOTP_ENABLED:
        totp_code = request.form.get("totp_code", "")
        totp = pyotp.TOTP(TOTP_SECRET)
        if not totp.verify(totp_code, valid_window=1):
            return render_template_string(LOGIN_PAGE, error="Invalid 2FA code",
                                          totp_required=True), 401
    session["user_id"] = user["id"]
    session["username"] = user["username"]
    session["role"] = user["role"]
    return redirect("/dashboard")

@app.route("/logout")
def logout():
    session.clear()
    return redirect("/")

@app.route("/profile/<int:user_id>")
def profile(user_id):
    if "user_id" not in session:
        return redirect("/login")
    # Intentionally vulnerable (IDOR): no check that user_id belongs to the caller
    conn = get_db()
    user = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
    conn.close()
    if not user:
        return "User not found", 404
    return render_template_string(PROFILE_PAGE, user=dict(user))

@app.route("/dashboard")
def dashboard():
    if "user_id" not in session:
        return redirect("/login")
    conn = get_db()
    comments = conn.execute("SELECT * FROM comments ORDER BY created_at DESC").fetchall()
    conn.close()
    return render_template_string(DASHBOARD_PAGE, comments=comments)

@app.route("/dashboard/comment", methods=["POST"])
def post_comment():
    if "user_id" not in session:
        return redirect("/login")
    content = request.form.get("content", "")
    conn = get_db()
    conn.execute("INSERT INTO comments (user_id, content) VALUES (?, ?)",
                 (session["user_id"], content))
    conn.commit()
    conn.close()
    return redirect("/dashboard")

@app.route("/admin/users")
def admin_users():
    # Intentionally vulnerable: checks that the caller is logged in, not that they are an admin
    if "user_id" not in session:
        return redirect("/login")
    conn = get_db()
    users = conn.execute("SELECT * FROM users").fetchall()
    conn.close()
    return render_template_string(ADMIN_PAGE, users=users)

@app.route("/.well-known/aws/securityagent-domain-verification.json")
def verify():
    return jsonify({"token": "<verification-token>"})

if __name__ == "__main__":
    init_db()
    app.run(host="0.0.0.0", port=5000, debug=False)
Setup steps (EC2 + GitHub integration + NAT Gateway)
REGION=us-east-1
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
AGENT_SPACE_ID=<your-agent-space-id>
# 1. Create and push GitHub repo
gh repo create auth-vuln-app --private
cd /tmp && mkdir auth-vuln-app-repo && cd auth-vuln-app-repo
git init
# Place app.py and requirements.txt
git add . && git commit -m "Initial commit"
git remote add origin git@github.com:<your-org>/auth-vuln-app.git
git push -u origin main
# 2. Launch EC2 (public subnet)
AMI_ID=$(aws ec2 describe-images --region $REGION --owners amazon \
--filters "Name=name,Values=al2023-ami-2023*-x86_64" "Name=state,Values=available" \
--query "sort_by(Images, &CreationDate)[-1].ImageId" --output text)
SG_ID=$(aws ec2 create-security-group --region $REGION \
--group-name "remediation-test-sg" --description "Remediation test" \
--vpc-id <your-vpc-id> --query "GroupId" --output text)
aws ec2 authorize-security-group-ingress --region $REGION \
--group-id "$SG_ID" --protocol tcp --port 80 --cidr <vpc-cidr>
# Self-referencing SG rule (agent container → EC2)
aws ec2 authorize-security-group-ingress --region $REGION \
--group-id "$SG_ID" --protocol tcp --port 80 --source-group "$SG_ID"
INSTANCE_ID=$(aws ec2 run-instances --region $REGION \
--image-id "$AMI_ID" --instance-type t3.small \
--subnet-id <your-public-subnet-id> --security-group-ids "$SG_ID" \
--iam-instance-profile Name=<your-ssm-profile> \
--associate-public-ip-address \
--tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value=remediation-test-target}]' \
--query "Instances[0].InstanceId" --output text)
# 3. Deploy app via SSM
aws ssm send-command --region $REGION --instance-ids "$INSTANCE_ID" \
--document-name "AWS-RunShellScript" \
--parameters commands='["dnf install -y python3-pip",
"pip3 install flask pyotp gunicorn",
"mkdir -p /opt/app",
"# Deploy app.py via base64",
"cd /opt/app && TOTP_ENABLED=true python3 -c \"from app import init_db; init_db()\"",
"cd /opt/app && TOTP_ENABLED=true gunicorn -w 4 -b 0.0.0.0:80 --timeout 120 -e TOTP_ENABLED=true --daemon app:app"]'
# 4. Create NAT Gateway (for agent container → GitHub access)
EIP_ALLOC=$(aws ec2 allocate-address --region $REGION --domain vpc --query "AllocationId" --output text)
NAT_GW=$(aws ec2 create-nat-gateway --region $REGION \
--subnet-id <your-public-subnet-id> --allocation-id "$EIP_ALLOC" \
--query "NatGateway.NatGatewayId" --output text)
aws ec2 wait nat-gateway-available --region $REGION --nat-gateway-ids "$NAT_GW"
# 5. Create private subnet (for agent container)
PRIVATE_SUBNET=$(aws ec2 create-subnet --region $REGION \
--vpc-id <your-vpc-id> --cidr-block "172.31.128.0/24" \
--availability-zone <your-az> \
--query "Subnet.SubnetId" --output text)
PRIVATE_RT=$(aws ec2 create-route-table --region $REGION \
--vpc-id <your-vpc-id> --query "RouteTable.RouteTableId" --output text)
aws ec2 create-route --region $REGION \
--route-table-id "$PRIVATE_RT" --destination-cidr-block "0.0.0.0/0" \
--nat-gateway-id "$NAT_GW"
aws ec2 associate-route-table --region $REGION \
--route-table-id "$PRIVATE_RT" --subnet-id "$PRIVATE_SUBNET"
# 6. Create and verify Target Domain
PRIVATE_DNS=$(aws ec2 describe-instances --region $REGION \
--instance-ids "$INSTANCE_ID" \
--query "Reservations[0].Instances[0].PrivateDnsName" --output text)
aws securityagent create-target-domain --region $REGION \
--target-domain-name "$PRIVATE_DNS" --verification-method HTTP_ROUTE
# For private VPC domains, UNREACHABLE is the expected verification status
# 7. Secrets Manager
aws secretsmanager create-secret --region $REGION \
--name "security-agent/remediation-test/testuser" \
--secret-string '{"username":"testuser","password":"testpass123","totpSecret":"JBSWY3DPEHPK3PXP"}'
# 8. Update Agent Space (private subnet + secret + target domain)
aws securityagent update-agent-space --region $REGION \
--agent-space-id "$AGENT_SPACE_ID" \
--target-domain-ids <target-domain-id> \
--aws-resources '{
"vpcs": [{"vpcArn":"<vpc-id>","securityGroupArns":["'$SG_ID'"],"subnetArns":["'$PRIVATE_SUBNET'"]}],
"secretArns": ["<secret-arn>"],
"iamRoles": ["<role-arn>"]
}'
# 9. GitHub integration (via AWS Console)
# - Security Agent → Agent Space → Integrations → Connect GitHub
# - Install GitHub App → select repository
# - Penetration test tab → enable Pentest remediation
# 10. Create pentest (with GitHub repo + AUTOMATIC remediation)
aws securityagent create-pentest --region $REGION \
--agent-space-id "$AGENT_SPACE_ID" \
--title "remediation-test-auth" \
--code-remediation-strategy AUTOMATIC \
--assets '{
"endpoints": [{"uri": "http://'"$PRIVATE_DNS"'"}],
"actors": [{"identifier":"testuser","uris":["http://'"$PRIVATE_DNS"'"],
"authentication":{"providerType":"SECRETS_MANAGER","value":"<secret-arn>"},
"description":"Navigate to /login. Enter username and password. Enter TOTP code. Click Login."}],
"integratedRepositories": [{"integrationId":"<integration-id>","providerResourceId":"<github-repo-id>"}]
}' \
--service-role "<role-arn>" \
--vpc-config '{"vpcArn":"<vpc-id>","securityGroupArns":["'$SG_ID'"],"subnetArns":["'$PRIVATE_SUBNET'"]}'
# 11. Start pentest job
aws securityagent start-pentest-job --region $REGION \
--agent-space-id "$AGENT_SPACE_ID" --pentest-id <pentest-id>
Setup Issues Encountered
Three issues came up during environment setup:
- DB not initialized: gunicorn doesn't run init_db(), so it must be run manually with python3 -c "from app import init_db; init_db()". Without this, all DB-dependent endpoints return 500 and the pentest fails
- VPC-to-GitHub access: The agent's test environment launches in the subnet configured in the Agent Space. If that subnet can't reach GitHub, the source code pull fails with "Unable to reach GitHub repository". I created a private subnet with a NAT Gateway for internet access
- Repository must be linked at pentest creation: Specify repos in create-pentest via assets.integratedRepositories. Adding them later via update-pentest doesn't apply to already-running jobs
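One way to sidestep the first issue is to make initialization safe to call every time the app module is loaded, so a gunicorn worker never serves requests against a missing schema. The following is a minimal sketch, reduced to a single products table for brevity; `ensure_schema` is a hypothetical helper and not a function in the app.

```python
import sqlite3


def ensure_schema(db_path: str) -> bool:
    """Create the products table if needed and seed it only when it is empty.

    Returns True if seed rows were inserted, False if data already existed.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS products ("
            "id INTEGER PRIMARY KEY, name TEXT, category TEXT, price REAL)"
        )
        (count,) = conn.execute("SELECT COUNT(*) FROM products").fetchone()
        if count:
            return False  # existing data is left untouched
        conn.executemany(
            "INSERT INTO products (name, category, price) VALUES (?, ?, ?)",
            [("Laptop", "electronics", 999.99), ("Desk", "furniture", 299.99)],
        )
        conn.commit()
        return True
    finally:
        conn.close()
```

With several workers starting at once, two of them could both see an empty table and seed it twice, so running the initialization once before gunicorn starts (as in the setup steps) remains the simpler option.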
Results
The pentest produced 7 findings, and every one received an auto-generated GitHub PR. With code-remediation-strategy set to AUTOMATIC, PRs were generated incrementally as each finding was confirmed, so there was no need to wait for the pentest job to complete: each PR appeared in the GitHub repository within minutes of its corresponding finding.
| # | Finding | Risk Type | Severity | Confidence | PR |
|---|---|---|---|---|---|
| 1 | SQL Injection in /api/products | SQL_INJECTION | CRITICAL | HIGH | #1 |
| 2 | Reflected XSS in /search | CROSS_SITE_SCRIPTING | MEDIUM | LOW | #2 |
| 3 | Hardcoded Flask Secret Key | SESSION_TOKEN_VULNERABILITIES | CRITICAL | HIGH | #3 |
| 4 | Default Credentials + Plaintext Passwords | DEFAULT_CREDENTIALS | CRITICAL | HIGH | #4 |
| 5 | IDOR on /profile | INSECURE_DIRECT_OBJECT_REFERENCE | MEDIUM | HIGH | #5 |
| 6 | Privilege Escalation /admin/users | PRIVILEGE_ESCALATION | MEDIUM | HIGH | #6 |
| 7 | Stored XSS in Comments | CROSS_SITE_SCRIPTING | MEDIUM | LOW | #7 |
Part 4 (Condition B) detected 6 findings. This run detected 7 — Hardcoded Secret Key (#3) and Default Credentials (#4) were newly detected. The GitHub repository provided source code context, which may have enabled code-level analysis for these findings.
PR Fix Analysis
PR #1: SQL Injection (CRITICAL) — f-string → parameterized query (+4/-4)
Before:
query = f"SELECT * FROM products WHERE category = '{category}'"
results = conn.execute(query).fetchall()
After:
results = conn.execute(
    "SELECT * FROM products WHERE category = ?", (category,)
).fetchall()
Replaces f-string interpolation with a SQLite parameterized query (? placeholder). Also changes the exception handler from str(e) to a generic message, preventing error detail leakage. Textbook fix, directly applicable.
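To see why the placeholder matters, the two query styles can be compared against an in-memory database. This is an illustrative sketch with a reduced schema and a hypothetical payload, not the agent's actual exploit or the PR's code.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Build a small in-memory database with a products and a users table."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, category TEXT);
        CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, password TEXT);
        INSERT INTO products (name, category) VALUES
            ('Laptop', 'electronics'), ('Desk', 'furniture');
        INSERT INTO users (username, password) VALUES ('admin', 'admin123');
        """
    )
    return conn


def vulnerable(conn, category):
    # String interpolation: the input becomes part of the SQL text.
    query = f"SELECT name FROM products WHERE category = '{category}'"
    return [row[0] for row in conn.execute(query)]


def parameterized(conn, category):
    # Placeholder: the input is bound as a value and never parsed as SQL.
    rows = conn.execute(
        "SELECT name FROM products WHERE category = ?", (category,)
    )
    return [row[0] for row in rows]


payload = "x' UNION SELECT password FROM users --"
conn = make_db()
leaked = vulnerable(conn, payload)   # ['admin123']
safe = parameterized(conn, payload)  # []
```

The interpolated version returns a row from the users table, while the parameterized version treats the whole payload as a category name that matches nothing.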
Each PR description includes structured information: Pentest ID, Finding ID, CWE number, root cause explanation, fix details, and impact scope. This means code reviewers can assess the fix without separately looking up the finding — the PR is self-contained.
PR #2: Reflected XSS (MEDIUM) — remove |safe filter (+1/-1)
Before:
<h2>Results for: {{ query | safe }}</h2>
After:
<h2>Results for: {{ query }}</h2>
Removes the Jinja2 |safe filter, re-enabling default auto-escaping. One-line change that precisely eliminates the vulnerability.
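What escaping changes in the rendered output can be shown without Flask. The sketch below uses the standard library's html.escape as a stand-in for Jinja2's auto-escaping (the two differ in small details such as how quote characters are encoded); `render_heading` is a hypothetical helper.

```python
import html


def render_heading(query: str, trusted: bool = False) -> str:
    """Mimic the search heading; trusted=True behaves like the |safe filter."""
    shown = query if trusted else html.escape(query)
    return f"<h2>Results for: {shown}</h2>"


payload = "<script>alert(1)</script>"
unsafe_html = render_heading(payload, trusted=True)
escaped_html = render_heading(payload)
```

In `unsafe_html` the script tag reaches the browser intact. In `escaped_html` the angle brackets arrive as `&lt;` and `&gt;`, so the payload is displayed as text instead of being executed.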
PR #3: Hardcoded Secret Key (CRITICAL) — env var / random generation (+5/-1)
Replaces hardcoded app.secret_key = "insecure-secret-key-for-testing" with os.environ.get("FLASK_SECRET_KEY"), falling back to os.urandom(32) with a warning. Prevents session forgery.
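The pattern described here can be sketched as follows. This is a reconstruction from the PR summary, not the PR's actual code; `load_secret_key` and the warning text are assumptions.

```python
import logging
import os

logger = logging.getLogger(__name__)


def load_secret_key(environ=os.environ) -> bytes:
    """Return the configured secret key, or a random per-process fallback."""
    configured = environ.get("FLASK_SECRET_KEY")
    if configured:
        return configured.encode("utf-8")
    logger.warning(
        "FLASK_SECRET_KEY is not set; using a random key. "
        "Sessions will not survive restarts or be shared across workers."
    )
    return os.urandom(32)
```

In the app this would be wired up as `app.secret_key = load_secret_key()`. One caveat about the fallback: with several gunicorn workers, as in the setup here, each worker would likely generate its own random key, so a session signed by one worker may be rejected by another. Setting FLASK_SECRET_KEY explicitly avoids that.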
PR #4: Default Credentials + Plaintext (CRITICAL) — comprehensive refactor (+70/-30)
The largest fix. Changes include:
- Plaintext passwords → werkzeug.security.generate_password_hash
- Shared TOTP secret → per-user TOTP secrets generated with pyotp.random_base32()
- Re-seed on every startup → seed only when tables are empty
- Login logic updated to use check_password_hash
- Added totp_secret column to users table
Goes beyond a single vulnerability fix to refactor the entire authentication foundation.
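The core idea behind the password change is a salted, slow hash that is verified in application code. The sketch below illustrates it with the standard library's PBKDF2 as a stand-in for the werkzeug.security helpers the PR uses; the function names, storage format, and iteration count are assumptions for illustration.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 200_000  # illustrative work factor, not a recommendation


def hash_password(password: str) -> str:
    """Return 'pbkdf2_sha256$iterations$salt_hex$hash_hex' for storage."""
    salt = secrets.token_bytes(16)  # fresh random salt per password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"pbkdf2_sha256${ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password(stored: str, candidate: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    _, iterations, salt_hex, hash_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", candidate.encode(), bytes.fromhex(salt_hex), int(iterations)
    )
    return hmac.compare_digest(digest.hex(), hash_hex)
```

A consequence for the login route: the query can no longer match on the password column in SQL. It has to look the user up by username and then verify the submitted password in Python, which is the role check_password_hash plays in the PR.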
PR #5: IDOR (MEDIUM) — ownership check (+2/-1)
if session["user_id"] != user_id and session.get("role") != "admin":
    return "Forbidden", 403
Adds an authorization check that allows access only to the user's own profile or to the admin role. The admin exception is a practical consideration.
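The rule in this check is small enough to pull into a pure function and unit-test without a running app. The sketch below uses a plain dict in place of Flask's session; `can_view_profile` is a hypothetical helper, not part of the PR.

```python
def can_view_profile(session: dict, requested_user_id: int) -> bool:
    """Allow users to view their own profile; allow admins to view any."""
    if "user_id" not in session:
        return False  # not logged in
    is_owner = session["user_id"] == requested_user_id
    is_admin = session.get("role") == "admin"
    return is_owner or is_admin
```

For example, testuser (id 2) may view /profile/2 but not /profile/1, while admin may view both.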
PR #6: Privilege Escalation (MEDIUM) — role-based authorization (+3/-1)
if session.get("role") != "admin":
    return "Forbidden", 403
Adds an admin role check to /admin/users. Simple but accurate.
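The PR adds the check inline, which is reasonable for a single route. If more admin-only routes were added later, the same rule could be expressed once as a decorator. The sketch below is framework-agnostic and uses a plain dict in place of Flask's session; `require_role` and `current_session` are hypothetical names.

```python
from functools import wraps


def require_role(role, get_session):
    """Build a decorator that rejects callers whose session lacks `role`."""
    def decorator(view):
        @wraps(view)
        def wrapper(*args, **kwargs):
            if get_session().get("role") != role:
                return "Forbidden", 403
            return view(*args, **kwargs)
        return wrapper
    return decorator


# Example wiring with a plain dict standing in for Flask's session.
current_session = {"user_id": 2, "role": "user"}


@require_role("admin", lambda: current_session)
def admin_users():
    return "user list", 200
```

In a Flask app the session getter would be `lambda: session`, and the decorator would sit below `@app.route(...)` so that the wrapped function is the one registered as the view.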
PR #7: Stored XSS (MEDIUM) — remove |safe filter (+1/-1)
Same pattern as PR #2. Removes |safe from comment rendering.
Fix Quality Assessment
| # | Vulnerability | Severity | Fix Accuracy | Directly Applicable? | Changes |
|---|---|---|---|---|---|
| 1 | SQL Injection | CRITICAL | ✅ Accurate | ✅ | +4/-4 |
| 2 | Reflected XSS | MEDIUM | ✅ Accurate | ✅ | +1/-1 |
| 3 | Hardcoded Secret Key | CRITICAL | ✅ Accurate | ✅ | +5/-1 |
| 4 | Default Credentials | CRITICAL | ✅ Accurate | ⚠️ Needs testing | +70/-30 |
| 5 | IDOR | MEDIUM | ✅ Accurate | ✅ | +2/-1 |
| 6 | Privilege Escalation | MEDIUM | ✅ Accurate | ✅ | +3/-1 |
| 7 | Stored XSS | MEDIUM | ✅ Accurate | ✅ | +1/-1 |
All 7 fixes were technically accurate. Only PR #4 (Default Credentials, +70/-30) includes schema changes requiring testing before application, but the fix direction is sound.
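One thing worth testing before merging a schema change like the one in PR #4: CREATE TABLE IF NOT EXISTS does not alter a table that already exists, so a database created by the original app would not gain a new column on its own. Whether the PR handles that case is not verified here. The following sketch shows one way to check for and add a missing column in SQLite; `add_column_if_missing` is a hypothetical helper.

```python
import sqlite3


def add_column_if_missing(conn, table: str, column: str, decl: str) -> bool:
    """Add `column` to `table` unless it already exists. Returns True if added.

    `table`, `column`, and `decl` are interpolated into SQL, so they must be
    trusted constants from the codebase and never user input.
    """
    # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    if column in existing:
        return False
    conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
    return True
```

Running it twice against the same database is harmless, which makes it usable at startup: `add_column_if_missing(conn, "users", "totp_secret", "TEXT")`.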
Summary
- All findings received auto-generated PRs: 7 vulnerabilities, 7 PRs, all technically accurate. Fixes ranged from a one-line removal of the |safe filter for XSS to a 70-line authentication refactor, scaling with vulnerability complexity
- Fixes extend detection naturally: PR descriptions include finding title, CWE number, root cause, fix details, and impact scope, which is everything needed for code review. More efficient than reading findings and fixing manually
- NAT Gateway required: The agent's test environment runs in a private subnet, requiring a NAT Gateway for GitHub access. This adds cost (NAT Gateway hourly + data transfer)
- us-east-1 limitation is a practical constraint: If your Agent Space is in another region, you need to rebuild the environment in us-east-1 just for Remediation
Cleanup
Resource deletion
REGION=us-east-1
# EC2
aws ec2 terminate-instances --region $REGION --instance-ids <instance-id>
# NAT Gateway + EIP
aws ec2 delete-nat-gateway --region $REGION --nat-gateway-id <nat-gw-id>
# Wait for NAT Gateway deletion before releasing EIP
aws ec2 wait nat-gateway-deleted --region $REGION --nat-gateway-ids <nat-gw-id>
aws ec2 release-address --region $REGION --allocation-id <eip-alloc-id>
# Private subnet + route table
aws ec2 delete-subnet --region $REGION --subnet-id <private-subnet-id>
aws ec2 delete-route-table --region $REGION --route-table-id <private-rt-id>
# Security Group
aws ec2 delete-security-group --region $REGION --group-id <sg-id>
# Secrets Manager
aws secretsmanager delete-secret --region $REGION \
--secret-id <secret-name> --force-delete-without-recovery
# Target Domain
aws securityagent delete-target-domain --region $REGION \
--target-domain-id <target-domain-id>
# GitHub repo
gh repo delete <your-org>/auth-vuln-app --yes