ethereum_rlp.rlp
Defines the serialization and deserialization format used throughout Ethereum.
_UNION_TYPES
| 36 | _UNION_TYPES: Tuple[object, ...] |
|---|
RLP
Protocol that describes the requirements to be RLP-encodable.
class RLP:
__dataclass_fields__
| 53 | __dataclass_fields__: ClassVar[Dict[str, Field[object]]] |
|---|
Simple
| 56 | Simple: TypeAlias = Union[Sequence["Simple"], bytes] |
|---|
Extended
| 58 | Extended: TypeAlias = Union[ |
|---|---|
| 59 | Sequence["Extended"], bytearray, bytes, Uint, FixedUnsigned, str, bool, RLP |
| 60 | ] |
encode
Encodes raw_data into a sequence of bytes using RLP.
def encode(raw_data: Extended) -> Bytes:
| 73 | """ |
|---|---|
| 74 | Encodes `raw_data` into a sequence of bytes using RLP. |
| 75 | """ |
| 76 | # These if statements are ordered by frequency in `fill` (except `str` must |
| 77 | # precede `Sequence`). |
| 78 | if isinstance(raw_data, (bytes, bytearray)): |
| 79 | return encode_bytes(raw_data) |
| 80 | elif isinstance(raw_data, Unsigned): |
| 81 | return encode_bytes(raw_data.to_be_bytes()) |
| 82 | elif isinstance(raw_data, str): |
| 83 | return encode_bytes(raw_data.encode()) |
| 84 | elif isinstance(raw_data, collections.abc.Sequence): |
| 85 | # Testing against `collections.abc.Sequence` is equivalent to |
| 86 | # `typing.Sequence`, but faster. |
| 87 | # `cast()` has a performance penalty, whereas `type: ignore` is free. |
| 88 | return encode_sequence(raw_data) # type: ignore[arg-type] |
| 89 | elif is_dataclass(raw_data): |
| 90 | return encode_sequence( |
| 91 | getattr(raw_data, field.name) for field in fields(raw_data) |
| 92 | ) |
| 93 | elif isinstance(raw_data, bool): |
| 94 | if raw_data: |
| 95 | return encode_bytes(b"\x01") |
| 96 | else: |
| 97 | return encode_bytes(b"") |
| 98 | else: |
| 99 | raise EncodingError( |
| 100 | "RLP Encoding of type {} is not supported".format(type(raw_data)) |
| 101 | ) |
encode_bytes
Encodes raw_bytes, a sequence of bytes, using RLP.
def encode_bytes(raw_bytes: Bytes) -> Bytes:
| 105 | """ |
|---|---|
| 106 | Encodes `raw_bytes`, a sequence of bytes, using RLP. |
| 107 | """ |
| 108 | len_raw_data = len(raw_bytes) |
| 109 | |
| 110 | if len_raw_data == 1 and raw_bytes[0] < 0x80: |
| 111 | return raw_bytes |
| 112 | elif len_raw_data < 0x38: |
| 113 | return bytes([0x80 + len_raw_data]) + raw_bytes |
| 114 | else: |
| 115 | # length of raw data represented as big endian bytes |
| 116 | len_raw_data_as_be = Uint(len_raw_data).to_be_bytes() |
| 117 | return ( |
| 118 | bytes([0xB7 + len(len_raw_data_as_be)]) |
| 119 | + len_raw_data_as_be |
| 120 | + raw_bytes |
| 121 | ) |
encode_sequence
Encodes a list of RLP encodable objects (raw_sequence) using RLP.
def encode_sequence(raw_sequence: Iterable[Extended]) -> Bytes:
| 127 | """ |
|---|---|
| 128 | Encodes a list of RLP encodable objects (`raw_sequence`) using RLP. |
| 129 | """ |
| 130 | joined_encodings = join_encodings(raw_sequence) |
| 131 | len_joined_encodings = len(joined_encodings) |
| 132 | |
| 133 | if len_joined_encodings < 0x38: |
| 134 | return Bytes([0xC0 + len_joined_encodings]) + joined_encodings |
| 135 | else: |
| 136 | len_joined_encodings_as_be = Uint(len_joined_encodings).to_be_bytes() |
| 137 | return ( |
| 138 | Bytes([0xF7 + len(len_joined_encodings_as_be)]) |
| 139 | + len_joined_encodings_as_be |
| 140 | + joined_encodings |
| 141 | ) |
join_encodings
Obtain concatenation of rlp encoding for each item in the sequence raw_sequence.
decode
Decodes an integer, byte sequence, or list of RLP encodable objects
from the byte sequence encoded_data, using RLP.
def decode(encoded_data: Bytes) -> Simple:
| 158 | """ |
|---|---|
| 159 | Decodes an integer, byte sequence, or list of RLP encodable objects |
| 160 | from the byte sequence `encoded_data`, using RLP. |
| 161 | """ |
| 162 | if len(encoded_data) <= 0: |
| 163 | raise DecodingError("Cannot decode empty bytestring") |
| 164 | |
| 165 | if encoded_data[0] <= 0xBF: |
| 166 | # This means that the raw data is of type bytes |
| 167 | return decode_to_bytes(encoded_data) |
| 168 | else: |
| 169 | # This means that the raw data is of type sequence |
| 170 | return decode_to_sequence(encoded_data) |
U
| 173 | U = TypeVar("U", bound=Extended) |
|---|
decode_to
Decode the bytes in encoded_data to an object of type cls. cls can be
a Bytes subclass, a dataclass, Uint, U256 or Tuple[cls].
def decode_to(cls: Type[U], encoded_data: Bytes) -> U:
| 177 | """ |
|---|---|
| 178 | Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be |
| 179 | a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`. |
| 180 | """ |
| 181 | decoded = decode(encoded_data) |
| 182 | try: |
| 183 | return deserialize_to(cls, decoded) |
| 184 | except Exception as e: |
| 185 | raise DecodingError(f"cannot decode into `{cls.__name__}`") from e |
deserialize_to
deserialize_to
deserialize_to
Convert the already decoded value (see decode) into an object of type
class_.
def deserialize_to(class_: object, value: Simple) -> Extended:
| 199 | """ |
|---|---|
| 200 | Convert the already decoded `value` (see [`decode`]) into an object of type |
| 201 | `class_`. |
| 202 | |
| 203 | [`decode`]: ref:ethereum_rlp.rlp.decode |
| 204 | """ |
| 205 | origin = get_origin(class_) |
| 206 | |
| 207 | while origin is Annotated: |
| 208 | assert isinstance(class_, _Annotation) |
| 209 | result, class_ = _deserialize_annotated(class_, value) |
| 210 | if result is not None: |
| 211 | return result |
| 212 | origin = get_origin(class_) |
| 213 | |
| 214 | if not isinstance(class_, type): |
| 215 | return _deserialize_to_annotation(class_, value) |
| 216 | elif is_dataclass(class_): |
| 217 | return _deserialize_to_dataclass(class_, value) |
| 218 | elif issubclass(class_, (Uint, FixedUnsigned)): |
| 219 | return _deserialize_to_uint(class_, value) |
| 220 | elif issubclass(class_, (Bytes, FixedBytes)): |
| 221 | return _deserialize_to_bytes(class_, value) |
| 222 | elif class_ is bool: |
| 223 | return _deserialize_to_bool(value) |
| 224 | else: |
| 225 | raise NotImplementedError(class_) |
_deserialize_to_dataclass
def _deserialize_to_dataclass(cls: Type[U], decoded: Simple) -> U:
| 229 | assert is_dataclass(cls) |
|---|---|
| 230 | hints = get_type_hints(cls, include_extras=True) |
| 231 | target_fields = fields(cls) |
| 232 | |
| 233 | if isinstance(decoded, bytes): |
| 234 | raise DecodingError(f"got `bytes` while decoding `{cls.__name__}`") |
| 235 | |
| 236 | if len(target_fields) != len(decoded): |
| 237 | name = cls.__name__ |
| 238 | actual = len(decoded) |
| 239 | expected = len(target_fields) |
| 240 | raise DecodingError( |
| 241 | f"`{name}` needs {expected} field(s), but got {actual} instead" |
| 242 | ) |
| 243 | |
| 244 | values: Dict[str, Any] = {} |
| 245 | |
| 246 | for value, target_field in zip(decoded, target_fields): |
| 247 | resolved_type = hints[target_field.name] |
| 248 | try: |
| 249 | values[target_field.name] = deserialize_to(resolved_type, value) |
| 250 | except Exception as e: |
| 251 | msg = f"cannot decode field `{cls.__name__}.{target_field.name}`" |
| 252 | raise DecodingError(msg) from e |
| 253 | |
| 254 | result = cls(**values) |
| 255 | assert isinstance(result, cls) |
| 256 | return cast(U, result) |
_deserialize_to_bool
def _deserialize_to_bool(value: Simple) -> bool:
| 260 | if value == b"": |
|---|---|
| 261 | return False |
| 262 | elif value == b"\x01": |
| 263 | return True |
| 264 | else: |
| 265 | raise DecodingError("invalid boolean") |
_deserialize_to_bytes
def _deserialize_to_bytes(class_: Union[Type[Bytes], Type[FixedBytes]], value: Simple) -> Union[Bytes, FixedBytes]:
| 271 | if not isinstance(value, bytes): |
|---|---|
| 272 | raise DecodingError("invalid bytes") |
| 273 | try: |
| 274 | return class_(value) |
| 275 | except ValueError as e: |
| 276 | raise DecodingError from e |
_deserialize_to_uint
def _deserialize_to_uint(class_: Union[Type[Uint], Type[FixedUnsigned]], decoded: Simple) -> Union[Uint, FixedUnsigned]:
| 282 | if not isinstance(decoded, bytes): |
|---|---|
| 283 | raise DecodingError("invalid uint") |
| 284 | if len(decoded) > 0 and decoded[0] == 0: |
| 285 | raise DecodingError("non-canonical integer") |
| 286 | try: |
| 287 | return class_.from_be_bytes(decoded) |
| 288 | except ValueError as e: |
| 289 | raise DecodingError from e |
_Annotation
| 292 | @runtime_checkable |
|---|
class _Annotation:
__metadata__
| 294 | __metadata__: Sequence[object] |
|---|
__origin__
| 295 | __origin__: object |
|---|
_deserialize_annotated
def _deserialize_annotated(annotation: _Annotation, value: Simple) -> Union[Tuple[Extended, None], Tuple[None, object]]:
| 301 | codecs = [x for x in annotation.__metadata__ if isinstance(x, With)] |
|---|---|
| 302 | if not codecs: |
| 303 | return (None, annotation.__origin__) |
| 304 | |
| 305 | if len(codecs) > 1: |
| 306 | raise Exception( |
| 307 | "multiple rlp.With annotations applied to the same type" |
| 308 | ) |
| 309 | |
| 310 | codec = codecs[0] |
| 311 | result = codec._decoder(value) |
| 312 | |
| 313 | try: |
| 314 | assert isinstance( |
| 315 | result, annotation.__origin__ # type: ignore[arg-type] |
| 316 | ), "annotated returned wrong type" |
| 317 | except TypeError as e: |
| 318 | # TODO: Check annotation types that don't work with `isinstance`. |
| 319 | msg = f"annotation {annotation.__origin__} doesn't support isinstance" |
| 320 | raise NotImplementedError(msg) from e |
| 321 | |
| 322 | return (codec._decoder(value), None) |
_deserialize_to_annotation
def _deserialize_to_annotation(annotation: object, value: Simple) -> Extended:
| 326 | origin = get_origin(annotation) |
|---|---|
| 327 | if origin in _UNION_TYPES: |
| 328 | return _deserialize_to_union(annotation, value) |
| 329 | elif origin in (Tuple, tuple): |
| 330 | return _deserialize_to_tuple(annotation, value) |
| 331 | elif origin in (List, Sequence, list): |
| 332 | return _deserialize_to_list(annotation, value) |
| 333 | elif origin is None: |
| 334 | raise Exception(annotation) |
| 335 | else: |
| 336 | raise NotImplementedError(f"RLP non-type {origin!r}") |
_deserialize_to_union
def _deserialize_to_union(annotation: object, value: Simple) -> Extended:
| 340 | arguments = get_args(annotation) |
|---|---|
| 341 | successes: List[Extended] = [] |
| 342 | failures = [] |
| 343 | for argument in arguments: |
| 344 | try: |
| 345 | success = deserialize_to(argument, value) |
| 346 | except Exception as e: |
| 347 | failures.append(e) |
| 348 | continue |
| 349 | |
| 350 | successes.append(success) |
| 351 | |
| 352 | if len(successes) == 1: |
| 353 | return successes[0] |
| 354 | elif not successes: |
| 355 | raise DecodingError(f"no matching union variant\n{failures!r}") |
| 356 | else: |
| 357 | raise DecodingError("multiple matching union variants") |
_deserialize_to_tuple
def _deserialize_to_tuple(annotation: object, values: Simple) -> Sequence[Extended]:
| 363 | if isinstance(values, bytes): |
|---|---|
| 364 | raise DecodingError("invalid tuple") |
| 365 | arguments = list(get_args(annotation)) |
| 366 | |
| 367 | if arguments[-1] is Ellipsis: |
| 368 | arguments.pop() |
| 369 | fill_count = len(values) - len(arguments) |
| 370 | arguments = list(arguments) + [arguments[-1]] * fill_count |
| 371 | |
| 372 | decoded = [] |
| 373 | for index, (argument, value) in enumerate(zip(arguments, values)): |
| 374 | try: |
| 375 | deserialized = deserialize_to(argument, value) |
| 376 | except Exception as e: |
| 377 | msg = f"cannot decode tuple element {index} of type `{argument}`" |
| 378 | raise DecodingError(msg) from e |
| 379 | decoded.append(deserialized) |
| 380 | |
| 381 | return tuple(decoded) |
_deserialize_to_list
def _deserialize_to_list(annotation: object, values: Simple) -> Sequence[Extended]:
| 387 | if isinstance(values, bytes): |
|---|---|
| 388 | raise DecodingError("invalid list") |
| 389 | argument = get_args(annotation)[0] |
| 390 | results = [] |
| 391 | for index, value in enumerate(values): |
| 392 | try: |
| 393 | deserialized = deserialize_to(argument, value) |
| 394 | except Exception as e: |
| 395 | msg = f"cannot decode list item {index} of type `{annotation}`" |
| 396 | raise DecodingError(msg) from e |
| 397 | results.append(deserialized) |
| 398 | return results |
decode_to_bytes
Decodes a rlp encoded byte stream assuming that the decoded data
should be of type bytes.
def decode_to_bytes(encoded_bytes: Bytes) -> Bytes:
| 402 | """ |
|---|---|
| 403 | Decodes a rlp encoded byte stream assuming that the decoded data |
| 404 | should be of type `bytes`. |
| 405 | """ |
| 406 | if len(encoded_bytes) == 1 and encoded_bytes[0] < 0x80: |
| 407 | return encoded_bytes |
| 408 | elif encoded_bytes[0] <= 0xB7: |
| 409 | len_raw_data = encoded_bytes[0] - 0x80 |
| 410 | if len_raw_data < 0: |
| 411 | raise DecodingError("negative length") |
| 412 | if len_raw_data >= len(encoded_bytes): |
| 413 | raise DecodingError("truncated") |
| 414 | if 1 + len_raw_data < len(encoded_bytes): |
| 415 | raise DecodingError("trailing bytes") |
| 416 | raw_data = encoded_bytes[1 : 1 + len_raw_data] |
| 417 | if len_raw_data == 1 and raw_data[0] < 0x80: |
| 418 | raise DecodingError |
| 419 | return raw_data |
| 420 | else: |
| 421 | # This is the index in the encoded data at which decoded data |
| 422 | # starts from. |
| 423 | decoded_data_start_idx = 1 + encoded_bytes[0] - 0xB7 |
| 424 | if decoded_data_start_idx - 1 >= len(encoded_bytes): |
| 425 | raise DecodingError("truncated") |
| 426 | if encoded_bytes[1] == 0: |
| 427 | raise DecodingError |
| 428 | len_decoded_data = int( |
| 429 | Uint.from_be_bytes(encoded_bytes[1:decoded_data_start_idx]) |
| 430 | ) |
| 431 | if len_decoded_data < 0x38: |
| 432 | raise DecodingError |
| 433 | decoded_data_end_idx = decoded_data_start_idx + int(len_decoded_data) |
| 434 | if decoded_data_end_idx - 1 >= len(encoded_bytes): |
| 435 | raise DecodingError("truncated") |
| 436 | if decoded_data_end_idx < len(encoded_bytes): |
| 437 | raise DecodingError("trailing bytes") |
| 438 | return encoded_bytes[decoded_data_start_idx:decoded_data_end_idx] |
decode_to_sequence
Decodes a rlp encoded byte stream assuming that the decoded data
should be of type Sequence of objects.
def decode_to_sequence(encoded_sequence: Bytes) -> Sequence[Simple]:
| 442 | """ |
|---|---|
| 443 | Decodes a rlp encoded byte stream assuming that the decoded data |
| 444 | should be of type `Sequence` of objects. |
| 445 | """ |
| 446 | if encoded_sequence[0] <= 0xF7: |
| 447 | len_joined_encodings = encoded_sequence[0] - 0xC0 |
| 448 | if len_joined_encodings >= len(encoded_sequence): |
| 449 | raise DecodingError("truncated") |
| 450 | if 1 + len_joined_encodings < len(encoded_sequence): |
| 451 | raise DecodingError("trailing bytes") |
| 452 | joined_encodings = encoded_sequence[1 : 1 + len_joined_encodings] |
| 453 | else: |
| 454 | joined_encodings_start_idx = 1 + encoded_sequence[0] - 0xF7 |
| 455 | if joined_encodings_start_idx - 1 >= len(encoded_sequence): |
| 456 | raise DecodingError("truncated") |
| 457 | if encoded_sequence[1] == 0: |
| 458 | raise DecodingError |
| 459 | len_joined_encodings = int( |
| 460 | Uint.from_be_bytes(encoded_sequence[1:joined_encodings_start_idx]) |
| 461 | ) |
| 462 | if len_joined_encodings < 0x38: |
| 463 | raise DecodingError |
| 464 | joined_encodings_end_idx = ( |
| 465 | joined_encodings_start_idx + len_joined_encodings |
| 466 | ) |
| 467 | if joined_encodings_end_idx - 1 >= len(encoded_sequence): |
| 468 | raise DecodingError("truncated") |
| 469 | if joined_encodings_end_idx < len(encoded_sequence): |
| 470 | raise DecodingError("trailing bytes") |
| 471 | joined_encodings = encoded_sequence[ |
| 472 | joined_encodings_start_idx:joined_encodings_end_idx |
| 473 | ] |
| 474 | |
| 475 | return decode_joined_encodings(joined_encodings) |
decode_joined_encodings
Decodes joined_encodings, which is a concatenation of RLP encoded
objects.
def decode_joined_encodings(joined_encodings: Bytes) -> Sequence[Simple]:
| 479 | """ |
|---|---|
| 480 | Decodes `joined_encodings`, which is a concatenation of RLP encoded |
| 481 | objects. |
| 482 | """ |
| 483 | decoded_sequence = [] |
| 484 | |
| 485 | item_start_idx = 0 |
| 486 | while item_start_idx < len(joined_encodings): |
| 487 | encoded_item_length = decode_item_length( |
| 488 | joined_encodings[item_start_idx:] |
| 489 | ) |
| 490 | if item_start_idx + encoded_item_length - 1 >= len(joined_encodings): |
| 491 | raise DecodingError("truncated") |
| 492 | encoded_item = joined_encodings[ |
| 493 | item_start_idx : item_start_idx + encoded_item_length |
| 494 | ] |
| 495 | decoded_sequence.append(decode(encoded_item)) |
| 496 | item_start_idx += encoded_item_length |
| 497 | |
| 498 | return decoded_sequence |
decode_item_length
Find the length of the rlp encoding for the first object in the encoded sequence.
Here encoded_data refers to concatenation of rlp encoding for each
item in a sequence.
def decode_item_length(encoded_data: Bytes) -> int:
| 502 | """ |
|---|---|
| 503 | Find the length of the rlp encoding for the first object in the |
| 504 | encoded sequence. |
| 505 | |
| 506 | Here `encoded_data` refers to concatenation of rlp encoding for each |
| 507 | item in a sequence. |
| 508 | """ |
| 509 | if len(encoded_data) <= 0: |
| 510 | raise DecodingError |
| 511 | |
| 512 | first_rlp_byte = encoded_data[0] |
| 513 | |
| 514 | # This is the length of the big endian representation of the length of |
| 515 | # rlp encoded object byte stream. |
| 516 | length_length = 0 |
| 517 | decoded_data_length = 0 |
| 518 | |
| 519 | # This occurs only when the raw_data is a single byte whose value < 128 |
| 520 | if first_rlp_byte < 0x80: |
| 521 | # We return 1 here, as the end formula |
| 522 | # 1 + length_length + decoded_data_length would be invalid for |
| 523 | # this case. |
| 524 | return 1 |
| 525 | # This occurs only when the raw_data is a byte stream with length < 56 |
| 526 | # and doesn't fall into the above cases |
| 527 | elif first_rlp_byte <= 0xB7: |
| 528 | decoded_data_length = first_rlp_byte - 0x80 |
| 529 | # This occurs only when the raw_data is a byte stream and doesn't fall |
| 530 | # into the above cases |
| 531 | elif first_rlp_byte <= 0xBF: |
| 532 | length_length = first_rlp_byte - 0xB7 |
| 533 | if length_length >= len(encoded_data): |
| 534 | raise DecodingError("truncated") |
| 535 | if encoded_data[1] == 0: |
| 536 | raise DecodingError |
| 537 | decoded_data_length = int( |
| 538 | Uint.from_be_bytes(encoded_data[1 : 1 + length_length]) |
| 539 | ) |
| 540 | # This occurs only when the raw_data is a sequence of objects with |
| 541 | # length(concatenation of encoding of each object) < 56 |
| 542 | elif first_rlp_byte <= 0xF7: |
| 543 | decoded_data_length = first_rlp_byte - 0xC0 |
| 544 | # This occurs only when the raw_data is a sequence of objects and |
| 545 | # doesn't fall into the above cases. |
| 546 | else: |
| 547 | assert first_rlp_byte <= 0xFF |
| 548 | length_length = first_rlp_byte - 0xF7 |
| 549 | if length_length >= len(encoded_data): |
| 550 | raise DecodingError("truncated") |
| 551 | if encoded_data[1] == 0: |
| 552 | raise DecodingError |
| 553 | decoded_data_length = int( |
| 554 | Uint.from_be_bytes(encoded_data[1 : 1 + length_length]) |
| 555 | ) |
| 556 | |
| 557 | return 1 + length_length + decoded_data_length |
Decoder
| 560 | Decoder: TypeAlias = Callable[[Simple], Extended] |
|---|
With
When used with Annotated, indicates that a value needs to be
encoded/decoded using a custom function.
class With:
__init__
def __init__(self, decoder: Decoder) -> None:
| 572 | self._decoder = decoder |
|---|