WebAuthn FIDO2实现实战:从注册到无密码认证的6种生产模式
安全
密码已死,Passkey当立
2026年,密码泄露事件仍在以每月数亿条的速度增长。钓鱼攻击绕过传统MFA,SMS OTP被SIM Swap截获,TOTP令牌用户因"MFA疲劳攻击"而盲目点击允许。Passkey(通行密钥) 基于FIDO2/WebAuthn标准,用公钥密码学+设备生物识别彻底消灭密码——Apple、Google、Microsoft三大生态全面互通,浏览器覆盖率超过96%。
本文将给出6种生产级模式:从Passkey注册到认证流程,凭据管理到多设备支持,回退策略到完整Python+JavaScript生产服务,每段代码可直接运行。
WebAuthn/FIDO2核心概念
| 概念 | 说明 |
|---|---|
| WebAuthn | W3C标准,浏览器提供的navigator.credentials API,定义了注册和认证的JavaScript接口 |
| FIDO2 | FIDO Alliance标准,包含CTAP2协议,规范认证器与服务端的通信 |
| Passkey | 可跨设备同步的FIDO2凭据,存储在iCloud Keychain/Google Password Manager中 |
| Authenticator | 认证器,分平台认证器(TouchID/FaceID/Windows Hello)和漫游认证器(YubiKey) |
| Relying Party (RP) | 依赖方,即你的网站/服务端,通过rpID标识 |
| Credential | 凭据,公钥密码学中的密钥对,私钥永远不离开认证器安全芯片 |
| Attestation | 证明,注册时认证器向RP证明其安全性和合法性(可选用none) |
| Assertion | 断言,认证时认证器用私钥签名challenge后返回的签名证明 |
问题分析:WebAuthn生产落地的5大挑战
- 注册流程复杂:服务端需生成正确的
PublicKeyCredentialCreationOptions,COSE算法标识、authenticatorSelection参数配置极易出错 - 认证流程安全:challenge必须真随机、单次使用,签名验证需处理CBOR/COSE编码,遗漏任何校验即产生安全漏洞
- 凭据生命周期管理:用户可能注册多个Passkey,需支持列表展示、删除、重命名,丢失所有设备时需恢复机制
- 跨平台与多设备:iCloud Passkey与Google Passkey不互通,企业场景需支持安全密钥(YubiKey),条件UI(Conditional UI)集成复杂
- 降级与回退:老旧设备不支持WebAuthn,必须保留密码回退,混合认证流程的用户体验设计极具挑战
6种生产模式实战
Pattern 1:Passkey注册流程
服务端(Python + py_webauthn):
from py_webauthn import WebAuthnRP, WebAuthnUser, WebAuthnCredential
from py_webauthn.helpers.structs import (
AuthenticatorSelectionCriteria,
UserVerificationRequirement,
AuthenticatorAttachment,
ResidentKeyRequirement,
PublicKeyCredentialDescriptor,
AttestationConveyancePreference,
)
import secrets
import base64
class PasskeyRegistrationService:
def __init__(self, rp_id: str, rp_name: str, origin: str):
self.rp_id = rp_id
self.rp_name = rp_name
self.origin = origin
def generate_registration_options(
self,
user_id: str,
username: str,
display_name: str,
exclude_credentials: list[dict] | None = None,
) -> dict:
challenge = secrets.token_bytes(32)
challenge_b64 = base64.urlsafe_b64encode(challenge).rstrip(b"=").decode()
exclude_list = []
if exclude_credentials:
for cred in exclude_credentials:
exclude_list.append(
PublicKeyCredentialDescriptor(
id=base64.urlsafe_b64decode(cred["id"] + "=="),
type="public-key",
)
)
options = {
"challenge": challenge_b64,
"rp": {"id": self.rp_id, "name": self.rp_name},
"user": {
"id": base64.urlsafe_b64encode(user_id.encode()).rstrip(b"=").decode(),
"name": username,
"displayName": display_name,
},
"pubKeyCredParams": [
{"type": "public-key", "alg": -7},
{"type": "public-key", "alg": -257},
],
"timeout": 60000,
"excludeCredentials": [
{
"type": ec.type,
"id": base64.urlsafe_b64encode(ec.id).rstrip(b"=").decode(),
}
for ec in exclude_list
],
"authenticatorSelection": {
"authenticatorAttachment": "platform",
"requireResidentKey": True,
"residentKey": "required",
"userVerification": "required",
},
"attestation": "none",
}
return options, challenge_b64
def verify_registration(
self,
challenge_b64: str,
credential_response: dict,
expected_origin: str,
) -> dict:
from py_webauthn import verify_registration_response
verification = verify_registration_response(
credential=credential_response,
expected_challenge=base64.urlsafe_b64encode(
base64.urlsafe_b64decode(challenge_b64 + "==")
).rstrip(b"=").decode(),
expected_origin=expected_origin,
expected_rp_id=self.rp_id,
)
return {
"credential_id": base64.urlsafe_b64encode(
verification.credential_id
).rstrip(b"=").decode(),
"public_key": base64.urlsafe_b64encode(
verification.credential_public_key
).rstrip(b"=").decode(),
"sign_count": verification.sign_count,
}
客户端(JavaScript):
async function registerPasskey(userId, username, displayName) {
const excludeCredentials = await getExistingCredentials(userId);
const optionsResponse = await fetch("/api/webauthn/registration/options", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
userId,
username,
displayName,
excludeCredentials,
}),
});
const { options, challenge } = await optionsResponse.json();
const publicKeyCredentialCreationOptions = {
...options,
challenge: base64urlDecode(options.challenge),
user: {
...options.user,
id: base64urlDecode(options.user.id),
},
excludeCredentials: options.excludeCredentials.map((ec) => ({
...ec,
id: base64urlDecode(ec.id),
})),
};
try {
const credential = await navigator.credentials.create({
publicKey: publicKeyCredentialCreationOptions,
});
const verificationPayload = {
id: credential.id,
rawId: base64urlEncode(credential.rawId),
type: credential.type,
response: {
attestationObject: base64urlEncode(
credential.response.attestationObject
),
clientDataJSON: base64urlEncode(credential.response.clientDataJSON),
},
};
const verifyResponse = await fetch("/api/webauthn/registration/verify", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
challenge,
credential: verificationPayload,
}),
});
const result = await verifyResponse.json();
if (result.success) {
showToast("Passkey注册成功!");
}
} catch (error) {
if (error.name === "InvalidStateError") {
showToast("该设备已注册Passkey,请勿重复注册");
} else if (error.name === "NotAllowedError") {
showToast("用户取消了注册操作");
} else {
showToast(`注册失败:${error.message}`);
}
}
}
function base64urlDecode(str) {
const padding = "=".repeat((4 - (str.length % 4)) % 4);
const base64 = str.replace(/-/g, "+").replace(/_/g, "/") + padding;
const binaryStr = atob(base64);
return Uint8Array.from(binaryStr, (c) => c.charCodeAt(0));
}
function base64urlEncode(buffer) {
const bytes = new Uint8Array(buffer);
let binary = "";
bytes.forEach((b) => (binary += String.fromCharCode(b)));
return btoa(binary).replace(/\+/g, "-").replace(/\//g, "_").replace(/=/g, "");
}
Pattern 2:认证流程(WebAuthn API)
服务端:
class PasskeyAuthService:
def __init__(self, rp_id: str, origin: str):
self.rp_id = rp_id
self.origin = origin
def generate_authentication_options(
self,
allow_credentials: list[dict] | None = None,
) -> dict:
challenge = secrets.token_bytes(32)
challenge_b64 = base64.urlsafe_b64encode(challenge).rstrip(b"=").decode()
allow_list = []
if allow_credentials:
for cred in allow_credentials:
allow_list.append(
PublicKeyCredentialDescriptor(
id=base64.urlsafe_b64decode(cred["id"] + "=="),
type="public-key",
)
)
options = {
"challenge": challenge_b64,
"rpId": self.rp_id,
"timeout": 60000,
"allowCredentials": [
{
"type": ac.type,
"id": base64.urlsafe_b64encode(ac.id).rstrip(b"=").decode(),
}
for ac in allow_list
],
"userVerification": "required",
}
return options, challenge_b64
def verify_authentication(
self,
challenge_b64: str,
credential_response: dict,
stored_credential: dict,
) -> bool:
from py_webauthn import verify_authentication_response
verification = verify_authentication_response(
credential=credential_response,
expected_challenge=base64.urlsafe_b64encode(
base64.urlsafe_b64decode(challenge_b64 + "==")
).rstrip(b"=").decode(),
expected_origin=self.origin,
expected_rp_id=self.rp_id,
credential_public_key=base64.urlsafe_b64decode(
stored_credential["public_key"] + "=="
),
credential_current_sign_count=stored_credential["sign_count"],
)
return verification.new_sign_count
客户端:
async function authenticateWithPasskey() {
const optionsResponse = await fetch("/api/webauthn/authentication/options", {
method: "POST",
headers: { "Content-Type": "application/json" },
});
const { options, challenge } = await optionsResponse.json();
const publicKeyCredentialRequestOptions = {
...options,
challenge: base64urlDecode(options.challenge),
allowCredentials: options.allowCredentials?.map((ac) => ({
...ac,
id: base64urlDecode(ac.id),
})),
};
try {
const assertion = await navigator.credentials.get({
publicKey: publicKeyCredentialRequestOptions,
});
const verificationPayload = {
id: assertion.id,
rawId: base64urlEncode(assertion.rawId),
type: assertion.type,
response: {
authenticatorData: base64urlEncode(
assertion.response.authenticatorData
),
clientDataJSON: base64urlEncode(assertion.response.clientDataJSON),
signature: base64urlEncode(assertion.response.signature),
userHandle: assertion.response.userHandle
? base64urlEncode(assertion.response.userHandle)
: null,
},
};
const verifyResponse = await fetch(
"/api/webauthn/authentication/verify",
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
challenge,
credential: verificationPayload,
}),
}
);
const result = await verifyResponse.json();
if (result.success) {
window.location.href = "/dashboard";
}
} catch (error) {
if (error.name === "NotAllowedError") {
showToast("认证已取消");
} else {
showToast(`认证失败:${error.message}`);
}
}
}
条件UI(Autofill)模式:
async function setupConditionalUI() {
if (!window.PublicKeyCredential) return;
const available = await PublicKeyCredential.isUserVerifyingPlatformAuthenticatorAvailable();
if (!available) return;
const optionsResponse = await fetch(
"/api/webauthn/authentication/options",
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ conditionalUI: true }),
}
);
const { options, challenge } = await optionsResponse.json();
const publicKeyCredentialRequestOptions = {
...options,
challenge: base64urlDecode(options.challenge),
mediation: "conditional",
};
try {
const assertion = await navigator.credentials.get({
publicKey: publicKeyCredentialRequestOptions,
});
// 处理认证结果,同上
} catch (error) {
// 条件UI模式下用户未选择Passkey是正常行为,静默处理
}
}
// 页面加载时自动设置
document.addEventListener("DOMContentLoaded", () => {
const usernameInput = document.getElementById("username");
if (usernameInput) {
usernameInput.setAttribute("autocomplete", "username webauthn");
setupConditionalUI();
}
});
Pattern 3:凭据管理(列表、删除、更新)
服务端:
from dataclasses import dataclass
from datetime import datetime
@dataclass
class PasskeyCredential:
credential_id: str
user_id: str
public_key: str
sign_count: int
name: str
device_type: str
created_at: datetime
last_used_at: datetime | None
transports: list[str]
class CredentialManagementService:
def __init__(self, db_session):
self.db = db_session
def list_credentials(self, user_id: str) -> list[dict]:
credentials = self.db.query(PasskeyCredential).filter(
PasskeyCredential.user_id == user_id
).order_by(PasskeyCredential.created_at.desc()).all()
return [
{
"id": cred.credential_id,
"name": cred.name or f"Passkey #{idx + 1}",
"deviceType": cred.device_type,
"createdAt": cred.created_at.isoformat(),
"lastUsedAt": cred.last_used_at.isoformat() if cred.last_used_at else None,
"transports": cred.transports,
}
for idx, cred in enumerate(credentials)
]
def delete_credential(self, user_id: str, credential_id: str) -> bool:
cred = self.db.query(PasskeyCredential).filter(
PasskeyCredential.user_id == user_id,
PasskeyCredential.credential_id == credential_id,
).first()
if not cred:
return False
remaining_count = self.db.query(PasskeyCredential).filter(
PasskeyCredential.user_id == user_id
).count()
if remaining_count <= 1:
raise ValueError("不能删除最后一个Passkey,请先注册备用凭据")
self.db.delete(cred)
self.db.commit()
return True
def rename_credential(self, user_id: str, credential_id: str, new_name: str) -> bool:
cred = self.db.query(PasskeyCredential).filter(
PasskeyCredential.user_id == user_id,
PasskeyCredential.credential_id == credential_id,
).first()
if not cred:
return False
cred.name = new_name[:64]
self.db.commit()
return True
def update_sign_count(self, credential_id: str, new_sign_count: int) -> None:
cred = self.db.query(PasskeyCredential).filter(
PasskeyCredential.credential_id == credential_id
).first()
if cred:
cred.sign_count = new_sign_count
cred.last_used_at = datetime.utcnow()
self.db.commit()
客户端:
class PasskeyManager {
constructor(userId) {
this.userId = userId;
}
async listCredentials() {
const response = await fetch(`/api/webauthn/credentials?userId=${this.userId}`);
return response.json();
}
async deleteCredential(credentialId) {
const confirmed = confirm("确定要删除此Passkey吗?删除后无法恢复。");
if (!confirmed) return false;
const response = await fetch("/api/webauthn/credentials/delete", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
userId: this.userId,
credentialId,
}),
});
const result = await response.json();
if (result.success) {
this.renderCredentialList();
}
return result.success;
}
async renameCredential(credentialId, newName) {
const response = await fetch("/api/webauthn/credentials/rename", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
userId: this.userId,
credentialId,
newName,
}),
});
return (await response.json()).success;
}
async renderCredentialList() {
const credentials = await this.listCredentials();
const container = document.getElementById("passkey-list");
container.innerHTML = credentials
.map(
(cred) => `
<div class="passkey-item" data-id="${cred.id}">
<div class="passkey-info">
<span class="passkey-name">${cred.name}</span>
<span class="passkey-device">${cred.deviceType}</span>
<span class="passkey-date">
创建于 ${new Date(cred.createdAt).toLocaleDateString()}
${cred.lastUsedAt ? `· 上次使用 ${new Date(cred.lastUsedAt).toLocaleDateString()}` : ""}
</span>
</div>
<div class="passkey-actions">
<button onclick="passkeyManager.renameCredential('${cred.id}', prompt('新名称:', '${cred.name}'))">
重命名
</button>
<button onclick="passkeyManager.deleteCredential('${cred.id}')">
删除
</button>
</div>
</div>
`
)
.join("");
}
}
Pattern 4:多设备与跨平台支持
服务端:
class MultiDevicePasskeyService:
SUPPORTED_ALGORITHMS = [-7, -257, -37]
def generate_inclusive_registration_options(
self,
user_id: str,
username: str,
display_name: str,
existing_credentials: list[dict],
) -> dict:
challenge = secrets.token_bytes(32)
challenge_b64 = base64.urlsafe_b64encode(challenge).rstrip(b"=").decode()
exclude_list = [
{
"type": "public-key",
"id": cred["credential_id"],
}
for cred in existing_credentials
]
options = {
"challenge": challenge_b64,
"rp": {"id": self.rp_id, "name": self.rp_name},
"user": {
"id": base64.urlsafe_b64encode(user_id.encode()).rstrip(b"=").decode(),
"name": username,
"displayName": display_name,
},
"pubKeyCredParams": [
{"type": "public-key", "alg": alg}
for alg in self.SUPPORTED_ALGORITHMS
],
"timeout": 120000,
"excludeCredentials": exclude_list,
"authenticatorSelection": {
"authenticatorAttachment": "platform",
"requireResidentKey": True,
"residentKey": "required",
"userVerification": "preferred",
},
"attestation": "none",
}
return options, challenge_b64
def generate_security_key_options(
self,
user_id: str,
username: str,
display_name: str,
) -> dict:
challenge = secrets.token_bytes(32)
challenge_b64 = base64.urlsafe_b64encode(challenge).rstrip(b"=").decode()
options = {
"challenge": challenge_b64,
"rp": {"id": self.rp_id, "name": self.rp_name},
"user": {
"id": base64.urlsafe_b64encode(user_id.encode()).rstrip(b"=").decode(),
"name": username,
"displayName": display_name,
},
"pubKeyCredParams": [
{"type": "public-key", "alg": -7},
{"type": "public-key", "alg": -257},
],
"timeout": 120000,
"authenticatorSelection": {
"authenticatorAttachment": "cross-platform",
"requireResidentKey": False,
"residentKey": "discouraged",
"userVerification": "preferred",
},
"attestation": "direct",
}
return options, challenge_b64
客户端:
async function registerPlatformPasskey(userId, username, displayName) {
const existingCreds = await getExistingCredentials(userId);
const { options, challenge } = await fetchRegistrationOptions({
userId,
username,
displayName,
existingCredentials: existingCreds,
type: "platform",
});
return executeRegistration(options, challenge);
}
async function registerSecurityKey(userId, username, displayName) {
const { options, challenge } = await fetchRegistrationOptions({
userId,
username,
displayName,
type: "cross-platform",
});
return executeRegistration(options, challenge);
}
async function detectPasskeySupport() {
if (!window.PublicKeyCredential) {
return {
supported: false,
platformAuthenticator: false,
conditionalUI: false,
};
}
const platformAuth = await PublicKeyCredential.isUserVerifyingPlatformAuthenticatorAvailable();
let conditionalUI = false;
if (platformAuth && PublicKeyCredential.isConditionalMediationAvailable) {
conditionalUI = await PublicKeyCredential.isConditionalMediationAvailable();
}
return {
supported: true,
platformAuthenticator: platformAuth,
conditionalUI,
};
}
async function renderRegistrationUI(userId, username, displayName) {
const support = await detectPasskeySupport();
const container = document.getElementById("passkey-registration");
if (support.platformAuthenticator) {
container.innerHTML += `
<button onclick="registerPlatformPasskey('${userId}', '${username}', '${displayName}')">
注册Passkey(手机/电脑生物识别)
</button>
`;
}
container.innerHTML += `
<button onclick="registerSecurityKey('${userId}', '${username}', '${displayName}')">
注册安全密钥(YubiKey等)
</button>
`;
if (!support.supported) {
container.innerHTML = `
<p>您的浏览器不支持WebAuthn,请使用密码登录。</p>
`;
}
}
Pattern 5:回退策略(密码+Passkey混合)
服务端:
class HybridAuthService:
def __init__(self, db_session, rp_id: str, origin: str):
self.db = db_session
self.rp_id = rp_id
self.origin = origin
def get_login_options(self, user_id: str | None = None) -> dict:
if user_id:
credentials = self._get_user_credentials(user_id)
has_passkey = len(credentials) > 0
if has_passkey:
auth_options, challenge = PasskeyAuthService(
self.rp_id, self.origin
).generate_authentication_options(
allow_credentials=[
{"id": c.credential_id} for c in credentials
]
)
return {
"mode": "passkey_preferred",
"passkeyOptions": auth_options,
"challenge": challenge,
"passwordFallback": True,
}
return {
"mode": "password_only",
"passkeyOptions": None,
"challenge": None,
"passwordFallback": True,
}
def promote_to_passkey(self, user_id: str) -> dict:
user = self._get_user(user_id)
existing_creds = self._get_user_credentials(user_id)
reg_options, challenge = PasskeyRegistrationService(
self.rp_id, "MyApp", self.origin
).generate_registration_options(
user_id=user_id,
username=user.username,
display_name=user.display_name,
exclude_credentials=[
{"id": c.credential_id} for c in existing_creds
],
)
return {
"registrationOptions": reg_options,
"challenge": challenge,
"message": "升级您的账户:注册Passkey以获得更安全的无密码登录体验",
}
客户端:
class HybridLoginFlow {
constructor() {
this.step = "initial";
}
async init() {
const support = await detectPasskeySupport();
if (support.conditionalUI) {
this.setupAutofillPasskey();
}
this.renderLoginForm(support);
}
renderLoginForm(support) {
const form = document.getElementById("login-form");
form.innerHTML = `
<input type="text" id="username" name="username"
autocomplete="username webauthn"
placeholder="用户名" required />
<div id="passkey-section" style="display:none">
<button type="button" id="passkey-login-btn">
使用Passkey登录
</button>
<p class="divider-text">或使用密码</p>
</div>
<div id="password-section">
<input type="password" id="password" name="password"
autocomplete="current-password"
placeholder="密码" />
<button type="submit">登录</button>
</div>
`;
document.getElementById("username").addEventListener("blur", async () => {
const username = document.getElementById("username").value;
if (!username) return;
const check = await fetch("/api/auth/check", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ username }),
});
const { hasPasskey } = await check.json();
if (hasPasskey && support.platformAuthenticator) {
document.getElementById("passkey-section").style.display = "block";
}
});
document.getElementById("passkey-login-btn").addEventListener(
"click",
() => this.authenticateWithPasskey()
);
form.addEventListener("submit", (e) => {
e.preventDefault();
this.authenticateWithPassword();
});
}
async setupAutofillPasskey() {
const { options, challenge } = await fetch(
"/api/webauthn/authentication/options",
{
method: "POST",
headers: { "Content-Type": "application/json" },
}
).then((r) => r.json());
try {
const assertion = await navigator.credentials.get({
publicKey: {
...options,
challenge: base64urlDecode(options.challenge),
mediation: "conditional",
},
});
await this.verifyAssertion(assertion, challenge);
} catch {
// 用户未选择Passkey,静默
}
}
async authenticateWithPassword() {
const username = document.getElementById("username").value;
const password = document.getElementById("password").value;
const response = await fetch("/api/auth/login", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ username, password }),
});
const result = await response.json();
if (result.success) {
if (result.suggestPasskeyUpgrade) {
this.showPasskeyUpgradePrompt(result.userId);
} else {
window.location.href = "/dashboard";
}
}
}
showPasskeyUpgradePrompt(userId) {
const upgrade = confirm(
"检测到您的设备支持Passkey,是否现在注册以获得更安全的无密码登录?"
);
if (upgrade) {
registerPlatformPasskey(userId);
} else {
window.location.href = "/dashboard";
}
}
}
Pattern 6:完整生产认证服务(Python + JavaScript)
服务端(FastAPI + py_webauthn):
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import secrets
import base64
from typing import Optional
app = FastAPI(title="WebAuthn Production Auth Service")
app.add_middleware(
CORSMiddleware,
allow_origins=["https://yourapp.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
challenge_store: dict[str, dict] = {}
session_store: dict[str, dict] = {}
class RegistrationOptionsRequest(BaseModel):
userId: str
username: str
displayName: str
excludeCredentials: Optional[list[dict]] = None
class RegistrationVerifyRequest(BaseModel):
challenge: str
credential: dict
class AuthenticationOptionsRequest(BaseModel):
userId: Optional[str] = None
conditionalUI: Optional[bool] = False
class AuthenticationVerifyRequest(BaseModel):
challenge: str
credential: dict
RP_ID = "yourapp.com"
RP_NAME = "MyApp"
ORIGIN = "https://yourapp.com"
registration_service = PasskeyRegistrationService(RP_ID, RP_NAME, ORIGIN)
auth_service = PasskeyAuthService(RP_ID, ORIGIN)
@app.post("/api/webauthn/registration/options")
async def create_registration_options(req: RegistrationOptionsRequest):
options, challenge = registration_service.generate_registration_options(
user_id=req.userId,
username=req.username,
display_name=req.displayName,
exclude_credentials=req.excludeCredentials,
)
challenge_store[challenge] = {
"userId": req.userId,
"type": "registration",
"expiresAt": secrets.token_hex(8),
}
return {"options": options, "challenge": challenge}
@app.post("/api/webauthn/registration/verify")
async def verify_registration_endpoint(req: RegistrationVerifyRequest):
stored = challenge_store.get(req.challenge)
if not stored:
raise HTTPException(status_code=400, detail="无效或已过期的challenge")
del challenge_store[req.challenge]
try:
result = registration_service.verify_registration(
challenge_b64=req.challenge,
credential_response=req.credential,
expected_origin=ORIGIN,
)
# 存储credential到数据库
# save_credential(stored["userId"], result)
return {"success": True, "credentialId": result["credential_id"]}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/api/webauthn/authentication/options")
async def create_authentication_options(req: AuthenticationOptionsRequest):
allow_credentials = None
if req.userId:
# 从数据库获取用户凭据
# allow_credentials = get_user_credentials(req.userId)
pass
options, challenge = auth_service.generate_authentication_options(
allow_credentials=allow_credentials,
)
challenge_store[challenge] = {
"userId": req.userId,
"type": "authentication",
}
return {"options": options, "challenge": challenge}
@app.post("/api/webauthn/authentication/verify")
async def verify_authentication_endpoint(req: AuthenticationVerifyRequest):
stored = challenge_store.get(req.challenge)
if not stored:
raise HTTPException(status_code=400, detail="无效或已过期的challenge")
del challenge_store[req.challenge]
# 从数据库获取凭据
# stored_credential = get_credential(req.credential.id)
try:
new_sign_count = auth_service.verify_authentication(
challenge_b64=req.challenge,
credential_response=req.credential,
stored_credential={}, # 替换为实际凭据
)
session_token = secrets.token_urlsafe(32)
session_store[session_token] = {
"userId": stored["userId"],
"authenticatedAt": __import__("datetime").datetime.utcnow().isoformat(),
}
return {
"success": True,
"sessionToken": session_token,
}
except Exception as e:
raise HTTPException(status_code=401, detail=str(e))
@app.get("/api/webauthn/credentials")
async def list_credentials(userId: str):
# 从数据库获取
return {"credentials": []}
@app.post("/api/webauthn/credentials/delete")
async def delete_credential(request: Request):
data = await request.json()
return {"success": True}
@app.post("/api/webauthn/credentials/rename")
async def rename_credential(request: Request):
data = await request.json()
return {"success": True}
客户端完整集成:
class WebAuthnClient {
constructor(baseUrl = "") {
this.baseUrl = baseUrl;
}
async isSupported() {
if (!window.PublicKeyCredential) return false;
return PublicKeyCredential.isUserVerifyingPlatformAuthenticatorAvailable();
}
async register(userId, username, displayName) {
const existingCreds = await this._getExistingCredentials(userId);
const { options, challenge } = await this._fetch("/api/webauthn/registration/options", {
userId,
username,
displayName,
excludeCredentials: existingCreds,
});
try {
const credential = await navigator.credentials.create({
publicKey: this._decodeRegistrationOptions(options),
});
const result = await this._fetch("/api/webauthn/registration/verify", {
challenge,
credential: this._encodeCredential(credential),
});
return { success: true, credentialId: result.credentialId };
} catch (error) {
return { success: false, error: error.name, message: error.message };
}
}
async authenticate(userId = null) {
const { options, challenge } = await this._fetch(
"/api/webauthn/authentication/options",
{ userId, conditionalUI: false }
);
try {
const assertion = await navigator.credentials.get({
publicKey: this._decodeAuthenticationOptions(options),
});
const result = await this._fetch("/api/webauthn/authentication/verify", {
challenge,
credential: this._encodeAssertion(assertion),
});
return { success: true, sessionToken: result.sessionToken };
} catch (error) {
return { success: false, error: error.name, message: error.message };
}
}
async listCredentials(userId) {
const response = await fetch(
`${this.baseUrl}/api/webauthn/credentials?userId=${userId}`
);
return response.json();
}
async deleteCredential(userId, credentialId) {
return this._fetch("/api/webauthn/credentials/delete", {
userId,
credentialId,
});
}
async renameCredential(userId, credentialId, newName) {
return this._fetch("/api/webauthn/credentials/rename", {
userId,
credentialId,
newName,
});
}
_decodeRegistrationOptions(options) {
return {
...options,
challenge: base64urlDecode(options.challenge),
user: {
...options.user,
id: base64urlDecode(options.user.id),
},
excludeCredentials: (options.excludeCredentials || []).map((ec) => ({
...ec,
id: base64urlDecode(ec.id),
})),
};
}
_decodeAuthenticationOptions(options) {
return {
...options,
challenge: base64urlDecode(options.challenge),
allowCredentials: (options.allowCredentials || []).map((ac) => ({
...ac,
id: base64urlDecode(ac.id),
})),
};
}
_encodeCredential(credential) {
return {
id: credential.id,
rawId: base64urlEncode(credential.rawId),
type: credential.type,
response: {
attestationObject: base64urlEncode(credential.response.attestationObject),
clientDataJSON: base64urlEncode(credential.response.clientDataJSON),
},
};
}
_encodeAssertion(assertion) {
return {
id: assertion.id,
rawId: base64urlEncode(assertion.rawId),
type: assertion.type,
response: {
authenticatorData: base64urlEncode(assertion.response.authenticatorData),
clientDataJSON: base64urlEncode(assertion.response.clientDataJSON),
signature: base64urlEncode(assertion.response.signature),
userHandle: assertion.response.userHandle
? base64urlEncode(assertion.response.userHandle)
: null,
},
};
}
async _fetch(path, body) {
const response = await fetch(`${this.baseUrl}${path}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.detail || "请求失败");
}
return response.json();
}
async _getExistingCredentials(userId) {
try {
const { credentials } = await this.listCredentials(userId);
return credentials.map((c) => ({ id: c.id }));
} catch {
return [];
}
}
}
const webauthn = new WebAuthnClient();
5大避坑指南
坑1:challenge未校验或重复使用
❌ 错误:
CHALLENGE = "fixed-challenge-value-12345"
def verify_registration(credential_response):
verification = verify_registration_response(
credential=credential_response,
expected_challenge=CHALLENGE,
expected_origin=ORIGIN,
expected_rp_id=RP_ID,
)
const CHALLENGE = "fixed-challenge-value-12345";
const options = { challenge: CHALLENGE, ... };
✅ 正确:
import secrets
def generate_challenge() -> str:
challenge = secrets.token_bytes(32)
challenge_b64 = base64.urlsafe_b64encode(challenge).rstrip(b"=").decode()
challenge_store[challenge_b64] = {
"createdAt": datetime.utcnow(),
"used": False,
}
return challenge_b64
def verify_challenge(challenge_b64: str) -> bool:
stored = challenge_store.get(challenge_b64)
if not stored or stored["used"]:
return False
age = (datetime.utcnow() - stored["createdAt"]).total_seconds()
if age > 300:
del challenge_store[challenge_b64]
return False
stored["used"] = True
return True
// 客户端永远从服务端获取challenge,不自行生成
const { options, challenge } = await fetch("/api/webauthn/registration/options", {
method: "POST",
body: JSON.stringify({ userId, username, displayName }),
}).then(r => r.json());
坑2:origin校验缺失
❌ 错误:
def verify_registration(credential_response, challenge):
verification = verify_registration_response(
credential=credential_response,
expected_challenge=challenge,
expected_origin="*", # 不校验origin
expected_rp_id=RP_ID,
)
// 客户端无origin校验意识,直接发送
✅ 正确:
ALLOWED_ORIGINS = {
"https://yourapp.com",
"https://www.yourapp.com",
}
def verify_registration(credential_response, challenge):
client_data = json.loads(
base64.urlsafe_b64decode(
credential_response["response"]["clientDataJSON"] + "=="
)
)
origin = client_data.get("origin")
if origin not in ALLOWED_ORIGINS:
raise ValueError(f"不允许的origin: {origin}")
verification = verify_registration_response(
credential=credential_response,
expected_challenge=challenge,
expected_origin=origin,
expected_rp_id=RP_ID,
)
// 确保页面运行在正确的origin下
if (window.location.origin !== "https://yourapp.com") {
console.error("WebAuthn只能在正确的origin下使用");
}
坑3:sign_count未校验导致克隆攻击
❌ 错误:
def verify_authentication(credential_response, stored_credential):
verification = verify_authentication_response(
credential=credential_response,
expected_challenge=challenge,
expected_origin=ORIGIN,
expected_rp_id=RP_ID,
credential_public_key=stored_credential["public_key"],
credential_current_sign_count=0, # 始终传0
)
# 忽略返回的sign_count
✅ 正确:
def verify_authentication(credential_response, stored_credential):
verification = verify_authentication_response(
credential=credential_response,
expected_challenge=challenge,
expected_origin=ORIGIN,
expected_rp_id=RP_ID,
credential_public_key=stored_credential["public_key"],
credential_current_sign_count=stored_credential["sign_count"],
)
if verification.new_sign_count <= stored_credential["sign_count"]:
# 可能是克隆凭据,记录安全事件
log_security_event(
"possible_credential_clone",
credential_id=stored_credential["credential_id"],
stored_count=stored_credential["sign_count"],
received_count=verification.new_sign_count,
)
raise ValueError("可能的凭据克隆攻击")
stored_credential["sign_count"] = verification.new_sign_count
坑4:residentKey配置错误导致Passkey无法同步
❌ 错误:
options = {
"authenticatorSelection": {
"authenticatorAttachment": "platform",
"requireResidentKey": False, # 错误:Passkey需要resident key
"residentKey": "discouraged",
"userVerification": "discouraged",
},
}
const options = {
authenticatorSelection: {
authenticatorAttachment: "platform",
requireResidentKey: false,
residentKey: "discouraged",
userVerification: "discouraged",
},
};
✅ 正确:
options = {
"authenticatorSelection": {
"authenticatorAttachment": "platform",
"requireResidentKey": True,
"residentKey": "required", # Passkey必须设置为required
"userVerification": "required", # 确保生物识别验证
},
}
const options = {
authenticatorSelection: {
authenticatorAttachment: "platform",
requireResidentKey: true,
residentKey: "required",
userVerification: "required",
},
};
坑5:删除最后一个凭据导致账户锁死
❌ 错误:
def delete_credential(user_id: str, credential_id: str):
cred = db.query(Credential).filter_by(id=credential_id).first()
db.delete(cred)
db.commit()
return {"success": True}
✅ 正确:
def delete_credential(user_id: str, credential_id: str):
remaining = db.query(Credential).filter_by(user_id=user_id).count()
if remaining <= 1:
has_password = db.query(User).filter_by(id=user_id).first().password_hash
if not has_password:
raise ValueError(
"这是您最后一个Passkey且未设置密码,"
"删除后将无法登录。请先注册备用Passkey或设置密码。"
)
cred = db.query(Credential).filter_by(id=credential_id).first()
db.delete(cred)
db.commit()
return {"success": True}
错误排查表
| 错误信息 | 原因 | 解决方案 |
|---|---|---|
DOMException: The operation is insecure |
页面非HTTPS或localhost | 确保使用HTTPS或localhost开发环境 |
DOMException: The operation was cancelled |
用户取消或超时 | 捕获NotAllowedError,提供重试选项 |
InvalidStateError |
设备已注册该RP的凭据 | 使用excludeCredentials排除已注册凭据 |
SecurityError: The rpId is not valid |
rpId与当前域名不匹配 | 确认rpId为有效域名且与页面origin一致 |
TypeError: Credentials container is empty |
无匹配的allowCredentials | 检查credential ID编码是否正确 |
DOMException: A requested feature is not supported |
浏览器不支持residentKey | 降级使用residentKey: "preferred" |
Verification failed: signature mismatch |
公钥或签名数据损坏 | 检查base64url编解码,确认COSE key格式 |
Verification failed: challenge mismatch |
challenge编码不一致 | 确保服务端和客户端使用相同的base64url编解码 |
NetworkError: authentication cancelled |
认证器通信中断 | 重试,检查USB/NFC连接(安全密钥场景) |
ConstraintError: request has too many params |
excludeCredentials过多 | 限制排除列表最多10个凭据 |
高级优化
1. Challenge存储优化(Redis + TTL)
import redis
import json
redis_client = redis.Redis(host="localhost", port=6379, db=0)
class RedisChallengeStore:
CHALLENGE_TTL = 300 # 5分钟过期
def store(self, challenge: str, data: dict) -> None:
key = f"webauthn:challenge:{challenge}"
redis_client.setex(key, self.CHALLENGE_TTL, json.dumps(data))
def retrieve(self, challenge: str) -> dict | None:
key = f"webauthn:challenge:{challenge}"
data = redis_client.get(key)
if data:
redis_client.delete(key)
return json.loads(data)
return None
def exists(self, challenge: str) -> bool:
return redis_client.exists(f"webauthn:challenge:{challenge}") > 0
2. 凭据传输层优化
def store_credential_with_transports(
user_id: str,
credential_id: str,
public_key: str,
sign_count: int,
transports: list[str],
) -> None:
credential = PasskeyCredential(
credential_id=credential_id,
user_id=user_id,
public_key=public_key,
sign_count=sign_count,
transports=transports,
device_type=_classify_device(transports),
created_at=datetime.utcnow(),
)
db.add(credential)
db.commit()
def _classify_device(transports: list[str]) -> str:
if "internal" in transports:
return "platform"
if "usb" in transports and "nfc" in transports:
return "security_key_multi"
if "usb" in transports:
return "security_key_usb"
if "nfc" in transports:
return "security_key_nfc"
if "ble" in transports:
return "security_key_ble"
return "unknown"
3. 认证审计日志
from dataclasses import dataclass
from datetime import datetime
@dataclass
class AuthAuditLog:
id: str
user_id: str
event_type: str
credential_id: str
ip_address: str
user_agent: str
success: bool
error_message: str | None
created_at: datetime
class AuthAuditService:
def log_registration(
self,
user_id: str,
credential_id: str,
success: bool,
request,
error: str | None = None,
) -> None:
log = AuthAuditLog(
id=secrets.token_urlsafe(16),
user_id=user_id,
event_type="passkey_registration",
credential_id=credential_id,
ip_address=request.client.host,
user_agent=request.headers.get("user-agent", ""),
success=success,
error_message=error,
created_at=datetime.utcnow(),
)
# 存储到审计日志表
db.add(log)
db.commit()
def log_authentication(
self,
user_id: str,
credential_id: str,
success: bool,
request,
error: str | None = None,
) -> None:
log = AuthAuditLog(
id=secrets.token_urlsafe(16),
user_id=user_id,
event_type="passkey_authentication",
credential_id=credential_id,
ip_address=request.client.host,
user_agent=request.headers.get("user-agent", ""),
success=success,
error_message=error,
created_at=datetime.utcnow(),
)
db.add(log)
db.commit()
def detect_anomaly(self, user_id: str, window_minutes: int = 15) -> bool:
cutoff = datetime.utcnow() - timedelta(minutes=window_minutes)
recent_failures = (
db.query(AuthAuditLog)
.filter(
AuthAuditLog.user_id == user_id,
AuthAuditLog.success == False,
AuthAuditLog.created_at >= cutoff,
)
.count()
)
return recent_failures >= 5
认证方式对比
| 特性 | WebAuthn/Passkey | TOTP | SMS OTP | Magic Link | OAuth2 |
|---|---|---|---|---|---|
| 防钓鱼 | ✅ 域名绑定 | ❌ 可被钓鱼 | ❌ 可被钓鱼 | ⚠️ 部分防护 | ⚠️ 依赖IdP |
| 用户体验 | ✅ 一键生物识别 | ⚠️ 需输入6位码 | ⚠️ 等待短信 | ⚠️ 需查邮箱 | ✅ 一键跳转 |
| 设备要求 | ⚠️ 需支持WebAuthn | ✅ 任何手机 | ✅ 任何手机 | ✅ 任何邮箱 | ✅ 任何浏览器 |
| 离线可用 | ✅ 本地验证 | ✅ 离线生成 | ❌ 需网络 | ❌ 需网络 | ❌ 需网络 |
| 多设备同步 | ✅ 云端同步 | ❌ 需手动迁移 | ✅ 手机号不变 | ✅ 邮箱可登录 | ✅ 账号通用 |
| 实现成本 | ⚠️ 中等 | ✅ 低 | ⚠️ 需短信费用 | ✅ 低 | ⚠️ 需集成IdP |
| 安全等级 | 🏆 最高 | ⭐ 高 | ⭐ 中 | ⭐ 中 | ⭐ 高 |
| 标准化 | ✅ W3C/FIDO | ✅ RFC 6238 | ❌ 无统一标准 | ❌ 无统一标准 | ✅ RFC 6749 |
总结:WebAuthn/FIDO2不是"锦上添花"的可选功能,而是2026年身份认证的必选项。6种生产模式覆盖了从注册到认证、从凭据管理到多设备支持、从回退策略到完整服务的全链路。记住三个关键:challenge必须真随机单次使用、residentKey必须设为required、永远保留至少一个恢复通道。Passkey是密码的终结者,但前提是你正确实现了它。
推荐工具
本站提供浏览器本地工具,免注册即可试用 →
#WebAuthn#FIDO2#Passkey#无密码认证#生物识别#2026#安全