ethereum_rlp.rlp

Defines the serialization and deserialization format used throughout Ethereum.

_UNION_TYPES

36
# Origins (as returned by `get_origin`) that `_deserialize_to_annotation`
# treats as union types.
_UNION_TYPES: Tuple[object, ...]

RLP

Protocol that describes the requirements to be RLP-encodable.

class RLP(Protocol):
    """
    Protocol that describes the requirements to be RLP-encodable.

    The only required member is the `__dataclass_fields__` class attribute.
    """

    __dataclass_fields__: ClassVar[Dict[str, Field[object]]]

Simple

56
# Shape of already decoded RLP data: either raw `bytes` or a (possibly nested)
# sequence of `Simple` values. This is the return type of `decode`.
Simple: TypeAlias = Union[Sequence["Simple"], bytes]

Extended

58
# Everything `encode` is declared to accept: byte strings, unsigned integers,
# text, booleans, `RLP` objects, and (possibly nested) sequences of those.
Extended: TypeAlias = Union[
    Sequence["Extended"], bytearray, bytes, Uint, FixedUnsigned, str, bool, RLP
]

encode

Encodes raw_data into a sequence of bytes using RLP.

def encode(raw_data: Extended) -> Bytes:
    """
    Encodes `raw_data` into a sequence of bytes using RLP.

    Byte strings, unsigned integers (via their big endian bytes) and text
    are encoded as RLP byte strings. Sequences and dataclasses (field by
    field, in declaration order) are encoded as RLP lists. `True` is encoded
    as the single byte `0x01` and `False` as the empty byte string.

    Raises `EncodingError` if the type of `raw_data` is not supported.
    """
    # These if statements are ordered by frequency in `fill` (except `str` must
    # precede `Sequence`).
    if isinstance(raw_data, (bytes, bytearray)):
        return encode_bytes(raw_data)
    elif isinstance(raw_data, Unsigned):
        return encode_bytes(raw_data.to_be_bytes())
    elif isinstance(raw_data, str):
        return encode_bytes(raw_data.encode())
    elif isinstance(raw_data, collections.abc.Sequence):
        # Testing against `collections.abc.Sequence` is equivalent to
        # `typing.Sequence`, but faster.
        # `cast()` has a performance penalty, whereas `type: ignore` is free.
        return encode_sequence(raw_data)  # type: ignore[arg-type]
    elif is_dataclass(raw_data):
        return encode_sequence(
            getattr(raw_data, field.name) for field in fields(raw_data)
        )
    elif isinstance(raw_data, bool):
        return encode_bytes(b"\x01" if raw_data else b"")
    else:
        raise EncodingError(
            f"RLP Encoding of type {type(raw_data)} is not supported"
        )

encode_bytes

Encodes raw_bytes, a sequence of bytes, using RLP.

def encode_bytes(raw_bytes: Bytes) -> Bytes:
    """
    Encodes `raw_bytes`, a sequence of bytes, using RLP.

    A single byte below `0x80` is its own encoding. Payloads shorter than
    56 bytes get a one-byte prefix `0x80 + length`. Longer payloads get the
    prefix `0xB7 + n` followed by the `n` big endian bytes of the length.
    """
    len_raw_data = len(raw_bytes)

    if len_raw_data == 1 and raw_bytes[0] < 0x80:
        return raw_bytes

    if len_raw_data < 0x38:
        return bytes([0x80 + len_raw_data]) + raw_bytes

    # length of raw data represented as big endian bytes
    len_raw_data_as_be = Uint(len_raw_data).to_be_bytes()
    return (
        bytes([0xB7 + len(len_raw_data_as_be)])
        + len_raw_data_as_be
        + raw_bytes
    )

encode_sequence

Encodes a list of RLP encodable objects (raw_sequence) using RLP.

