Aurora DSQL Rust コネクタで IAM 認証・OCC リトライを自動化する
目次
はじめに
2026年3月31日、AWS は Aurora DSQL 向けの Rust(SQLx)および .NET(Npgsql)コネクタをリリースした。
Aurora DSQL に Rust から接続する場合、従来は aws-sdk-dsql で IAM トークンを手動生成し、SSL を設定し、トークンリフレッシュ用のバックグラウンドタスクを自前で管理する必要があった。aws-samples のサンプルコードを見ると、接続確立だけで約50行になる。
let sdk_config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let signer = AuthTokenGenerator::new(
Config::builder()
.hostname(&cluster_endpoint)
.region(Region::new(region))
.expires_in(TOKEN_EXPIRATION_SECONDS)
.build()
.unwrap(),
);
let password_token = generate_password_token(&cluster_user, &signer, &sdk_config).await;
let connection_options = PgConnectOptions::new()
.host(&cluster_endpoint)
.port(5432)
.database("postgres")
.username(&cluster_user)
.password(password_token.as_str())
.ssl_mode(sqlx::postgres::PgSslMode::VerifyFull);
let pool = PgPoolOptions::new()
.max_connections(10)
.connect_with(connection_options.clone())
.await?;
// + トークンリフレッシュ用の tokio::spawn が別途必要コネクタを使えば、これが接続文字列1行になる。
let pool = aurora_dsql_sqlx_connector::pool::connect(
"postgres://admin@<cluster>.dsql.us-east-1.on.aws/postgres"
).await?;IAM トークン生成、SSL 設定、バックグラウンドでのトークンリフレッシュ(有効期限の80%で自動更新)がすべてコネクタ内部で処理される。さらに、Aurora DSQL 固有の OCC(楽観的同時実行制御)リトライヘルパーも提供される。
この記事では、コネクタを使って接続プーリング・CRUD・OCC リトライを実際に動かし、コネクタがどこまで面倒を見てくれるのかを検証する。公式ドキュメントは Connectors for Aurora DSQL、コネクタの README は GitHub を参照。
前提条件:
- Rust 1.80 以上
- AWS CLI 設定済み(
dsql:*権限) - 検証リージョン: us-east-1
環境構築(Aurora DSQL クラスタ作成 + Rust プロジェクト)
aws dsql create-cluster \
--tags Name=rust-connector-test \
--region us-east-1クラスタが ACTIVE になるまで待つ。
aws dsql get-cluster \
--identifier <cluster-id> \
--region us-east-1Rust プロジェクトを作成する。
cargo init dsql-rust-test
cd dsql-rust-test[package]
name = "dsql-rust-test"
version = "0.1.0"
edition = "2024"
[dependencies]
aurora-dsql-sqlx-connector = { version = "0.1.2", features = ["pool", "occ"] }
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres"] }
tokio = { version = "1", features = ["full"] }
anyhow = "1"pool フィーチャーでコネクションプーリング(バックグラウンドトークンリフレッシュ付き)、occ フィーチャーで OCC リトライヘルパーが有効になる。
結果だけ見たい場合はまとめへ。
検証 1: 接続プーリングと基本 CRUD
pool::connect_with() で Aurora DSQL に接続し、テーブル作成・INSERT・SELECT・UPDATE・DELETE を実行する。connect_with() は PgPoolOptions を渡して最大接続数などをカスタマイズできる。カスタマイズが不要なら、はじめにで紹介した pool::connect() でも同じ結果になる。
接続部分のコードはこれだけだ。DsqlConnectOptions が接続文字列をパースし、pool::connect_with() が IAM トークン生成・SSL・プール初期化をまとめて処理する。
let conn_str = format!("postgres://admin@{}/postgres", endpoint);
let config = DsqlConnectOptions::from_connection_string(&conn_str)?;
let pool = aurora_dsql_sqlx_connector::pool::connect_with(
&config,
PgPoolOptions::new().max_connections(10),
)
.await?;接続文字列を渡すだけで、IAM トークン生成・SSL ハンドシェイク・プール初期化が完了する。以降の CRUD 操作は通常の SQLx と同じ API がそのまま使える。フルソースは以下の折り畳みを参照。
検証 1 のフルソース(verify1_pool_crud.rs)
use aurora_dsql_sqlx_connector::DsqlConnectOptions;
use sqlx::{postgres::PgPoolOptions, Executor, Row};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let endpoint = std::env::var("CLUSTER_ENDPOINT")?;
let conn_str = format!("postgres://admin@{}/postgres", endpoint);
// プール接続(IAM トークン生成・SSL・リフレッシュはすべて自動)
let config = DsqlConnectOptions::from_connection_string(&conn_str)?;
let pool = aurora_dsql_sqlx_connector::pool::connect_with(
&config,
PgPoolOptions::new().max_connections(10),
)
.await?;
println!("Pool connected successfully");
// CREATE
pool.execute(
"CREATE TABLE IF NOT EXISTS test_items(
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(100) NOT NULL,
value INT NOT NULL
)",
)
.await?;
// INSERT
sqlx::query("INSERT INTO test_items(name, value) VALUES($1, $2)")
.bind("item-a")
.bind(42)
.execute(&pool)
.await?;
// SELECT
let row = sqlx::query("SELECT name, value FROM test_items WHERE name = $1")
.bind("item-a")
.fetch_one(&pool)
.await?;
println!("Selected: name={}, value={}",
row.get::<&str, _>("name"), row.get::<i32, _>("value"));
// UPDATE
sqlx::query("UPDATE test_items SET value = $1 WHERE name = $2")
.bind(100).bind("item-a").execute(&pool).await?;
// DELETE
sqlx::query("DELETE FROM test_items WHERE name = $1")
.bind("item-a").execute(&pool).await?;
pool.execute("DROP TABLE test_items").await?;
pool.close().await; // バックグラウンドのトークンリフレッシュタスクを停止
Ok(())
}export CLUSTER_ENDPOINT=<cluster-id>.dsql.us-east-1.on.aws
cargo runPool connected successfully
Selected: name=item-a, value=42プールが不要な場合(スクリプトや CLI ツールなど)は、connection::connect() で単一接続も可能だ。
let mut conn = aurora_dsql_sqlx_connector::connection::connect(
"postgres://admin@<cluster>.dsql.us-east-1.on.aws/postgres"
).await?;
let row = sqlx::query("SELECT 'hello' as msg")
.fetch_one(&mut conn).await?;connection::connect() は呼び出しごとに新しい IAM トークンを生成する。長時間の処理ではトークンの有効期限(デフォルト15分)に注意が必要だ。
基本操作を確認できた。次に、Aurora DSQL を本番で使う際に避けられない書き込み競合への対処を検証する。
検証 2: OCC リトライの動作確認
Aurora DSQL は楽観的同時実行制御(OCC)を採用しており、悲観的ロック(SELECT ... FOR UPDATE)は使えない。複数のトランザクションが同一行を同時に更新すると、一方が SQLSTATE 40001(OC000: data conflict)でエラーになる。具体的には、トランザクション内で読み取った行がコミット時点で他のトランザクションによって変更されていた場合に競合が検出される。本番運用ではリトライロジックが必須だ。
コネクタは retry_on_occ ヘルパーを提供する。エクスポネンシャルバックオフ(base 100ms、最大 5000ms)とジッター(0-25%)で自動リトライする。使い方はクロージャを渡すだけだ。
let occ_config = aurora_dsql_sqlx_connector::OCCRetryConfig::default();
let result: Result<(), DsqlError> = retry_on_occ(&occ_config, || async {
let mut tx = pool.begin().await?;
// ... トランザクション内の操作 ...
tx.commit().await?;
Ok(())
}).await;同一行に対して5つの並行タスクからインクリメントを実行し、OCC 競合の発生とリトライ動作を確認する。
検証 2 のフルソース(verify2_occ_retry.rs)
use aurora_dsql_sqlx_connector::{retry_on_occ, DsqlConnectOptions, DsqlError};
use sqlx::{postgres::PgPoolOptions, Executor, Row};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let endpoint = std::env::var("CLUSTER_ENDPOINT")?;
let conn_str = format!("postgres://admin@{}/postgres", endpoint);
let config = DsqlConnectOptions::from_connection_string(&conn_str)?;
let pool = aurora_dsql_sqlx_connector::pool::connect_with(
&config,
PgPoolOptions::new().max_connections(10),
).await?;
// カウンタ行を初期化
pool.execute(
"CREATE TABLE IF NOT EXISTS counter(id INT PRIMARY KEY, value INT NOT NULL)",
).await?;
sqlx::query("INSERT INTO counter(id, value) VALUES(1, 0) ON CONFLICT (id) DO UPDATE SET value = 0")
.execute(&pool).await?;
// デフォルト設定(max_attempts: 3)で並行更新
let occ_config = aurora_dsql_sqlx_connector::OCCRetryConfig::default();
let mut handles = Vec::new();
for task_id in 0..5 {
let pool = pool.clone();
let occ_config = occ_config.clone();
handles.push(tokio::spawn(async move {
let result: Result<(), DsqlError> = retry_on_occ(&occ_config, || async {
let mut tx = pool.begin().await?;
let row = sqlx::query("SELECT value FROM counter WHERE id = 1")
.fetch_one(&mut *tx).await?;
let current: i32 = row.get("value");
sqlx::query("UPDATE counter SET value = $1 WHERE id = 1")
.bind(current + 1).execute(&mut *tx).await?;
tx.commit().await?;
Ok(())
}).await;
match &result {
Ok(()) => println!("Task {} succeeded", task_id),
Err(e) => println!("Task {} failed: {}", task_id, e),
}
result
}));
}
let (mut ok, mut ng) = (0, 0);
for h in handles {
match h.await? { Ok(()) => ok += 1, Err(_) => ng += 1 }
}
let final_val = sqlx::query_scalar::<_, i32>("SELECT value FROM counter WHERE id = 1")
.fetch_one(&pool).await?;
println!("Succeeded: {ok}, Failed: {ng}, Final value: {final_val}");
pool.execute("DROP TABLE counter").await?;
pool.close().await; // バックグラウンドのトークンリフレッシュタスクを停止
Ok(())
}export CLUSTER_ENDPOINT=<cluster-id>.dsql.us-east-1.on.aws
cargo runデフォルト設定(max_attempts: 3)の結果
Task 1 succeeded
Task 2 succeeded
Task 0 succeeded
Task 3 succeeded
Task 4 failed: OCC retry exhausted after 3 attempts: database error:
error returned from database: change conflicts with another transaction,
please retry: (OC000)
Succeeded: 4, Failed: 1, Final value: 45タスク中1タスクが3回のリトライで回復できず失敗した。カウンタ値は4で、成功したタスク数と一致している。ロストアップデートは発生していない。
max_attempts を増やした場合
OCCRetryConfigBuilder で max_attempts を10に変更して再実行する。
let occ_config = aurora_dsql_sqlx_connector::OCCRetryConfigBuilder::default()
.max_attempts(10u32)
.build()?;Task 0 succeeded
Task 3 succeeded
Task 1 succeeded
Task 4 succeeded
Task 2 succeeded
Succeeded: 5, Failed: 0, Final value: 5全タスクが成功し、カウンタ値も正確に5になった。
OCC リトライの設計指針
OCCRetryConfig のデフォルト値と調整ポイントをまとめる。
| パラメータ | デフォルト | 説明 |
|---|---|---|
max_attempts | 3 | 最大リトライ回数 |
base_delay_ms | 100 | 初回バックオフ(ms) |
max_delay_ms | 5000 | バックオフ上限(ms) |
jitter_factor | 0.25 | ジッター係数(0-25%) |
デフォルトの max_attempts: 3 は低並行の書き込みには十分だが、今回のように同一行への高並行更新では不足する場合がある。本番環境では想定される並行度に応じて max_attempts を調整する必要がある。
まとめ
- 接続コードが約50行から1行に — IAM トークン生成、SSL、バックグラウンドリフレッシュがすべてコネクタ内部で処理される。検証 1 で確認した通り、SQLx の API はそのまま使えるため、既存の SQLx ユーザーであれば追加の学習はほぼ不要だ
- OCC リトライは
retry_on_occで簡潔に書ける — Aurora DSQL では悲観的ロックが使えないため、書き込み競合のリトライは必須パターン。コネクタのヘルパーを使えばエクスポネンシャルバックオフ+ジッターが自動適用される max_attemptsのデフォルト3回は高並行で不足する — 同一行への並行更新が多い場合はOCCRetryConfigBuilderで調整が必要。想定並行度でテストしてから本番投入すべきだ
クリーンアップ
# 削除保護を無効化してからクラスタを削除
aws dsql update-cluster \
--identifier <cluster-id> \
--no-deletion-protection-enabled \
--region us-east-1
aws dsql delete-cluster \
--identifier <cluster-id> \
--region us-east-1