ethereum_rlp.rlp
Defines the serialization and deserialization format used throughout Ethereum.
_UNION_TYPES
| 34 | _UNION_TYPES: Tuple[object, ...] |
|---|
RLP
Protocol that describes the requirements to be RLP-encodable.
class RLP:
__dataclass_fields__
| 51 | __dataclass_fields__: ClassVar[Dict[str, Field[object]]] |
|---|
Simple
| 54 | Simple: TypeAlias = Union[Sequence["Simple"], bytes] |
|---|
Extended
| 56 | Extended: TypeAlias = Union[ |
|---|---|
| 57 | Sequence["Extended"], bytearray, bytes, Uint, FixedUnsigned, str, bool, RLP |
| 58 | ] |
encode
Encodes raw_data into a sequence of bytes using RLP.
def encode(raw_data: Extended) -> Bytes:
    """
    Serialize `raw_data` to its RLP byte representation.

    Byte strings and text are encoded as RLP strings, other sequences as RLP
    lists, unsigned integers via their big-endian bytes, booleans as
    `b"\\x01"` / `b""`, and dataclass instances as the tuple of their fields.
    Any other type raises `EncodingError`.
    """
    # bytes, bytearray and str are all `Sequence`s, so they must be ruled out
    # before the generic sequence case.
    if isinstance(raw_data, (bytearray, bytes)):
        return encode_bytes(raw_data)

    if isinstance(raw_data, str):
        return encode_bytes(raw_data.encode())

    if isinstance(raw_data, Sequence):
        return encode_sequence(raw_data)

    if isinstance(raw_data, (Uint, FixedUnsigned)):
        return encode(raw_data.to_be_bytes())

    if isinstance(raw_data, bool):
        return encode_bytes(b"\x01" if raw_data else b"")

    if is_dataclass(raw_data):
        return encode(astuple(raw_data))

    raise EncodingError(
        "RLP Encoding of type {} is not supported".format(type(raw_data))
    )
encode_bytes
Encodes raw_bytes, a sequence of bytes, using RLP.
def encode_bytes(raw_bytes: Bytes) -> Bytes:
    """
    RLP-encode the byte string `raw_bytes`.

    A single byte below 0x80 is its own encoding. Payloads shorter than 56
    bytes get a one-byte `0x80 + length` prefix. Longer payloads get a
    `0xB7 + len(length)` prefix followed by the big-endian length.
    """
    size = len(raw_bytes)

    if size == 1 and raw_bytes[0] < 0x80:
        return raw_bytes

    if size < 0x38:
        return bytes([0x80 + size]) + raw_bytes

    # Long form: the payload length itself is written as big-endian bytes.
    size_as_be = Uint(size).to_be_bytes()
    prefix = bytes([0xB7 + len(size_as_be)])
    return prefix + size_as_be + raw_bytes
encode_sequence
Encodes a list of RLP encodable objects (raw_sequence) using RLP.
def encode_sequence(raw_sequence: Sequence[Extended]) -> Bytes:
    """
    RLP-encode `raw_sequence`, a sequence of RLP-encodable objects, as a list.

    The items are encoded and concatenated by `join_encodings`; the result is
    prefixed with `0xC0 + length` when shorter than 56 bytes, otherwise with
    `0xF7 + len(length)` followed by the big-endian length.
    """
    payload = join_encodings(raw_sequence)
    payload_size = len(payload)

    if payload_size >= 0x38:
        # Long form: the payload length is written as big-endian bytes.
        size_as_be = Uint(payload_size).to_be_bytes()
        return Bytes([0xF7 + len(size_as_be)]) + size_as_be + payload

    return Bytes([0xC0 + payload_size]) + payload
join_encodings
Obtain concatenation of rlp encoding for each item in the sequence raw_sequence.
decode
Decodes an integer, byte sequence, or list of RLP encodable objects
from the byte sequence encoded_data, using RLP.
def decode(encoded_data: Bytes) -> Simple:
    """
    Decode the RLP byte sequence `encoded_data` into bytes or a (possibly
    nested) sequence of bytes.

    Raises `DecodingError` when `encoded_data` is empty.
    """
    if len(encoded_data) == 0:
        raise DecodingError("Cannot decode empty bytestring")

    # A first byte above 0xBF marks a list; anything else is a byte string.
    if encoded_data[0] > 0xBF:
        return decode_to_sequence(encoded_data)

    return decode_to_bytes(encoded_data)
