Rust SQLxデータベースドライバ:コネクションプールから非同期クエリまでの5つの本番級実践パターン

编程语言

Rust SQLxデータベースドライバ:コネクションプールから非同期クエリまでの5つの本番級実践パターン

こんな悪夢を経験したことはありませんか:Rustプロジェクトの本番稼働後、SQLの入力ミスが実行時にしか発見されず、サービス全体がクラッシュする?あるいはコネクションプールの設定が不適切で、高負荷時にデータベース接続が枯渇し、リクエストがすべてタイムアウトする?従来のORMの実行時SQLエラー、手動接続管理のリソースリーク、非同期クエリの互換性トラップ——これら3つの問題は、Rustバックエンド開発者を徹夜させるのに十分です。SQLxは、Rustエコシステムで最も革新的なデータベースドライバとして、コンパイル時SQLチェック、ネイティブ非同期サポート、自動コネクションプール管理により、これらの課題を根本的に解決します。

本記事では、5つの本番級実践パターンを通じて、SQLxコンパイル時チェック、コネクションプールチューニング、型安全クエリ、トランザクション管理、データベースマイグレーションを網羅し、完全に実行可能なコードと落とし穴ガイドを提供します。

コア概念一覧

概念 説明 主要タイプ/マクロ
SQLx Rust非同期データベースドライバ、コンパイル時SQLチェック対応 sqlx crate
コンパイル時チェック コンパイル段階でSQL構文と型マッチングを検証 query!query_as!
PgPool PostgreSQL非同期コネクションプール PgPoolOptions
PgConnection PostgreSQL単一非同期接続 PgConnection
query! コンパイル時チェックSQL、匿名構造体を返す query!("SELECT ...")
query_as! コンパイル時チェックSQL、カスタム構造体にマッピング query_as!(User, "SELECT ...")
query 実行時SQLクエリ(コンパイル時チェックなし) sqlx::query("SELECT ...")
Transaction データベーストランザクション、ネスト対応 Transaction<'_, Postgres>
Migration データベーススキーマバージョン管理 sqlx-cli
Type Mapping Rust型とSQL型の自動マッピング FromRow derive
PgRow PostgreSQLクエリ結果行 PgRow
sqlx-cli SQLx CLIツール、マイグレーションとデータベース管理 cargo install sqlx-cli
DATABASE_URL コンパイル時チェックに必要なデータベース接続環境変数 .env ファイル
オフラインモード データベース接続なしでコンパイル可能なオフラインモード sqlx-data.json / .sqlx ディレクトリ

一、5つの主要な課題分析

課題1:コンパイル時チェック環境の構築が複雑

SQLxのquery!マクロはコンパイル時にデータベースに接続してSQLを検証する必要があるため、開発環境にDATABASE_URLの設定が必須で、CI/CDパイプラインにもデータベースインスタンスが必要です。チームコラボレーションやコンテナ化デプロイでは、この依存関係が大きな障害になります。オフラインモードで緩和できますが、初期の.sqlxメタデータ生成にはやはりデータベース接続が必要です。

課題2:非同期ランタイムの互換性

SQLxはデフォルトでTokioランタイムを使用します。プロジェクトでasync-stdやsmolを使用する場合、対応するfeature flagを有効にする必要があります。異なるランタイムの混用はpanicやデッドロックを引き起こす可能性があり、特にサードパーティライブラリの統合時に陥りやすいです。

課題3:コネクションプールパラメータのチューニング

PgPoolOptionsmax_connectionsmin_connectionsacquire_timeoutidle_timeoutなどの複数のパラメータを提供します。設定不適切だと接続リーク、プール枯渇、リソース浪費につながります。高負荷シナリオでは、データベースの最大接続数とサービスインスタンス数に基づいてプールパラメータを正確に計算する必要があります。

課題4:トランザクションデッドロック処理

Rustの所有権システムによりトランザクションのライフサイクル管理は厳格ですが、複数のトランザクションが同じデータ行を同時に操作するとデッドロックが発生する可能性があります。ネストされたトランザクション(SAVEPOINT)の使用、トランザクションタイムアウト設定、リトライ戦略はすべて慎重な設計が必要です。

課題5:マイグレーションバージョン管理

