使用 AES 加密 HTTP Body
AES
AES(Advanced Encryption Standard)是一种使用分组加密(Block cipher)的对称密钥算法, 所谓对称加密算法,就是双方先约定好一个密钥,在通信过程中使用同一对密钥进行加密和解密。
入参
在使用 AES 算法加密 / 解密的过程中,调用方只需要关注 3 个入参:
- 密钥(Key)
- 初始向量(Initialization Vector)
- 明文 / 密文
密钥
在 AES 中,密钥的长度可以是 128、192 或 256 bits,加密强度随密钥的长度增加。密钥必须严格保密。
明文 / 密文
前文提到一个很重要的概念——分组加密,简单说就是算法会把明文切分成多块固定长度,然后分块进行加密。 在 AES 中,块的长度固定是 128 bits。
通常来讲,分组密码要求明文的长度必须是块的整数倍,以便切分,所以要 AES 求明文的长度必须是 128 bits 的整数倍。 对于长度不足的明文,此时就引入了另一个概念——填充,即利用特定的规则(填充方式),将明文填到符合要求的长度。 但也存在某些特殊情况,AES 的某些工作模式下可以切换为流密码(Stream cipher),此时不要求明文的长度必须是块的整数倍。
初始向量
初始向量就是在加密过程中使用的一个随机数,主要用于保证密文的随机性,防止相同的明文加密后产出的密文也相同,初始向量不要求保密。
在某些情况下,并不要求初始向量的强随机性,只要求是一个长时间不会重复的数字(例如累加的数字),此时可以称之为 Nonce(一次性数字)。
相同的密钥和初始向量加密相同的明文,会输出相同的密文,这会导致加密无法抵御重放攻击。所以无论什么情况下,都不要重复使用初始化向量。
工作模式
以下列举一些常见的 AES 工作模式:
ECB
电子密码本(Electronic codebook)模式将明文分块后,每个块都使用相同的密钥进行独立加密,此模式不需要 IV。 相同的明文块加密后生成相同的密文块,不利于安全性。
CBC
密码分组链接(Cipher-block chaining)模式将每个明文块与前一个密文块进行异或操作,然后再进行加密,以此来实现加密的链式反应, 每个明文块的加密都依赖于前一个密文块的值。
由于第一个明文块没有前文与之进行异或操作,所以此模式需要 IV 来对第一个块进行加密。
CFB
密文反馈(Cipher feedback)模式将前一个密文块作为输入,生成一串伪随机数流,并与明文进行异或操作,得到密文。 这种模式具有较好的性能,但是对于错误的同步或传输问题,安全性不如 CBC 模式。
此模式可以将块密码变为自同步的流密码。
与 CBC 相似,此模式需要 IV。
OFB
输出反馈(Output feedback)模式与 CFB 模式类似,但是 OFB 模式使用的是加密函数的输出,而不是密文块。因此,它具有较好的性能和安全性。
此模式可以将块密码变成同步的流密码。
此模式需要 IV。
CTR
计数器(Counter)模式使用一个计数器和一个随机数生成器生成一系列的伪随机数流,并将其与明文异或得到密文。 CTR 模式具有良好的性能和安全性,因为它不依赖于前一个密文块,也不需要像 CBC 模式那样的填充操作。
此模式可以将块密码变为流密码。
此模式需要 Nonce。
AEAD
认证加密(Authenticated Encryption)和带有关联数据的认证加密 (Authenticated Encryption with Associated Data)是一种能够同时保证数据的保密性、 完整性和真实性的一种加密模式。 AEAD 是 AE 的变种,区别在于 AEAD 可以附带额外的认证关联信息。
前述的 AES 工作模式中,AES 都只提供了加密的功能,无法阻止攻击者对加密后的密文进行攻击,所以需要对加密 / 解密过程再增加一个认证的步骤, 用于确保数据的保密性、 完整性和真实性。
最简单的认证方式就是 HMAC(Hash-based Message Authentication Code),简单说就是基于一个密钥, 将明文算出 Hash,并将 Hash 与密文一同传输。接收方解密后再对明文进行相同的步骤计算出 Hash 再比对。
另一种方式是使用 AEAD 模式,目前最常见的 AEAD 模式就是 GCM(Galois/Counter Mode)。
GCM 是基于 CTR 的一种工作模式,它将加密和完整性保护合并到了一起,可以在保证安全性的同时提高加密效率。 同时还可以附加数据(Additional Authenticated Data),允许在加密和身份验证过程中传输额外的数据,从而增加了灵活性。 使用此模式时,除了生成密文,还会生成一个用于认证的 Tag,长度为 16 bytes,可以视情况截取长度。
铺垫结束,至此,我大概了解了使用 AES 加密消息的一些基本知识,接下来是实践环节。
选型
- 加密方法:AES-256-GCM,256 是指 Key 的长度
- 加密库:pyca/cryptography
- Web 框架:encode/starlette,兼容 FastAPI
- 测试:unittest
- HTTP 客户端:encode/httpx
实现思路
在 Starlette 中有两种方式可以在不侵入业务代码的情况下实现 AES 的加密 / 解密逻辑,一是中间件(Middleware), 另一种方式是直接给 Handler 添加装饰器。两种方式都可以实现,但是我想对 HTTP 的流式请求和响应也做加密和解密的处理, 同时考虑与 FastAPI 的兼容性,且使用 AES-256-GCM 的初衷也是因为其支持流式加密,不用把所有数据都 Load 到内存当中, 更节省内存空间,所以中间件会比较有优势。
另外,AES-256-GCM 加密 / 解密过程中需要的 Nonce 和 Tag 长度则分别放到
HTTP Headers 的 ENCRYPTION-NONCE
和 ENCRYPTION-TAG-LENGTH
字段中,
由于 Nonce 是 Byte 类型的数据,所以 base64 编码后传输。
由于 Tag 需要所有的数据都加密完毕才能生成,所以把 Tag 追加到密文的末尾,
接收方通过 HTTP Headers 中的 ENCRYPTION-TAG-LENGTH
长度截取 Tag。
Starlette 的一个小坑
Starlette 的 Request 对象有一个特点就是 Body 中的数据只能被消费一次,如果多次消费,会造成 IO 堵塞,详情见 #495。
解决方案:对于在中间件中消费 Body 的场景,Starlette 已经提出了解决方案—— Pure ASGI Middleware,在这个中间件中, 调用者直接处理底层 ASGI 的事件,这是在 Request 对象生成前就定义好的 Hook,所以不会造成 Body 的多次消费。
Demo
具体实现代码见 YogiLiu/encryption_http_demo,这里只对关键部分进行说明。 中间件的具体代码实现在 encryption_http_demo/middleware.py 中。
分离 Tag
由于 Tag 被附加到 Body 的末尾,且 Body 可能是流式的数据,所以这里需要一个结构来将 Tag 从 Body 中分离出来,对应的代码为:
class _BufferTag:
"""
Separate tag from bytes, assuming that tag is appended to end of data.
:param length: tag length
"""
def __init__(self, length: int):
self._buf = b''
self._len = length
self._is_finalize = False
def update(self, s: bytes) -> bytes:
"""
Input data bytes and return non-tag part ciphertext.
:param s: data bytes
:return: ciphertext
"""
if self._is_finalize:
raise ValueError('finalized')
self._buf += s
front = self._buf[:-self._len]
self._buf = self._buf[-self._len:]
return front
def finalize(self) -> bytes:
"""
Get tag bytes and disable continue to update.
:return: tag bytes
"""
self._is_finalize = True
return self._buf
_BufferTag
的目的就是留住最后的 n 个字节,每次接收到新的 chunk 的时候,都调用 update
方法,
update
方法同时会返回非 Tag 的部分,最后当 Body 结束了,_BufferTag
中剩余的部分就是 Tag,通过 finalize
方法读取即可。
解密 Request
async def receive_wrapper() -> Message:
message = await receive()
data = buf.update(message['body'])
try:
message['body'] = receive_decryptor.update(data)
if not message.get('more_body'):
# Call `finalize_with_tag` instead of `finalize` to delay validation of the authentication tag.
message['body'] += receive_decryptor.finalize_with_tag(buf.finalize())
except exceptions.InvalidTag as error:
raise ExceptionWrapper('invalid tag') from error
return message
注意如果解密失败,要手动捕获解密失败的异常(exceptions.InvalidTag
)并手动抛出 ExceptionWrapper
,这里的目的是为了在后面捕获
异常是只捕获由解密过程触发的异常,如下:
try:
await self.app(scope, receive_wrapper, send_wrapper)
return
except ExceptionWrapper as err:
error_msg = str(err)
若此处直接捕获 exceptions.InvalidTag
,可能会捕获到加密过程中触发的异常,不利于调试。
加密 Response
这里使用了 os.urandom
来生成 Nonce,这是一个随机性强的方法,对于 Nonce 来说其实不需要这么强的随机性。
send_nonce = os.urandom(16)
send_algorithm = algorithms.AES(self._secret_key)
send_cipher = Cipher(send_algorithm, mode=modes.GCM(send_nonce))
send_encryptor = send_cipher.encryptor()
加密的过程其实很简单,只要处理好事件及数据的加密逻辑就行。
async def send_wrapper(message: Message) -> None:
if message['type'] == 'http.response.start':
message['headers'].append((b'ENCRYPTION-NONCE', base64.b64encode(send_nonce)))
message['headers'].append((b'ENCRYPTION-TAG-LENGTH', b'16'))
if message['type'] == 'http.response.body':
message['body'] = send_encryptor.update(message['body'])
if not message.get('more_body'):
message['body'] += send_encryptor.finalize() + send_encryptor.tag
await send(message)
错误响应
由于在加密 / 解密过程中很多地方都可能需要个客户端响应异常,所以这里做集中的错误相应,同时错误响应也是加密的。
encrypted = send_encryptor.update(
error_msg.encode('utf-8')) + send_encryptor.finalize() + send_encryptor.tag
err_response = PlainTextResponse(encrypted, status_code=422,
headers={'ENCRYPTION-NONCE': base64.b64encode(send_nonce).decode('utf-8'),
'ENCRYPTION-TAG-LENGTH': '16'})
await err_response(scope, receive, send)