ethereum.forks.amsterdam.trie

State Trie.

.. contents:: Table of Contents
    :backlinks: none
    :local:

Introduction

The state trie is the structure responsible for storing `.fork_types.Account` objects.

EMPTY_TRIE_ROOT

67
# Root of a trie that holds no entries. For an empty trie, `root()` hashes
# the RLP encoding of the empty byte string (`b""`); this constant is
# expected to equal that hash.
EMPTY_TRIE_ROOT = Root(
    hex_to_bytes(
        "56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"
    )
)

Node

73
# Union of the value types accepted by `encode_node` (its `node` parameter).
Node = Account | Bytes | LegacyTransaction | Receipt | Uint | U256 | Withdrawal

K

74
# Type variable for trie keys; bounded by `Bytes`.
K = TypeVar("K", bound=Bytes)

V

75
# Type variable for trie values, constrained to the value types a `Trie` may
# hold. The `Optional[...]` constraints allow `None` to be used as a trie's
# `default` (i.e. "no value stored").
V = TypeVar(
    "V",
    Optional[Account],
    Optional[Bytes],
    Bytes,
    Optional[LegacyTransaction | Bytes],
    Optional[Receipt | Bytes],
    Optional[Withdrawal | Bytes],
    Uint,
    U256,
)

encode_internal_node

Encodes a Merkle Trie node into its RLP form. The RLP will then be serialized into a Bytes object and hashed unless it is less than 32 bytes when serialized.

This function also accepts None, representing the absence of a node, which is encoded to b"".

Parameters

node : Optional[InternalNode] The node to encode.

Returns

encoded : Extended The node encoded as RLP.

def encode_internal_node(node: Optional[InternalNode]) -> Extended:
    """
    Encodes a Merkle Trie node into its RLP form.

    The node is first converted into an RLP-encodable structure. If the RLP
    serialization of that structure is shorter than 32 bytes, the
    (unserialized) structure itself is returned; otherwise the `keccak256`
    hash of the serialization is returned.

    This function also accepts `None`, representing the absence of a node,
    which is encoded to `b""`.

    Parameters
    ----------
    node : Optional[InternalNode]
        The node to encode.

    Returns
    -------
    encoded : `Extended`
        The RLP-encodable form of the node, or the hash of its RLP
        serialization when that serialization is 32 bytes or longer.

    Raises
    ------
    AssertionError
        If `node` is neither `None` nor a leaf, extension or branch node.

    """
    unencoded: Extended
    if node is None:
        unencoded = b""
    elif isinstance(node, LeafNode):
        # The `True` flag marks the compact-encoded path as a leaf path.
        unencoded = (
            nibble_list_to_compact(node.rest_of_key, True),
            node.value,
        )
    elif isinstance(node, ExtensionNode):
        unencoded = (
            nibble_list_to_compact(node.key_segment, False),
            node.subnode,
        )
    elif isinstance(node, BranchNode):
        # Branch nodes are encoded as their subnodes followed by the value.
        unencoded = list(node.subnodes) + [node.value]
    else:
        raise AssertionError(f"Invalid internal node type {type(node)}!")

    encoded = rlp.encode(unencoded)
    if len(encoded) < 32:
        # Short nodes are returned as-is rather than being replaced by a hash.
        return unencoded
    return keccak256(encoded)

encode_node

Encode a Node for storage in the Merkle Trie.

Currently mostly an unimplemented stub.

def encode_node(node: Node, storage_root: Optional[Bytes] = None) -> Bytes:
    """
    Encode a Node for storage in the Merkle Trie.

    Parameters
    ----------
    node : `Node`
        The value to encode.
    storage_root : `Optional[Bytes]`
        Storage root to encode alongside an `Account`. Must not be `None`
        when `node` is an `Account`; it is not used for the other types
        handled directly by this function.

    Returns
    -------
    encoded : `Bytes`
        `Account` values are encoded with `encode_account`; transactions,
        receipts, withdrawals and integers are RLP encoded; `Bytes` values
        are returned unchanged. Any other value is delegated to the
        previous fork's `encode_node`.

    """
    if isinstance(node, Account):
        assert storage_root is not None
        return encode_account(node, storage_root)
    elif isinstance(
        node, (LegacyTransaction, Receipt, Withdrawal, U256, Uint)
    ):
        return rlp.encode(node)
    elif isinstance(node, Bytes):
        return node
    else:
        return previous_trie.encode_node(node, storage_root)

