ethereum_rlp.rlp

Defines the serialization and deserialization format used throughout Ethereum.

_UNION_TYPES

34
# Origins that `_deserialize_to_annotation` treats as union types (it tests
# `get_origin(annotation) in _UNION_TYPES`). Only the type is declared on this
# line; the value is presumably assigned elsewhere in the module -- TODO
# confirm.
_UNION_TYPES: Tuple[object, ...]

RLP

Protocol that describes the requirements to be RLP-encodable.

from typing import Protocol


class RLP(Protocol):
    """
    Protocol that describes the requirements to be RLP-encodable.

    The only requirement is the `__dataclass_fields__` class attribute, i.e.
    the marker that `dataclasses` places on every dataclass.
    """

    __dataclass_fields__: ClassVar[Dict[str, Field[object]]]

Simple

54
# Shape of the values returned by `decode`: either raw `bytes`, or an
# arbitrarily nested sequence of such values.
Simple: TypeAlias = Union[Sequence["Simple"], bytes]

Extended

56
# Everything `encode` accepts: byte strings, text, unsigned integers,
# booleans, RLP-encodable dataclasses, and arbitrarily nested sequences of
# those.
Extended: TypeAlias = Union[
    Sequence["Extended"], bytearray, bytes, Uint, FixedUnsigned, str, bool, RLP
]

encode

Encodes raw_data into a sequence of bytes using RLP.

def encode(raw_data: Extended) -> Bytes:
    """
    Encodes `raw_data` into a sequence of bytes using RLP.

    Dispatch is by runtime type:

    - `bytes` / `bytearray` are encoded as a byte string;
    - `str` is converted with `str.encode()` and encoded as a byte string;
    - any other `Sequence` is encoded as an RLP list;
    - `Uint` / `FixedUnsigned` are encoded via their big endian bytes;
    - `bool` is encoded as `b"\\x01"` (true) or the empty string (false);
    - dataclass instances are encoded as the sequence of their fields.

    Raises `EncodingError` for any other type.
    """
    # `bytes`, `bytearray` and `str` are themselves sequences, so they have
    # to be told apart before falling back to the generic list encoding.
    if isinstance(raw_data, Sequence):
        if isinstance(raw_data, (bytearray, bytes)):
            return encode_bytes(raw_data)
        elif isinstance(raw_data, str):
            return encode_bytes(raw_data.encode())
        else:
            return encode_sequence(raw_data)
    elif isinstance(raw_data, (Uint, FixedUnsigned)):
        return encode(raw_data.to_be_bytes())
    elif isinstance(raw_data, bool):
        if raw_data:
            return encode_bytes(b"\x01")
        else:
            return encode_bytes(b"")
    elif is_dataclass(raw_data):
        return encode(astuple(raw_data))
    else:
        raise EncodingError(
            "RLP Encoding of type {} is not supported".format(type(raw_data))
        )

encode_bytes

Encodes raw_bytes, a sequence of bytes, using RLP.

def encode_bytes(raw_bytes: Bytes) -> Bytes:
    """
    Encodes `raw_bytes`, a sequence of bytes, using RLP.

    Three forms are produced:

    - a single byte below `0x80` is its own encoding;
    - fewer than 56 bytes: one prefix byte `0x80 + length`, then the data;
    - otherwise: prefix byte `0xB7 + n`, the length as `n` big endian bytes,
      then the data.
    """
    len_raw_data = len(raw_bytes)

    if len_raw_data == 1 and raw_bytes[0] < 0x80:
        return raw_bytes
    elif len_raw_data < 0x38:
        return bytes([0x80 + len_raw_data]) + raw_bytes
    else:
        # length of raw data represented as big endian bytes
        len_raw_data_as_be = Uint(len_raw_data).to_be_bytes()
        return (
            bytes([0xB7 + len(len_raw_data_as_be)])
            + len_raw_data_as_be
            + raw_bytes
        )

encode_sequence

Encodes a list of RLP encodable objects (raw_sequence) using RLP.