def encode_sequence(raw_sequence: Iterable[Extended]) -> Bytes:
    """
    Encodes a list of RLP encodable objects (`raw_sequence`) using RLP.

    The items are encoded individually and concatenated. A payload shorter
    than 56 bytes gets a one-byte prefix `0xC0 + length`. Longer payloads get
    the prefix `0xF7 + n` followed by the `n` big endian bytes of the length.
    """
    joined_encodings = join_encodings(raw_sequence)
    len_joined_encodings = len(joined_encodings)

    if len_joined_encodings < 0x38:
        return Bytes([0xC0 + len_joined_encodings]) + joined_encodings

    # length of the joined encodings represented as big endian bytes
    len_joined_encodings_as_be = Uint(len_joined_encodings).to_be_bytes()
    return (
        Bytes([0xF7 + len(len_joined_encodings_as_be)])
        + len_joined_encodings_as_be
        + joined_encodings
    )

join_encodings

Obtain the concatenation of the RLP encodings of each item in the sequence raw_sequence.

def join_encodings(raw_sequence: Iterable[Extended]) -> Bytes:
    """
    Obtain the concatenation of the RLP encoding of each item in
    `raw_sequence`, in iteration order.
    """
    return b"".join(encode(item) for item in raw_sequence)

decode

Decodes an integer, byte sequence, or list of RLP encodable objects from the byte sequence encoded_data, using RLP.

def decode(encoded_data: Bytes) -> Simple:
    """
    Decodes a byte sequence, or a list of RLP encodable objects, from the
    byte sequence `encoded_data`, using RLP.

    Raises `DecodingError` if `encoded_data` is empty; malformed input is
    reported by the helpers this function delegates to.
    """
    if len(encoded_data) <= 0:
        raise DecodingError("Cannot decode empty bytestring")

    if encoded_data[0] <= 0xBF:
        # This means that the raw data is of type bytes
        return decode_to_bytes(encoded_data)

    # This means that the raw data is of type sequence
    return decode_to_sequence(encoded_data)

U

173
# Type variable for the target type of `decode_to` / `deserialize_to`.
U = TypeVar("U", bound=Extended)

decode_to

Decode the bytes in encoded_data to an object of type cls. cls can be a Bytes subclass, a dataclass, Uint, U256 or Tuple[cls].

def decode_to(cls: Type[U], encoded_data: Bytes) -> U:
    """
    Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be
    a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`.

    Raises `DecodingError` if the decoded data cannot be converted into
    `cls`; the underlying exception is attached as the cause.
    """
    decoded = decode(encoded_data)
    try:
        return deserialize_to(cls, decoded)
    except Exception as e:
        raise DecodingError(f"cannot decode into `{cls.__name__}`") from e

deserialize_to

188
@overload
def deserialize_to(class_: Type[U], value: Simple) -> U:
    # Typing-only overload: a concrete class yields an instance of that class.
    pass

deserialize_to

193
@overload
def deserialize_to(class_: object, value: Simple) -> Extended:
    # Typing-only overload: any other annotation object yields `Extended`.
    pass

deserialize_to

Convert the already decoded value (see decode) into an object of type class_.

def deserialize_to(class_: object, value: Simple) -> Extended:
    """
    Convert the already decoded `value` (see [`decode`]) into an object of type
    `class_`.

    `Annotated` wrappers are peeled off first; if one of them carries a `With`
    codec that produces a result, that result is returned directly.
    Otherwise the unwrapped target is dispatched on: non-class annotations,
    dataclasses, unsigned integers, byte strings and `bool`.

    Raises `NotImplementedError` for classes that are none of the above.

    [`decode`]: ref:ethereum_rlp.rlp.decode
    """
    origin = get_origin(class_)

    while origin is Annotated:
        assert isinstance(class_, _Annotation)
        result, class_ = _deserialize_annotated(class_, value)
        if result is not None:
            return result
        # No result was produced: continue with the annotated inner type.
        origin = get_origin(class_)

    if not isinstance(class_, type):
        return _deserialize_to_annotation(class_, value)
    elif is_dataclass(class_):
        return _deserialize_to_dataclass(class_, value)
    elif issubclass(class_, (Uint, FixedUnsigned)):
        return _deserialize_to_uint(class_, value)
    elif issubclass(class_, (Bytes, FixedBytes)):
        return _deserialize_to_bytes(class_, value)
    elif class_ is bool:
        return _deserialize_to_bool(value)
    else:
        raise NotImplementedError(class_)

