ethereum.rlp

.. _rlp:

Recursive Length Prefix (RLP) Encoding ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. contents:: Table of Contents
    :backlinks: none
    :local:

Introduction

Defines the serialization and deserialization format used throughout Ethereum.

RLP

Protocol that describes the requirements to be RLP-encodable.

class RLP:
    """
    Describes the requirements for an object to be RLP-encodable.

    The only requirement expressed here is the presence of a
    `__dataclass_fields__` class attribute, which is the attribute the
    standard `dataclasses` module sets on dataclasses.
    """

    # NOTE(review): the accompanying documentation calls this a `Protocol`,
    # but no base class is visible in this listing -- confirm whether a
    # `Protocol` base was dropped from the class header.
    __dataclass_fields__: ClassVar[Dict]

Simple

53
Simple: TypeAlias = Union[Sequence["Simple"], bytes]

Extended

# Every kind of value the encoder is annotated to accept: nested sequences,
# byte strings, unsigned integers, text, booleans and `RLP` objects.
Extended: TypeAlias = Union[
    Sequence["Extended"], bytearray, bytes, Uint, FixedUnsigned, str, bool, RLP
]

encode

Encodes raw_data into a sequence of bytes using RLP.

Parameters

raw_data : A Bytes, Uint, Uint256 or sequence of RLP encodable objects.

Returns

encoded : ethereum.base_types.Bytes The RLP encoded bytes representing raw_data.

def encode(raw_data: Extended) -> Bytes:
    """
    Encodes `raw_data` into a sequence of bytes using RLP.

    Parameters
    ----------
    raw_data :
        A `bytes`/`bytearray`, `str`, `Uint`, `FixedUnsigned`, `bool`,
        dataclass instance, or sequence of RLP encodable objects.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_data`.

    Raises
    ------
    RLPEncodingError
        If `raw_data` is of a type that none of the branches below handle.
    """
    if isinstance(raw_data, Sequence):
        # Byte strings and text are sequences too, so they must be singled
        # out before falling back to the generic sequence encoding.
        if isinstance(raw_data, (bytearray, bytes)):
            return encode_bytes(raw_data)
        elif isinstance(raw_data, str):
            return encode_bytes(raw_data.encode())
        else:
            return encode_sequence(raw_data)
    elif isinstance(raw_data, (Uint, FixedUnsigned)):
        # Integers are encoded as their big endian byte representation.
        return encode(raw_data.to_be_bytes())
    elif isinstance(raw_data, bool):
        # `True` is the single byte 0x01, `False` is the empty byte string.
        if raw_data:
            return encode_bytes(b"\x01")
        else:
            return encode_bytes(b"")
    elif is_dataclass(raw_data):
        # A dataclass is encoded as the sequence of its field values.
        return encode(astuple(raw_data))
    else:
        raise RLPEncodingError(
            f"RLP Encoding of type {type(raw_data)} is not supported"
        )

encode_bytes

Encodes raw_bytes, a sequence of bytes, using RLP.

Parameters

raw_bytes : Bytes to encode with RLP.

Returns

encoded : ethereum.base_types.Bytes The RLP encoded bytes representing raw_bytes.

def encode_bytes(raw_bytes: Bytes) -> Bytes:
    """
    Encodes `raw_bytes`, a sequence of bytes, using RLP.

    Three forms are produced:

    - a single byte below 0x80 is returned unchanged;
    - fewer than 56 bytes are prefixed with one byte, `0x80 + length`;
    - anything longer is prefixed with `0xB7 + n` followed by the `n`-byte
      big endian representation of the length.

    Parameters
    ----------
    raw_bytes :
        Bytes to encode with RLP.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_bytes`.
    """
    len_raw_data = len(raw_bytes)

    if len_raw_data == 1 and raw_bytes[0] < 0x80:
        return raw_bytes
    elif len_raw_data < 0x38:
        return bytes([0x80 + len_raw_data]) + raw_bytes
    else:
        # length of raw data represented as big endian bytes
        len_raw_data_as_be = Uint(len_raw_data).to_be_bytes()
        return (
            bytes([0xB7 + len(len_raw_data_as_be)])
            + len_raw_data_as_be
            + raw_bytes
        )

encode_sequence

Encodes a list of RLP encodable objects (raw_sequence) using RLP.

Parameters

