ethereum.rlp

.. _rlp:

Recursive Length Prefix (RLP) Encoding ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. contents:: Table of Contents
    :backlinks: none
    :local:

Introduction

Defines the serialization and deserialization format used throughout Ethereum.

RLP

Protocol that describes the requirements to be RLP-encodable.

class RLP:
    """
    Protocol that describes the requirements to be RLP-encodable.

    The only requirement expressed here is the presence of the
    `__dataclass_fields__` class attribute.
    """

    # NOTE(review): the rendered listing shows no base class; presumably this
    # derives from `typing.Protocol` - confirm against the module source.
    __dataclass_fields__: ClassVar[Dict]

Simple

52
Simple: TypeAlias = Union[Sequence["Simple"], bytes]

Extended

# Every kind of value accepted as input by `encode`: byte strings, unsigned
# integers, text, booleans, `RLP` objects, and sequences of any of these.
Extended: TypeAlias = Union[
    Sequence["Extended"], bytearray, bytes, Uint, FixedUint, str, bool, RLP
]

encode

Encodes raw_data into a sequence of bytes using RLP.

Parameters

raw_data : A Bytes, Uint, Uint256 or sequence of RLP encodable objects.

Returns

encoded : ethereum.base_types.Bytes The RLP encoded bytes representing raw_data.

def encode(raw_data: Extended) -> Bytes:
    """
    Encodes `raw_data` into a sequence of bytes using RLP.

    Parameters
    ----------
    raw_data :
        A `Bytes`, `Uint`, `Uint256` or sequence of `RLP` encodable
        objects. Strings, booleans and dataclass instances are accepted
        as well.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_data`.

    Raises
    ------
    RLPEncodingError
        If `raw_data` is of a type that none of the branches below handle.
    """
    if isinstance(raw_data, Sequence):
        # `bytes`, `bytearray` and `str` are sequences too, so they have to
        # be singled out before falling back to list encoding.
        if isinstance(raw_data, (bytearray, bytes)):
            return encode_bytes(raw_data)
        elif isinstance(raw_data, str):
            return encode_bytes(raw_data.encode())
        else:
            return encode_sequence(raw_data)
    elif isinstance(raw_data, (Uint, FixedUint)):
        # Integers are encoded as their big endian byte representation.
        return encode(raw_data.to_be_bytes())
    elif isinstance(raw_data, bool):
        # `True` is the single byte 0x01, `False` is the empty byte string.
        if raw_data:
            return encode_bytes(b"\x01")
        else:
            return encode_bytes(b"")
    elif is_dataclass(raw_data):
        # Dataclasses are encoded as the sequence of their field values.
        return encode(astuple(raw_data))
    else:
        raise RLPEncodingError(
            "RLP Encoding of type {} is not supported".format(type(raw_data))
        )

encode_bytes

Encodes raw_bytes, a sequence of bytes, using RLP.

Parameters

raw_bytes : Bytes to encode with RLP.

Returns

encoded : ethereum.base_types.Bytes The RLP encoded bytes representing raw_bytes.

def encode_bytes(raw_bytes: Bytes) -> Bytes:
    """
    Encodes `raw_bytes`, a sequence of bytes, using RLP.

    Parameters
    ----------
    raw_bytes :
        Bytes to encode with RLP.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_bytes`.
    """
    len_raw_data = Uint(len(raw_bytes))

    if len_raw_data == 1 and raw_bytes[0] < 0x80:
        # A single byte below 0x80 is its own encoding.
        return raw_bytes
    elif len_raw_data < 0x38:
        # Short string: one prefix byte carrying the length.
        return bytes([0x80 + len_raw_data]) + raw_bytes
    else:
        # Long string: the prefix byte carries the length of the length.
        # length of raw data represented as big endian bytes
        len_raw_data_as_be = len_raw_data.to_be_bytes()
        return (
            bytes([0xB7 + len(len_raw_data_as_be)])
            + len_raw_data_as_be
            + raw_bytes
        )

encode_sequence

Encodes a list of RLP encodable objects (raw_sequence) using RLP.

Parameters

raw_sequence : Sequence of RLP encodable objects.

Returns

encoded : ethereum.base_types.Bytes The RLP encoded bytes representing raw_sequence.

def encode_sequence(raw_sequence: Sequence[Extended]) -> Bytes:
    """
    Encodes a list of RLP encodable objects (`raw_sequence`) using RLP.

    Parameters
    ----------
    raw_sequence :
        Sequence of RLP encodable objects.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_sequence`.
    """
    joined_encodings = get_joined_encodings(raw_sequence)
    len_joined_encodings = Uint(len(joined_encodings))

    if len_joined_encodings < 0x38:
        # Short list: one prefix byte carrying the payload length.
        return Bytes([0xC0 + len_joined_encodings]) + joined_encodings
    else:
        # Long list: the prefix byte carries the length of the big endian
        # payload length, which follows it.
        len_joined_encodings_as_be = len_joined_encodings.to_be_bytes()
        return (
            Bytes([0xF7 + len(len_joined_encodings_as_be)])
            + len_joined_encodings_as_be
            + joined_encodings
        )

get_joined_encodings

Obtain concatenation of rlp encoding for each item in the sequence raw_sequence.

Parameters

raw_sequence : Sequence to encode with RLP.

Returns

joined_encodings : ethereum.base_types.Bytes The concatenated RLP encoded bytes for each item in sequence raw_sequence.

def get_joined_encodings(raw_sequence: Sequence[Extended]) -> Bytes:
    """
    Obtain concatenation of rlp encoding for each item in the sequence
    raw_sequence.

    Parameters
    ----------
    raw_sequence :
        Sequence to encode with RLP.

    Returns
    -------
    joined_encodings : `ethereum.base_types.Bytes`
        The concatenated RLP encoded bytes for each item in sequence
        raw_sequence. Empty when `raw_sequence` has no items.
    """
    return b"".join(encode(item) for item in raw_sequence)

decode

Decodes an integer, byte sequence, or list of RLP encodable objects from the byte sequence encoded_data, using RLP.

Parameters

encoded_data : A sequence of bytes, in RLP form.

Returns

decoded_data : RLP Object decoded from encoded_data.

def decode(encoded_data: Bytes) -> Simple:
    """
    Decodes an integer, byte sequence, or list of RLP encodable objects
    from the byte sequence `encoded_data`, using RLP.

    Parameters
    ----------
    encoded_data :
        A sequence of bytes, in RLP form.

    Returns
    -------
    decoded_data : `RLP`
        Object decoded from `encoded_data`.

    Raises
    ------
    RLPDecodingError
        If `encoded_data` is empty.
    """
    if len(encoded_data) <= 0:
        raise RLPDecodingError("Cannot decode empty bytestring")

    if encoded_data[0] <= 0xBF:
        # This means that the raw data is of type bytes
        return decode_to_bytes(encoded_data)
    else:
        # This means that the raw data is of type sequence
        return decode_to_sequence(encoded_data)

U

# Type variable tying the class passed to the typed decoders to the type of
# the value they return.
U = TypeVar("U", bound=Extended)

decode_to

Decode the bytes in encoded_data to an object of type cls. cls can be a Bytes subclass, a dataclass, Uint, U256 or Tuple[cls].

Parameters

cls : Type[U] The type to decode to.

encoded_data : A sequence of bytes, in RLP form.

Returns

decoded_data : U Object decoded from encoded_data.

def decode_to(cls: Type[U], encoded_data: Bytes) -> U:
    """
    Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be
    a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`.

    Parameters
    ----------
    cls: `Type[U]`
        The type to decode to.
    encoded_data :
        A sequence of bytes, in RLP form.

    Returns
    -------
    decoded_data : `U`
        Object decoded from `encoded_data`.
    """
    # First decode to plain bytes / nested sequences, then shape that plain
    # value into the requested type.
    decoded = decode(encoded_data)
    return _deserialize_to(cls, decoded)

_deserialize_to

@overload
def _deserialize_to(class_: Type[U], value: Simple) -> U:
    # Typing-only overload: a concrete class yields an instance of it.
    pass

_deserialize_to

@overload
def _deserialize_to(class_: object, value: Simple) -> Extended:
    # Typing-only overload: any other annotation object yields some
    # `Extended` value.
    pass

_deserialize_to

def _deserialize_to(class_: object, value: Simple) -> Extended:
    """
    Convert the plain decoded RLP `value` into the shape described by
    `class_`.

    Parameters
    ----------
    class_ :
        Either a class (a dataclass, a `Uint`/`FixedUint` subclass, a
        `Bytes`/`FixedBytes` subclass or `bool`) or a non-class typing
        annotation.
    value :
        The plain decoded RLP value.

    Returns
    -------
    deserialized : `Extended`
        `value` converted according to `class_`.

    Raises
    ------
    NotImplementedError
        If `class_` is a class that none of the branches below handle.
    """
    if not isinstance(class_, type):
        # Not a class at all, so it is handled as a typing annotation.
        return _deserialize_to_annotation(class_, value)
    elif is_dataclass(class_):
        return _deserialize_to_dataclass(class_, value)
    elif issubclass(class_, (Uint, FixedUint)):
        return _deserialize_to_uint(class_, value)
    elif issubclass(class_, (Bytes, FixedBytes)):
        return _deserialize_to_bytes(class_, value)
    elif class_ is bool:
        return _deserialize_to_bool(value)
    else:
        raise NotImplementedError(class_)

_deserialize_to_dataclass

def _deserialize_to_dataclass(cls: Type[U], decoded: Simple) -> U:
    """
    Build an instance of the dataclass `cls` from the decoded sequence
    `decoded`, pairing items with the dataclass fields in declaration order.

    Parameters
    ----------
    cls :
        The dataclass to instantiate.
    decoded :
        The plain decoded RLP value; must be a sequence with exactly one
        item per field of `cls`.

    Returns
    -------
    result : `U`
        The populated instance of `cls`.

    Raises
    ------
    RLPDecodingError
        If `decoded` is `bytes`, or if its length differs from the number
        of fields of `cls`.
    """
    assert is_dataclass(cls)
    hints = get_type_hints(cls)
    target_fields = fields(cls)

    if isinstance(decoded, bytes):
        raise RLPDecodingError(f"got `bytes` while decoding `{cls.__name__}`")

    if len(target_fields) != len(decoded):
        name = cls.__name__
        actual = len(decoded)
        expected = len(target_fields)
        raise RLPDecodingError(
            f"`{name}` needs {expected} field(s), but got {actual} instead"
        )

    values: Dict[str, Any] = {}

    for value, target_field in zip(decoded, target_fields):
        # Resolved hints are used for the field type, then each item is
        # converted recursively to that type.
        resolved_type = hints[target_field.name]
        values[target_field.name] = _deserialize_to(resolved_type, value)

    result = cls(**values)
    assert isinstance(result, cls)
    return cast(U, result)

_deserialize_to_bool

def _deserialize_to_bool(value: Simple) -> bool:
286
    if value == b"":
287
        return False
288
    elif value == b"\x01":
289
        return True
290
    else:
291
        raise RLPDecodingError

_deserialize_to_bytes

def _deserialize_to_bytes(class_: Union[Type[Bytes], Type[FixedBytes]], ​​value: Simple) -> Union[Bytes, FixedBytes]:
297
    if not isinstance(value, bytes):
298
        raise RLPDecodingError
299
    try:
300
        return class_(value)
301
    except ValueError as e:
302
        raise RLPDecodingError from e

_deserialize_to_uint

def _deserialize_to_uint(
    class_: Union[Type[Uint], Type[FixedUint]], decoded: Simple
) -> Union[Uint, FixedUint]:
    """
    Convert the plain decoded RLP value `decoded` to an instance of the
    unsigned integer class `class_`, reading it as big endian bytes.

    Raises
    ------
    RLPDecodingError
        If `decoded` is not `bytes`, or if `class_.from_be_bytes` rejects it
        with a `ValueError`.
    """
    if not isinstance(decoded, bytes):
        raise RLPDecodingError
    try:
        return class_.from_be_bytes(decoded)
    except ValueError as e:
        raise RLPDecodingError from e

_deserialize_to_annotation

def _deserialize_to_annotation(annotation: object, value: Simple) -> Extended:
    """
    Convert the plain decoded RLP `value` according to a typing annotation
    that is not itself a class.

    `Union[...]` and `Tuple[...]` annotations are supported.

    Raises
    ------
    Exception
        If `annotation` has no typing origin.
    NotImplementedError
        If the origin of `annotation` is neither `Union` nor a tuple.
    """
    origin = get_origin(annotation)
    if origin is Union:
        return _deserialize_to_union(annotation, value)
    elif origin in (Tuple, tuple):
        return _deserialize_to_tuple(annotation, value)
    elif origin is None:
        raise Exception(annotation)
    else:
        raise NotImplementedError(f"RLP non-type {origin!r}")

_deserialize_to_union

def _deserialize_to_union(annotation: object, value: Simple) -> Extended:
    """
    Convert the plain decoded RLP `value` to the single member of the
    `Union` annotation that accepts it.

    Every member of the union is tried; exactly one has to succeed.

    Raises
    ------
    RLPDecodingError
        If no member accepts `value`, or if more than one does.
    """
    arguments = get_args(annotation)
    successes = []
    failures = []
    for argument in arguments:
        try:
            success = _deserialize_to(argument, value)
        except Exception as e:
            # Remember why this member was rejected; the reasons are
            # reported if no member matches.
            failures.append(e)
            continue

        successes.append(success)

    if len(successes) == 1:
        return successes[0]
    elif not successes:
        raise RLPDecodingError(f"no matching union variant\n{failures!r}")
    else:
        raise RLPDecodingError("multiple matching union variants")

_deserialize_to_tuple

def _deserialize_to_tuple(annotation: object, ​​values: Simple) -> Sequence[Extended]:
352
    if isinstance(values, bytes):
353
        raise RLPDecodingError
354
    arguments = list(get_args(annotation))
355
356
    if arguments[-1] is Ellipsis:
357
        arguments.pop()
358
        fill_count = len(values) - len(arguments)
359
        arguments = list(arguments) + [arguments[-1]] * fill_count
360
361
    decoded = []
362
    for argument, value in zip(arguments, values):
363
        decoded.append((argument, value))
364
365
    return tuple(decoded)

decode_to_bytes

Decodes a rlp encoded byte stream assuming that the decoded data should be of type bytes.

Parameters

encoded_bytes : RLP encoded byte stream.

Returns

decoded : ethereum.base_types.Bytes RLP decoded Bytes data

def decode_to_bytes(encoded_bytes: Bytes) -> Bytes:
    """
    Decodes a rlp encoded byte stream assuming that the decoded data
    should be of type `bytes`.

    Parameters
    ----------
    encoded_bytes :
        RLP encoded byte stream.

    Returns
    -------
    decoded : `ethereum.base_types.Bytes`
        RLP decoded Bytes data

    Raises
    ------
    RLPDecodingError
        If the stream is shorter than its prefix announces, or if it uses
        an encoding form that `encode_bytes` would not produce.
    """
    if len(encoded_bytes) == 1 and encoded_bytes[0] < 0x80:
        # A single byte below 0x80 is its own encoding.
        return encoded_bytes
    elif encoded_bytes[0] <= 0xB7:
        # Short string: the prefix byte carries the payload length.
        len_raw_data = encoded_bytes[0] - 0x80
        if len_raw_data >= len(encoded_bytes):
            raise RLPDecodingError
        raw_data = encoded_bytes[1 : 1 + len_raw_data]
        # A single byte below 0x80 has to be encoded as itself, so a
        # prefixed form of it is rejected.
        if len_raw_data == 1 and raw_data[0] < 0x80:
            raise RLPDecodingError
        return raw_data
    else:
        # This is the index in the encoded data at which decoded data
        # starts from.
        decoded_data_start_idx = 1 + encoded_bytes[0] - 0xB7
        if decoded_data_start_idx - 1 >= len(encoded_bytes):
            raise RLPDecodingError
        # The big endian length must not carry a leading zero byte.
        if encoded_bytes[1] == 0:
            raise RLPDecodingError
        len_decoded_data = Uint.from_be_bytes(
            encoded_bytes[1:decoded_data_start_idx]
        )
        # Payloads shorter than 0x38 bytes have to use the short form.
        if len_decoded_data < 0x38:
            raise RLPDecodingError
        decoded_data_end_idx = decoded_data_start_idx + len_decoded_data
        if decoded_data_end_idx - 1 >= len(encoded_bytes):
            raise RLPDecodingError
        return encoded_bytes[decoded_data_start_idx:decoded_data_end_idx]

decode_to_sequence

Decodes a rlp encoded byte stream assuming that the decoded data should be of type Sequence of objects.

Parameters

encoded_sequence : An RLP encoded Sequence.

Returns

decoded : Sequence[RLP] Sequence of objects decoded from encoded_sequence.

def decode_to_sequence(encoded_sequence: Bytes) -> Sequence[Simple]:
    """
    Decodes a rlp encoded byte stream assuming that the decoded data
    should be of type `Sequence` of objects.

    Parameters
    ----------
    encoded_sequence :
        An RLP encoded Sequence.

    Returns
    -------
    decoded : `Sequence[RLP]`
        Sequence of objects decoded from `encoded_sequence`.

    Raises
    ------
    RLPDecodingError
        If the stream is shorter than its prefix announces, if the long
        form length has a leading zero byte, or if the long form is used
        for a payload shorter than 0x38 bytes.
    """
    if encoded_sequence[0] <= 0xF7:
        # Short list: the prefix byte carries the payload length.
        len_joined_encodings = encoded_sequence[0] - 0xC0
        if len_joined_encodings >= len(encoded_sequence):
            raise RLPDecodingError
        joined_encodings = encoded_sequence[1 : 1 + len_joined_encodings]
    else:
        # Long list: the prefix byte carries the length of the big endian
        # payload length, which follows it.
        joined_encodings_start_idx = 1 + encoded_sequence[0] - 0xF7
        if joined_encodings_start_idx - 1 >= len(encoded_sequence):
            raise RLPDecodingError
        if encoded_sequence[1] == 0:
            raise RLPDecodingError
        len_joined_encodings = Uint.from_be_bytes(
            encoded_sequence[1:joined_encodings_start_idx]
        )
        if len_joined_encodings < 0x38:
            raise RLPDecodingError
        joined_encodings_end_idx = (
            joined_encodings_start_idx + len_joined_encodings
        )
        if joined_encodings_end_idx - 1 >= len(encoded_sequence):
            raise RLPDecodingError
        joined_encodings = encoded_sequence[
            joined_encodings_start_idx:joined_encodings_end_idx
        ]

    return decode_joined_encodings(joined_encodings)

decode_joined_encodings

Decodes joined_encodings, which is a concatenation of RLP encoded objects.

Parameters

joined_encodings : concatenation of RLP encoded objects

Returns

decoded : List[RLP] A list of objects decoded from joined_encodings.

def decode_joined_encodings(joined_encodings: Bytes) -> Sequence[Simple]:
    """
    Decodes `joined_encodings`, which is a concatenation of RLP encoded
    objects.

    Parameters
    ----------
    joined_encodings :
        concatenation of RLP encoded objects

    Returns
    -------
    decoded : `List[RLP]`
        A list of objects decoded from `joined_encodings`.

    Raises
    ------
    RLPDecodingError
        If an item announces a length that runs past the end of
        `joined_encodings`.
    """
    decoded_sequence = []

    item_start_idx = 0
    while item_start_idx < len(joined_encodings):
        # Measure the next item, cut it out, decode it, then move past it.
        encoded_item_length = decode_item_length(
            joined_encodings[item_start_idx:]
        )
        if item_start_idx + encoded_item_length - 1 >= len(joined_encodings):
            raise RLPDecodingError
        encoded_item = joined_encodings[
            item_start_idx : item_start_idx + encoded_item_length
        ]
        decoded_sequence.append(decode(encoded_item))
        item_start_idx += encoded_item_length

    return decoded_sequence

decode_item_length

Find the length of the rlp encoding for the first object in the encoded sequence. Here encoded_data refers to concatenation of rlp encoding for each item in a sequence.

NOTE - This is a helper function not described in the spec. It was introduced because the spec doesn't discuss decoding RLP encoded data.

Parameters

encoded_data : RLP encoded data for a sequence of objects.

Returns

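rlp_length : int The length, in bytes, of the RLP encoding of the first object in encoded_data, including its prefix.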

def decode_item_length(encoded_data: Bytes) -> int:
    """
    Find the length of the rlp encoding for the first object in the
    encoded sequence.
    Here `encoded_data` refers to concatenation of rlp encoding for each
    item in a sequence.

    NOTE - This is a helper function not described in the spec. It was
    introduced because the spec doesn't discuss decoding RLP encoded
    data.

    Parameters
    ----------
    encoded_data :
        RLP encoded data for a sequence of objects.

    Returns
    -------
    rlp_length : `int`
        Length of the first item's encoding: one prefix byte, plus the
        bytes holding the payload length (if any), plus the payload.

    Raises
    ------
    RLPDecodingError
        If `encoded_data` is empty, if it is too short to hold the
        announced length bytes, or if those length bytes start with a zero
        byte.
    """
    if len(encoded_data) <= 0:
        raise RLPDecodingError

    first_rlp_byte = Uint(encoded_data[0])

    # This is the length of the big endian representation of the length of
    # rlp encoded object byte stream.
    length_length = Uint(0)
    decoded_data_length = 0

    # This occurs only when the raw_data is a single byte whose value < 128
    if first_rlp_byte < 0x80:
        # We return 1 here, as the end formula
        # 1 + length_length + decoded_data_length would be invalid for
        # this case.
        return 1
    # This occurs only when the raw_data is a byte stream with length < 56
    # and doesn't fall into the above cases
    elif first_rlp_byte <= 0xB7:
        decoded_data_length = first_rlp_byte - 0x80
    # This occurs only when the raw_data is a byte stream and doesn't fall
    # into the above cases
    elif first_rlp_byte <= 0xBF:
        length_length = first_rlp_byte - 0xB7
        if length_length >= len(encoded_data):
            raise RLPDecodingError
        if encoded_data[1] == 0:
            raise RLPDecodingError
        decoded_data_length = Uint.from_be_bytes(
            encoded_data[1 : 1 + length_length]
        )
    # This occurs only when the raw_data is a sequence of objects with
    # length(concatenation of encoding of each object) < 56
    elif first_rlp_byte <= 0xF7:
        decoded_data_length = first_rlp_byte - 0xC0
    # This occurs only when the raw_data is a sequence of objects and
    # doesn't fall into the above cases.
    elif first_rlp_byte <= 0xFF:
        length_length = first_rlp_byte - 0xF7
        if length_length >= len(encoded_data):
            raise RLPDecodingError
        if encoded_data[1] == 0:
            raise RLPDecodingError
        decoded_data_length = Uint.from_be_bytes(
            encoded_data[1 : 1 + length_length]
        )

    return 1 + length_length + decoded_data_length

rlp_hash

Obtain the keccak-256 hash of the rlp encoding of the passed in data.

Parameters

data : The data for which we need the rlp hash.

Returns

hash : Hash32 The rlp hash of the passed in data.

def rlp_hash(data: Extended) -> Hash32:
    """
    Obtain the keccak-256 hash of the rlp encoding of the passed in data.

    Parameters
    ----------
    data :
        The data for which we need the rlp hash.

    Returns
    -------
    hash : `Hash32`
        The rlp hash of the passed in data.
    """
    return keccak256(encode(data))