_deserialize_to_dataclass

def _deserialize_to_dataclass(cls: Type[U], decoded: Simple) -> U:
    """
    Build an instance of the dataclass `cls` from the decoded list `decoded`.

    Items of `decoded` are matched to the fields of `cls` by position and
    each one is deserialized to the resolved type hint of its field.

    Raises `DecodingError` if `decoded` is `bytes`, if the number of items
    differs from the number of fields, or if a field cannot be deserialized.
    """
    assert is_dataclass(cls)
    hints = get_type_hints(cls, include_extras=True)
    target_fields = fields(cls)

    if isinstance(decoded, bytes):
        raise DecodingError(f"got `bytes` while decoding `{cls.__name__}`")

    if len(target_fields) != len(decoded):
        name = cls.__name__
        actual = len(decoded)
        expected = len(target_fields)
        raise DecodingError(
            f"`{name}` needs {expected} field(s), but got {actual} instead"
        )

    values: Dict[str, Any] = {}

    for value, target_field in zip(decoded, target_fields):
        resolved_type = hints[target_field.name]
        try:
            values[target_field.name] = deserialize_to(resolved_type, value)
        except Exception as e:
            msg = f"cannot decode field `{cls.__name__}.{target_field.name}`"
            raise DecodingError(msg) from e

    result = cls(**values)
    assert isinstance(result, cls)
    return cast(U, result)

_deserialize_to_bool

def _deserialize_to_bool(value: Simple) -> bool:
260
    if value == b"":
261
        return False
262
    elif value == b"\x01":
263
        return True
264
    else:
265
        raise DecodingError("invalid boolean")

_deserialize_to_bytes

def _deserialize_to_bytes(
    class_: Union[Type[Bytes], Type[FixedBytes]], value: Simple
) -> Union[Bytes, FixedBytes]:
    """
    Convert a decoded value into an instance of the byte-string type `class_`.

    Raises `DecodingError` if `value` is not `bytes`, or if constructing
    `class_` from it raises `ValueError`.
    """
    if not isinstance(value, bytes):
        raise DecodingError("invalid bytes")
    try:
        return class_(value)
    except ValueError as e:
        raise DecodingError from e

_deserialize_to_uint

def _deserialize_to_uint(
    class_: Union[Type[Uint], Type[FixedUnsigned]], decoded: Simple
) -> Union[Uint, FixedUnsigned]:
    """
    Convert decoded big endian bytes into an instance of the unsigned integer
    type `class_`.

    Raises `DecodingError` if `decoded` is not `bytes`, if it starts with a
    zero byte (non-canonical integer), or if `class_.from_be_bytes` raises
    `ValueError`.
    """
    if not isinstance(decoded, bytes):
        raise DecodingError("invalid uint")
    if len(decoded) > 0 and decoded[0] == 0:
        raise DecodingError("non-canonical integer")
    try:
        return class_.from_be_bytes(decoded)
    except ValueError as e:
        raise DecodingError from e

_Annotation

292
@runtime_checkable
class _Annotation:

__metadata__

294
    __metadata__: Sequence[object]

__origin__

295
    __origin__: object

_deserialize_annotated

def _deserialize_annotated(
    annotation: _Annotation, value: Simple
) -> Union[Tuple[Extended, None], Tuple[None, object]]:
    """
    Apply the `With` codec attached to `annotation`, if there is one.

    Returns `(None, inner_type)` when the annotation carries no `With`
    metadata, so the caller can continue with the inner type. Otherwise
    returns `(decoded_object, None)`.

    Raises `Exception` if more than one `With` is attached, and
    `NotImplementedError` if the inner type cannot be used with
    `isinstance`.
    """
    codecs = [x for x in annotation.__metadata__ if isinstance(x, With)]
    if not codecs:
        return (None, annotation.__origin__)

    if len(codecs) > 1:
        raise Exception(
            "multiple rlp.With annotations applied to the same type"
        )

    codec = codecs[0]
    result = codec._decoder(value)

    try:
        assert isinstance(
            result, annotation.__origin__  # type: ignore[arg-type]
        ), "annotated returned wrong type"
    except TypeError as e:
        # TODO: Check annotation types that don't work with `isinstance`.
        msg = f"annotation {annotation.__origin__} doesn't support isinstance"
        raise NotImplementedError(msg) from e

    # Return the object that was just validated instead of running the
    # decoder a second time.
    return (result, None)

