使用 AES 加密 HTTP Body

AES

AES(Advanced Encryption Standard)是一种使用分组加密(Block cipher)的对称密钥算法, 所谓对称加密算法,就是双方先约定好一个密钥,在通信过程中使用同一对密钥进行加密和解密。

入参

在使用 AES 算法加密 / 解密的过程中,调用方只需要关注 3 个入参:

密钥

在 AES 中,密钥的长度可以是 128、192 或 256 bits,加密强度随密钥的长度增加。密钥必须严格保密。

明文 / 密文

前文提到一个很重要的概念——分组加密,简单说就是算法会把明文切分成多块固定长度,然后分块进行加密。 在 AES 中,块的长度固定是 128 bits。

通常来讲,分组密码要求明文的长度必须是块的整数倍,以便切分,所以要 AES 求明文的长度必须是 128 bits 的整数倍。 对于长度不足的明文,此时就引入了另一个概念——填充,即利用特定的规则(填充方式),将明文填到符合要求的长度。 但也存在某些特殊情况,AES 的某些工作模式下可以切换为流密码(Stream cipher),此时不要求明文的长度必须是块的整数倍。

初始向量

初始向量就是在加密过程中使用的一个随机数,主要用于保证密文的随机性,防止相同的明文加密后产出的密文也相同,初始向量不要求保密。

在某些情况下,并不要求初始向量的强随机性,只要求是一个长时间不会重复的数字(例如累加的数字),此时可以称之为 Nonce(一次性数字)。

相同的密钥和初始向量加密相同的明文,会输出相同的密文,这会导致加密无法抵御重放攻击。所以无论什么情况下,都不要重复使用初始化向量。

工作模式

以下列举一些常见的 AES 工作模式:

ECB

电子密码本(Electronic codebook)模式将明文分块后,每个块都使用相同的密钥进行独立加密,此模式不需要 IV。 相同的明文块加密后生成相同的密文块,不利于安全性。

CBC

密码分组链接(Cipher-block chaining)模式将每个明文块与前一个密文块进行异或操作,然后再进行加密,以此来实现加密的链式反应, 每个明文块的加密都依赖于前一个密文块的值。

由于第一个明文块没有前文与之进行异或操作,所以此模式需要 IV 来对第一个块进行加密。

CFB

密文反馈(Cipher feedback)模式将前一个密文块作为输入,生成一串伪随机数流,并与明文进行异或操作,得到密文。 这种模式具有较好的性能,但是对于错误的同步或传输问题,安全性不如 CBC 模式。

此模式可以将块密码变为自同步的流密码。

与 CBC 相似,此模式需要 IV。

OFB

输出反馈(Output feedback)模式与 CFB 模式类似,但是 OFB 模式使用的是加密函数的输出,而不是密文块。因此,它具有较好的性能和安全性。

此模式可以将块密码变成同步的流密码。

此模式需要 IV。

CTR

计数器(Counter)模式使用一个计数器和一个随机数生成器生成一系列的伪随机数流,并将其与明文异或得到密文。 CTR 模式具有良好的性能和安全性,因为它不依赖于前一个密文块,也不需要像 CBC 模式那样的填充操作。

此模式可以将块密码变为流密码。

此模式需要 Nonce。

AEAD

认证加密(Authenticated Encryption)和带有关联数据的认证加密 (Authenticated Encryption with Associated Data)是一种能够同时保证数据的保密性、 完整性和真实性的一种加密模式。 AEAD 是 AE 的变种,区别在于 AEAD 可以附带额外的认证关联信息。

前述的 AES 工作模式中,AES 都只提供了加密的功能,无法阻止攻击者对加密后的密文进行攻击,所以需要对加密 / 解密过程再增加一个认证的步骤, 用于确保数据的保密性、 完整性和真实性。

最简单的认证方式就是 HMAC(Hash-based Message Authentication Code),简单说就是基于一个密钥, 将明文算出 Hash,并将 Hash 与密文一同传输。接收方解密后再对明文进行相同的步骤计算出 Hash 再比对。

另一种方式是使用 AEAD 模式,目前最常见的 AEAD 模式就是 GCM(Galois/Counter Mode)。

GCM 是基于 CTR 的一种工作模式,它将加密和完整性保护合并到了一起,可以在保证安全性的同时提高加密效率。 同时还可以附加数据(Additional Authenticated Data),允许在加密和身份验证过程中传输额外的数据,从而增加了灵活性。 使用此模式时,除了生成密文,还会生成一个用于认证的 Tag,长度为 16 bytes,可以视情况截取长度。

铺垫结束,至此,我大概了解了使用 AES 加密消息的一些基本知识,接下来是实践环节。

选型

实现思路

在 Starlette 中有两种方式可以在不侵入业务代码的情况下实现 AES 的加密 / 解密逻辑,一是中间件(Middleware), 另一种方式是直接给 Handler 添加装饰器。两种方式都可以实现,但是我想对 HTTP 的流式请求和响应也做加密和解密的处理, 同时考虑与 FastAPI 的兼容性,且使用 AES-256-GCM 的初衷也是因为其支持流式加密,不用把所有数据都 Load 到内存当中, 更节省内存空间,所以中间件会比较有优势。

