ethereum_rlp.rlp
Defines the serialization and deserialization format used throughout Ethereum.
_UNION_TYPES
34 | _UNION_TYPES: Tuple[object, ...] |
---|
RLP
`Protocol` that describes the requirements to be RLP-encodable.
class RLP(Protocol):
    """
    `Protocol` that describes the requirements to be RLP-encodable.
    """

    # `dataclasses` attaches this attribute to every dataclass, so declaring
    # it here lets any dataclass structurally satisfy this protocol.
    __dataclass_fields__: ClassVar[Dict[str, Field[object]]]
Simple
54 | Simple: TypeAlias = Union[Sequence["Simple"], bytes] |
---|
Extended
56 | Extended: TypeAlias = Union[ |
---|---|
57 | Sequence["Extended"], bytearray, bytes, Uint, FixedUnsigned, str, bool, RLP |
58 | ] |
encode
Encodes `raw_data` into a sequence of bytes using RLP.
def encode(raw_data: Extended) -> Bytes:
    """
    Serialize `raw_data` into its RLP byte representation.

    Byte strings and text are encoded as RLP strings (text is converted with
    `str.encode()` first), other sequences as RLP lists, unsigned integers
    through their big-endian bytes, booleans as `b"\\x01"` / `b""`, and
    dataclasses as the tuple of their fields.

    Raises `EncodingError` for any other type.
    """
    # `bytes`, `bytearray` and `str` are themselves sequences, so they have
    # to be picked out before the generic sequence case.
    if isinstance(raw_data, (bytearray, bytes)):
        return encode_bytes(raw_data)
    if isinstance(raw_data, str):
        return encode_bytes(raw_data.encode())
    if isinstance(raw_data, Sequence):
        return encode_sequence(raw_data)

    if isinstance(raw_data, (Uint, FixedUnsigned)):
        return encode(raw_data.to_be_bytes())
    if isinstance(raw_data, bool):
        return encode_bytes(b"\x01" if raw_data else b"")
    if is_dataclass(raw_data):
        return encode(astuple(raw_data))

    raise EncodingError(
        "RLP Encoding of type {} is not supported".format(type(raw_data))
    )
encode_bytes
Encodes `raw_bytes`, a sequence of bytes, using RLP.
def encode_bytes(raw_bytes: Bytes) -> Bytes:
    """
    Encodes `raw_bytes`, a sequence of bytes, using RLP.

    The result is always an immutable `bytes` object, including when
    `raw_bytes` is a `bytearray`.
    """
    len_raw_data = len(raw_bytes)

    if len_raw_data == 1 and raw_bytes[0] < 0x80:
        # A single byte below 0x80 is its own encoding. Convert instead of
        # returning the argument so that a mutable `bytearray` supplied by
        # the caller is never handed back as the "encoded" value.
        return bytes(raw_bytes)

    if len_raw_data < 0x38:
        # Short string: one prefix byte carrying the length.
        return bytes([0x80 + len_raw_data]) + raw_bytes

    # Long string: the prefix byte carries the size of the length field,
    # followed by the length as big endian bytes, followed by the data.
    len_raw_data_as_be = Uint(len_raw_data).to_be_bytes()
    return (
        bytes([0xB7 + len(len_raw_data_as_be)])
        + len_raw_data_as_be
        + raw_bytes
    )
encode_sequence
Encodes a list of RLP encodable objects (`raw_sequence`) using RLP.
def encode_sequence(raw_sequence: Sequence[Extended]) -> Bytes:
    """
    RLP-encode `raw_sequence`, a list of RLP encodable objects.

    The items are encoded and concatenated by `join_encodings`; the result
    is that payload preceded by a list prefix describing its length.
    """
    payload = join_encodings(raw_sequence)
    payload_len = len(payload)

    if payload_len >= 0x38:
        # Long list: the prefix byte carries the size of the length field,
        # which follows as big endian bytes.
        payload_len_be = Uint(payload_len).to_be_bytes()
        prefix = Bytes([0xF7 + len(payload_len_be)]) + payload_len_be
    else:
        # Short list: the length fits into the prefix byte itself.
        prefix = Bytes([0xC0 + payload_len])

    return prefix + payload