U
| 159 | U = TypeVar("U", bound=Extended) |
|---|
decode_to
Decode the bytes in encoded_data to an object of type cls. cls can be
a Bytes subclass, a dataclass, Uint, U256 or Tuple[cls].
def decode_to(cls: Type[U], encoded_data: Bytes) -> U:
    """
    Decode the bytes in `encoded_data` to an object of type `cls`. `cls` can be
    a `Bytes` subclass, a dataclass, `Uint`, `U256` or `Tuple[cls]`.

    The input is first RLP-decoded with `decode`, then converted to `cls`
    with `deserialize_to`. Any failure during the conversion is re-raised as
    a `DecodingError` chained to the original exception.
    """
    decoded = decode(encoded_data)
    try:
        # Convert the generic decoded structure into the requested type.
        return deserialize_to(cls, decoded)
    except Exception as e:
        raise DecodingError(f"cannot decode into `{cls.__name__}`") from e
deserialize_to
deserialize_to
deserialize_to
Convert the already decoded value (see decode) into an object of type
class_.
def deserialize_to(class_: object, value: Simple) -> Extended:
    """
    Turn the already decoded `value` (see [`decode`]) into an object of type
    `class_`.

    `Annotated` wrappers are peeled off first; if one of them yields a
    result, that result is returned directly. Otherwise the remaining type
    is dispatched to the matching `_deserialize_to_*` helper. Unsupported
    classes raise `NotImplementedError`.

    [`decode`]: ref:ethereum_rlp.rlp.decode
    """
    # Unwrap nested `Annotated[...]` layers one at a time.
    while get_origin(class_) is Annotated:
        assert isinstance(class_, _Annotation)
        result, class_ = _deserialize_annotated(class_, value)
        if result is not None:
            return result

    # Anything that is not a plain class (Union, Tuple, List, ...) is handled
    # as a typing annotation.
    if not isinstance(class_, type):
        return _deserialize_to_annotation(class_, value)

    if is_dataclass(class_):
        return _deserialize_to_dataclass(class_, value)

    if issubclass(class_, (Uint, FixedUnsigned)):
        return _deserialize_to_uint(class_, value)

    if issubclass(class_, (Bytes, FixedBytes)):
        return _deserialize_to_bytes(class_, value)

    if class_ is bool:
        return _deserialize_to_bool(value)

    raise NotImplementedError(class_)
_deserialize_to_dataclass
def _deserialize_to_dataclass(cls: Type[U], decoded: Simple) -> U:
    """
    Build an instance of the dataclass `cls` from the decoded sequence
    `decoded`.

    Each item of `decoded` is converted, in field order, to the resolved type
    hint of the corresponding field. Raises `DecodingError` when `decoded` is
    `bytes`, when the item count differs from the field count, or when a
    field cannot be converted.
    """
    assert is_dataclass(cls)
    # `include_extras=True` keeps `Annotated` metadata on the hints.
    hints = get_type_hints(cls, include_extras=True)
    target_fields = fields(cls)

    if isinstance(decoded, bytes):
        raise DecodingError(f"got `bytes` while decoding `{cls.__name__}`")

    if len(target_fields) != len(decoded):
        name = cls.__name__
        actual = len(decoded)
        expected = len(target_fields)
        raise DecodingError(
            f"`{name}` needs {expected} field(s), but got {actual} instead"
        )

    values: Dict[str, Any] = {}

    for value, target_field in zip(decoded, target_fields):
        resolved_type = hints[target_field.name]
        try:
            values[target_field.name] = deserialize_to(resolved_type, value)
        except Exception as e:
            msg = f"cannot decode field `{cls.__name__}.{target_field.name}`"
            raise DecodingError(msg) from e

    result = cls(**values)
    assert isinstance(result, cls)
    return cast(U, result)
