ethereum.rlp
.. _rlp:
Recursive Length Prefix (RLP) Encoding
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. contents:: Table of Contents
    :backlinks: none
    :local:
Introduction
Defines the serialization and deserialization format used throughout Ethereum.
RLP
`Protocol` that describes the requirements to be RLP-encodable.
class RLP(Protocol):
    """
    `Protocol` that describes the requirements to be RLP-encodable.

    The only requirement declared here is the `__dataclass_fields__` class
    attribute, so the protocol is aimed at dataclass types.
    """

    # Class-level mapping; the type is left as an unparameterized `Dict`.
    __dataclass_fields__: ClassVar[Dict]
Simple
52 | Simple: TypeAlias = Union[Sequence["Simple"], bytes] |
---|
Extended
# Every kind of value accepted by `encode`: the `Simple` shapes plus the
# richer Python types that are converted to bytes or sequences first.
Extended: TypeAlias = Union[
    Sequence["Extended"],
    bytearray,
    bytes,
    Uint,
    FixedUint,
    str,
    bool,
    RLP,
]
encode
Encodes `raw_data` into a sequence of bytes using RLP.
Parameters
raw_data : A `Bytes`, `Uint`, `Uint256` or sequence of `RLP` encodable objects.
Returns
encoded : `ethereum.base_types.Bytes`
The RLP encoded bytes representing `raw_data`.
def encode(raw_data: Extended) -> Bytes:
    """
    Serialize `raw_data` into its RLP byte representation.

    The value is dispatched on its runtime type:

    - `bytes` / `bytearray` are encoded directly as a byte string;
    - `str` is converted with `str.encode()` and then encoded as bytes;
    - any other `Sequence` is encoded item by item as an RLP list;
    - `Uint` / `FixedUint` are encoded through their `to_be_bytes()` form;
    - `bool` is encoded as the single byte 0x01 (true) or as the empty
      byte string (false);
    - dataclass instances are encoded as the tuple of their fields.

    Parameters
    ----------
    raw_data :
        The value to encode.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_data`.

    Raises
    ------
    RLPEncodingError
        If `raw_data` is of none of the supported types.
    """
    # Byte strings and text are sequences too, so they must be recognised
    # before the generic sequence case.
    if isinstance(raw_data, (bytearray, bytes)):
        return encode_bytes(raw_data)
    if isinstance(raw_data, str):
        return encode_bytes(raw_data.encode())
    if isinstance(raw_data, Sequence):
        return encode_sequence(raw_data)

    if isinstance(raw_data, (Uint, FixedUint)):
        return encode(raw_data.to_be_bytes())

    if isinstance(raw_data, bool):
        return encode_bytes(b"\x01" if raw_data else b"")

    if is_dataclass(raw_data):
        return encode(astuple(raw_data))

    raise RLPEncodingError(
        "RLP Encoding of type {} is not supported".format(type(raw_data))
    )
encode_bytes
Encodes `raw_bytes`, a sequence of bytes, using RLP.
Parameters
raw_bytes : Bytes to encode with RLP.
Returns
encoded : `ethereum.base_types.Bytes`
The RLP encoded bytes representing `raw_bytes`.
def encode_bytes(raw_bytes: Bytes) -> Bytes:
    """
    RLP-encode a single byte string.

    Parameters
    ----------
    raw_bytes :
        Bytes to encode with RLP.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_bytes`.
    """
    size = Uint(len(raw_bytes))

    # A lone byte below 0x80 is its own encoding.
    if size == 1 and raw_bytes[0] < 0x80:
        return raw_bytes

    # Short form: one prefix byte carrying the length (0x80 + length).
    if size < 0x38:
        return bytes([0x80 + size]) + raw_bytes

    # Long form: the prefix byte carries the size of the big endian length,
    # which is followed by the length itself and then the payload.
    size_as_be = size.to_be_bytes()
    prefix = bytes([0xB7 + len(size_as_be)])
    return prefix + size_as_be + raw_bytes
encode_sequence
Encodes a list of RLP encodable objects (`raw_sequence`) using RLP.
Parameters
raw_sequence : Sequence of RLP encodable objects.
Returns
encoded : `ethereum.base_types.Bytes`
The RLP encoded bytes representing `raw_sequence`.
def encode_sequence(raw_sequence: Sequence[Extended]) -> Bytes:
    """
    RLP-encode a sequence of RLP encodable objects as an RLP list.

    Parameters
    ----------
    raw_sequence :
        Sequence of RLP encodable objects.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_sequence`.
    """
    payload = get_joined_encodings(raw_sequence)
    payload_size = Uint(len(payload))

    if payload_size < 0x38:
        # Short list: one prefix byte carrying the payload length.
        header = Bytes([0xC0 + payload_size])
    else:
        # Long list: prefix byte carries the size of the big endian length,
        # which is followed by the length itself.
        size_as_be = payload_size.to_be_bytes()
        header = Bytes([0xF7 + len(size_as_be)]) + size_as_be

    return header + payload
