ethereum_rlp.rlp

Defines the serialization and deserialization format used throughout Ethereum.

_UNION_TYPES

34
# Origins that `_deserialize_to_annotation` treats as union annotations (it
# tests `get_origin(annotation) in _UNION_TYPES`).
# NOTE(review): only the type declaration is visible here; the tuple itself is
# presumably assigned elsewhere in the module — confirm.
_UNION_TYPES: Tuple[object, ...]

RLP

Protocol that describes the requirements to be RLP-encodable.

# NOTE(review): relies on `Protocol` being imported from `typing` at module
# top; the import block is not visible in this view — confirm.
class RLP(Protocol):
    """
    Protocol that describes the requirements to be RLP-encodable.

    The only requirement is the class-level `__dataclass_fields__` mapping,
    so the protocol is satisfied structurally rather than by inheritance.
    """

    __dataclass_fields__: ClassVar[Dict[str, Field[object]]]

Simple

54
# Shape of the data returned by `decode`: either raw `bytes` or a (possibly
# nested) sequence of such values.
Simple: TypeAlias = Union[Sequence["Simple"], bytes]

Extended

56
# Everything `encode` accepts as input: byte strings, text, unsigned integers,
# booleans, RLP-encodable dataclasses, and (possibly nested) sequences of
# those.
Extended: TypeAlias = Union[
    Sequence["Extended"], bytearray, bytes, Uint, FixedUnsigned, str, bool, RLP
]

encode

Encodes raw_data into a sequence of bytes using RLP.

def encode(raw_data: Extended) -> Bytes:
    """
    Serialize `raw_data` to its RLP byte representation.

    Byte strings and text are encoded as RLP strings and any other sequence
    as an RLP list. Unsigned integers are encoded through their big-endian
    bytes, `True` as the single byte 0x01, `False` as the empty string, and a
    dataclass instance as the tuple of its fields.

    Raises `EncodingError` for any other type.
    """
    # `bytes`, `bytearray` and `str` are themselves sequences, so they have to
    # be singled out before the generic sequence case.
    if isinstance(raw_data, (bytearray, bytes)):
        return encode_bytes(raw_data)
    if isinstance(raw_data, str):
        return encode_bytes(raw_data.encode())
    if isinstance(raw_data, Sequence):
        return encode_sequence(raw_data)

    if isinstance(raw_data, (Uint, FixedUnsigned)):
        return encode(raw_data.to_be_bytes())

    if isinstance(raw_data, bool):
        return encode_bytes(b"\x01" if raw_data else b"")

    if is_dataclass(raw_data):
        return encode(astuple(raw_data))

    raise EncodingError(
        "RLP Encoding of type {} is not supported".format(type(raw_data))
    )

encode_bytes

Encodes raw_bytes, a sequence of bytes, using RLP.

def encode_bytes(raw_bytes: Bytes) -> Bytes:
    """
    Encodes `raw_bytes`, a sequence of bytes, using RLP.

    A single byte below 0x80 is its own encoding. Payloads shorter than 56
    bytes get a one-byte header of `0x80 + length`. Longer payloads get a
    header of `0xB7 + n` followed by the `n`-byte big-endian length.

    The result is always a new immutable byte string, never the caller's own
    (possibly mutable) buffer.
    """
    len_raw_data = len(raw_bytes)

    if len_raw_data == 1 and raw_bytes[0] < 0x80:
        # Copy instead of returning the argument itself, so a `bytearray`
        # input is not handed back as a mutable alias of the encoding.
        return bytes(raw_bytes)
    elif len_raw_data < 0x38:
        return bytes([0x80 + len_raw_data]) + raw_bytes
    else:
        # length of raw data represented as big endian bytes
        len_raw_data_as_be = Uint(len_raw_data).to_be_bytes()
        return (
            bytes([0xB7 + len(len_raw_data_as_be)])
            + len_raw_data_as_be
            + raw_bytes
        )