raw_sequence : Sequence of RLP encodable objects.

Returns

encoded : ethereum.base_types.Bytes The RLP encoded bytes representing raw_sequence.

def encode_sequence(raw_sequence: Sequence[Extended]) -> Bytes:
    """
    Encodes a list of RLP encodable objects (`raw_sequence`) using RLP.

    The items are encoded individually and concatenated; the result is
    prefixed with `0xC0 + length` when the concatenation is shorter than
    56 bytes, otherwise with `0xF7 + n` followed by the `n`-byte big
    endian representation of the length.

    Parameters
    ----------
    raw_sequence :
        Sequence of RLP encodable objects.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_sequence`.
    """
    joined_encodings = get_joined_encodings(raw_sequence)
    len_joined_encodings = len(joined_encodings)

    if len_joined_encodings < 0x38:
        return Bytes([0xC0 + len_joined_encodings]) + joined_encodings
    else:
        # length of the payload represented as big endian bytes
        len_joined_encodings_as_be = Uint(len_joined_encodings).to_be_bytes()
        return (
            Bytes([0xF7 + len(len_joined_encodings_as_be)])
            + len_joined_encodings_as_be
            + joined_encodings
        )

get_joined_encodings

Obtain concatenation of rlp encoding for each item in the sequence raw_sequence.

Parameters

raw_sequence : Sequence to encode with RLP.

Returns

joined_encodings : ethereum.base_types.Bytes The concatenated RLP encoded bytes for each item in sequence raw_sequence.

def get_joined_encodings(raw_sequence: Sequence[Extended]) -> Bytes:
    """
    Obtain concatenation of rlp encoding for each item in the sequence
    raw_sequence.

    Parameters
    ----------
    raw_sequence :
        Sequence to encode with RLP.

    Returns
    -------
    joined_encodings : `ethereum.base_types.Bytes`
        The concatenated RLP encoded bytes for each item in sequence
        raw_sequence. Empty when `raw_sequence` has no items.
    """
    return b"".join(encode(item) for item in raw_sequence)

decode

Decodes an integer, byte sequence, or list of RLP encodable objects from the byte sequence encoded_data, using RLP.

Parameters

encoded_data : A sequence of bytes, in RLP form.

Returns

decoded_data : RLP Object decoded from encoded_data.

def decode(encoded_data: Bytes) -> Simple:
    """
    Decodes a byte sequence, or a (possibly nested) sequence of byte
    sequences, from the RLP encoded `encoded_data`.

    Parameters
    ----------
    encoded_data :
        A sequence of bytes, in RLP form.

    Returns
    -------
    decoded_data : `Simple`
        Object decoded from `encoded_data`: either bytes or a sequence of
        decoded objects.

    Raises
    ------
    RLPDecodingError
        If `encoded_data` is empty.
    """
    if len(encoded_data) <= 0:
        raise RLPDecodingError("Cannot decode empty bytestring")

    # The first byte alone tells byte strings (<= 0xBF) from sequences.
    if encoded_data[0] <= 0xBF:
        # This means that the raw data is of type bytes
        return decode_to_bytes(encoded_data)
    else:
        # This means that the raw data is of type sequence
        return decode_to_sequence(encoded_data)

U

# Type variable for the target type of typed decoding; restricted to the
# types covered by `Extended`.
U = TypeVar("U", bound=Extended)

decode_to

Decode the bytes in encoded_data to an object of type cls. cls can be a Bytes subclass, a dataclass, Uint, U256 or Tuple[cls].

Parameters

cls : Type[U] The type to decode to.

encoded_data : A sequence of bytes, in RLP form.

Returns

decoded_data : U Object decoded from encoded_data.

def decode_to(cls: Type[U], encoded_data: Bytes) -> U:
    """
    Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be
    a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`.

    Parameters
    ----------
    cls: `Type[U]`
        The type to decode to.
    encoded_data :
        A sequence of bytes, in RLP form.

    Returns
    -------
    decoded_data : `U`
        Object decoded from `encoded_data`.
    """
    # First decode to the untyped bytes/sequence form, then convert that
    # structure into the requested type.
    decoded = decode(encoded_data)
    return _deserialize_to(cls, decoded)

_deserialize_to

# Typing overload: a concrete class as target yields an instance of it.
@overload
def _deserialize_to(class_: Type[U], value: Simple) -> U:
    pass

