ethereum.rlp
.. _rlp:
Recursive Length Prefix (RLP) Encoding
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. contents:: Table of Contents
    :backlinks: none
    :local:
Introduction
Defines the serialization and deserialization format used throughout Ethereum.
RLP
`Protocol` that describes the requirements to be RLP-encodable.
class RLP:
    """
    Describes the requirements for an object to be RLP-encodable.

    The only requirement declared here is the class-level
    `__dataclass_fields__` mapping.
    """

    # NOTE(review): the accompanying documentation calls this class a
    # `Protocol`; confirm whether it should derive from `typing.Protocol`.
    __dataclass_fields__: ClassVar[Dict]
Simple
53 | Simple: TypeAlias = Union[Sequence["Simple"], bytes] |
---|
Extended
# Union of the value types listed below, including arbitrarily nested
# sequences of them.
Extended: TypeAlias = Union[
    Sequence["Extended"], bytearray, bytes, Uint, FixedUnsigned, str, bool, RLP
]
encode
Encodes `raw_data` into a sequence of bytes using RLP.
Parameters
raw_data : A `Bytes`, `Uint`, `Uint256` or sequence of `RLP` encodable objects.
Returns
encoded : `ethereum.base_types.Bytes`
The RLP encoded bytes representing `raw_data`.
def encode(raw_data: Extended) -> Bytes:
    """
    Encodes `raw_data` into a sequence of bytes using RLP.

    Parameters
    ----------
    raw_data :
        A `bytes`, `bytearray`, `str`, `bool`, `Uint`, `FixedUnsigned`,
        dataclass instance, or a sequence of such RLP encodable objects.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_data`.

    Raises
    ------
    RLPEncodingError
        If the type of `raw_data` is not one of the supported types.
    """
    if isinstance(raw_data, Sequence):
        if isinstance(raw_data, (bytearray, bytes)):
            return encode_bytes(raw_data)
        elif isinstance(raw_data, str):
            # Strings are encoded as the bytes produced by `str.encode()`.
            return encode_bytes(raw_data.encode())
        else:
            return encode_sequence(raw_data)
    elif isinstance(raw_data, (Uint, FixedUnsigned)):
        # Integers are encoded as their big endian byte representation.
        return encode(raw_data.to_be_bytes())
    elif isinstance(raw_data, bool):
        # `True` is the single byte 0x01, `False` is the empty byte string.
        if raw_data:
            return encode_bytes(b"\x01")
        else:
            return encode_bytes(b"")
    elif is_dataclass(raw_data):
        # A dataclass is encoded as the sequence of its field values.
        return encode(astuple(raw_data))
    else:
        raise RLPEncodingError(
            "RLP Encoding of type {} is not supported".format(type(raw_data))
        )
encode_bytes
Encodes `raw_bytes`, a sequence of bytes, using RLP.
Parameters
raw_bytes : Bytes to encode with RLP.
Returns
encoded : `ethereum.base_types.Bytes`
The RLP encoded bytes representing `raw_bytes`.
def encode_bytes(raw_bytes: Bytes) -> Bytes:
    """
    Encodes `raw_bytes`, a sequence of bytes, using RLP.

    Parameters
    ----------
    raw_bytes :
        Bytes to encode with RLP.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_bytes`.
    """
    len_raw_data = len(raw_bytes)

    if len_raw_data == 1 and raw_bytes[0] < 0x80:
        # A single byte below 0x80 is its own encoding. Convert to `bytes`
        # so a `bytearray` argument is not handed back to the caller as a
        # mutable alias; every other branch already returns new `bytes`.
        return bytes(raw_bytes)
    elif len_raw_data < 0x38:
        # Short string: one prefix byte carrying the length.
        return bytes([0x80 + len_raw_data]) + raw_bytes
    else:
        # length of raw data represented as big endian bytes
        len_raw_data_as_be = Uint(len_raw_data).to_be_bytes()
        return (
            bytes([0xB7 + len(len_raw_data_as_be)])
            + len_raw_data_as_be
            + raw_bytes
        )