Trie

The Merkle Trie.

152
# `Generic` is required so that `Trie[K, V]`, as used in the signatures of
# this module, is a valid subscript.
from typing import Generic


@dataclass
class Trie(Generic[K, V]):
    """
    The Merkle Trie.
    """

    # Whether keys are hashed (with `keccak256`) before the root is computed.
    secured: bool
    # Value returned for missing keys; setting a key to this value removes it.
    default: V
    # Backing storage, mapping the original (unhashed) keys to their values.
    _data: Dict[K, V] = field(default_factory=dict)

copy_trie

Create a copy of trie. Since only frozen objects may be stored in tries, the contents are reused.

Parameters

trie: Trie Trie to copy.

Returns

new_trie : Trie[K, V] A copy of the trie.

def copy_trie(trie: Trie[K, V]) -> Trie[K, V]:
    """
    Create a copy of `trie`. Since only frozen objects may be stored in tries,
    the contents are reused: the underlying mapping is copied shallowly, so
    the new trie refers to the same key and value objects.

    Parameters
    ----------
    trie: `Trie`
        Trie to copy.

    Returns
    -------
    new_trie : `Trie[K, V]`
        A copy of the trie.

    """
    return Trie(trie.secured, trie.default, copy.copy(trie._data))

trie_set

Stores an item in a Merkle Trie.

This method deletes the key if value == trie.default, because the Merkle Trie represents the default value by omitting it from the trie.

Parameters

trie : Trie
    Trie to store in.
key : Bytes
    Key to store the value under.
value : V
    Node to insert at key.

def trie_set(trie: Trie[K, V], key: K, value: V) -> None:
    """
    Stores an item in a Merkle Trie.

    This method deletes the key if `value == trie.default`, because the Merkle
    Trie represents the default value by omitting it from the trie.

    Parameters
    ----------
    trie: `Trie`
        Trie to store in.
    key : `Bytes`
        Key to store the value under.
    value : `V`
        Node to insert at `key`.

    """
    if value == trie.default:
        # Storing the default is a deletion; a missing key is not an error.
        trie._data.pop(key, None)
    else:
        trie._data[key] = value

trie_get

Gets an item from the Merkle Trie.

This method returns trie.default if the key is missing.

Parameters

trie: Trie to lookup in. key : Key to lookup.

Returns

node : V Node at key in the trie.

def trie_get(trie: Trie[K, V], key: K) -> V:
    """
    Gets an item from the Merkle Trie.

    This method returns `trie.default` if the key is missing.

    Parameters
    ----------
    trie : `Trie`
        Trie to lookup in.
    key : `K`
        Key to lookup.

    Returns
    -------
    node : `V`
        Node at `key` in the trie.

    """
    return trie._data.get(key, trie.default)

common_prefix_length

Find the length of the longest common prefix of two sequences.

def common_prefix_length(a: Sequence, b: Sequence) -> int:
    """
    Find the length of the longest common prefix of two sequences.

    Parameters
    ----------
    a : `Sequence`
        First sequence.
    b : `Sequence`
        Second sequence.

    Returns
    -------
    length : `int`
        Number of leading positions at which `a` and `b` hold equal items.

    """
    for index, (item_a, item_b) in enumerate(zip(a, b)):
        if item_a != item_b:
            return index
    # No mismatch found: the shorter sequence is a prefix of the longer one.
    return min(len(a), len(b))

nibble_list_to_compact

Compresses nibble-list into a standard byte array with a flag.

A nibble-list is a list of byte values no greater than 15. The flag is encoded in the high nibble of the first byte. The flag nibble consists of two single-bit flags; its other two bits are unused.

Highest nibble::

+---+---+----------+--------+
| _ | _ | is_leaf | parity |
+---+---+----------+--------+
  3   2      1         0

The lowest bit of the nibble encodes the parity of the length of the remaining nibbles -- 0 when even and 1 when odd. The second lowest bit is used to distinguish leaf and extension nodes. The other two bits are not used.

Parameters

x : Array of nibbles. is_leaf : True if this is part of a leaf node, or false if it is an extension node.

