Go Serverless実践:2026年Knativeでゼロからイベント駆動マイクロサービスを構築する

DevOps

こんな問題に直面していませんか?

マイクロサービスに分割した後、サービス数が爆発的に増え、各サービスは実行インスタンスを維持する必要があります——1日に数回しか呼ばれないサービスでも。早朝3時のトラフィック低谷時、数十のサービスがアイドリングでコストを消費し、プロモーション時のトラフィック急増にはスケーリングが常に遅れます。さらに頭を悩ませるのは、イベント駆動のビジネスシナリオが増え続けていること:注文作成後に在庫引き当て、支払い成功後に物流通知、ユーザー登録後にウェルカムメール送信……これらの非同期チェーンを従来のマイクロサービスで実装するのは重くて複雑です。

「オンデマンド実行、イベントトリガー、オートスケーリング」の軽量ソリューションをお探しなら、Knative + Go は2026年に最も投資すべき組み合わせです。


Knativeコアアーキテクチャ概要

KnativeはKubernetes上に構築されたServerlessフレームワークで、2つのコアコンポーネントで構成されています:

コンポーネント 責務 コア概念
Serving リクエストルーティング、オートスケーリング、バージョン管理 Service → Configuration → Route → Revision
Eventing イベントルーティング、トリガー、メッセージバス Broker → Trigger → Source → Sink

Servingワークフロー:ユーザーリクエスト → Route → Revision(特定バージョン)→ Podオートスケーリング

Eventingワークフロー:イベントソース(Source)→ Broker(メッセージバス)→ Trigger(フィルタリングルール)→ Sink(消費サービス)

Go言語は、コンパイル型、高速起動、小さなメモリフットプリントの特性から、Knative Serverless関数の最良の選択肢の一つです。


問題の深掘り:なぜ従来のマイクロサービスでは不十分なのか?

従来のマイクロサービスアーキテクチャには、イベント駆動シナリオで3つの大きなペインポイントがあります:

  1. リソースの無駄:常駐プロセスが24時間稼働、低トラフィック時のCPU利用率は5%未満
  2. スケーリングの遅れ:HPAは遅延するメトリクスに基づいており、トラフィックスパイクに対応できない
  3. 複雑なイベント処理:メッセージキュー、リトライ、デッドレターキューを自前で統合する必要がある

Knativeのソリューション:

ペインポイント Knativeソリューション 効果
リソースの無駄 Scale-to-Zero、トラフィックなし時にPodを0にスケール コスト60-80%削減
スケーリングの遅れ 同時リクエスト数ベースのKPAオートスケーリング 秒単位でトラフィック変化に対応
複雑なイベント処理 組み込みBroker/Triggerモデル 宣言的イベントルーティング

ステップバイステップ:Knativeイベント駆動サービスのゼロからの構築

ステップ1:環境準備

# Knative Servingのインストール
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.17.0/serving-crds.yaml
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.17.0/serving-core.yaml

# Knative Eventingのインストール
kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.17.0/eventing-crds.yaml
kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.17.0/eventing-core.yaml

# MT-channel-based Brokerのインストール
kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.17.0/mt-channel-broker.yaml

# インストールの確認
kubectl get pods -n knative-serving
kubectl get pods -n knative-eventing

ステップ2:Goイベント処理サービスの作成

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	cloudevents "github.com/cloudevents/sdk-go/v2"
)

type OrderEvent struct {
	OrderID     string  `json:"orderId"`
	UserID      string  `json:"userId"`
	Amount      float64 `json:"amount"`
	ProductSKU  string  `json:"productSku"`
	CreatedAt   string  `json:"createdAt"`
}

type InventoryResult struct {
	OrderID   string `json:"orderId"`
	Status    string `json:"status"`
	Message   string `json:"message"`
	Timestamp string `json:"timestamp"`
}

func handleCloudEvent(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, cloudevents.Result) {
	var order OrderEvent
	if err := event.DataAs(&order); err != nil {
		log.Printf("Failed to parse event data: %v", err)
		return nil, cloudevents.NewResult(http.StatusBadRequest, "failed to parse data: %s", err)
	}

	log.Printf("Processing order: %s, SKU: %s, Amount: %.2f", order.OrderID, order.ProductSKU, order.Amount)

	result := InventoryResult{
		OrderID:   order.OrderID,
		Status:    "deducted",
		Message:   fmt.Sprintf("Inventory deducted for SKU %s", order.ProductSKU),
		Timestamp: time.Now().UTC().Format(time.RFC3339),
	}

	respEvent := cloudevents.NewEvent()
	respEvent.SetSource("com.toolsku.inventory-service")
	respEvent.SetType("com.toolsku.inventory.result")
	respEvent.SetData(cloudevents.ApplicationJSON, result)

	return &respEvent, cloudevents.ResultACK
}