encode_sequence
Encodes a list of RLP encodable objects (`raw_sequence`) using RLP.
Parameters
raw_sequence : Sequence of RLP encodable objects.
Returns
encoded : `ethereum.base_types.Bytes`
The RLP encoded bytes representing `raw_sequence`.
def encode_sequence(raw_sequence: Sequence[Extended]) -> Bytes:
    """
    Encodes a list of RLP encodable objects (`raw_sequence`) using RLP.

    Parameters
    ----------
    raw_sequence :
        Sequence of RLP encodable objects.

    Returns
    -------
    encoded : `ethereum.base_types.Bytes`
        The RLP encoded bytes representing `raw_sequence`.
    """
    joined_encodings = get_joined_encodings(raw_sequence)
    len_joined_encodings = len(joined_encodings)

    if len_joined_encodings < 0x38:
        # Short list: one prefix byte carrying the payload length.
        return Bytes([0xC0 + len_joined_encodings]) + joined_encodings
    else:
        # Long list: the prefix byte carries the size of the big endian
        # length field, which is followed by the payload.
        len_joined_encodings_as_be = Uint(len_joined_encodings).to_be_bytes()
        return (
            Bytes([0xF7 + len(len_joined_encodings_as_be)])
            + len_joined_encodings_as_be
            + joined_encodings
        )
get_joined_encodings
Obtain concatenation of rlp encoding for each item in the sequence raw_sequence.
Parameters
raw_sequence : Sequence to encode with RLP.
Returns
joined_encodings : ethereum.base_types.Bytes
The concatenated RLP encoded bytes for each item in sequence
raw_sequence.
def get_joined_encodings(raw_sequence: Sequence[Extended]) -> Bytes:
    """
    Obtain concatenation of rlp encoding for each item in the sequence
    `raw_sequence`.

    Parameters
    ----------
    raw_sequence :
        Sequence to encode with RLP.

    Returns
    -------
    joined_encodings : `ethereum.base_types.Bytes`
        The concatenated RLP encoded bytes for each item in sequence
        `raw_sequence`. Empty when `raw_sequence` has no items.
    """
    return b"".join(encode(item) for item in raw_sequence)
decode
Decodes an integer, byte sequence, or list of RLP encodable objects
from the byte sequence `encoded_data`, using RLP.
Parameters
encoded_data : A sequence of bytes, in RLP form.
Returns
decoded_data : `Simple`
Object decoded from `encoded_data`.
def decode(encoded_data: Bytes) -> Simple:
    """
    Decodes a byte sequence or a (possibly nested) list of byte sequences
    from the byte sequence `encoded_data`, using RLP.

    Parameters
    ----------
    encoded_data :
        A sequence of bytes, in RLP form.

    Returns
    -------
    decoded_data : `Simple`
        Object decoded from `encoded_data`.

    Raises
    ------
    RLPDecodingError
        If `encoded_data` is empty.
    """
    if len(encoded_data) <= 0:
        raise RLPDecodingError("Cannot decode empty bytestring")

    if encoded_data[0] <= 0xBF:
        # This means that the raw data is of type bytes
        return decode_to_bytes(encoded_data)
    else:
        # This means that the raw data is of type sequence
        return decode_to_sequence(encoded_data)