_deserialize_to_annotation

def _deserialize_to_annotation(annotation: object, value: Simple) -> Extended:
    """
    Deserialize `value` to a target that is not a class, dispatching on the
    origin of `annotation`: unions, tuples, and lists/sequences.

    Raises `Exception` if `annotation` has no origin, and
    `NotImplementedError` for any other origin.
    """
    origin = get_origin(annotation)
    if origin in _UNION_TYPES:
        return _deserialize_to_union(annotation, value)
    elif origin in (Tuple, tuple):
        return _deserialize_to_tuple(annotation, value)
    elif origin in (List, Sequence, list):
        return _deserialize_to_list(annotation, value)
    elif origin is None:
        raise Exception(annotation)
    else:
        raise NotImplementedError(f"RLP non-type {origin!r}")

_deserialize_to_union

def _deserialize_to_union(annotation: object, value: Simple) -> Extended:
    """
    Deserialize `value` to the single variant of the union `annotation` that
    accepts it.

    Every variant is attempted. Raises `DecodingError` if no variant
    matches (the collected failures are included in the message) or if more
    than one variant matches.
    """
    arguments = get_args(annotation)
    successes: List[Extended] = []
    failures = []
    for argument in arguments:
        try:
            success = deserialize_to(argument, value)
        except Exception as e:
            failures.append(e)
            continue

        successes.append(success)

    if len(successes) == 1:
        return successes[0]
    elif not successes:
        raise DecodingError(f"no matching union variant\n{failures!r}")
    else:
        raise DecodingError("multiple matching union variants")

_deserialize_to_tuple

def _deserialize_to_tuple(
    annotation: object, values: Simple
) -> Sequence[Extended]:
    """
    Deserialize the decoded list `values` to a tuple whose element types are
    the type arguments of `annotation`.

    A trailing `...` repeats the preceding element type for the remaining
    items.

    Raises `DecodingError` if `values` is `bytes` or if an element cannot be
    deserialized.
    """
    if isinstance(values, bytes):
        raise DecodingError("invalid tuple")
    arguments = list(get_args(annotation))

    if arguments[-1] is Ellipsis:
        arguments.pop()
        # Repeat the last explicit type so there is one type per value.
        fill_count = len(values) - len(arguments)
        arguments = list(arguments) + [arguments[-1]] * fill_count

    # NOTE(review): `zip` stops at the shorter input, so a length mismatch
    # between the annotation and `values` is not reported here.
    decoded = []
    for index, (argument, value) in enumerate(zip(arguments, values)):
        try:
            deserialized = deserialize_to(argument, value)
        except Exception as e:
            msg = f"cannot decode tuple element {index} of type `{argument}`"
            raise DecodingError(msg) from e
        decoded.append(deserialized)

    return tuple(decoded)

_deserialize_to_list

def _deserialize_to_list(
    annotation: object, values: Simple
) -> Sequence[Extended]:
    """
    Deserialize the decoded list `values` to a list whose items all have the
    first type argument of `annotation`.

    Raises `DecodingError` if `values` is `bytes` or if an item cannot be
    deserialized.
    """
    if isinstance(values, bytes):
        raise DecodingError("invalid list")
    argument = get_args(annotation)[0]
    results = []
    for index, value in enumerate(values):
        try:
            deserialized = deserialize_to(argument, value)
        except Exception as e:
            msg = f"cannot decode list item {index} of type `{annotation}`"
            raise DecodingError(msg) from e
        results.append(deserialized)
    return results

decode_to_bytes

