为了提升服务器 SSH 登录的安全性,我实现了 SSH2FA:在保持双因素认证安全性的同时,提供类似扫码登录的便捷体验,适用于个人或小团队使用。
### 核心功能
- Web 审批:登录时终端输出一个链接,点击后在浏览器确认即可放行。
- 断网降级:若无法连接认证服务器,自动回退到传统 TOTP 验证,确保不被锁在门外。
url认证

认证服务器失联时回退到 TOTP

### 实现思路
- 使用 `sshd_config` 的 `ForceCommand`,在登录成功后先运行自定义程序,验证通过后再启动用户的 Shell。
- 项目分为两部分:
  - Auth Server (Python):提供 `/start_login`、`/check_status`、`/approve` 接口,管理登录会话。
  - Client (Go):编译为单文件二进制,负责请求审批链接、轮询状态以及离线 TOTP 验证。
## 代码实现
### 1. Auth Server (Python)
#!/data/miniforge3/bin/python3from flask import Flask, request, jsonifyimport uuidimport timeimport threading
app = Flask(__name__)
# In-memory stores (a production deployment should use Redis instead):
#   tokens:             session_token -> {"status": "pending/approved/rejected",
#                                          "username": "root",
#                                          "display_token": "xxx",
#                                          "ts": 12345}
#   display_to_session: display_token -> session_token
tokens = dict()
display_to_session = dict()
def cleanup_tokens():
    """Periodically purge login sessions older than 300 seconds.

    Runs forever: every 60 seconds it removes each expired entry from
    ``tokens`` together with its ``display_to_session`` mapping. Intended to
    be run in a daemon thread.
    """
    while True:
        now = time.time()
        # Iterate over a snapshot: request handlers insert into `tokens` from
        # other threads, and iterating the live dict while its size changes
        # raises RuntimeError, which would silently end this thread and stop
        # all future expiry.
        for session_token, entry in list(tokens.items()):
            if now - entry['ts'] <= 300:
                continue
            display_token = entry.get('display_token')
            if display_token:
                # pop() with a default tolerates a mapping that is already gone.
                display_to_session.pop(display_token, None)
            tokens.pop(session_token, None)
        time.sleep(60)


threading.Thread(target=cleanup_tokens, daemon=True).start()
@app.route('/start_login', methods=['POST'])
def start_login():
    """Create a pending login session for the user named in the JSON body.

    Expects a JSON object with a non-empty string ``username``. Two
    independent tokens are generated: ``session_token`` (private, used by the
    client to poll ``/check_status``) and ``display_token`` (public, embedded
    in the approval link).

    Returns:
        JSON ``{"success": True, "session_token": ..., "display_token": ...}``,
        or ``{"success": False, "error": ...}`` with HTTP 400 when the body is
        not a JSON object or lacks a usable username.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body, so malformed requests get a clean 400 rather than a 500.
    data = request.get_json(silent=True)
    username = data.get('username') if isinstance(data, dict) else None
    if not isinstance(username, str) or not username:
        return jsonify({"success": False, "error": "username is required"}), 400

    # Generate two independent tokens.
    session_token = str(uuid.uuid4())  # polled by the client (private)
    display_token = str(uuid.uuid4())  # shown in the approval link (public)

    tokens[session_token] = {
        "status": "pending",
        "username": username,
        "display_token": display_token,
        "ts": time.time(),
    }

    # Map display_token -> session_token so /approve and /reject can find it.
    display_to_session[display_token] = session_token

    # The link is only printed to the server log here; a real deployment could
    # deliver it to an administrator by SMS or chat instead.
    print(f"\n[SERVER] New Login Request for {username}")
    print(f"[SERVER] Approve Link: http://192.168.10.10:5000/approve?token={display_token}\n")
    # The session token is deliberately NOT logged: it is the secret that
    # lets a poller observe the approval.

    return jsonify({
        "success": True,
        "session_token": session_token,
        "display_token": display_token,
    })
@app.route('/check_status', methods=['POST'])
def check_status():
    """Report the status of the login session identified by a session token.

    Expects a JSON object with a string ``token``.

    Returns:
        JSON ``{"status": <stored status>}`` for a known session, or
        ``{"status": "expired"}`` when the token is unknown, malformed, or
        has already been cleaned up.
    """
    data = request.get_json(silent=True)
    token = data.get('token') if isinstance(data, dict) else None
    # Only strings are valid keys here; a list/dict token would otherwise
    # raise TypeError (unhashable) on lookup.
    entry = tokens.get(token) if isinstance(token, str) else None
    # A single .get() avoids the check-then-index race with the cleanup
    # thread deleting the entry in between.
    if entry is None:
        return jsonify({"status": "expired"})
    return jsonify({"status": entry["status"]})
@app.route('/approve', methods=['GET'])
def approve():
    """Approve the pending login session referenced by ``?token=<display_token>``.

    Returns:
        An HTML confirmation on success; HTTP 404 when the token is unknown
        or expired; HTTP 409 when the session has already been decided.
    """
    # NOTE(review): this changes state on a GET request, so anything that
    # fetches the link automatically (chat link previews, scanners) would
    # approve the login. Consider a confirmation step via POST.
    display_token = request.args.get('token')

    # Resolve display_token -> session_token -> session entry. Using .get()
    # for both lookups avoids a KeyError if the cleanup thread removes the
    # entry between a membership test and the index.
    session_token = display_to_session.get(display_token)
    entry = tokens.get(session_token) if session_token else None
    if entry is None:
        return "<h1>Token Invalid or Expired</h1>", 404

    # Only a pending request may be decided; in particular a rejected login
    # must not be flipped to approved afterwards.
    if entry['status'] != 'pending':
        return f"<h1>Login already {entry['status'].upper()}.</h1>", 409

    entry['status'] = 'approved'
    print(f"[SERVER] ✅ Login APPROVED for {entry['username']}")
    return "<h1>Login APPROVED! You can close this window.</h1>"
@app.route('/reject', methods=['GET'])
def reject():
    """Reject the pending login session referenced by ``?token=<display_token>``.

    Returns:
        An HTML confirmation on success; HTTP 404 when the token is unknown
        or expired; HTTP 409 when the session has already been decided.
    """
    display_token = request.args.get('token')

    # Resolve display_token -> session_token -> session entry. Using .get()
    # for both lookups avoids a KeyError if the cleanup thread removes the
    # entry between a membership test and the index.
    session_token = display_to_session.get(display_token)
    entry = tokens.get(session_token) if session_token else None
    if entry is None:
        return "<h1>Token Invalid or Expired</h1>", 404

    # Only a pending request may be decided; reporting "rejected" for a login
    # that was already approved would misstate what the client observed.
    if entry['status'] != 'pending':
        return f"<h1>Login already {entry['status'].upper()}.</h1>", 409

    entry['status'] = 'rejected'
    print(f"[SERVER] ❌ Login REJECTED for {entry['username']}")
    return "<h1>Login REJECTED.</h1>"
if __name__ == '__main__': # 监听 0.0.0.0 方便你从手机/浏览器访问 app.run(host='0.0.0.0', port=5000)

### 2. Client (Go)
package main
import ( "bufio" "bytes" "encoding/json" "flag" "fmt" "io" "net/http" "os" "os/exec" "strings" "syscall" "time"
"github.com/pquerna/otp/totp")
// ================= Configuration =================
// const (// // Backend API address// APIHost = "http://127.0.0.1:5000"
// // Enable 2FA for non-interactive connections (SCP, SFTP, Git, rsync, etc.)// // true = These commands must also perform 2FA authentication// // false = These commands bypass authentication (recommended for better UX)// EnableNonInteractive2FA = true
// // Authentication timeout (seconds)// AuthTimeout = 60
// // Status polling interval (seconds)// PollInterval = 2// )
// =================================================
// SessionType classifies an SSH session as interactive or non-interactive.
type SessionType string

// The two session kinds returned by getSessionType.
const (
	Interactive    = SessionType("interactive")
	NonInteractive = SessionType("non-interactive")
)
// StartLoginRequest is the JSON body posted to the start_login endpoint.
type StartLoginRequest struct {
	// Username is the account attempting to log in.
	Username string `json:"username"`
}
// StartLoginResponse mirrors the JSON returned by the start_login endpoint.
type StartLoginResponse struct {
	// Success reports whether the session was created.
	Success bool `json:"success"`
	// SessionToken is private and is used for status polling.
	SessionToken string `json:"session_token"`
	// DisplayToken is public and is shown to the user.
	DisplayToken string `json:"display_token"`
}
// CheckStatusRequest is the JSON body posted to the check_status endpoint.
type CheckStatusRequest struct {
	// Token is the session token whose status is being queried.
	Token string `json:"token"`
}
// CheckStatusResponse mirrors the JSON returned by the check_status endpoint.
type CheckStatusResponse struct {
	// Status is the session state, e.g. "pending", "approved" or "rejected".
	Status string `json:"status"`
}
// getSessionType determines if this is an interactive or non-interactive sessionfunc getSessionType() (SessionType, string) { cmd := os.Getenv("SSH_ORIGINAL_COMMAND") if cmd != "" { return NonInteractive, cmd } return Interactive, ""}
// getUsername returns the first non-empty value among the PAM_USER and USER
// environment variables, in that order, or "unknown" when both are empty.
func getUsername() string {
	for _, key := range []string{"PAM_USER", "USER"} {
		if name := os.Getenv(key); name != "" {
			return name
		}
	}
	return "unknown"
}
// httpClient is shared by all httpPost calls so that repeated status polls
// can reuse one connection. The timeout bounds each request as a whole.
var httpClient = &http.Client{Timeout: 5 * time.Second}

// maxResponseBytes caps how much of a response body httpPost reads.
const maxResponseBytes = 1 << 20

// httpPost marshals data as JSON, POSTs it to url with an application/json
// content type, and decodes the JSON response body into a generic map.
//
// It returns an error when the payload cannot be encoded, the request fails,
// the server answers with a non-2xx status, or the body is not valid JSON.
// On error the returned map is nil.
func httpPost(url string, data interface{}) (map[string]interface{}, error) {
	payload, err := json.Marshal(data)
	if err != nil {
		return nil, fmt.Errorf("encoding request body: %w", err)
	}

	resp, err := httpClient.Post(url, "application/json", bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Reject error responses explicitly instead of trying to interpret an
	// error page as an API answer.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		// Drain a bounded amount so the connection can be reused; a failure
		// here does not matter because an error is returned anyway.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxResponseBytes))
		return nil, fmt.Errorf("unexpected HTTP status %s from %s", resp.Status, url)
	}

	body, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
	if err != nil {
		return nil, fmt.Errorf("reading response body: %w", err)
	}

	var result map[string]interface{}
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, fmt.Errorf("decoding response body: %w", err)
	}
	return result, nil
}
// readTOTPSecret looks up username in secretsFile and returns its secret.
//
// The file is read line by line. Blank lines and lines starting with "#"
// are skipped; every other line is split at its first ":" into a user name
// and a secret. The secret of the first line whose name equals username is
// returned with surrounding whitespace removed.
//
// An error is returned when the file cannot be opened or read, or when no
// line matches username.
func readTOTPSecret(username, secretsFile string) (string, error) {
	f, err := os.Open(secretsFile)
	if err != nil {
		return "", fmt.Errorf("cannot open TOTP secrets file: %w", err)
	}
	defer f.Close()

	lines := bufio.NewScanner(f)
	for lines.Scan() {
		entry := strings.TrimSpace(lines.Text())
		if len(entry) == 0 || strings.HasPrefix(entry, "#") {
			continue
		}

		// Lines without a separator, or for another user, are ignored.
		sep := strings.Index(entry, ":")
		if sep < 0 || entry[:sep] != username {
			continue
		}
		return strings.TrimSpace(entry[sep+1:]), nil
	}

	if err := lines.Err(); err != nil {
		return "", fmt.Errorf("error reading secrets file: %w", err)
	}

	return "", fmt.Errorf("no TOTP secret found for user: %s", username)
}
// promptTOTPCode prints the offline-mode banner for username to standard
// output, then reads one newline-terminated line from standard input and
// returns it with surrounding whitespace removed. A read failure is returned
// as-is.
func promptTOTPCode(username string) (string, error) {
	out := os.Stdout
	fmt.Fprintln(out)
	fmt.Fprintln(out, "---------------------------------------------")
	fmt.Fprintf(out, "SERVER UNREACHABLE - OFFLINE MODE\n")
	fmt.Fprintf(out, "TOTP VERIFICATION FOR: %s\n", username)
	fmt.Fprintln(out, "---------------------------------------------")
	fmt.Fprint(out, "Enter TOTP code from your authenticator app: ")

	line, err := bufio.NewReader(os.Stdin).ReadString('\n')
	if err != nil {
		return "", err
	}
	return strings.TrimSpace(line), nil
}
// performTOTPAuth handles offline TOTP authenticationfunc performTOTPAuth(username, secretsFile string, sessionType SessionType) (bool, error) { // Read TOTP secret for user secret, err := readTOTPSecret(username, secretsFile) if err != nil { return false, err }
// For non-interactive sessions, we cannot prompt for TOTP if sessionType == NonInteractive { return false, fmt.Errorf("TOTP authentication not available for non-interactive sessions") }
// Prompt user for TOTP code code, err := promptTOTPCode(username) if err != nil { return false, fmt.Errorf("failed to read TOTP code: %w", err) }
// Validate TOTP code with time skew tolerance valid := totp.Validate(code, secret) if !valid { return false, fmt.Errorf("invalid TOTP code") }
fmt.Println("Access Granted.") return true, nil}
// performAuth handles the complete 2FA authentication flow with TOTP fallbackfunc performAuth(username string, sessionType SessionType, pollInterval int, timeout int, apiHost string, enableTOTPFallback bool, totpSecretsFile string) (bool, error) { startURL := fmt.Sprintf("%s/start_login", apiHost) checkURL := fmt.Sprintf("%s/check_status", apiHost)
// 1. Request login tokens resp, err := httpPost(startURL, StartLoginRequest{Username: username}) if err != nil { // Server unreachable - try TOTP fallback if enabled if enableTOTPFallback { fmt.Fprintf(os.Stderr, "Warning: 2FA server unreachable, falling back to TOTP authentication\n") return performTOTPAuth(username, totpSecretsFile, sessionType) } return false, fmt.Errorf("2FA Service Unreachable: %w", err) }
success, ok := resp["success"].(bool) if !ok || !success { return false, fmt.Errorf("failed to start login session") }
// 提取两个 token sessionToken, ok := resp["session_token"].(string) if !ok || sessionToken == "" { return false, fmt.Errorf("invalid session_token received") }
displayToken, ok := resp["display_token"].(string) if !ok || displayToken == "" { return false, fmt.Errorf("invalid display_token received") }
// 使用 display_token 构建批准链接 (公开给用户) authLink := fmt.Sprintf("%s/approve?token=%s", apiHost, displayToken)
// 2. Display prompt based on session type if sessionType == Interactive { // Interactive SSH: Display formatted prompt fmt.Println() fmt.Println("---------------------------------------------") fmt.Printf("SECURITY CHECK FOR: %s\n", username) fmt.Println("---------------------------------------------") fmt.Print("Waiting for approval..\n\n") } else { // Non-interactive: Minimal output to stderr fmt.Fprintf(os.Stderr, "2FA Required. Approve at: %s\n", authLink) }
// 3. Poll for approval status (使用 session_token,私密) maxAttempts := timeout / pollInterval for i := 0; i < maxAttempts; i++ { statusResp, err := httpPost(checkURL, CheckStatusRequest{Token: sessionToken}) if err == nil { if status, ok := statusResp["status"].(string); ok { switch status { case "approved": if sessionType == Interactive { fmt.Println("Access Granted.") } return true, nil case "rejected": return false, fmt.Errorf("access denied") } } } time.Sleep(time.Duration(pollInterval) * time.Second) }
return false, fmt.Errorf("2FA timeout")}
// executeCommand hands control to the user's workload once authentication
// has been decided.
//
// When originalCmd is non-empty it is run as `$SHELL -c originalCmd` (falling
// back to "sh" when SHELL is unset) with stdin, stdout and stderr inherited,
// and the command's error is returned; a non-zero exit status surfaces as an
// *exec.ExitError.
//
// When originalCmd is empty the current process is replaced by a login shell
// taken from SHELL (default /bin/bash). In that case the function only
// returns if the exec itself fails.
func executeCommand(originalCmd string) error {
	shell := os.Getenv("SHELL")

	if originalCmd != "" {
		// Run the command through the login shell rather than a hard-coded
		// "sh", which is how sshd itself would have run it.
		if shell == "" {
			shell = "sh"
		}
		cmd := exec.Command(shell, "-c", originalCmd)
		cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr
		return cmd.Run()
	}

	if shell == "" {
		shell = "/bin/bash"
	}

	// The argv slice passed to syscall.Exec starts with argv[0], the program
	// name, not with the first option. A leading "-" on argv[0] is the
	// conventional way to request a login shell (e.g. "-bash").
	argv0 := "-" + shell[strings.LastIndex(shell, "/")+1:]
	return syscall.Exec(shell, []string{argv0}, os.Environ())
}
func main() {
apiHost := flag.String("apiHost", "http://127.0.0.1:5000", "API host") timeout := flag.Int("timeout", 30, "Timeout in seconds") pollInterval := flag.Int("pollInterval", 1, "Poll interval in seconds") enableNonInteractive2FA := flag.Bool("enableNonInteractive2FA", false, "Enable 2FA for non-interactive sessions") totpSecretsFile := flag.String("totpSecretsFile", "/etc/ssh2fa/totp_secrets", "Path to TOTP secrets file") enableTOTPFallback := flag.Bool("enableTOTPFallback", true, "Enable TOTP fallback when server is unreachable") flag.Parse()
// Ensure we exit with error code on any panic (security-first) defer func() { if r := recover(); r != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", r) os.Exit(1) } }()
username := getUsername() sessionType, originalCmd := getSessionType()
// === Policy Decision === if sessionType == NonInteractive && !*enableNonInteractive2FA { // Bypass 2FA for non-interactive sessions if disabled // Uncomment for debugging: // fmt.Fprintf(os.Stderr, "DEBUG: Skipping 2FA for command: %s\n", originalCmd) if err := executeCommand(originalCmd); err != nil { os.Exit(1) } os.Exit(0) }
// === Perform Authentication === success, err := performAuth(username, sessionType, *pollInterval, *timeout, *apiHost, *enableTOTPFallback, *totpSecretsFile) if !success { if err != nil { fmt.Fprintf(os.Stderr, "\n❌ %s\n", err.Error()) } else { fmt.Fprintf(os.Stderr, "\n❌ Authentication failed\n") } os.Exit(1) }
// === Execute Command After Successful Auth === if err := executeCommand(originalCmd); err != nil { fmt.Fprintf(os.Stderr, "Error executing command: %v\n", err) os.Exit(1) }}

## 使用指南
- 编译客户端
```bash
go mod init ssh2fa
go get github.com/pquerna/otp/totp
go build -o ssh2fa ssh2fa.go
sudo mv ssh2fa /usr/local/bin/
```
- 配置离线 TOTP 密钥
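```bash
sudo mkdir -p /etc/ssh2fa
echo "root:JBSWY3DPEHPK3PXP" | sudo tee /etc/ssh2fa/totp_secrets   # 示例密钥,请替换为自己生成的密钥
sudo chmod 600 /etc/ssh2fa/totp_secrets
```
- 修改 `sshd_config`
```
ForceCommand /root/project/ssh2fa/ssh2fa -apiHost http://127.0.0.1:5000 -enableNonInteractive2FA=true -pollInterval 1 -timeout 240
```
注意:Go 的布尔参数必须写成 `-enableNonInteractive2FA=true` 的形式。如果写成 `-enableNonInteractive2FA true`,参数解析会在 `true` 处停止,其后的 `-pollInterval`、`-timeout` 将被忽略并使用默认值。

重启 SSH 服务:`systemctl restart sshd`。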
## 实际体验
- 登录时终端输出审批链接,点击后即放行。
- 若 Auth Server 不可用,系统提示离线模式并要求输入 TOTP,仍可正常登录。
这套方案在实际使用中提升了体验,同时保持了安全性,欢迎自行部署并根据需求改进。
⚠️ 无法连接认证服务器 - 启用离线模式
这时候输入之前的 TOTP 码,照样能进。