時系列データベース比較実践:IoTから金融分析までの5つのプロダクションパターン

数据库

時系列データがリレーショナルデータベースを食い尽くしている?

モニタリングメトリクスが毎秒10万件書き込まれ、IoTセンサーが毎分100万データポイントを報告し、金融Tickデータが1日で数十億行を生成——MySQLの時間パーティションで何とか支えている間に、クエリはタイムアウトし、ストレージコストは爆発的に増大。2026年、**時系列データベース(TSDB)**は時系列データ処理の標準解答となっていますが、InfluxDB、TimescaleDB、TDengine、QuestDBをどう選ぶべきか?

本記事では5つのリアルプロダクションシナリオから出発し、各シナリオの最適データベース選定と完全なコードを提供し、よくある落とし穴を回避します。


時系列データベースアーキテクチャ比較

特徴 InfluxDB 3.0 TimescaleDB TDengine QuestDB
ストレージエンジン Apache Arrow + Parquet PostgreSQL拡張 独自エンジン 独自列指向エンジン
クエリ言語 InfluxQL / SQL SQL SQL SQL
書き込みプロトコル Line Protocol SQL INSERT SQL INSERT / Schemaless ILP / SQL INSERT
圧縮率 10:1~20:1 5:1~10:1 10:1~20:1 10:1~15:1
クラスタリング 3.0ネイティブ PGクラスタ依存 ネイティブ エンタープライズ版
ライセンス Apache 2.0(コミュニティ) Apache 2.0(コミュニティ) AGPL-3.0 Apache 2.0
最適シナリオ モニタリング/Observability 金融/複雑分析 IoT/エッジ 高頻度書き込み/リアルタイム

なぜ1つだけ使わないのか?

  • InfluxDB:モニタリングエコシステム最強、しかし複雑なSQL分析は弱い
  • TimescaleDB:SQLエコシステム完璧、しかし書き込みスループットは専用TSDBに劣る
  • TDengine:IoT書き込みが極めて高速、しかしSQL互換性は限定的
  • QuestDB:高頻度書き込み無敵、しかしエコシステムとツールチェーンが未成熟

パターン1:InfluxDB 3.0 モニタリング&オブザーバビリティ

シナリオ

Kubernetesクラスタ500ノード、10万Pod、Prometheusが15秒ごとにメトリクスを収集、Grafanaでリアルタイム表示。ダウンサンプリング、リテンションポリシー、Flux/InfluxQLデュアルクエリサポートが必要。

インストールと設定

# Dockerデプロイ
docker run -d --name influxdb3 \
  -p 8086:8086 \
  -p 8181:8181 \
  -v influxdb3-data:/var/lib/influxdb3 \
  influxdb:3.0-core

# バケット作成
influx bucket create \
  --name monitoring \
  --retention 30d \
  --org myorg \
  --token ${INFLUX_TOKEN}

モニタリングデータの書き込み

from influxdb_client_3 import InfluxDBClient3, Point
from datetime import datetime, timezone
import random
import time

client = InfluxDBClient3(
    host="localhost:8181",
    database="monitoring",
    token="my-token"
)

def write_metrics():
    points = []
    for i in range(10000):
        point = Point("cpu_metrics") \
            .tag("host", f"node-{random.randint(1, 500)}") \
            .tag("region", random.choice(["us-east", "eu-west", "ap-south"])) \
            .field("usage_percent", random.uniform(10, 95)) \
            .field("load_1m", random.uniform(0.5, 8.0)) \
            .field("memory_percent", random.uniform(20, 90)) \
            .time(datetime.now(timezone.utc))

        points.append(point)

    client.write(record=points, write_precision="ms")
    print(f"Written {len(points)} points")

for _ in range(100):
    write_metrics()
    time.sleep(1)

ダウンサンプリングと継続クエリ

-- InfluxDB 3.0 SQLダウンサンプリング:5分平均CPU
SELECT
    time_bucket('5 minutes', time) AS bucket,
    host,
    avg(cpu_usage_percent) AS avg_cpu,
    max(cpu_usage_percent) AS max_cpu,
    min(cpu_usage_percent) AS min_cpu
FROM cpu_metrics
WHERE time >= now() - INTERVAL '1 hour'
GROUP BY bucket, host
ORDER BY bucket DESC;

-- InfluxQLダウンサンプリング(互換モード)
SELECT
    MEAN("usage_percent") AS "avg_cpu",
    MAX("usage_percent") AS "max_cpu"
INTO "monitoring"."downsampled_5m"."cpu_metrics"
FROM "monitoring"."autogen"."cpu_metrics"
GROUP BY time(5m), "host"

リテンションポリシー