_deserialize_to_bool
def _deserialize_to_bool(value: Simple) -> bool:
| 246 | if value == b"": |
|---|---|
| 247 | return False |
| 248 | elif value == b"\x01": |
| 249 | return True |
| 250 | else: |
| 251 | raise DecodingError("invalid boolean") |
_deserialize_to_bytes
def _deserialize_to_bytes(class_: Union[Type[Bytes], Type[FixedBytes]], value: Simple) -> Union[Bytes, FixedBytes]:
| 257 | if not isinstance(value, bytes): |
|---|---|
| 258 | raise DecodingError("invalid bytes") |
| 259 | try: |
| 260 | return class_(value) |
| 261 | except ValueError as e: |
| 262 | raise DecodingError from e |
_deserialize_to_uint
def _deserialize_to_uint(
    class_: Union[Type[Uint], Type[FixedUnsigned]], decoded: Simple
) -> Union[Uint, FixedUnsigned]:
    """
    Construct `class_` from the big-endian byte string `decoded`.

    Raises `DecodingError` when `decoded` is not `bytes`, or when
    `class_.from_be_bytes` rejects it with a `ValueError`.
    """
    if isinstance(decoded, bytes):
        try:
            return class_.from_be_bytes(decoded)
        except ValueError as e:
            raise DecodingError from e

    raise DecodingError("invalid uint")
_Annotation
| 276 | @runtime_checkable |
|---|
class _Annotation:
__metadata__
| 278 | __metadata__: Sequence[object] |
|---|
__origin__
| 279 | __origin__: object |
|---|
_deserialize_annotated
def _deserialize_annotated(
    annotation: _Annotation, value: Simple
) -> Union[Tuple[Extended, None], Tuple[None, object]]:
    """
    Apply the `With` codec attached to an `Annotated` type, if there is one.

    Returns `(None, origin)` when the annotation carries no `With` metadata,
    so the caller can continue with the wrapped type. Returns
    `(result, None)` when a codec decoded `value`. Raises `Exception` if more
    than one `With` is attached, and `NotImplementedError` if the wrapped
    type cannot be used with `isinstance`.
    """
    codecs = [x for x in annotation.__metadata__ if isinstance(x, With)]
    if not codecs:
        return (None, annotation.__origin__)

    if len(codecs) > 1:
        raise Exception(
            "multiple rlp.With annotations applied to the same type"
        )

    codec = codecs[0]
    result = codec._decoder(value)

    try:
        assert isinstance(
            result, annotation.__origin__  # type: ignore[arg-type]
        ), "annotated returned wrong type"
    except TypeError as e:
        # TODO: Check annotation types that don't work with `isinstance`.
        msg = f"annotation {annotation.__origin__} doesn't support isinstance"
        raise NotImplementedError(msg) from e

    # Return the value that was just type-checked instead of invoking the
    # decoder a second time.
    return (result, None)
_deserialize_to_annotation
def _deserialize_to_annotation(annotation: object, value: Simple) -> Extended:
    """
    Dispatch a non-class typing annotation (union, tuple, list/sequence) to
    the matching deserializer, based on its `get_origin`.

    Raises `Exception` when the annotation has no origin and
    `NotImplementedError` for any other origin.
    """
    origin = get_origin(annotation)

    if origin in _UNION_TYPES:
        return _deserialize_to_union(annotation, value)

    if origin in (Tuple, tuple):
        return _deserialize_to_tuple(annotation, value)

    if origin in (List, Sequence, list):
        return _deserialize_to_list(annotation, value)

    if origin is None:
        raise Exception(annotation)

    raise NotImplementedError(f"RLP non-type {origin!r}")
_deserialize_to_union
def _deserialize_to_union(annotation: object, value: Simple) -> Extended:
    """
    Deserialize `value` into exactly one member of the union `annotation`.

    Every union member is tried. The result is returned only if exactly one
    member accepts `value`; zero or multiple matches raise `DecodingError`.
    """
    arguments = get_args(annotation)
    successes: List[Extended] = []
    failures = []
    for argument in arguments:
        try:
            success = deserialize_to(argument, value)
        except Exception as e:
            # Remember why this variant was rejected for the error message.
            failures.append(e)
            continue

        successes.append(success)

    if len(successes) == 1:
        return successes[0]
    elif not successes:
        raise DecodingError(f"no matching union variant\n{failures!r}")
    else:
        raise DecodingError("multiple matching union variants")