def encode_sequence(raw_sequence: Sequence[Extended]) -> Bytes:
    """
    Encodes a list of RLP encodable objects (`raw_sequence`) using RLP.

    The payload is the concatenation of the encodings of every item. A
    payload shorter than 56 bytes gets the single prefix byte
    `0xC0 + length`; a longer one gets `0xF7 + n` followed by the payload
    length as `n` big endian bytes.
    """
    joined_encodings = join_encodings(raw_sequence)
    len_joined_encodings = len(joined_encodings)

    if len_joined_encodings < 0x38:
        return Bytes([0xC0 + len_joined_encodings]) + joined_encodings
    else:
        # length of the payload represented as big endian bytes
        len_joined_encodings_as_be = Uint(len_joined_encodings).to_be_bytes()
        return (
            Bytes([0xF7 + len(len_joined_encodings_as_be)])
            + len_joined_encodings_as_be
            + joined_encodings
        )

join_encodings

Obtain concatenation of rlp encoding for each item in the sequence raw_sequence.

def join_encodings(raw_sequence: Sequence[Extended]) -> Bytes:
    """
    Obtain concatenation of rlp encoding for each item in the sequence
    `raw_sequence`.

    An empty sequence yields an empty byte string.
    """
    return b"".join(encode(item) for item in raw_sequence)

decode

Decodes an integer, byte sequence, or list of RLP encodable objects from the byte sequence encoded_data, using RLP.

def decode(encoded_data: Bytes) -> Simple:
    """
    Decodes a byte sequence, or a (possibly nested) list of byte sequences,
    from the byte sequence `encoded_data`, using RLP.

    Raises `DecodingError` if `encoded_data` is empty. Further validation is
    delegated to `decode_to_bytes` and `decode_to_sequence`.
    """
    if not encoded_data:
        raise DecodingError("Cannot decode empty bytestring")

    if encoded_data[0] <= 0xBF:
        # This means that the raw data is of type bytes
        return decode_to_bytes(encoded_data)
    else:
        # This means that the raw data is of type sequence
        return decode_to_sequence(encoded_data)

U

159
# Type variable for the target type of `decode_to` and the typed overload of
# `deserialize_to`; restricted to types covered by `Extended`.
U = TypeVar("U", bound=Extended)

decode_to

Decode the bytes in encoded_data to an object of type cls. cls can be a Bytes subclass, a dataclass, Uint, U256 or Tuple[cls].

def decode_to(cls: Type[U], encoded_data: Bytes) -> U:
    """
    Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be
    a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`.

    The data is first decoded with `decode` and the result is then converted
    with `deserialize_to`. Any exception raised during the conversion is
    re-raised as a `DecodingError` chained to the original exception.
    """
    decoded = decode(encoded_data)
    try:
        return deserialize_to(cls, decoded)
    except Exception as e:
        raise DecodingError(f"cannot decode into `{cls.__name__}`") from e

deserialize_to

174
# Typing-only overload: a concrete class yields an instance of that class.
@overload
def deserialize_to(class_: Type[U], value: Simple) -> U:
    pass

deserialize_to

179
# Typing-only overload: any other annotation object (e.g. `Union[...]`,
# `Tuple[...]`, `Annotated[...]`) yields some `Extended` value.
@overload
def deserialize_to(class_: object, value: Simple) -> Extended:
    pass

deserialize_to

Convert the already decoded value (see decode) into an object of type class_.

def deserialize_to(class_: object, value: Simple) -> Extended:
    """
    Convert the already decoded `value` (see [`decode`]) into an object of type
    `class_`.

    `Annotated[...]` layers are unwrapped first; a layer carrying a `With`
    codec short-circuits and returns the codec's result. After unwrapping,
    non-class annotations (unions, tuples, lists) go to
    `_deserialize_to_annotation`, and classes are dispatched to the
    dataclass, unsigned integer, bytes or boolean deserializers.

    Raises `NotImplementedError` for a class that matches none of those.

    [`decode`]: ref:ethereum_rlp.rlp.decode
    """
    origin = get_origin(class_)

    while origin is Annotated:
        # `assert` narrows the type for the checker; `origin is Annotated`
        # already implies the annotation shape.
        assert isinstance(class_, _Annotation)
        result, class_ = _deserialize_annotated(class_, value)
        if result is not None:
            return result
        origin = get_origin(class_)

    if not isinstance(class_, type):
        return _deserialize_to_annotation(class_, value)
    elif is_dataclass(class_):
        return _deserialize_to_dataclass(class_, value)
    elif issubclass(class_, (Uint, FixedUnsigned)):
        return _deserialize_to_uint(class_, value)
    elif issubclass(class_, (Bytes, FixedBytes)):
        return _deserialize_to_bytes(class_, value)
    elif class_ is bool:
        return _deserialize_to_bool(value)
    else:
        raise NotImplementedError(class_)