encode_sequence

Encodes a list of RLP encodable objects (raw_sequence) using RLP.

def encode_sequence(raw_sequence: Sequence[Extended]) -> Bytes:
    """
    RLP-encode `raw_sequence` as a list.

    The items are encoded individually and concatenated; the result is that
    payload preceded by a list header describing the payload's length.
    """
    payload = join_encodings(raw_sequence)
    payload_size = len(payload)

    if payload_size >= 0x38:
        # Long form: the header byte states how many big-endian length bytes
        # follow it.
        size_as_be = Uint(payload_size).to_be_bytes()
        header = Bytes([0xF7 + len(size_as_be)]) + size_as_be
    else:
        # Short form: the length is folded into the header byte itself.
        header = Bytes([0xC0 + payload_size])

    return header + payload

join_encodings

Obtain concatenation of rlp encoding for each item in the sequence raw_sequence.

def join_encodings(raw_sequence: Sequence[Extended]) -> Bytes:
    """
    Return the RLP encodings of the items of `raw_sequence`, concatenated in
    order.
    """
    encoded_items = map(encode, raw_sequence)
    return b"".join(encoded_items)

decode

Decodes a byte sequence, or a (possibly nested) list of byte sequences, from the byte sequence encoded_data, using RLP.

def decode(encoded_data: Bytes) -> Simple:
    """
    Decode the RLP item at the start of `encoded_data`.

    Returns raw bytes when the item is an RLP string and a sequence of
    decoded items when it is an RLP list. Raises `DecodingError` when
    `encoded_data` is empty.
    """
    if len(encoded_data) == 0:
        raise DecodingError("Cannot decode empty bytestring")

    # A leading byte of at most 0xBF marks a byte string; anything above it
    # marks a sequence.
    holds_bytes = encoded_data[0] <= 0xBF
    if not holds_bytes:
        return decode_to_sequence(encoded_data)
    return decode_to_bytes(encoded_data)

U

159
# Type variable bounded by `Extended`. `decode_to` and the first
# `deserialize_to` overload use it to tie the returned object to the class
# that was requested.
U = TypeVar("U", bound=Extended)

decode_to

Decode the bytes in encoded_data to an object of type cls. cls can be a Bytes subclass, a dataclass, Uint, U256 or Tuple[cls].

def decode_to(cls: Type[U], encoded_data: Bytes) -> U:
    """
    Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be
    a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`.

    The data is first decoded with `decode` and the result is then converted
    with `deserialize_to`. Any exception raised by the conversion is
    re-raised as a `DecodingError` naming `cls`; errors raised by `decode`
    itself propagate unchanged.
    """
    decoded = decode(encoded_data)
    try:
        return deserialize_to(cls, decoded)
    except Exception as e:
        raise DecodingError(f"cannot decode into `{cls.__name__}`") from e

deserialize_to

174
# Typing-only overload: given a concrete class, the result is an instance of
# that class.
@overload
def deserialize_to(class_: Type[U], value: Simple) -> U:
    pass

deserialize_to

179
# Typing-only overload: for any other annotation object the result is only
# known to be some `Extended` value.
@overload
def deserialize_to(class_: object, value: Simple) -> Extended:
    pass

deserialize_to

Convert the already decoded value (see decode) into an object of type class_.