Decodes an RLP-encoded byte stream, assuming that the decoded data should be of type bytes.

def decode_to_bytes(encoded_bytes: Bytes) -> Bytes:
    """
    Decodes an RLP encoded byte stream assuming that the decoded data
    should be of type `bytes`.

    `encoded_bytes` must hold exactly one encoded item. Raises
    `DecodingError` if the input is truncated, has trailing bytes, or uses
    a non-canonical form.
    """
    if len(encoded_bytes) == 1 and encoded_bytes[0] < 0x80:
        # A single byte below 0x80 is its own encoding.
        return encoded_bytes
    elif encoded_bytes[0] <= 0xB7:
        # Short form: one prefix byte holding `0x80 + length`.
        len_raw_data = encoded_bytes[0] - 0x80
        if len_raw_data < 0:
            raise DecodingError("negative length")
        if len_raw_data >= len(encoded_bytes):
            raise DecodingError("truncated")
        if 1 + len_raw_data < len(encoded_bytes):
            raise DecodingError("trailing bytes")
        raw_data = encoded_bytes[1 : 1 + len_raw_data]
        if len_raw_data == 1 and raw_data[0] < 0x80:
            # Non-canonical: such a byte must be encoded as itself.
            raise DecodingError
        return raw_data
    else:
        # Long form: the prefix byte holds `0xB7 + n`, followed by the `n`
        # big endian bytes of the payload length.
        # This is the index in the encoded data at which decoded data
        # starts from.
        decoded_data_start_idx = 1 + encoded_bytes[0] - 0xB7
        if decoded_data_start_idx - 1 >= len(encoded_bytes):
            raise DecodingError("truncated")
        if encoded_bytes[1] == 0:
            # Non-canonical: the length has a leading zero byte.
            raise DecodingError
        len_decoded_data = int(
            Uint.from_be_bytes(encoded_bytes[1:decoded_data_start_idx])
        )
        if len_decoded_data < 0x38:
            # Non-canonical: this length must use the short form.
            raise DecodingError
        decoded_data_end_idx = decoded_data_start_idx + int(len_decoded_data)
        if decoded_data_end_idx - 1 >= len(encoded_bytes):
            raise DecodingError("truncated")
        if decoded_data_end_idx < len(encoded_bytes):
            raise DecodingError("trailing bytes")
        return encoded_bytes[decoded_data_start_idx:decoded_data_end_idx]

decode_to_sequence

Decodes an RLP-encoded byte stream, assuming that the decoded data should be of type Sequence of objects.

def decode_to_sequence(encoded_sequence: Bytes) -> Sequence[Simple]:
    """
    Decodes an RLP encoded byte stream assuming that the decoded data
    should be of type `Sequence` of objects.

    `encoded_sequence` must hold exactly one encoded list. Raises
    `DecodingError` if the input is truncated, has trailing bytes, or uses
    a non-canonical length.
    """
    if encoded_sequence[0] <= 0xF7:
        # Short form: one prefix byte holding `0xC0 + length`.
        len_joined_encodings = encoded_sequence[0] - 0xC0
        if len_joined_encodings >= len(encoded_sequence):
            raise DecodingError("truncated")
        if 1 + len_joined_encodings < len(encoded_sequence):
            raise DecodingError("trailing bytes")
        joined_encodings = encoded_sequence[1 : 1 + len_joined_encodings]
    else:
        # Long form: the prefix byte holds `0xF7 + n`, followed by the `n`
        # big endian bytes of the payload length.
        joined_encodings_start_idx = 1 + encoded_sequence[0] - 0xF7
        if joined_encodings_start_idx - 1 >= len(encoded_sequence):
            raise DecodingError("truncated")
        if encoded_sequence[1] == 0:
            # Non-canonical: the length has a leading zero byte.
            raise DecodingError
        len_joined_encodings = int(
            Uint.from_be_bytes(encoded_sequence[1:joined_encodings_start_idx])
        )
        if len_joined_encodings < 0x38:
            # Non-canonical: this length must use the short form.
            raise DecodingError
        joined_encodings_end_idx = (
            joined_encodings_start_idx + len_joined_encodings
        )
        if joined_encodings_end_idx - 1 >= len(encoded_sequence):
            raise DecodingError("truncated")
        if joined_encodings_end_idx < len(encoded_sequence):
            raise DecodingError("trailing bytes")
        joined_encodings = encoded_sequence[
            joined_encodings_start_idx:joined_encodings_end_idx
        ]

    return decode_joined_encodings(joined_encodings)