from influxdb_client import InfluxDBClient

client = InfluxDBClient(url="http://localhost:8086", token="my-token")
buckets_api = client.buckets_api()

raw_bucket = buckets_api.find_bucket_by_name("monitoring")
raw_bucket.retention_rules[0].every_seconds = 30 * 24 * 3600  # 30日

downsampled = buckets_api.create_bucket(
    bucket_name="monitoring-downsampled",
    retention_rules={"every_seconds": 365 * 24 * 3600},  # 1年
    org="myorg"
)

Grafana統合

# grafana/provisioning/datasources/influxdb.yaml
apiVersion: 1
datasources:
  - name: InfluxDB 3.0
    type: influxdb
    url: http://influxdb3:8181
    access: proxy
    jsonData:
      version: Flux
      organization: myorg
      defaultBucket: monitoring
    secureJsonData:
      token: ${INFLUX_TOKEN}

パターン2:TimescaleDB 金融分析

シナリオ

定量取引プラットフォーム、毎秒5000件の株式Tickデータを受信、リアルタイムVWAP計算、移動平均、ボリンジャーバンド、複雑なJOINクエリ(取引オーダー表とユーザー表の結合)が必要。

インストールと設定

# Dockerデプロイ
docker run -d --name timescaledb \
  -p 5432:5432 \
  -e POSTGRES_PASSWORD=postgres \
  -v tsdb-data:/var/lib/postgresql/data \
  timescale/timescaledb:latest-pg16

# 拡張機能の有効化
psql -h localhost -U postgres -c "CREATE EXTENSION IF NOT EXISTS timescaledb;"

Hypertableとスキーマの作成

CREATE TABLE stock_ticks (
    time        TIMESTAMPTZ NOT NULL,
    symbol      VARCHAR(10) NOT NULL,
    price       NUMERIC(12, 4) NOT NULL,
    volume      BIGINT NOT NULL,
    bid         NUMERIC(12, 4),
    ask         NUMERIC(12, 4),
    exchange    VARCHAR(10)
);

SELECT create_hypertable('stock_ticks', 'time',
    chunk_time_interval => INTERVAL '1 day',
    partitioning_column => 'symbol',
    number_partitions => 4
);

CREATE INDEX idx_symbol_time ON stock_ticks (symbol, time DESC);

CREATE TABLE trading_orders (
    id          BIGSERIAL,
    time        TIMESTAMPTZ NOT NULL,
    symbol      VARCHAR(10) NOT NULL,
    side        VARCHAR(4) NOT NULL,
    price       NUMERIC(12, 4) NOT NULL,
    quantity    BIGINT NOT NULL,
    user_id     BIGINT NOT NULL,
    status      VARCHAR(10) DEFAULT 'pending'
);

SELECT create_hypertable('trading_orders', 'time',
    chunk_time_interval => INTERVAL '1 day'
);

継続集約(Continuous Aggregates)

-- 1分足OHLC
CREATE MATERIALIZED VIEW ohlc_1min
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', time) AS bucket,
    symbol,
    first(price, time) AS open,
    last(price, time) AS close,
    max(price) AS high,
    min(price) AS low,
    sum(volume) AS volume
FROM stock_ticks
GROUP BY bucket, symbol
WITH NO DATA;

-- 自動リフレッシュポリシー
SELECT add_continuous_aggregate_policy('ohlc_1min',
    start_offset => INTERVAL '3 hours',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute'
);

-- 5分足(1分集約に基づく)
CREATE MATERIALIZED VIEW ohlc_5min
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('5 minutes', bucket) AS bucket,
    symbol,
    first(open, bucket) AS open,
    last(close, bucket) AS close,
    max(high) AS high,
    min(low) AS low,
    sum(volume) AS volume
FROM ohlc_1min
GROUP BY bucket, symbol
WITH NO DATA;

SELECT add_continuous_aggregate_policy('ohlc_5min',
    start_offset => INTERVAL '12 hours',
    end_offset => INTERVAL '5 minutes',
    schedule_interval => INTERVAL '5 minutes'
);

ハイパーファンクションによるテクニカル指標計算

-- VWAP(出来高加重平均価格)
SELECT
    time_bucket('5 minutes', time) AS bucket,
    symbol,
    sum(price * volume) / sum(volume) AS vwap
FROM stock_ticks
WHERE time >= now() - INTERVAL '1 hour'
GROUP BY bucket, symbol;

-- 移動平均(ギャップフィル付き)
SELECT
    time_bucket_gapfill('1 minute', time, now() - INTERVAL '1 hour', now()) AS bucket,
    symbol,
    locf(last(price, time)) AS last_price,
    avg(last(price, time)) OVER (
        PARTITION BY symbol
        ORDER BY bucket
        ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
    ) AS ma_20
