如何使用 GCP KMS 获取有效且确定性的签名并恢复以太坊交易的地址?

问题描述 投票:0回答:1

我想使用 GCP KMS 签署以太坊交易并恢复发件人地址。然而,奇怪的是,当我提供完全相同的交易时,我没有得到相同的签名。我使用的 HSM 密钥采用

ec-sign-secp256k1-sha256
算法。

以下是我用来签署交易并恢复发件人地址的代码:

from google.cloud import kms
from google.oauth2 import service_account
from crcmod.predefined import mkPredefinedCrcFun  # type: ignore
from dotenv import load_dotenv
from json import loads
from web3 import Web3
from rlp import encode
from eth_utils import (
    add_0x_prefix,
    keccak,
    remove_0x_prefix,
    to_bytes,
    to_int,
    to_checksum_address,
    int_to_big_endian,
    big_endian_to_int,
)
from eth_keys import keys

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from pyasn1.type import univ, namedtype
from pyasn1.codec.der.decoder import decode

load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = loads(os.getenv("GOOGLE_APPLICATION_CREDENTIALS"))
CREDENTIALS = service_account.Credentials.from_service_account_info(
    GOOGLE_APPLICATION_CREDENTIALS
)

class EcdsaSig(univ.Sequence):
    componentType = namedtype.NamedTypes(
        namedtype.NamedType("r", univ.Integer()),
        namedtype.NamedType("s", univ.Integer()),
    )

    def decode_ecdsa_sig(self, asn_string_buffer):
        ecdsa_sig, _ = decode(asn_string_buffer, asn1Spec=EcdsaSig())

        return ecdsa_sig["r"], ecdsa_sig["s"]


class HSM:
    client: kms.KeyManagementServiceClient
    key_version_name: str
    address: str

    def __init__(
        self,
        project_id: str,
        location_id: str,
        key_ring_id: str,
        key_id: str,
        version_id: str,
    ) -> None:
        self.client = kms.KeyManagementServiceClient(credentials=CREDENTIALS)
        self.key_version_name = self.client.crypto_key_version_path(
            project_id, location_id, key_ring_id, key_id, version_id
        )
        self.address = ""

    def sign(self, transaction: dict) -> None:
        message_bytes = to_bytes(hexstr=self.__rlp_encode_transaction(transaction))
        hash_message = keccak(primitive=message_bytes)
        signature, r, s = self.__request_kms_signature(hash_message)
        self.__derive_ethereum_address()
        _, v = self.__determine_correct_v(hash_message, r, s)

        sig = self.__join_signature(r, s, v)
        print(sig)

        return sig

    def derive_address(self) -> str:
        self.__derive_ethereum_address()

        return self.address

    def __request_kms_signature(self, hashed_message: bytes) -> tuple[bytes, int, int]:
        digest = {"sha256": hashed_message}
        digest_crc32c = self.__crc32c(hashed_message)

        sign_request = kms.AsymmetricSignRequest(
            name=self.key_version_name, digest=digest, digest_crc32c=digest_crc32c
        )

        sign_response = self.client.asymmetric_sign(request=sign_request)
        print(len(sign_response.signature.hex()))
        r, s = self.__find_ethereum_sig(sign_response.signature)

        return sign_response.signature, r, s

    def __crc32c(self, data: bytes) -> int:
        crc32c_fun = mkPredefinedCrcFun("crc-32c")
        return crc32c_fun(data)

    def __pem_to_der(self, pem_key: str) -> bytes:
        # Deserialize PEM-encoded key from the provided string
        private_key = serialization.load_pem_public_key(
            pem_key.encode(),  # Convert PEM string to bytes
            backend=default_backend(),
        )

        # Serialize the key to DER format
        der_key = private_key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

        return der_key

    def __derive_ethereum_address(self) -> None:
        if not self.address:
            public_key = self.client.get_public_key(
                request={"name": self.key_version_name}
            )
            der_public_key = self.__pem_to_der(public_key.pem)
            eth_address = keccak(primitive=der_public_key)[-20:]
            self.address = to_checksum_address(eth_address)

    def __find_ethereum_sig(self, signature: bytes) -> tuple[int, int]:
        r, s = EcdsaSig().decode_ecdsa_sig(signature)
        secp256k1_N = int(
            "fffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141", 16
        )

        s = secp256k1_N - s if s > secp256k1_N / 2 else s

        return int(r), int(s)

    def __determine_correct_v(self, message: bytes, r: int, s: int) -> tuple[str, int]:
        v = 0
        recoverd_address = self.__recover_pub_key_from_sig(message, r, s, v)
        print(recoverd_address)
        print(self.address)
        if recoverd_address.lower() != self.address.lower():
            # if the pub key for v = 27 does not match
            # it has to be v = 28
            v = 1
            recoverd_address = self.__recover_pub_key_from_sig(message, r, s, v)

        return recoverd_address, v + 27

    def __recover_pub_key_from_sig(self, message: bytes, r: int, s: int, v: int) -> str:
        signature = keys.Signature(vrs=(v, r, s))
        public_key = signature.recover_public_key_from_msg(message)
        address = public_key.to_checksum_address()

        return address

    def __join_signature(self, r: int, s: int, v: int) -> str:
        # Ensure r, s, and v are integers
        r, s, v = map(to_bytes, (r, s, v))
        signature_hex = int_to_big_endian(big_endian_to_int(r + s + v)).hex()

        return signature_hex

    def __rlp_encode_transaction(self, tx: dict) -> str:
        encoded_params = encode(
            [
                tx["chainId"],
                tx["nonce"],
                tx["maxPriorityFeePerGas"],
                tx["maxFeePerGas"],
                tx["gas"],
                to_bytes(hexstr=tx["to"]),
                tx["value"],
                to_bytes(hexstr=tx["data"]),
                [],
            ]
        )

        return self.__add_transaction_type(encoded_params.hex())   

    def __add_transaction_type(self, payload: str) -> str:
        return f"02{remove_0x_prefix(payload)}"