def deserialize_to(class_: object, value: Simple) -> Extended:
    """
    Convert the already decoded `value` (see [`decode`]) into an object of type
    `class_`.

    `Annotated[...]` layers are unwrapped first; if a layer yields a decoded
    result, that result is returned immediately. The remaining target is then
    handled in this order: non-class typing annotations, dataclasses,
    unsigned integers, byte types, and `bool`.

    Raises `NotImplementedError` for any other class.

    [`decode`]: ref:ethereum_rlp.rlp.decode
    """
    origin = get_origin(class_)

    while origin is Annotated:
        assert isinstance(class_, _Annotation)
        result, class_ = _deserialize_annotated(class_, value)
        if result is not None:
            return result
        # No result from this layer: continue with the wrapped type, which
        # may itself be annotated.
        origin = get_origin(class_)

    if not isinstance(class_, type):
        return _deserialize_to_annotation(class_, value)
    elif is_dataclass(class_):
        return _deserialize_to_dataclass(class_, value)
    elif issubclass(class_, (Uint, FixedUnsigned)):
        return _deserialize_to_uint(class_, value)
    elif issubclass(class_, (Bytes, FixedBytes)):
        return _deserialize_to_bytes(class_, value)
    elif class_ is bool:
        return _deserialize_to_bool(value)
    else:
        raise NotImplementedError(class_)

_deserialize_to_dataclass

def _deserialize_to_dataclass(cls: Type[U], decoded: Simple) -> U:
    """
    Build an instance of the dataclass `cls` from the decoded sequence
    `decoded`.

    Items are matched to the dataclass fields by position and each one is
    converted to its field's resolved type hint with `deserialize_to`.

    Raises `DecodingError` when `decoded` is `bytes`, when the number of
    items differs from the number of fields, or when a field fails to
    convert.
    """
    assert is_dataclass(cls)
    # `include_extras=True` keeps `Annotated` metadata on the hints.
    hints = get_type_hints(cls, include_extras=True)
    target_fields = fields(cls)

    if isinstance(decoded, bytes):
        raise DecodingError(f"got `bytes` while decoding `{cls.__name__}`")

    if len(target_fields) != len(decoded):
        name = cls.__name__
        actual = len(decoded)
        expected = len(target_fields)
        raise DecodingError(
            f"`{name}` needs {expected} field(s), but got {actual} instead"
        )

    values: Dict[str, Any] = {}

    for value, target_field in zip(decoded, target_fields):
        resolved_type = hints[target_field.name]
        try:
            values[target_field.name] = deserialize_to(resolved_type, value)
        except Exception as e:
            msg = f"cannot decode field `{cls.__name__}.{target_field.name}`"
            raise DecodingError(msg) from e

    result = cls(**values)
    assert isinstance(result, cls)
    return cast(U, result)

_deserialize_to_bool

def _deserialize_to_bool(value: Simple) -> bool:
    """
    Map the decoded `value` to a boolean: the single byte 0x01 is `True` and
    the empty byte string is `False`.

    Raises `DecodingError` for anything else.
    """
    if value == b"\x01":
        return True
    if value == b"":
        return False
    raise DecodingError("invalid boolean")

_deserialize_to_bytes

def _deserialize_to_bytes(
    class_: Union[Type[Bytes], Type[FixedBytes]], value: Simple
) -> Union[Bytes, FixedBytes]:
    """
    Construct `class_` from the decoded byte string `value`.

    Raises `DecodingError` when `value` is not `bytes`, or when the
    constructor rejects it with a `ValueError`.
    """
    if not isinstance(value, bytes):
        raise DecodingError("invalid bytes")
    try:
        return class_(value)
    except ValueError as e:
        raise DecodingError from e

_deserialize_to_uint

def _deserialize_to_uint(
    class_: Union[Type[Uint], Type[FixedUnsigned]], decoded: Simple
) -> Union[Uint, FixedUnsigned]:
    """
    Construct the unsigned integer type `class_` from the big-endian bytes in
    `decoded`.

    Raises `DecodingError` when `decoded` is not `bytes`, or when
    `class_.from_be_bytes` rejects it with a `ValueError`.
    """
    if not isinstance(decoded, bytes):
        raise DecodingError("invalid uint")
    try:
        return class_.from_be_bytes(decoded)
    except ValueError as e:
        raise DecodingError from e

_Annotation

