Python 以字符串形式请求 CA 证书

问题描述 投票:0回答:5

目前我们使用将CA证书放在服务器上的方法来访问第三方API。

certificate_path = os.path.join(CERT_PATH, 'cacert.pem')
certificate_key_path = os.path.join(CERT_PATH, 'cacert.key')
response = requests.get(url, cert=(certificate_path, certificate_key_path))

这可行,但出于安全目的,我们正在寻找而不是将 CA 证书存储在服务器上,而是存储在数据库中的

Accounts
表中(客户提出的安全原因)。

所以问题是:

  • 有什么方法可以直接将 CA 证书的字符串传递到

    requests
    (除了将内容写入临时文件之外)?

  • 是否还有其他

    http
    python 模块支持在
    http
    get/post 请求中传递 CA 证书字符串?

  • 我们是否应该使用其他方法来代替将它们存储在数据库和服务器中?

python python-requests ca
5个回答
9
投票

您提供的示例正在传递客户端证书,如请求文档中所示。

就目前情况而言,无法在内存中(或作为字符串)传递客户端证书和密钥。

猴子修补来救援 - 通过猴子修补

requests
,您可以添加从内存加载客户端证书和密钥的功能。以下补丁允许以各种格式传递客户端证书和密钥,而不会破坏现有功能。

import requests
from OpenSSL.crypto import PKCS12, X509, PKey


def _is_key_file_encrypted(keyfile):
    '''In memory key is not encrypted'''
    if isinstance(keyfile, PKey):
        return False
    return _is_key_file_encrypted.original(keyfile)


class PyOpenSSLContext(requests.packages.urllib3.contrib.pyopenssl.PyOpenSSLContext):
    '''Support loading certs from memory'''
    def load_cert_chain(self, certfile, keyfile=None, password=None):
        if isinstance(certfile, X509) and isinstance(keyfile, PKey):
            self._ctx.use_certificate(certfile)
            self._ctx.use_privatekey(keyfile)
        else:
            super().load_cert_chain(certfile, keyfile=keyfile, password=password)


class HTTPAdapter(requests.adapters.HTTPAdapter):
    '''Handle a variety of cert types'''
    def cert_verify(self, conn, url, verify, cert):
        if cert:
            # PKCS12
            if isinstance(cert, PKCS12):
                conn.cert_file = cert.get_certificate()
                conn.key_file = cert.get_privatekey()
                cert = None
            elif isinstance(cert, tuple) and len(cert) == 2:
                # X509 and PKey
                if isinstance(cert[0], X509) and hasattr(cert[1], PKey):
                    conn.cert_file = cert[0]
                    conn.key_file = cert[1]
                    cert = None
                # cryptography objects
                elif hasattr(cert[0], 'public_bytes') and hasattr(cert[1], 'private_bytes'):
                    conn.cert_file = X509.from_cryptography(cert[0])
                    conn.key_file = PKey.from_cryptography_key(cert[1])
                    cert = None
        super().cert_verify(conn, url, verify, cert)


def patch_requests(adapter=True):
    '''You can perform a full patch and use requests as usual:

    >>> patch_requests()
    >>> requests.get('https://httpbin.org/get')

    or use the adapter explicitly:

    >>> patch_requests(adapter=False)
    >>> session = requests.Session()
    >>> session.mount('https', HTTPAdapter())
    >>> session.get('https://httpbin.org/get')
    '''
    if hasattr(requests.packages.urllib3.util.ssl_, '_is_key_file_encrypted'):
        _is_key_file_encrypted.original = requests.packages.urllib3.util.ssl_._is_key_file_encrypted
        requests.packages.urllib3.util.ssl_._is_key_file_encrypted = _is_key_file_encrypted
    requests.packages.urllib3.util.ssl_.SSLContext = PyOpenSSLContext
    if adapter:
        requests.sessions.HTTPAdapter = HTTPAdapter

要使用补丁,您可以执行以下操作(假设上面的代码位于名为

patch.py
的文件中)

import os
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from patch import patch_requests


CLIENT_CERT = serialization.load_pem_x509_certificate(
    os.getenv('CLIENT_CERT'), default_backend())
CLIENT_KEY = serialization.load_pem_private_key(
    os.getenv('CLIENT_KEY'), None, default_backend())