データベースマイグレーションは複数人でのコラボレーション時に競合しやすいです。sqlx-cliのマイグレーションファイルはタイムスタンプ順にソートする必要があり、ロールバック戦略は事前の計画が必要です。本番環境でのマイグレーション実行はゼロダウンタイムとデータ整合性を考慮する必要があります。


二、ステップバイステップ実践:5つの本番級パターン

パターン1:SQLxプロジェクトセットアップとコンパイル時チェック

Cargo.toml設定:

[package]
name = "sqlx-production-demo"
version = "0.1.0"
edition = "2021"

[dependencies]
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "postgres", "chrono", "uuid", "migrate"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1", features = ["v4", "serde"] }
dotenvy = "0.15"
tracing = "0.1"
tracing-subscriber = "0.3"
thiserror = "2"

環境設定:

# .env ファイル
DATABASE_URL=postgres://app_user:secure_password@localhost:5432/app_db

コンパイル時チェック完全例:

use sqlx::postgres::PgPoolOptions;
use sqlx::FromRow;

#[derive(Debug, Clone, FromRow, serde::Serialize)]
struct User {
    id: i64,
    username: String,
    email: String,
    created_at: chrono::DateTime<chrono::Utc>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    dotenvy::dotenv().ok();
    tracing_subscriber::fmt::init();

    let database_url = std::env::var("DATABASE_URL")?;
    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect(&database_url)
        .await?;

    // コンパイル時チェック:SQL構文エラーやフィールド不一致の場合、コンパイルが失敗
    let user = sqlx::query_as!(
        User,
        r#"SELECT id, username, email, created_at FROM users WHERE id = $1"#,
        1i64
    )
    .fetch_one(&pool)
    .await?;

    println!("Found user: {:?}", user);

    // query!マクロで匿名構造体を取得
    let count = sqlx::query!(r#"SELECT COUNT(*) as count FROM users"#)
        .fetch_one(&pool)
        .await?;

    println!("Total users: {:?}", count.count);

    Ok(())
}

オフラインモード設定(CI/CD対応):

# オフラインメタデータを生成
cargo sqlx prepare

# CIで環境変数を設定
SQLX_OFFLINE=true cargo build

オフラインモードは.sqlx/ディレクトリにJSONメタデータファイルを生成し、コンパイル時にデータベース接続が不要になります。

パターン2:コネクションプール設定とチューニング

use sqlx::postgres::PgPoolOptions;
use sqlx::Postgres;
use std::time::Duration;

async fn create_pool() -> Result<sqlx::Pool<Postgres>, sqlx::Error> {
    let database_url = std::env::var("DATABASE_URL")
        .expect("DATABASE_URL must be set");

    let pool = PgPoolOptions::new()
        // 最大接続数 = DB max_connections / サービスインスタンス数 - 予約接続数
        // 例:DB max_connections=100, 3サービスインスタンス, 予約10個
        // インスタンスごと: (100 - 10) / 3 ≈ 30
        .max_connections(30)
        // 最小アイドル接続数、コールドスタート時の大量接続を回避
        .min_connections(5)
        // 接続取得タイムアウト、リクエストの無限待機を防止
        .acquire_timeout(Duration::from_secs(5))
        // アイドルタイムアウト、アイドル接続を自動回収
        .idle_timeout(Duration::from_secs(600))
        // 接続の最大ライフタイム、長時間使用による問題を防止
        .max_lifetime(Duration::from_secs(1800))
        // 接続前のヘルスチェック
        .before_acquire(|conn, _meta| Box::pin(async move {
            sqlx::query("SELECT 1")
                .execute(conn)
                .await?;
            Ok(true)
        }))
        .connect(&database_url)
        .await?;

    Ok(pool)
}

// コネクションプールヘルスチェック
async fn check_pool_health(pool: &sqlx::Pool<Postgres>) -> bool {
    let size = pool.size();
    let idle = pool.num_idle();
    tracing::info!(
        "Pool status: total={}, idle={}, active={}",
        size,
        idle,
        size - idle
    );
    idle > 0 || size < 30
}

パターン3:CRUD操作と型安全クエリ

use sqlx::{FromRow, PgPool, Postgres, query_as};
use chrono::{DateTime, Utc};
use uuid::Uuid;

#[derive(Debug, Clone, FromRow, serde::Serialize, serde::Deserialize)]
pub struct Product {
    pub id: Uuid,
    pub name: String,
    pub description: Option<String>,
    pub price: rust_decimal::Decimal,
    pub stock: i32,
    pub category_id: Option<Uuid>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}

#[derive(Debug, serde::Deserialize)]
pub struct CreateProduct {
    pub name: String,
    pub description: Option<String>,
    pub price: rust_decimal::Decimal,
    pub stock: i32,
    pub category_id: Option<Uuid>,
}

#[derive(Debug, serde::Deserialize)]
pub struct UpdateProduct {
    pub name: Option<String>,
    pub description: Option<String>,
    pub price: Option<rust_decimal::Decimal>,
    pub stock: Option<i32>,
}

pub struct ProductRepository;

impl ProductRepository {
    pub async fn create(
        pool: &PgPool,
        input: CreateProduct,
    ) -> Result<Product, sqlx::Error> {
        let product = sqlx::query_as!(
            Product,
            r#"
            INSERT INTO products (id, name, description, price, stock, category_id, created_at, updated_at)
            VALUES ($1, $2, $3, $4, $5, $6, NOW(), NOW())
            RETURNING id, name, description, price, stock, category_id, created_at, updated_at
            "#,
            Uuid::new_v4(),
            input.name,
            input.description,
            input.price,
            input.stock,
            input.category_id,
        )
        .fetch_one(pool)
        .await?;

        Ok(product)
    }