276
# NOTE(review): relies on `Protocol` being imported from `typing` at module
# top; the import block is not visible in this view — confirm.
@runtime_checkable
class _Annotation(Protocol):
    """
    Structural type for objects exposing `__metadata__` and `__origin__`.

    `deserialize_to` asserts `isinstance(class_, _Annotation)` before reading
    these attributes. `runtime_checkable` can only be applied to protocol
    classes, hence the `Protocol` base.
    """

    __metadata__: Sequence[object]
    __origin__: object

_deserialize_annotated

def _deserialize_annotated(
    annotation: _Annotation, value: Simple
) -> Union[Tuple[Extended, None], Tuple[None, object]]:
    """
    Apply the `With` codec attached to `annotation`, if there is one.

    Returns `(None, annotation.__origin__)` when the metadata holds no `With`
    instance, so the caller can continue with the wrapped type. Otherwise
    returns `(result, None)`, where `result` is the codec's output for
    `value`.

    Raises `Exception` when more than one `With` is attached, and
    `NotImplementedError` when the wrapped type cannot be used with
    `isinstance`.
    """
    codecs = [x for x in annotation.__metadata__ if isinstance(x, With)]
    if not codecs:
        return (None, annotation.__origin__)

    if len(codecs) > 1:
        raise Exception(
            "multiple rlp.With annotations applied to the same type"
        )

    codec = codecs[0]
    result = codec._decoder(value)

    try:
        assert isinstance(
            result, annotation.__origin__  # type: ignore[arg-type]
        ), "annotated returned wrong type"
    except TypeError as e:
        # TODO: Check annotation types that don't work with `isinstance`.
        msg = f"annotation {annotation.__origin__} doesn't support isinstance"
        raise NotImplementedError(msg) from e

    # Return the object that was just type-checked instead of running the
    # decoder a second time.
    return (result, None)

_deserialize_to_annotation

def _deserialize_to_annotation(annotation: object, value: Simple) -> Extended:
    """
    Deserialize `value` into a typing annotation that is not a plain class.

    Dispatches on `get_origin(annotation)`: unions, tuples, and
    lists/sequences are supported.

    Raises `Exception` when the annotation has no origin, and
    `NotImplementedError` for any other origin.
    """
    origin = get_origin(annotation)
    if origin in _UNION_TYPES:
        return _deserialize_to_union(annotation, value)
    elif origin in (Tuple, tuple):
        return _deserialize_to_tuple(annotation, value)
    elif origin in (List, Sequence, list):
        return _deserialize_to_list(annotation, value)
    elif origin is None:
        raise Exception(annotation)
    else:
        raise NotImplementedError(f"RLP non-type {origin!r}")

_deserialize_to_union

def _deserialize_to_union(annotation: object, value: Simple) -> Extended:
    """
    Deserialize `value` into the single union variant that accepts it.

    Every variant of the union is tried with `deserialize_to`.

    Raises `DecodingError` when no variant matches (the collected failures
    are included in the message) or when more than one variant matches.
    """
    arguments = get_args(annotation)
    successes: List[Extended] = []
    failures = []
    for argument in arguments:
        try:
            success = deserialize_to(argument, value)
        except Exception as e:
            failures.append(e)
            continue

        successes.append(success)

    if len(successes) == 1:
        return successes[0]
    elif not successes:
        raise DecodingError(f"no matching union variant\n{failures!r}")
    else:
        raise DecodingError("multiple matching union variants")

_deserialize_to_tuple