FROM stock_ticks
WHERE time >= now() - INTERVAL '2 hours'
GROUP BY bucket, symbol;

-- ボリンジャーバンド
WITH ma_20 AS (
    SELECT
        time_bucket('5 minutes', time) AS bucket,
        symbol,
        avg(price) AS ma,
        stddev(price) AS std
    FROM stock_ticks
    WHERE time >= now() - INTERVAL '2 hours'
    GROUP BY bucket, symbol
    HAVING count(*) >= 10
)
SELECT
    bucket, symbol, ma,
    ma + 2 * std AS upper_band,
    ma - 2 * std AS lower_band
FROM ma_20
ORDER BY bucket DESC, symbol;

Python書き込みとクエリ

import asyncpg
import asyncio
from datetime import datetime, timezone

async def write_tick(pool, symbol, price, volume, bid, ask, exchange):
    async with pool.acquire() as conn:
        await conn.execute(
            """INSERT INTO stock_ticks (time, symbol, price, volume, bid, ask, exchange)
               VALUES ($1, $2, $3, $4, $5, $6, $7)""",
            datetime.now(timezone.utc), symbol, price, volume, bid, ask, exchange
        )

async def batch_write_ticks(pool, ticks):
    async with pool.acquire() as conn:
        await conn.executemany(
            """INSERT INTO stock_ticks (time, symbol, price, volume, bid, ask, exchange)
               VALUES ($1, $2, $3, $4, $5, $6, $7)""",
            [(t['time'], t['symbol'], t['price'], t['volume'],
              t['bid'], t['ask'], t['exchange']) for t in ticks]
        )