_deserialize_to_dataclass

def _deserialize_to_dataclass(cls: Type[U], decoded: Simple) -> U:
    """
    Build an instance of the dataclass `cls` from the decoded sequence
    `decoded`.

    Items of `decoded` are matched positionally with the fields of `cls` and
    each one is converted with `deserialize_to` using the field's resolved
    type hint.

    Raises `DecodingError` if `decoded` is `bytes`, if the number of items
    differs from the number of fields, or if any field fails to convert.
    """
    assert is_dataclass(cls)
    # `include_extras=True` keeps `Annotated[...]` metadata so that `With`
    # codecs attached to a field remain visible to `deserialize_to`.
    hints = get_type_hints(cls, include_extras=True)
    target_fields = fields(cls)

    if isinstance(decoded, bytes):
        raise DecodingError(f"got `bytes` while decoding `{cls.__name__}`")

    if len(target_fields) != len(decoded):
        name = cls.__name__
        actual = len(decoded)
        expected = len(target_fields)
        raise DecodingError(
            f"`{name}` needs {expected} field(s), but got {actual} instead"
        )

    values: Dict[str, Any] = {}

    for value, target_field in zip(decoded, target_fields):
        resolved_type = hints[target_field.name]
        try:
            values[target_field.name] = deserialize_to(resolved_type, value)
        except Exception as e:
            msg = f"cannot decode field `{cls.__name__}.{target_field.name}`"
            raise DecodingError(msg) from e

    result = cls(**values)
    assert isinstance(result, cls)
    return cast(U, result)

_deserialize_to_bool

def _deserialize_to_bool(value: Simple) -> bool:
246
    if value == b"":
247
        return False
248
    elif value == b"\x01":
249
        return True
250
    else:
251
        raise DecodingError("invalid boolean")

_deserialize_to_bytes

def _deserialize_to_bytes(class_: Union[Type[Bytes], Type[FixedBytes]], ​​value: Simple) -> Union[Bytes, FixedBytes]:
257
    if not isinstance(value, bytes):
258
        raise DecodingError("invalid bytes")
259
    try:
260
        return class_(value)
261
    except ValueError as e:
262
        raise DecodingError from e

_deserialize_to_uint

def _deserialize_to_uint(
    class_: Union[Type[Uint], Type[FixedUnsigned]], decoded: Simple
) -> Union[Uint, FixedUnsigned]:
    """
    Convert the decoded `decoded` bytes into an unsigned integer of type
    `class_`, interpreting them as big endian.

    Raises `DecodingError` if `decoded` is not `bytes`, if it starts with a
    zero byte (non-canonical integer), or if `class_.from_be_bytes` raises
    `ValueError`.
    """
    if not isinstance(decoded, bytes):
        raise DecodingError("invalid uint")
    # Leading zero bytes are rejected so each integer has one valid encoding.
    if len(decoded) > 0 and decoded[0] == 0:
        raise DecodingError("non-canonical integer")
    try:
        return class_.from_be_bytes(decoded)
    except ValueError as e:
        raise DecodingError from e

_Annotation

278
@runtime_checkable
class _Annotation:

__metadata__

280
    __metadata__: Sequence[object]

__origin__

281
    __origin__: object

_deserialize_annotated