def _deserialize_to_tuple(
    annotation: object, values: Simple
) -> Sequence[Extended]:
    """
    Deserialize the decoded sequence `values` into a tuple annotation.

    For a variadic annotation (one ending in `...`) the element type before
    the ellipsis is repeated to cover every value. For a fixed-arity
    annotation the number of values must equal the number of element types.

    Raises `DecodingError` when `values` is `bytes`, when a fixed-arity
    annotation does not match the number of values, or when an element fails
    to convert.
    """
    if isinstance(values, bytes):
        raise DecodingError("invalid tuple")
    arguments = list(get_args(annotation))

    # `arguments` is empty when the annotation carries no type arguments;
    # guard the `[-1]` lookup so that case cannot raise `IndexError`.
    if arguments and arguments[-1] is Ellipsis:
        arguments.pop()
        fill_count = len(values) - len(arguments)
        arguments = list(arguments) + [arguments[-1]] * fill_count
    elif len(arguments) != len(values):
        # Without this check `zip` below would silently drop the surplus
        # types or values.
        expected = len(arguments)
        actual = len(values)
        raise DecodingError(
            f"tuple needs {expected} element(s), but got {actual} instead"
        )

    decoded = []
    for index, (argument, value) in enumerate(zip(arguments, values)):
        try:
            deserialized = deserialize_to(argument, value)
        except Exception as e:
            msg = f"cannot decode tuple element {index} of type `{argument}`"
            raise DecodingError(msg) from e
        decoded.append(deserialized)

    return tuple(decoded)

_deserialize_to_list

def _deserialize_to_list(
    annotation: object, values: Simple
) -> Sequence[Extended]:
    """
    Deserialize the decoded sequence `values` into a list, converting every
    item to the first type argument of `annotation` with `deserialize_to`.

    Raises `DecodingError` when `values` is `bytes` or when an item fails to
    convert.
    """
    if isinstance(values, bytes):
        raise DecodingError("invalid list")
    argument = get_args(annotation)[0]
    results = []
    for index, value in enumerate(values):
        try:
            deserialized = deserialize_to(argument, value)
        except Exception as e:
            msg = f"cannot decode list item {index} of type `{annotation}`"
            raise DecodingError(msg) from e
        results.append(deserialized)
    return results

decode_to_bytes

Decodes a rlp encoded byte stream assuming that the decoded data should be of type bytes.

def decode_to_bytes(encoded_bytes: Bytes) -> Bytes:
    """
    Decode `encoded_bytes` on the assumption that it holds an RLP byte
    string, and return the payload.

    Raises `DecodingError` when the input is truncated, when a long-form
    length starts with a zero byte, or when a longer form was used where a
    shorter one would have sufficed.
    """
    prefix = encoded_bytes[0]
    total_size = len(encoded_bytes)

    # A lone byte below 0x80 is its own encoding.
    if total_size == 1 and prefix < 0x80:
        return encoded_bytes

    if prefix <= 0xB7:
        # Short form: the payload length is folded into the prefix byte.
        payload_size = prefix - 0x80
        if payload_size < 0:
            raise DecodingError("negative length")
        if not payload_size < total_size:
            raise DecodingError("truncated")
        payload = encoded_bytes[1 : 1 + payload_size]
        # A single byte below 0x80 must be encoded as itself, not with a
        # length prefix.
        if payload_size == 1 and payload[0] < 0x80:
            raise DecodingError
        return payload

    # Long form: the prefix gives the width of the big-endian payload length
    # that follows it.
    size_width = prefix - 0xB7
    payload_start = 1 + size_width
    if size_width >= total_size:
        raise DecodingError
    if encoded_bytes[1] == 0:
        raise DecodingError
    payload_size = int(Uint.from_be_bytes(encoded_bytes[1:payload_start]))
    # Payloads shorter than 56 bytes must use the short form.
    if payload_size < 0x38:
        raise DecodingError
    payload_end = payload_start + payload_size
    if payload_end > total_size:
        raise DecodingError
    return encoded_bytes[payload_start:payload_end]

decode_to_sequence

Decodes a rlp encoded byte stream assuming that the decoded data should be of type Sequence of objects.