get_joined_encodings
Obtain concatenation of rlp encoding for each item in the sequence raw_sequence.
Parameters
raw_sequence : Sequence to encode with RLP.
Returns
joined_encodings : ethereum.base_types.Bytes
The concatenated RLP encoded bytes for each item in sequence
raw_sequence.
def get_joined_encodings(raw_sequence: Sequence[Extended]) -> Bytes:
    """
    Concatenate the RLP encodings of every item in `raw_sequence`.

    Parameters
    ----------
    raw_sequence :
        Sequence to encode with RLP.

    Returns
    -------
    joined_encodings : `ethereum.base_types.Bytes`
        The concatenated RLP encoded bytes for each item in sequence
        raw_sequence.
    """
    encoded_items = [encode(item) for item in raw_sequence]
    return b"".join(encoded_items)
decode
Decodes an integer, byte sequence, or list of RLP encodable objects from the byte sequence `encoded_data`, using RLP.
Parameters
encoded_data : A sequence of bytes, in RLP form.
Returns
decoded_data : `RLP`
Object decoded from `encoded_data`.
def decode(encoded_data: Bytes) -> Simple:
    """
    Decode the RLP byte sequence `encoded_data` into bytes or a sequence.

    Parameters
    ----------
    encoded_data :
        A sequence of bytes, in RLP form.

    Returns
    -------
    decoded_data : `Simple`
        Object decoded from `encoded_data`.

    Raises
    ------
    RLPDecodingError
        If `encoded_data` is empty.
    """
    if len(encoded_data) <= 0:
        raise RLPDecodingError("Cannot decode empty bytestring")

    # A first byte up to 0xBF announces a byte string; anything above it
    # announces a sequence.
    lead = encoded_data[0]
    decoder = decode_to_bytes if lead <= 0xBF else decode_to_sequence
    return decoder(encoded_data)