async def query_ohlc(pool, symbol, hours=1):
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT bucket, symbol, open, close, high, low, volume
               FROM ohlc_1min
               WHERE symbol = $1 AND bucket >= now() - INTERVAL '$2 hours'
               ORDER BY bucket""",
            symbol, hours
        )
        return [dict(row) for row in rows]

async def main():
    pool = await asyncpg.create_pool(
        "postgresql://postgres:postgres@localhost:5432/postgres",
        min_size=5, max_size=20
    )
    ticks = [
        {"time": datetime.now(timezone.utc), "symbol": "AAPL",
         "price": 198.50, "volume": 1000, "bid": 198.48, "ask": 198.52, "exchange": "NASDAQ"},
        {"time": datetime.now(timezone.utc), "symbol": "GOOGL",
         "price": 175.30, "volume": 800, "bid": 175.28, "ask": 175.32, "exchange": "NASDAQ"},
    ]
    await batch_write_ticks(pool, ticks)
    result = await query_ohlc(pool, "AAPL")
    print(f"Got {len(result)} candles")
    await pool.close()

asyncio.run(main())

パターン3:TDengine IoTエッジコンピューティング

シナリオ

産業用IoTゲートウェイ、1000センサーが毎秒データを報告、エッジノードリソース制限(4コア8GB)、超低レイテンシ書き込み、スーパーテーブルによる同種デバイス管理、クラスタデプロイによる高可用性が必要。

インストールと設定

# Dockerデプロイ
docker run -d --name tdengine \
  -p 6030:6030 \
  -p 6041:6041 \
  -v tdengine-data:/var/lib/taos \
  tdengine/tdengine:3.3.4.0

# taos CLIを使用
taos

スーパーテーブルとサブテーブルの作成

-- データベース作成
CREATE DATABASE iot_edge PRECISION 'ms' KEEP 3650 BUFFER 96 WAL_LEVEL 1;

USE iot_edge;

-- スーパーテーブル(テンプレート)作成
CREATE STABLE sensor_data (
    ts          TIMESTAMP,
    temperature FLOAT,
    humidity    FLOAT,
    pressure    FLOAT,
    voltage     FLOAT
) TAGS (
    device_id   BINARY(32),
    device_type BINARY(16),
    location    BINARY(64),
    factory     BINARY(32)
);

-- 挿入時にTAGSを指定してサブテーブルを自動作成
INSERT INTO d_sensor_001 USING sensor_data TAGS ('sensor-001', 'temperature', 'workshop-A-line-1', 'factory-east')
VALUES (NOW, 23.5, 65.2, 1013.25, 3.3);

INSERT INTO d_sensor_002 USING sensor_data TAGS ('sensor-002', 'pressure', 'workshop-A-line-2', 'factory-east')
VALUES (NOW, 25.1, 60.8, 1012.80, 3.28);

-- バッチ書き込み(高性能)
INSERT INTO d_sensor_001 VALUES (NOW + 1s, 23.6, 65.1, 1013.20, 3.31)
             d_sensor_002 VALUES (NOW + 1s, 25.0, 60.9, 1012.75, 3.29)
             d_sensor_001 VALUES (NOW + 2s, 23.7, 65.0, 1013.15, 3.30);

クエリと分析

-- デバイスタイプ別集約:過去1時間の平均温度
SELECT device_type, avg(temperature) AS avg_temp, max(temperature) AS max_temp
FROM sensor_data
WHERE ts >= NOW - 1h
GROUP BY device_type;

-- 工場と場所別集約
SELECT factory, location, count(*) AS sample_count,
       avg(temperature) AS avg_temp,
       stddev(temperature) AS temp_std
FROM sensor_data
WHERE ts >= NOW - 6h AND device_type = 'temperature'
GROUP BY factory, location;

-- 異常検知:温度が3σを超過
SELECT ts, device_id, temperature
FROM sensor_data
WHERE ts >= NOW - 1h
  AND temperature > (
      SELECT avg(temperature) + 3 * stddev(temperature)
      FROM sensor_data
      WHERE ts >= NOW - 24h AND device_type = 'temperature'
  );

-- ダウンサンプリング:5分集約
SELECT _wstart AS bucket, device_id,
       avg(temperature) AS avg_temp,
       min(temperature) AS min_temp,
       max(temperature) AS max_temp
FROM sensor_data
WHERE ts >= NOW - 1h
INTERVAL(5m) SLIDING(5m) PARTITION BY device_id;

Python SDK書き込み

from taosrest import RestClient
from datetime import datetime, timezone
import random

client = RestClient("http://localhost:6041", user="root", password="taosdata")

def write_sensor_batch(records):
    sql_lines = []
    for r in records:
        line = f"d_{r['device_id']} USING sensor_data TAGS ('{r['device_id']}', '{r['device_type']}', '{r['location']}', '{r['factory']}') VALUES ('{r['ts']}', {r['temperature']}, {r['humidity']}, {r['pressure']}, {r['voltage']})"
        sql_lines.append(line)
    sql = "INSERT INTO " + " ".join(sql_lines)
    client.sql(sql, database="iot_edge")

def query_latest(device_id, minutes=10):
    result = client.sql(
        f"SELECT ts, temperature, humidity, pressure FROM sensor_data "
        f"WHERE device_id = '{device_id}' AND ts >= NOW - {minutes}m "
        f"ORDER BY ts DESC LIMIT 100",
        database="iot_edge"
    )
    return result

records = []
for i in range(1000):
    records.append({
        "device_id": f"sensor-{i % 100:03d}",
        "device_type": random.choice(["temperature", "pressure", "humidity"]),
        "location": f"workshop-A-line-{i % 10}",
        "factory": "factory-east",
        "ts": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f"),
        "temperature": round(random.uniform(20, 30), 2),
        "humidity": round(random.uniform(50, 70), 2),
        "pressure": round(random.uniform(1010, 1015), 2),
        "voltage": round(random.uniform(3.2, 3.4), 2),
    })
write_sensor_batch(records)

クラスタデプロイ

# taos.cfg - 最初のdnode
firstEp         tdnode1:6030
secondEp        tdnode2:6030
serverPort      6030
dataDir         /var/lib/taos
logDir          /var/log/taos

# dnode作成
taos> CREATE DNODE "tdnode2:6030";
taos> CREATE DNODE "tdnode3:6030";

-- mnode作成
taos> ALTER DNODE 2 mnodeRole 'mnode';
taos> ALTER DNODE 3 mnodeRole 'mnode';

-- データベースレプリカ数
ALTER DATABASE iot_edge REPLICA 3;

パターン4:QuestDB 高頻度書き込み

シナリオ

暗号通貨取引所マーケットデータ、毎秒50万Tick、SQLインターフェースでの直接クエリ、SIMD最適化集約、ILP(InfluxDB Line Protocol)高速書き込みが必要。

インストールと設定

# Dockerデプロイ
docker run -d --name questdb \
  -p 9000:9000 \
  -p 9009:9009 \
  -p 8812:8812 \
  -v questdb-data:/root/.questdb \
  questdb/questdb:8.2.1

# Webコンソール: http://localhost:9000
# ILPポート: 9009
# PostgreSQL互換ポート: 8812

テーブル作成と書き込み

-- SQLでテーブル作成
CREATE TABLE crypto_ticks (
    ts          TIMESTAMP,
    symbol      SYMBOL,
    price       DOUBLE,
    volume      DOUBLE,
    side        SYMBOL,
    exchange    SYMBOL
) TIMESTAMP(ts) PARTITION BY DAY WAL;

ILP高速書き込み

import socket
import time
import random

class QuestDBILPWriter:
    def __init__(self, host="localhost", port=9009):
        self.host = host
        self.port = port
        self.sock = None

    def connect(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((self.host, self.port))

    def write_line(self, line):
        self.sock.sendall((line + "\n").encode())

    def close(self):
        if self.sock:
            self.sock.close()

    def write_tick(self, symbol, price, volume, side, exchange, ts_ns):
        line = f"crypto_ticks,symbol={symbol},side={side},exchange={exchange} price={price},volume={volume} {ts_ns}"
        self.write_line(line)

writer = QuestDBILPWriter()
writer.connect()

symbols = ["BTC-USD", "ETH-USD", "SOL-USD", "BNB-USD", "XRP-USD"]
base_time = int(time.time() * 1_000_000_000)

batch_size = 10000
total_written = 0
start = time.time()

for i in range(100000):
    symbol = random.choice(symbols)
    price = round(random.uniform(20000, 70000) if symbol == "BTC-USD" else random.uniform(1000, 4000), 2)
    volume = round(random.uniform(0.01, 5.0), 6)
    side = random.choice(["buy", "sell"])
    exchange = random.choice(["binance", "coinbase", "kraken"])
    ts_ns = base_time + i * 1_000_000

    writer.write_tick(symbol, price, volume, side, exchange, ts_ns)
    total_written += 1

    if total_written % batch_size == 0:
        elapsed = time.time() - start
        rate = total_written / elapsed
        print(f"Written {total_written} ticks, rate: {rate:.0f} ticks/s")

writer.close()
print(f"Total: {total_written} ticks in {time.time()-start:.2f}s")

SQLクエリ(SIMD最適化)

-- 過去1分間の取引ペア別VWAP
SELECT
    symbol,
    sum(price * volume) / sum(volume) AS vwap,
    sum(volume) AS total_volume,
    count() AS tick_count
FROM crypto_ticks
WHERE ts >= dateadd('m', -1, now())
GROUP BY symbol;

-- 過去5分間のOHLC
SELECT
    symbol,
    first(price) AS open,
    last(price) AS close,
    max(price) AS high,
    min(price) AS low,
    sum(volume) AS volume
FROM crypto_ticks
WHERE ts >= dateadd('m', -5, now())
SAMPLE BY 1m ALIGN TO CALENDAR;

-- 売買量比率
SELECT
    symbol,
    sum(volume) FILTER (WHERE side = 'buy') AS buy_volume,
    sum(volume) FILTER (WHERE side = 'sell') AS sell_volume,
    sum(volume) FILTER (WHERE side = 'buy') / sum(volume) FILTER (WHERE side = 'sell') AS buy_sell_ratio
FROM crypto_ticks
WHERE ts >= dateadd('m', -5, now())
GROUP BY symbol;

-- 価格ボラティリティ(5分標準偏差)
SELECT
    symbol,
    stddev(price) AS price_std,
    avg(price) AS avg_price,
    stddev(price) / avg(price * 1.0) AS cv
FROM crypto_ticks
WHERE ts >= dateadd('m', -5, now())
GROUP BY symbol
ORDER BY cv DESC;

PostgreSQL互換クエリ

import psycopg2

conn = psycopg2.connect(
    host="localhost",
    port=8812,
    user="admin",
    password="quest",
    database="qdb"
)

cursor = conn.cursor()

cursor.execute("""
    SELECT symbol, sum(price * volume) / sum(volume) AS vwap
    FROM crypto_ticks
    WHERE ts >= dateadd('m', -5, now())
    GROUP BY symbol