def decode_to_sequence(encoded_sequence: Bytes) -> Sequence[Simple]:
    """
    Decode `encoded_sequence` on the assumption that it holds an RLP list,
    and return the decoded items.

    Raises `DecodingError` when the input is truncated, when a long-form
    length starts with a zero byte, or when the long form was used for a
    payload shorter than 56 bytes.
    """
    prefix = encoded_sequence[0]
    total_size = len(encoded_sequence)

    if prefix > 0xF7:
        # Long form: the prefix gives the width of the big-endian payload
        # length that follows it.
        size_width = prefix - 0xF7
        payload_start = 1 + size_width
        if size_width >= total_size:
            raise DecodingError
        if encoded_sequence[1] == 0:
            raise DecodingError
        payload_size = int(
            Uint.from_be_bytes(encoded_sequence[1:payload_start])
        )
        if payload_size < 0x38:
            raise DecodingError
        payload_end = payload_start + payload_size
        if payload_end > total_size:
            raise DecodingError
        payload = encoded_sequence[payload_start:payload_end]
    else:
        # Short form: the payload length is folded into the prefix byte.
        payload_size = prefix - 0xC0
        if payload_size >= total_size:
            raise DecodingError
        payload = encoded_sequence[1 : 1 + payload_size]

    return decode_joined_encodings(payload)

decode_joined_encodings

Decodes joined_encodings, which is a concatenation of RLP encoded objects.

def decode_joined_encodings(joined_encodings: Bytes) -> Sequence[Simple]:
    """
    Split `joined_encodings`, a concatenation of RLP encoded objects, into
    its items and decode each of them.

    Raises `DecodingError` when an item claims to extend past the end of the
    data.
    """
    items = []
    total_size = len(joined_encodings)

    cursor = 0
    while cursor < total_size:
        item_size = decode_item_length(joined_encodings[cursor:])
        item_end = cursor + item_size
        if item_end > total_size:
            raise DecodingError
        items.append(decode(joined_encodings[cursor:item_end]))
        cursor = item_end

    return items

decode_item_length

Find the length of the rlp encoding for the first object in the encoded sequence.

Here encoded_data refers to concatenation of rlp encoding for each item in a sequence.

def decode_item_length(encoded_data: Bytes) -> int:
    """
    Return the total encoded length (header plus payload) of the first RLP
    item in `encoded_data`.

    Here `encoded_data` is the concatenation of the RLP encodings of the
    items of a sequence.

    Raises `DecodingError` when `encoded_data` is empty, when a long-form
    header is truncated, or when a long-form length starts with a zero byte.
    """
    if len(encoded_data) <= 0:
        raise DecodingError

    prefix = encoded_data[0]

    # A byte below 0x80 is a complete item on its own.
    if prefix < 0x80:
        return 1

    # Width of the big-endian length field (long forms only) and size of the
    # payload that follows the header.
    size_width = 0
    payload_size = 0

    if prefix <= 0xB7:
        # Short byte string: length folded into the prefix.
        payload_size = prefix - 0x80
    elif 0xBF < prefix <= 0xF7:
        # Short sequence: length folded into the prefix.
        payload_size = prefix - 0xC0
    elif prefix <= 0xFF:
        # Long byte string (prefix up to 0xBF) or long sequence (above 0xF7);
        # both store the payload length as big-endian bytes after the prefix.
        base = 0xB7 if prefix <= 0xBF else 0xF7
        size_width = prefix - base
        if size_width >= len(encoded_data):
            raise DecodingError
        if encoded_data[1] == 0:
            raise DecodingError
        payload_size = int(
            Uint.from_be_bytes(encoded_data[1 : 1 + size_width])
        )

    return 1 + size_width + payload_size

Decoder

535
# Signature of a custom decoding function: it receives an already decoded
# `Simple` value and returns an `Extended` object. `With` stores one of these.
Decoder: TypeAlias = Callable[[Simple], Extended]

With

When used with Annotated, indicates that a value needs to be encoded/decoded using a custom function.

class With:
    """
    When used with `Annotated`, indicates that a value needs to be
    encoded/decoded using a custom function.
    """

    def __init__(self, decoder: Decoder) -> None:
        """
        Store `decoder`, the function that converts a decoded value into the
        annotated type.
        """
        self._decoder = decoder