Skip to content

Commit c2729a2

Browse files
committed
initial mirroring script implementation
1 parent 008c61b commit c2729a2

File tree

1 file changed

+150
-0
lines changed

1 file changed

+150
-0
lines changed

machine/mirror/wort-mirror.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
# /// script
2+
# requires-python = ">=3.12"
3+
# dependencies = [
4+
# "aiofiles",
5+
# "httpx",
6+
# "httpx-retries",
7+
# "polars",
8+
# ]
9+
# ///
10+
11+
import asyncio
import hashlib
import os
import shutil

import aiofiles
import httpx
import polars as pl
from httpx_retries import RetryTransport
18+
19+
20+
# Default base URL of the wort signature archive (overridable via --archive-url).
ARCHIVE_URL = "https://farm.cse.ucdavis.edu/~irber"
21+
22+
23+
async def main(args):
24+
manifest_url = f"{args.archive_url}/wort-{args.database}/SOURMASH-MANIFEST.parquet"
25+
manifest_df = (
26+
pl.scan_parquet(manifest_url)
27+
.select(["internal_location", "sha256"])
28+
.unique(subset=["internal_location"])
29+
# .head(5)
30+
)
31+
32+
limiter = asyncio.Semaphore(args.max_downloaders)
33+
34+
already_mirrored_locations = set()
35+
async with limiter:
36+
for root, dirs, files in args.basedir.walk(top_down=True):
37+
for name in files:
38+
already_mirrored_locations.add(
39+
str(root.relative_to(args.basedir) / name)
40+
)
41+
print(len(already_mirrored_locations))
42+
43+
if args.full_check:
44+
# check sha56
45+
internal_locations = []
46+
sha256_sums = []
47+
48+
# async with asyncio.TaskGroup() as tg:
49+
for location in already_mirrored_locations:
50+
async with aiofiles.open(args.basedir / location, mode="rb") as f:
51+
h = hashlib.new("sha256")
52+
while (chnk := await f.read(1024 * 1024)) != b"":
53+
h.update(chnk)
54+
sha256 = h.hexdigest()
55+
56+
internal_locations.append(location)
57+
sha256_sums.append(sha256)
58+
else:
59+
internal_locations = list(already_mirrored_locations)
60+
61+
print(f"{len(internal_locations)} sha256 calculated")
62+
63+
already_mirrored = {"internal_location": internal_locations}
64+
join_columns = ["internal_location"]
65+
schema = {"internal_location": pl.String}
66+
67+
if args.full_check:
68+
already_mirrored["sha256"] = sha256_sums
69+
schema["sha256"] = pl.String
70+
join_columns.append("sha256")
71+
72+
already_mirrored_df = pl.from_dict(already_mirrored, schema=schema).lazy()
73+
print(already_mirrored_df.collect())
74+
75+
to_mirror_df = manifest_df.join(already_mirrored_df, on=join_columns, how="anti")
76+
77+
print(to_mirror_df.collect())
78+
79+
try:
80+
async with httpx.AsyncClient(
81+
timeout=30.0,
82+
# limits=httpx.Limits(max_connections=args.max_downloaders),
83+
base_url=f"{args.archive_url}/wort-{args.database}/",
84+
transport=RetryTransport(),
85+
) as client:
86+
async with asyncio.TaskGroup() as tg:
87+
for location, sha256 in to_mirror_df.collect().iter_rows():
88+
tg.create_task(
89+
download_sig(
90+
location,
91+
sha256,
92+
args.basedir,
93+
client,
94+
limiter,
95+
args.dry_run,
96+
)
97+
)
98+
except* Exception as eg:
99+
print(*[str(e)[:50] for e in eg.exceptions])
100+
101+
102+
async def download_sig(location, sha256, basedir, client, limiter, dry_run):
103+
async with limiter:
104+
if dry_run:
105+
print(f"download: {location}")
106+
return
107+
108+
async with client.stream("GET", location) as response:
109+
h = hashlib.new("sha256")
110+
total_bytes = 0
111+
response.raise_for_status()
112+
# download to temp location
113+
async with aiofiles.tempfile.NamedTemporaryFile(delete=False) as f:
114+
async for chnk in response.aiter_raw(1024 * 1024):
115+
h.update(chnk)
116+
await f.write(chnk)
117+
total_bytes += len(chnk)
118+
119+
if sha256 != h.hexdigest():
120+
# TODO: raise exception, download failed?
121+
# or maybe retry?
122+
print(f"download failed! expected {sha256}, got {h.hexdigest()}")
123+
124+
await f.flush()
125+
126+
# move to final location
127+
## TODO: the goal here is to avoid incomplete downloads,
128+
## but I'm still getting incomplete files =/
129+
print(f"completed {location}, {total_bytes:,} bytes")
130+
await asyncio.to_thread(shutil.copyfile, f.name, basedir / location)
131+
132+
133+
if __name__ == "__main__":
    import argparse
    import asyncio
    import pathlib

    parser = argparse.ArgumentParser()
    # BooleanOptionalAction keeps the safe dry-run default while adding
    # --no-dry-run to actually download; the previous store_true with
    # default=True made dry_run always True, so real downloads were impossible
    parser.add_argument(
        "-d", "--dry-run", default=True, action=argparse.BooleanOptionalAction
    )
    parser.add_argument("-a", "--archive-url", default=ARCHIVE_URL)
    parser.add_argument("-m", "--max-downloaders", type=int, default=30)
    parser.add_argument("-f", "--full-check", default=False, action="store_true")
    # nargs="?" makes the declared default ("img") actually take effect;
    # argparse ignores default= on a required positional
    parser.add_argument(
        "database",
        nargs="?",
        default="img",
        choices=["full", "img", "genomes", "sra"],
    )
    parser.add_argument("basedir", type=pathlib.Path)

    args = parser.parse_args()

    asyncio.run(main(args))

0 commit comments

Comments
 (0)