join_encodings
Obtain the concatenation of the RLP encodings of each item in the sequence `raw_sequence`.
decode
Decodes an integer, byte sequence, or list of RLP encodable objects from the byte sequence `encoded_data`, using RLP.
def decode(encoded_data: Bytes) -> Simple:
    """
    Decode the RLP byte sequence `encoded_data` into either a byte string or
    a (possibly nested) sequence of byte strings.

    Raises `DecodingError` if `encoded_data` is empty.
    """
    if len(encoded_data) == 0:
        raise DecodingError("Cannot decode empty bytestring")

    # Prefixes above 0xBF introduce a sequence; everything else is bytes.
    if encoded_data[0] > 0xBF:
        return decode_to_sequence(encoded_data)
    return decode_to_bytes(encoded_data)
U
159 | U = TypeVar("U", bound=Extended) |
---|
decode_to
Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`.
def decode_to(cls: Type[U], encoded_data: Bytes) -> U:
    """
    Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be
    a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`.

    Raises `DecodingError` (chained to the underlying exception) when the
    decoded value cannot be converted into `cls`.
    """
    decoded = decode(encoded_data)
    try:
        # Convert the generic decoded structure into the requested type.
        return deserialize_to(cls, decoded)
    except Exception as e:
        raise DecodingError(f"cannot decode into `{cls.__name__}`") from e
deserialize_to
deserialize_to
deserialize_to
Convert the already decoded `value` (see `decode`) into an object of type `class_`.
def deserialize_to(class_: object, value: Simple) -> Extended:
    """
    Build an object of type `class_` out of `value`, which must already have
    been decoded (see [`decode`]).

    Raises `NotImplementedError` when `class_` is a type this function does
    not know how to produce.

    [`decode`]: ref:ethereum_rlp.rlp.decode
    """
    # Peel off `Annotated[...]` layers one at a time. A layer either yields
    # the final result or hands back the type it wraps.
    while get_origin(class_) is Annotated:
        assert isinstance(class_, _Annotation)
        decoded, class_ = _deserialize_annotated(class_, value)
        if decoded is not None:
            return decoded

    # Anything that is not a plain class (unions, tuples, lists, ...) is
    # handled by inspecting the annotation.
    if not isinstance(class_, type):
        return _deserialize_to_annotation(class_, value)

    if is_dataclass(class_):
        return _deserialize_to_dataclass(class_, value)
    if issubclass(class_, (Uint, FixedUnsigned)):
        return _deserialize_to_uint(class_, value)
    if issubclass(class_, (Bytes, FixedBytes)):
        return _deserialize_to_bytes(class_, value)
    if class_ is bool:
        return _deserialize_to_bool(value)

    raise NotImplementedError(class_)
_deserialize_to_dataclass
def _deserialize_to_dataclass(cls: Type[U], decoded: Simple) -> U:
    """
    Construct the dataclass `cls` from `decoded`, a sequence holding one
    decoded value per field, in field declaration order.

    Raises `DecodingError` when `decoded` is `bytes`, when the number of
    values differs from the number of fields, or when a field's value cannot
    be converted to the field's declared type.
    """
    assert is_dataclass(cls)
    # `include_extras=True` keeps `Annotated[...]` metadata on the hints.
    hints = get_type_hints(cls, include_extras=True)
    target_fields = fields(cls)

    if isinstance(decoded, bytes):
        raise DecodingError(f"got `bytes` while decoding `{cls.__name__}`")

    if len(target_fields) != len(decoded):
        name = cls.__name__
        actual = len(decoded)
        expected = len(target_fields)
        raise DecodingError(
            f"`{name}` needs {expected} field(s), but got {actual} instead"
        )

    values: Dict[str, Any] = {}

    for value, target_field in zip(decoded, target_fields):
        resolved_type = hints[target_field.name]
        try:
            values[target_field.name] = deserialize_to(resolved_type, value)
        except Exception as e:
            msg = f"cannot decode field `{cls.__name__}.{target_field.name}`"
            raise DecodingError(msg) from e

    result = cls(**values)
    assert isinstance(result, cls)
    return cast(U, result)
_deserialize_to_bool
def _deserialize_to_bool(value: Simple) -> bool:
    """
    Interpret `value` as a boolean: `b""` is `False` and `b"\\x01"` is
    `True`. Anything else raises `DecodingError`.
    """
    if value == b"":
        return False
    if value == b"\x01":
        return True
    raise DecodingError("invalid boolean")
_deserialize_to_bytes
def _deserialize_to_bytes(
    class_: Union[Type[Bytes], Type[FixedBytes]], value: Simple
) -> Union[Bytes, FixedBytes]:
    """
    Wrap the decoded byte string `value` in `class_`.

    Raises `DecodingError` when `value` is not `bytes`, or when `class_`
    rejects it with a `ValueError`.
    """
    if isinstance(value, bytes):
        try:
            return class_(value)
        except ValueError as e:
            raise DecodingError from e

    raise DecodingError("invalid bytes")
_deserialize_to_uint
def _deserialize_to_uint(
    class_: Union[Type[Uint], Type[FixedUnsigned]], decoded: Simple
) -> Union[Uint, FixedUnsigned]:
    """
    Interpret the decoded byte string `decoded` as a big endian integer of
    type `class_`.

    Raises `DecodingError` when `decoded` is not `bytes`, or when
    `class_.from_be_bytes` rejects it with a `ValueError`.
    """
    if isinstance(decoded, bytes):
        try:
            return class_.from_be_bytes(decoded)
        except ValueError as e:
            raise DecodingError from e

    raise DecodingError("invalid uint")
_Annotation
276 | @runtime_checkable |
---|
class _Annotation:
__metadata__
278 | __metadata__: Sequence[object] |
---|
__origin__
279 | __origin__: object |
---|
_deserialize_annotated
def _deserialize_annotated(
    annotation: _Annotation, value: Simple
) -> Union[Tuple[Extended, None], Tuple[None, object]]:
    """
    Apply the `With` codec attached to `annotation`, if there is one.

    Returns `(None, origin)` when the annotation carries no `With` metadata,
    where `origin` is the type wrapped by the annotation. Otherwise returns
    `(result, None)`, where `result` is what the codec's decoder produced
    for `value`.

    Raises `Exception` when more than one `With` is attached, and
    `NotImplementedError` when the wrapped type cannot be used with
    `isinstance`.
    """
    codecs = [x for x in annotation.__metadata__ if isinstance(x, With)]
    if not codecs:
        return (None, annotation.__origin__)

    if len(codecs) > 1:
        raise Exception(
            "multiple rlp.With annotations applied to the same type"
        )

    codec = codecs[0]
    result = codec._decoder(value)

    try:
        assert isinstance(
            result, annotation.__origin__  # type: ignore[arg-type]
        ), "annotated returned wrong type"
    except TypeError as e:
        # TODO: Check annotation types that don't work with `isinstance`.
        msg = f"annotation {annotation.__origin__} doesn't support isinstance"
        raise NotImplementedError(msg) from e

    # Return the object that was just type-checked. Running the decoder a
    # second time would repeat the work and hand back an unchecked value.
    return (result, None)
_deserialize_to_annotation
def _deserialize_to_annotation(annotation: object, value: Simple) -> Extended:
    """
    Deserialize `value` according to `annotation`, a type hint that is not a
    plain class, by dispatching on the annotation's origin.

    Raises `Exception` when the annotation has no origin, and
    `NotImplementedError` for origins other than unions, tuples and lists.
    """
    origin = get_origin(annotation)

    if origin in _UNION_TYPES:
        return _deserialize_to_union(annotation, value)
    if origin in (Tuple, tuple):
        return _deserialize_to_tuple(annotation, value)
    if origin in (List, Sequence, list):
        return _deserialize_to_list(annotation, value)

    if origin is None:
        raise Exception(annotation)
    raise NotImplementedError(f"RLP non-type {origin!r}")
_deserialize_to_union
def _deserialize_to_union(annotation: object, value: Simple) -> Extended:
    """
    Deserialize `value` as the single member of the union `annotation` that
    accepts it.

    Every member is tried. Raises `DecodingError` when none of them accepts
    `value`, or when more than one does.
    """
    arguments = get_args(annotation)
    successes: List[Extended] = []
    failures = []
    for argument in arguments:
        try:
            success = deserialize_to(argument, value)
        except Exception as e:
            # Remember why this variant was rejected so it can be reported
            # if no variant matches.
            failures.append(e)
            continue

        successes.append(success)

    if len(successes) == 1:
        return successes[0]
    elif not successes:
        raise DecodingError(f"no matching union variant\n{failures!r}")
    else:
        raise DecodingError("multiple matching union variants")
_deserialize_to_tuple
def _deserialize_to_tuple(
    annotation: object, values: Simple
) -> Sequence[Extended]:
    """
    Deserialize the decoded sequence `values` into a tuple whose element
    types are given by the tuple annotation `annotation`.

    When the annotation ends in `...`, the preceding element type is
    repeated to cover all of `values`. Otherwise the number of values must
    match the number of element types exactly.

    Raises `DecodingError` when `values` is `bytes`, when a fixed-length
    annotation does not match the number of values, or when an element
    cannot be converted to its type.
    """
    if isinstance(values, bytes):
        raise DecodingError("invalid tuple")
    arguments = list(get_args(annotation))

    if arguments and arguments[-1] is Ellipsis:
        arguments.pop()
        fill_count = len(values) - len(arguments)
        arguments = list(arguments) + [arguments[-1]] * fill_count
    elif len(arguments) != len(values):
        # `zip` below stops at the shorter input, so without this check a
        # length mismatch would silently drop values or element types.
        expected = len(arguments)
        actual = len(values)
        raise DecodingError(
            f"tuple needs {expected} item(s), but got {actual} instead"
        )

    decoded = []
    for index, (argument, value) in enumerate(zip(arguments, values)):
        try:
            deserialized = deserialize_to(argument, value)
        except Exception as e:
            msg = f"cannot decode tuple element {index} of type `{argument}`"
            raise DecodingError(msg) from e
        decoded.append(deserialized)

    return tuple(decoded)
_deserialize_to_list
def _deserialize_to_list(
    annotation: object, values: Simple
) -> Sequence[Extended]:
    """
    Deserialize the decoded sequence `values` into a list whose element type
    is the first type argument of `annotation`.

    Raises `DecodingError` when `values` is `bytes`, or when an item cannot
    be converted to the element type.
    """
    if isinstance(values, bytes):
        raise DecodingError("invalid list")
    argument = get_args(annotation)[0]
    results = []
    for index, value in enumerate(values):
        try:
            deserialized = deserialize_to(argument, value)
        except Exception as e:
            # Report the element type (not the enclosing list annotation),
            # since that is the type the item failed to convert to.
            msg = f"cannot decode list item {index} of type `{argument}`"
            raise DecodingError(msg) from e
        results.append(deserialized)
    return results
decode_to_bytes
Decodes an RLP encoded byte stream assuming that the decoded data should be of type `bytes`.
def decode_to_bytes(encoded_bytes: Bytes) -> Bytes:
    """
    Decode `encoded_bytes`, an RLP encoded byte stream, on the assumption
    that it encodes a value of type `bytes`.

    Raises `DecodingError` when the declared length is negative, exceeds the
    available data, or is not written in the form the encoder would have
    produced.
    """
    prefix = encoded_bytes[0]
    total = len(encoded_bytes)

    # A lone byte below 0x80 stands for itself.
    if total == 1 and prefix < 0x80:
        return encoded_bytes

    if prefix <= 0xB7:
        # Short form: the prefix byte holds the payload length.
        payload_len = prefix - 0x80
        if payload_len < 0:
            raise DecodingError("negative length")
        if payload_len >= total:
            raise DecodingError("truncated")
        payload = encoded_bytes[1 : 1 + payload_len]
        # A single byte below 0x80 must be written without a prefix.
        if payload_len == 1 and payload[0] < 0x80:
            raise DecodingError
        return payload

    # Long form: the prefix byte holds the size of the length field, which
    # sits between the prefix and the payload.
    payload_start = 1 + prefix - 0xB7
    if payload_start > total:
        raise DecodingError
    # The length field may not begin with a zero byte.
    if encoded_bytes[1] == 0:
        raise DecodingError
    payload_len = int(Uint.from_be_bytes(encoded_bytes[1:payload_start]))
    # Lengths below 0x38 belong to the short form.
    if payload_len < 0x38:
        raise DecodingError
    payload_end = payload_start + payload_len
    if payload_end > total:
        raise DecodingError
    return encoded_bytes[payload_start:payload_end]
decode_to_sequence
Decodes an RLP encoded byte stream assuming that the decoded data should be of type `Sequence` of objects.
def decode_to_sequence(encoded_sequence: Bytes) -> Sequence[Simple]:
    """
    Decode `encoded_sequence`, an RLP encoded byte stream, on the assumption
    that it encodes a `Sequence` of objects.

    Raises `DecodingError` when the declared payload length exceeds the
    available data or is not written in the form the encoder would have
    produced.
    """
    prefix = encoded_sequence[0]
    total = len(encoded_sequence)

    if prefix <= 0xF7:
        # Short form: the prefix byte holds the payload length.
        payload_len = prefix - 0xC0
        if payload_len >= total:
            raise DecodingError
        payload = encoded_sequence[1 : 1 + payload_len]
        return decode_joined_encodings(payload)

    # Long form: the prefix byte holds the size of the length field, which
    # sits between the prefix and the payload.
    payload_start = 1 + prefix - 0xF7
    if payload_start > total:
        raise DecodingError
    # The length field may not begin with a zero byte.
    if encoded_sequence[1] == 0:
        raise DecodingError
    payload_len = int(Uint.from_be_bytes(encoded_sequence[1:payload_start]))
    # Lengths below 0x38 belong to the short form.
    if payload_len < 0x38:
        raise DecodingError
    payload_end = payload_start + payload_len
    if payload_end > total:
        raise DecodingError

    payload = encoded_sequence[payload_start:payload_end]
    return decode_joined_encodings(payload)
decode_joined_encodings
Decodes `joined_encodings`, which is a concatenation of RLP encoded objects.
def decode_joined_encodings(joined_encodings: Bytes) -> Sequence[Simple]:
    """
    Decode `joined_encodings`, the concatenation of several RLP encoded
    objects, into the list of decoded objects.

    Raises `DecodingError` when an item claims to extend past the end of
    `joined_encodings`.
    """
    total = len(joined_encodings)
    items = []

    cursor = 0
    while cursor < total:
        # Work out how many bytes the item starting at `cursor` occupies.
        item_length = decode_item_length(joined_encodings[cursor:])
        item_end = cursor + item_length
        if item_end > total:
            raise DecodingError
        items.append(decode(joined_encodings[cursor:item_end]))
        cursor = item_end

    return items
decode_item_length
Find the length of the RLP encoding for the first object in the encoded sequence.
Here `encoded_data` refers to the concatenation of the RLP encodings of each item in a sequence.
def decode_item_length(encoded_data: Bytes) -> int:
    """
    Return how many bytes the RLP encoding of the first object in
    `encoded_data` occupies, prefix included.

    Here `encoded_data` refers to concatenation of rlp encoding for each
    item in a sequence.

    Raises `DecodingError` when `encoded_data` is empty, when a long-form
    length field is cut short, or when that length field begins with a zero
    byte.
    """
    if len(encoded_data) <= 0:
        raise DecodingError

    prefix = encoded_data[0]

    # A single byte below 0x80 is its own encoding and has no prefix, so the
    # general formula at the bottom does not apply to it.
    if prefix < 0x80:
        return 1

    # Size in bytes of the big endian length field (long forms only).
    size_of_length = 0
    # Size in bytes of the payload that follows the prefix / length field.
    payload_length = 0

    if prefix <= 0xB7:
        # Byte string shorter than 56 bytes.
        payload_length = prefix - 0x80
    elif prefix <= 0xBF:
        # Byte string in long form.
        size_of_length = prefix - 0xB7
    elif prefix <= 0xF7:
        # Sequence whose joined encodings are shorter than 56 bytes.
        payload_length = prefix - 0xC0
    elif prefix <= 0xFF:
        # Sequence in long form.
        size_of_length = prefix - 0xF7

    # Both long forms store the payload length the same way. The field size
    # is non-zero exactly when one of the long-form branches was taken.
    if size_of_length:
        if size_of_length >= len(encoded_data):
            raise DecodingError
        if encoded_data[1] == 0:
            raise DecodingError
        payload_length = int(
            Uint.from_be_bytes(encoded_data[1 : 1 + size_of_length])
        )

    return 1 + size_of_length + payload_length
Decoder
535 | Decoder: TypeAlias = Callable[[Simple], Extended] |
---|
With
When used with `Annotated`, indicates that a value needs to be encoded/decoded using a custom function.
class With:
    """
    Marker for `Annotated` metadata: the annotated value is decoded by the
    custom function stored on this object.
    """

    def __init__(self, decoder: Decoder) -> None:
        """
        Remember `decoder`, the callable that turns a decoded RLP value into
        the final object.
        """
        self._decoder = decoder