def _deserialize_annotated(
    annotation: _Annotation, value: Simple
) -> Union[Tuple[Extended, None], Tuple[None, object]]:
    """
    Apply the `With` codec attached to `annotation`, if there is one.

    Returns `(None, annotation.__origin__)` when the metadata contains no
    `With` instance, so the caller can continue with the underlying type.
    Otherwise returns `(result, None)`, where `result` is the output of the
    codec's decoder applied to `value`.

    Raises `Exception` if more than one `With` is attached, and
    `NotImplementedError` if the underlying type cannot be used with
    `isinstance`.
    """
    codecs = [x for x in annotation.__metadata__ if isinstance(x, With)]
    if not codecs:
        return (None, annotation.__origin__)

    if len(codecs) > 1:
        raise Exception(
            "multiple rlp.With annotations applied to the same type"
        )

    codec = codecs[0]
    result = codec._decoder(value)

    try:
        assert isinstance(
            result, annotation.__origin__  # type: ignore[arg-type]
        ), "annotated returned wrong type"
    except TypeError as e:
        # TODO: Check annotation types that don't work with `isinstance`.
        msg = f"annotation {annotation.__origin__} doesn't support isinstance"
        raise NotImplementedError(msg) from e

    # Return the value that was just validated instead of invoking the
    # decoder a second time.
    return (result, None)

_deserialize_to_annotation

def _deserialize_to_annotation(annotation: object, value: Simple) -> Extended:
    """
    Convert `value` according to a non-class type annotation.

    Dispatches on `get_origin(annotation)`: union origins go to
    `_deserialize_to_union`, tuples to `_deserialize_to_tuple`, and lists or
    sequences to `_deserialize_to_list`.

    Raises `Exception` when the annotation has no origin, and
    `NotImplementedError` for any other origin.
    """
    origin = get_origin(annotation)
    if origin in _UNION_TYPES:
        return _deserialize_to_union(annotation, value)
    elif origin in (Tuple, tuple):
        return _deserialize_to_tuple(annotation, value)
    elif origin in (List, Sequence, list):
        return _deserialize_to_list(annotation, value)
    elif origin is None:
        raise Exception(annotation)
    else:
        raise NotImplementedError(f"RLP non-type {origin!r}")

_deserialize_to_union

def _deserialize_to_union(annotation: object, value: Simple) -> Extended:
    """
    Convert `value` into the single variant of the union `annotation` that
    accepts it.

    Every variant is tried with `deserialize_to`. Exactly one must succeed;
    `DecodingError` is raised when none does (the collected failures are
    included in the message) or when more than one does.
    """
    arguments = get_args(annotation)
    successes: List[Extended] = []
    failures = []
    for argument in arguments:
        try:
            success = deserialize_to(argument, value)
        except Exception as e:
            failures.append(e)
            continue

        successes.append(success)

    if len(successes) == 1:
        return successes[0]
    elif not successes:
        raise DecodingError(f"no matching union variant\n{failures!r}")
    else:
        raise DecodingError("multiple matching union variants")

_deserialize_to_tuple

def _deserialize_to_tuple(
    annotation: object, values: Simple
) -> Sequence[Extended]:
    """
    Convert the decoded sequence `values` into a tuple described by the
    `Tuple[...]` annotation `annotation`.

    For a variadic annotation (trailing `...`) the last element type is
    repeated to cover the items of `values`. Each item is converted with
    `deserialize_to`.

    Raises `DecodingError` if `values` is `bytes` or if any element fails to
    convert.
    """
    if isinstance(values, bytes):
        raise DecodingError("invalid tuple")
    arguments = list(get_args(annotation))

    if arguments[-1] is Ellipsis:
        arguments.pop()
        fill_count = len(values) - len(arguments)
        arguments = list(arguments) + [arguments[-1]] * fill_count

    # NOTE(review): `zip` stops at the shorter input, so a length mismatch
    # between `arguments` and `values` is not reported here -- confirm that
    # this is intended.
    decoded = []
    for index, (argument, value) in enumerate(zip(arguments, values)):
        try:
            deserialized = deserialize_to(argument, value)
        except Exception as e:
            msg = f"cannot decode tuple element {index} of type `{argument}`"
            raise DecodingError(msg) from e
        decoded.append(deserialized)

    return tuple(decoded)

_deserialize_to_list

