scramClientFirst

语法

scramClientFirst(user, cnonce)

参数

user 是表示用户名的字符串。它只能包含字母、下划线或数字,并且它不能以数字开头。长度不能超过30个字符。

cnonce 是用户在客户端生成的 nonce,必须是 16 字节的一次性随机数,使用 base64 编码。

详情

scramClientFirst 用于 SCRAM(Salted Challenge Response Authentication Mechanism)认证过程的第一阶段。通过该函数可获取服务器端的认证信息。

返回值:

一个包含以下三个元素的元组:

  • salt:base64 编码的哈希盐值
  • iterCount:密码哈希的迭代次数
  • combined_nonce:客户端 nonce 和服务器端 nonce 的 base64 编码组合
注:

SCRAM 完整认证流程如下所示:

1. 客户端发起认证请求

生成一个客户端 nonce(一次性随机值),然后将其与用户名一起发给服务端。

2. 服务端响应认证请求

服务端收到信息后,通过scramClientFirst函数生成用于加密的哈希盐值、密码哈希的迭代次数,并生成 nonce 与客户端 nonce 拼接,然后将这些信息返回给客户端。

3. 客户端生成认证凭证,然后再次提交认证请求

客户端根据收到的信息将明文密码转化为密钥,最终生成客户端凭证,然后向服务端提交带凭证的认证请求。

4. 服务端验证认证请求

服务端收到请求后,通过scramClientFinal对拼接后的 nonce 以及客户端凭证进行验证。若验证成功,返回服务端签名发送给客户端。

5. 客户端验证服务端签名

客户端对服务端返回的签名进行认证。若签名验证通过,则用户登录成功;若不通过,则断开会话连接。

例子

以下脚本是一个完整的示例,展示了如何在 Python 客户端完成用户(”user01”)SCRAM 模式登录,其中服务端的操作通过 DolphinDB Python API 的相关接口实现(详见 Python API 手册)。

import dolphindb as ddb

# 建立与 DolphinDB 的会话连接
s = ddb.session("183.134.101.133", 8888)
s.run("print", 1, 2, 3)

##########################

import hashlib
import hmac
import os
import base64
import hashlib


# ------------------ 服务端函数 ------------------
def server_handle_client_first(username, client_nonce):
  """处理第一次认证请求,返回 salt、迭代次数和合并后的 nonce"""
    salt, iter_count, nonce = s.run("scramClientFirst", username, client_nonce)

    return {
        "salt": salt,
        "iteration_count": iter_count,
        "combined_nonce": nonce
    }

def server_handle_client_final(user, combined_nonce, client_proof):    
"""处理最终认证请求,验证 client_proof,返回 server_signature"""
    return s.run("scramClientFinal", user, combined_nonce, client_proof)

# ------------------ 客户端函数 ------------------
def client_initiate_authentication(username):
    """客户端发起认证,生成 client_nonce"""
    client_nonce = base64.b64encode(os.urandom(16)).decode()
    return {
        "username": username,
        "client_nonce": client_nonce
    }

def client_generate_proof(user, password, salt, iteration_count, cnonce, combined_nonce):
    """生成 client_proof"""
    
    # 计算SaltedPassword
    salted_password = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode(),
        base64.b64decode(salt),
        iteration_count
    )
    
    # 生成密钥
    client_key = hmac.new(salted_password, b"Client Key", hashlib.sha256).digest()
    stored_key = hashlib.sha256(client_key).digest()
    server_key = hmac.new(salted_password, b"Server Key", hashlib.sha256).digest()
    
    # 构造AuthMessage
    auth_message = (
        f"n={user},r={cnonce},"  # client-first-bare
        f"r={combined_nonce},s={salt},i={iteration_count},"  # server-first
        f"c=biws,r={combined_nonce}"                         # client-final-without-proof
    )
    
    # 计算ClientProof
    client_signature = hmac.new(stored_key, auth_message.encode(), hashlib.sha256).digest()
    client_proof = bytes([ck ^ cs for ck, cs in zip(client_key, client_signature)])
    
    return {
        "client_proof": base64.b64encode(client_proof).decode(),
        "server_key": server_key,
        "auth_message": auth_message
    }

# ------------------ SCRAM 模式登录 ------------------
if __name__ == "__main__":
    
    # 客户端发起认证
    client_data = client_initiate_authentication("user01")
    
    # 服务端处理首次请求
    server_response = server_handle_client_first(
        client_data["username"], 
        client_data["client_nonce"]
    )
    
    password = "123456"
    
    # 客户端生成证明
    client_proof_data = client_generate_proof(
        "user01",
        password,
        server_response["salt"],
        server_response["iteration_count"],
        client_data["client_nonce"],
        server_response["combined_nonce"]
    )
    
    # 服务端验证并返回签名
    # auth_sessions[server_response["combined_nonce"]]["auth_message"] = client_proof_data["auth_message"]
    server_signature = server_handle_client_final(
        "user01",
        server_response["combined_nonce"],
        client_proof_data["client_proof"]
    )
    
    # 客户端验证服务端签名
    computed_server_sig = hmac.new(
        client_proof_data["server_key"],
        client_proof_data["auth_message"].encode(),
        hashlib.sha256
    ).digest()
    
    # 若匹配则认证成功
    assert server_signature == base64.b64encode(computed_server_sig).decode()
    print("SCRAM authentication succeeded!")

    # 查看当前会话与用户
    print(s.run("getCurrentSessionAndUser()"))