U
# Type variable for the target type of `decode_to`; limited to `Extended`.
U = TypeVar(
    "U",
    bound=Extended,
)
decode_to
Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`.
Parameters
cls : `Type[U]`
The type to decode to.
encoded_data : A sequence of bytes, in RLP form.
Returns
decoded_data : `U`
Object decoded from `encoded_data`.
def decode_to(cls: Type[U], encoded_data: Bytes) -> U:
    """
    Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be
    a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`.

    Parameters
    ----------
    cls: `Type[U]`
        The type to decode to.
    encoded_data :
        A sequence of bytes, in RLP form.

    Returns
    -------
    decoded_data : `U`
        Object decoded from `encoded_data`.
    """
    # First decode to plain bytes / nested sequences, then convert that
    # structure into the requested type.
    decoded = decode(encoded_data)
    return _deserialize_to(cls, decoded)
_deserialize_to
_deserialize_to
_deserialize_to
def _deserialize_to(class_: object, value: Simple) -> Extended:
    """
    Convert the decoded `value` into the target described by `class_`.

    `class_` is either a real class (dataclass, unsigned integer type, bytes
    type or `bool`) or a typing annotation, which is handled by
    `_deserialize_to_annotation`.

    Raises
    ------
    NotImplementedError
        If `class_` is a class of none of the supported kinds.
    """
    # Anything that is not a class is treated as a typing annotation.
    if not isinstance(class_, type):
        return _deserialize_to_annotation(class_, value)

    if is_dataclass(class_):
        return _deserialize_to_dataclass(class_, value)

    if issubclass(class_, (Uint, FixedUint)):
        return _deserialize_to_uint(class_, value)

    if issubclass(class_, (Bytes, FixedBytes)):
        return _deserialize_to_bytes(class_, value)

    if class_ is bool:
        return _deserialize_to_bool(value)

    raise NotImplementedError(class_)
_deserialize_to_dataclass
def _deserialize_to_dataclass(cls: Type[U], decoded: Simple) -> U:
    """
    Build an instance of the dataclass `cls` from the decoded sequence.

    The items of `decoded` are matched positionally with the fields of `cls`
    and each item is converted to the type hinted for its field.

    Raises
    ------
    RLPDecodingError
        If `decoded` is `bytes` rather than a sequence, or if its length does
        not match the number of fields of `cls`.
    """
    assert is_dataclass(cls)
    hints = get_type_hints(cls)
    target_fields = fields(cls)

    if isinstance(decoded, bytes):
        raise RLPDecodingError(f"got `bytes` while decoding `{cls.__name__}`")

    if len(target_fields) != len(decoded):
        name = cls.__name__
        actual = len(decoded)
        expected = len(target_fields)
        raise RLPDecodingError(
            f"`{name}` needs {expected} field(s), but got {actual} instead"
        )

    values: Dict[str, Any] = {}

    for value, target_field in zip(decoded, target_fields):
        # Use the resolved hint so string annotations are handled as well.
        resolved_type = hints[target_field.name]
        values[target_field.name] = _deserialize_to(resolved_type, value)

    result = cls(**values)
    assert isinstance(result, cls)
    return cast(U, result)
_deserialize_to_bool
def _deserialize_to_bool(value: Simple) -> bool:
    """
    Convert a decoded value to `bool`.

    The single byte 0x01 is `True` and the empty byte string is `False`;
    any other value raises `RLPDecodingError`.
    """
    if value == b"\x01":
        return True
    if value == b"":
        return False
    raise RLPDecodingError
_deserialize_to_bytes
def _deserialize_to_bytes(
    class_: Union[Type[Bytes], Type[FixedBytes]], value: Simple
) -> Union[Bytes, FixedBytes]:
    """
    Convert a decoded value to the bytes type `class_`.

    Raises
    ------
    RLPDecodingError
        If `value` is not `bytes`, or if `class_` rejects it with a
        `ValueError`.
    """
    if isinstance(value, bytes):
        try:
            return class_(value)
        except ValueError as error:
            raise RLPDecodingError from error

    raise RLPDecodingError
_deserialize_to_uint
def _deserialize_to_uint(
    class_: Union[Type[Uint], Type[FixedUint]], decoded: Simple
) -> Union[Uint, FixedUint]:
    """
    Convert decoded big endian bytes to the unsigned integer type `class_`.

    Raises
    ------
    RLPDecodingError
        If `decoded` is not `bytes`, or if `class_.from_be_bytes` rejects it
        with a `ValueError`.
    """
    if isinstance(decoded, bytes):
        try:
            return class_.from_be_bytes(decoded)
        except ValueError as error:
            raise RLPDecodingError from error

    raise RLPDecodingError
_deserialize_to_annotation
def _deserialize_to_annotation(annotation: object, value: Simple) -> Extended:
    """
    Convert a decoded value to the target described by a typing annotation.

    `Union[...]` and tuple annotations are supported.

    Raises
    ------
    Exception
        If `annotation` has no origin at all.
    NotImplementedError
        If the origin of `annotation` is neither `Union` nor a tuple.
    """
    origin = get_origin(annotation)

    if origin is None:
        raise Exception(annotation)
    if origin is Union:
        return _deserialize_to_union(annotation, value)
    if origin in (Tuple, tuple):
        return _deserialize_to_tuple(annotation, value)

    raise NotImplementedError(f"RLP non-type {origin!r}")
_deserialize_to_union
def _deserialize_to_union(annotation: object, value: Simple) -> Extended:
    """
    Convert a decoded value to exactly one variant of a `Union` annotation.

    Every variant of the union is tried; the conversion succeeds only if
    exactly one of them accepts `value`.

    Raises
    ------
    RLPDecodingError
        If no variant matches (the collected failures are included in the
        message) or if more than one variant matches.
    """
    arguments = get_args(annotation)
    successes = []
    failures = []
    for argument in arguments:
        try:
            success = _deserialize_to(argument, value)
        except Exception as e:
            # A variant that does not fit is expected here; remember why it
            # failed so it can be reported if nothing matches.
            failures.append(e)
            continue

        successes.append(success)

    if len(successes) == 1:
        return successes[0]
    elif not successes:
        raise RLPDecodingError(f"no matching union variant\n{failures!r}")
    else:
        raise RLPDecodingError("multiple matching union variants")
_deserialize_to_tuple
def _deserialize_to_tuple(
    annotation: object, values: Simple
) -> Sequence[Extended]:
    """
    Convert a decoded sequence to a tuple described by a tuple annotation.

    For a variadic annotation (`Tuple[X, ...]`) the element type is repeated
    to cover every item of `values`.

    Raises
    ------
    RLPDecodingError
        If `values` is `bytes` rather than a sequence.
    """
    if isinstance(values, bytes):
        raise RLPDecodingError
    arguments = list(get_args(annotation))

    if arguments[-1] is Ellipsis:
        # Drop the `...` marker and repeat the element type before it so
        # there is one target type per decoded item.
        arguments.pop()
        fill_count = len(values) - len(arguments)
        arguments = arguments + [arguments[-1]] * fill_count

    # NOTE(review): `zip` stops at the shorter input, so a fixed-length
    # annotation does not reject a `values` of a different length.
    return tuple(
        _deserialize_to(argument, value)
        for argument, value in zip(arguments, values)
    )
decode_to_bytes
Decodes an RLP encoded byte stream assuming that the decoded data should be of type `bytes`.
Parameters
encoded_bytes : RLP encoded byte stream.
Returns
decoded : `ethereum.base_types.Bytes`
RLP decoded Bytes data
def decode_to_bytes(encoded_bytes: Bytes) -> Bytes:
    """
    Decodes a rlp encoded byte stream assuming that the decoded data
    should be of type `bytes`.

    Parameters
    ----------
    encoded_bytes :
        RLP encoded byte stream.

    Returns
    -------
    decoded : `ethereum.base_types.Bytes`
        RLP decoded Bytes data

    Raises
    ------
    RLPDecodingError
        If the announced length does not fit in `encoded_bytes`, if a
        shorter encoding should have been used, or if a self-encoded single
        byte is followed by further bytes.
    """
    if len(encoded_bytes) == 1 and encoded_bytes[0] < 0x80:
        return encoded_bytes
    elif encoded_bytes[0] <= 0xB7:
        len_raw_data = encoded_bytes[0] - 0x80
        # A first byte below 0x80 is only valid as a one-byte input. Without
        # this check the negative length would pass the bounds check below
        # and the slice would silently produce an empty byte string.
        if len_raw_data < 0:
            raise RLPDecodingError("single byte followed by trailing data")
        if len_raw_data >= len(encoded_bytes):
            raise RLPDecodingError
        raw_data = encoded_bytes[1 : 1 + len_raw_data]
        # A single byte below 0x80 must be encoded as itself.
        if len_raw_data == 1 and raw_data[0] < 0x80:
            raise RLPDecodingError
        return raw_data
    else:
        # This is the index in the encoded data at which decoded data
        # starts from.
        decoded_data_start_idx = 1 + encoded_bytes[0] - 0xB7
        if decoded_data_start_idx - 1 >= len(encoded_bytes):
            raise RLPDecodingError
        # The big endian length must not have a leading zero byte.
        if encoded_bytes[1] == 0:
            raise RLPDecodingError
        len_decoded_data = Uint.from_be_bytes(
            encoded_bytes[1:decoded_data_start_idx]
        )
        # Lengths below 0x38 must use the short form.
        if len_decoded_data < 0x38:
            raise RLPDecodingError
        decoded_data_end_idx = decoded_data_start_idx + len_decoded_data
        if decoded_data_end_idx - 1 >= len(encoded_bytes):
            raise RLPDecodingError
        return encoded_bytes[decoded_data_start_idx:decoded_data_end_idx]
decode_to_sequence
Decodes an RLP encoded byte stream assuming that the decoded data should be of type `Sequence` of objects.
Parameters
encoded_sequence : An RLP encoded Sequence.
Returns
decoded : `Sequence[RLP]`
Sequence of objects decoded from `encoded_sequence`.
def decode_to_sequence(encoded_sequence: Bytes) -> Sequence[Simple]:
    """
    Decode an RLP encoded byte stream that represents a sequence of objects.

    Parameters
    ----------
    encoded_sequence :
        An RLP encoded Sequence.

    Returns
    -------
    decoded : `Sequence[Simple]`
        Sequence of objects decoded from `encoded_sequence`.

    Raises
    ------
    RLPDecodingError
        If the announced payload length does not fit in `encoded_sequence`
        or if the length is not encoded in its shortest form.
    """
    lead = encoded_sequence[0]

    if lead <= 0xF7:
        # Short list: the payload length is carried by the first byte.
        payload_size = lead - 0xC0
        if payload_size >= len(encoded_sequence):
            raise RLPDecodingError
        return decode_joined_encodings(encoded_sequence[1 : 1 + payload_size])

    # Long list: the first byte carries the size of the big endian length.
    payload_start = 1 + lead - 0xF7
    if payload_start - 1 >= len(encoded_sequence):
        raise RLPDecodingError
    # The big endian length must not have a leading zero byte.
    if encoded_sequence[1] == 0:
        raise RLPDecodingError

    payload_size = Uint.from_be_bytes(encoded_sequence[1:payload_start])
    # Lengths below 0x38 must use the short form.
    if payload_size < 0x38:
        raise RLPDecodingError

    payload_end = payload_start + payload_size
    if payload_end - 1 >= len(encoded_sequence):
        raise RLPDecodingError

    return decode_joined_encodings(encoded_sequence[payload_start:payload_end])
decode_joined_encodings
Decodes `joined_encodings`, which is a concatenation of RLP encoded objects.
Parameters
joined_encodings : concatenation of RLP encoded objects
Returns
decoded : `List[RLP]`
A list of objects decoded from `joined_encodings`.
def decode_joined_encodings(joined_encodings: Bytes) -> Sequence[Simple]:
    """
    Decode a concatenation of RLP encoded objects, one item after another.

    Parameters
    ----------
    joined_encodings :
        concatenation of RLP encoded objects

    Returns
    -------
    decoded : `List[Simple]`
        A list of objects decoded from `joined_encodings`.

    Raises
    ------
    RLPDecodingError
        If an item announces a length that runs past the end of
        `joined_encodings`.
    """
    items = []
    total = len(joined_encodings)

    cursor = 0
    while cursor < total:
        item_size = decode_item_length(joined_encodings[cursor:])
        item_end = cursor + item_size
        if item_end - 1 >= total:
            raise RLPDecodingError
        items.append(decode(joined_encodings[cursor:item_end]))
        cursor = item_end

    return items
decode_item_length
Find the length of the RLP encoding for the first object in the encoded sequence. Here `encoded_data` refers to the concatenation of the RLP encodings of each item in a sequence.
NOTE - This is a helper function not described in the spec. It was introduced because the spec does not discuss decoding RLP encoded data.
Parameters
encoded_data : RLP encoded data for a sequence of objects.
Returns
rlp_length : `int`
def decode_item_length(encoded_data: Bytes) -> int:
    """
    Find the length of the RLP encoding of the first object in
    `encoded_data`, where `encoded_data` is the concatenation of the RLP
    encodings of the items of a sequence.

    NOTE - This is a helper function not described in the spec. It was
    introduced because the spec does not discuss decoding RLP encoded data.

    Parameters
    ----------
    encoded_data :
        RLP encoded data for a sequence of objects.

    Returns
    -------
    rlp_length : `int`
        One prefix byte, plus the size of the explicit length field (if
        any), plus the length of the payload.

    Raises
    ------
    RLPDecodingError
        If `encoded_data` is empty, if the length field does not fit in
        `encoded_data`, or if the length field starts with a zero byte.
    """
    if len(encoded_data) <= 0:
        raise RLPDecodingError

    lead = Uint(encoded_data[0])

    # A single byte below 0x80 is its own encoding; the general formula at
    # the end does not apply to it.
    if lead < 0x80:
        return 1

    # Size of the big endian length field (zero for the short forms).
    size_of_length = Uint(0)
    payload_length = 0

    # Offset to subtract from the first byte for the long forms, where an
    # explicit length field follows the first byte.
    long_form_base = None

    if lead <= 0xB7:
        # Byte string shorter than 56 bytes.
        payload_length = lead - 0x80
    elif lead <= 0xBF:
        # Longer byte string.
        long_form_base = 0xB7
    elif lead <= 0xF7:
        # Sequence whose joined encodings are shorter than 56 bytes.
        payload_length = lead - 0xC0
    else:
        # Longer sequence.
        long_form_base = 0xF7

    if long_form_base is not None:
        size_of_length = lead - long_form_base
        if size_of_length >= len(encoded_data):
            raise RLPDecodingError
        # The big endian length must not have a leading zero byte.
        if encoded_data[1] == 0:
            raise RLPDecodingError
        payload_length = Uint.from_be_bytes(
            encoded_data[1 : 1 + size_of_length]
        )

    return 1 + size_of_length + payload_length
rlp_hash
Obtain the keccak-256 hash of the rlp encoding of the passed in data.
Parameters
data : The data for which we need the rlp hash.
Returns
hash : Hash32
The rlp hash of the passed in data.
def rlp_hash(data: Extended) -> Hash32:
    """
    Compute the keccak-256 hash of the RLP encoding of `data`.

    Parameters
    ----------
    data :
        The data for which we need the rlp hash.

    Returns
    -------
    hash : `Hash32`
        The rlp hash of the passed in data.
    """
    encoded = encode(data)
    return keccak256(encoded)