_deserialize_to

# Typing overload: any other target (e.g. a typing annotation) yields some
# `Extended` value.
@overload
def _deserialize_to(class_: object, value: Simple) -> Extended:
    pass

_deserialize_to

def _deserialize_to(class_: object, value: Simple) -> Extended:
    """
    Convert the decoded `value` into the type described by `class_`.

    Parameters
    ----------
    class_ :
        Either a class (dataclass, `Uint`/`FixedUnsigned` subclass,
        `Bytes`/`FixedBytes` subclass or `bool`) or a typing annotation.
    value :
        Decoded RLP data (bytes or nested sequences of bytes).

    Returns
    -------
    deserialized : `Extended`
        `value` converted to `class_`.

    Raises
    ------
    NotImplementedError
        If `class_` is a class that none of the branches below handle.
    """
    if not isinstance(class_, type):
        # Not a class, so it is treated as a typing annotation.
        return _deserialize_to_annotation(class_, value)
    elif is_dataclass(class_):
        return _deserialize_to_dataclass(class_, value)
    elif issubclass(class_, (Uint, FixedUnsigned)):
        return _deserialize_to_uint(class_, value)
    elif issubclass(class_, (Bytes, FixedBytes)):
        return _deserialize_to_bytes(class_, value)
    elif class_ is bool:
        return _deserialize_to_bool(value)
    else:
        raise NotImplementedError(class_)

_deserialize_to_dataclass

def _deserialize_to_dataclass(cls: Type[U], decoded: Simple) -> U:
    """
    Build an instance of the dataclass `cls` from the decoded sequence
    `decoded`, converting each item to the type hinted for the field at the
    same position.

    Parameters
    ----------
    cls :
        The dataclass to instantiate.
    decoded :
        Decoded RLP data; must be a sequence with one item per field.

    Returns
    -------
    result : `U`
        The populated instance of `cls`.

    Raises
    ------
    RLPDecodingError
        If `decoded` is bytes, or its length differs from the number of
        fields of `cls`.
    """
    assert is_dataclass(cls)
    hints = get_type_hints(cls)
    target_fields = fields(cls)

    if isinstance(decoded, bytes):
        raise RLPDecodingError(f"got `bytes` while decoding `{cls.__name__}`")

    if len(target_fields) != len(decoded):
        name = cls.__name__
        actual = len(decoded)
        expected = len(target_fields)
        raise RLPDecodingError(
            f"`{name}` needs {expected} field(s), but got {actual} instead"
        )

    values: Dict[str, Any] = {}

    for value, target_field in zip(decoded, target_fields):
        # `get_type_hints` gives the resolved field type, which drives the
        # recursive conversion of the raw item.
        resolved_type = hints[target_field.name]
        values[target_field.name] = _deserialize_to(resolved_type, value)

    result = cls(**values)
    assert isinstance(result, cls)
    return cast(U, result)

_deserialize_to_bool

def _deserialize_to_bool(value: Simple) -> bool:
    """
    Convert decoded RLP data to a `bool`.

    The empty byte string is `False` and the single byte 0x01 is `True`;
    any other value raises `RLPDecodingError`.
    """
    if value == b"":
        return False
    elif value == b"\x01":
        return True
    else:
        raise RLPDecodingError

_deserialize_to_bytes

def _deserialize_to_bytes(
    class_: Union[Type[Bytes], Type[FixedBytes]], value: Simple
) -> Union[Bytes, FixedBytes]:
    """
    Convert decoded RLP data to the bytes type `class_`.

    Raises `RLPDecodingError` if `value` is not bytes, or if constructing
    `class_` from it raises `ValueError`.
    """
    if not isinstance(value, bytes):
        raise RLPDecodingError
    try:
        return class_(value)
    except ValueError as e:
        # Report constructor rejections as decoding failures, keeping the
        # original error as the cause.
        raise RLPDecodingError from e

_deserialize_to_uint

def _deserialize_to_uint(
    class_: Union[Type[Uint], Type[FixedUnsigned]], decoded: Simple
) -> Union[Uint, FixedUnsigned]:
    """
    Convert decoded RLP data to the unsigned integer type `class_` by
    interpreting the bytes as a big endian number.

    Raises `RLPDecodingError` if `decoded` is not bytes, or if
    `class_.from_be_bytes` raises `ValueError`.
    """
    if not isinstance(decoded, bytes):
        raise RLPDecodingError
    try:
        return class_.from_be_bytes(decoded)
    except ValueError as e:
        # Report conversion failures as decoding failures, keeping the
        # original error as the cause.
        raise RLPDecodingError from e

