@shinyaz

Aurora DSQL Rust コネクタで IAM 認証・OCC リトライを自動化する

目次

はじめに

2026年3月31日、AWS は Aurora DSQL 向けの Rust(SQLx)および .NET(Npgsql)コネクタをリリースした。

Aurora DSQL に Rust から接続する場合、従来は aws-sdk-dsql で IAM トークンを手動生成し、SSL を設定し、トークンリフレッシュ用のバックグラウンドタスクを自前で管理する必要があった。aws-samples のサンプルコードを見ると、接続確立だけで約50行になる。

Rust (コネクタなし — 接続部分の抜粋)
let sdk_config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let signer = AuthTokenGenerator::new(
    Config::builder()
        .hostname(&cluster_endpoint)
        .region(Region::new(region))
        .expires_in(TOKEN_EXPIRATION_SECONDS)
        .build()
        .unwrap(),
);
let password_token = generate_password_token(&cluster_user, &signer, &sdk_config).await;
let connection_options = PgConnectOptions::new()
    .host(&cluster_endpoint)
    .port(5432)
    .database("postgres")
    .username(&cluster_user)
    .password(password_token.as_str())
    .ssl_mode(sqlx::postgres::PgSslMode::VerifyFull);
let pool = PgPoolOptions::new()
    .max_connections(10)
    .connect_with(connection_options.clone())
    .await?;
// + トークンリフレッシュ用の tokio::spawn が別途必要

コネクタを使えば、これが接続文字列1行になる。

Rust (コネクタあり)
let pool = aurora_dsql_sqlx_connector::pool::connect(
    "postgres://admin@<cluster>.dsql.us-east-1.on.aws/postgres"
).await?;

IAM トークン生成、SSL 設定、バックグラウンドでのトークンリフレッシュ(有効期限の80%で自動更新)がすべてコネクタ内部で処理される。さらに、Aurora DSQL 固有の OCC(楽観的同時実行制御)リトライヘルパーも提供される。

この記事では、コネクタを使って接続プーリング・CRUD・OCC リトライを実際に動かし、コネクタがどこまで面倒を見てくれるのかを検証する。公式ドキュメントは Connectors for Aurora DSQL、コネクタの README は GitHub を参照。

前提条件:

  • Rust 1.80 以上
  • AWS CLI 設定済み(dsql:* 権限)
  • 検証リージョン: us-east-1
環境構築(Aurora DSQL クラスタ作成 + Rust プロジェクト)
Terminal (クラスタ作成)
aws dsql create-cluster \
  --tags Name=rust-connector-test \
  --region us-east-1

クラスタが ACTIVE になるまで待つ。

Terminal (ステータス確認)
aws dsql get-cluster \
  --identifier <cluster-id> \
  --region us-east-1

Rust プロジェクトを作成する。

Terminal (プロジェクト初期化)
cargo init dsql-rust-test
cd dsql-rust-test
Cargo.toml
[package]
name = "dsql-rust-test"
version = "0.1.0"
edition = "2024"
 
[dependencies]
aurora-dsql-sqlx-connector = { version = "0.1.2", features = ["pool", "occ"] }
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres"] }
tokio = { version = "1", features = ["full"] }
anyhow = "1"

pool フィーチャーでコネクションプーリング(バックグラウンドトークンリフレッシュ付き)、occ フィーチャーで OCC リトライヘルパーが有効になる。

結果だけ見たい場合はまとめへ。

検証 1: 接続プーリングと基本 CRUD

pool::connect_with() で Aurora DSQL に接続し、テーブル作成・INSERT・SELECT・UPDATE・DELETE を実行する。connect_with()PgPoolOptions を渡して最大接続数などをカスタマイズできる。カスタマイズが不要なら、はじめにで紹介した pool::connect() でも同じ結果になる。

接続部分のコードはこれだけだ。DsqlConnectOptions が接続文字列をパースし、pool::connect_with() が IAM トークン生成・SSL・プール初期化をまとめて処理する。

Rust (接続部分)
let conn_str = format!("postgres://admin@{}/postgres", endpoint);
let config = DsqlConnectOptions::from_connection_string(&conn_str)?;
let pool = aurora_dsql_sqlx_connector::pool::connect_with(
    &config,
    PgPoolOptions::new().max_connections(10),
)
.await?;

接続文字列を渡すだけで、IAM トークン生成・SSL ハンドシェイク・プール初期化が完了する。以降の CRUD 操作は通常の SQLx と同じ API がそのまま使える。フルソースは以下の折り畳みを参照。

