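Time-Series Database Comparison in Practice: 5 Production Patterns from IoT to Financial Analytics
Is time-series data devouring your relational database?
Monitoring metrics arrive at 100,000 writes per second, IoT sensors report millions of data points per minute, and financial tick data adds billions of rows a day. If you are still getting by on MySQL partitioned by time, queries are already timing out and storage costs are ballooning. By 2026, the **time-series database (TSDB)** has become the standard answer for time-series data, but how do you choose among InfluxDB, TimescaleDB, TDengine, and QuestDB?
This article walks through 5 real production scenarios and gives the recommended database and complete code for each, so you can avoid the common detours.
Core architecture comparison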
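| Feature | InfluxDB 3.0 | TimescaleDB | TDengine | QuestDB |
|---|---|---|---|---|
| Storage engine | Apache Arrow + Parquet | PostgreSQL extension | Custom storage engine | Custom columnar engine |
| Query language | InfluxQL / SQL | SQL | SQL | SQL |
| Write protocol | Line Protocol | SQL INSERT | SQL INSERT / Schemaless | ILP / SQL INSERT |
| Compression ratio | 10:1 to 20:1 | 5:1 to 10:1 | 10:1 to 20:1 | 10:1 to 15:1 |
| Clustering | Native in 3.0 | Relies on PG clustering | Native | Enterprise edition |
| License | Apache 2.0 (community edition) | Apache 2.0 (community edition) | AGPL-3.0 | Apache 2.0 |
| Best fit | Monitoring / observability | Finance / complex analytics | IoT / edge computing | High-frequency writes / real-time analytics |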
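Why not just use one?
- InfluxDB: the strongest ecosystem for monitoring, but weak at complex SQL analytics
- TimescaleDB: full SQL ecosystem, but lower write throughput than purpose-built TSDBs
- TDengine: extremely fast writes for IoT, but limited SQL compatibility
- QuestDB: excellent at high-frequency writes, but its ecosystem and tooling are less mature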
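Pattern 1: InfluxDB 3.0 for monitoring and observability
Scenario
A Kubernetes cluster with 500 nodes and 100,000 Pods; Prometheus scrapes metrics every 15 seconds and Grafana displays them in real time. The setup needs downsampling, a data retention policy, and both SQL and InfluxQL queries.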
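To size this scenario, it helps to turn the scrape interval into a sustained write rate. The sketch below is a back-of-the-envelope estimate only; the per-node and per-Pod series counts are assumptions chosen for illustration, not figures from the scenario.

```python
def samples_per_second(targets: int, series_per_target: int, scrape_interval_s: float) -> float:
    """Sustained sample rate produced by scraping every target once per interval."""
    return targets * series_per_target / scrape_interval_s


# Hypothetical series counts: 1,000 per node and 30 per Pod (assumptions)
node_rate = samples_per_second(500, 1000, 15)
pod_rate = samples_per_second(100_000, 30, 15)
total_rate = node_rate + pod_rate

print(f"nodes: {node_rate:,.0f}/s  pods: {pod_rate:,.0f}/s  total: {total_rate:,.0f}/s")
```

Plugging in measured series counts from your own cluster gives the write rate the database has to sustain, which is the number to compare against any benchmark.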
Installation and configuration
# Deploy InfluxDB 3.0 with Docker
docker run -d --name influxdb3 \
-p 8086:8086 \
-p 8181:8181 \
-v influxdb3-data:/var/lib/influxdb3 \
influxdb:3.0-core
# Create the bucket (database)
influx bucket create \
--name monitoring \
--retention 30d \
--org myorg \
--token ${INFLUX_TOKEN}
Writing monitoring data
from influxdb_client_3 import InfluxDBClient3, Point
from datetime import datetime, timezone
import random
import time

client = InfluxDBClient3(
    host="localhost:8181",
    database="monitoring",
    token="my-token"
)

def write_metrics():
    # Build one batch of 10,000 simulated CPU points and write it in a single call
    points = []
    for i in range(10000):
        point = Point("cpu_metrics") \
            .tag("host", f"node-{random.randint(1, 500)}") \
            .tag("region", random.choice(["us-east", "eu-west", "ap-south"])) \
            .field("usage_percent", random.uniform(10, 95)) \
            .field("load_1m", random.uniform(0.5, 8.0)) \
            .field("memory_percent", random.uniform(20, 90)) \
            .time(datetime.now(timezone.utc))
        points.append(point)
    client.write(record=points, write_precision="ms")
    print(f"Written {len(points)} points")

# Write one batch per second, 100 times
for _ in range(100):
    write_metrics()
    time.sleep(1)
Downsampling and continuous queries
-- InfluxDB 3.0 SQL downsampling: 5-minute average CPU usage
SELECT
date_bin(INTERVAL '5 minutes', time) AS bucket,
host,
avg(usage_percent) AS avg_cpu,
max(usage_percent) AS max_cpu,
min(usage_percent) AS min_cpu
FROM cpu_metrics
WHERE time >= now() - INTERVAL '1 hour'
GROUP BY bucket, host
ORDER BY bucket DESC;
-- InfluxQL downsampling (compatibility mode)
SELECT
MEAN("usage_percent") AS "avg_cpu",
MAX("usage_percent") AS "max_cpu"
INTO "monitoring"."downsampled_5m"."cpu_metrics"
FROM "monitoring"."autogen"."cpu_metrics"
GROUP BY time(5m), "host"
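To make explicit what a 5-minute downsampling query computes, here is a minimal pure-Python sketch of the same bucketing logic. The function name `downsample` and the sample rows are made up for illustration; a real deployment would let the database do this work.

```python
from collections import defaultdict
from datetime import datetime, timezone


def downsample(rows, bucket_seconds=300):
    """Group (timestamp, host, value) rows into fixed-width buckets per host.

    Returns {(bucket_start_epoch, host): {"avg": ..., "max": ..., "min": ...}}.
    """
    groups = defaultdict(list)
    for ts, host, value in rows:
        epoch = int(ts.timestamp())
        bucket_start = epoch - epoch % bucket_seconds  # floor to the bucket boundary
        groups[(bucket_start, host)].append(value)
    return {
        key: {"avg": sum(vals) / len(vals), "max": max(vals), "min": min(vals)}
        for key, vals in groups.items()
    }


t0 = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
rows = [
    (t0.replace(second=10), "node-1", 40.0),  # falls in the 12:00 bucket
    (t0.replace(minute=4), "node-1", 60.0),   # falls in the 12:00 bucket
    (t0.replace(minute=5), "node-1", 90.0),   # falls in the 12:05 bucket
]
result = downsample(rows)
for (bucket, host), stats in sorted(result.items()):
    print(bucket, host, stats)
```

A small reference like this is handy for checking that a downsampled table matches the raw data on a sample window.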
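Data retention policy
from influxdb_client import InfluxDBClient, BucketRetentionRules

# Note: this uses the InfluxDB 2.x-style client and bucket API on port 8086
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="myorg")
buckets_api = client.buckets_api()

# Raw data: keep for 30 days, then persist the change
raw_bucket = buckets_api.find_bucket_by_name("monitoring")
raw_bucket.retention_rules[0].every_seconds = 30 * 24 * 3600
buckets_api.update_bucket(bucket=raw_bucket)

# Downsampled data: keep for 1 year
downsampled = buckets_api.create_bucket(
    bucket_name="monitoring-downsampled",
    retention_rules=BucketRetentionRules(type="expire", every_seconds=365 * 24 * 3600),
    org="myorg"
)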
Grafana integration
# grafana/provisioning/datasources/influxdb.yaml
apiVersion: 1
datasources:
  - name: InfluxDB 3.0
    type: influxdb
    url: http://influxdb3:8181
    access: proxy
    jsonData:
      version: SQL
      dbName: monitoring
      httpMode: POST
      insecureGrpc: true  # plain HTTP endpoint without TLS
    secureJsonData:
      token: ${INFLUX_TOKEN}
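Pattern 2: TimescaleDB for financial analytics
Scenario
A quantitative trading platform receives 5,000 stock ticks per second. It must compute VWAP (volume-weighted average price), moving averages, and Bollinger Bands in real time, and support complex JOIN queries against the trading orders and users tables.
Installation and configuration
# Deploy TimescaleDB with Docker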
docker run -d --name timescaledb \
-p 5432:5432 \
-e POSTGRES_PASSWORD=postgres \
-v tsdb-data:/var/lib/postgresql/data \
timescale/timescaledb:latest-pg16
# Enable the extension
psql -h localhost -U postgres -c "CREATE EXTENSION IF NOT EXISTS timescaledb;"
Creating the hypertables and schema
CREATE TABLE stock_ticks (
time TIMESTAMPTZ NOT NULL,
symbol VARCHAR(10) NOT NULL,
price NUMERIC(12, 4) NOT NULL,
volume BIGINT NOT NULL,
bid NUMERIC(12, 4),
ask NUMERIC(12, 4),
exchange VARCHAR(10)
);
SELECT create_hypertable('stock_ticks', 'time',
chunk_time_interval => INTERVAL '1 day',
partitioning_column => 'symbol',
number_partitions => 4
);
CREATE INDEX idx_symbol_time ON stock_ticks (symbol, time DESC);
CREATE TABLE trading_orders (
id BIGSERIAL,
time TIMESTAMPTZ NOT NULL,
symbol VARCHAR(10) NOT NULL,
side VARCHAR(4) NOT NULL,
price NUMERIC(12, 4) NOT NULL,
quantity BIGINT NOT NULL,
user_id BIGINT NOT NULL,
status VARCHAR(10) DEFAULT 'pending'
);
SELECT create_hypertable('trading_orders', 'time',
chunk_time_interval => INTERVAL '1 day'
);
Continuous aggregates
-- 1-minute candlesticks (OHLC)
CREATE MATERIALIZED VIEW ohlc_1min
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', time) AS bucket,
symbol,
first(price, time) AS open,
last(price, time) AS close,
max(price) AS high,
min(price) AS low,
sum(volume) AS volume
FROM stock_ticks
GROUP BY bucket, symbol
WITH NO DATA;
-- Automatic refresh policy
SELECT add_continuous_aggregate_policy('ohlc_1min',
start_offset => INTERVAL '3 hours',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute'
);
-- 5-minute candlesticks (built on the 1-minute aggregate)
CREATE MATERIALIZED VIEW ohlc_5min
WITH (timescaledb.continuous) AS
SELECT
time_bucket('5 minutes', bucket) AS bucket,
symbol,
first(open, bucket) AS open,
last(close, bucket) AS close,
max(high) AS high,
min(low) AS low,
sum(volume) AS volume
FROM ohlc_1min
GROUP BY bucket, symbol
WITH NO DATA;
SELECT add_continuous_aggregate_policy('ohlc_5min',
start_offset => INTERVAL '12 hours',
end_offset => INTERVAL '5 minutes',
schedule_interval => INTERVAL '5 minutes'
);
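The hierarchical rollup above relies on one rule: the first candle in each window supplies the open, the last supplies the close, and high, low, and volume are combined. A minimal Python sketch of that rule follows, assuming 1-minute input candles keyed by epoch seconds; `rollup_candles` and the sample data are hypothetical.

```python
def rollup_candles(candles, factor=5):
    """Merge 1-minute candles into candles `factor` minutes wide.

    Each candle is a dict with bucket (epoch seconds), open, high, low, close, volume.
    """
    width = factor * 60
    merged = {}
    for c in sorted(candles, key=lambda c: c["bucket"]):
        start = c["bucket"] - c["bucket"] % width
        if start not in merged:
            merged[start] = dict(c, bucket=start)  # earliest candle supplies the open
        else:
            m = merged[start]
            m["high"] = max(m["high"], c["high"])
            m["low"] = min(m["low"], c["low"])
            m["close"] = c["close"]                # latest candle supplies the close
            m["volume"] += c["volume"]
    return [merged[k] for k in sorted(merged)]


one_minute = [
    {"bucket": 0, "open": 10.0, "high": 11.0, "low": 9.5, "close": 10.5, "volume": 100},
    {"bucket": 60, "open": 10.5, "high": 12.0, "low": 10.0, "close": 11.5, "volume": 50},
    {"bucket": 300, "open": 11.5, "high": 11.8, "low": 11.0, "close": 11.2, "volume": 70},
]
five_minute = rollup_candles(one_minute)
print(five_minute)
```

Comparing this against `ohlc_5min` for a single symbol is a quick sanity check that the refresh policies have caught up.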
Computing technical indicators with hyperfunctions
-- VWAP (volume-weighted average price)
SELECT
time_bucket('5 minutes', time) AS bucket,
symbol,
sum(price * volume) / sum(volume) AS vwap
FROM stock_ticks
WHERE time >= now() - INTERVAL '1 hour'
GROUP BY bucket, symbol;
-- Moving average (time_bucket_gapfill fills in missing buckets first)
WITH filled AS (
SELECT
time_bucket_gapfill('1 minute', time) AS bucket,
symbol,
locf(last(price, time)) AS last_price
FROM stock_ticks
WHERE time >= now() - INTERVAL '2 hours' AND time < now()
GROUP BY bucket, symbol
)
SELECT
bucket,
symbol,
last_price,
avg(last_price) OVER (
PARTITION BY symbol
ORDER BY bucket
ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
) AS ma_20
FROM filled;
-- Bollinger Bands (mean plus or minus 2 standard deviations per 5-minute bucket)
WITH ma_20 AS (
SELECT
time_bucket('5 minutes', time) AS bucket,
symbol,
avg(price) AS ma,
stddev(price) AS std
FROM stock_ticks
WHERE time >= now() - INTERVAL '2 hours'
GROUP BY bucket, symbol
HAVING count(*) >= 10
)
SELECT
bucket,
symbol,
ma,
ma + 2 * std AS upper_band,
ma - 2 * std AS lower_band
FROM ma_20
ORDER BY bucket DESC, symbol;
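The formulas behind these queries are short enough to restate in plain Python, which is useful as a reference when validating SQL results on a small sample. This is a minimal sketch; the function names are illustrative, and `bollinger` uses the sample standard deviation, which is what PostgreSQL's `stddev()` computes.

```python
from statistics import mean, stdev


def vwap(ticks):
    """Volume-weighted average price: sum(price * volume) / sum(volume)."""
    ticks = list(ticks)
    total_volume = sum(v for _, v in ticks)
    if total_volume == 0:
        raise ValueError("total volume is zero")
    return sum(p * v for p, v in ticks) / total_volume


def bollinger(prices, k=2.0):
    """Return (middle, upper, lower) bands for a window of prices."""
    middle = mean(prices)
    spread = k * stdev(prices)  # sample standard deviation
    return middle, middle + spread, middle - spread


print(vwap([(10.0, 1), (20.0, 3)]))
print(bollinger([1.0, 2.0, 3.0, 4.0, 5.0]))
```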
Writing and querying from Python
import asyncpg
import asyncio
from datetime import datetime, timezone

async def write_tick(pool, symbol, price, volume, bid, ask, exchange):
    # Insert a single tick; prefer batch_write_ticks under sustained load
    async with pool.acquire() as conn:
        await conn.execute(
            """INSERT INTO stock_ticks (time, symbol, price, volume, bid, ask, exchange)
               VALUES ($1, $2, $3, $4, $5, $6, $7)""",
            datetime.now(timezone.utc), symbol, price, volume, bid, ask, exchange
        )

async def batch_write_ticks(pool, ticks):
    # Insert many ticks over one connection
    async with pool.acquire() as conn:
        await conn.executemany(
            """INSERT INTO stock_ticks (time, symbol, price, volume, bid, ask, exchange)
               VALUES ($1, $2, $3, $4, $5, $6, $7)""",
            [(t['time'], t['symbol'], t['price'], t['volume'],
              t['bid'], t['ask'], t['exchange']) for t in ticks]
        )

async def query_ohlc(pool, symbol, hours=1):
    # A bind parameter cannot sit inside a quoted interval literal,
    # so the interval is built with make_interval()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """SELECT bucket, symbol, open, close, high, low, volume
               FROM ohlc_1min
               WHERE symbol = $1 AND bucket >= now() - make_interval(hours => $2)
               ORDER BY bucket""",
            symbol, hours
        )
        return [dict(row) for row in rows]

async def main():
    pool = await asyncpg.create_pool(
        "postgresql://postgres:postgres@localhost:5432/postgres",
        min_size=5, max_size=20
    )
    ticks = [
        {"time": datetime.now(timezone.utc), "symbol": "AAPL",
         "price": 198.50, "volume": 1000, "bid": 198.48, "ask": 198.52, "exchange": "NASDAQ"},
        {"time": datetime.now(timezone.utc), "symbol": "GOOGL",
         "price": 175.30, "volume": 800, "bid": 175.28, "ask": 175.32, "exchange": "NASDAQ"},
    ]
    await batch_write_ticks(pool, ticks)
    result = await query_ohlc(pool, "AAPL")
    print(f"Got {len(result)} candles")
    await pool.close()

asyncio.run(main())
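Pattern 3: TDengine for IoT edge computing
Scenario
An industrial IoT gateway with 1,000 sensors reporting every second. The edge node has limited resources (4 cores, 8 GB RAM) and needs very low-latency writes, super tables to manage devices of the same type, and a clustered deployment for high availability.
Installation and configuration
# Deploy TDengine with Docker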
docker run -d --name tdengine \
-p 6030:6030 \
-p 6041:6041 \
-v tdengine-data:/var/lib/taos \
tdengine/tdengine:3.3.4.0
# Use the taos CLI
taos
Creating super tables and subtables
-- Create the database
CREATE DATABASE iot_edge PRECISION 'ms' KEEP 3650 BUFFER 96 WAL_LEVEL 1;
USE iot_edge;
-- Create the super table (template)
CREATE STABLE sensor_data (
ts TIMESTAMP,
temperature FLOAT,
humidity FLOAT,
pressure FLOAT,
voltage FLOAT
) TAGS (
device_id BINARY(32),
device_type BINARY(16),
location BINARY(64),
factory BINARY(32)
);
-- Automatic table creation: specifying TAGS on insert creates the subtable
INSERT INTO d_sensor_001 USING sensor_data TAGS ('sensor-001', 'temperature', 'workshop-A-line-1', 'factory-east')
VALUES (NOW, 23.5, 65.2, 1013.25, 3.3);
INSERT INTO d_sensor_002 USING sensor_data TAGS ('sensor-002', 'pressure', 'workshop-A-line-2', 'factory-east')
VALUES (NOW, 25.1, 60.8, 1012.80, 3.28);
-- Batch write (high performance)
INSERT INTO d_sensor_001 VALUES (NOW + 1s, 23.6, 65.1, 1013.20, 3.31)
d_sensor_002 VALUES (NOW + 1s, 25.0, 60.9, 1012.75, 3.29)
d_sensor_001 VALUES (NOW + 2s, 23.7, 65.0, 1013.15, 3.30);
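A multi-table INSERT like the one above is usually assembled by the gateway code. Below is a minimal sketch of such a builder; `subtable_name` and `build_insert` are hypothetical helpers, tag values are assumed to be trusted strings without quotes, and the first value in each row is assumed to be an epoch-millisecond timestamp.

```python
import re


def subtable_name(device_id: str, prefix: str = "d_") -> str:
    """Map a device ID to a subtable name made of letters, digits, and underscores."""
    return prefix + re.sub(r"[^0-9A-Za-z_]", "_", device_id)


def build_insert(stable: str, rows: list) -> str:
    """Build one INSERT statement that writes to many subtables at once."""
    parts = []
    for r in rows:
        tags = ", ".join(f"'{t}'" for t in r["tags"])
        values = ", ".join(str(v) for v in r["values"])
        parts.append(
            f"{subtable_name(r['device_id'])} USING {stable} TAGS ({tags}) VALUES ({values})"
        )
    return "INSERT INTO " + " ".join(parts)


rows = [
    {"device_id": "sensor-001", "tags": ["sensor-001", "temperature"],
     "values": [1700000000000, 23.5]},
    {"device_id": "sensor-002", "tags": ["sensor-002", "pressure"],
     "values": [1700000000000, 25.1]},
]
sql = build_insert("sensor_data", rows)
print(sql)
```

Sanitizing the device ID in one place keeps hyphens and other illegal characters out of subtable names.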
Querying and analysis
-- Aggregate by device type: average temperature over the last hour
SELECT device_type, avg(temperature) AS avg_temp, max(temperature) AS max_temp
FROM sensor_data
WHERE ts >= NOW - 1h
GROUP BY device_type;
-- Aggregate by factory and location
SELECT factory, location, count(*) AS sample_count,
avg(temperature) AS avg_temp,
stddev(temperature) AS temp_std
FROM sensor_data
WHERE ts >= NOW - 6h AND device_type = 'temperature'
GROUP BY factory, location;
-- Anomaly detection: temperature more than 3 standard deviations above the mean
SELECT ts, device_id, temperature
FROM sensor_data
WHERE ts >= NOW - 1h
AND temperature > (
SELECT avg(temperature) + 3 * stddev(temperature)
FROM sensor_data
WHERE ts >= NOW - 24h AND device_type = 'temperature'
);
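The same 3-sigma rule can be sketched in a few lines of Python, for example to validate the threshold on an exported sample. This sketch uses the sample standard deviation; a database's `stddev()` may use the population form, so small differences are possible. The function name and readings are illustrative.

```python
from statistics import mean, stdev


def three_sigma_outliers(baseline, recent, k=3.0):
    """Return (outliers, threshold): readings in `recent` above mean + k * stdev of `baseline`."""
    threshold = mean(baseline) + k * stdev(baseline)
    return [x for x in recent if x > threshold], threshold


baseline = [20.0, 21.0, 19.0, 20.0, 20.0]   # e.g. the last 24 hours
recent = [20.5, 22.0, 25.0]                 # e.g. the last hour
outliers, threshold = three_sigma_outliers(baseline, recent)
print(outliers, round(threshold, 3))
```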
-- Downsampling: aggregate every 5 minutes
SELECT _wstart AS bucket, device_id,
avg(temperature) AS avg_temp,
min(temperature) AS min_temp,
max(temperature) AS max_temp
FROM sensor_data
WHERE ts >= NOW - 1h
PARTITION BY device_id
INTERVAL(5m) SLIDING(5m);
Writing with the Python SDK
from taosrest import RestClient
import random
import time

client = RestClient("http://localhost:6041", user="root", password="taosdata")

def subtable_name(device_id):
    # Subtable names allow only letters, digits, and underscores, so map "-" to "_"
    return "d_" + device_id.replace("-", "_")

def write_sensor_batch(records):
    # Build one multi-table INSERT; table names are qualified with the database
    sql_lines = []
    for r in records:
        line = (
            f"iot_edge.{subtable_name(r['device_id'])} USING iot_edge.sensor_data "
            f"TAGS ('{r['device_id']}', '{r['device_type']}', '{r['location']}', '{r['factory']}') "
            f"VALUES ({r['ts']}, {r['temperature']}, {r['humidity']}, {r['pressure']}, {r['voltage']})"
        )
        sql_lines.append(line)
    sql = "INSERT INTO " + " ".join(sql_lines)
    client.sql(sql)

def query_latest(device_id, minutes=10):
    result = client.sql(
        f"SELECT ts, temperature, humidity, pressure FROM iot_edge.sensor_data "
        f"WHERE device_id = '{device_id}' AND ts >= NOW - {minutes}m "
        f"ORDER BY ts DESC LIMIT 100"
    )
    return result

base_ms = int(time.time() * 1000)
records = []
for i in range(1000):
    records.append({
        "device_id": f"sensor-{i % 100:03d}",
        "device_type": random.choice(["temperature", "pressure", "humidity"]),
        "location": f"workshop-A-line-{i % 10}",
        "factory": "factory-east",
        "ts": base_ms + i,  # epoch milliseconds, one distinct value per row
        "temperature": round(random.uniform(20, 30), 2),
        "humidity": round(random.uniform(50, 70), 2),
        "pressure": round(random.uniform(1010, 1015), 2),
        "voltage": round(random.uniform(3.2, 3.4), 2),
    })
write_sensor_batch(records)
Cluster deployment
# taos.cfg on the first dnode
firstEp tdnode1:6030
secondEp tdnode2:6030
serverPort 6030
dataDir /var/lib/taos
logDir /var/log/taos
# Add dnodes
taos> CREATE DNODE "tdnode2:6030";
taos> CREATE DNODE "tdnode3:6030";
-- Create mnodes on dnodes 2 and 3
taos> CREATE MNODE ON DNODE 2;
taos> CREATE MNODE ON DNODE 3;
-- Database replica count
ALTER DATABASE iot_edge REPLICA 3;
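Pattern 4: QuestDB for high-frequency writes
Scenario
Market data for a cryptocurrency exchange at 500,000 ticks per second. It needs a SQL interface for direct queries, SIMD-optimized aggregations, and high-speed ingestion over ILP (InfluxDB Line Protocol).
Installation and configuration
# Deploy QuestDB with Docker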
docker run -d --name questdb \
-p 9000:9000 \
-p 9009:9009 \
-p 8812:8812 \
-v questdb-data:/var/lib/questdb \
questdb/questdb:8.2.1
# Web Console: http://localhost:9000
# ILP port: 9009
# PostgreSQL-compatible port: 8812
Creating the table and writing data
-- Create the table via SQL
CREATE TABLE crypto_ticks (
ts TIMESTAMP,
symbol SYMBOL,
price DOUBLE,
volume DOUBLE,
side SYMBOL,
exchange SYMBOL
) TIMESTAMP(ts) PARTITION BY DAY WAL;
High-speed ILP writes
import socket
import time
import random

class QuestDBILPWriter:
    def __init__(self, host="localhost", port=9009):
        self.host = host
        self.port = port
        self.sock = None

    def connect(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((self.host, self.port))

    def write_line(self, line):
        # Each ILP record is one newline-terminated line
        self.sock.sendall((line + "\n").encode())

    def close(self):
        if self.sock:
            self.sock.close()

    def write_tick(self, symbol, price, volume, side, exchange, ts_ns):
        # Format: table,tag=value,... field=value,... timestamp_ns
        line = f"crypto_ticks,symbol={symbol},side={side},exchange={exchange} price={price},volume={volume} {ts_ns}"
        self.write_line(line)

writer = QuestDBILPWriter()
writer.connect()
symbols = ["BTC-USD", "ETH-USD", "SOL-USD", "BNB-USD", "XRP-USD"]
base_time = int(time.time() * 1_000_000_000)
batch_size = 10000  # progress is reported every batch_size ticks
total_written = 0
start = time.time()
for i in range(100000):
    symbol = random.choice(symbols)
    price = round(random.uniform(20000, 70000) if symbol == "BTC-USD" else random.uniform(1000, 4000), 2)
    volume = round(random.uniform(0.01, 5.0), 6)
    side = random.choice(["buy", "sell"])
    exchange = random.choice(["binance", "coinbase", "kraken"])
    ts_ns = base_time + i * 1_000_000  # ticks spaced 1 ms apart
    writer.write_tick(symbol, price, volume, side, exchange, ts_ns)
    total_written += 1
    if total_written % batch_size == 0:
        elapsed = time.time() - start
        rate = total_written / elapsed
        print(f"Written {total_written} ticks, rate: {rate:.0f} ticks/s")
writer.close()
print(f"Total: {total_written} ticks in {time.time()-start:.2f}s")
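The writer above interpolates values straight into the line, which breaks as soon as a tag contains a space, comma, or equals sign. Here is a minimal sketch of an ILP line formatter with escaping. The helper names are made up for illustration, and the escaping rules reflect the line protocol as I understand it (backslash-escape commas, spaces, and equals signs in tags; quote string fields), so verify them against the QuestDB documentation before relying on this.

```python
def _escape(value: str, chars: str) -> str:
    """Backslash-escape each character in `chars` that occurs in `value`."""
    for ch in chars:
        value = value.replace(ch, "\\" + ch)
    return value


def _field(value) -> str:
    """Render one field value in line-protocol syntax."""
    if isinstance(value, bool):          # check bool before int: bool is an int subclass
        return "t" if value else "f"
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return repr(value)
    return '"' + _escape(str(value), '\\"') + '"'


def ilp_line(table: str, tags: dict, fields: dict, ts_ns: int) -> str:
    tag_part = "".join(
        f",{_escape(k, ', =')}={_escape(str(v), ', =')}" for k, v in tags.items()
    )
    field_part = ",".join(f"{_escape(k, ', =')}={_field(v)}" for k, v in fields.items())
    return f"{_escape(table, ', ')}{tag_part} {field_part} {ts_ns}"


line = ilp_line(
    "crypto_ticks",
    {"symbol": "BTC-USD", "side": "buy"},
    {"price": 42000.5, "volume": 0.25},
    1700000000000000000,
)
print(line)
```

For production use, an official client library that handles buffering and escaping is generally the safer choice than a hand-rolled socket writer.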
SQL queries (SIMD-optimized)
-- VWAP per trading pair over the last minute
SELECT
symbol,
sum(price * volume) / sum(volume) AS vwap,
sum(volume) AS total_volume,
count() AS tick_count
FROM crypto_ticks
WHERE ts >= dateadd('m', -1, now())
GROUP BY symbol;
-- OHLC per minute over the last 5 minutes
SELECT
ts,
symbol,
first(price) AS open,
last(price) AS close,
max(price) AS high,
min(price) AS low,
sum(volume) AS volume
FROM crypto_ticks
WHERE ts >= dateadd('m', -5, now())
SAMPLE BY 1m ALIGN TO CALENDAR;
-- Buy/sell volume ratio
SELECT
symbol,
sum(CASE WHEN side = 'buy' THEN volume END) AS buy_volume,
sum(CASE WHEN side = 'sell' THEN volume END) AS sell_volume,
sum(CASE WHEN side = 'buy' THEN volume END) / sum(CASE WHEN side = 'sell' THEN volume END) AS buy_sell_ratio
FROM crypto_ticks
WHERE ts >= dateadd('m', -5, now())
GROUP BY symbol;
-- Price volatility (standard deviation over 5 minutes)
SELECT
symbol,
stddev(price) AS price_std,
avg(price) AS avg_price,
stddev(price) / avg(price * 1.0) AS cv
FROM crypto_ticks
WHERE ts >= dateadd('m', -5, now())
GROUP BY symbol
ORDER BY cv DESC;
PostgreSQL-compatible queries
import psycopg2
conn = psycopg2.connect(
host="localhost",
port=8812,
user="admin",
password="quest",
database="qdb"
)
cursor = conn.cursor()
cursor.execute("""
SELECT symbol, sum(price * volume) / sum(volume) AS vwap
FROM crypto_ticks
WHERE ts >= dateadd('m', -5, now())
GROUP BY symbol
""")
for row in cursor.fetchall():
    print(f"{row[0]}: VWAP = {row[1]:.2f}")
cursor.close()
conn.close()
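Pattern 5: Multi-database architecture
Scenario
A large platform carries IoT, monitoring, and financial data at once, and no single TSDB meets every requirement. It needs a routing layer that picks the best database based on data characteristics and supports federated queries across databases.
Routing layer design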
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional

class DataType(Enum):
    IOT_SENSOR = "iot_sensor"
    MONITORING = "monitoring"
    FINANCIAL = "financial"
    HIGH_FREQUENCY = "high_frequency"

@dataclass
class TimeSeriesPoint:
    measurement: str
    tags: dict[str, str]
    fields: dict[str, Any]
    timestamp: int  # epoch milliseconds
    data_type: DataType

    def as_datetime(self) -> datetime:
        return datetime.fromtimestamp(self.timestamp / 1000, tz=timezone.utc)

class TSDBAdapter(ABC):
    @abstractmethod
    def write(self, points: list[TimeSeriesPoint]) -> int:
        pass

    @abstractmethod
    def query(self, sql: str, params: Optional[dict] = None) -> list[dict]:
        pass

    @abstractmethod
    def health_check(self) -> bool:
        pass

class InfluxDBAdapter(TSDBAdapter):
    def __init__(self, host: str, token: str, org: str, bucket: str):
        from influxdb_client_3 import InfluxDBClient3
        self.client = InfluxDBClient3(host=host, database=bucket, token=token)
        self.org = org
        self.bucket = bucket

    def write(self, points: list[TimeSeriesPoint]) -> int:
        from influxdb_client_3 import Point
        influx_points = []
        for p in points:
            pt = Point(p.measurement)
            for k, v in p.tags.items():
                pt.tag(k, v)
            for k, v in p.fields.items():
                pt.field(k, v)
            # Pass a datetime so millisecond precision is preserved
            pt.time(p.as_datetime())
            influx_points.append(pt)
        self.client.write(record=influx_points)
        return len(influx_points)

    def query(self, sql: str, params: Optional[dict] = None) -> list[dict]:
        # query() returns a PyArrow table; convert it to a list of row dicts
        return self.client.query(sql).to_pylist()

    def health_check(self) -> bool:
        try:
            self.client.query("SELECT 1")
            return True
        except Exception:
            return False

class TimescaleDBAdapter(TSDBAdapter):
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool = None
        # Dedicated event loop so the synchronous interface can drive asyncpg
        self.loop = asyncio.new_event_loop()

    async def init_pool(self):
        import asyncpg
        self.pool = await asyncpg.create_pool(self.dsn, min_size=2, max_size=10)

    def connect(self):
        # Create the pool on the adapter's own loop before the first write or query
        self.loop.run_until_complete(self.init_pool())

    def write(self, points: list[TimeSeriesPoint]) -> int:
        return self.loop.run_until_complete(self._async_write(points))

    async def _async_write(self, points: list[TimeSeriesPoint]) -> int:
        # Assumes a ts_data(time, symbol, price, volume) table already exists
        async with self.pool.acquire() as conn:
            rows = []
            for p in points:
                rows.append((
                    p.as_datetime(), p.measurement,
                    p.fields.get("price", 0), p.fields.get("volume", 0)
                ))
            await conn.executemany(
                "INSERT INTO ts_data (time, symbol, price, volume) VALUES ($1, $2, $3, $4)",
                rows
            )
            return len(rows)

    def query(self, sql: str, params: Optional[dict] = None) -> list[dict]:
        return self.loop.run_until_complete(self._async_query(sql))

    async def _async_query(self, sql: str) -> list[dict]:
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(sql)
            return [dict(row) for row in rows]

    def health_check(self) -> bool:
        try:
            return self.loop.run_until_complete(self._async_health())
        except Exception:
            return False

    async def _async_health(self) -> bool:
        async with self.pool.acquire() as conn:
            await conn.fetchval("SELECT 1")
            return True

class TSDBRouter:
    ROUTING_TABLE = {
        DataType.IOT_SENSOR: "tdengine",
        DataType.MONITORING: "influxdb",
        DataType.FINANCIAL: "timescaledb",
        DataType.HIGH_FREQUENCY: "questdb",
    }

    def __init__(self):
        self.adapters: dict[str, TSDBAdapter] = {}

    def register(self, name: str, adapter: TSDBAdapter):
        self.adapters[name] = adapter

    def write(self, points: list[TimeSeriesPoint]) -> dict[str, int]:
        # Group points by target database, then write each group
        grouped: dict[str, list[TimeSeriesPoint]] = {}
        for p in points:
            target = self.ROUTING_TABLE.get(p.data_type, "influxdb")
            grouped.setdefault(target, []).append(p)
        results = {}
        for target, target_points in grouped.items():
            adapter = self.adapters.get(target)
            if adapter and adapter.health_check():
                results[target] = adapter.write(target_points)
            else:
                # Target missing or unhealthy: fall back to another database
                fallback = "influxdb" if target != "influxdb" else "timescaledb"
                fallback_adapter = self.adapters.get(fallback)
                if fallback_adapter:
                    results[f"{target}->fallback->{fallback}"] = fallback_adapter.write(target_points)
        return results

    def query(self, data_type: DataType, sql: str) -> list[dict]:
        target = self.ROUTING_TABLE.get(data_type, "influxdb")
        adapter = self.adapters.get(target)
        if adapter and adapter.health_check():
            return adapter.query(sql)
        raise RuntimeError(f"No available adapter for {data_type}")
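The routing and fallback rules can be exercised without any database by swapping in an in-memory backend. The sketch below is a simplified, self-contained restatement of the rules described above; `MemoryBackend`, `ROUTES`, and `route` are stand-ins invented for this example, not part of any library.

```python
from collections import defaultdict

ROUTES = {
    "iot_sensor": "tdengine",
    "monitoring": "influxdb",
    "financial": "timescaledb",
    "high_frequency": "questdb",
}


class MemoryBackend:
    """Stand-in for a real adapter: stores points in a list."""

    def __init__(self, healthy=True):
        self.healthy = healthy
        self.points = []

    def health_check(self):
        return self.healthy

    def write(self, points):
        self.points.extend(points)
        return len(points)


def route(points, backends, default="influxdb"):
    """Group points by target backend and write each group, with one fallback hop."""
    grouped = defaultdict(list)
    for p in points:
        grouped[ROUTES.get(p["data_type"], default)].append(p)
    results = {}
    for target, batch in grouped.items():
        backend = backends.get(target)
        label = target
        if backend is None or not backend.health_check():
            fallback = "influxdb" if target != "influxdb" else "timescaledb"
            backend = backends.get(fallback)
            label = f"{target}->fallback->{fallback}"
        if backend is not None:
            results[label] = backend.write(batch)
    return results


backends = {"influxdb": MemoryBackend(), "tdengine": MemoryBackend(healthy=False)}
points = [{"data_type": "iot_sensor"}, {"data_type": "monitoring"}, {"data_type": "iot_sensor"}]
results = route(points, backends)
print(results)
```

One design point worth noting: data that lands in a fallback database has a different schema home, so a real system also needs a way to replay it into the intended target later.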
Federated queries
class FederatedQuery:
    def __init__(self, router: TSDBRouter):
        self.router = router

    def cross_db_correlation(self, device_id: str, symbol: str, hours: int = 1):
        # Note: arguments are interpolated into SQL directly; validate them upstream
        iot_data = self.router.query(
            DataType.IOT_SENSOR,
            f"SELECT _wstart AS ts, avg(temperature) AS avg_temp FROM sensor_data "
            f"WHERE device_id = '{device_id}' AND ts >= NOW - {hours}h "
            f"INTERVAL(5m)"
        )
        finance_data = self.router.query(
            DataType.FINANCIAL,
            f"SELECT bucket, symbol, avg(close) AS avg_price "
            f"FROM ohlc_5min WHERE symbol = '{symbol}' "
            f"AND bucket >= now() - INTERVAL '{hours} hours' "
            f"GROUP BY bucket, symbol"
        )
        return {
            "iot": iot_data,
            "finance": finance_data,
            "correlation_note": "Cross-database correlation requires application-level join"
        }
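Since the two result sets come from different databases, the join has to happen in application code. A minimal sketch of that step follows: align both series on their shared 5-minute buckets, then compute the Pearson correlation. The function names and sample values are illustrative.

```python
from math import sqrt


def align(series_a, series_b):
    """Inner-join two {bucket: value} mappings on their shared buckets."""
    shared = sorted(series_a.keys() & series_b.keys())
    return [series_a[b] for b in shared], [series_b[b] for b in shared]


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    if n < 2:
        raise ValueError("need at least two aligned points")
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    var_x = sum((x - mx) ** 2 for x in xs)
    var_y = sum((y - my) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("a constant series has no defined correlation")
    return cov / sqrt(var_x * var_y)


# Keys are bucket start times in epoch seconds (hypothetical sample data)
temps = {0: 20.0, 300: 21.0, 600: 22.0, 900: 30.0}
prices = {0: 100.0, 300: 102.0, 600: 104.0}
xs, ys = align(temps, prices)
r = pearson(xs, ys)
print(len(xs), r)
```

Both databases must bucket on the same boundaries and time zone for the join keys to line up.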
Data migration tool
class TSDBMigrator:
    def __init__(self, source: TSDBAdapter, target: TSDBAdapter, batch_size: int = 10000):
        self.source = source
        self.target = target
        self.batch_size = batch_size

    def migrate(self, query: str, transform_fn=None) -> int:
        # The query should include ORDER BY so that LIMIT/OFFSET pages are stable.
        # transform_fn should turn each source row (a dict) into a TimeSeriesPoint,
        # because the target adapter's write() expects TimeSeriesPoint objects.
        total = 0
        offset = 0
        while True:
            paginated_query = f"{query} LIMIT {self.batch_size} OFFSET {offset}"
            rows = self.source.query(paginated_query)
            if not rows:
                break
            if transform_fn:
                rows = [transform_fn(row) for row in rows]
            self.target.write(rows)
            total += len(rows)
            offset += self.batch_size
            print(f"Migrated {total} rows...")
        print(f"Migration complete: {total} rows")
        return total
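OFFSET pagination gets slower as the offset grows, because the source has to skip all earlier rows on every page. A common alternative is keyset pagination, which resumes from the last timestamp seen. This is a minimal sketch with an in-memory source; `migrate_keyset` and `fetch_page` are hypothetical names, and it assumes timestamps are unique (rows sharing a timestamp at a page boundary would otherwise be skipped).

```python
def migrate_keyset(fetch_page, write_batch, batch_size=10000):
    """Copy rows page by page, resuming after the last timestamp seen.

    fetch_page(after_ts, limit) must return rows sorted by "ts" with ts > after_ts
    (or from the beginning when after_ts is None).
    """
    total, last_ts = 0, None
    while True:
        rows = fetch_page(last_ts, batch_size)
        if not rows:
            return total
        write_batch(rows)
        total += len(rows)
        last_ts = rows[-1]["ts"]


# In-memory stand-ins for the source query and the target writer
source = [{"ts": i, "value": i * 0.5} for i in range(25)]


def fetch_page(after_ts, limit):
    return [r for r in source if after_ts is None or r["ts"] > after_ts][:limit]


sink = []
total = migrate_keyset(fetch_page, sink.extend, batch_size=10)
print(total, len(sink))
```

Against a real database, `fetch_page` would issue something like `WHERE ts > <last_ts> ORDER BY ts LIMIT <n>`.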
Pitfalls to avoid
Pitfall 1: Poor Flux query performance in InfluxDB
// ❌ Chained Flux pipeline; every stage scans the full data set
from(bucket: "monitoring")
|> range(start: -30d)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r.host =~ /node-.*/ )
|> aggregateWindow(every: 5m, fn: mean)
|> yield()
-- ✅ InfluxDB 3.0 SQL query, which benefits from Arrow vectorization
SELECT date_bin(INTERVAL '5 minutes', time) AS bucket,
host, avg(usage_percent) AS avg_cpu
FROM cpu_metrics
WHERE time >= now() - INTERVAL '30 days'
AND host LIKE 'node-%'
GROUP BY bucket, host
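Pitfall 2: Choosing the wrong hypertable chunk interval in TimescaleDB
-- ❌ Monthly chunks: a 1-hour query still has to work through one oversized chunk and its indexes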
SELECT create_hypertable('ticks', 'time',
chunk_time_interval => INTERVAL '1 month'
);
-- ✅ Daily chunks: a 1-hour query touches at most 2 small chunks
SELECT create_hypertable('ticks', 'time',
chunk_time_interval => INTERVAL '1 day'
);
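How many chunks a query touches is simple interval arithmetic. The sketch below counts the chunks overlapped by a time range, assuming chunk boundaries are aligned to the Unix epoch (an assumption made here for illustration) and approximating a month as 30 days.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def chunks_touched(start, end, chunk_interval):
    """Count the chunks overlapped by the half-open range [start, end)."""
    first = (start - EPOCH) // chunk_interval
    last = (end - EPOCH - timedelta(microseconds=1)) // chunk_interval
    return last - first + 1


start = datetime(2026, 1, 1, 23, 30, tzinfo=timezone.utc)
end = start + timedelta(hours=1)

daily = chunks_touched(start, end, timedelta(days=1))
monthly = chunks_touched(start, end, timedelta(days=30))
week_of_days = chunks_touched(start, start + timedelta(days=7), timedelta(days=1))
print(daily, monthly, week_of_days)
```

The count alone is not the whole story: fewer, larger chunks mean each touched chunk carries far more data and larger indexes, which is the real cost of an oversized interval.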
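Pitfall 3: TDengine subtable name conflicts
-- ❌ Subtable names must be unique across the whole database; subtables of different super tables cannot share a name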
CREATE STABLE temp_sensor (...) TAGS (device_id BINARY(32));
INSERT INTO d_001 USING temp_sensor TAGS ('dev-001', ...);
CREATE STABLE pressure_sensor (...) TAGS (device_id BINARY(32));
INSERT INTO d_001 USING pressure_sensor TAGS ('dev-001', ...);
-- ERROR: Table already exists
-- ✅ Prefix the subtable name with the super table
INSERT INTO d_temp_001 USING temp_sensor TAGS ('dev-001', ...);
INSERT INTO d_pres_001 USING pressure_sensor TAGS ('dev-001', ...);
Pitfall 4: Overusing the SYMBOL type in QuestDB
-- ❌ Using SYMBOL for a high-cardinality column blows up memory
CREATE TABLE logs (ts TIMESTAMP, user_id SYMBOL, message STRING) TIMESTAMP(ts);
-- ✅ Use STRING for high-cardinality columns and SYMBOL for low-cardinality ones
CREATE TABLE logs (ts TIMESTAMP, level SYMBOL, user_id STRING, message STRING) TIMESTAMP(ts);
Pitfall 5: Ignoring compression policy until storage blows up
-- TimescaleDB: enable native compression
ALTER TABLE stock_ticks SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'symbol',
timescaledb.compress_orderby = 'time DESC'
);
SELECT add_compression_policy('stock_ticks', INTERVAL '7 days');
-- InfluxDB: set a reasonable retention policy
influx bucket update --name monitoring --retention 30d
-- TDengine: specify KEEP when creating the database
CREATE DATABASE iot KEEP 3650;
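Troubleshooting
| No. | Error message | Cause | Fix |
|---|---|---|---|
| 1 | InfluxDB engine: cache max memory size exceeded | Writes arrive faster than the WAL cache can drain | Increase cache-max-memory-size, or reduce the write rate |
| 2 | TimescaleDB invalid value for chunk_time_interval | The chunk interval is not a valid interval | Use INTERVAL '1 day' rather than a bare number |
| 3 | TimescaleDB cannot drop chunk because it is compressed | A compressed chunk must be decompressed before deletion | Run SELECT decompress_chunk('chunk_name'), then drop it |
| 4 | TDengine Invalid table name | The subtable name contains illegal characters or is too long | Subtable names may contain only letters, digits, and underscores, up to 192 bytes |
| 5 | TDengine Group by error | A GROUP BY column is not in the SELECT list | TDengine requires GROUP BY columns to appear in SELECT |
| 6 | QuestDB commit lag | ILP writes are not committed promptly | Adjust the commit.lag setting or flush explicitly |
| 7 | QuestDB symbol count exceeded | SYMBOL cardinality exceeds the limit | Increase max.symbol.count or switch to STRING |
| 8 | InfluxDB database not found | The bucket does not exist or the name is wrong | Run influx bucket list to confirm the bucket name |
| 9 | TimescaleDB hypertable already exists | create_hypertable was called twice on the same table | Pass if_not_exists => TRUE to create_hypertable, or DROP TABLE and recreate it |
| 10 | TDengine Out of memory | The edge node is short on memory | Reduce the BUFFER parameter and lower the VGROUPS count |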
Advanced optimization
1. Batching InfluxDB writes
from influxdb_client_3 import InfluxDBClient3, Point
from datetime import datetime, timezone
import random

client = InfluxDBClient3(host="localhost:8181", database="monitoring", token="my-token")
batch = []
for i in range(50000):
    point = Point("cpu_metrics") \
        .tag("host", f"node-{random.randint(1, 500)}") \
        .field("usage_percent", random.uniform(10, 95)) \
        .time(datetime.now(timezone.utc))
    batch.append(point)
    # Flush every 5,000 points instead of writing one point at a time
    if len(batch) >= 5000:
        client.write(record=batch, write_precision="ms")
        batch = []
# Flush whatever is left over
if batch:
    client.write(record=batch, write_precision="ms")
2. TimescaleDB compression and space savings
-- Check compression results
SELECT
total_chunks,
number_compressed_chunks,
before_compression_total_bytes / 1024 / 1024 AS before_mb,
after_compression_total_bytes / 1024 / 1024 AS after_mb,
ROUND(((1 - after_compression_total_bytes::FLOAT / before_compression_total_bytes) * 100)::NUMERIC, 1) AS space_saved_pct
FROM hypertable_compression_stats('stock_ticks');
3. TDengine performance tuning parameters
-- Tuned parameters at database creation
CREATE DATABASE iot_edge
PRECISION 'ms'
KEEP 3650
BUFFER 256
WAL_LEVEL 1
VGROUPS 4
CACHEMODEL 'both';
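-- BUFFER: write buffer size per vnode (MB); larger buffers speed up writes
-- WAL_LEVEL: 0 = no WAL, 1 = write WAL without fsync, 2 = write WAL and fsync
-- VGROUPS: number of vnode groups; suggested value = number of CPU cores
-- CACHEMODEL: 'both' caches the last row and the last non-NULL value of each column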
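Because BUFFER is allocated per vnode, its memory cost multiplies with VGROUPS, which matters on an 8 GB edge node. The following is a rough estimate only: it assumes vnodes are spread evenly across dnodes and ignores every other memory consumer (caches, query memory, the OS). The function name is made up for illustration.

```python
def write_buffer_mb(buffer_mb: int, vgroups: int, replicas: int = 1, dnodes: int = 1) -> float:
    """Approximate write-buffer memory per dnode, in MB.

    Each vgroup has `replicas` vnodes, and each vnode holds its own BUFFER.
    """
    return buffer_mb * vgroups * replicas / dnodes


tuned = write_buffer_mb(256, 4)                              # BUFFER 256, VGROUPS 4, single node
conservative = write_buffer_mb(96, 4)                        # BUFFER 96, VGROUPS 4, single node
clustered = write_buffer_mb(256, 4, replicas=3, dnodes=3)    # 3 replicas across 3 dnodes
print(tuned, conservative, clustered)
```

If an edge node runs out of memory, this product is the first number to bring down, either by lowering BUFFER or by reducing VGROUPS.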
4. QuestDB write performance tuning
# conf/server.conf
cairo.commit.lag=10000
cairo.commit.mode=async
line.tcp.max.uncommitted.rows=10000
line.tcp.worker.count=2
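Comparison
| Dimension | InfluxDB 3.0 | TimescaleDB | TDengine | QuestDB |
|---|---|---|---|---|
| Write throughput (indicative) | 500k points/s | 100k points/s | 1M points/s | 1.5M points/s |
| Query latency (simple aggregation) | <10ms | <20ms | <5ms | <5ms |
| SQL compatibility | InfluxQL + basic SQL | Full PostgreSQL | Partial SQL | Most SQL |
| JOIN support | Limited | Full | Limited | Limited |
| Downsampling | Flux Task / SQL | Continuous Aggregates | INTERVAL queries | SAMPLE BY |
| Clustering | Native in 3.0 | Relies on PG | Native | Enterprise edition |
| Ecosystem | Strongest for Grafana/Prometheus | Full PG ecosystem | IoT ecosystem | Finance/IoT |
| Learning curve | Medium | Low (SQL) | Medium | Low (SQL) |
| Edge deployment | Fair | Poor fit | Excellent | Fair |
| Operational complexity | Medium | Medium | Low | Low |
Summary: there is no silver bullet for choosing a time-series database; the scenario decides. Choose InfluxDB for monitoring (strongest ecosystem), TimescaleDB for finance (full SQL), TDengine for IoT (very fast writes and edge-friendly), and QuestDB for high-frequency workloads (heavily SIMD-optimized). For large platforms, a multi-database routing architecture that splits traffic by data characteristics avoids forcing one database to carry every workload.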