    pub async fn find_by_id(pool: &PgPool, id: Uuid) -> Result<Option<Product>, sqlx::Error> {
        let product = sqlx::query_as!(
            Product,
            r#"SELECT id, name, description, price, stock, category_id, created_at, updated_at
               FROM products WHERE id = $1"#,
            id
        )
        .fetch_optional(pool)
        .await?;

        Ok(product)
    }

    pub async fn update(
        pool: &PgPool,
        id: Uuid,
        input: UpdateProduct,
    ) -> Result<Option<Product>, sqlx::Error> {
        let product = sqlx::query_as!(
            Product,
            r#"
            UPDATE products
            SET name = COALESCE($2, name),
                description = COALESCE($3, description),
                price = COALESCE($4, price),
                stock = COALESCE($5, stock),
                updated_at = NOW()
            WHERE id = $1
            RETURNING id, name, description, price, stock, category_id, created_at, updated_at
            "#,
            id,
            input.name,
            input.description,
            input.price,
            input.stock,
        )
        .fetch_optional(pool)
        .await?;

        Ok(product)
    }

    pub async fn delete(pool: &PgPool, id: Uuid) -> Result<bool, sqlx::Error> {
        let result = sqlx::query!(r#"DELETE FROM products WHERE id = $1"#, id)
            .execute(pool)
            .await?;

        Ok(result.rows_affected() > 0)
    }

    pub async fn list(
        pool: &PgPool,
        limit: i64,
        offset: i64,
    ) -> Result<(Vec<Product>, i64), sqlx::Error> {
        let products = sqlx::query_as!(
            Product,
            r#"SELECT id, name, description, price, stock, category_id, created_at, updated_at
               FROM products ORDER BY created_at DESC LIMIT $1 OFFSET $2"#,
            limit,
            offset
        )
        .fetch_all(pool)
        .await?;

        let total = sqlx::query_scalar!(r#"SELECT COUNT(*) as "count!" FROM products"#)
            .fetch_one(pool)
            .await?;

        Ok((products, total))
    }

    // 動的クエリビルダーを使用
    pub async fn search(
        pool: &PgPool,
        name_filter: Option<&str>,
        min_price: Option<rust_decimal::Decimal>,
        max_price: Option<rust_decimal::Decimal>,
    ) -> Result<Vec<Product>, sqlx::Error> {
        let mut query_builder = sqlx::QueryBuilder::new(
            "SELECT id, name, description, price, stock, category_id, created_at, updated_at FROM products WHERE 1=1"
        );

        if let Some(name) = name_filter {
            query_builder.push(" AND name ILIKE ");
            query_builder.push(format!("%{}%", name));
        }

        if let Some(min) = min_price {
            query_builder.push(" AND price >= ");
            query_builder.push(min);
        }

        if let Some(max) = max_price {
            query_builder.push(" AND price <= ");
            query_builder.push(max);
        }

        query_builder.push(" ORDER BY created_at DESC");

        let products = query_builder
            .build_query_as::<Product>()
            .fetch_all(pool)
            .await?;

        Ok(products)
    }
}

パターン4:トランザクション管理

use sqlx::{PgPool, Postgres, Transaction};
use uuid::Uuid;

#[derive(Debug, thiserror::Error)]
pub enum OrderError {
    #[error("Insufficient stock for product {product_id}")]
    InsufficientStock { product_id: Uuid },
    #[error("Product not found: {product_id}")]
    ProductNotFound { product_id: Uuid },
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
}

pub struct OrderService;

impl OrderService {
    // 基本トランザクション:注文作成と在庫引当
    pub async fn create_order(
        pool: &PgPool,
        user_id: Uuid,
        product_id: Uuid,
        quantity: i32,
    ) -> Result<Uuid, OrderError> {
        let order_id = Uuid::new_v4();

        let mut tx = pool.begin().await?;

        // 在庫確認
        let stock: Option<(i32,)> = sqlx::query_as(
            "SELECT stock FROM products WHERE id = $1 FOR UPDATE",
        )
        .bind(product_id)
        .fetch_optional(&mut *tx)
        .await?;

        let current_stock = stock
            .ok_or(OrderError::ProductNotFound { product_id })?
            .0;

        if current_stock < quantity {
            return Err(OrderError::InsufficientStock { product_id });
        }

        // 在庫引当
        sqlx::query(
            "UPDATE products SET stock = stock - $1, updated_at = NOW() WHERE id = $2",
        )
        .bind(quantity)
        .bind(product_id)
        .execute(&mut *tx)
        .await?;

        // 注文作成
        sqlx::query(
            r#"INSERT INTO orders (id, user_id, product_id, quantity, status, created_at)
               VALUES ($1, $2, $3, $4, 'pending', NOW())"#,
        )
        .bind(order_id)
        .bind(user_id)
        .bind(product_id)
        .bind(quantity)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        Ok(order_id)
    }

    // ネストされたトランザクション(SAVEPOINT)
    pub async fn create_order_with_log(
        pool: &PgPool,
        user_id: Uuid,
        product_id: Uuid,
        quantity: i32,
    ) -> Result<Uuid, OrderError> {
        let mut tx = pool.begin().await?;

        // 外部トランザクション:注文作成
        let order_id = Uuid::new_v4();
        sqlx::query(
            r#"INSERT INTO orders (id, user_id, product_id, quantity, status, created_at)
               VALUES ($1, $2, $3, $4, 'pending', NOW())"#,
        )
        .bind(order_id)
        .bind(user_id)
        .bind(product_id)
        .bind(quantity)
        .execute(&mut *tx)
        .await?;

        // ネストされたトランザクション(SAVEPOINT):ログ記録、失敗しても外部に影響しない
        let nested = tx.begin().await;
        match nested {
            Ok(mut savepoint) => {
                let log_result = sqlx::query(
                    "INSERT INTO order_logs (order_id, action, created_at) VALUES ($1, 'created', NOW())",
                )
                .bind(order_id)
                .execute(&mut *savepoint)
                .await;

                match log_result {
                    Ok(_) => savepoint.commit().await?,
                    Err(e) => {
                        tracing::warn!("Order log failed: {}, continuing", e);
                        savepoint.rollback().await?;
                    }
                }
            }
            Err(e) => {
                tracing::warn!("Savepoint failed: {}, continuing", e);
            }
        }

        tx.commit().await?;
        Ok(order_id)
    }

    // トランザクションリトライ戦略
    pub async fn create_order_with_retry(
        pool: &PgPool,
        user_id: Uuid,
        product_id: Uuid,
        quantity: i32,
        max_retries: u32,
    ) -> Result<Uuid, OrderError> {
        let mut attempts = 0;

        loop {
            match Self::create_order(pool, user_id, product_id, quantity).await {
                Ok(order_id) => return Ok(order_id),
                Err(OrderError::Database(e)) => {
                    attempts += 1;
                    if attempts >= max_retries {
                        return Err(OrderError::Database(e));
                    }
                    if let Some(db_error) = e.as_database_error() {
                        // デッドロックまたは直列化失敗、リトライ可能
                        if db_error.code().as_deref() == Some("40P01")
                            || db_error.code().as_deref() == Some("40001")
                        {
                            tracing::warn!(
                                "Retryable error (attempt {}/{}): {}",
                                attempts,
                                max_retries,
                                e
                            );
                            tokio::time::sleep(
                                std::time::Duration::from_millis(100 * attempts as u64)
                            ).await;
                            continue;
                        }
                    }
                    return Err(OrderError::Database(e));
                }
                Err(e) => return Err(e),
            }
        }
    }
}

パターン5:データベースマイグレーション

sqlx-cliのインストール:

cargo install sqlx-cli --no-default-features --features postgres

マイグレーションファイルの作成:

sqlx migrate add create_users_table
sqlx migrate add create_products_table
sqlx migrate add create_orders_table

マイグレーションSQLファイル:

-- migrations/20260615000001_create_users_table.sql
CREATE TABLE users (
    id BIGSERIAL PRIMARY KEY,
    username VARCHAR(64) NOT NULL UNIQUE,
    email VARCHAR(255) NOT NULL UNIQUE,
    password_hash VARCHAR(255) NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_username ON users(username);
-- migrations/20260615000002_create_products_table.sql
CREATE TABLE products (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    description TEXT,
    price DECIMAL(10, 2) NOT NULL CHECK (price >= 0),
    stock INTEGER NOT NULL DEFAULT 0 CHECK (stock >= 0),
    category_id UUID,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_products_category ON products(category_id);
CREATE INDEX idx_products_name ON products USING gin(to_tsvector('simple', name));
-- migrations/20260615000003_create_orders_table.sql
CREATE TABLE orders (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id BIGINT NOT NULL REFERENCES users(id),
    product_id UUID NOT NULL REFERENCES products(id),
    quantity INTEGER NOT NULL CHECK (quantity > 0),
    status VARCHAR(32) NOT NULL DEFAULT 'pending',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE order_logs (
    id BIGSERIAL PRIMARY KEY,
    order_id UUID NOT NULL REFERENCES orders(id),
    action VARCHAR(64) NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_orders_user ON orders(user_id);
CREATE INDEX idx_orders_status ON orders(status);
CREATE INDEX idx_order_logs_order ON order_logs(order_id);

プログラムによるマイグレーション実行:

use sqlx::migrate::Migrator;
use sqlx::postgres::PgPoolOptions;

// マイグレーションファイルをコンパイル時に埋め込み
static MIGRATOR: Migrator = sqlx::migrate!("./migrations");

async fn run_migrations() -> Result<(), Box<dyn std::error::Error>> {
    dotenvy::dotenv().ok();
    let database_url = std::env::var("DATABASE_URL")?;

    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect(&database_url)
        .await?;

    // マイグレーションを実行
    MIGRATOR.run(&pool).await?;

    tracing::info!("Migrations completed successfully");
    Ok(())
}

// 条件チェック付きマイグレーション
async fn safe_migrate(pool: &sqlx::PgPool) -> Result<(), sqlx::migrate::MigrateError> {
    let migrator = MIGRATOR;

    // 保留中のマイグレーションを確認
    let applied: Vec<(String,)> = sqlx::query_as(
        "SELECT version FROM _sqlx_migrations ORDER BY version",
    )
    .fetch_all(pool)
    .await
    .unwrap_or_default();

    tracing::info!("Already applied {} migrations", applied.len());

    migrator.run(pool).await
}

三、5つのよくある落とし穴

落とし穴1:コンパイル時チェックと実行時クエリの混用による型不一致

誤った方法:

// query!は匿名構造体を返す、フィールド名はSQLエイリアスと完全に一致する必要がある
let user = sqlx::query!(
    r#"SELECT id, username as name FROM users WHERE id = $1"#,
    user_id
)
.fetch_one(&pool)
.await?;
// コンパイルエラー:匿名構造体のフィールド名はnameでusernameではない
println!("{}", user.username); // フィールドが存在しない!

正しい方法:

let user = sqlx::query!(
    r#"SELECT id, username as "name!" FROM users WHERE id = $1"#,
    user_id
)
.fetch_one(&pool)
.await?;
println!("{}", user.name); // 正しいエイリアスを使用

// またはquery_as!でカスタム構造体にマッピング
#[derive(FromRow)]
struct UserRow {
    id: i64,
    name: String,
}
let user = sqlx::query_as_unchecked!(
    UserRow,
    r#"SELECT id, username as name FROM users WHERE id = $1"#,
    user_id
)
.fetch_one(&pool)
.await?;

落とし穴2:Drop時のコネクションプールの適切なクローズ漏れ

誤った方法:

async fn bad_app() {
    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect("postgres://...")
        .await
        .unwrap();
    // 関数終了時にpoolがdropされるが、非同期クリーンアップが完了していない可能性
    // データベース側の接続が閉じられないままになる
}

正しい方法:

async fn good_app() {
    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect("postgres://...")
        .await
        .unwrap();

    // 明示的にコネクションプールをクローズ
    pool.close().await;

    // またはアプリケーション終了時にtokio runtimeが完全に待機することを確認
}

落とし穴3:トランザクション内での長時間ロック保持

誤った方法:

async fn bad_transaction(pool: &PgPool) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;
    sqlx::query("UPDATE products SET stock = stock - 1 WHERE id = $1")
        .bind(product_id)
        .execute(&mut *tx)
        .await?;
    // トランザクション内で外部APIを呼び出し、数秒かかる可能性
    call_external_api().await; // 危険!ロック保持中の外部呼び出し
    tx.commit().await
}

正しい方法:

async fn good_transaction(pool: &PgPool) -> Result<(), sqlx::Error> {
    // 先に外部呼び出しを完了
    call_external_api().await;

    // その後トランザクションを開始、データベース操作をできるだけ早く完了
    let mut tx = pool.begin().await?;
    sqlx::query("UPDATE products SET stock = stock - 1 WHERE id = $1")
        .bind(product_id)
        .execute(&mut *tx)
        .await?;
    tx.commit().await
}

落とし穴4:Nullableフィールドの型不一致

誤った方法:

// データベースのemailフィールドはNULLを許可
let user = sqlx::query!(
    r#"SELECT id, email FROM users WHERE id = $1"#,
    user_id
)
.fetch_one(&pool)
.await?;
// コンパイルエラー:email型はOption<String>、Stringとして直接使用不可
let email: String = user.email; // 型不一致!

正しい方法:

let user = sqlx::query!(
    r#"SELECT id, email as "email!" FROM users WHERE id = $1"#,
    // "email!"を使用してSQLxにNOT NULLであることを伝える
    user_id
)
.fetch_one(&pool)
.await?;
let email: String = user.email; // 今はString型

// またはOptionを正しく処理
let user = sqlx::query!(
    r#"SELECT id, email FROM users WHERE id = $1"#,
    user_id
)
.fetch_one(&pool)
.await?;
let email = user.email.unwrap_or_default(); // Optionを安全に処理

落とし穴5:非TokioランタイムでのSQLx使用

誤った方法:

// Cargo.tomlでruntime-tokioのみ有効、しかしコードはasync-stdで実行
use async_std::task;

fn main() {
    // panic! SQLxのTokio runtimeが起動していない
    task::block_on(async {
        let pool = PgPoolOptions::new()
            .connect("postgres://...")
            .await
            .unwrap();
    });
}

正しい方法:

// オプション1:Tokioランタイムを使用(推奨)
#[tokio::main]
async fn main() {
    let pool = PgPoolOptions::new()
        .connect("postgres://...")
        .await
        .unwrap();
}

// オプション2:async-stdを使用する必要がある場合、Cargo.tomlで対応するfeatureを有効化
// sqlx = { features = ["runtime-async-std", "tls-rustls"] }

四、エラートラブルシューティング表

エラーメッセージ 原因 解決策
error: variant Some not found query!のNullableフィールドが非NULLとして扱われている "field!"構文で非NULLをマークまたはOptionを処理
Pool timed out while waiting for an open connection コネクションプール枯渇 max_connectionsを増やすか接続リークを確認
no rows returned by query fetch_oneクエリに結果がない fetch_optionalで空の結果を処理
column "xxx" not found in query SQLエイリアスと構造体フィールド名が不一致 SQLのASエイリアスを確認またはFromRowリネームを使用
unsupported type Rust型とPostgreSQL型のマッピング不可 sqlx::Typesqlx::Encode/sqlx::Decodeを実装
database "xxx" does not exist DATABASE_URLのデータベース名が誤り データベースを作成または接続文字列を修正
fatal: password authentication failed データベースのユーザー名またはパスワードが誤り .envファイルのDATABASE_URLを確認
error returned from database: deadlock detected 複数トランザクションの並行競合 リトライロジックを追加、トランザクション操作順序を調整
migrate error: version conflict マイグレーションファイルのタイムスタンプ競合 マイグレーションファイル名を確認、タイムスタンプの一意性を確保
failed to lookup address information データベースホスト名を解決できない ネットワーク接続とDNS設定を確認

五、高度な最適化テクニック

テクニック1:バッチ操作の最適化

use sqlx::{PgPool, QueryBuilder};
use uuid::Uuid;

async fn batch_insert_products(
    pool: &PgPool,
    products: Vec<CreateProduct>,
) -> Result<(), sqlx::Error> {
    if products.is_empty() {
        return Ok(());
    }

    // QueryBuilderでバッチINSERTを構築
    let mut query_builder = QueryBuilder::new(
        "INSERT INTO products (id, name, description, price, stock, category_id, created_at, updated_at) "
    );

    query_builder.push_values(products, |mut b, product| {
        b.push_bind(Uuid::new_v4())
            .push_bind(&product.name)
            .push_bind(&product.description)
            .push_bind(product.price)
            .push_bind(product.stock)
            .push_bind(product.category_id)
            .push_bind(chrono::Utc::now())
            .push_bind(chrono::Utc::now());
    });

    query_builder.build().execute(pool).await?;

    Ok(())
}

// UNNESTを使用したより効率的なバッチ操作
async fn batch_update_stock(
    pool: &PgPool,
    updates: Vec<(Uuid, i32)>,
) -> Result<(), sqlx::Error> {
    let ids: Vec<Uuid> = updates.iter().map(|(id, _)| *id).collect();
    let stocks: Vec<i32> = updates.iter().map(|(_, s)| *s).collect();

    sqlx::query(
        r#"
        UPDATE products p
        SET stock = u.new_stock, updated_at = NOW()
        FROM UNNEST($1::uuid[], $2::int[]) AS u(id, new_stock)
        WHERE p.id = u.id
        "#,
    )
    .bind(ids)
    .bind(stocks)
    .execute(pool)
    .await?;

    Ok(())
}

テクニック2:コネクションプールモニタリングとメトリクス収集

use sqlx::postgres::PgPoolOptions;
use std::time::Duration;

async fn create_monitored_pool() -> Result<sqlx::PgPool, sqlx::Error> {
    let database_url = std::env::var("DATABASE_URL")
        .expect("DATABASE_URL must be set");

    let pool = PgPoolOptions::new()
        .max_connections(30)
        .min_connections(5)
        .acquire_timeout(Duration::from_secs(5))
        .idle_timeout(Duration::from_secs(600))
        .max_lifetime(Duration::from_secs(1800))
        .connect(&database_url)
        .await?;

    // バックグラウンドモニタリングタスクを開始
    let monitor_pool = pool.clone();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(30));
        loop {
            interval.tick().await;
            let size = monitor_pool.size();
            let idle = monitor_pool.num_idle();
            tracing::info!(
                "[Pool Monitor] total={}, idle={}, active={}",
                size,
                idle,
                size - idle
            );
            // アクティブ接続が80%を超えた場合にアラート
            if size > 0 && (size - idle) as f64 / size as f64 > 0.8 {
                tracing::warn!(
                    "[Pool Alert] Connection usage exceeds 80%! active={}/total={}",
                    size - idle,
                    size
                );
            }
        }
    });

    Ok(pool)
}

テクニック3:カスタム型マッピング

use sqlx::{Decode, Encode, Type, Postgres, postgres::PgTypeInfo};
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum OrderStatus {
    Pending,
    Confirmed,
    Shipped,
    Delivered,
    Cancelled,
}

// SQLx型マッピングを実装
impl Type<Postgres> for OrderStatus {
    fn type_info() -> PgTypeInfo {
        <String as Type<Postgres>>::type_info() // VARCHARにマッピング
    }
}

impl<'r> Decode<'r, Postgres> for OrderStatus {
    fn decode(value: <Postgres as sqlx::Database>::ValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> {
        let s = <String as Decode<Postgres>>::decode(value)?;
        match s.as_str() {
            "pending" => Ok(OrderStatus::Pending),
            "confirmed" => Ok(OrderStatus::Confirmed),
            "shipped" => Ok(OrderStatus::Shipped),
            "delivered" => Ok(OrderStatus::Delivered),
            "cancelled" => Ok(OrderStatus::Cancelled),
            _ => Err(format!("Unknown order status: {}", s).into()),
        }
    }
}

impl<'q> Encode<'q, Postgres> for OrderStatus {
    fn encode_by_ref(&self, buf: &mut <Postgres as sqlx::Database>::ArgumentBuffer<'q>) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
        let s = match self {
            OrderStatus::Pending => "pending",
            OrderStatus::Confirmed => "confirmed",
            OrderStatus::Shipped => "shipped",
            OrderStatus::Delivered => "delivered",
            OrderStatus::Cancelled => "cancelled",
        };
        <String as Encode<Postgres>>::encode_by_ref(&s.to_string(), buf)
    }
}

// PostgreSQL列挙型を使用(より良いアプローチ)
// まずマイグレーションで列挙型を作成:
// CREATE TYPE order_status AS ENUM ('pending', 'confirmed', 'shipped', 'delivered', 'cancelled');
// 次にsqlx::Type deriveマクロを使用:
// #[derive(sqlx::Type)]
// #[sqlx(type_name = "order_status", rename_all = "lowercase")]
// pub enum OrderStatus { Pending, Confirmed, Shipped, Delivered, Cancelled }

六、比較分析

特徴 SQLx Diesel SeaORM sqlx-core tokio-postgres
非同期サポート ✅ ネイティブ非同期 ❌ 同期(diesel-async実験的) ✅ ネイティブ非同期 ✅ ネイティブ非同期 ✅ ネイティブ非同期
コンパイル時チェック ✅ query!マクロ ✅ DSL ❌ 実行時 ❌ なし ❌ なし
コネクションプール ✅ 内蔵 ✅ r2d2 ✅ 内蔵 ✅ 内蔵 ❌ deadpoolが必要
マイグレーションツール ✅ sqlx-cli ✅ diesel-cli ✅ 内蔵 ✅ 内蔵 ❌ なし
動的クエリ ✅ QueryBuilder ✅ DSL ✅ 動的フィルター ✅ QueryBuilder ❌ 手動文字列結合
マルチデータベース ✅ Pg/MySQL/SQLite ✅ Pg/MySQL/SQLite ✅ Pg/MySQL/SQLite ✅ Pg/MySQL/SQLite ❌ PostgreSQLのみ
学習曲線 高(DSLが複雑) 高(低レベルAPI) 高(低レベルAPI)
パフォーマンス 非常に高い 非常に高い
エコシステム成熟度 非常に高い
ORM機能 ❌ クエリのみ ✅ フルORM ✅ フルORM ❌ ドライバのみ ❌ ドライバのみ

選択ガイド

  • コンパイル時SQLチェック + 非同期が必要 → SQLx
  • フルORM + 型安全DSLが必要 → Diesel
  • 非同期ORM + 動的クエリが必要 → SeaORM
  • 極限のパフォーマンス + 低レベル制御が必要 → sqlx-core / tokio-postgres

七、まとめ

Rust SQLxデータベースドライバのコアバリューはコンパイル時SQLチェックにあります——実行時になって初めて発見されるSQLエラーをコンパイル段階に前倒しし、ネイティブ非同期と内蔵コネクションプールと組み合わせることで、Rustのデータベース操作を安全かつ効率的にします。3つの重要原則を忘れないでください:常にquery!/query_as!を実行時クエリの代わりに使用するコネクションプールパラメータは実際の負荷に基づいて正確にチューニングするトランザクション内でロックを保持したまま外部呼び出しを絶対にしない。この5つの本番級パターンをマスターすれば、Rustデータベース操作は「動く」から「信頼できる」へとアップグレードします。


おすすめツール

ブラウザローカルツールを無料で試す →

#Rust#SQLx#数据库#异步查询#PostgreSQL#2026#连接池