_deserialize_to_annotation

def _deserialize_to_annotation(annotation: object, value: Simple) -> Extended:
    """
    Convert decoded RLP data to the type described by a typing annotation.

    Only `Union[...]` and `Tuple[...]` annotations are supported.

    Raises
    ------
    Exception
        If `annotation` has no origin (it is not a parameterized typing
        construct).
    NotImplementedError
        If the origin of `annotation` is neither a union nor a tuple.
    """
    origin = get_origin(annotation)
    if origin is Union:
        return _deserialize_to_union(annotation, value)
    elif origin in (Tuple, tuple):
        return _deserialize_to_tuple(annotation, value)
    elif origin is None:
        raise Exception(annotation)
    else:
        raise NotImplementedError(f"RLP non-type {origin!r}")

_deserialize_to_union

def _deserialize_to_union(annotation: object, value: Simple) -> Extended:
    """
    Convert decoded RLP data to one of the variants of a `Union`
    annotation.

    Every variant is tried; the conversion succeeds only when exactly one
    of them accepts `value`.

    Raises
    ------
    RLPDecodingError
        If no variant matches, or if more than one does.
    """
    arguments = get_args(annotation)
    successes = []
    failures = []
    for argument in arguments:
        try:
            success = _deserialize_to(argument, value)
        except Exception as e:
            # Any failure just means this variant does not match; it is
            # kept so it can be reported if no variant matches at all.
            failures.append(e)
            continue

        successes.append(success)

    if len(successes) == 1:
        return successes[0]
    elif not successes:
        raise RLPDecodingError(f"no matching union variant\n{failures!r}")
    else:
        raise RLPDecodingError("multiple matching union variants")

_deserialize_to_tuple

def _deserialize_to_tuple(
    annotation: object, values: Simple
) -> Sequence[Extended]:
    """
    Convert a decoded RLP sequence to the tuple described by a `Tuple[...]`
    annotation, converting each item to the element type at its position.

    For a variadic annotation (`Tuple[X, ...]`) the element type before the
    ellipsis is repeated to cover all of `values`.

    Raises
    ------
    RLPDecodingError
        If `values` is bytes rather than a sequence.
    """
    if isinstance(values, bytes):
        raise RLPDecodingError
    arguments = list(get_args(annotation))

    if arguments[-1] is Ellipsis:
        arguments.pop()
        fill_count = len(values) - len(arguments)
        arguments = list(arguments) + [arguments[-1]] * fill_count

    # NOTE(review): `zip` stops at the shorter input, so a length mismatch
    # between a fixed-size annotation and `values` is not reported here --
    # confirm whether that is intended.
    decoded = []
    for argument, value in zip(arguments, values):
        decoded.append(_deserialize_to(argument, value))

    return tuple(decoded)

decode_to_bytes

Decodes a rlp encoded byte stream assuming that the decoded data should be of type bytes.

Parameters

encoded_bytes : RLP encoded byte stream.

Returns

decoded : ethereum.base_types.Bytes RLP decoded Bytes data