func healthHandler(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	fmt.Fprint(w, "healthy")
}

func main() {
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
	}

	http.HandleFunc("/health", healthHandler)

	ctx := cloudevents.ContextWithTarget(context.Background(), "http://localhost:"+port)
	ctx = cloudevents.WithEncodingStructured(ctx)

	p, err := cloudevents.NewHTTP(cloudevents.WithPort(parsePort(port)), cloudevents.WithPath("/"))
	if err != nil {
		log.Fatalf("Failed to create cloud events protocol: %v", err)
	}

	handler, err := cloudevents.NewHTTPReceiveHandler(ctx, p, handleCloudEvent)
	if err != nil {
		log.Fatalf("Failed to create handler: %v", err)
	}

	mux := http.NewServeMux()
	mux.Handle("/", handler)
	mux.HandleFunc("/health", healthHandler)

	log.Printf("Inventory service starting on port %s", port)
	if err := http.ListenAndServe(":"+port, mux); err != nil {
		log.Fatal(err)
	}
}

func parsePort(port string) int {
	var p int
	fmt.Sscanf(port, "%d", &p)
	if p == 0 {
		p = 8080
	}
	return p
}

ステップ3:Dockerfileの作成

FROM golang:1.23-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /inventory-service .

FROM gcr.io/distroless/static:nonroot
COPY --from=builder /inventory-service /inventory-service
USER 65532:65532
ENTRYPOINT ["/inventory-service"]

ステップ4:Knative Serviceのデプロイ

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: inventory-service
  namespace: production
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/target: "10"
        autoscaling.knative.dev/min-scale: "0"
        autoscaling.knative.dev/max-scale: "50"
        autoscaling.knative.dev/scale-to-zero-pod-retention-period: "5m"
    spec:
      containerConcurrency: 5
      timeoutSeconds: 30
      containers:
        - image: registry.toolsku.com/inventory-service:v1.0.0
          ports:
            - containerPort: 8080
          env:
            - name: PORT
              value: "8080"
          resources:
            requests:
              cpu: "100m"
              memory: "128Mi"
            limits:
              cpu: "500m"
              memory: "256Mi"
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 1
            periodSeconds: 3
kubectl apply -f service.yaml
kubectl get ksvc inventory-service -n production

ステップ5:Eventingイベントルーティングの設定

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: order-broker
  namespace: production
---
apiVersion: sources.knative.dev/v1
kind: ApiServerSource
metadata:
  name: order-events-source
  namespace: production
spec:
  mode: Resource
  resources:
    - apiVersion: apps.toolsku.com/v1
      kind: Order
  serviceAccountName: event-watcher
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: order-broker
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: inventory-trigger
  namespace: production
spec:
  broker: order-broker
  filter:
    attributes:
      type: com.toolsku.order.created
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: inventory-service
kubectl apply -f eventing.yaml
kubectl get broker,trigger -n production

完全コード:注文処理イベントチェーン

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	cloudevents "github.com/cloudevents/sdk-go/v2"
)

type Order struct {
	OrderID    string  `json:"orderId"`
	UserID     string  `json:"userId"`
	Items      []Item  `json:"items"`
	Total      float64 `json:"total"`
	Status     string  `json:"status"`
	CreatedAt  string  `json:"createdAt"`
}

type Item struct {
	SKU      string  `json:"sku"`
	Name     string  `json:"name"`
	Quantity int     `json:"quantity"`
	Price    float64 `json:"price"`
}

type PaymentRequest struct {
	OrderID string  `json:"orderId"`
	Amount  float64 `json:"amount"`
	Method  string  `json:"method"`
}

type Notification struct {
	UserID  string `json:"userId"`
	Channel string `json:"channel"`
	Title   string `json:"title"`
	Body    string `json:"body"`
}

var (
	inventoryDB = sync.Map{}
	paymentDB   = sync.Map{}
)

func init() {
	inventoryDB.Store("SKU-001", 100)
	inventoryDB.Store("SKU-002", 50)
	inventoryDB.Store("SKU-003", 200)
}