_deserialize_to_tuple
def _deserialize_to_tuple(
    annotation: object, values: Simple
) -> Sequence[Extended]:
    """
    Deserialize the decoded sequence `values` into a tuple whose element
    types come from the tuple annotation `annotation`.

    For a variadic annotation (`Tuple[T, ...]`), the element type before the
    ellipsis is repeated to cover every item in `values`. Raises
    `DecodingError` when `values` is `bytes` or when an element cannot be
    converted.
    """
    if isinstance(values, bytes):
        raise DecodingError("invalid tuple")
    arguments = list(get_args(annotation))

    if arguments[-1] is Ellipsis:
        arguments.pop()
        # Repeat the last concrete type so there is one per decoded item.
        fill_count = len(values) - len(arguments)
        arguments.extend([arguments[-1]] * fill_count)

    # NOTE(review): `zip` stops at the shorter input, so a length mismatch
    # between a fixed-size annotation and `values` is not reported here.
    decoded = []
    for index, (argument, value) in enumerate(zip(arguments, values)):
        try:
            deserialized = deserialize_to(argument, value)
        except Exception as e:
            msg = f"cannot decode tuple element {index} of type `{argument}`"
            raise DecodingError(msg) from e
        decoded.append(deserialized)

    return tuple(decoded)
_deserialize_to_list
def _deserialize_to_list(
    annotation: object, values: Simple
) -> Sequence[Extended]:
    """
    Deserialize the decoded sequence `values` into a list whose items all
    have the first type argument of `annotation`.

    Raises `DecodingError` when `values` is `bytes` or when an item cannot be
    converted.
    """
    if isinstance(values, bytes):
        raise DecodingError("invalid list")
    argument = get_args(annotation)[0]
    results = []
    for index, value in enumerate(values):
        try:
            deserialized = deserialize_to(argument, value)
        except Exception as e:
            msg = f"cannot decode list item {index} of type `{annotation}`"
            raise DecodingError(msg) from e
        results.append(deserialized)
    return results
decode_to_bytes
Decodes a rlp encoded byte stream assuming that the decoded data
should be of type bytes.
def decode_to_bytes(encoded_bytes: Bytes) -> Bytes:
    """
    Decode an RLP-encoded byte string and return its payload.

    Handles the three string forms: a single self-encoded byte below 0x80,
    the short form (prefix up to 0xB7), and the long form with an explicit
    big-endian length. Raises `DecodingError` on truncated input or on
    encodings that are not in their shortest form.
    """
    prefix = encoded_bytes[0]
    total = len(encoded_bytes)

    # Single byte below 0x80: the byte is its own encoding.
    if total == 1 and prefix < 0x80:
        return encoded_bytes

    if prefix <= 0xB7:
        # Short form: the prefix carries the payload length.
        payload_len = prefix - 0x80
        if payload_len < 0:
            raise DecodingError("negative length")
        if payload_len >= total:
            raise DecodingError("truncated")
        payload = encoded_bytes[1 : 1 + payload_len]
        # A lone byte below 0x80 must not carry a length prefix.
        if payload_len == 1 and payload[0] < 0x80:
            raise DecodingError
        return payload

    # Long form: the prefix gives the size of the length field, and the
    # payload starts right after that field.
    payload_start = 1 + prefix - 0xB7
    if payload_start - 1 >= total:
        raise DecodingError
    # A leading zero in the length field is rejected.
    if encoded_bytes[1] == 0:
        raise DecodingError
    payload_len = int(Uint.from_be_bytes(encoded_bytes[1:payload_start]))
    # Lengths below 56 must use the short form.
    if payload_len < 0x38:
        raise DecodingError
    payload_end = payload_start + int(payload_len)
    if payload_end - 1 >= total:
        raise DecodingError
    return encoded_bytes[payload_start:payload_end]