""")

for row in cursor.fetchall():
    print(f"{row[0]}: VWAP = {row[1]:.2f}")

cursor.close()
conn.close()

パターン5:マルチデータベースアーキテクチャ

シナリオ

大規模プラットフォームでIoTデータ、モニタリングデータ、金融データが同時に存在。単一TSDBでは全要件を満たせない。データ特性に基づいて最適データベースを自動選択するルーティングレイヤーと、クロスデータベースフェデレーションクエリのサポートが必要。

ルーティングレイヤー設計

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional
import time

class DataType(Enum):
    IOT_SENSOR = "iot_sensor"
    MONITORING = "monitoring"
    FINANCIAL = "financial"
    HIGH_FREQUENCY = "high_frequency"

@dataclass
class TimeSeriesPoint:
    measurement: str
    tags: dict[str, str]
    fields: dict[str, Any]
    timestamp: int
    data_type: DataType

class TSDBAdapter(ABC):
    @abstractmethod
    def write(self, points: list[TimeSeriesPoint]) -> int:
        pass

    @abstractmethod
    def query(self, sql: str, params: Optional[dict] = None) -> list[dict]:
        pass

    @abstractmethod
    def health_check(self) -> bool:
        pass

class InfluxDBAdapter(TSDBAdapter):
    def __init__(self, host: str, token: str, org: str, bucket: str):
        from influxdb_client_3 import InfluxDBClient3
        self.client = InfluxDBClient3(host=host, database=bucket, token=token)
        self.org = org
        self.bucket = bucket

    def write(self, points: list[TimeSeriesPoint]) -> int:
        from influxdb_client_3 import Point
        influx_points = []
        for p in points:
            pt = Point(p.measurement)
            for k, v in p.tags.items():
                pt.tag(k, v)
            for k, v in p.fields.items():
                pt.field(k, v)
            pt.time(time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(p.timestamp / 1000)))
            influx_points.append(pt)
        self.client.write(record=influx_points)
        return len(influx_points)

    def query(self, sql: str, params: Optional[dict] = None) -> list[dict]:
        result = self.client.query(sql)
        return [row for row in result]

    def health_check(self) -> bool:
        try:
            self.client.query("SELECT 1")
            return True
        except Exception:
            return False

class TimescaleDBAdapter(TSDBAdapter):
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool = None

    async def init_pool(self):
        import asyncpg
        self.pool = await asyncpg.create_pool(self.dsn, min_size=2, max_size=10)

    def write(self, points: list[TimeSeriesPoint]) -> int:
        import asyncio
        return asyncio.get_event_loop().run_until_complete(self._async_write(points))

    async def _async_write(self, points: list[TimeSeriesPoint]) -> int:
        async with self.pool.acquire() as conn:
            rows = []
            for p in points:
                rows.append((
                    p.timestamp, p.measurement,
                    p.fields.get("price", 0), p.fields.get("volume", 0)
                ))
            await conn.executemany(
                "INSERT INTO ts_data (time, symbol, price, volume) VALUES ($1, $2, $3, $4)",
                rows
            )
            return len(rows)

    def query(self, sql: str, params: Optional[dict] = None) -> list[dict]:
        import asyncio
        return asyncio.get_event_loop().run_until_complete(self._async_query(sql))

    async def _async_query(self, sql: str) -> list[dict]:
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(sql)
            return [dict(row) for row in rows]

    def health_check(self) -> bool:
        try:
            return asyncio.get_event_loop().run_until_complete(self._async_health())
        except Exception:
            return False

    async def _async_health(self) -> bool:
        async with self.pool.acquire() as conn:
            await conn.fetchval("SELECT 1")
            return True

class TSDBRouter:
    ROUTING_TABLE = {
        DataType.IOT_SENSOR: "tdengine",
        DataType.MONITORING: "influxdb",
        DataType.FINANCIAL: "timescaledb",
        DataType.HIGH_FREQUENCY: "questdb",
    }

    def __init__(self):
        self.adapters: dict[str, TSDBAdapter] = {}

    def register(self, name: str, adapter: TSDBAdapter):
        self.adapters[name] = adapter

    def write(self, points: list[TimeSeriesPoint]) -> dict[str, int]:
        grouped: dict[str, list[TimeSeriesPoint]] = {}
        for p in points:
            target = self.ROUTING_TABLE.get(p.data_type, "influxdb")
            grouped.setdefault(target, []).append(p)

        results = {}
        for target, target_points in grouped.items():
            adapter = self.adapters.get(target)
            if adapter and adapter.health_check():
                results[target] = adapter.write(target_points)
            else:
                fallback = "influxdb" if target != "influxdb" else "timescaledb"
                fallback_adapter = self.adapters.get(fallback)
                if fallback_adapter:
                    results[f"{target}->fallback->{fallback}"] = fallback_adapter.write(target_points)
        return results

    def query(self, data_type: DataType, sql: str) -> list[dict]:
        target = self.ROUTING_TABLE.get(data_type, "influxdb")
        adapter = self.adapters.get(target)
        if adapter and adapter.health_check():
            return adapter.query(sql)
        raise RuntimeError(f"No available adapter for {data_type}")

フェデレーションクエリ

class FederatedQuery:
    def __init__(self, router: TSDBRouter):
        self.router = router

    def cross_db_correlation(self, device_id: str, symbol: str, hours: int = 1):
        iot_data = self.router.query(
            DataType.IOT_SENSOR,
            f"SELECT ts, avg(temperature) AS avg_temp FROM sensor_data "
            f"WHERE device_id = '{device_id}' AND ts >= NOW - {hours}h "
            f"INTERVAL(5m)"
        )

        finance_data = self.router.query(
            DataType.FINANCIAL,
            f"SELECT bucket, symbol, avg(close) AS avg_price "
            f"FROM ohlc_5min WHERE symbol = '{symbol}' "
            f"AND bucket >= now() - INTERVAL '{hours} hours' "
            f"GROUP BY bucket, symbol"
        )

        return {
            "iot": iot_data,
            "finance": finance_data,
            "correlation_note": "Cross-database correlation requires application-level join"
        }

データ移行ツール

class TSDBMigrator:
    def __init__(self, source: TSDBAdapter, target: TSDBAdapter, batch_size: int = 10000):
        self.source = source
        self.target = target
        self.batch_size = batch_size

    def migrate(self, query: str, transform_fn=None) -> int:
        total = 0
        offset = 0
        while True:
            paginated_query = f"{query} LIMIT {self.batch_size} OFFSET {offset}"
            rows = self.source.query(paginated_query)
            if not rows:
                break

            if transform_fn:
                rows = [transform_fn(row) for row in rows]

            self.target.write(rows)
            total += len(rows)
            offset += self.batch_size
            print(f"Migrated {total} rows...")

        print(f"Migration complete: {total} rows")
        return total

落とし穴ガイド

落とし穴1:InfluxDB Fluxクエリパフォーマンス

-- ❌ Fluxネストパイプライン、各ステップでフルスキャン
from(bucket: "monitoring")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> filter(fn: (r) => r.host =~ /node-.*/ )
  |> aggregateWindow(every: 5m, fn: mean)
  |> yield()

-- ✅ InfluxDB 3.0 SQLクエリ、Arrowベクトル化活用
SELECT time_bucket('5 minutes', time) AS bucket,
       host, avg(cpu_usage_percent) AS avg_cpu
FROM cpu_metrics
WHERE time >= now() - INTERVAL '30 days'
  AND host LIKE 'node-%'
GROUP BY bucket, host

落とし穴2:TimescaleDB Hypertableパーティションキーの誤選択

-- ❌ 月次パーティション、1時間クエリが多すぎるchunkをスキャン
SELECT create_hypertable('ticks', 'time',
    chunk_time_interval => INTERVAL '1 month'
);

-- ✅ 日次パーティション、1時間クエリは1つのchunkのみスキャン
SELECT create_hypertable('ticks', 'time',
    chunk_time_interval => INTERVAL '1 day'
);

落とし穴3:TDengineサブテーブル名の衝突

-- ❌ サブテーブル名はグローバルに一意、スーパーテーブル間で重複不可
CREATE STABLE temp_sensor (...) TAGS (device_id BINARY(32));
INSERT INTO d_001 USING temp_sensor TAGS ('dev-001', ...);

CREATE STABLE pressure_sensor (...) TAGS (device_id BINARY(32));
INSERT INTO d_001 USING pressure_sensor TAGS ('dev-001', ...);
-- ERROR: Table already exists

-- ✅ サブテーブル名にスーパーテーブルプレフィックスを追加
INSERT INTO d_temp_001 USING temp_sensor TAGS ('dev-001', ...);
INSERT INTO d_pres_001 USING pressure_sensor TAGS ('dev-001', ...);

落とし穴4:QuestDB SYMBOL型の濫用

-- ❌ 高カーディナリティフィールドにSYMBOLを使用するとメモリ爆発
CREATE TABLE logs (ts TIMESTAMP, user_id SYMBOL, message STRING) TIMESTAMP(ts);

-- ✅ 高カーディナリティはSTRING、低カーディナリティはSYMBOL
CREATE TABLE logs (ts TIMESTAMP, level SYMBOL, user_id STRING, message STRING) TIMESTAMP(ts);

落とし穴5:圧縮ポリシーの無視

-- TimescaleDB:ネイティブ圧縮の有効化
ALTER TABLE stock_ticks SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'symbol',
    timescaledb.compress_orderby = 'time DESC'
);

SELECT add_compression_policy('stock_ticks', INTERVAL '7 days');

-- InfluxDB:適切なリテンションポリシーの設定
influx bucket update --name monitoring --retention 30d

-- TDengine:データベース作成時にKEEPを指定
CREATE DATABASE iot KEEP 3650;

エラートラブルシューティング

# エラーメッセージ 原因 解決方法
1 InfluxDB engine: cache max memory size exceeded 書き込み速度が速すぎてWALキャッシュオーバーフロー cache-max-memory-sizeを増やすか書き込み頻度を下げる
2 TimescaleDB invalid value for chunk_time_interval 無効なinterval形式 数字ではなくINTERVAL '1 day'を使用
3 TimescaleDB cannot drop chunk because it is compressed 圧縮chunkは削除前に解凍が必要 SELECT decompress_chunk('chunk_name')後に削除
4 TDengine Invalid table name 不正な文字または名前が長すぎる 英数字+アンダースコアのみ、最大192バイト
5 TDengine Group by error GROUP BYフィールドがSELECTにない TDengineはGROUP BYフィールドのSELECT内出現を要求
6 QuestDB commit lag ILP書き込みがタイムリーにコミットされていない commit.lag設定を調整または明示的flush
7 QuestDB symbol count exceeded SYMBOLカーディナリティが制限を超過 max.symbol.countを増やすかSTRINGに変更
8 InfluxDB database not found バケットが存在しないか名前が間違っている influx bucket listで正しい名前を確認
9 TimescaleDB hypertable already exists 重複したhypertable作成 先にDROP TABLEするかremove_distributed_hypertableを使用
10 TDengine Out of memory エッジノードのメモリ不足 BUFFERパラメータを減らす、VGROUPS数を下げる

高度な最適化

1. InfluxDBバッチ書き込み

from influxdb_client_3 import InfluxDBClient3, Point
from datetime import datetime, timezone
import random

client = InfluxDBClient3(host="localhost:8181", database="monitoring", token="my-token")

batch = []
for i in range(50000):
    point = Point("cpu_metrics") \
        .tag("host", f"node-{random.randint(1, 500)}") \
        .field("usage_percent", random.uniform(10, 95)) \
        .time(datetime.now(timezone.utc))
    batch.append(point)

    if len(batch) >= 5000:
        client.write(record=batch, write_precision="ms")
        batch = []

if batch:
    client.write(record=batch, write_precision="ms")

2. TimescaleDB圧縮と容量削減

-- 圧縮効果の確認
SELECT
    hypertable_name,
    total_chunks,
    number_compressed_chunks,
    before_compression_bytes / 1024 / 1024 AS before_mb,
    after_compression_bytes / 1024 / 1024 AS after_mb,
    ROUND((1 - after_compression_bytes::FLOAT / before_compression_bytes) * 100, 1) AS compression_ratio_pct
FROM timescaledb_information.compressed_hypertable_stats;

3. TDengineパフォーマンスチューニングパラメータ

-- データベース作成時のパラメータ最適化
CREATE DATABASE iot_edge
    PRECISION 'ms'
    KEEP 3650
    BUFFER 256
    WAL_LEVEL 1
    VGROUPS 4
    CACHEMODEL 'both';

-- BUFFER: 書き込みキャッシュサイズ(MB)、大きいほど高速
-- WAL_LEVEL: 0=WALなし、1=WALありfsyncなし、2=WALありfsyncあり
-- VGROUPS: 仮想ノードグループ数、推奨=CPUコア数
-- CACHEMODEL: 'both'は最新データとメタデータをキャッシュ

4. QuestDB書き込みパフォーマンスチューニング

# questdb.conf
cairo.commit.lag=10000
cairo.commit.mode=async
line.tcp.max.uncommitted.rows=10000
line.tcp.worker.count=2

比較分析

次元 InfluxDB 3.0 TimescaleDB TDengine QuestDB
書き込みスループット 50万ポイント/秒 10万ポイント/秒 100万ポイント/秒 150万ポイント/秒
クエリレイテンシ(単純集約) <10ms <20ms <5ms <5ms
SQL互換性 InfluxQL + 基本SQL 完全PostgreSQL 部分SQL ほぼSQL
JOINサポート 限定的 完全 限定的 限定的
ダウンサンプリング Flux Task / SQL Continuous Aggregates INTERVALクエリ SAMPLE BY
クラスタリング 3.0ネイティブ PG依存 ネイティブ エンタープライズ版
エコシステム Grafana/Prometheus最強 PG完全エコシステム IoTエコシステム 金融/IoT
学習曲線 低(SQL) 低(SQL)
エッジデプロイ 普通 不適 優秀 普通
運用複雑度

まとめ:TSDB選定に銀の弾丸はありません—シナリオ次第です。モニタリング→InfluxDB(最強エコシステム)、金融→TimescaleDB(完全SQL)、IoT→TDengine(最速書き込み+エッジフレンドリー)、高頻度→QuestDB(SIMD最適化)。大規模プラットフォームでは、データ特性に基づいて自動ルーティングするマルチデータベースアーキテクチャが、1つのデータベースに全シナリオを押し付けることを回避します。


オンラインツールおすすめ

ブラウザローカルツールを無料で試す →

#时序数据库#InfluxDB#TimescaleDB#TDengine#QuestDB#2026#数据库