検証 1 のフルソース(verify1_pool_crud.rs)
Rust (src/main.rs)
use aurora_dsql_sqlx_connector::DsqlConnectOptions;
use sqlx::{postgres::PgPoolOptions, Executor, Row};
 
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let endpoint = std::env::var("CLUSTER_ENDPOINT")?;
    let conn_str = format!("postgres://admin@{}/postgres", endpoint);
 
    // プール接続(IAM トークン生成・SSL・リフレッシュはすべて自動)
    let config = DsqlConnectOptions::from_connection_string(&conn_str)?;
    let pool = aurora_dsql_sqlx_connector::pool::connect_with(
        &config,
        PgPoolOptions::new().max_connections(10),
    )
    .await?;
    println!("Pool connected successfully");
 
    // CREATE
    pool.execute(
        "CREATE TABLE IF NOT EXISTS test_items(
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            name VARCHAR(100) NOT NULL,
            value INT NOT NULL
        )",
    )
    .await?;
 
    // INSERT
    sqlx::query("INSERT INTO test_items(name, value) VALUES($1, $2)")
        .bind("item-a")
        .bind(42)
        .execute(&pool)
        .await?;
 
    // SELECT
    let row = sqlx::query("SELECT name, value FROM test_items WHERE name = $1")
        .bind("item-a")
        .fetch_one(&pool)
        .await?;
    println!("Selected: name={}, value={}",
        row.get::<&str, _>("name"), row.get::<i32, _>("value"));
 
    // UPDATE
    sqlx::query("UPDATE test_items SET value = $1 WHERE name = $2")
        .bind(100).bind("item-a").execute(&pool).await?;
 
    // DELETE
    sqlx::query("DELETE FROM test_items WHERE name = $1")
        .bind("item-a").execute(&pool).await?;
 
    pool.execute("DROP TABLE test_items").await?;
    pool.close().await; // バックグラウンドのトークンリフレッシュタスクを停止
    Ok(())
}
Terminal (実行)
export CLUSTER_ENDPOINT=<cluster-id>.dsql.us-east-1.on.aws
cargo run
Output
Pool connected successfully
Selected: name=item-a, value=42

プールが不要な場合(スクリプトや CLI ツールなど)は、connection::connect() で単一接続も可能だ。

Rust (単一接続)
let mut conn = aurora_dsql_sqlx_connector::connection::connect(
    "postgres://admin@<cluster>.dsql.us-east-1.on.aws/postgres"
).await?;
let row = sqlx::query("SELECT 'hello' as msg")
    .fetch_one(&mut conn).await?;

connection::connect() は呼び出しごとに新しい IAM トークンを生成する。長時間の処理ではトークンの有効期限(デフォルト15分)に注意が必要だ。

基本操作を確認できた。次に、Aurora DSQL を本番で使う際に避けられない書き込み競合への対処を検証する。

検証 2: OCC リトライの動作確認

Aurora DSQL は楽観的同時実行制御(OCC)を採用しており、悲観的ロック(SELECT ... FOR UPDATE)は使えない。複数のトランザクションが同一行を同時に更新すると、一方が SQLSTATE 40001(OC000: data conflict)でエラーになる。具体的には、トランザクション内で読み取った行がコミット時点で他のトランザクションによって変更されていた場合に競合が検出される。本番運用ではリトライロジックが必須だ。

コネクタは retry_on_occ ヘルパーを提供する。エクスポネンシャルバックオフ(base 100ms、最大 5000ms)とジッター(0-25%)で自動リトライする。使い方はクロージャを渡すだけだ。

Rust (retry_on_occ の使い方)
let occ_config = aurora_dsql_sqlx_connector::OCCRetryConfig::default();
 
let result: Result<(), DsqlError> = retry_on_occ(&occ_config, || async {
    let mut tx = pool.begin().await?;
    // ... トランザクション内の操作 ...
    tx.commit().await?;
    Ok(())
}).await;

同一行に対して5つの並行タスクからインクリメントを実行し、OCC 競合の発生とリトライ動作を確認する。