# monkey patch load_cert_chain to allow loading
# cryptography certs and keys from memory
patch_requests()


response = requests.get(url, cert=(CLIENT_CERT, CLIENT_KEY))

您现在能够以

pyopenssl
对象或
cryptography
对象的形式向内存中的请求提供客户端证书。


6
投票

有一种方法可以通过临时文件来做到这一点,如下所示:

cert = tempfile.NamedTemporaryFile(delete=False)
cert.write(CERTIFICATE_AS_STRING)
cert.close()
requests.get(url, cert=cert.name, verify=True)
os.unlink(cert.name)

如果您想知道为什么这可能不安全,请在此处查看我的答案:https://stackoverflow.com/a/46570264/6445270


4
投票

如果想在不使用临时文件的情况下执行此操作,可以通过覆盖请求 SSLContext 来实现。示例可以在这个答案中看到。


3
投票

我采取了不同的方法并使用 init_poolmanager 来设置 ssl 上下文。我避免修补,因此它仅适用于 Session 对象。

前:

#pip install requests pyOpenSSL

import OpenSSL
import requests
import requests.hooks
from urllib3 import Retry
from urllib3.contrib.pyopenssl import PyOpenSSLContext
from urllib3.util.ssl_ import create_urllib3_context


class ClientSideCertificateHTTPAdapter(requests.adapters.HTTPAdapter):
    DEFAULT_PROTOCOL = create_urllib3_context().protocol

    def __init__(self, *args, cert, key, protocol=DEFAULT_PROTOCOL, **kwargs):
        self._cert = cert
        self._key = key
        self._protocol = protocol
        super().__init__(*args, **kwargs)


    def init_poolmanager(self, *args, **kwargs):
        ctx = PyOpenSSLContext(self._protocol)
        kwargs["ssl_context"] = ctx
        ctx._ctx.use_certificate(self._cert)
        ctx._ctx.use_privatekey(self._key)
        return super().init_poolmanager(*args, **kwargs)


def main():
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, "-----BEGIN CERTIFICATE----- MIIDnjC....cUkiz -----END CERTIFICATE-----")
    key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, "-----BEGIN CERTIFICATE----- MIIDnjC....cUkiz -----END CERTIFICATE-----", b"passphrase_goes_here")

    adapter = ClientSideCertificateHTTPAdapter(cert=cert, key=key, max_retries=Retry(total=10, backoff_factor=0.5))
    session = requests.Session()
    session.mount("https://www.hotmail.com/", adapter)
    session.get("https://www.hotmail.com/api/v2/mail")


if __name__ == "__main__":
    main()

0
投票

由于我无法像 openssl 那样拥有额外的依赖项,因此可以使用

requests
库和临时文件进行替代:

import contextlib
import os
from tempfile import NamedTemporaryFile

import requests

cert_key_str = """
-----BEGIN PRIVATE KEY-----
...
-----END PRIVATE KEY-----
"""

cert_str = """
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
"""

url = "https://..."


@contextlib.contextmanager
def create_temporary_file(suffix=None):
    """
    Context that introduces a temporary file.
    Adapted from:
    https://stackoverflow.com/a/57701186/5818549

    Creates a temporary file, yields its name, and upon context exit, deletes it.
    (In contrast, tempfile.NamedTemporaryFile() provides a 'file' object and
    deletes the file as soon as that file object is closed, so the temporary file
    cannot be safely re-opened by another library or process.)

    Args:
      suffix: desired filename extension (e.g. '.mp4').

    Yields:
      The name of the temporary file.
    """
    try:
        temporaty_file = NamedTemporaryFile(suffix=suffix, delete=False)
        temporaty_filename = temporaty_file.name
        temporaty_file.close()
        yield temporaty_filename
    finally:
        os.unlink(temporaty_filename)


with (
    create_temporary_file() as cert_filename,
    create_temporary_file() as key_filename
):
    with (
        open(cert_filename, "w") as cert_file,
        open(key_filename, "w") as key_file
    ):
        cert_file.write(cert_str)
        key_file.write(cert_key_str)

    final_conn = requests.get(
        url,
        cert=(cert_filename, key_filename),
    )
    print(final_conn.text)

© www.soinside.com 2019 - 2024. All rights reserved.