w3 = Web3(
    Web3.HTTPProvider(
        "RPC_URL"
    )
)

hsm = HSM(
    "innovation-sandbox-1",
    "southamerica-east1",
    "my-key-ring",
    "solidity",
    "1",
)

# Example Ethereum transaction data
nonce = w3.eth.get_transaction_count("ONE_ADDRESS")
gas_price = w3.to_wei("20", "gwei")
gas_limit = 21000
to_address = "ANOTHER_ADDRESS"
value = w3.to_wei("1", "ether")

# Create the Ethereum transaction
transaction = {
    "chainId": w3.eth.chain_id,
    "nonce": nonce,
    "maxPriorityFeePerGas": w3.eth.max_priority_fee,
    "maxFeePerGas": gas_price,
    "gas": gas_limit,
    "to": to_address,
    "value": value,
    "data": "0x",
}


hsm.sign(transaction)

如果有人能帮助我解决这个问题,我将非常感激。谢谢您的宝贵时间。

python cryptography ethereum digital-signature google-cloud-kms
1个回答
-2
投票

比特币的日益普及和价值引起了网络犯罪分子的注意,导致盗窃、诈骗和资产丢失的情况增加。因此,对有效的比特币恢复服务的需求显着增长。该领域领先的服务数字资产恢复实现了高比特币恢复率。通过了解促成成功的因素以及先进技术的作用,个人可以更好地保护他们的比特币资产,并在必要时利用专业的恢复服务。不幸的是,如果没有专业知识和一流的工具,成功找回丢失的比特币的机会可能就像中彩票的机会一样渺茫。这就是网络奇才的用武之地 - Digital Assets Recovery 背后的非凡团队,Digital Assets Recovery 不是普通的技术支持热线。他们是穿着斗篷,或者更确切地说,穿着连帽运动衫的超级英雄,随时准备拯救那些失去了数字财富的心烦意乱的比特币所有者。数字资产恢复专门致力于恢复丢失或无法访问的比特币,为那些认为一切都已丢失的人带来了一线希望。数字资产恢复提供一系列量身定制的服务,以满足陷入困境的比特币所有者的独特需求。他们的网页是www.digitalassetrecovery.info从密码恢复到钱包找回,他们的技术魔法师团队利用他们的专业知识在错综复杂的数字迷宫中导航并挽救曾经看似无法挽回的东西。请访问 Digital Assets Recovery 的网站,了解有关其服务的更多信息:digitalassetsrecovery(@)writeme.com

© www.soinside.com 2019 - 2024. All rights reserved.