decode_to_sequence
Decodes a rlp encoded byte stream assuming that the decoded data
should be of type Sequence of objects.
def decode_to_sequence(encoded_sequence: Bytes) -> Sequence[Simple]:
    """
    Decode an RLP-encoded list and return its decoded items.

    The list payload is extracted from either the short form (prefix up to
    0xF7) or the long form with an explicit big-endian length, then handed
    to `decode_joined_encodings`. Raises `DecodingError` on truncated input
    or on a non-minimal length encoding.
    """
    prefix = encoded_sequence[0]
    total = len(encoded_sequence)

    if prefix <= 0xF7:
        # Short form: the prefix carries the payload length.
        payload_len = prefix - 0xC0
        if payload_len >= total:
            raise DecodingError
        payload = encoded_sequence[1 : 1 + payload_len]
    else:
        # Long form: the prefix gives the size of the length field.
        payload_start = 1 + prefix - 0xF7
        if payload_start - 1 >= total:
            raise DecodingError
        # A leading zero in the length field is rejected.
        if encoded_sequence[1] == 0:
            raise DecodingError
        payload_len = int(
            Uint.from_be_bytes(encoded_sequence[1:payload_start])
        )
        # Lengths below 56 must use the short form.
        if payload_len < 0x38:
            raise DecodingError
        payload_end = payload_start + payload_len
        if payload_end - 1 >= total:
            raise DecodingError
        payload = encoded_sequence[payload_start:payload_end]

    return decode_joined_encodings(payload)
decode_joined_encodings
Decodes joined_encodings, which is a concatenation of RLP encoded
objects.
def decode_joined_encodings(joined_encodings: Bytes) -> Sequence[Simple]:
    """
    Decode `joined_encodings`, the concatenation of several RLP-encoded
    objects, into a list of decoded items.

    Raises `DecodingError` when an item claims to extend past the end of the
    input.
    """
    items = []
    total = len(joined_encodings)
    cursor = 0

    while cursor < total:
        # Measure the next item from its prefix, then slice and decode it.
        item_length = decode_item_length(joined_encodings[cursor:])
        item_end = cursor + item_length
        if item_end - 1 >= total:
            raise DecodingError
        items.append(decode(joined_encodings[cursor:item_end]))
        cursor = item_end

    return items
decode_item_length
Find the length of the rlp encoding for the first object in the encoded sequence.
Here encoded_data refers to concatenation of rlp encoding for each
item in a sequence.
def decode_item_length(encoded_data: Bytes) -> int:
    """
    Return the total encoded length (prefix, length field and payload) of
    the first RLP object in `encoded_data`.

    Here `encoded_data` is the concatenation of the RLP encodings of the
    items in a sequence. Raises `DecodingError` when the input is empty,
    when a long-form length field is truncated, or when it starts with a
    zero byte.
    """
    if len(encoded_data) <= 0:
        raise DecodingError

    first_rlp_byte = encoded_data[0]

    # A single byte below 0x80 is its own encoding.
    if first_rlp_byte < 0x80:
        return 1

    # Short byte string: the prefix carries the payload length.
    if first_rlp_byte <= 0xB7:
        return 1 + (first_rlp_byte - 0x80)

    if first_rlp_byte <= 0xBF:
        # Long byte string: the prefix gives the size of the length field.
        length_length = first_rlp_byte - 0xB7
    elif first_rlp_byte <= 0xF7:
        # Short list: the prefix carries the payload length.
        return 1 + (first_rlp_byte - 0xC0)
    elif first_rlp_byte <= 0xFF:
        # Long list: the prefix gives the size of the length field.
        length_length = first_rlp_byte - 0xF7
    else:
        # No branch matches: both length components stay at zero.
        return 1

    # Shared tail for both long forms: read the big-endian payload length.
    if length_length >= len(encoded_data):
        raise DecodingError
    if encoded_data[1] == 0:
        raise DecodingError
    decoded_data_length = int(
        Uint.from_be_bytes(encoded_data[1 : 1 + length_length])
    )

    return 1 + length_length + decoded_data_length
Decoder
| 535 | Decoder: TypeAlias = Callable[[Simple], Extended] |
|---|
With
When used with Annotated, indicates that a value needs to be
encoded/decoded using a custom function.
class With:
    """
    Marker for `Annotated` types indicating that a value must be decoded
    with a custom function.
    """

    def __init__(self, decoder: Decoder) -> None:
        """Store `decoder` as the function used to decode the value."""
        self._decoder = decoder