def decode_to_bytes(encoded_bytes: Bytes) -> Bytes:
    """
    Decodes a rlp encoded byte stream assuming that the decoded data
    should be of type `bytes`.

    Parameters
    ----------
    encoded_bytes :
        RLP encoded byte stream. Must not be empty: the first byte is read
        unconditionally.

    Returns
    -------
    decoded : `ethereum.base_types.Bytes`
        RLP decoded Bytes data

    Raises
    ------
    RLPDecodingError
        If the input is truncated, uses a non-canonical form (a lone byte
        below 0x80 wrapped in a length prefix, a length with a leading zero
        byte, or the long form for fewer than 56 bytes), or starts with a
        byte below 0x80 while being longer than one byte.
    """
    if len(encoded_bytes) == 1 and encoded_bytes[0] < 0x80:
        # A single byte below 0x80 is its own encoding.
        return encoded_bytes
    elif encoded_bytes[0] <= 0xB7:
        # Short form: the prefix byte carries the payload length.
        len_raw_data = encoded_bytes[0] - 0x80
        if len_raw_data < 0:
            # First byte below 0x80 but more than one byte of input. Without
            # this check the negative length would produce an empty slice
            # and be returned as a valid result.
            raise RLPDecodingError
        if len_raw_data >= len(encoded_bytes):
            raise RLPDecodingError
        raw_data = encoded_bytes[1 : 1 + len_raw_data]
        if len_raw_data == 1 and raw_data[0] < 0x80:
            # Such a byte must be encoded as itself, without a prefix.
            raise RLPDecodingError
        return raw_data
    else:
        # This is the index in the encoded data at which decoded data
        # starts from.
        decoded_data_start_idx = 1 + encoded_bytes[0] - 0xB7
        if decoded_data_start_idx - 1 >= len(encoded_bytes):
            raise RLPDecodingError
        if encoded_bytes[1] == 0:
            # The big endian length must not have a leading zero byte.
            raise RLPDecodingError
        len_decoded_data = int(
            Uint.from_be_bytes(encoded_bytes[1:decoded_data_start_idx])
        )
        if len_decoded_data < 0x38:
            # Payloads shorter than 56 bytes must use the short form.
            raise RLPDecodingError
        decoded_data_end_idx = decoded_data_start_idx + len_decoded_data
        if decoded_data_end_idx - 1 >= len(encoded_bytes):
            raise RLPDecodingError
        return encoded_bytes[decoded_data_start_idx:decoded_data_end_idx]

decode_to_sequence

Decodes a rlp encoded byte stream assuming that the decoded data should be of type Sequence of objects.

Parameters

encoded_sequence : An RLP encoded Sequence.

Returns

decoded : Sequence[RLP] Sequence of objects decoded from encoded_sequence.

def decode_to_sequence(encoded_sequence: Bytes) -> Sequence[Simple]:
    """
    Decodes a rlp encoded byte stream assuming that the decoded data
    should be of type `Sequence` of objects.

    Parameters
    ----------
    encoded_sequence :
        An RLP encoded Sequence. Must not be empty: the first byte is read
        unconditionally.

    Returns
    -------
    decoded : `Sequence[Simple]`
        Sequence of objects decoded from `encoded_sequence`.

    Raises
    ------
    RLPDecodingError
        If the input is truncated, or the long form is used with a length
        that has a leading zero byte or is below 56.
    """
    if encoded_sequence[0] <= 0xF7:
        # Short form: the prefix byte carries the payload length.
        len_joined_encodings = encoded_sequence[0] - 0xC0
        if len_joined_encodings >= len(encoded_sequence):
            raise RLPDecodingError
        joined_encodings = encoded_sequence[1 : 1 + len_joined_encodings]
    else:
        # Long form: the prefix byte carries the size of the length field.
        joined_encodings_start_idx = 1 + encoded_sequence[0] - 0xF7
        if joined_encodings_start_idx - 1 >= len(encoded_sequence):
            raise RLPDecodingError
        if encoded_sequence[1] == 0:
            # The big endian length must not have a leading zero byte.
            raise RLPDecodingError
        len_joined_encodings = int(
            Uint.from_be_bytes(encoded_sequence[1:joined_encodings_start_idx])
        )
        if len_joined_encodings < 0x38:
            # Payloads shorter than 56 bytes must use the short form.
            raise RLPDecodingError
        joined_encodings_end_idx = (
            joined_encodings_start_idx + len_joined_encodings
        )
        if joined_encodings_end_idx - 1 >= len(encoded_sequence):
            raise RLPDecodingError
        joined_encodings = encoded_sequence[
            joined_encodings_start_idx:joined_encodings_end_idx
        ]

    return decode_joined_encodings(joined_encodings)

decode_joined_encodings

Decodes joined_encodings, which is a concatenation of RLP encoded objects.

Parameters

joined_encodings : concatenation of RLP encoded objects

Returns

decoded : List[RLP] A list of objects decoded from joined_encodings.