另外,AES-256-GCM 加密 / 解密过程中需要的 Nonce 和 Tag 长度则分别放到 HTTP Headers 的 ENCRYPTION-NONCEENCRYPTION-TAG-LENGTH 字段中, 由于 Nonce 是 Byte 类型的数据,所以 base64 编码后传输。 由于 Tag 需要所有的数据都加密完毕才能生成,所以把 Tag 追加到密文的末尾, 接收方通过 HTTP Headers 中的 ENCRYPTION-TAG-LENGTH 长度截取 Tag。

Starlette 的一个小坑

Starlette 的 Request 对象有一个特点就是 Body 中的数据只能被消费一次,如果多次消费,会造成 IO 堵塞,详情见 #495

解决方案:对于在中间件中消费 Body 的场景,Starlette 已经提出了解决方案—— Pure ASGI Middleware,在这个中间件中, 调用者直接处理底层 ASGI 的事件,这是在 Request 对象生成前就定义好的 Hook,所以不会造成 Body 的多次消费。

Demo

具体实现代码见 YogiLiu/encryption_http_demo,这里只对关键部分进行说明。 中间件的具体代码实现在 encryption_http_demo/middleware.py 中。

分离 Tag

由于 Tag 被附加到 Body 的末尾,且 Body 可能是流式的数据,所以这里需要一个结构来将 Tag 从 Body 中分离出来,对应的代码为:

encryption_http_demo/middleware.py
class _BufferTag:
    """
    Separate tag from bytes, assuming that tag is appended to end of data.

    :param length: tag length
    """

    def __init__(self, length: int):
        self._buf = b''
        self._len = length
        self._is_finalize = False

    def update(self, s: bytes) -> bytes:
        """
        Input data bytes and return non-tag part ciphertext.

        :param s: data bytes
        :return: ciphertext
        """
        if self._is_finalize:
            raise ValueError('finalized')
        self._buf += s
        front = self._buf[:-self._len]
        self._buf = self._buf[-self._len:]
        return front

    def finalize(self) -> bytes:
        """
        Get tag bytes and disable continue to update.

        :return: tag bytes
        """
        self._is_finalize = True
        return self._buf

_BufferTag 的目的就是留住最后的 n 个字节,每次接收到新的 chunk 的时候,都调用 update 方法, update 方法同时会返回非 Tag 的部分,最后当 Body 结束了,_BufferTag 中剩余的部分就是 Tag,通过 finalize 方法读取即可。

解密 Request

encryption_http_demo/middleware.py
async def receive_wrapper() -> Message:
    message = await receive()
    data = buf.update(message['body'])
    try:
        message['body'] = receive_decryptor.update(data)
        if not message.get('more_body'):
            # Call `finalize_with_tag` instead of `finalize` to delay validation of the authentication tag.
            message['body'] += receive_decryptor.finalize_with_tag(buf.finalize())
    except exceptions.InvalidTag as error:
        raise ExceptionWrapper('invalid tag') from error
    return message

注意如果解密失败,要手动捕获解密失败的异常(exceptions.InvalidTag)并手动抛出 ExceptionWrapper,这里的目的是为了在后面捕获 异常是只捕获由解密过程触发的异常,如下:

encryption_http_demo/middleware.py
try:
    await self.app(scope, receive_wrapper, send_wrapper)
    return
except ExceptionWrapper as err:
    error_msg = str(err)

若此处直接捕获 exceptions.InvalidTag,可能会捕获到加密过程中触发的异常,不利于调试。

加密 Response

这里使用了 os.urandom 来生成 Nonce,这是一个随机性强的方法,对于 Nonce 来说其实不需要这么强的随机性。

encryption_http_demo/middleware.py
send_nonce = os.urandom(16)
send_algorithm = algorithms.AES(self._secret_key)
send_cipher = Cipher(send_algorithm, mode=modes.GCM(send_nonce))
send_encryptor = send_cipher.encryptor()

加密的过程其实很简单,只要处理好事件及数据的加密逻辑就行。

encryption_http_demo/middleware.py
async def send_wrapper(message: Message) -> None:
    if message['type'] == 'http.response.start':
        message['headers'].append((b'ENCRYPTION-NONCE', base64.b64encode(send_nonce)))
        message['headers'].append((b'ENCRYPTION-TAG-LENGTH', b'16'))
    if message['type'] == 'http.response.body':
        message['body'] = send_encryptor.update(message['body'])
        if not message.get('more_body'):
            message['body'] += send_encryptor.finalize() + send_encryptor.tag
    await send(message)

错误响应

由于在加密 / 解密过程中很多地方都可能需要个客户端响应异常,所以这里做集中的错误相应,同时错误响应也是加密的。

encryption_http_demo/middleware.py
encrypted = send_encryptor.update(
    error_msg.encode('utf-8')) + send_encryptor.finalize() + send_encryptor.tag
err_response = PlainTextResponse(encrypted, status_code=422,
                            headers={'ENCRYPTION-NONCE': base64.b64encode(send_nonce).decode('utf-8'),
                                     'ENCRYPTION-TAG-LENGTH': '16'})
await err_response(scope, receive, send)
AES Python Starlette FastAPI GCM 对称加密 AEAD HTTP
Creative Commons License This work is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License .