func handleOrderCreated(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, cloudevents.Result) {
	var order Order
	if err := event.DataAs(&order); err != nil {
		return nil, cloudevents.NewResult(http.StatusBadRequest, "parse error: %s", err)
	}

	allAvailable := true
	for _, item := range order.Items {
		stock, ok := inventoryDB.Load(item.SKU)
		if !ok || stock.(int) < item.Quantity {
			allAvailable = false
			break
		}
	}

	if !allAvailable {
		failEvent := cloudevents.NewEvent()
		failEvent.SetSource("com.toolsku.inventory")
		failEvent.SetType("com.toolsku.inventory.insufficient")
		failEvent.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
			"orderId": order.OrderID,
			"reason":  "insufficient stock",
		})
		return &failEvent, cloudevents.ResultACK
	}

	for _, item := range order.Items {
		stock, _ := inventoryDB.Load(item.SKU)
		inventoryDB.Store(item.SKU, stock.(int)-item.Quantity)
	}

	paymentReq := PaymentRequest{
		OrderID: order.OrderID,
		Amount:  order.Total,
		Method:  "credit_card",
	}

	paymentEvent := cloudevents.NewEvent()
	paymentEvent.SetSource("com.toolsku.inventory")
	paymentEvent.SetType("com.toolsku.payment.request")
	paymentEvent.SetData(cloudevents.ApplicationJSON, paymentReq)

	log.Printf("Order %s: inventory deducted, payment requested", order.OrderID)
	return &paymentEvent, cloudevents.ResultACK
}

func handlePaymentResult(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, cloudevents.Result) {
	var result map[string]interface{}
	if err := event.DataAs(&result); err != nil {
		return nil, cloudevents.NewResult(http.StatusBadRequest, "parse error: %s", err)
	}

	orderID, _ := result["orderId"].(string)
	status, _ := result["status"].(string)
	userID, _ := result["userId"].(string)

	notification := Notification{
		UserID:  userID,
		Channel: "email",
		Title:   fmt.Sprintf("Order %s - Payment %s", orderID, status),
		Body:    fmt.Sprintf("Your order %s payment is %s", orderID, status),
	}

	notifyEvent := cloudevents.NewEvent()
	notifyEvent.SetSource("com.toolsku.notification")
	notifyEvent.SetType("com.toolsku.notification.send")
	notifyEvent.SetData(cloudevents.ApplicationJSON, notification)

	log.Printf("Payment %s for order %s, notification queued", status, orderID)
	return &notifyEvent, cloudevents.ResultACK
}

func healthHandler(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	fmt.Fprint(w, `{"status":"healthy"}`)
}

func main() {
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/health", healthHandler)

	log.Printf("Order processing service starting on :%s", port)
	server := &http.Server{
		Addr:         ":" + port,
		Handler:      mux,
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 30 * time.Second,
		IdleTimeout:  60 * time.Second,
	}
	log.Fatal(server.ListenAndServe())
}

よくある落とし穴ガイド

落とし穴1:コールドスタートタイムアウトによるリクエスト失敗

Knative Scale-to-Zero後の最初のリクエストはコールドスタートが必要です。イメージが大きすぎるか起動が遅いと、タイムアウトしやすくなります。

解決策

  • distrolessベースイメージを使用し、イメージサイズを50MB以下に抑える
  • scale-to-zero-pod-retention-periodを設定してウォームPodを保持
  • progress-deadlineを60秒以上に設定
  • 重要なサービスにはmin-scale: "1"でウォームインスタンスを維持

落とし穴2:KPA同時接続メトリクスの不一致

Knativeのデフォルト同時接続ターゲットは100ですが、Goサービスの処理速度には大きな差があります。

解決策

annotations:
  autoscaling.knative.dev/target: "10"
  autoscaling.knative.dev/target-burst-capacity: "5"
  autoscaling.knative.dev/panic-window-percentage: "10.0"
  autoscaling.knative.dev/panic-threshold-percentage: "200.0"

落とし穴3:CloudEventsフォーマットの非互換性

異なるイベントソースが送信するCloudEventsはStructuredまたはBinaryエンコーディングを使用する場合があります。

解決策

  • Structuredエンコーディングに統一
  • Source側でContent-Type: application/cloudevents+jsonを設定
  • cloudevents/sdk-goの自動デコード機能を使用

落とし穴4:Eventingメッセージの損失

BrokerはデフォルトでインメモリChannelを使用し、Pod再起動時にメッセージが失われます。

解決策

  • 本番環境ではKafka Channelを使用
apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
metadata:
  name: order-channel
  namespace: production
spec:
  numPartitions: 3
  replicationFactor: 3

落とし穴5:Revision蓄積によるリソースリーク

Serviceの更新のたびに新しいRevisionが作成され、古いRevisionがクリーンアップされないとConfigMapとDeploymentリソースを占有します。

解決策

spec:
  template:
    metadata:
      annotations:
        serving.knative.dev/revision-ulimits: "3"
  • 定期的にkubectl delete revisions --field-selector=status.conditions[0].status=Falseを実行
  • revision-gc.max-stale-revisions: "3"を設定