U
# Type variable constrained to RLP encodable types.
U = TypeVar("U", bound=Extended)
decode_to
Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be
a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`.
Parameters
cls : `Type[U]`
The type to decode to.
encoded_data :
A sequence of bytes, in RLP form.
Returns
decoded_data : `U`
Object decoded from `encoded_data`.
def decode_to(cls: Type[U], encoded_data: Bytes) -> U:
    """
    Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be
    a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`.

    Parameters
    ----------
    cls: `Type[U]`
        The type to decode to.
    encoded_data :
        A sequence of bytes, in RLP form.

    Returns
    -------
    decoded_data : `U`
        Object decoded from `encoded_data`.
    """
    # First decode into plain bytes / nested sequences, then convert that
    # structure into the requested type.
    decoded = decode(encoded_data)
    return _deserialize_to(cls, decoded)
_deserialize_to
_deserialize_to
_deserialize_to
def _deserialize_to(class_: object, value: Simple) -> Extended:
    """
    Convert the decoded RLP structure `value` into the type described by
    `class_`.

    Parameters
    ----------
    class_ :
        Target type, or a typing annotation when it is not a class.
    value :
        Decoded RLP data (bytes or nested sequences of bytes).

    Returns
    -------
    deserialized : `Extended`
        `value` converted to `class_`.

    Raises
    ------
    NotImplementedError
        If `class_` is a class that none of the branches below handle.
    """
    if not isinstance(class_, type):
        # Not a class, so treat it as a typing annotation.
        return _deserialize_to_annotation(class_, value)
    elif is_dataclass(class_):
        return _deserialize_to_dataclass(class_, value)
    elif issubclass(class_, (Uint, FixedUnsigned)):
        return _deserialize_to_uint(class_, value)
    elif issubclass(class_, (Bytes, FixedBytes)):
        return _deserialize_to_bytes(class_, value)
    elif class_ is bool:
        return _deserialize_to_bool(value)
    else:
        raise NotImplementedError(class_)
_deserialize_to_dataclass
def _deserialize_to_dataclass(cls: Type[U], decoded: Simple) -> U:
    """
    Build an instance of the dataclass `cls` from the decoded sequence
    `decoded`, converting each item to the type hinted for the field at the
    same position.

    Raises
    ------
    RLPDecodingError
        If `decoded` is `bytes` rather than a sequence, or if its length
        differs from the number of fields of `cls`.
    """
    assert is_dataclass(cls)
    hints = get_type_hints(cls)
    target_fields = fields(cls)

    if isinstance(decoded, bytes):
        raise RLPDecodingError(f"got `bytes` while decoding `{cls.__name__}`")

    if len(target_fields) != len(decoded):
        name = cls.__name__
        actual = len(decoded)
        expected = len(target_fields)
        raise RLPDecodingError(
            f"`{name}` needs {expected} field(s), but got {actual} instead"
        )

    values: Dict[str, Any] = {}

    for value, target_field in zip(decoded, target_fields):
        # Use the resolved type hint so string annotations work as well.
        resolved_type = hints[target_field.name]
        values[target_field.name] = _deserialize_to(resolved_type, value)

    result = cls(**values)
    assert isinstance(result, cls)
    return cast(U, result)
_deserialize_to_bool
def _deserialize_to_bool(value: Simple) -> bool:
287 | if value == b"": |
---|---|
288 | return False |
289 | elif value == b"\x01": |
290 | return True |
291 | else: |
292 | raise RLPDecodingError |
_deserialize_to_bytes
def _deserialize_to_bytes(class_: Union[Type[Bytes], Type[FixedBytes]], value: Simple) -> Union[Bytes, FixedBytes]:
298 | if not isinstance(value, bytes): |
---|---|
299 | raise RLPDecodingError |
300 | try: |
301 | return class_(value) |
302 | except ValueError as e: |
303 | raise RLPDecodingError from e |
_deserialize_to_uint
def _deserialize_to_uint(
    class_: Union[Type[Uint], Type[FixedUnsigned]], decoded: Simple
) -> Union[Uint, FixedUnsigned]:
    """
    Convert decoded RLP data to an instance of the unsigned integer type
    `class_` by reading it as big endian bytes.

    Raises
    ------
    RLPDecodingError
        If `decoded` is not `bytes`, or if `class_.from_be_bytes` raises
        `ValueError`.
    """
    if not isinstance(decoded, bytes):
        raise RLPDecodingError
    try:
        return class_.from_be_bytes(decoded)
    except ValueError as e:
        raise RLPDecodingError from e
_deserialize_to_annotation
def _deserialize_to_annotation(annotation: object, value: Simple) -> Extended:
318 | origin = get_origin(annotation) |
---|---|
319 | if origin is Union: |
320 | return _deserialize_to_union(annotation, value) |
321 | elif origin in (Tuple, tuple): |
322 | return _deserialize_to_tuple(annotation, value) |
323 | elif origin is None: |
324 | raise Exception(annotation) |
325 | else: |
326 | raise NotImplementedError(f"RLP non-type {origin!r}") |
_deserialize_to_union
def _deserialize_to_union(annotation: object, value: Simple) -> Extended:
    """
    Convert decoded RLP data to one of the variants of the `Union`
    annotation `annotation`.

    Every variant is tried; exactly one must accept `value`.

    Raises
    ------
    RLPDecodingError
        If no variant, or more than one variant, accepts `value`.
    """
    arguments = get_args(annotation)
    successes = []
    failures = []
    for argument in arguments:
        try:
            success = _deserialize_to(argument, value)
        except Exception as e:
            # Any failure just rules this variant out; keep it for the
            # error message in case no variant matches.
            failures.append(e)
            continue

        successes.append(success)

    if len(successes) == 1:
        return successes[0]
    elif not successes:
        raise RLPDecodingError(f"no matching union variant\n{failures!r}")
    else:
        raise RLPDecodingError("multiple matching union variants")
_deserialize_to_tuple
def _deserialize_to_tuple(
    annotation: object, values: Simple
) -> Sequence[Extended]:
    """
    Convert the decoded sequence `values` to a tuple described by the
    `Tuple[...]` annotation `annotation`.

    For a variadic annotation (`Tuple[X, ...]`) every item is converted to
    `X`. For a fixed-size annotation each item is converted to the type at
    the same position.

    Raises
    ------
    RLPDecodingError
        If `values` is `bytes` rather than a sequence, or if a fixed-size
        annotation does not have exactly one type per item.
    """
    if isinstance(values, bytes):
        raise RLPDecodingError
    arguments = list(get_args(annotation))

    if arguments and arguments[-1] is Ellipsis:
        # Variadic tuple: repeat the element type once per decoded item.
        arguments.pop()
        fill_count = len(values) - len(arguments)
        arguments = list(arguments) + [arguments[-1]] * fill_count
    elif len(arguments) != len(values):
        # `zip` below would silently drop surplus items or produce a
        # shorter tuple; reject the mismatch like the dataclass path does.
        raise RLPDecodingError(
            f"tuple needs {len(arguments)} item(s), "
            f"but got {len(values)} instead"
        )

    decoded = []
    for argument, value in zip(arguments, values):
        decoded.append(_deserialize_to(argument, value))

    return tuple(decoded)
decode_to_bytes
Decodes an RLP encoded byte stream assuming that the decoded data
should be of type `bytes`.
Parameters
encoded_bytes : RLP encoded byte stream.
Returns
decoded : `ethereum.base_types.Bytes`
RLP decoded Bytes data.
def decode_to_bytes(encoded_bytes: Bytes) -> Bytes:
    """
    Decodes a rlp encoded byte stream assuming that the decoded data
    should be of type `bytes`.

    Parameters
    ----------
    encoded_bytes :
        RLP encoded byte stream.

    Returns
    -------
    decoded : `ethereum.base_types.Bytes`
        RLP decoded Bytes data

    Raises
    ------
    RLPDecodingError
        If the encoding is truncated, or if it uses a longer form than the
        payload requires.
    """
    if len(encoded_bytes) == 1 and encoded_bytes[0] < 0x80:
        return encoded_bytes
    elif encoded_bytes[0] <= 0xB7:
        len_raw_data = encoded_bytes[0] - 0x80
        if len_raw_data < 0:
            # First byte is below 0x80 but more bytes follow. Without this
            # check the negative length would slice out garbage below.
            raise RLPDecodingError
        if len_raw_data >= len(encoded_bytes):
            raise RLPDecodingError
        raw_data = encoded_bytes[1 : 1 + len_raw_data]
        if len_raw_data == 1 and raw_data[0] < 0x80:
            # A single byte below 0x80 must be encoded as itself.
            raise RLPDecodingError
        return raw_data
    else:
        # This is the index in the encoded data at which decoded data
        # starts from.
        decoded_data_start_idx = 1 + encoded_bytes[0] - 0xB7
        if decoded_data_start_idx - 1 >= len(encoded_bytes):
            raise RLPDecodingError
        if encoded_bytes[1] == 0:
            # The length field must not have a leading zero byte.
            raise RLPDecodingError
        len_decoded_data = int(
            Uint.from_be_bytes(encoded_bytes[1:decoded_data_start_idx])
        )
        if len_decoded_data < 0x38:
            # Payloads this short must use the single-byte length prefix.
            raise RLPDecodingError
        decoded_data_end_idx = decoded_data_start_idx + int(len_decoded_data)
        if decoded_data_end_idx - 1 >= len(encoded_bytes):
            raise RLPDecodingError
        return encoded_bytes[decoded_data_start_idx:decoded_data_end_idx]
decode_to_sequence
Decodes an RLP encoded byte stream assuming that the decoded data
should be of type `Sequence` of objects.
Parameters
encoded_sequence : An RLP encoded Sequence.
Returns
decoded : `Sequence[RLP]`
Sequence of objects decoded from `encoded_sequence`.
def decode_to_sequence(encoded_sequence: Bytes) -> Sequence[Simple]:
    """
    Decodes a rlp encoded byte stream assuming that the decoded data
    should be of type `Sequence` of objects.

    Parameters
    ----------
    encoded_sequence :
        An RLP encoded Sequence.

    Returns
    -------
    decoded : `Sequence[Simple]`
        Sequence of objects decoded from `encoded_sequence`.

    Raises
    ------
    RLPDecodingError
        If the first byte is not a list prefix, if the encoding is
        truncated, or if it uses a longer form than the payload requires.
    """
    if encoded_sequence[0] <= 0xF7:
        len_joined_encodings = encoded_sequence[0] - 0xC0
        if len_joined_encodings < 0:
            # First byte is below 0xC0, so this is not a list at all.
            # Without this check the negative length would be used as a
            # slice bound below.
            raise RLPDecodingError
        if len_joined_encodings >= len(encoded_sequence):
            raise RLPDecodingError
        joined_encodings = encoded_sequence[1 : 1 + len_joined_encodings]
    else:
        joined_encodings_start_idx = 1 + encoded_sequence[0] - 0xF7
        if joined_encodings_start_idx - 1 >= len(encoded_sequence):
            raise RLPDecodingError
        if encoded_sequence[1] == 0:
            # The length field must not have a leading zero byte.
            raise RLPDecodingError
        len_joined_encodings = int(
            Uint.from_be_bytes(encoded_sequence[1:joined_encodings_start_idx])
        )
        if len_joined_encodings < 0x38:
            # Payloads this short must use the single-byte length prefix.
            raise RLPDecodingError
        joined_encodings_end_idx = (
            joined_encodings_start_idx + len_joined_encodings
        )
        if joined_encodings_end_idx - 1 >= len(encoded_sequence):
            raise RLPDecodingError
        joined_encodings = encoded_sequence[
            joined_encodings_start_idx:joined_encodings_end_idx
        ]

    return decode_joined_encodings(joined_encodings)
decode_joined_encodings
Decodes `joined_encodings`, which is a concatenation of RLP encoded
objects.
Parameters
joined_encodings : concatenation of RLP encoded objects
Returns
decoded : `List[RLP]`
A list of objects decoded from `joined_encodings`.
def decode_joined_encodings(joined_encodings: Bytes) -> Sequence[Simple]:
    """
    Decodes `joined_encodings`, which is a concatenation of RLP encoded
    objects.

    Parameters
    ----------
    joined_encodings :
        concatenation of RLP encoded objects

    Returns
    -------
    decoded : `List[Simple]`
        A list of objects decoded from `joined_encodings`. Empty when
        `joined_encodings` is empty.

    Raises
    ------
    RLPDecodingError
        If an item claims to extend past the end of `joined_encodings`.
    """
    decoded_sequence = []

    item_start_idx = 0
    while item_start_idx < len(joined_encodings):
        # Work out how many bytes the next item occupies, then decode
        # exactly that slice.
        encoded_item_length = decode_item_length(
            joined_encodings[item_start_idx:]
        )
        if item_start_idx + encoded_item_length - 1 >= len(joined_encodings):
            raise RLPDecodingError
        encoded_item = joined_encodings[
            item_start_idx : item_start_idx + encoded_item_length
        ]
        decoded_sequence.append(decode(encoded_item))
        item_start_idx += encoded_item_length

    return decoded_sequence
decode_item_length
Find the length of the RLP encoding for the first object in the
encoded sequence.
Here `encoded_data` refers to the concatenation of the RLP encoding for each
item in a sequence.
NOTE - This is a helper function not described in the spec. It was introduced because the spec does not discuss decoding RLP encoded data.
Parameters
encoded_data : RLP encoded data for a sequence of objects.
Returns
rlp_length : `int`
Length in bytes of the RLP encoding of the first object in `encoded_data`.
def decode_item_length(encoded_data: Bytes) -> int:
    """
    Find the length of the rlp encoding for the first object in the
    encoded sequence.
    Here `encoded_data` refers to concatenation of rlp encoding for each
    item in a sequence.

    NOTE - This is a helper function not described in the spec. It was
    introduced because the spec does not discuss decoding RLP encoded
    data.

    Parameters
    ----------
    encoded_data :
        RLP encoded data for a sequence of objects.

    Returns
    -------
    rlp_length : `int`
        Number of bytes (prefix, length field and payload) occupied by the
        first encoded object in `encoded_data`.

    Raises
    ------
    RLPDecodingError
        If `encoded_data` is empty, if the length field is truncated, or if
        the length field starts with a zero byte.
    """
    if len(encoded_data) <= 0:
        raise RLPDecodingError

    first_rlp_byte = encoded_data[0]

    # This is the length of the big endian representation of the length of
    # rlp encoded object byte stream.
    length_length = 0
    decoded_data_length = 0

    # This occurs only when the raw_data is a single byte whose value < 128
    if first_rlp_byte < 0x80:
        # We return 1 here, as the end formula
        # 1 + length_length + decoded_data_length would be invalid for
        # this case.
        return 1
    # This occurs only when the raw_data is a byte stream with length < 56
    # and doesn't fall into the above cases
    elif first_rlp_byte <= 0xB7:
        decoded_data_length = first_rlp_byte - 0x80
    # This occurs only when the raw_data is a byte stream and doesn't fall
    # into the above cases
    elif first_rlp_byte <= 0xBF:
        length_length = first_rlp_byte - 0xB7
        if length_length >= len(encoded_data):
            raise RLPDecodingError
        if encoded_data[1] == 0:
            raise RLPDecodingError
        decoded_data_length = int(
            Uint.from_be_bytes(encoded_data[1 : 1 + length_length])
        )
    # This occurs only when the raw_data is a sequence of objects with
    # length(concatenation of encoding of each object) < 56
    elif first_rlp_byte <= 0xF7:
        decoded_data_length = first_rlp_byte - 0xC0
    # This occurs only when the raw_data is a sequence of objects and
    # doesn't fall into the above cases.
    elif first_rlp_byte <= 0xFF:
        length_length = first_rlp_byte - 0xF7
        if length_length >= len(encoded_data):
            raise RLPDecodingError
        if encoded_data[1] == 0:
            raise RLPDecodingError
        decoded_data_length = int(
            Uint.from_be_bytes(encoded_data[1 : 1 + length_length])
        )

    return 1 + length_length + decoded_data_length
rlp_hash
Obtain the keccak-256 hash of the rlp encoding of the passed in data.
Parameters
data : The data for which we need the rlp hash.
Returns
hash : Hash32
The rlp hash of the passed in data.
def rlp_hash(data: Extended) -> Hash32:
    """
    Obtain the keccak-256 hash of the rlp encoding of the passed in data.

    Parameters
    ----------
    data :
        The data for which we need the rlp hash.

    Returns
    -------
    hash : `Hash32`
        The rlp hash of the passed in data.
    """
    return keccak256(encode(data))