def _deserialize_to_list(
    annotation: object, values: Simple
) -> Sequence[Extended]:
    """
    Convert the decoded sequence `values` into a list whose items have the
    element type of the `List[...]` / `Sequence[...]` annotation
    `annotation`.

    Raises `DecodingError` if `values` is `bytes` or if any item fails to
    convert with `deserialize_to`.
    """
    if isinstance(values, bytes):
        raise DecodingError("invalid list")
    argument = get_args(annotation)[0]
    results = []
    for index, value in enumerate(values):
        try:
            deserialized = deserialize_to(argument, value)
        except Exception as e:
            msg = f"cannot decode list item {index} of type `{annotation}`"
            raise DecodingError(msg) from e
        results.append(deserialized)
    return results

decode_to_bytes

Decodes a rlp encoded byte stream assuming that the decoded data should be of type bytes.

def decode_to_bytes(encoded_bytes: Bytes) -> Bytes:
    """
    Decodes a rlp encoded byte stream assuming that the decoded data
    should be of type `bytes`.

    Raises `DecodingError` when the input is truncated or not in canonical
    form: a single byte below `0x80` wrapped in a length prefix, a long-form
    length with a leading zero byte, or a long-form length below 56.
    """
    if len(encoded_bytes) == 1 and encoded_bytes[0] < 0x80:
        return encoded_bytes
    elif encoded_bytes[0] <= 0xB7:
        len_raw_data = encoded_bytes[0] - 0x80
        # Reached when the first byte is below 0x80 but more bytes follow.
        if len_raw_data < 0:
            raise DecodingError("negative length")
        if len_raw_data >= len(encoded_bytes):
            raise DecodingError("truncated")
        raw_data = encoded_bytes[1 : 1 + len_raw_data]
        # A single byte below 0x80 must be encoded as itself.
        if len_raw_data == 1 and raw_data[0] < 0x80:
            raise DecodingError
        return raw_data
    else:
        # This is the index in the encoded data at which decoded data
        # starts from.
        decoded_data_start_idx = 1 + encoded_bytes[0] - 0xB7
        if decoded_data_start_idx - 1 >= len(encoded_bytes):
            raise DecodingError
        # The big endian length must not have a leading zero byte.
        if encoded_bytes[1] == 0:
            raise DecodingError
        len_decoded_data = int(
            Uint.from_be_bytes(encoded_bytes[1:decoded_data_start_idx])
        )
        # Lengths below 56 must use the short form.
        if len_decoded_data < 0x38:
            raise DecodingError
        decoded_data_end_idx = decoded_data_start_idx + len_decoded_data
        if decoded_data_end_idx - 1 >= len(encoded_bytes):
            raise DecodingError
        return encoded_bytes[decoded_data_start_idx:decoded_data_end_idx]

decode_to_sequence

Decodes a rlp encoded byte stream assuming that the decoded data should be of type Sequence of objects.

def decode_to_sequence(encoded_sequence: Bytes) -> Sequence[Simple]:
    """
    Decodes a rlp encoded byte stream assuming that the decoded data
    should be of type `Sequence` of objects.

    A first byte up to `0xF7` carries the payload length directly; a larger
    first byte says how many following bytes hold the big endian payload
    length. The payload is then split and decoded by
    `decode_joined_encodings`.

    Raises `DecodingError` when the input is truncated, when a long-form
    length has a leading zero byte, or when a long-form length is below 56.
    """
    if encoded_sequence[0] <= 0xF7:
        len_joined_encodings = encoded_sequence[0] - 0xC0
        if len_joined_encodings >= len(encoded_sequence):
            raise DecodingError
        joined_encodings = encoded_sequence[1 : 1 + len_joined_encodings]
    else:
        joined_encodings_start_idx = 1 + encoded_sequence[0] - 0xF7
        if joined_encodings_start_idx - 1 >= len(encoded_sequence):
            raise DecodingError
        # The big endian length must not have a leading zero byte.
        if encoded_sequence[1] == 0:
            raise DecodingError
        len_joined_encodings = int(
            Uint.from_be_bytes(encoded_sequence[1:joined_encodings_start_idx])
        )
        # Lengths below 56 must use the short form.
        if len_joined_encodings < 0x38:
            raise DecodingError
        joined_encodings_end_idx = (
            joined_encodings_start_idx + len_joined_encodings
        )
        if joined_encodings_end_idx - 1 >= len(encoded_sequence):
            raise DecodingError
        joined_encodings = encoded_sequence[
            joined_encodings_start_idx:joined_encodings_end_idx
        ]

    return decode_joined_encodings(joined_encodings)