エラートラブルシューティング

# エラーメッセージ 原因 解決方法
1 Revision failed: Container image pull error イメージアドレスの誤りまたはプル権限なし イメージアドレスを確認、imagePullSecretsを作成
2 Revision failed: Container probe failed ヘルスチェックパスの誤りまたは起動が遅い readinessProbeのinitialDelaySecondsを調整
3 Route not ready: Revision is not ready Revisionのデプロイ失敗 kubectl describe revision <name>でイベントを確認
4 Autoscaler internal error KPAが同時接続メトリクスを取得できない activatorとautoscaler Podのステータスを確認
5 Broker not ready: Channel not provisioned Channel CRDがインストールされていない 対応するChannel実装をインストール
6 Trigger delivery failed: no subscriber Sink Serviceが存在しないか準備未完了 ksvcがデプロイ済みでstatusがReadyであることを確認
7 Cold start timeout: progress deadline exceeded イメージが大きすぎるか起動が遅い イメージサイズを最適化、progress-deadlineを増加
8 Event dropped: no broker ingress Broker ingressが準備未完了 Broker statusとingress Podを確認
9 Permission denied: serviceaccount SAにRBAC権限がない SAに対応するClusterRoleバインディングを追加
10 OOMKilled: container limit exceeded メモリ制限が小さすぎる resources.limits.memoryを増加

高度な最適化

1. コールドスタート最適化:プリロードとスナップショット

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: inventory-service
  annotations:
    serving.knative.dev/creator: "admin"
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/min-scale: "0"
        autoscaling.knative.dev/scale-to-zero-pod-retention-period: "10m"
        autoscaling.knative.dev/target: "10"
    spec:
      containers:
        - image: registry.toolsku.com/inventory-service:v1.0.0
          env:
            - name: GOMAXPROCS
              value: "2"
          resources:
            requests:
              cpu: "50m"
              memory: "64Mi"

2. イベントリトライとデッドレターキュー

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: inventory-trigger
spec:
  broker: order-broker
  filter:
    attributes:
      type: com.toolsku.order.created
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: inventory-service
  delivery:
    retry: 5
    backoffPolicy: exponential
    backoffDelay: "1s"
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dead-letter-handler

3. カナリアデプロイメントとトラフィック分割

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: inventory-service
spec:
  traffic:
    - percent: 90
      revisionName: inventory-service-v1
      tag: stable
    - percent: 10
      revisionName: inventory-service-v2
      tag: canary

4. モニタリング統合

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: knative-serving-metrics
spec:
  selector:
    matchLabels:
      app.kubernetes.io/part-of: knative-serving
  endpoints:
    - port: http-metrics
      interval: 15s

比較分析

次元 Knative AWS Lambda OpenFaaS KEDA
実行環境 自社K8sクラスタ AWSマネージド 自社K8sクラスタ 自社K8sクラスタ
言語サポート 任意 任意 任意 任意
イベントモデル Broker/Trigger EventBridge NATS Scaler
コールドスタート 1-5s 100ms-1s 2-8s なし(スケーリングのみ)
Scale-to-Zero サポート サポート サポート サポート
トラフィックカナリア ネイティブサポート Aliasが必要 未サポート 未サポート
ベンダーロックイン なし AWS なし なし
運用複雑度
コストモデル K8sリソース単位 呼び出し回数単位 K8sリソース単位 K8sリソース単位
適用シナリオ エンタープライズK8sエコシステム AWSフルスタック 軽量Serverless イベント駆動スケーリング

まとめ:Knative + Go は、Kubernetesネイティブ環境に最も柔軟なServerlessソリューションを提供します。Servingでリクエスト駆動のオートスケーリングとカナリアデプロイを実現し、Eventingで宣言的なイベントルーティングと信頼性の高い配信を実現します。2026年のKnativeは十分に成熟しており、鍵となるのはオートスケーリングパラメータの適切な設定、適切なChannel実装の選択、そしてコールドスタートの最適化です。イベント駆動の軽量サービスから始め、徐々に完全なイベントチェーンに拡張していくのが、Knative導入の最良のアプローチです。


オンラインツールおすすめ

  • JSONフォーマッター:/ja/json/format — CloudEventsとAPIレスポンスの処理に必須のツール
  • Base64エンコード/デコード:/ja/encode/base64 — Kubernetes Secretとイベントデータのエンコード/デコード
  • Curl to Code:/ja/dev/curl-to-code — curlコマンドをGo HTTPクライアントコードに素早く変換

ブラウザローカルツールを無料で試す →

#Go#Serverless#Knative#Kubernetes#事件驱动#自动伸缩#云原生#冷启动