def decode_joined_encodings(joined_encodings: Bytes) -> Sequence[Simple]:
    """
    Decodes `joined_encodings`, which is a concatenation of RLP encoded
    objects.

    Parameters
    ----------
    joined_encodings :
        concatenation of RLP encoded objects

    Returns
    -------
    decoded : `List[Simple]`
        A list of objects decoded from `joined_encodings`. Empty when
        `joined_encodings` is empty.

    Raises
    ------
    RLPDecodingError
        If an item claims to extend past the end of `joined_encodings`.
    """
    decoded_sequence = []

    item_start_idx = 0
    while item_start_idx < len(joined_encodings):
        # Work out how many bytes the next item occupies, then decode
        # exactly that slice.
        encoded_item_length = decode_item_length(
            joined_encodings[item_start_idx:]
        )
        if item_start_idx + encoded_item_length - 1 >= len(joined_encodings):
            raise RLPDecodingError
        encoded_item = joined_encodings[
            item_start_idx : item_start_idx + encoded_item_length
        ]
        decoded_sequence.append(decode(encoded_item))
        item_start_idx += encoded_item_length

    return decoded_sequence

decode_item_length

Find the length of the rlp encoding for the first object in the encoded sequence. Here encoded_data refers to concatenation of rlp encoding for each item in a sequence.

NOTE - This is a helper function not described in the spec. It was introduced because the spec doesn't discuss decoding RLP encoded data.

Parameters

encoded_data : RLP encoded data for a sequence of objects.

Returns

rlp_length : int

def decode_item_length(encoded_data: Bytes) -> int:
    """
    Find the length of the rlp encoding for the first object in the
    encoded sequence.
    Here `encoded_data` refers to concatenation of rlp encoding for each
    item in a sequence.

    NOTE - This is a helper function not described in the spec. It was
    introduced because the spec doesn't discuss decoding RLP encoded
    data.

    Parameters
    ----------
    encoded_data :
        RLP encoded data for a sequence of objects.

    Returns
    -------
    rlp_length : `int`
        Number of bytes (prefix, length field and payload) the first item
        claims to occupy. Whether that many bytes are actually present is
        not checked here.

    Raises
    ------
    RLPDecodingError
        If `encoded_data` is empty, or a long-form length field is
        truncated or starts with a zero byte.
    """
    if len(encoded_data) <= 0:
        raise RLPDecodingError

    first_rlp_byte = encoded_data[0]

    # This is the length of the big endian representation of the length of
    # rlp encoded object byte stream.
    length_length = 0
    decoded_data_length = 0

    # This occurs only when the raw_data is a single byte whose value < 128
    if first_rlp_byte < 0x80:
        # We return 1 here, as the end formula
        # 1 + length_length + decoded_data_length would be invalid for
        # this case.
        return 1
    # This occurs only when the raw_data is a byte stream with length < 56
    # and doesn't fall into the above cases
    elif first_rlp_byte <= 0xB7:
        decoded_data_length = first_rlp_byte - 0x80
    # This occurs only when the raw_data is a byte stream and doesn't fall
    # into the above cases
    elif first_rlp_byte <= 0xBF:
        length_length = first_rlp_byte - 0xB7
        if length_length >= len(encoded_data):
            raise RLPDecodingError
        if encoded_data[1] == 0:
            raise RLPDecodingError
        decoded_data_length = int(
            Uint.from_be_bytes(encoded_data[1 : 1 + length_length])
        )
    # This occurs only when the raw_data is a sequence of objects with
    # length(concatenation of encoding of each object) < 56
    elif first_rlp_byte <= 0xF7:
        decoded_data_length = first_rlp_byte - 0xC0
    # This occurs only when the raw_data is a sequence of objects and
    # doesn't fall into the above cases.
    elif first_rlp_byte <= 0xFF:
        length_length = first_rlp_byte - 0xF7
        if length_length >= len(encoded_data):
            raise RLPDecodingError
        if encoded_data[1] == 0:
            raise RLPDecodingError
        decoded_data_length = int(
            Uint.from_be_bytes(encoded_data[1 : 1 + length_length])
        )

    return 1 + length_length + decoded_data_length

rlp_hash

Obtain the keccak-256 hash of the rlp encoding of the passed in data.

Parameters

data : The data for which we need the rlp hash.

Returns

hash : Hash32 The rlp hash of the passed in data.

def rlp_hash(data: Extended) -> Hash32:
    """
    Obtain the keccak-256 hash of the rlp encoding of the passed in data.

    Parameters
    ----------
    data :
        The data for which we need the rlp hash.

    Returns
    -------
    hash : `Hash32`
        The rlp hash of the passed in data.
    """
    return keccak256(encode(data))