Go GraphQL Federation in Practice: 6 Production Patterns from Subgraph to Supergraph

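When a Monolithic GraphQL Schema Meets Team Boundaries: The GraphQL Bind in Microservices

An e-commerce platform's GraphQL schema has swollen to 3,000 lines, with the user, product, and order teams all editing the same schema. Every release needs coordination, and one team's breaking change can take down the whole API. Worse, cross-service N+1 queries push response time from 50 ms to 3 s: a user requests an order list, every order looks up product details, and every product looks up inventory status, so 100 orders fan out into hundreds of downstream calls.

This is not a hypothetical. When your microservices are already split but the GraphQL layer is still a monolith, team autonomy and API performance pull against each other. GraphQL Federation exists to solve this: each service owns its own GraphQL schema (a subgraph), a gateway serves them as one unified API (the supergraph), and the design aims to avoid both N+1 queries and cross-team coupling.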


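Core Concepts at a Glance

Concept | Purpose | Key properties | Typical use
Federation | Composes multiple GraphQL services into one API | Transparent to clients; each service deploys independently | Unified API layer over microservices
Subgraph | One service's GraphQL schema | Owns its types and resolvers; declares entities with @key | User, product, and order services
Supergraph | The complete schema composed from all subgraphs | Produced by composition and served by the gateway; clients see only the supergraph | Single API entry point
Entity | A type shared across subgraphs | Identified by @key; several subgraphs can contribute fields | Core domain objects such as User, Product, Order
@key | Declares an entity's unique identifier fields | Supports compound keys; multiple @key directives declare alternative identifiers | @key(fields: "id"), @key(fields: "sku warehouseId")
Gateway | Routes and executes federated queries | Query planning, batched entity resolution, caching | Apollo Router, Apollo Gateway
Schema Stitching | Manually combines multiple GraphQL schemas | More flexible, but conflicts are resolved by hand | Custom composition logic, non-standard federation setups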

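5 Challenges of a Federated GraphQL Architecture

Challenge 1: Unclear entity boundaries

The users service holds a User's name and email; the orders service also has a User but only cares about the id and the order list. If every User field lives in the users service, the orders service needs a cross-service call on every query. If the fields are spread over several services, ownership and consistency of the entity become a problem.

Challenge 2: N+1 queries amplified at the federation layer

A client queries { orders { user { name } } }. The gateway first fetches the order list from the orders service, then asks the users service to resolve a User entity for each order's userId. The router sends those references in a single _entities request, but a naive subgraph resolves each representation with its own database lookup, so 100 orders still mean 100 user lookups: a performance disaster.

Challenge 3: Schema evolution and compatibility

The products service wants to add a required field to Product, but the orders service's reference to Product may not be compatible with it. A breaking change in one subgraph can affect the whole supergraph, so who runs the global compatibility check?

Challenge 4: Propagating authentication and authorization

The JWT has to travel from the gateway to every subgraph, and different subgraphs may use different permission models. The users service needs the user:read permission and the orders service needs order:read. How is that handled consistently at the gateway?

Challenge 5: Observability and error tracing

One query may touch 3 subgraphs. When it fails, which subgraph produced the error? Which service is the latency bottleneck? How does distributed tracing propagate correctly through the GraphQL layer?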
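Trace propagation usually rides on the W3C Trace Context `traceparent` header, which the router and each subgraph forward with a new parent span ID. The following is a minimal sketch of that header's structure, using only the standard library; `TraceParent`, `ParseTraceParent`, and `ChildHeader` are illustrative names, and a real service would normally rely on an OpenTelemetry propagator instead of parsing by hand.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// traceparentRe matches version 00 of the header:
// "00-" trace-id (32 hex) "-" parent-id (16 hex) "-" flags (2 hex).
var traceparentRe = regexp.MustCompile(`^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$`)

// TraceParent holds the fields of a parsed traceparent header.
type TraceParent struct {
	TraceID  string
	ParentID string
	Flags    string
}

// ParseTraceParent validates and splits a traceparent header value.
func ParseTraceParent(h string) (TraceParent, error) {
	m := traceparentRe.FindStringSubmatch(h)
	if m == nil {
		return TraceParent{}, fmt.Errorf("malformed traceparent %q", h)
	}
	// All-zero IDs are reserved as invalid.
	if strings.Trim(m[1], "0") == "" || strings.Trim(m[2], "0") == "" {
		return TraceParent{}, fmt.Errorf("all-zero trace-id or parent-id in %q", h)
	}
	return TraceParent{TraceID: m[1], ParentID: m[2], Flags: m[3]}, nil
}

// ChildHeader builds the header to send downstream: the trace ID is kept,
// and the caller's own span ID becomes the new parent ID.
func (tp TraceParent) ChildHeader(spanID string) string {
	return fmt.Sprintf("00-%s-%s-%s", tp.TraceID, spanID, tp.Flags)
}

func main() {
	tp, err := ParseTraceParent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	if err != nil {
		panic(err)
	}
	fmt.Println(tp.TraceID)
	fmt.Println(tp.ChildHeader("b7ad6b7169203331"))
}
```

Because the trace ID stays constant across hops, spans from the router and from every subgraph can be joined into one trace, which is what answers "which subgraph was slow".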


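6 Production-Grade Federation Patterns

Pattern 1: Defining a subgraph service with gqlgen

The subgraph is the basic unit of federation. Use gqlgen to generate the GraphQL service, declare the federation directives, and define the entity types.

GraphQL schema (users.graphqls)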

extend schema
  @link(url: "https://specs.apollo.dev/federation/v2.0",
        import: ["@key", "@shareable", "@external", "@requires"])

type User @key(fields: "id") @key(fields: "email") {
  id: ID!
  email: String!
  name: String!
  avatar: String
  createdAt: String!
}

# Order, OrderItem, OrderStatus and the User.orders field belong to the orders
# subgraph. Redefining them here without matching @shareable markers on both
# sides would fail composition, so this subgraph leaves them out.

type Query {
  user(id: ID!): User
  users(limit: Int!, offset: Int!): [User!]!
}

Go resolver implementation

package graph

import (
	"context"
	"fmt"

	"github.com/99designs/gqlgen/graphql/handler"
	"github.com/99designs/gqlgen/graphql/handler/extension"
	"github.com/99designs/gqlgen/graphql/handler/transport"
)

type User struct {
	ID        string `json:"id"`
	Email     string `json:"email"`
	Name      string `json:"name"`
	Avatar    string `json:"avatar,omitempty"`
	CreatedAt string `json:"createdAt"`
}

type Order struct {
	ID     string      `json:"id"`
	UserID string      `json:"userId"`
	Items  []OrderItem `json:"items"`
	Total  float64     `json:"total"`
	Status string      `json:"status"`
}

type OrderItem struct {
	ProductID string  `json:"productId"`
	Quantity  int     `json:"quantity"`
	Price     float64 `json:"price"`
}

// UserRepository is the persistence port used by the resolvers below.
// OrderRepository is assumed to be declared elsewhere in this package.
type UserRepository interface {
	FindByID(ctx context.Context, id string) (*User, error)
	FindByEmail(ctx context.Context, email string) (*User, error)
	List(ctx context.Context, limit, offset int) ([]*User, error)
}

type Resolver struct {
	userRepo  UserRepository
	orderRepo OrderRepository
}

func NewResolver(userRepo UserRepository, orderRepo OrderRepository) *Resolver {
	return &Resolver{userRepo: userRepo, orderRepo: orderRepo}
}

// queryResolver and entityResolver follow the shape gqlgen scaffolds:
// both embed *Resolver so they can reach the repositories.
type queryResolver struct{ *Resolver }
type entityResolver struct{ *Resolver }

func (r *Resolver) Query() QueryResolver   { return &queryResolver{r} }
func (r *Resolver) Entity() EntityResolver { return &entityResolver{r} }

func (r *queryResolver) User(ctx context.Context, id string) (*User, error) {
	user, err := r.userRepo.FindByID(ctx, id)
	if err != nil {
		return nil, fmt.Errorf("user not found: %w", err)
	}
	return user, nil
}

func (r *queryResolver) Users(ctx context.Context, limit int, offset int) ([]*User, error) {
	return r.userRepo.List(ctx, limit, offset)
}

// One entity resolver per @key: the router calls these through _entities.
func (r *entityResolver) FindUserByID(ctx context.Context, id string) (*User, error) {
	return r.userRepo.FindByID(ctx, id)
}

func (r *entityResolver) FindUserByEmail(ctx context.Context, email string) (*User, error) {
	return r.userRepo.FindByEmail(ctx, email)
}

func NewGraphQLServer(resolver *Resolver) *handler.Server {
	srv := handler.New(NewExecutableSchema(Config{Resolvers: resolver}))
	srv.AddTransport(transport.POST{})
	srv.AddTransport(transport.GET{})
	srv.Use(extension.Introspection{})
	return srv
}

gqlgen configuration (gqlgen.yml)

schema:
  - users.graphqls
exec:
  filename: graph/generated.go
model:
  filename: graph/model/models_gen.go
resolver:
  filename: graph/resolver.go
  type: Resolver
federation:
  filename: graph/federation.go
  package: graph
  # Needed for Federation 2 schemas that use @link.
  version: 2
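Pattern 2: Entity resolution and the @key directive: stitching types across services

@key declares an entity's identifier fields. The gateway resolves entities across subgraphs by sending their key fields to each subgraph's _entities field; Apollo Server handles that in __resolveReference, while gqlgen generates FindXByY entity resolvers for it. This is the core mechanism of federation.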
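To make the mechanism concrete, here is a minimal sketch of what a subgraph does with an `_entities` call: it receives a list of representations (a `__typename` plus key fields) and must return one result per representation, in the same order. The `Representation` type, the in-memory `batchUsers` helper, and the sample data are assumptions made for illustration; gqlgen generates the real dispatch code.

```go
package main

import "fmt"

// Representation is one element of the `representations` argument:
// a __typename plus the key fields declared with @key.
type Representation map[string]any

// User is a hypothetical entity type for this sketch.
type User struct {
	ID   string
	Name string
}

// batchUsers stands in for one batched repository query (assumed helper).
func batchUsers(ids []string) map[string]*User {
	db := map[string]*User{
		"u1": {ID: "u1", Name: "Ada"},
		"u2": {ID: "u2", Name: "Lin"},
	}
	out := make(map[string]*User, len(ids))
	for _, id := range ids {
		if u, ok := db[id]; ok {
			out[id] = u
		}
	}
	return out
}

// ResolveEntities returns one result per representation, in input order,
// while issuing a single batched lookup for all distinct user IDs.
func ResolveEntities(reps []Representation) ([]any, error) {
	seen := map[string]bool{}
	var ids []string
	for _, rep := range reps {
		if rep["__typename"] != "User" {
			return nil, fmt.Errorf("unknown entity type %v", rep["__typename"])
		}
		id, ok := rep["id"].(string)
		if !ok {
			return nil, fmt.Errorf("representation is missing key field id")
		}
		if !seen[id] {
			seen[id] = true
			ids = append(ids, id)
		}
	}
	users := batchUsers(ids)
	out := make([]any, len(reps))
	for i, rep := range reps {
		// A nil entry means the entity was not found for that representation.
		if u, ok := users[rep["id"].(string)]; ok {
			out[i] = u
		}
	}
	return out, nil
}

func main() {
	res, err := ResolveEntities([]Representation{
		{"__typename": "User", "id": "u2"},
		{"__typename": "User", "id": "u1"},
		{"__typename": "User", "id": "u2"},
	})
	if err != nil {
		panic(err)
	}
	for _, r := range res {
		fmt.Println(r.(*User).Name)
	}
}
```

Keeping the output aligned with the input is what lets the gateway merge each resolved entity back into the right place in the response.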

Products subgraph schema (products.graphqls)

extend schema
  @link(url: "https://specs.apollo.dev/federation/v2.0",
        import: ["@key", "@shareable", "@external", "@requires", "@provides"])

type Product @key(fields: "id") @key(fields: "sku") {
  id: ID!
  sku: String!
  name: String!
  description: String
  price: Float!
  inventory: Int!
  category: Category!
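  # @provides only applies to fields that are @external in this subgraph;
  # Review is defined locally here, so the directive is not used.
  reviews: [Review!]!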
}

type Category @key(fields: "id") {
  id: ID!
  name: String!
  parent: Category
}

type Review {
  id: ID!
  userId: ID!
  productId: ID!
  rating: Int!
  comment: String
}

type Query {
  product(id: ID!): Product
  products(categoryId: ID, limit: Int, offset: Int): [Product!]!
  category(id: ID!): Category
}

Go entity resolvers

package graph

import (
	"context"
	"fmt"
)

type Product struct {
	ID          string  `json:"id"`
	SKU         string  `json:"sku"`
	Name        string  `json:"name"`
	Description string  `json:"description,omitempty"`
	Price       float64 `json:"price"`
	Inventory   int     `json:"inventory"`
	CategoryID  string  `json:"categoryId"`
}

type Category struct {
	ID       string `json:"id"`
	Name     string `json:"name"`
	ParentID string `json:"parentId,omitempty"`
}

type ProductRepository interface {
	FindByID(ctx context.Context, id string) (*Product, error)
	FindBySKU(ctx context.Context, sku string) (*Product, error)
	List(ctx context.Context, limit, offset int) ([]*Product, error)
	ListByCategory(ctx context.Context, categoryID string, limit, offset int) ([]*Product, error)
}

type CategoryRepository interface {
	FindByID(ctx context.Context, id string) (*Category, error)
}

// Resolver holds the dependencies; entityResolver and queryResolver embed it,
// matching the types gqlgen scaffolds.
type Resolver struct {
	productRepo  ProductRepository
	categoryRepo CategoryRepository
}

type entityResolver struct{ *Resolver }
type queryResolver struct{ *Resolver }

func (r *entityResolver) FindProductByID(ctx context.Context, id string) (*Product, error) {
	product, err := r.productRepo.FindByID(ctx, id)
	if err != nil {
		return nil, fmt.Errorf("product entity resolution failed for id=%s: %w", id, err)
	}
	return product, nil
}

func (r *entityResolver) FindProductBySKU(ctx context.Context, sku string) (*Product, error) {
	product, err := r.productRepo.FindBySKU(ctx, sku)
	if err != nil {
		return nil, fmt.Errorf("product entity resolution failed for sku=%s: %w", sku, err)
	}
	return product, nil
}

func (r *entityResolver) FindCategoryByID(ctx context.Context, id string) (*Category, error) {
	category, err := r.categoryRepo.FindByID(ctx, id)
	if err != nil {
		return nil, fmt.Errorf("category entity resolution failed for id=%s: %w", id, err)
	}
	return category, nil
}

func (r *queryResolver) Product(ctx context.Context, id string) (*Product, error) {
	return r.productRepo.FindByID(ctx, id)
}

// Products applies default paging (limit 20, offset 0) when arguments are omitted.
func (r *queryResolver) Products(ctx context.Context, categoryId *string, limit *int, offset *int) ([]*Product, error) {
	lim := 20
	off := 0
	if limit != nil {
		lim = *limit
	}
	if offset != nil {
		off = *offset
	}
	if categoryId != nil {
		return r.productRepo.ListByCategory(ctx, *categoryId, lim, off)
	}
	return r.productRepo.List(ctx, lim, off)
}

Compound-key entities

type WarehouseStock @key(fields: "sku warehouseId") {
  sku: String!
  warehouseId: ID!
  quantity: Int!
  reservedQuantity: Int!
  location: String!
}

Go resolver for the compound key

type WarehouseStock struct {
	SKU              string `json:"sku"`
	WarehouseID      string `json:"warehouseId"`
	Quantity         int    `json:"quantity"`
	ReservedQuantity int    `json:"reservedQuantity"`
	Location         string `json:"location"`
}

type WarehouseStockRef struct {
	SKU         string `json:"sku"`
	WarehouseID string `json:"warehouseId"`
}

// The method name is generated by gqlgen from the key fields; check the
// generated EntityResolver interface for the exact spelling in your version.
// stockRepo is assumed to be a field on Resolver alongside the other repositories.
func (r *entityResolver) FindWarehouseStockBySkuAndWarehouseID(
	ctx context.Context,
	sku string,
	warehouseID string,
) (*WarehouseStock, error) {
	stock, err := r.stockRepo.FindBySKUAndWarehouse(ctx, sku, warehouseID)
	if err != nil {
		return nil, fmt.Errorf("warehouse stock resolution failed: %w", err)
	}
	return stock, nil
}
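When compound-key entities are loaded in batches, the lookup table needs a key that covers both fields. A small sketch, assuming hypothetical `StockKey` and `Stock` types: a Go struct whose fields are all comparable can be used directly as a map key, which avoids the delimiter collisions that string concatenation such as `sku + ":" + warehouseId` can cause.

```go
package main

import "fmt"

// StockKey mirrors @key(fields: "sku warehouseId"). A struct made only of
// comparable fields is itself comparable, so it works as a map key.
type StockKey struct {
	SKU         string
	WarehouseID string
}

// Stock is a trimmed-down stand-in for the WarehouseStock entity.
type Stock struct {
	Key      StockKey
	Quantity int
}

// IndexStocks turns the rows of one batched query into a lookup table.
func IndexStocks(rows []Stock) map[StockKey]Stock {
	idx := make(map[StockKey]Stock, len(rows))
	for _, r := range rows {
		idx[r.Key] = r
	}
	return idx
}

func main() {
	idx := IndexStocks([]Stock{
		{Key: StockKey{SKU: "A:1", WarehouseID: "w"}, Quantity: 5},
		{Key: StockKey{SKU: "A", WarehouseID: "1:w"}, Quantity: 9},
	})
	// Both keys would collapse to "A:1:w" if joined with ":".
	fmt.Println(len(idx), idx[StockKey{SKU: "A", WarehouseID: "1:w"}].Quantity)
}
```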

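Pattern 3: Apollo Federation v2 composition: from subgraphs to supergraph

Federation v2 introduced new directives such as @link, @shareable, and @override that make schema composition more flexible. Use the rover CLI for schema checks and publishing.

Supergraph configuration (supergraph.yaml)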

federation_version: =2.8.0
subgraphs:
  users:
    routing_url: http://users-service:4001/graphql
    schema:
      file: ./schemas/users.graphqls
  products:
    routing_url: http://products-service:4002/graphql
    schema:
      file: ./schemas/products.graphqls
  orders:
    routing_url: http://orders-service:4003/graphql
    schema:
      file: ./schemas/orders.graphqls
  reviews:
    routing_url: http://reviews-service:4004/graphql
    schema:
      file: ./schemas/reviews.graphqls

Orders subgraph schema (orders.graphqls)

extend schema
  @link(url: "https://specs.apollo.dev/federation/v2.0",
        import: ["@key", "@shareable"])

type Order @key(fields: "id") {
  id: ID!
  userId: ID!
  items: [OrderItem!]!
  total: Float!
  status: OrderStatus!
  shippingAddress: Address
  createdAt: String!
  # Resolved locally from userId. @requires is only valid for @external
  # fields owned by another subgraph, so it is not used here.
  user: User
}

type OrderItem {
  productId: ID!
  quantity: Int!
  unitPrice: Float!
  product: Product
}

type Address @shareable {
  street: String!
  city: String!
  state: String!
  zipCode: String!
  country: String!
}

# Entity references. In Federation 2 the key field needs neither @external
# nor @shareable; the extra fields are contributed by this subgraph.
type User @key(fields: "id") {
  id: ID!
  orders: [Order!]!
}

type Product @key(fields: "id") {
  id: ID!
  orderItems: [OrderItem!]!
}

enum OrderStatus {
  PENDING
  CONFIRMED
  SHIPPED
  DELIVERED
  CANCELLED
}

type Query {
  order(id: ID!): Order
  orders(userId: ID, status: OrderStatus, limit: Int, offset: Int): [Order!]!
}

Schema checks and publishing

# Check the subgraph schema for compatibility
rover subgraph check my-graph@production \
  --name users \
  --schema ./schemas/users.graphqls

# Publish the subgraph schema
rover subgraph publish my-graph@production \
  --name users \
  --schema ./schemas/users.graphqls \
  --routing-url http://users-service:4001/graphql

# Compose the supergraph locally
rover supergraph compose --config supergraph.yaml > supergraph.graphqls

Go subgraph HTTP service

package main

import (
	"log"
	"net/http"
	"os"

	"github.com/99designs/gqlgen/graphql/handler"
	"github.com/99designs/gqlgen/graphql/playground"
	"github.com/go-chi/chi/v5"

	"myapp/graph"
	"myapp/repository"
)

func main() {
	port := os.Getenv("PORT")
	if port == "" {
		port = "4001"
	}

	router := chi.NewRouter()

	userRepo, err := repository.NewPostgresUserRepository(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatalf("failed to open user database: %v", err)
	}
	// NewPostgresOrderRepository is assumed to mirror the user repository constructor.
	orderRepo, err := repository.NewPostgresOrderRepository(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatalf("failed to open order database: %v", err)
	}
	resolver := graph.NewResolver(userRepo, orderRepo)

	srv := handler.NewDefaultServer(graph.NewExecutableSchema(
		graph.Config{Resolvers: resolver},
	))

	// The path must match routing_url in supergraph.yaml (/graphql).
	router.Handle("/", playground.Handler("GraphQL Playground", "/graphql"))
	router.Handle("/graphql", srv)

	log.Printf("Users subgraph running on :%s", port)
	log.Fatal(http.ListenAndServe(":"+port, router))
}

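Pattern 4: Gateway routing and query planning with Apollo Router

Apollo Router is a high-performance gateway written in Rust. It supports query planning, batched entity resolution, caching, and observability.

Router configuration (router.yaml)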

supergraph:
  listen: 0.0.0.0:4000
  path: /graphql
  introspection: true

health_check:
  listen: 0.0.0.0:8088

cors:
  origins:
    - https://app.example.com
    - http://localhost:3000
  methods:
    - GET
    - POST
    - OPTIONS
  # Router v1 names the allowed request headers allow_headers.
  allow_headers:
    - Authorization
    - Content-Type
    - X-Request-ID

headers:
  all:
    request:
      - propagate:
          matching: "^X-.*"
      - propagate:
          named: Authorization
  subgraphs:
    users:
      request:
        - propagate:
            named: Authorization
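        # The router's header rules are propagate, insert and remove;
        # environment variables are expanded with the ${env.NAME} syntax.
        - insert:
            name: X-User-Service-Key
            value: "${env.USERS_SERVICE_KEY}"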
    orders:
      request:
        - propagate:
            named: Authorization

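traffic_shaping:
  # Client-facing limits live under router; all and subgraphs apply to
  # outbound subgraph requests.
  router:
    global_rate_limit:
      capacity: 1000
      interval: 1s
  subgraphs:
    users:
      timeout: 5s
      global_rate_limit:
        capacity: 500
        interval: 1s
    products:
      timeout: 3s
    orders:
      timeout: 10s

# Layout used by recent Router v1 releases, where exporters are grouped
# under telemetry.exporters; older releases used a flatter structure.
telemetry:
  exporters:
    tracing:
      common:
        service_name: apollo-router
      otlp:
        enabled: true
        endpoint: http://otel-collector:4317
        protocol: grpc
    metrics:
      common:
        service_name: apollo-router
      otlp:
        enabled: true
        endpoint: http://otel-collector:4317
        protocol: grpc
    logging:
      stdout:
        enabled: true
        format: json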
Docker Compose deployment

version: "3.9"

services:
  router:
    image: ghcr.io/apollographql/router:v1.45.0
    ports:
      - "4000:4000"
      - "8088:8088"
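    volumes:
      - ./router.yaml:/dist/configuration/router.yaml:ro
      - ./supergraph.graphqls:/dist/schema/supergraph.graphqls:ro
    # Pass the mounted paths explicitly so the router does not fall back to its defaults.
    command:
      - --config
      - /dist/configuration/router.yaml
      - --supergraph
      - /dist/schema/supergraph.graphqls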
    environment:
      - USERS_SERVICE_KEY=${USERS_SERVICE_KEY}
      - APOLLO_KEY=${APOLLO_KEY}
      - APOLLO_GRAPH_REF=${APOLLO_GRAPH_REF}
    depends_on:
      - users-service
      - products-service
      - orders-service
      - reviews-service
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8088/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  users-service:
    build:
      context: ./services/users
      dockerfile: Dockerfile
    ports:
      - "4001:4001"
    environment:
      - DATABASE_URL=postgres://users:password@postgres:5432/users?sslmode=disable
      - PORT=4001
    depends_on:
      - postgres

  products-service:
    build:
      context: ./services/products
      dockerfile: Dockerfile
    ports:
      - "4002:4002"
    environment:
      - DATABASE_URL=postgres://products:password@postgres:5432/products?sslmode=disable
      - PORT=4002

  orders-service:
    build:
      context: ./services/orders
      dockerfile: Dockerfile
    ports:
      - "4003:4003"
    environment:
      - DATABASE_URL=postgres://orders:password@postgres:5432/orders?sslmode=disable
      - PORT=4003

  reviews-service:
    build:
      context: ./services/reviews
      dockerfile: Dockerfile
    ports:
      - "4004:4004"
    environment:
      - DATABASE_URL=postgres://reviews:password@postgres:5432/reviews?sslmode=disable
      - PORT=4004

  postgres:
    image: postgres:16-alpine
    ports:
      - "5432:5432"
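    environment:
      # POSTGRES_MULTIPLE_DATABASES is not read by the official postgres image;
      # it only takes effect with a custom init script mounted into
      # /docker-entrypoint-initdb.d that creates the databases and roles.
      - POSTGRES_MULTIPLE_DATABASES=users,products,orders,reviews
      - POSTGRES_PASSWORD=password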
    volumes:
      - pgdata:/var/lib/postgresql/data

volumes:
  pgdata:

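Query planning example

query GetUserWithOrders {
  user(id: "user-123") {
    name
    email
    orders {
      id
      total
      status
      items {
        product {
          name
          price
        }
        quantity
      }
    }
  }
}

The gateway's query planner produces an execution plan along these lines:

  1. Fetch the User's name and email from the users subgraph
  2. Fetch the orders of User(id="user-123") from the orders subgraph
  3. Resolve the Product entities referenced by the order items from the products subgraph in one batch
  4. Merge the results and return them to the client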
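The four steps above can be sketched in plain Go. This is an illustration of the plan's shape only, not how Apollo Router is implemented: the three maps stand in for the users, orders, and products subgraphs, and every name in the block is hypothetical.

```go
package main

import "fmt"

type Item struct {
	ProductID   string
	Quantity    int
	ProductName string // filled in during the merge step
}

type Order struct {
	ID    string
	Items []Item
}

type Result struct {
	Name   string
	Orders []Order
}

// In-memory stand-ins for the three subgraphs.
var (
	userNames    = map[string]string{"user-123": "Ada"}
	ordersByUser = map[string][]Order{"user-123": {
		{ID: "o1", Items: []Item{{ProductID: "p1", Quantity: 1}, {ProductID: "p2", Quantity: 2}}},
		{ID: "o2", Items: []Item{{ProductID: "p1", Quantity: 3}}},
	}}
	productNames   = map[string]string{"p1": "Keyboard", "p2": "Mouse"}
	productFetches int // counts calls to the products "subgraph"
)

// fetchProducts models one batched entity fetch.
func fetchProducts(ids []string) map[string]string {
	productFetches++
	out := make(map[string]string, len(ids))
	for _, id := range ids {
		out[id] = productNames[id]
	}
	return out
}

// Execute walks the plan: user, orders, batched products, merge.
func Execute(userID string) Result {
	res := Result{Name: userNames[userID]} // step 1
	for _, o := range ordersByUser[userID] { // step 2 (copy so the source stays untouched)
		res.Orders = append(res.Orders, Order{ID: o.ID, Items: append([]Item(nil), o.Items...)})
	}
	seen := map[string]bool{} // step 3: distinct product IDs, one fetch
	var ids []string
	for _, o := range res.Orders {
		for _, it := range o.Items {
			if !seen[it.ProductID] {
				seen[it.ProductID] = true
				ids = append(ids, it.ProductID)
			}
		}
	}
	names := fetchProducts(ids)
	for i := range res.Orders { // step 4: merge
		for j := range res.Orders[i].Items {
			it := &res.Orders[i].Items[j]
			it.ProductName = names[it.ProductID]
		}
	}
	return res
}

func main() {
	res := Execute("user-123")
	fmt.Println(res.Name, len(res.Orders), res.Orders[1].Items[0].ProductName, productFetches)
}
```

Three order items reference two distinct products, yet the products stand-in is called once; that is the behaviour step 3 describes.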

Pattern 5: Cross-service data fetching and N+1 protection with DataLoader

N+1 is the most serious performance problem in a federated GraphQL setup. DataLoader batches and deduplicates loads, collapsing N entity resolutions into one batched query.

Go DataLoader implementation

package dataloader

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// BatchFunc loads many keys at once and returns the values it found, by key.
type BatchFunc[K comparable, V any] func(ctx context.Context, keys []K) (map[K]V, error)

// call tracks one in-flight key. done is closed after value and err are set,
// so any number of waiters for the same key can read the result.
type call[V any] struct {
	done  chan struct{}
	value V
	err   error
}

// Loader batches and caches loads. Create one per request: the cache never expires.
type Loader[K comparable, V any] struct {
	batchFn  BatchFunc[K, V]
	mu       sync.Mutex
	cache    map[K]V
	pending  map[K]*call[V]
	maxBatch int
	wait     time.Duration
}

func NewLoader[K comparable, V any](batchFn BatchFunc[K, V], opts ...Option[K, V]) *Loader[K, V] {
	l := &Loader[K, V]{
		batchFn:  batchFn,
		cache:    make(map[K]V),
		pending:  make(map[K]*call[V]),
		maxBatch: 100,
		wait:     10 * time.Millisecond,
	}
	for _, opt := range opts {
		opt(l)
	}
	return l
}

type Option[K comparable, V any] func(*Loader[K, V])

func WithMaxBatch[K comparable, V any](n int) Option[K, V] {
	return func(l *Loader[K, V]) { l.maxBatch = n }
}

func WithWait[K comparable, V any](d time.Duration) Option[K, V] {
	return func(l *Loader[K, V]) { l.wait = d }
}

// Load returns the value for key, joining the current batch if one is open.
func (l *Loader[K, V]) Load(ctx context.Context, key K) (V, error) {
	l.mu.Lock()
	if v, ok := l.cache[key]; ok {
		l.mu.Unlock()
		return v, nil
	}
	c, inFlight := l.pending[key]
	flushNow := false
	if !inFlight {
		c = &call[V]{done: make(chan struct{})}
		l.pending[key] = c
		switch n := len(l.pending); {
		case n >= l.maxBatch:
			flushNow = true
		case n == 1:
			// First key of a new batch: flush when the wait window closes.
			time.AfterFunc(l.wait, func() { l.dispatch(ctx) })
		}
	}
	l.mu.Unlock()

	if flushNow {
		l.dispatch(ctx)
	}
	select {
	case <-c.done:
		return c.value, c.err
	case <-ctx.Done():
		var zero V
		return zero, ctx.Err()
	}
}

// dispatch takes every pending key, runs one batch call, and wakes all waiters.
func (l *Loader[K, V]) dispatch(ctx context.Context) {
	l.mu.Lock()
	batch := l.pending
	l.pending = make(map[K]*call[V])
	l.mu.Unlock()
	if len(batch) == 0 {
		return
	}

	keys := make([]K, 0, len(batch))
	for k := range batch {
		keys = append(keys, k)
	}
	results, err := l.batchFn(ctx, keys)

	l.mu.Lock()
	defer l.mu.Unlock()
	for k, c := range batch {
		if err != nil {
			c.err = err
		} else if v, ok := results[k]; ok {
			c.value = v
			l.cache[k] = v
		} else {
			c.err = fmt.Errorf("key not found: %v", k)
		}
		close(c.done)
	}
}

// LoadMany loads keys concurrently so that they land in the same batch;
// a sequential loop would wait out one batch window per key.
func (l *Loader[K, V]) LoadMany(ctx context.Context, keys []K) ([]V, error) {
	values := make([]V, len(keys))
	errs := make([]error, len(keys))
	var wg sync.WaitGroup
	for i, key := range keys {
		wg.Add(1)
		go func(i int, key K) {
			defer wg.Done()
			values[i], errs[i] = l.Load(ctx, key)
		}(i, key)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return values, err
		}
	}
	return values, nil
}

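Using DataLoader in resolvers

package graph

import (
	"context"
	"fmt"
	"time"

	"myapp/dataloader"
)

// loaderCtxKey is an unexported type so the context key cannot collide with
// keys set by other packages.
type loaderCtxKey struct{}

var loaderKey = loaderCtxKey{}

// WithLoaders stores a per-request *Loaders in ctx. Call it from HTTP
// middleware so the loader cache never outlives a single request.
func WithLoaders(ctx context.Context, l *Loaders) context.Context {
	return context.WithValue(ctx, loaderKey, l)
}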

type Loaders struct {
	UserByID    *dataloader.Loader[string, *User]
	ProductByID *dataloader.Loader[string, *Product]
	OrderByID   *dataloader.Loader[string, *Order]
}

func NewLoaders(userRepo UserRepository, productRepo ProductRepository, orderRepo OrderRepository) *Loaders {
	return &Loaders{
		UserByID: dataloader.NewLoader(func(ctx context.Context, ids []string) (map[string]*User, error) {
			users, err := userRepo.FindByIDs(ctx, ids)
			if err != nil {
				return nil, fmt.Errorf("batch user load failed: %w", err)
			}
			result := make(map[string]*User, len(users))
			for _, u := range users {
				result[u.ID] = u
			}
			return result, nil
		}, dataloader.WithMaxBatch[string, *User](200), dataloader.WithWait[string, *User](5*time.Millisecond)),

		ProductByID: dataloader.NewLoader(func(ctx context.Context, ids []string) (map[string]*Product, error) {
			products, err := productRepo.FindByIDs(ctx, ids)
			if err != nil {
				return nil, fmt.Errorf("batch product load failed: %w", err)
			}
			result := make(map[string]*Product, len(products))
			for _, p := range products {
				result[p.ID] = p
			}
			return result, nil
		}, dataloader.WithMaxBatch[string, *Product](200), dataloader.WithWait[string, *Product](5*time.Millisecond)),

		OrderByID: dataloader.NewLoader(func(ctx context.Context, ids []string) (map[string]*Order, error) {
			orders, err := orderRepo.FindByIDs(ctx, ids)
			if err != nil {
				return nil, fmt.Errorf("batch order load failed: %w", err)
			}
			result := make(map[string]*Order, len(orders))
			for _, o := range orders {
				result[o.ID] = o
			}
			return result, nil
		}, dataloader.WithMaxBatch[string, *Order](200), dataloader.WithWait[string, *Order](5*time.Millisecond)),
	}
}

func (r *orderResolver) User(ctx context.Context, obj *Order) (*User, error) {
	loader := ctx.Value(loaderKey).(*Loaders)
	return loader.UserByID.Load(ctx, obj.UserID)
}

func (r *orderItemResolver) Product(ctx context.Context, obj *OrderItem) (*Product, error) {
	loader := ctx.Value(loaderKey).(*Loaders)
	return loader.ProductByID.Load(ctx, obj.ProductID)
}

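Batch-query repository

package repository

import (
	"context"
	"database/sql"
	"fmt"
	"strings"

	_ "github.com/lib/pq"
)

// User is assumed to be the same model the graph package exposes.
type PostgresUserRepository struct {
	db *sql.DB
}

func NewPostgresUserRepository(dbURL string) (*PostgresUserRepository, error) {
	// sql.Open only validates its arguments; Ping opens a real connection.
	db, err := sql.Open("postgres", dbURL)
	if err != nil {
		return nil, fmt.Errorf("invalid database configuration: %w", err)
	}
	if err := db.Ping(); err != nil {
		db.Close()
		return nil, fmt.Errorf("failed to connect to database: %w", err)
	}
	db.SetMaxOpenConns(25)
	db.SetMaxIdleConns(10)
	return &PostgresUserRepository{db: db}, nil
}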

func (r *PostgresUserRepository) FindByIDs(ctx context.Context, ids []string) ([]*User, error) {
	if len(ids) == 0 {
		return nil, nil
	}

	placeholders := make([]string, len(ids))
	args := make([]interface{}, len(ids))
	for i, id := range ids {
		placeholders[i] = fmt.Sprintf("$%d", i+1)
		args[i] = id
	}

	query := fmt.Sprintf(
		"SELECT id, email, name, avatar, created_at FROM users WHERE id IN (%s)",
		strings.Join(placeholders, ","),
	)

	rows, err := r.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, fmt.Errorf("batch query users failed: %w", err)
	}
	defer rows.Close()

	users := make([]*User, 0, len(ids))
	for rows.Next() {
		var u User
		var avatar sql.NullString
		if err := rows.Scan(&u.ID, &u.Email, &u.Name, &avatar, &u.CreatedAt); err != nil {
			return nil, fmt.Errorf("scan user row failed: %w", err)
		}
		if avatar.Valid {
			u.Avatar = avatar.String
		}
		users = append(users, &u)
	}
	return users, rows.Err()
}
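An `IN (...)` list built this way grows with the batch, and a PostgreSQL statement can carry at most 65,535 bind parameters, so very large batches need to be split. Below is a minimal sketch of a helper that deduplicates IDs and cuts them into fixed-size chunks before querying; `ChunkUnique` is a hypothetical name, not part of the repository above.

```go
package main

import "fmt"

// ChunkUnique drops duplicate IDs (keeping first-seen order) and splits the
// remainder into slices of at most size elements.
func ChunkUnique(ids []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	seen := make(map[string]struct{}, len(ids))
	var chunks [][]string
	var cur []string
	for _, id := range ids {
		if _, dup := seen[id]; dup {
			continue
		}
		seen[id] = struct{}{}
		cur = append(cur, id)
		if len(cur) == size {
			chunks = append(chunks, cur)
			cur = nil
		}
	}
	if len(cur) > 0 {
		chunks = append(chunks, cur)
	}
	return chunks
}

func main() {
	fmt.Println(ChunkUnique([]string{"a", "b", "a", "c", "d", "b", "e"}, 2))
	// prints [[a b] [c d] [e]]
}
```

A caller would run FindByIDs once per chunk and concatenate the results, which keeps each statement bounded regardless of how many keys the loader collected.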

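Pattern 6: Production-grade federation: authentication, monitoring, and high availability

A production federation setup has to handle authentication propagation, distributed tracing, rate limiting with circuit breaking, and graceful degradation.

Authentication middleware

package middleware

import (
	"context"
	"fmt"
	"net/http"
	"strings"

	"github.com/golang-jwt/jwt/v5"
)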

type contextKey string

const (
	userIDKey    contextKey = "userID"
	userRoleKey  contextKey = "userRole"
	authHeaderKey           = "Authorization"
)

type Claims struct {
	UserID string `json:"sub"`
	Role   string `json:"role"`
	jwt.RegisteredClaims
}

func AuthMiddleware(jwtSecret string) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			authHeader := r.Header.Get(authHeaderKey)
			if authHeader == "" {
				next.ServeHTTP(w, r)
				return
			}

			tokenStr := strings.TrimPrefix(authHeader, "Bearer ")
			if tokenStr == authHeader {
				next.ServeHTTP(w, r)
				return
			}

			token, err := jwt.ParseWithClaims(tokenStr, &Claims{}, func(t *jwt.Token) (interface{}, error) {
				if _, ok := t.Method.(*jwt.SigningMethodHMAC); !ok {
					return nil, fmt.Errorf("unexpected signing method: %v", t.Header["alg"])
				}
				return []byte(jwtSecret), nil
			})

			if err != nil || !token.Valid {
				http.Error(w, "invalid token", http.StatusUnauthorized)
				return
			}

			claims, ok := token.Claims.(*Claims)
			if !ok {
				http.Error(w, "invalid claims", http.StatusUnauthorized)
				return
			}

			ctx := context.WithValue(r.Context(), userIDKey, claims.UserID)
			ctx = context.WithValue(ctx, userRoleKey, claims.Role)
			next.ServeHTTP(w, r.WithContext(ctx))
		})
	}
}

func GetUserID(ctx context.Context) string {
	if v, ok := ctx.Value(userIDKey).(string); ok {
		return v
	}
	return ""
}

func GetUserRole(ctx context.Context) string {
	if v, ok := ctx.Value(userRoleKey).(string); ok {
		return v
	}
	return ""
}
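The middleware above only authenticates; each subgraph still has to decide what a role may do. As a rough sketch of the authorization side, the following `RequireRole` middleware (a hypothetical helper, with its own `roleKey` context key so the block stays self-contained) rejects requests whose role is not on an allow list. The role names are examples only.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

type ctxKey string

// roleKey plays the part of userRoleKey from the auth middleware.
const roleKey ctxKey = "userRole"

// RequireRole lets a request through only if the role stored in its context
// is one of the allowed roles; everything else gets 403 Forbidden.
func RequireRole(allowed ...string) func(http.Handler) http.Handler {
	ok := make(map[string]bool, len(allowed))
	for _, r := range allowed {
		ok[r] = true
	}
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			role, _ := r.Context().Value(roleKey).(string)
			if !ok[role] {
				http.Error(w, "forbidden", http.StatusForbidden)
				return
			}
			next.ServeHTTP(w, r)
		})
	}
}

// statusFor runs the middleware once with the given role and returns the
// HTTP status code, using an in-memory request and recorder.
func statusFor(role string) int {
	h := RequireRole("admin", "support")(http.HandlerFunc(
		func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) },
	))
	req := httptest.NewRequest(http.MethodPost, "/graphql", nil)
	req = req.WithContext(context.WithValue(req.Context(), roleKey, role))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor("admin"), statusFor("guest"))
	// prints 200 403
}
```

A check at the HTTP layer is coarse: it guards the whole endpoint. Field-level rules such as user:read versus order:read are usually enforced inside resolvers or schema directives, using the same role value from the context.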

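OpenTelemetry tracing integration

package telemetry

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/trace"
)

// newResource labels every span with the service name.
// "users-subgraph" is a placeholder; set it per service.
func newResource() *resource.Resource {
	return resource.NewSchemaless(attribute.String("service.name", "users-subgraph"))
}

// InitTracer wires an OTLP/gRPC exporter. endpoint is host:port
// (for example otel-collector:4317), not a URL.
func InitTracer(ctx context.Context, endpoint string) (func(context.Context) error, error) {
	exporter, err := otlptracegrpc.New(ctx,
		otlptracegrpc.WithEndpoint(endpoint),
		otlptracegrpc.WithInsecure(),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create OTLP exporter: %w", err)
	}

	provider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(newResource()),
	)

	otel.SetTracerProvider(provider)
	// TraceContext reads and writes the traceparent header the router forwards.
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{},
		propagation.Baggage{},
	))

	return provider.Shutdown, nil
}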

func StartSpan(ctx context.Context, name string) (context.Context, trace.Span) {
	tracer := otel.Tracer("graphql-federation")
	return tracer.Start(ctx, name)
}

Rate limiting and circuit breaking

package middleware

import (
	"fmt"
	"net/http"
	"sync"
	"time"
)

type RateLimiter struct {
	mu       sync.Mutex
	clients  map[string]*clientBucket
	rate     int
	interval time.Duration
}

type clientBucket struct {
	tokens   int
	lastSeen time.Time
}

func NewRateLimiter(rate int, interval time.Duration) *RateLimiter {
	rl := &RateLimiter{
		clients:  make(map[string]*clientBucket),
		rate:     rate,
		interval: interval,
	}
	go rl.cleanup()
	return rl
}

func (rl *RateLimiter) Allow(key string) bool {
	rl.mu.Lock()
	defer rl.mu.Unlock()

	now := time.Now()
	bucket, ok := rl.clients[key]
	if !ok {
		rl.clients[key] = &clientBucket{tokens: rl.rate - 1, lastSeen: now}
		return true
	}

	elapsed := now.Sub(bucket.lastSeen)
	if elapsed >= rl.interval {
		bucket.tokens = rl.rate - 1
		bucket.lastSeen = now
		return true
	}

	if bucket.tokens <= 0 {
		return false
	}

	bucket.tokens--
	return true
}

func (rl *RateLimiter) cleanup() {
	ticker := time.NewTicker(time.Minute)
	for range ticker.C {
		rl.mu.Lock()
		for key, bucket := range rl.clients {
			if time.Since(bucket.lastSeen) > 3*rl.interval {
				delete(rl.clients, key)
			}
		}
		rl.mu.Unlock()
	}
}

func RateLimitMiddleware(limiter *RateLimiter) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			clientID := r.Header.Get("X-Client-ID")
			if clientID == "" {
				clientID = r.RemoteAddr
			}

			if !limiter.Allow(clientID) {
				http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
				return
			}

			next.ServeHTTP(w, r)
		})
	}
}

type CircuitBreaker struct {
	mu           sync.Mutex
	failureCount int
	threshold    int
	timeout      time.Duration
	state        string
	lastFailure  time.Time
}

func NewCircuitBreaker(threshold int, timeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{
		threshold: threshold,
		timeout:   timeout,
		state:     "closed",
	}
}

func (cb *CircuitBreaker) Execute(fn func() error) error {
	cb.mu.Lock()
	if cb.state == "open" {
		if time.Since(cb.lastFailure) > cb.timeout {
			cb.state = "half-open"
			cb.mu.Unlock()
		} else {
			cb.mu.Unlock()
			return fmt.Errorf("circuit breaker is open")
		}
	} else {
		cb.mu.Unlock()
	}

	err := fn()
	cb.mu.Lock()
	defer cb.mu.Unlock()

	if err != nil {
		cb.failureCount++
		cb.lastFailure = time.Now()
		if cb.failureCount >= cb.threshold {
			cb.state = "open"
		}
		return err
	}

	cb.failureCount = 0
	cb.state = "closed"
	return nil
}

Complete service startup

package main

import (
	"context"
	"errors"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/99designs/gqlgen/graphql/handler"
	"github.com/99designs/gqlgen/graphql/playground"
	"github.com/go-chi/chi/v5"
	chimw "github.com/go-chi/chi/v5/middleware"

	"myapp/graph"
	"myapp/middleware"
	"myapp/repository"
	"myapp/telemetry"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	shutdown, err := telemetry.InitTracer(ctx, os.Getenv("OTEL_ENDPOINT"))
	if err != nil {
		log.Printf("tracer init failed: %v", err)
	} else {
		defer shutdown(ctx)
	}

	userRepo, err := repository.NewPostgresUserRepository(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatalf("Failed to connect to user database: %v", err)
	}
	// NewPostgresOrderRepository is assumed to mirror the user repository constructor.
	orderRepo, err := repository.NewPostgresOrderRepository(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatalf("Failed to connect to order database: %v", err)
	}

	resolver := graph.NewResolver(userRepo, orderRepo)

	srv := handler.NewDefaultServer(graph.NewExecutableSchema(
		graph.Config{Resolvers: resolver},
	))

	limiter := middleware.NewRateLimiter(100, time.Second)
	// The circuit breaker belongs around outbound calls (repositories, HTTP
	// clients), so it is not installed as router middleware here.

	router := chi.NewRouter()
	router.Use(chimw.RequestID)
	router.Use(chimw.RealIP)
	router.Use(chimw.Logger)
	router.Use(chimw.Recoverer)
	router.Use(chimw.Timeout(30 * time.Second))
	router.Use(middleware.AuthMiddleware(os.Getenv("JWT_SECRET")))
	router.Use(middleware.RateLimitMiddleware(limiter))
	// Build fresh loaders for every request so cached entities never leak
	// between requests. graph.WithLoaders is assumed to store them under the
	// context key the resolvers read. This subgraph has no product
	// repository, so ProductByID must not be used here.
	router.Use(func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			loaders := graph.NewLoaders(userRepo, nil, orderRepo)
			next.ServeHTTP(w, r.WithContext(graph.WithLoaders(r.Context(), loaders)))
		})
	})

	// The path must match routing_url in supergraph.yaml (/graphql).
	router.Handle("/", playground.Handler("GraphQL Playground", "/graphql"))
	router.Handle("/graphql", srv)
	router.Get("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})

	port := os.Getenv("PORT")
	if port == "" {
		port = "4001"
	}

	server := &http.Server{
		Addr:         ":" + port,
		Handler:      router,
		ReadTimeout:  15 * time.Second,
		WriteTimeout: 30 * time.Second,
		IdleTimeout:  60 * time.Second,
	}

	go func() {
		log.Printf("Subgraph running on :%s", port)
		if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Fatalf("Server error: %v", err)
		}
	}()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit

	log.Println("Shutting down gracefully...")
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer shutdownCancel()

	if err := server.Shutdown(shutdownCtx); err != nil {
		log.Fatalf("Forced shutdown: %v", err)
	}
	log.Println("Server stopped")
}

5 Common Pitfalls

Pitfall 1: Forgetting to implement the entity resolver in a subgraph (the gqlgen counterpart of __resolveReference)

Wrong

// @key is declared, but no entity resolver is implemented
type Resolver struct{}

// Without this method the gateway cannot resolve the entity across subgraphs
// func (r *entityResolver) FindUserByID(ctx context.Context, id string) (*User, error) { ... }

Right

func (r *entityResolver) FindUserByID(ctx context.Context, id string) (*User, error) {
	return r.userRepo.FindByID(ctx, id)
}

func (r *entityResolver) FindUserByEmail(ctx context.Context, email string) (*User, error) {
	return r.userRepo.FindByEmail(ctx, email)
}

Pitfall 2: Overusing @shareable leads to inconsistent data

Wrong

type User @key(fields: "id") @shareable {
  id: ID!
  name: String!
  email: String!
  orderCount: Int!  # several subgraphs provide this field, each with different logic
}

Right

type User @key(fields: "id") {
  id: ID!
  name: String!
  email: String!
}

type UserOrderStats @key(fields: "userId") {
  userId: ID!
  orderCount: Int!
  totalSpent: Float!
}

Pitfall 3: N+1 queries without a DataLoader

Wrong

func (r *orderResolver) User(ctx context.Context, obj *Order) (*User, error) {
    // Every Order triggers its own HTTP request to the users service
    resp, err := http.Get(fmt.Sprintf("http://users-service/users/%s", obj.UserID))
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    var user User
    json.NewDecoder(resp.Body).Decode(&user)
    return &user, nil
}

Right

func (r *orderResolver) User(ctx context.Context, obj *Order) (*User, error) {
    loader := ctx.Value(loaderKey).(*Loaders)
    return loader.UserByID.Load(ctx, obj.UserID)
}

Pitfall 4: Changing a subgraph schema without a compatibility check

Wrong

# Publish directly, with no check
rover subgraph publish my-graph@production \
  --name users \
  --schema ./schemas/users.graphqls

Right

# Check compatibility first
rover subgraph check my-graph@production \
  --name users \
  --schema ./schemas/users.graphqls

# Publish only after confirming there is no breaking change
rover subgraph publish my-graph@production \
  --name users \
  --schema ./schemas/users.graphqls \
  --routing-url http://users-service:4001/graphql

Pitfall 5: No timeout or rate-limit settings at the gateway

Wrong

# router.yaml: no timeout or traffic settings at all
supergraph:
  listen: 0.0.0.0:4000

Right

supergraph:
  listen: 0.0.0.0:4000

traffic_shaping:
  # Client-facing limits live under router.
  router:
    timeout: 30s
    global_rate_limit:
      capacity: 1000
      interval: 1s
  subgraphs:
    users:
      timeout: 5s
    products:
      timeout: 3s
    orders:
      timeout: 10s

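Troubleshooting Cheat Sheet

Error or symptom | Cause | Fix
INVALID_GRAPHQL (unknown directive) | A subgraph uses a federation directive it did not import | Add the missing directive to the @link import list
KEY_INVALID_FIELDS | @key references a field that does not exist on the type | Declare every field named in @key on the type
EXTERNAL_TYPE_MISMATCH | An @external field's type differs from the owning subgraph's | Make the @external field's type match the original definition
INVALID_FIELD_SHARING | A field is resolved by several subgraphs but is not marked @shareable in all of them | Mark the shared field or type @shareable in every subgraph that defines it
RESOLVE_REFERENCE_FAILED | The entity resolver returned an error | Check the entity resolver's database query and error handling
QUERY_PLAN_TIMEOUT | Query planning timed out: too many subgraphs or too deep a query | Limit query depth and simplify the schema structure
SUBGRAPH_UNREACHABLE | The subgraph service cannot be reached | Check the subgraph's health and network connectivity
COMPOSITION_ERROR | Schema composition failed because of a type conflict | Run rover subgraph check to verify compatibility
N+1_DETECTED | An N+1 query pattern shows up in subgraph traces | Add DataLoader batching to entity resolution
CIRCULAR_DEPENDENCY | Subgraphs depend on each other in a cycle | Redraw entity boundaries; use @requires instead of direct references

The first four rows are Federation 2 composition error codes. The remaining rows are shorthand labels for runtime symptoms, not literal codes emitted by the router.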

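Advanced Optimization

Query complexity analysis and limits

GraphQL query complexity can be abused: a deeply nested query can produce an exponential amount of data. Query complexity analysis caps the cost of a query.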

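package middleware

import (
	"context"
	"fmt"

	"github.com/99designs/gqlgen/graphql"
	"github.com/99designs/gqlgen/graphql/handler/extension"
	"github.com/vektah/gqlparser/v2/ast"
)

// maxQueryComplexity is the ceiling enforced by CalculateQueryComplexity.
const maxQueryComplexity = 500

type ComplexityLimit struct {
	maxComplexity int
}

func NewComplexityLimit(limit int) *ComplexityLimit {
	return &ComplexityLimit{maxComplexity: limit}
}

// Extension wraps gqlgen's built-in limiter, which lives in the
// handler/extension package rather than in graphql.
func (cl *ComplexityLimit) Extension() graphql.HandlerExtension {
	return extension.FixedComplexityLimit(cl.maxComplexity)
}

type fieldComplexity struct {
	complexity int
	details    map[string]int
}

// CalculateQueryComplexity sums a depth-weighted cost over every operation in
// the parsed document and rejects queries above maxQueryComplexity.
func CalculateQueryComplexity(ctx context.Context, opCtx *graphql.OperationContext) (*fieldComplexity, error) {
	complexity := 0
	details := make(map[string]int)

	for _, op := range opCtx.Doc.Operations {
		for _, sel := range op.SelectionSet {
			calcSelectionComplexity(sel, &complexity, details, 1)
		}
	}

	if complexity > maxQueryComplexity {
		return nil, fmt.Errorf("query complexity %d exceeds limit %d", complexity, maxQueryComplexity)
	}

	return &fieldComplexity{complexity: complexity, details: details}, nil
}

// calcSelectionComplexity charges 1 for a leaf field and depth for a field
// with sub-selections, then recurses one level deeper into its children.
// Fragments are transparent: their selections are charged at the current depth.
func calcSelectionComplexity(sel ast.Selection, total *int, details map[string]int, depth int) {
	switch s := sel.(type) {
	case *ast.Field:
		fieldCost := 1
		if len(s.SelectionSet) > 0 {
			fieldCost = depth
		}
		*total += fieldCost
		details[s.Name] += fieldCost
		for _, child := range s.SelectionSet {
			calcSelectionComplexity(child, total, details, depth+1)
		}
	case *ast.InlineFragment:
		for _, child := range s.SelectionSet {
			calcSelectionComplexity(child, total, details, depth)
		}
	case *ast.FragmentSpread:
		// Definition is filled in during validation; skip unresolved spreads.
		if s.Definition == nil {
			return
		}
		for _, child := range s.Definition.SelectionSet {
			calcSelectionComplexity(child, total, details, depth)
		}
	}
}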
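
To see what the depth-weighted rule in calcSelectionComplexity charges for a concrete query, the following standalone sketch applies the same rule to a tiny hand-built tree. The `Field` struct, `Cost`, and `exampleQuery` are simplified stand-ins invented for this illustration; they replace the gqlparser AST so the example runs with the standard library alone.

```go
package main

import "fmt"

// Field is a hypothetical, minimal stand-in for a selected GraphQL field.
type Field struct {
	Name     string
	Children []Field
}

// Cost applies the same rule as calcSelectionComplexity: a leaf field costs 1,
// a field with sub-selections costs its depth, and children are visited one
// level deeper.
func Cost(fields []Field, depth int) int {
	total := 0
	for _, f := range fields {
		cost := 1
		if len(f.Children) > 0 {
			cost = depth
		}
		total += cost + Cost(f.Children, depth+1)
	}
	return total
}

// exampleQuery models: { orders { id user { name } } }
func exampleQuery() []Field {
	return []Field{{
		Name: "orders",
		Children: []Field{
			{Name: "id"},
			{Name: "user", Children: []Field{{Name: "name"}}},
		},
	}}
}

func main() {
	// orders (depth 1) = 1, id (leaf) = 1, user (depth 2) = 2, name (leaf) = 1
	fmt.Println("cost:", Cost(exampleQuery(), 1))
}
```

The four fields add up to a cost of 5. Note that this rule ignores list sizes: `orders` is charged the same whether it returns 1 row or 1000, so a limit based on it bounds nesting rather than the actual amount of data returned.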

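Persisted queries and query registration

Production environments should use persisted queries: the client sends only the query hash, which avoids transmitting the full query text and also blocks unregistered queries from executing.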

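package persistedquery

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"sync"

	"github.com/99designs/gqlgen/graphql"
	"github.com/vektah/gqlparser/v2/gqlerror"
)

type PersistedQueryManager struct {
	mu      sync.RWMutex
	queries map[string]string
	strict  bool
}

// Compile-time check that the manager satisfies the gqlgen extension hooks.
var _ interface {
	graphql.HandlerExtension
	graphql.OperationParameterMutator
} = (*PersistedQueryManager)(nil)

func NewPersistedQueryManager(strict bool) *PersistedQueryManager {
	return &PersistedQueryManager{
		queries: make(map[string]string),
		strict:  strict,
	}
}

func (pqm *PersistedQueryManager) Register(hash, query string) {
	pqm.mu.Lock()
	defer pqm.mu.Unlock()
	pqm.queries[hash] = query
}

// ExtensionName and Validate satisfy graphql.HandlerExtension so the manager
// can be installed with srv.Use(pqm).
func (pqm *PersistedQueryManager) ExtensionName() string { return "PersistedQueryManager" }

func (pqm *PersistedQueryManager) Validate(graphql.ExecutableSchema) error { return nil }

// MutateOperationParameters runs before the query is parsed, so swapping the
// hash for the registered text here changes what actually gets executed.
// A 64-character Query is treated as a hex-encoded SHA-256 hash.
func (pqm *PersistedQueryManager) MutateOperationParameters(ctx context.Context, raw *graphql.RawParams) *gqlerror.Error {
	hash := raw.Query
	if len(hash) != 64 {
		// Not a hash: full query text passes through unchanged. Reject it
		// here as well if strict mode should block every unregistered query.
		return nil
	}

	pqm.mu.RLock()
	query, ok := pqm.queries[hash]
	pqm.mu.RUnlock()

	switch {
	case ok:
		raw.Query = query
	case pqm.strict:
		// Return a GraphQL error instead of panicking in the request path.
		return gqlerror.Errorf("unknown persisted query: %s", hash)
	}
	return nil
}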

func HashQuery(query string) string {
	h := sha256.Sum256([]byte(query))
	return hex.EncodeToString(h[:])
}
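
Registration needs a source of hash-to-query pairs. One common arrangement, sketched here as an assumption rather than something this article prescribes, is to generate a manifest at build time from the client's known operations and load it into the server at startup. `BuildManifest` and the sample queries are hypothetical; `hashQuery` repeats the SHA-256 logic of HashQuery so the example is self-contained.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// hashQuery mirrors HashQuery: the hex-encoded SHA-256 of the query text.
func hashQuery(query string) string {
	sum := sha256.Sum256([]byte(query))
	return hex.EncodeToString(sum[:])
}

// BuildManifest maps each query's hash to its text. Identical queries
// collapse into a single entry because they produce the same hash.
func BuildManifest(queries []string) map[string]string {
	manifest := make(map[string]string, len(queries))
	for _, q := range queries {
		manifest[hashQuery(q)] = q
	}
	return manifest
}

func main() {
	manifest := BuildManifest([]string{
		`query Orders { orders { id total } }`,
		`query Me { me { id name } }`,
	})
	// The JSON output could be shipped to the server, which would call
	// Register(hash, query) for every entry at startup.
	out, err := json.MarshalIndent(manifest, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

The hash is computed over the exact bytes of the query, so any difference in whitespace or field order yields a different hash. Client and server therefore need to hash the same normalized text.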

Subgraph caching strategy

Caching at the subgraph level can significantly cut down on repeated queries, especially for hot entities.

package cache

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

// ErrCacheMiss lets callers tell a miss apart from a real Redis failure
// by using errors.Is.
var ErrCacheMiss = errors.New("cache miss")

type EntityCache struct {
	rdb    *redis.Client
	prefix string
	ttl    time.Duration
}

func NewEntityCache(redisURL, prefix string, ttl time.Duration) (*EntityCache, error) {
	opts, err := redis.ParseURL(redisURL)
	if err != nil {
		return nil, fmt.Errorf("invalid redis URL: %w", err)
	}

	return &EntityCache{
		rdb:    redis.NewClient(opts),
		prefix: prefix,
		ttl:    ttl,
	}, nil
}

// Get loads the cached entity into dest. A missing key yields an error that
// wraps ErrCacheMiss; any other Redis error is returned as a read error.
func (c *EntityCache) Get(ctx context.Context, entityType, id string, dest interface{}) error {
	key := fmt.Sprintf("%s:%s:%s", c.prefix, entityType, id)
	val, err := c.rdb.Get(ctx, key).Result()
	if errors.Is(err, redis.Nil) {
		return fmt.Errorf("%w for %s:%s", ErrCacheMiss, entityType, id)
	}
	if err != nil {
		return fmt.Errorf("cache read error: %w", err)
	}
	return json.Unmarshal([]byte(val), dest)
}

func (c *EntityCache) Set(ctx context.Context, entityType, id string, val interface{}) error {
	key := fmt.Sprintf("%s:%s:%s", c.prefix, entityType, id)
	data, err := json.Marshal(val)
	if err != nil {
		return fmt.Errorf("cache marshal error: %w", err)
	}
	return c.rdb.Set(ctx, key, data, c.ttl).Err()
}

func (c *EntityCache) Invalidate(ctx context.Context, entityType, id string) error {
	key := fmt.Sprintf("%s:%s:%s", c.prefix, entityType, id)
	return c.rdb.Del(ctx, key).Err()
}

func (c *EntityCache) InvalidatePattern(ctx context.Context, pattern string) error {
	iter := c.rdb.Scan(ctx, 0, fmt.Sprintf("%s:%s:*", c.prefix, pattern), 100).Iterator()
	var keys []string
	for iter.Next(ctx) {
		keys = append(keys, iter.Val())
	}
	if err := iter.Err(); err != nil {
		return fmt.Errorf("cache scan error: %w", err)
	}
	if len(keys) > 0 {
		return c.rdb.Del(ctx, keys...).Err()
	}
	return nil
}
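
A cache like this is typically used in a cache-aside flow: look in the cache first, fall back to the source of truth on a miss, then store the result. The sketch below shows that flow with a hypothetical `Cache` interface and an in-memory `memCache` so it runs without Redis; `GetOrLoad` and `runDemo` are likewise names invented for this example, and EntityCache would need a thin adapter to fit the same shape.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ErrMiss is returned by the in-memory cache when a key is absent.
var ErrMiss = errors.New("cache miss")

// Cache is a hypothetical minimal interface used only for this sketch.
type Cache interface {
	Get(ctx context.Context, key string) (string, error)
	Set(ctx context.Context, key, val string) error
}

// memCache is an in-memory stand-in for Redis.
type memCache map[string]string

func (m memCache) Get(_ context.Context, key string) (string, error) {
	v, ok := m[key]
	if !ok {
		return "", ErrMiss
	}
	return v, nil
}

func (m memCache) Set(_ context.Context, key, val string) error {
	m[key] = val
	return nil
}

// GetOrLoad returns the cached value when present. On any cache error,
// including a miss, it calls load and stores the result on a best-effort basis.
func GetOrLoad(ctx context.Context, c Cache, key string, load func(context.Context) (string, error)) (string, error) {
	if v, err := c.Get(ctx, key); err == nil {
		return v, nil
	}
	v, err := load(ctx)
	if err != nil {
		return "", err
	}
	_ = c.Set(ctx, key, v) // a failed cache write should not fail the request
	return v, nil
}

// runDemo looks up the same key twice and counts how often the loader ran.
func runDemo() (loads int, first, second string) {
	cache := memCache{}
	load := func(context.Context) (string, error) {
		loads++
		return "Ada", nil
	}
	ctx := context.Background()
	first, _ = GetOrLoad(ctx, cache, "User:1", load)
	second, _ = GetOrLoad(ctx, cache, "User:1", load)
	return loads, first, second
}

func main() {
	loads, first, second := runDemo()
	fmt.Println(loads, first, second)
}
```

Treating a cache failure the same as a miss keeps the subgraph available when Redis is down, at the cost of extra load on the database. Whether that trade-off is acceptable depends on how much traffic the source of truth can absorb.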

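Technology comparison

| Dimension | Apollo Federation | Schema Stitching | REST API | gRPC | tRPC |
| --- | --- | --- | --- | --- | --- |
| Learning curve | Medium; requires understanding federation concepts | High; conflicts handled manually | | Medium; requires learning Proto | Low (TypeScript only) |
| Schema management | Automatic composition, rover CLI | Manual stitching, custom resolvers | No unified schema | Proto definitions, code generation | TypeScript type inference |
| Cross-team collaboration | Excellent; subgraphs evolve independently | Fair; conflicts resolved manually | Poor; API docs go stale easily | Good; Proto is the contract | Limited to full-stack TS |
| Performance | Good; query planning plus batched resolution | Fair; N+1 handled manually | Poor; multiple requests | Excellent; binary plus HTTP/2 | Good; end-to-end type safety |
| N+1 protection | Batched entity resolution at the gateway, DataLoader in subgraphs | Must be implemented manually | | | |
| Ecosystem maturity | High; full Apollo toolchain | Medium; community solutions | Medium | | |
| Language support | All languages (Go, Java, TS, etc.) | All languages | All languages | All languages | TypeScript only |
| Real-time subscriptions | Supported | Supported | Requires WebSocket | Requires bidirectional streaming | Supported |
| Observability | Apollo Studio integration | Build your own | Build your own | OpenTelemetry | Build your own |
| Use cases | Large-scale microservice APIs | Custom composition logic | Simple CRUD | High-performance internal communication | Full-stack TS projects |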

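Summary: GraphQL Federation is no silver bullet, but it is currently the most mature way to run GraphQL across a microservice architecture. The core principles: split subgraphs along domain boundaries, declare entities with @key, guard against N+1 with DataLoader, let the gateway handle query planning and rate limiting, and in production always add authentication, tracing, and caching. Start with two subgraphs and split further step by step rather than trying to get there in one go. Schema checks must be enforced in CI; otherwise a breaking change will sooner or later blow up the entire supergraph.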