decode_joined_encodings

Decodes joined_encodings, which is a concatenation of RLP encoded objects.

def decode_joined_encodings(joined_encodings: Bytes) -> Sequence[Simple]:
    """
    Decodes `joined_encodings`, which is a concatenation of RLP encoded
    objects.

    Items are consumed one after another: `decode_item_length` gives the
    size of the next item, which is then sliced out and passed to `decode`.
    Returns the list of decoded items (empty for empty input).

    Raises `DecodingError` if an item claims more bytes than remain.
    """
    decoded_sequence = []

    item_start_idx = 0
    while item_start_idx < len(joined_encodings):
        encoded_item_length = decode_item_length(
            joined_encodings[item_start_idx:]
        )
        if item_start_idx + encoded_item_length - 1 >= len(joined_encodings):
            raise DecodingError
        encoded_item = joined_encodings[
            item_start_idx : item_start_idx + encoded_item_length
        ]
        decoded_sequence.append(decode(encoded_item))
        item_start_idx += encoded_item_length

    return decoded_sequence

decode_item_length

Find the length of the rlp encoding for the first object in the encoded sequence.

Here encoded_data refers to concatenation of rlp encoding for each item in a sequence.

def decode_item_length(encoded_data: Bytes) -> int:
    """
    Find the length of the rlp encoding for the first object in the
    encoded sequence.

    Here `encoded_data` refers to concatenation of rlp encoding for each
    item in a sequence.

    The returned length covers the prefix byte, any length bytes and the
    payload. Raises `DecodingError` if `encoded_data` is empty, if the
    length bytes are truncated, or if they start with a zero byte.
    """
    if len(encoded_data) <= 0:
        raise DecodingError

    first_rlp_byte = encoded_data[0]

    # This is the length of the big endian representation of the length of
    # rlp encoded object byte stream.
    length_length = 0
    decoded_data_length = 0

    # This occurs only when the raw_data is a single byte whose value < 128
    if first_rlp_byte < 0x80:
        # We return 1 here, as the end formula
        # 1 + length_length + decoded_data_length would be invalid for
        # this case.
        return 1
    # This occurs only when the raw_data is a byte stream with length < 56
    # and doesn't fall into the above cases
    elif first_rlp_byte <= 0xB7:
        decoded_data_length = first_rlp_byte - 0x80
    # This occurs only when the raw_data is a byte stream and doesn't fall
    # into the above cases
    elif first_rlp_byte <= 0xBF:
        length_length = first_rlp_byte - 0xB7
        if length_length >= len(encoded_data):
            raise DecodingError
        if encoded_data[1] == 0:
            raise DecodingError
        decoded_data_length = int(
            Uint.from_be_bytes(encoded_data[1 : 1 + length_length])
        )
    # This occurs only when the raw_data is a sequence of objects with
    # length(concatenation of encoding of each object) < 56
    elif first_rlp_byte <= 0xF7:
        decoded_data_length = first_rlp_byte - 0xC0
    # This occurs only when the raw_data is a sequence of objects and
    # doesn't fall into the above cases.
    elif first_rlp_byte <= 0xFF:
        length_length = first_rlp_byte - 0xF7
        if length_length >= len(encoded_data):
            raise DecodingError
        if encoded_data[1] == 0:
            raise DecodingError
        decoded_data_length = int(
            Uint.from_be_bytes(encoded_data[1 : 1 + length_length])
        )

    return 1 + length_length + decoded_data_length

Decoder

537
# Signature of a custom decoding function as accepted by `With`: takes an
# already decoded `Simple` value and returns an `Extended` value.
Decoder: TypeAlias = Callable[[Simple], Extended]

With

When used with Annotated, indicates that a value needs to be encoded/decoded using a custom function.

class With:
    """
    When used with `Annotated`, indicates that a value needs to be
    encoded/decoded using a custom function.
    """

    def __init__(self, decoder: Decoder) -> None:
        """
        Store `decoder`, the function applied to the decoded value when a
        type annotated with this instance is deserialized.
        """
        self._decoder = decoder