Skip to content

modules

Top-level package for Ethereum KMS Signer.

cli

Console script for ethereum_kms_signer.

kms

BasicKmsAccount

Kinda compatible eth_keys.PrivateKey class

Signature

Kinda compatible Signature class

SignedTransaction

Kinda compatible SignedTransaction class

__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in ethereum_kms_signer/kms.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)
__new__(_cls, rawTransaction, hash, r, s, v) special staticmethod

Create new instance of SignedTransaction(rawTransaction, hash, r, s, v)

__repr__(self) special

Return a nicely formatted representation string

Source code in ethereum_kms_signer/kms.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

get_eth_address(key_id, kms_client=None)

Calculate ethereum address for given AWS KMS key.

Source code in ethereum_kms_signer/kms.py
def get_eth_address(key_id: str, kms_client: KMSClient = None) -> str:
    """Calculate ethereum address for given AWS KMS key."""
    if kms_client is None:
        kms_client = boto3.client("kms")
    pubkey = kms_client.get_public_key(KeyId=key_id)["PublicKey"]
    return der_encoded_public_key_to_eth_address(pubkey)

sign_transaction(tx_obj, key_id, kms_client=None)

Sign a transaction object with given AWS KMS key.

Source code in ethereum_kms_signer/kms.py
def sign_transaction(
    tx_obj: dict, key_id: str, kms_client: KMSClient = None
) -> SignedTransaction:
    """Sign a transaction object with given AWS KMS key."""
    if kms_client is None:
        kms_client = boto3.client("kms")
    kms_pub_key_bytes = kms_client.get_public_key(KeyId=key_id)["PublicKey"]
    address = der_encoded_public_key_to_eth_address(kms_pub_key_bytes)
    kms_account = BasicKmsAccount(key_id, address, kms_client)
    return _sign_transaction(tx_obj, address, kms_account)

spki

der_encoded_public_key_to_eth_address(pubkey)

Given a KMS Public Key, calculate the ethereum address.

Source code in ethereum_kms_signer/spki.py
def der_encoded_public_key_to_eth_address(pubkey: bytes) -> str:
    """
    Given a KMS Public Key, calculate the ethereum address.
    """
    received_record, _ = der_decode(pubkey, asn1Spec=SPKIRecord())
    return public_key_int_to_eth_address(
        int(received_record["subjectPublicKey"].asBinary(), 2)
    )

get_sig_r_s(signature)

Given a KMS signature, calculate r and s.

Source code in ethereum_kms_signer/spki.py
def get_sig_r_s(signature: bytes) -> Tuple[int, int]:
    """
    Given a KMS signature, calculate r and s.
    """
    received_record, _ = der_decode(signature, asn1Spec=ECDSASignatureRecord())
    r = int(received_record["r"].prettyPrint())
    s = int(received_record["s"].prettyPrint())

    max_value_on_curve = (
        0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
    )

    if 2 * s >= max_value_on_curve:
        # s is on wrong side of curve, flip it
        s = max_value_on_curve - s
    return r, s

get_sig_r_s_v(msg_hash, signature, address)

Given a message hash, a KMS signature and an ethereum address calculate r, s, and v.

Source code in ethereum_kms_signer/spki.py
def get_sig_r_s_v(
    msg_hash: bytes, signature: bytes, address: str
) -> Tuple[int, int, int]:
    """
    Given a message hash, a KMS signature and an ethereum address calculate r,
    s, and v.
    """
    r, s = get_sig_r_s(signature)
    v = get_sig_v(msg_hash, r, s, address)
    return r, s, v

get_sig_v(msg_hash, r, s, expected_address)

Given a message hash, r, s and an ethereum address, recover the recovery parameter v.

Source code in ethereum_kms_signer/spki.py
def get_sig_v(msg_hash: bytes, r: int, s: int, expected_address: str) -> int:
    """
    Given a message hash, r, s and an ethereum address, recover the
    recovery parameter v.
    """
    acc = Account()
    recovered = acc._recover_hash(msg_hash, vrs=(27, r, s))
    recovered2 = acc._recover_hash(msg_hash, vrs=(28, r, s))
    expected_checksum_address = to_checksum_address(expected_address)

    if recovered == expected_checksum_address:
        return 0
    elif recovered2 == expected_checksum_address:
        return 1

    raise ValueError("Invalid Signature, cannot compute v, addresses do not match!")

public_key_int_to_eth_address(pubkey)

Given an integer public key, calculate the ethereum address.

Source code in ethereum_kms_signer/spki.py
def public_key_int_to_eth_address(pubkey: int) -> str:
    """
    Given an integer public key, calculate the ethereum address.
    """
    hex_string = hex(pubkey).replace("0x", "")
    padded_hex_string = hex_string.replace("0x", "").zfill(130)[2:]

    k = keccak.new(digest_bits=256)
    k.update(bytes.fromhex(padded_hex_string))
    return to_checksum_address(bytes.fromhex(k.hexdigest())[-20:].hex())