Skip to content

Commit d9d153f

Browse files
committed
update augment_csv to add a creation time column
1 parent dd44bc5 commit d9d153f

File tree

2 files changed

+67
-19
lines changed

2 files changed

+67
-19
lines changed

machine/archive/scripts/augment_csv.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
# ]
66
# ///
77

8+
from datetime import datetime
9+
810
import polars as pl
911

1012

@@ -16,12 +18,43 @@ def main(args):
1618
separator=" ",
1719
new_columns=["sha256", "sep", "path"],
1820
).drop("sep")
21+
22+
s3_metadata = {
23+
"creation_date": [],
24+
"size": [],
25+
"filename": [],
26+
}
27+
with args.s3_metadata.open() as f:
28+
_ = next(f)
29+
30+
for line in f:
31+
date, time, size, filename = line.strip().split()
32+
dt = datetime(
33+
*[int(v) for v in date.split("/")], *[int(v) for v in time.split(":")]
34+
)
35+
s3_metadata["creation_date"].append(dt)
36+
s3_metadata["size"].append(int(size))
37+
s3_metadata["filename"].append(f"sigs/{filename}")
38+
39+
s3_metadata_df = pl.from_dict(
40+
s3_metadata,
41+
schema={
42+
"creation_date": pl.Datetime,
43+
"size": pl.Int64,
44+
"filename": pl.String,
45+
},
46+
).lazy()
47+
48+
print(s3_metadata_df.head(5).collect())
49+
1950
df = manifest_df.join(sha256_df, left_on="internal_location", right_on="path")
2051
if args.basepath:
2152
df = df.with_columns(
2253
pl.col("internal_location").str.strip_prefix(args.basepath)
2354
)
2455

56+
df = df.join(s3_metadata_df, left_on="internal_location", right_on="filename")
57+
2558
match args.format:
2659
case "parquet":
2760
df.sink_parquet(
@@ -38,12 +71,14 @@ def main(args):
3871

3972
if __name__ == "__main__":
4073
import argparse
74+
import pathlib
4175

4276
parser = argparse.ArgumentParser()
4377
parser.add_argument("-b", "--basepath", default=None)
4478
parser.add_argument("-F", "--format", choices=["parquet", "csv"], default="parquet")
4579
parser.add_argument("manifest")
4680
parser.add_argument("sha256")
81+
parser.add_argument("s3_metadata", type=pathlib.Path)
4782
parser.add_argument("output", type=argparse.FileType("w", encoding="utf-8"))
4883

4984
args = parser.parse_args()

machine/mirror/wort-mirror.py

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313

1414
import aiofiles
1515
import httpx
16-
from httpx_retries import RetryTransport
1716
import polars as pl
17+
from httpx_retries import RetryTransport
1818

1919

2020
ARCHIVE_URL = "https://farm.cse.ucdavis.edu/~irber"
@@ -75,6 +75,8 @@ async def main(args):
7575

7676
print(to_mirror_df.collect())
7777

78+
(args.basedir / "sigs").mkdir(parents=True, exist_ok=True)
79+
7880
async with httpx.AsyncClient(
7981
timeout=30.0,
8082
# limits=httpx.Limits(max_connections=args.max_downloaders),
@@ -95,22 +97,25 @@ async def main(args):
9597
)
9698
)
9799
except* Exception as eg:
98-
print(*[str(e)[:50] for e in eg.exceptions])
99-
100-
# copy manifest
101-
if args.dry_run:
102-
print(f"download: {manifest_url}")
103-
return
104-
105-
async with client.stream("GET", "SOURMASH-MANIFEST.parquet") as response:
106-
async with aiofiles.tempfile.NamedTemporaryFile() as f:
107-
async for chnk in response.aiter_raw(1024 * 1024):
108-
await f.write(chnk)
109-
await f.flush()
110-
111-
await asyncio.to_thread(
112-
shutil.copyfile, f.name, args.basedir / "SOURMASH-MANIFEST.parquet"
113-
)
100+
print(*[str(e)[:80] for e in eg.exceptions])
101+
print(len(eg.exceptions))
102+
else:
103+
# copy manifest
104+
if args.dry_run:
105+
print(f"download: {manifest_url}")
106+
return
107+
108+
async with client.stream("GET", "SOURMASH-MANIFEST.parquet") as response:
109+
async with aiofiles.tempfile.NamedTemporaryFile() as f:
110+
async for chnk in response.aiter_raw(1024 * 1024):
111+
await f.write(chnk)
112+
await f.flush()
113+
114+
await asyncio.to_thread(
115+
shutil.copyfile,
116+
f.name,
117+
args.basedir / "SOURMASH-MANIFEST.parquet",
118+
)
114119

115120

116121
async def download_sig(location, sha256, basedir, client, limiter, dry_run):
@@ -175,9 +180,17 @@ async def download_sig(location, sha256, basedir, client, limiter, dry_run):
175180
help="Calculate sha256 for local files, instead of depending only on filename",
176181
)
177182
parser.add_argument(
178-
"database", default="img", choices=DATABASES, metavar="database", help=f"Which database to download. Available databases: {', '.join(DATABASES)}"
183+
"database",
184+
default="img",
185+
choices=DATABASES,
186+
metavar="database",
187+
help=f"Which database to download. Available databases: {', '.join(DATABASES)}",
188+
)
189+
parser.add_argument(
190+
"basedir",
191+
type=pathlib.Path,
192+
help="base directory for the mirror (existing or new)",
179193
)
180-
parser.add_argument("basedir", type=pathlib.Path, help="base directory for the mirror (existing or new)")
181194

182195
args = parser.parse_args()
183196

0 commit comments

Comments
 (0)