Automate IAM auth and OCC retry with the Aurora DSQL Rust connector

Introduction

On March 31, 2026, AWS announced new connectors for Aurora DSQL targeting Rust (SQLx) and .NET (Npgsql).

Previously, connecting to Aurora DSQL from Rust required manually generating IAM tokens with aws-sdk-dsql, configuring SSL, and managing a background token refresh task. The aws-samples reference code shows roughly 50 lines just for connection setup.

Rust (without connector — connection excerpt)
let sdk_config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let signer = AuthTokenGenerator::new(
    Config::builder()
        .hostname(&cluster_endpoint)
        .region(Region::new(region))
        .expires_in(TOKEN_EXPIRATION_SECONDS)
        .build()
        .unwrap(),
);
let password_token = generate_password_token(&cluster_user, &signer, &sdk_config).await;
let connection_options = PgConnectOptions::new()
    .host(&cluster_endpoint)
    .port(5432)
    .database("postgres")
    .username(&cluster_user)
    .password(password_token.as_str())
    .ssl_mode(sqlx::postgres::PgSslMode::VerifyFull);
let pool = PgPoolOptions::new()
    .max_connections(10)
    .connect_with(connection_options.clone())
    .await?;
// + a separate tokio::spawn for token refresh

With the connector, this becomes a single line.

Rust (with connector)
let pool = aurora_dsql_sqlx_connector::pool::connect(
    "postgres://admin@<cluster>.dsql.us-east-1.on.aws/postgres"
).await?;

IAM token generation, SSL configuration, and background token refresh (auto-rotated at 80% of the token lifetime) are all handled internally. The connector also provides OCC (optimistic concurrency control) retry helpers specific to Aurora DSQL.
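As a quick sanity check on that 80% figure (illustrative arithmetic only, not the connector's actual implementation), a 15-minute token would be refreshed after about 12 minutes:

```rust
// Sketch: when a refresh fires under an "80% of token lifetime" policy.
// The 80% figure comes from the connector docs; this helper is illustrative.
fn refresh_after_secs(token_lifetime_secs: u64) -> u64 {
    token_lifetime_secs * 80 / 100
}

fn main() {
    // Default IAM token lifetime is 15 minutes (900 seconds),
    // so a refresh would be scheduled after 720 seconds (12 minutes).
    let refresh = refresh_after_secs(900);
    assert_eq!(refresh, 720);
    println!("refresh after {refresh}s");
}
```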

This article takes a hands-on look at the connector's connection pooling, CRUD operations, and OCC retry behavior. For official documentation, see Connectors for Aurora DSQL and the connector README on GitHub.

Prerequisites:

  • Rust 1.85+ (edition 2024 requires Rust 1.85 or later)
  • AWS CLI configured (dsql:* permissions)
  • Test region: us-east-1
Environment setup (Aurora DSQL cluster + Rust project)
Terminal (create cluster)
aws dsql create-cluster \
  --tags Name=rust-connector-test \
  --region us-east-1

Wait until the cluster status becomes ACTIVE.

Terminal (check status)
aws dsql get-cluster \
  --identifier <cluster-id> \
  --region us-east-1

Create the Rust project.

Terminal (project init)
cargo init dsql-rust-test
cd dsql-rust-test
Cargo.toml
[package]
name = "dsql-rust-test"
version = "0.1.0"
edition = "2024"
 
[dependencies]
aurora-dsql-sqlx-connector = { version = "0.1.2", features = ["pool", "occ"] }
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres"] }
tokio = { version = "1", features = ["full"] }
anyhow = "1"

The pool feature enables connection pooling with background token refresh. The occ feature enables OCC retry helpers.

Skip to Summary if you only want the findings.

Verification 1: Connection pooling and basic CRUD

Connect to Aurora DSQL using pool::connect_with() and run CREATE, INSERT, SELECT, UPDATE, and DELETE operations. connect_with() accepts PgPoolOptions for customizing settings like max connections. If you don't need customization, pool::connect() shown in the introduction works the same way.

The connection code is just this. DsqlConnectOptions parses the connection string, and pool::connect_with() handles IAM token generation, SSL, and pool initialization in one call.

Rust (connection)
let conn_str = format!("postgres://admin@{}/postgres", endpoint);
let config = DsqlConnectOptions::from_connection_string(&conn_str)?;
let pool = aurora_dsql_sqlx_connector::pool::connect_with(
    &config,
    PgPoolOptions::new().max_connections(10),
)
.await?;

A connection string is all it takes — IAM token generation, SSL handshake, and pool initialization happen behind the scenes. CRUD operations use the standard SQLx API with zero changes. See the collapsible section below for the full source.

Full source for Verification 1 (verify1_pool_crud.rs)
Rust (src/main.rs)
use aurora_dsql_sqlx_connector::DsqlConnectOptions;
use sqlx::{postgres::PgPoolOptions, Executor, Row};
 
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let endpoint = std::env::var("CLUSTER_ENDPOINT")?;
    let conn_str = format!("postgres://admin@{}/postgres", endpoint);
 
    // Pool connection (IAM token, SSL, refresh — all automatic)
    let config = DsqlConnectOptions::from_connection_string(&conn_str)?;
    let pool = aurora_dsql_sqlx_connector::pool::connect_with(
        &config,
        PgPoolOptions::new().max_connections(10),
    )
    .await?;
    println!("Pool connected successfully");
 
    // CREATE
    pool.execute(
        "CREATE TABLE IF NOT EXISTS test_items(
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            name VARCHAR(100) NOT NULL,
            value INT NOT NULL
        )",
    )
    .await?;
 
    // INSERT
    sqlx::query("INSERT INTO test_items(name, value) VALUES($1, $2)")
        .bind("item-a")
        .bind(42)
        .execute(&pool)
        .await?;
 
    // SELECT
    let row = sqlx::query("SELECT name, value FROM test_items WHERE name = $1")
        .bind("item-a")
        .fetch_one(&pool)
        .await?;
    println!("Selected: name={}, value={}",
        row.get::<&str, _>("name"), row.get::<i32, _>("value"));
 
    // UPDATE
    sqlx::query("UPDATE test_items SET value = $1 WHERE name = $2")
        .bind(100).bind("item-a").execute(&pool).await?;
 
    // DELETE
    sqlx::query("DELETE FROM test_items WHERE name = $1")
        .bind("item-a").execute(&pool).await?;
 
    pool.execute("DROP TABLE test_items").await?;
    pool.close().await; // stops the background token refresh task
    Ok(())
}
Terminal (run)
export CLUSTER_ENDPOINT=<cluster-id>.dsql.us-east-1.on.aws
cargo run
Output
Pool connected successfully
Selected: name=item-a, value=42

For simpler use cases like scripts or CLI tools where pooling is unnecessary, connection::connect() provides a single connection.

Rust (single connection)
let mut conn = aurora_dsql_sqlx_connector::connection::connect(
    "postgres://admin@<cluster>.dsql.us-east-1.on.aws/postgres"
).await?;
let row = sqlx::query("SELECT 'hello' as msg")
    .fetch_one(&mut conn).await?;

Each call to connection::connect() generates a fresh IAM token. For long-running operations, be mindful of the token lifetime (default 15 minutes).

With basic operations confirmed, the next step is to verify how the connector handles write conflicts — an unavoidable concern when using Aurora DSQL in production.

Verification 2: OCC retry behavior

Aurora DSQL uses optimistic concurrency control (OCC) — pessimistic locking (SELECT ... FOR UPDATE) is not supported. When multiple transactions update the same row concurrently, one will fail with SQLSTATE 40001 (OC000: data conflict). Specifically, if a row read within a transaction has been modified by another transaction by the time it commits, a conflict is detected. Retry logic is essential for production use.
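For reference, SQLSTATE 40001 is PostgreSQL's serialization_failure class, which Aurora DSQL reuses for OCC conflicts. A minimal, hypothetical classifier for deciding whether an error is worth retrying might look like this (the connector performs an equivalent check internally):

```rust
// Hypothetical helper: decide whether a PostgreSQL SQLSTATE code marks a
// retryable OCC conflict. Aurora DSQL surfaces OCC conflicts as SQLSTATE
// 40001 (serialization_failure) with an OC000 message.
fn is_occ_conflict(sqlstate: &str) -> bool {
    sqlstate == "40001"
}

fn main() {
    assert!(is_occ_conflict("40001"));   // OCC conflict: safe to retry
    assert!(!is_occ_conflict("23505"));  // unique_violation: do not retry
    println!("classifier ok");
}
```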

The connector provides a retry_on_occ helper that automatically retries with exponential backoff (base 100ms, max 5000ms) and jitter (0–25%). Usage is straightforward — just wrap the transaction in a closure:

Rust (retry_on_occ usage)
let occ_config = aurora_dsql_sqlx_connector::OCCRetryConfig::default();
 
let result: Result<(), DsqlError> = retry_on_occ(&occ_config, || async {
    let mut tx = pool.begin().await?;
    // ... operations inside the transaction ...
    tx.commit().await?;
    Ok(())
}).await;
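The documented backoff parameters (base 100 ms, cap 5000 ms, 0–25% jitter) imply a schedule like the following sketch. The plain-doubling formula is an assumption on my part, so the connector's exact curve may differ; jitter is shown as a range to keep the example deterministic.

```rust
// Sketch of exponential backoff with a ceiling, using the documented
// parameters (base 100 ms, cap 5000 ms). Delay for retry n is assumed
// to be min(base * 2^n, cap); jitter would add up to 25% on top.
fn backoff_ms(retry: u32, base_ms: u64, max_ms: u64) -> u64 {
    let factor = 1u64.checked_shl(retry).unwrap_or(u64::MAX);
    base_ms.saturating_mul(factor).min(max_ms)
}

fn main() {
    let (base, max) = (100, 5000);
    for retry in 0..7 {
        let d = backoff_ms(retry, base, max);
        // With 25% jitter the actual sleep falls in [d, d + d/4].
        println!("retry {retry}: {d}..{} ms", d + d / 4);
    }
    assert_eq!(backoff_ms(0, base, max), 100);  // first retry
    assert_eq!(backoff_ms(5, base, max), 3200); // still doubling
    assert_eq!(backoff_ms(6, base, max), 5000); // hits the ceiling
}
```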

The test launches 5 concurrent tasks that each read-then-increment the same counter row, forcing OCC conflicts.
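For intuition before the DSQL run, the same read-then-increment race can be reproduced locally with plain threads, no database involved. This std-only sketch shows updates being lost when nothing detects the conflict, which is precisely the anomaly OCC turns into a retryable 40001 error:

```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Run `n` threads that each do an unsynchronized read -> pause -> write-back
// increment on a shared counter, and return the final value. With proper
// conflict handling the result would be `n`; lost updates make it smaller.
fn racy_increments(n: i32) -> i32 {
    let counter = Arc::new(AtomicI32::new(0));
    let mut handles = Vec::new();
    for _ in 0..n {
        let c = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            let v = c.load(Ordering::SeqCst);         // read
            thread::sleep(Duration::from_millis(10)); // widen the race window
            c.store(v + 1, Ordering::SeqCst);         // write back: may clobber
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
    counter.load(Ordering::SeqCst)
}

fn main() {
    let final_val = racy_increments(5);
    // Typically prints a value below 5: concurrent writers clobber each
    // other, which is the lost update that OCC exists to prevent.
    println!("final = {final_val} (would be 5 without races)");
    assert!(final_val >= 1 && final_val <= 5);
}
```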

Full source for Verification 2 (verify2_occ_retry.rs)
Rust (src/main.rs)
use aurora_dsql_sqlx_connector::{retry_on_occ, DsqlConnectOptions, DsqlError};
use sqlx::{postgres::PgPoolOptions, Executor, Row};
 
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let endpoint = std::env::var("CLUSTER_ENDPOINT")?;
    let conn_str = format!("postgres://admin@{}/postgres", endpoint);
 
    let config = DsqlConnectOptions::from_connection_string(&conn_str)?;
    let pool = aurora_dsql_sqlx_connector::pool::connect_with(
        &config,
        PgPoolOptions::new().max_connections(10),
    ).await?;
 
    // Initialize counter row
    pool.execute(
        "CREATE TABLE IF NOT EXISTS counter(id INT PRIMARY KEY, value INT NOT NULL)",
    ).await?;
    sqlx::query("INSERT INTO counter(id, value) VALUES(1, 0) ON CONFLICT (id) DO UPDATE SET value = 0")
        .execute(&pool).await?;
 
    // Concurrent updates with default config (max_attempts: 3)
    let occ_config = aurora_dsql_sqlx_connector::OCCRetryConfig::default();
    let mut handles = Vec::new();
 
    for task_id in 0..5 {
        let pool = pool.clone();
        let occ_config = occ_config.clone();
        handles.push(tokio::spawn(async move {
            let result: Result<(), DsqlError> = retry_on_occ(&occ_config, || async {
                let mut tx = pool.begin().await?;
                let row = sqlx::query("SELECT value FROM counter WHERE id = 1")
                    .fetch_one(&mut *tx).await?;
                let current: i32 = row.get("value");
                sqlx::query("UPDATE counter SET value = $1 WHERE id = 1")
                    .bind(current + 1).execute(&mut *tx).await?;
                tx.commit().await?;
                Ok(())
            }).await;
            match &result {
                Ok(()) => println!("Task {} succeeded", task_id),
                Err(e) => println!("Task {} failed: {}", task_id, e),
            }
            result
        }));
    }
 
    let (mut ok, mut ng) = (0, 0);
    for h in handles {
        match h.await? { Ok(()) => ok += 1, Err(_) => ng += 1 }
    }
    let final_val = sqlx::query_scalar::<_, i32>("SELECT value FROM counter WHERE id = 1")
        .fetch_one(&pool).await?;
    println!("Succeeded: {ok}, Failed: {ng}, Final value: {final_val}");
 
    pool.execute("DROP TABLE counter").await?;
    pool.close().await; // stops the background token refresh task
    Ok(())
}
Terminal (run)
export CLUSTER_ENDPOINT=<cluster-id>.dsql.us-east-1.on.aws
cargo run

Default config (max_attempts: 3)

Output
Task 1 succeeded
Task 2 succeeded
Task 0 succeeded
Task 3 succeeded
Task 4 failed: OCC retry exhausted after 3 attempts: database error:
  error returned from database: change conflicts with another transaction,
  please retry: (OC000)
 
Succeeded: 4, Failed: 1, Final value: 4

One task exhausted its 3 retry attempts and failed. The counter value is 4, matching the number of successful tasks — no lost updates.

Increasing max_attempts

Using OCCRetryConfigBuilder to set max_attempts to 10:

Rust (max_attempts=10)
let occ_config = aurora_dsql_sqlx_connector::OCCRetryConfigBuilder::default()
    .max_attempts(10u32)
    .build()?;
Output
Task 0 succeeded
Task 3 succeeded
Task 1 succeeded
Task 4 succeeded
Task 2 succeeded
 
Succeeded: 5, Failed: 0, Final value: 5

All tasks succeeded and the counter is exactly 5.

OCC retry tuning guide

Default values and tuning points for OCCRetryConfig:

Parameter      Default  Description
max_attempts   3        Maximum retry attempts
base_delay_ms  100      Initial backoff (ms)
max_delay_ms   5000     Backoff ceiling (ms)
jitter_factor  0.25     Jitter coefficient (0–25%)

The default max_attempts: 3 is sufficient for low-concurrency writes, but falls short when multiple tasks contend on the same row simultaneously. Tune max_attempts based on your expected concurrency level before going to production.
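To put rough numbers on that advice (assuming plain doubling with the documented base and cap, jitter excluded, so treat these as estimates): 3 attempts sleep at most about 300 ms in total between tries, while 10 attempts can spend over 21 seconds retrying before giving up.

```rust
// Estimate the worst-case time spent sleeping between retries for a given
// max_attempts, assuming delay = min(100ms * 2^n, 5000ms) for retry n.
// This mirrors the documented defaults but is an approximation, not the
// connector's actual accounting.
fn total_backoff_ms(max_attempts: u32, base_ms: u64, max_ms: u64) -> u64 {
    // max_attempts total tries means max_attempts - 1 sleeps between them.
    (0..max_attempts.saturating_sub(1))
        .map(|n| (base_ms << n.min(32)).min(max_ms))
        .sum()
}

fn main() {
    assert_eq!(total_backoff_ms(3, 100, 5000), 300);     // 100 + 200 ms
    assert_eq!(total_backoff_ms(10, 100, 5000), 21_300); // ~21 s worst case
    println!(
        "3 attempts: {} ms, 10 attempts: {} ms",
        total_backoff_ms(3, 100, 5000),
        total_backoff_ms(10, 100, 5000)
    );
}
```

The ceiling dominates quickly: from the seventh retry onward every additional attempt adds a flat 5 seconds of worst-case latency, which matters if the caller has its own timeout budget.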

Summary

  • Connection code drops from ~50 lines to one — IAM token generation, SSL, and background refresh are all handled internally. As verified in Verification 1, the SQLx API works unchanged, so existing SQLx users need almost no additional learning
  • OCC retry is concise with retry_on_occ — Since Aurora DSQL does not support pessimistic locking, write conflict retry is a must-have pattern. The connector's helper applies exponential backoff with jitter automatically
  • Default max_attempts: 3 is insufficient under high concurrency — For workloads with frequent contention on the same rows, use OCCRetryConfigBuilder to increase the limit. Test with your expected concurrency level before deploying to production

Cleanup

Terminal
# Disable deletion protection, then delete the cluster
aws dsql update-cluster \
  --identifier <cluster-id> \
  --no-deletion-protection-enabled \
  --region us-east-1
 
aws dsql delete-cluster \
  --identifier <cluster-id> \
  --region us-east-1

Shinya Tahara

Solutions Architect @ AWS

I'm a Solutions Architect at AWS, providing technical guidance primarily to financial industry customers. I share learnings about cloud architecture and AI/ML on this site. The views and opinions expressed on this site are my own and do not represent the official positions of my employer.