decode_joined_encodings

Decodes joined_encodings, which is a concatenation of RLP encoded objects.

def decode_joined_encodings(joined_encodings: Bytes) -> Sequence[Simple]:
    """
    Decodes `joined_encodings`, which is a concatenation of RLP encoded
    objects, into the list of decoded objects.

    Raises `DecodingError` if an item extends past the end of the input.
    """
    decoded_sequence = []

    item_start_idx = 0
    while item_start_idx < len(joined_encodings):
        # Find how many bytes the next item occupies, then decode exactly
        # that slice.
        encoded_item_length = decode_item_length(
            joined_encodings[item_start_idx:]
        )
        if item_start_idx + encoded_item_length - 1 >= len(joined_encodings):
            raise DecodingError("truncated")
        encoded_item = joined_encodings[
            item_start_idx : item_start_idx + encoded_item_length
        ]
        decoded_sequence.append(decode(encoded_item))
        item_start_idx += encoded_item_length

    return decoded_sequence

decode_item_length

Find the length of the RLP encoding of the first object in the encoded sequence.

Here encoded_data refers to the concatenation of the RLP encodings of each item in a sequence.

def decode_item_length(encoded_data: Bytes) -> int:
    """
    Find the length of the RLP encoding for the first object in the
    encoded sequence.

    Here `encoded_data` refers to the concatenation of the RLP encoding for
    each item in a sequence. The result counts the prefix byte, any length
    bytes and the payload.

    Raises `DecodingError` if `encoded_data` is empty, if a long-form length
    is truncated, or if a long-form length starts with a zero byte.
    """
    if len(encoded_data) <= 0:
        raise DecodingError

    first_rlp_byte = encoded_data[0]

    # This occurs only when the raw_data is a single byte whose value < 128
    if first_rlp_byte < 0x80:
        # We return 1 here, as the end formula
        # 1 + length_length + decoded_data_length would be invalid for
        # this case.
        return 1

    # Short forms carry the payload length in the prefix byte itself:
    # byte strings use 0x80..0xB7 and sequences use 0xC0..0xF7.
    if first_rlp_byte <= 0xB7:
        return 1 + (first_rlp_byte - 0x80)
    if 0xC0 <= first_rlp_byte <= 0xF7:
        return 1 + (first_rlp_byte - 0xC0)

    # Long forms: the prefix byte gives the length of the big endian
    # representation of the payload length (0xB8..0xBF for byte strings,
    # 0xF8..0xFF for sequences).
    if first_rlp_byte <= 0xBF:
        length_length = first_rlp_byte - 0xB7
    else:
        assert first_rlp_byte <= 0xFF
        length_length = first_rlp_byte - 0xF7

    if length_length >= len(encoded_data):
        raise DecodingError("truncated")
    if encoded_data[1] == 0:
        # Non-canonical: the length has a leading zero byte.
        raise DecodingError
    decoded_data_length = int(
        Uint.from_be_bytes(encoded_data[1 : 1 + length_length])
    )

    return 1 + length_length + decoded_data_length

Decoder

560
# Signature of a custom decoder wrapped by `With`: takes an already decoded
# `Simple` value and returns the deserialized object.
Decoder: TypeAlias = Callable[[Simple], Extended]

With

When used with Annotated, indicates that a value needs to be encoded/decoded using a custom function.

class With:
    """
    When used with `Annotated`, indicates that a value needs to be
    encoded/decoded using a custom function.
    """

    def __init__(self, decoder: Decoder) -> None:
        """
        Store `decoder`, the callable applied to the already decoded value
        when deserializing the annotated type.
        """
        self._decoder = decoder