検証 2 のフルソース(verify2_occ_retry.rs)
Rust (src/main.rs)
use aurora_dsql_sqlx_connector::{retry_on_occ, DsqlConnectOptions, DsqlError};
use sqlx::{postgres::PgPoolOptions, Executor, Row};
 
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let endpoint = std::env::var("CLUSTER_ENDPOINT")?;
    let conn_str = format!("postgres://admin@{}/postgres", endpoint);
 
    let config = DsqlConnectOptions::from_connection_string(&conn_str)?;
    let pool = aurora_dsql_sqlx_connector::pool::connect_with(
        &config,
        PgPoolOptions::new().max_connections(10),
    ).await?;
 
    // カウンタ行を初期化
    pool.execute(
        "CREATE TABLE IF NOT EXISTS counter(id INT PRIMARY KEY, value INT NOT NULL)",
    ).await?;
    sqlx::query("INSERT INTO counter(id, value) VALUES(1, 0) ON CONFLICT (id) DO UPDATE SET value = 0")
        .execute(&pool).await?;
 
    // デフォルト設定(max_attempts: 3)で並行更新
    let occ_config = aurora_dsql_sqlx_connector::OCCRetryConfig::default();
    let mut handles = Vec::new();
 
    for task_id in 0..5 {
        let pool = pool.clone();
        let occ_config = occ_config.clone();
        handles.push(tokio::spawn(async move {
            let result: Result<(), DsqlError> = retry_on_occ(&occ_config, || async {
                let mut tx = pool.begin().await?;
                let row = sqlx::query("SELECT value FROM counter WHERE id = 1")
                    .fetch_one(&mut *tx).await?;
                let current: i32 = row.get("value");
                sqlx::query("UPDATE counter SET value = $1 WHERE id = 1")
                    .bind(current + 1).execute(&mut *tx).await?;
                tx.commit().await?;
                Ok(())
            }).await;
            match &result {
                Ok(()) => println!("Task {} succeeded", task_id),
                Err(e) => println!("Task {} failed: {}", task_id, e),
            }
            result
        }));
    }
 
    let (mut ok, mut ng) = (0, 0);
    for h in handles {
        match h.await? { Ok(()) => ok += 1, Err(_) => ng += 1 }
    }
    let final_val = sqlx::query_scalar::<_, i32>("SELECT value FROM counter WHERE id = 1")
        .fetch_one(&pool).await?;
    println!("Succeeded: {ok}, Failed: {ng}, Final value: {final_val}");
 
    pool.execute("DROP TABLE counter").await?;
    pool.close().await; // バックグラウンドのトークンリフレッシュタスクを停止
    Ok(())
}
Terminal (実行)
export CLUSTER_ENDPOINT=<cluster-id>.dsql.us-east-1.on.aws
cargo run

デフォルト設定(max_attempts: 3)の結果

Output
Task 1 succeeded
Task 2 succeeded
Task 0 succeeded
Task 3 succeeded
Task 4 failed: OCC retry exhausted after 3 attempts: database error:
  error returned from database: change conflicts with another transaction,
  please retry: (OC000)
 
Succeeded: 4, Failed: 1, Final value: 4

5タスク中1タスクが3回のリトライで回復できず失敗した。カウンタ値は4で、成功したタスク数と一致している。ロストアップデートは発生していない。

max_attempts を増やした場合

OCCRetryConfigBuildermax_attempts を10に変更して再実行する。

Rust (max_attempts=10)
let occ_config = aurora_dsql_sqlx_connector::OCCRetryConfigBuilder::default()
    .max_attempts(10u32)
    .build()?;
Output
Task 0 succeeded
Task 3 succeeded
Task 1 succeeded
Task 4 succeeded
Task 2 succeeded
 
Succeeded: 5, Failed: 0, Final value: 5

全タスクが成功し、カウンタ値も正確に5になった。

OCC リトライの設計指針

OCCRetryConfig のデフォルト値と調整ポイントをまとめる。

パラメータデフォルト説明
max_attempts3最大リトライ回数
base_delay_ms100初回バックオフ(ms)
max_delay_ms5000バックオフ上限(ms)
jitter_factor0.25ジッター係数(0-25%)

デフォルトの max_attempts: 3 は低並行の書き込みには十分だが、今回のように同一行への高並行更新では不足する場合がある。本番環境では想定される並行度に応じて max_attempts を調整する必要がある。

まとめ

  • 接続コードが約50行から1行に — IAM トークン生成、SSL、バックグラウンドリフレッシュがすべてコネクタ内部で処理される。検証 1 で確認した通り、SQLx の API はそのまま使えるため、既存の SQLx ユーザーであれば追加の学習はほぼ不要だ
  • OCC リトライは retry_on_occ で簡潔に書ける — Aurora DSQL では悲観的ロックが使えないため、書き込み競合のリトライは必須パターン。コネクタのヘルパーを使えばエクスポネンシャルバックオフ+ジッターが自動適用される
  • max_attempts のデフォルト3回は高並行で不足する — 同一行への並行更新が多い場合は OCCRetryConfigBuilder で調整が必要。想定並行度でテストしてから本番投入すべきだ

クリーンアップ

Terminal
# 削除保護を無効化してからクラスタを削除
aws dsql update-cluster \
  --identifier <cluster-id> \
  --no-deletion-protection-enabled \
  --region us-east-1
 
aws dsql delete-cluster \
  --identifier <cluster-id> \
  --region us-east-1

共有する

田原 慎也

田原 慎也

ソリューションアーキテクト @ AWS

AWS ソリューションアーキテクトとして金融業界のお客様を中心に技術支援をしており、クラウドアーキテクチャや AI/ML に関する学びをこのサイトで発信しています。このサイトの内容は個人の見解であり、所属企業の公式な意見や見解を代表するものではありません。

関連記事