Recursive Length Prefix (RLP) Encoding

Introduction

Defines the serialization and deserialization format used throughout Ethereum.

Module Contents

Functions

encode

Encodes raw_data into a sequence of bytes using RLP.

encode_bytes

Encodes raw_bytes, a sequence of bytes, using RLP.

encode_sequence

Encodes a list of RLP encodable objects (raw_sequence) using RLP.

get_joined_encodings

Obtain concatenation of rlp encoding for each item in the sequence raw_sequence.

decode

Decodes an integer, byte sequence, or list of RLP encodable objects from the byte sequence encoded_data, using RLP.

decode_to

Decode the bytes in encoded_data to an object of type cls. cls can be a Bytes subclass, a dataclass, Uint, U256 or Tuple[cls].

_decode_to

Decode the rlp structure in raw_rlp to an object of type cls.

decode_to_bytes

Decodes an RLP encoded byte stream assuming that the decoded data should be of type bytes.

decode_to_sequence

Decodes an RLP encoded byte stream assuming that the decoded data should be of type Sequence of objects.

decode_joined_encodings

Decodes joined_encodings, which is a concatenation of RLP encoded objects.

decode_item_length

Find the length of the rlp encoding for the first object in the encoded sequence.

rlp_hash

Obtain the keccak-256 hash of the rlp encoding of the passed in data.

Attributes

RLP

T

Module Details

RLP

RLP = Any

encode

encode(raw_data: RLP) -> ethereum.base_types.Bytes

Encodes raw_data into a sequence of bytes using RLP.

Parameters

raw_data – A Bytes, Uint, FixedUInt, str, bool, dataclass instance or sequence of RLP encodable objects.

Returns

encoded – The RLP encoded bytes representing raw_data.

Return type

ethereum.base_types.Bytes

def encode(raw_data: RLP) -> Bytes:
    if isinstance(raw_data, (bytearray, bytes)):
        return encode_bytes(raw_data)
    elif isinstance(raw_data, (Uint, FixedUInt)):
        return encode(raw_data.to_be_bytes())
    elif isinstance(raw_data, str):
        return encode_bytes(raw_data.encode())
    elif isinstance(raw_data, bool):
        if raw_data:
            return encode_bytes(b"\x01")
        else:
            return encode_bytes(b"")
    elif isinstance(raw_data, Sequence):
        return encode_sequence(raw_data)
    elif is_dataclass(raw_data):
        return encode(astuple(raw_data))
    else:
        raise RLPEncodingError(
            "RLP Encoding of type {} is not supported".format(type(raw_data))
        )
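
The scalar branches above all reduce the value to a byte string before any prefix is added. The following is a minimal sketch of that reduction using plain Python types in place of the module's Bytes, Uint and FixedUInt. The names to_min_be_bytes and payload_for are hypothetical, and the sketch assumes that to_be_bytes yields the shortest big-endian form, so that zero becomes the empty byte string.

```python
def to_min_be_bytes(value: int) -> bytes:
    """Shortest big-endian form of a non-negative int (0 gives empty bytes)."""
    if value < 0:
        raise ValueError("RLP integers must be non-negative")
    return value.to_bytes((value.bit_length() + 7) // 8, "big")


def payload_for(raw_data: object) -> bytes:
    """Hypothetical helper: bytes a scalar is reduced to before prefixing."""
    if isinstance(raw_data, (bytes, bytearray)):
        return bytes(raw_data)
    if isinstance(raw_data, bool):
        # Checked before int in this sketch: bool is an int subclass in Python.
        return b"\x01" if raw_data else b""
    if isinstance(raw_data, int):
        return to_min_be_bytes(raw_data)
    if isinstance(raw_data, str):
        return raw_data.encode()
    raise TypeError(f"unsupported type: {type(raw_data).__name__}")


print(payload_for(1024).hex())  # big-endian bytes of 1024
print(payload_for(False))       # empty byte string
```

The sketch stops at the payload; the length prefix is a separate step handled by the byte-string encoder.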

encode_bytes

encode_bytes(raw_bytes: ethereum.base_types.Bytes) -> ethereum.base_types.Bytes

Encodes raw_bytes, a sequence of bytes, using RLP.

Parameters

raw_bytes – Bytes to encode with RLP.

Returns

encoded – The RLP encoded bytes representing raw_bytes.

Return type

ethereum.base_types.Bytes

def encode_bytes(raw_bytes: Bytes) -> Bytes:
    len_raw_data = Uint(len(raw_bytes))

    if len_raw_data == 1 and raw_bytes[0] < 0x80:
        return raw_bytes
    elif len_raw_data < 0x38:
        return bytes([0x80 + len_raw_data]) + raw_bytes
    else:
        # length of raw data represented as big endian bytes
        len_raw_data_as_be = len_raw_data.to_be_bytes()
        return (
            bytes([0xB7 + len(len_raw_data_as_be)])
            + len_raw_data_as_be
            + raw_bytes
        )
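
The three cases above can be tried on concrete inputs. This is a minimal sketch that mirrors the listing with plain bytes and int standing in for Bytes and Uint; rlp_encode_bytes is a hypothetical name, not part of the module.

```python
def rlp_encode_bytes(raw: bytes) -> bytes:
    """Sketch of byte-string encoding with plain Python types."""
    if len(raw) == 1 and raw[0] < 0x80:
        # A single byte below 0x80 is its own encoding.
        return raw
    if len(raw) < 56:
        # Short string: one prefix byte carrying the length.
        return bytes([0x80 + len(raw)]) + raw
    # Long string: prefix byte carries the size of the length field.
    length_be = len(raw).to_bytes((len(raw).bit_length() + 7) // 8, "big")
    return bytes([0xB7 + len(length_be)]) + length_be + raw


for sample in (b"", b"\x0f", b"\x80", b"dog", b"a" * 56):
    print(len(sample), rlp_encode_bytes(sample)[:3].hex())
```

Note that b"\x80" needs a prefix even though it is a single byte, because only values below 0x80 are self-encoding.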

encode_sequence

encode_sequence(raw_sequence: Sequence[RLP]) -> ethereum.base_types.Bytes

Encodes a list of RLP encodable objects (raw_sequence) using RLP.

Parameters

raw_sequence – Sequence of RLP encodable objects.

Returns

encoded – The RLP encoded bytes representing raw_sequence.

Return type

ethereum.base_types.Bytes

def encode_sequence(raw_sequence: Sequence[RLP]) -> Bytes:
    joined_encodings = get_joined_encodings(raw_sequence)
    len_joined_encodings = Uint(len(joined_encodings))

    if len_joined_encodings < 0x38:
        return Bytes([0xC0 + len_joined_encodings]) + joined_encodings
    else:
        len_joined_encodings_as_be = len_joined_encodings.to_be_bytes()
        return (
            Bytes([0xF7 + len(len_joined_encodings_as_be)])
            + len_joined_encodings_as_be
            + joined_encodings
        )
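
List encoding follows the same prefix scheme as byte strings, with 0xC0 and 0xF7 in place of 0x80 and 0xB7. A minimal self-contained sketch, restricted to bytes and nested lists of bytes, is given below. The names rlp_encode and _prefix are hypothetical, and the shared prefix helper is a simplification of this sketch rather than how the module is organised.

```python
from typing import Sequence, Union

Item = Union[bytes, Sequence["Item"]]


def _prefix(length: int, offset: int) -> bytes:
    """Length prefix for a payload; offset is 0x80 (bytes) or 0xC0 (list)."""
    if length < 56:
        return bytes([offset + length])
    length_be = length.to_bytes((length.bit_length() + 7) // 8, "big")
    # offset + 55 is 0xB7 for byte strings and 0xF7 for lists.
    return bytes([offset + 55 + len(length_be)]) + length_be


def rlp_encode(item: Item) -> bytes:
    """Sketch: encode bytes or an arbitrarily nested list of bytes."""
    if isinstance(item, (bytes, bytearray)):
        raw = bytes(item)
        if len(raw) == 1 and raw[0] < 0x80:
            return raw
        return _prefix(len(raw), 0x80) + raw
    # A list is the concatenation of its encoded items behind a list prefix.
    payload = b"".join(rlp_encode(sub_item) for sub_item in item)
    return _prefix(len(payload), 0xC0) + payload


print(rlp_encode([b"cat", b"dog"]).hex())
print(rlp_encode([[], [[]], [[], [[]]]]).hex())
```

The prefix of a list counts the bytes of the joined encodings, not the number of items.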

get_joined_encodings

get_joined_encodings(raw_sequence: Sequence[RLP]) -> ethereum.base_types.Bytes

Obtain concatenation of rlp encoding for each item in the sequence raw_sequence.

Parameters

raw_sequence – Sequence to encode with RLP.

Returns

joined_encodings – The concatenated RLP encoded bytes for each item in sequence raw_sequence.

Return type

ethereum.base_types.Bytes

def get_joined_encodings(raw_sequence: Sequence[RLP]) -> Bytes:
    return b"".join(encode(item) for item in raw_sequence)

decode

decode(encoded_data: ethereum.base_types.Bytes) -> RLP

Decodes an integer, byte sequence, or list of RLP encodable objects from the byte sequence encoded_data, using RLP.

Parameters

encoded_data – A sequence of bytes, in RLP form.

Returns

decoded_data – Object decoded from encoded_data.

Return type

RLP

def decode(encoded_data: Bytes) -> RLP:
    # Raising error as there can never be empty encoded data for any
    # given raw data (including empty raw data)
    # RLP Encoding(b'') -> [0x80]  # noqa: SC100
    # RLP Encoding([])  -> [0xc0]  # noqa: SC100
    ensure(
        len(encoded_data) > 0,
        RLPDecodingError("Cannot decode empty bytestring"),
    )

    if encoded_data[0] <= 0xBF:
        # This means that the raw data is of type bytes
        return decode_to_bytes(encoded_data)
    else:
        # This means that the raw data is of type sequence
        return decode_to_sequence(encoded_data)
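
The first byte alone determines how the rest of an encoding is read. As an illustration, the sketch below names the five prefix ranges that appear in this module; classify_prefix is a hypothetical helper and the labels are chosen for this example only.

```python
def classify_prefix(first_byte: int) -> str:
    """Name the RLP form selected by the first byte of an encoding."""
    if not 0 <= first_byte <= 0xFF:
        raise ValueError("not a byte value")
    if first_byte < 0x80:
        return "single byte"   # the byte is its own payload
    if first_byte <= 0xB7:
        return "short string"  # payload length is first_byte - 0x80
    if first_byte <= 0xBF:
        return "long string"   # length field size is first_byte - 0xB7
    if first_byte <= 0xF7:
        return "short list"    # payload length is first_byte - 0xC0
    return "long list"         # length field size is first_byte - 0xF7


for byte in (0x05, 0x83, 0xB8, 0xC8, 0xF8):
    print(hex(byte), classify_prefix(byte))
```

Everything up to 0xBF is a byte string and everything above it is a sequence, which is the single comparison decode performs.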

T

T = TypeVar("T")

decode_to

decode_to(cls: Type[T], encoded_data: ethereum.base_types.Bytes) -> T

Decode the bytes in encoded_data to an object of type cls. cls can be a Bytes subclass, a dataclass, Uint, U256 or Tuple[cls].

Parameters
  • cls (Type[T]) – The type to decode to.

  • encoded_data – A sequence of bytes, in RLP form.

Returns

decoded_data – Object decoded from encoded_data.

Return type

T

def decode_to(cls: Type[T], encoded_data: Bytes) -> T:
    return _decode_to(cls, decode(encoded_data))

_decode_to

_decode_to(cls: Type[T], raw_rlp: RLP) -> T

Decode the rlp structure in raw_rlp to an object of type cls. cls can be a Bytes subclass, a dataclass, Uint, U256, Tuple[cls, …], Tuple[cls1, cls2] or Union[Bytes, cls].

Parameters
  • cls (Type[T]) – The type to decode to.

  • raw_rlp – A decoded rlp structure.

Returns

decoded_data – Object decoded from raw_rlp.

Return type

T

def _decode_to(cls: Type[T], raw_rlp: RLP) -> T:
    if isinstance(cls, type(Tuple[Uint, ...])) and cls._name == "Tuple":  # type: ignore # noqa: E501
        ensure(type(raw_rlp) == list, RLPDecodingError)
        if cls.__args__[1] == ...:  # type: ignore
            args = []
            for raw_item in raw_rlp:
                args.append(_decode_to(cls.__args__[0], raw_item))  # type: ignore # noqa: E501
            return tuple(args)  # type: ignore
        else:
            args = []
            ensure(len(raw_rlp) == len(cls.__args__), RLPDecodingError)  # type: ignore # noqa: E501
            for (t, raw_item) in zip(cls.__args__, raw_rlp):  # type: ignore
                args.append(_decode_to(t, raw_item))
            return tuple(args)  # type: ignore
    elif cls == Union[Bytes0, Bytes20]:
        # We can't support Union types in general, so we support this one
        # (which appears in the Transaction type) as a special case
        ensure(type(raw_rlp) == Bytes, RLPDecodingError)
        if len(raw_rlp) == 0:
            return Bytes0()  # type: ignore
        elif len(raw_rlp) == 20:
            return Bytes20(raw_rlp)  # type: ignore
        else:
            raise RLPDecodingError(
                "Bytes has length {}, expected 0 or 20".format(len(raw_rlp))
            )
    elif isinstance(cls, type(List[Bytes])) and cls._name == "List":  # type: ignore # noqa: E501
        ensure(type(raw_rlp) == list, RLPDecodingError)
        items = []
        for raw_item in raw_rlp:
            items.append(_decode_to(cls.__args__[0], raw_item))  # type: ignore
        return items  # type: ignore
    elif isinstance(cls, type(Union[Bytes, List[Bytes]])) and cls.__origin__ == Union:  # type: ignore # noqa: E501
        if len(cls.__args__) != 2 or Bytes not in cls.__args__:  # type: ignore
            raise RLPDecodingError(
                "RLP Decoding to type {} is not supported".format(cls)
            )
        if isinstance(raw_rlp, Bytes):
            return raw_rlp  # type: ignore
        elif cls.__args__[0] == Bytes:  # type: ignore
            return _decode_to(cls.__args__[1], raw_rlp)  # type: ignore
        else:
            return _decode_to(cls.__args__[0], raw_rlp)  # type: ignore
    elif issubclass(cls, bool):
        if raw_rlp == b"\x01":
            return cls(True)  # type: ignore
        elif raw_rlp == b"":
            return cls(False)  # type: ignore
        else:
            raise TypeError("Cannot decode {} as {}".format(raw_rlp, cls))
    elif issubclass(cls, FixedBytes):
        ensure(type(raw_rlp) == Bytes, RLPDecodingError)
        ensure(len(raw_rlp) == cls.LENGTH, RLPDecodingError)
        return raw_rlp
    elif issubclass(cls, Bytes):
        ensure(type(raw_rlp) == Bytes, RLPDecodingError)
        return raw_rlp
    elif issubclass(cls, (Uint, FixedUInt)):
        ensure(type(raw_rlp) == Bytes, RLPDecodingError)
        try:
            return cls.from_be_bytes(raw_rlp)  # type: ignore
        except ValueError:
            raise RLPDecodingError
    elif is_dataclass(cls):
        ensure(type(raw_rlp) == list, RLPDecodingError)
        assert isinstance(raw_rlp, list)
        args = []
        ensure(len(fields(cls)) == len(raw_rlp), RLPDecodingError)
        for (field, rlp_item) in zip(fields(cls), raw_rlp):
            args.append(_decode_to(field.type, rlp_item))
        return cls(*args)
    else:
        raise RLPDecodingError(
            "RLP Decoding to type {} is not supported".format(cls)
        )
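
The dataclass branch pairs each declared field with the decoded item at the same position and converts it according to the field's type. The sketch below shows that idea with plain int and bytes fields instead of Uint and Bytes. The Transfer dataclass and the from_decoded helper are hypothetical and cover only a small part of what the listing handles.

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import Any


@dataclass
class Transfer:
    """Hypothetical record used only for this example."""

    nonce: int
    to: bytes
    value: int


def from_decoded(cls: Any, raw: Any) -> Any:
    """Sketch: convert an already decoded RLP structure to type cls."""
    if is_dataclass(cls):
        if not isinstance(raw, list) or len(raw) != len(fields(cls)):
            raise ValueError("expected a list with one item per field")
        # Fields are matched to list items by position, not by name.
        args = [from_decoded(f.type, item) for f, item in zip(fields(cls), raw)]
        return cls(*args)
    if not isinstance(raw, bytes):
        raise ValueError("expected a byte string")
    if cls is int:
        return int.from_bytes(raw, "big")
    if cls is bytes:
        return raw
    raise TypeError(f"unsupported target type: {cls}")


decoded = [b"\x01", b"\xaa" * 20, b"\x03\xe8"]
print(from_decoded(Transfer, decoded))
```

Because matching is positional, the order of fields in the dataclass is part of the wire format.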

decode_to_bytes

decode_to_bytes(encoded_bytes: ethereum.base_types.Bytes) -> ethereum.base_types.Bytes

Decodes an RLP encoded byte stream assuming that the decoded data should be of type bytes.

Parameters

encoded_bytes – RLP encoded byte stream.

Returns

decoded – The RLP decoded Bytes data.

Return type

ethereum.base_types.Bytes

def decode_to_bytes(encoded_bytes: Bytes) -> Bytes:
    if len(encoded_bytes) == 1 and encoded_bytes[0] < 0x80:
        return encoded_bytes
    elif encoded_bytes[0] <= 0xB7:
        len_raw_data = encoded_bytes[0] - 0x80
        ensure(len_raw_data < len(encoded_bytes), RLPDecodingError)
        raw_data = encoded_bytes[1 : 1 + len_raw_data]
        ensure(
            not (len_raw_data == 1 and raw_data[0] < 0x80), RLPDecodingError
        )
        return raw_data
    else:
        # Index in the encoded data at which the decoded data starts.
        decoded_data_start_idx = 1 + encoded_bytes[0] - 0xB7
        ensure(
            decoded_data_start_idx - 1 < len(encoded_bytes), RLPDecodingError
        )
        # The big endian length must not start with a zero byte; such an
        # encoding is not canonical and is rejected.
        ensure(encoded_bytes[1] != 0, RLPDecodingError)
        len_decoded_data = Uint.from_be_bytes(
            encoded_bytes[1:decoded_data_start_idx]
        )
        ensure(len_decoded_data >= 0x38, RLPDecodingError)
        decoded_data_end_idx = decoded_data_start_idx + len_decoded_data
        ensure(decoded_data_end_idx - 1 < len(encoded_bytes), RLPDecodingError)
        return encoded_bytes[decoded_data_start_idx:decoded_data_end_idx]
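
The checks in this listing reject encodings that a conforming encoder would never produce. A minimal sketch of the same idea with plain bytes follows; rlp_decode_bytes is a hypothetical name, errors are reported as ValueError rather than RLPDecodingError, and unlike the listing the sketch also rejects trailing bytes after the payload.

```python
def rlp_decode_bytes(encoded: bytes) -> bytes:
    """Sketch: decode one RLP byte string, rejecting non-canonical forms."""
    if not encoded:
        raise ValueError("cannot decode empty input")
    first = encoded[0]
    if first < 0x80:
        start, length = 0, 1
    elif first <= 0xB7:
        start, length = 1, first - 0x80
    elif first <= 0xBF:
        length_size = first - 0xB7
        length_bytes = encoded[1 : 1 + length_size]
        if len(length_bytes) != length_size or length_bytes[0] == 0:
            raise ValueError("malformed length field")
        length = int.from_bytes(length_bytes, "big")
        if length < 56:
            raise ValueError("long form used for a short string")
        start = 1 + length_size
    else:
        raise ValueError("prefix marks a list, not a byte string")
    if len(encoded) != start + length:
        raise ValueError("payload length does not match prefix")
    payload = encoded[start : start + length]
    if first >= 0x80 and length == 1 and payload[0] < 0x80:
        # A byte below 0x80 must be encoded as itself, without a prefix.
        raise ValueError("single byte encoded with a prefix")
    return payload


print(rlp_decode_bytes(b"\x83dog"))
print(rlp_decode_bytes(b"\xb8\x38" + b"a" * 56)[:4])
```

Rejecting the redundant forms keeps the mapping between values and encodings one-to-one, which matters when encodings are hashed.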

decode_to_sequence

decode_to_sequence(encoded_sequence: ethereum.base_types.Bytes) -> List[RLP]

Decodes an RLP encoded byte stream assuming that the decoded data should be of type Sequence of objects.

Parameters

encoded_sequence – An RLP encoded Sequence.

Returns

decoded – Sequence of objects decoded from encoded_sequence.

Return type

List[RLP]

def decode_to_sequence(encoded_sequence: Bytes) -> List[RLP]:
    if encoded_sequence[0] <= 0xF7:
        len_joined_encodings = encoded_sequence[0] - 0xC0
        ensure(len_joined_encodings < len(encoded_sequence), RLPDecodingError)
        joined_encodings = encoded_sequence[1 : 1 + len_joined_encodings]
    else:
        joined_encodings_start_idx = 1 + encoded_sequence[0] - 0xF7
        ensure(
            joined_encodings_start_idx - 1 < len(encoded_sequence),
            RLPDecodingError,
        )
        # The big endian length must not start with a zero byte; such an
        # encoding is not canonical and is rejected.
        ensure(encoded_sequence[1] != 0, RLPDecodingError)
        len_joined_encodings = Uint.from_be_bytes(
            encoded_sequence[1:joined_encodings_start_idx]
        )
        ensure(len_joined_encodings >= 0x38, RLPDecodingError)
        joined_encodings_end_idx = (
            joined_encodings_start_idx + len_joined_encodings
        )
        ensure(
            joined_encodings_end_idx - 1 < len(encoded_sequence),
            RLPDecodingError,
        )
        joined_encodings = encoded_sequence[
            joined_encodings_start_idx:joined_encodings_end_idx
        ]

    return decode_joined_encodings(joined_encodings)

decode_joined_encodings

decode_joined_encodings(joined_encodings: ethereum.base_types.Bytes) -> List[RLP]

Decodes joined_encodings, which is a concatenation of RLP encoded objects.

Parameters

joined_encodings – Concatenation of RLP encoded objects.

Returns

decoded – A list of objects decoded from joined_encodings.

Return type

List[RLP]

def decode_joined_encodings(joined_encodings: Bytes) -> List[RLP]:
    decoded_sequence = []

    item_start_idx = 0
    while item_start_idx < len(joined_encodings):
        encoded_item_length = decode_item_length(
            joined_encodings[item_start_idx:]
        )
        ensure(
            item_start_idx + encoded_item_length - 1 < len(joined_encodings),
            RLPDecodingError,
        )
        encoded_item = joined_encodings[
            item_start_idx : item_start_idx + encoded_item_length
        ]
        decoded_sequence.append(decode(encoded_item))
        item_start_idx += encoded_item_length

    return decoded_sequence
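
The loop above walks the payload one item at a time: measure the item at the current position, decode it, then advance by its length. The sketch below follows the same walk with plain bytes and nested lists. The names split_items and _read_item are hypothetical, and the canonical-form checks of the module (minimum long-form length, prefixed single bytes) are left out to keep the example short.

```python
from typing import List, Tuple, Union

Decoded = Union[bytes, List["Decoded"]]


def _read_item(data: bytes, pos: int) -> Tuple[Decoded, int]:
    """Decode the item starting at pos; return it and the next position."""
    first = data[pos]
    if first < 0x80:
        return data[pos : pos + 1], pos + 1
    is_list = first >= 0xC0
    base = 0xC0 if is_list else 0x80
    if first <= base + 55:
        start, length = pos + 1, first - base
    else:
        length_size = first - base - 55
        length_bytes = data[pos + 1 : pos + 1 + length_size]
        if len(length_bytes) != length_size or length_bytes[0] == 0:
            raise ValueError("malformed length field")
        length = int.from_bytes(length_bytes, "big")
        start = pos + 1 + length_size
    end = start + length
    if end > len(data):
        raise ValueError("item runs past the end of the input")
    payload = data[start:end]
    # A list payload is itself a run of joined encodings.
    return (split_items(payload) if is_list else payload), end


def split_items(joined: bytes) -> List[Decoded]:
    """Sketch: decode a concatenation of RLP encoded items."""
    items: List[Decoded] = []
    pos = 0
    while pos < len(joined):
        item, pos = _read_item(joined, pos)
        items.append(item)
    return items


print(split_items(b"\x83cat\x83dog"))
print(split_items(bytes.fromhex("c0c1c0c3c0c1c0")))
```

An empty payload yields an empty list, which is how the encoding 0xC0 decodes to [].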

decode_item_length

decode_item_length(encoded_data: ethereum.base_types.Bytes) -> int

Find the length of the rlp encoding for the first object in the encoded sequence. Here encoded_data refers to concatenation of rlp encoding for each item in a sequence.

NOTE - This is a helper function not described in the spec. It was introduced because the spec does not discuss decoding RLP encoded data.

Parameters

encoded_data – RLP encoded data for a sequence of objects.

Returns

rlp_length – The length of the RLP encoding of the first object in encoded_data.

Return type

int

def decode_item_length(encoded_data: Bytes) -> int:
    # Can't decode item length for empty encoding
    ensure(len(encoded_data) > 0, RLPDecodingError)

    first_rlp_byte = Uint(encoded_data[0])

    # This is the length of the big endian representation of the length of
    # rlp encoded object byte stream.
    length_length = Uint(0)
    decoded_data_length = 0

    # This occurs only when the raw_data is a single byte whose value < 128
    if first_rlp_byte < 0x80:
        # We return 1 here, as the end formula
        # 1 + length_length + decoded_data_length would be invalid for
        # this case.
        return 1
    # This occurs only when the raw_data is a byte stream with length < 56
    # and doesn't fall into the above cases
    elif first_rlp_byte <= 0xB7:
        decoded_data_length = first_rlp_byte - 0x80
    # This occurs only when the raw_data is a byte stream and doesn't fall
    # into the above cases
    elif first_rlp_byte <= 0xBF:
        length_length = first_rlp_byte - 0xB7
        ensure(length_length < len(encoded_data), RLPDecodingError)
        # The big endian length must not start with a zero byte; such an
        # encoding is not canonical and is rejected.
        ensure(encoded_data[1] != 0, RLPDecodingError)
        decoded_data_length = Uint.from_be_bytes(
            encoded_data[1 : 1 + length_length]
        )
    # This occurs only when the raw_data is a sequence of objects with
    # length(concatenation of encoding of each object) < 56
    elif first_rlp_byte <= 0xF7:
        decoded_data_length = first_rlp_byte - 0xC0
    # This occurs only when the raw_data is a sequence of objects and
    # doesn't fall into the above cases.
    elif first_rlp_byte <= 0xFF:
        length_length = first_rlp_byte - 0xF7
        ensure(length_length < len(encoded_data), RLPDecodingError)
        # The big endian length must not start with a zero byte; such an
        # encoding is not canonical and is rejected.
        ensure(encoded_data[1] != 0, RLPDecodingError)
        decoded_data_length = Uint.from_be_bytes(
            encoded_data[1 : 1 + length_length]
        )

    return 1 + length_length + decoded_data_length
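
The returned value is the total size of the first item: one prefix byte, plus the length field if present, plus the payload. A minimal sketch of that arithmetic with plain bytes and int follows; item_length is a hypothetical name and ValueError stands in for RLPDecodingError.

```python
def item_length(encoded: bytes) -> int:
    """Sketch: total encoded size of the first RLP item in encoded."""
    if not encoded:
        raise ValueError("cannot measure an empty encoding")
    first = encoded[0]
    if first < 0x80:
        return 1                      # self-encoding single byte
    if first <= 0xB7:
        return 1 + (first - 0x80)     # short string
    if first <= 0xBF:
        length_size = first - 0xB7    # long string
    elif first <= 0xF7:
        return 1 + (first - 0xC0)     # short list
    else:
        length_size = first - 0xF7    # long list
    length_bytes = encoded[1 : 1 + length_size]
    if len(length_bytes) != length_size or length_bytes[0] == 0:
        raise ValueError("malformed length field")
    return 1 + length_size + int.from_bytes(length_bytes, "big")


print(item_length(b"\x83dog\x83cat"))
print(item_length(b"\xb8\x38" + b"a" * 56))
```

Only the prefix and length field are inspected, so the result can exceed the available input; the caller is expected to check that, as decode_joined_encodings does.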

rlp_hash

rlp_hash(data: RLP) -> ethereum.crypto.hash.Hash32

Obtain the keccak-256 hash of the rlp encoding of the passed in data.

Parameters

data – The data for which we need the rlp hash.

Returns

hash – The rlp hash of the passed in data.

Return type

ethereum.crypto.hash.Hash32

def rlp_hash(data: RLP) -> Hash32:
    return keccak256(encode(data))