33import os
44import shutil
55from dataclasses import dataclass
6- from datetime import datetime , timedelta
6+ from datetime import UTC , datetime , timedelta
77from enum import Enum , unique
88from glob import glob
99
@@ -222,7 +222,7 @@ def close(self, rolename: str, md: Metadata) -> None:
222222
223223 _ , expiry_days = self .signing_expiry_period (rolename )
224224
225- md .signed .expires = datetime .utcnow ( ) + timedelta (days = expiry_days )
225+ md .signed .expires = datetime .now ( UTC ) + timedelta (days = expiry_days )
226226
227227 md .signatures .clear ()
228228 for key in self ._get_keys (rolename ):
@@ -244,9 +244,8 @@ def close(self, rolename: str, md: Metadata) -> None:
244244 md .signatures [key .keyid ] = Signature (key .keyid , "" )
245245
246246 if rolename in ["timestamp" , "snapshot" ]:
247- root_md : Metadata [Root ] = self .open ("root" )
248247 # repository should never write unsigned online roles
249- root_md . verify_delegate (rolename , md )
248+ self . root (). verify_delegate (rolename , md . signed_bytes , md . signatures )
250249
251250 self ._write (rolename , md )
252251
@@ -321,7 +320,7 @@ def open_prev(self, role: str) -> Metadata | None:
321320 return None
322321
323322 def _validate_role (
324- self , delegator : Metadata , rolename : str
323+ self , delegator : Root | Targets , rolename : str
325324 ) -> tuple [bool , str | None ]:
326325 """Validate role compatibility with this repository
327326
@@ -340,7 +339,7 @@ def _validate_role(
340339 return False , f"Version { md .signed .version } is not valid for { rolename } "
341340
342341 days = md .signed .unrecognized_fields ["x-tuf-on-ci-expiry-period" ]
343- if md .signed .expires > datetime .utcnow ( ) + timedelta (days = days ):
342+ if md .signed .expires > datetime .now ( UTC ) + timedelta (days = days ):
344343 return False , f"Expiry date is further than expected { days } days ahead"
345344
346345 if isinstance (md .signed , Root ):
@@ -384,7 +383,7 @@ def _validate_role(
384383 # * check that target files in metadata match the files in targets/
385384
386385 try :
387- delegator .verify_delegate (rolename , md )
386+ delegator .verify_delegate (rolename , md . signed_bytes , md . signatures )
388387 except UnsignedMetadataError :
389388 return False , None
390389
@@ -483,16 +482,18 @@ def _get_signing_status(
483482 # Find delegating metadata. For root handle the special case of known good
484483 # delegating metadata.
485484 if known_good :
486- delegator = None
485+ delegator : Root | Targets | None = None
487486 if rolename == "root" :
488- delegator = self .open_prev ("root" )
487+ root_md = self .open_prev ("root" )
488+ if root_md :
489+ delegator = root_md .signed
489490 if not delegator :
490491 # Not root role or there is no known-good root metadata yet
491492 return None
492493 elif rolename in ["root" , "targets" ]:
493- delegator = self .open ( " root" )
494+ delegator = self .root ( )
494495 else :
495- delegator = self .open ( " targets" )
496+ delegator = self .targets ( )
496497
497498 # Build list of invites to all delegated roles of rolename
498499 delegation_names = []
@@ -503,7 +504,7 @@ def _get_signing_status(
503504 for delegation_name in delegation_names :
504505 invites .update (self .state .invited_signers_for_role (delegation_name ))
505506
506- role = delegator .signed . get_delegated_role (rolename )
507+ role = delegator .get_delegated_role (rolename )
507508
508509 # Build lists of signed signers and not signed signers
509510 for key in self ._get_keys (rolename , known_good ):
@@ -585,15 +586,14 @@ def build(self, metadata_path: str, artifact_path: str | None):
585586
586587 def bump_expiring (self , rolename : str ) -> int | None :
587588 """Create a new version of role if it is about to expire"""
588- now = datetime .utcnow ()
589589 bumped = True
590590
591591 with self .edit (rolename ) as signed :
592592 signing_days , _ = self .signing_expiry_period (rolename )
593593 delta = timedelta (days = signing_days )
594594
595595 logger .debug (f"{ rolename } signing period starts { signed .expires - delta } " )
596- if now + delta < signed .expires :
596+ if datetime . now ( UTC ) + delta < signed .expires :
597597 # no need to bump version
598598 bumped = False
599599 raise AbortEdit
@@ -622,13 +622,13 @@ def update_targets(self, rolename: str) -> bool:
622622
623623 def is_signed (self , rolename : str ) -> bool :
624624 """Return True if role is correctly signed"""
625- role_md = self .open (rolename )
625+ md = self .open (rolename )
626626 if rolename in ["root" , "timestamp" , "snapshot" , "targets" ]:
627- delegator = self .open ( " root" )
627+ delegator : Root | Targets = self .root ( )
628628 else :
629- delegator = self .open ( " targets" )
629+ delegator = self .targets ( )
630630 try :
631- delegator .verify_delegate (rolename , role_md )
631+ delegator .verify_delegate (rolename , md . signed_bytes , md . signatures )
632632 except UnsignedMetadataError :
633633 return False
634634
@@ -639,4 +639,4 @@ def is_in_signing_period(self, rolename: str) -> bool:
639639 role_md = self .open (rolename )
640640 signing_days , _ = self .signing_expiry_period (rolename )
641641 delta = timedelta (days = signing_days )
642- return datetime .utcnow ( ) >= role_md .signed .expires - delta
642+ return datetime .now ( UTC ) >= role_md .signed .expires - delta
0 commit comments