Returns

compressed : Bytes Compact byte array.

def nibble_list_to_compact(x: Bytes, is_leaf: bool) -> Bytes:
    """
    Compresses nibble-list into a standard byte array with a flag.

    A nibble-list is a list of byte values no greater than `15`. The flag is
    encoded in the high nibble of the first byte. The flag nibble consists of
    two single-bit flags.

    Highest nibble::

        +---+---+---------+--------+
        | _ | _ | is_leaf | parity |
        +---+---+---------+--------+
          3   2      1         0


    The lowest bit of the nibble encodes the parity of the length of the
    remaining nibbles -- `0` when even and `1` when odd. The second lowest bit
    is used to distinguish leaf and extension nodes. The other two bits are not
    used.

    When the length is odd, the first nibble of `x` is stored in the low
    nibble of the first byte; when it is even, that low nibble is zero. The
    remaining nibbles are packed two per byte.

    Parameters
    ----------
    x : `Bytes`
        Array of nibbles.
    is_leaf : `bool`
        True if this is part of a leaf node, or false if it is an extension
        node.

    Returns
    -------
    compressed : `Bytes`
        Compact byte array.

    """
    compact = bytearray()
    leaf_flag = 2 * is_leaf

    if len(x) % 2 == 0:
        # Even length: flag nibble followed by a zero padding nibble.
        compact.append(16 * leaf_flag)
        first_pair = 0
    else:
        # Odd length: flag nibble (parity bit set) followed by the first
        # nibble of `x`.
        compact.append(16 * (leaf_flag + 1) + x[0])
        first_pair = 1

    # An even number of nibbles is left; pack them two per byte.
    for i in range(first_pair, len(x), 2):
        compact.append(16 * x[i] + x[i + 1])

    return Bytes(compact)

bytes_to_nibble_list

Converts a Bytes into a sequence of nibbles (bytes with value < 16).

Parameters

bytes_: The Bytes to convert.

Returns

nibble_list : Bytes The Bytes in nibble-list format.

def bytes_to_nibble_list(bytes_: Bytes) -> Bytes:
    """
    Converts a `Bytes` into a sequence of nibbles (bytes with value < 16).

    Each input byte produces two output bytes: its high nibble followed by
    its low nibble.

    Parameters
    ----------
    bytes_ : `Bytes`
        The `Bytes` to convert.

    Returns
    -------
    nibble_list : `Bytes`
        The `Bytes` in nibble-list format.

    """
    nibble_list = bytearray(2 * len(bytes_))
    for byte_index, byte in enumerate(bytes_):
        nibble_list[byte_index * 2] = (byte & 0xF0) >> 4
        nibble_list[byte_index * 2 + 1] = byte & 0x0F
    return Bytes(nibble_list)

_prepare_trie

Prepares the trie for root calculation. Removes values that are empty, hashes the keys (if secured == True) and encodes all the nodes.

Parameters

trie : The Trie to prepare. get_storage_root : Function to get the storage root of an account. Needed to encode Account objects.

Returns

out : Mapping[ethereum.base_types.Bytes, ethereum.base_types.Bytes] Encoded values keyed by their keys in nibble-list form.

def _prepare_trie(
    trie: Trie[K, V],
    get_storage_root: Optional[Callable[[Address], Root]] = None,
) -> Mapping[Bytes, Bytes]:
    """
    Prepares the trie for root calculation. Hashes the keys (if
    `secured == True`), converts them to nibble-list form and encodes all the
    values.

    Parameters
    ----------
    trie :
        The `Trie` to prepare.
    get_storage_root :
        Function to get the storage root of an account. Needed to encode
        `Account` objects.

    Returns
    -------
    out : `Mapping[ethereum.base_types.Bytes, ethereum.base_types.Bytes]`
        Encoded values keyed by their keys in nibble-list form.

    Raises
    ------
    AssertionError
        If a stored value is `None`, if a value encodes to `b""`, or if an
        `Account` is stored but `get_storage_root` is `None`.

    """
    mapped: MutableMapping[Bytes, Bytes] = {}

    for preimage, value in trie._data.items():
        if isinstance(value, Account):
            assert get_storage_root is not None
            # For account values the key is used as the account's address.
            address = Address(preimage)
            encoded_value = encode_node(value, get_storage_root(address))
        elif value is None:
            raise AssertionError("cannot encode `None`")
        else:
            encoded_value = encode_node(value)
        if encoded_value == b"":
            raise AssertionError
        key: Bytes
        if trie.secured:
            # "secure" tries hash keys once before construction
            key = keccak256(preimage)
        else:
            key = preimage
        mapped[bytes_to_nibble_list(key)] = encoded_value

    return mapped

root

Computes the root of a modified merkle patricia trie (MPT).

Parameters

trie : Trie to get the root of. get_storage_root : Function to get the storage root of an account. Needed to encode Account objects.

Returns

root : .fork_types.Root MPT root of the underlying key-value pairs.

def root(
    trie: Trie[K, V],
    get_storage_root: Optional[Callable[[Address], Root]] = None,
) -> Root:
    """
    Computes the root of a modified merkle patricia trie (MPT).

    Parameters
    ----------
    trie :
        `Trie` to get the root of.
    get_storage_root :
        Function to get the storage root of an account. Needed to encode
        `Account` objects.

    Returns
    -------
    root : `.fork_types.Root`
        MPT root of the underlying key-value pairs.

    """
    obj = _prepare_trie(trie, get_storage_root)

    root_node = encode_internal_node(patricialize(obj, Uint(0)))
    serialized = rlp.encode(root_node)
    if len(serialized) < 32:
        # `encode_internal_node` leaves short nodes unhashed, but the root
        # is always returned as a hash.
        return keccak256(serialized)

    # Otherwise `encode_internal_node` has already returned the hash.
    assert isinstance(root_node, Bytes)
    return Root(root_node)

patricialize

Structural composition function.

Used to recursively patricialize and merkleize a dictionary. Includes memoization of the tree structure and hashes.

Parameters

obj : Underlying trie key-value pairs, with keys in nibble-list format. level : Current trie level.

Returns

node : Optional[InternalNode] Root node of obj, or None if obj is empty.

def patricialize(
    obj: Mapping[Bytes, Bytes], level: Uint
) -> Optional[InternalNode]:
    """
    Structural composition function.

    Used to recursively patricialize and merkleize a dictionary. Child nodes
    are passed through `encode_internal_node` as they are attached to their
    parent.

    Parameters
    ----------
    obj :
        Underlying trie key-value pairs, with keys in nibble-list format.
    level :
        Current trie level, i.e. the index in each key at which this node
        starts.

    Returns
    -------
    node : `Optional[InternalNode]`
        Root node of `obj`, or `None` if `obj` is empty.

    """
    if len(obj) == 0:
        return None

    arbitrary_key = next(iter(obj))

    # if leaf node
    if len(obj) == 1:
        return LeafNode(arbitrary_key[level:], obj[arbitrary_key])

    # prepare for extension node check by finding the length of the longest
    # prefix, starting at `level`, that is shared by every key in obj
    substring = arbitrary_key[level:]
    prefix_length = len(substring)
    for key in obj:
        prefix_length = min(
            prefix_length, common_prefix_length(substring, key[level:])
        )

        # finished searching, found another key at the current level
        if prefix_length == 0:
            break

    # if extension node
    if prefix_length > 0:
        prefix = arbitrary_key[int(level) : int(level) + prefix_length]
        return ExtensionNode(
            prefix,
            encode_internal_node(
                patricialize(obj, level + Uint(prefix_length))
            ),
        )

    # Otherwise this is a branch node: bucket the entries by the nibble found
    # at `level`; an entry whose key ends here supplies the branch value.
    branches: List[MutableMapping[Bytes, Bytes]] = [{} for _ in range(16)]
    value = b""
    for key in obj:
        if len(key) == level:
            # shouldn't ever have an account or receipt in an internal node
            if isinstance(obj[key], (Account, Receipt, Uint)):
                raise AssertionError
            value = obj[key]
        else:
            branches[key[level]][key] = obj[key]

    subnodes = tuple(
        encode_internal_node(patricialize(branches[k], level + Uint(1)))
        for k in range(16)
    )
    return BranchNode(
        cast(BranchSubnodes, assert_type(subnodes, Tuple[Extended, ...])),
        value,
    )