Skip to content

Commit edc473d

Browse files
committed
refactor blob store config
1 parent 3d3b292 commit edc473d

File tree

10 files changed

+287
-281
lines changed

10 files changed

+287
-281
lines changed

global.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ type MetricName =
417417
| 'fullhistory_short'
418418
| 'match_archive_read'
419419
| 'match_archive_write'
420+
| 'blob_archive_read'
420421
| 'auto_parse'
421422
| 'added_match'
422423
| 'distinct_match_player'

store/buildMatch.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ async function buildMatch(
109109
let [match, odData]: [
110110
Match | ParsedMatch | null,
111111
GetMatchDataMetadata | null,
112-
] = await getMatchDataFromBlobWithMetadata(matchId, true);
112+
] = await getMatchDataFromBlobWithMetadata(matchId);
113113
if (!match) {
114114
return null;
115115
}

store/buildStatus.ts

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -72,57 +72,71 @@ export async function buildStatus() {
7272
'+inf',
7373
),
7474
tracked_players: async () => redis.zcard('tracked'),
75+
76+
registry_proxy: async () => redis.zcard('registry:proxy'),
77+
registry_retriever: async () => redis.zcard('registry:retriever'),
78+
registry_parser: async () => redis.zcard('registry:parser'),
79+
7580
matches_last_day: async () => countDay('added_match'),
81+
matches_prev_hour: async () => countLastHour('added_match'),
7682
distinct_match_players_last_day: async () =>
7783
countDayDistinct('distinct_match_player'),
7884
distinct_match_players_user_last_day: async () =>
7985
countDayDistinct('distinct_match_player_user'),
8086
distinct_match_players_recent_user_last_day: async () =>
8187
countDayDistinct('distinct_match_player_recent_user'),
82-
matches_prev_hour: async () => countLastHour('added_match'),
83-
auto_parse_last_day: async () => countDay('auto_parse'),
84-
requests_last_day: async () => countDay('request'),
85-
distinct_requests_last_day: async () =>
86-
countDayDistinct('distinct_request'),
87-
requests_ui_day: async () => countDay('request_ui'),
88-
requests_api_key_last_day: async () => countDay('request_api_key'),
89-
registry_proxy: async () => redis.zcard('registry:proxy'),
90-
registry_retriever: async () => redis.zcard('registry:retriever'),
91-
registry_parser: async () => redis.zcard('registry:parser'),
88+
9289
retriever_matches_current_hour: async () => countHour('retriever'),
9390
retriever_matches_last_day: async () => countDay('retriever'),
9491
retriever_players_last_day: async () => countDay('retriever_player'),
92+
auto_parse_last_day: async () => countDay('auto_parse'),
9593
parse_jobs_last_day: async () => countDay('parser_job'),
9694
parse_fails_last_day: async () => countDay('parser_fail'),
9795
parse_crashes_last_day: async () => countDay('parser_crash'),
9896
parse_skips_last_day: async () => countDay('parser_skip'),
9997
parsed_matches_last_day: async () => countDay('parser'),
100-
reparse_early_last_day: async () => countDay('reparse_early'),
101-
reapi_last_day: async () => countDay('reapi'),
102-
regcdata_last_day: async () => countDay('regcdata'),
103-
reparse_last_day: async () => countDay('reparse'),
104-
oldparse_last_day: async () => countDay('oldparse'),
105-
meta_parsed_last_day: async () => countDay('meta_parse'),
98+
99+
requests_last_day: async () => countDay('request'),
100+
distinct_requests_last_day: async () =>
101+
countDayDistinct('distinct_request'),
102+
requests_ui_day: async () => countDay('request_ui'),
103+
requests_api_key_last_day: async () => countDay('request_api_key'),
106104
fullhistory_last_day: async () => countDay('fullhistory'),
107105
fullhistory_short_last_day: async () => countDay('fullhistory_short'),
108106
fullhistory_ops_last_day: async () => countDay('fullhistory_op'),
109107
fullhistory_skips_last_day: async () => countDay('fullhistory_skip'),
110-
// gen_api_key_invalid_last_day: async () => getRedisCountDay('gen_api_key_invalid'),
111-
steam_api_calls_last_day: async () => countDay('steam_api_call'),
112-
steam_proxy_calls_last_day: async () => countDay('steam_proxy_call'),
113-
steam_429_last_day: async () => countDay('steam_429'),
114-
steam_403_last_day: async () => countDay('steam_403'),
115-
steam_api_backfill_last_day: async () => countDay('steam_api_backfill'),
116-
steam_api_notfound_last_day: async () => countDay('steam_api_notfound'),
117-
steam_gc_backfill_last_day: async () => countDay('steam_gc_backfill'),
108+
meta_parsed_last_day: async () => countDay('meta_parse'),
109+
110+
// reapi_last_day: async () => countDay('reapi'),
111+
regcdata_last_day: async () => countDay('regcdata'),
112+
reparse_last_day: async () => countDay('reparse'),
113+
reparse_early_last_day: async () => countDay('reparse_early'),
114+
// oldparse_last_day: async () => countDay('oldparse'),
115+
116+
blob_archive_read_last_day: async () => countDay('blob_archive_read'),
118117
match_archive_read_last_day: async () => countDay('match_archive_read'),
119118
match_archive_write_last_day: async () => countDay('match_archive_write'),
120119
incomplete_archive_last_day: async () => countDay('incomplete_archive'),
120+
121121
api_hits_last_day: async () => countDay('api_hits'),
122122
api_hits_ui_last_day: async () => countDay('api_hits_ui'),
123123
build_match_last_day: async () => countDay('build_match'),
124124
get_player_matches_last_day: async () => countDay('player_matches'),
125-
self_player_matches_last_day: async () => countDay('self_profile_view'),
125+
// self_player_matches_last_day: async () => countDay('self_profile_view'),
126+
// gen_api_key_invalid_last_day: async () => getRedisCountDay('gen_api_key_invalid'),
127+
error_last_day: async () => countDay('500_error'),
128+
web_crash_last_day: async () => countDay('web_crash'),
129+
skip_seq_num_last_day: async () => countDay('skip_seq_num'),
130+
secondary_scanner_last_day: async () => countDay('secondary_scanner'),
131+
132+
steam_api_calls_last_day: async () => countDay('steam_api_call'),
133+
steam_proxy_calls_last_day: async () => countDay('steam_proxy_call'),
134+
steam_429_last_day: async () => countDay('steam_429'),
135+
steam_403_last_day: async () => countDay('steam_403'),
136+
steam_api_backfill_last_day: async () => countDay('steam_api_backfill'),
137+
// steam_api_notfound_last_day: async () => countDay('steam_api_notfound'),
138+
// steam_gc_backfill_last_day: async () => countDay('steam_gc_backfill'),
139+
126140
match_cache_hit_last_day: async () => countDay('match_cache_hit'),
127141
player_cache_hit_last_day: async () => countDay('player_cache_hit'),
128142
player_cache_miss_last_day: async () => countDay('player_cache_miss'),
@@ -137,10 +151,6 @@ export async function buildStatus() {
137151
auto_player_cache_last_day: async () => countDay('auto_player_cache'),
138152
distinct_auto_player_cache_last_day: async () =>
139153
countDayDistinct('distinct_auto_player_cache'),
140-
error_last_day: async () => countDay('500_error'),
141-
web_crash_last_day: async () => countDay('web_crash'),
142-
skip_seq_num_last_day: async () => countDay('skip_seq_num'),
143-
secondary_scanner_last_day: async () => countDay('secondary_scanner'),
144154
api_paths: async () => {
145155
const results = await redis.zrangebyscore(
146156
'api_paths',

store/getApiData.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,19 @@ const blobArchive = config.ENABLE_BLOB_ARCHIVE ? new Archive('blob') : null;
1212
* @param matchId
1313
* @returns
1414
*/
15-
export async function readApiData(matchId: number): Promise<ApiMatch | null> {
15+
export async function readApiData(matchId: number, noBlobStore?: boolean): Promise<ApiMatch | null> {
1616
const result = await cassandra.execute(
1717
'SELECT api FROM match_blobs WHERE match_id = ?',
1818
[matchId],
1919
{ prepare: true, fetchSize: 1, autoPage: true },
2020
);
2121
const row = result.rows[0];
2222
let data = row?.api ? (JSON.parse(row.api) as ApiMatch) : undefined;
23-
if (!data && blobArchive) {
23+
if (!data && blobArchive && !noBlobStore) {
2424
const archive = await blobArchive.archiveGet(`${matchId}_api`);
25+
if (archive) {
26+
redisCount(redis, 'blob_archive_read');
27+
}
2528
data = archive ? JSON.parse(archive.toString()) as ApiMatch : undefined;
2629
}
2730
if (!data) {

store/getArchivedData.ts

Lines changed: 8 additions & 219 deletions
Original file line numberDiff line numberDiff line change
@@ -1,229 +1,18 @@
11
import config from '../config';
2+
import { redisCount } from '../util/utility';
23
import { Archive } from './archive';
3-
import {
4-
getFullPlayerMatchesWithMetadata,
5-
getMatchDataFromBlobWithMetadata,
6-
} from './queries';
74
import db from './db';
85
import redis from './redis';
9-
import cassandra from './cassandra';
10-
import type { PutObjectCommandOutput } from '@aws-sdk/client-s3';
11-
import { isDataComplete, redisCount } from '../util/utility';
12-
import QueryStream from 'pg-query-stream';
13-
import { Client } from 'pg';
14-
import crypto from 'crypto';
156

16-
const matchArchive = new Archive('match');
17-
const playerArchive = new Archive('player');
7+
const matchArchive = config.ENABLE_MATCH_ARCHIVE ? new Archive('match') : null;
8+
const playerArchive = config.ENABLE_PLAYER_ARCHIVE ? new Archive('player') : null;
189

19-
export async function doArchivePlayerMatches(
20-
accountId: number,
21-
): Promise<PutObjectCommandOutput | { message: string} | null> {
22-
if (!config.ENABLE_PLAYER_ARCHIVE) {
23-
return null;
24-
}
25-
// Fetch our combined list of archive and current, selecting all fields
26-
const full = await getFullPlayerMatchesWithMetadata(accountId);
27-
const toArchive = full[0];
28-
console.log(full[1]);
29-
toArchive.forEach((m, i) => {
30-
Object.keys(m).forEach((key) => {
31-
if (m[key as keyof ParsedPlayerMatch] === null) {
32-
// Remove any null values from the matches for storage
33-
delete m[key as keyof ParsedPlayerMatch];
34-
}
35-
});
36-
});
37-
// TODO (howard) Make sure the new list is longer than the old list
38-
// Make sure we're archiving at least 1 match
39-
if (!toArchive.length) {
40-
return null;
41-
}
42-
// Put the blob
43-
return playerArchive.archivePut(
44-
accountId.toString(),
45-
Buffer.from(JSON.stringify(toArchive)),
46-
);
47-
// TODO (howard) delete the archived values from player_caches
48-
// TODO (howard) keep the 20 highest match IDs for recentMatches
49-
// TODO (howard) mark the user archived so we don't need to query archive on every request
50-
// TODO (howard) add redis counts
51-
}
52-
53-
async function doArchiveFromBlob(matchId: number) {
54-
if (!config.ENABLE_MATCH_ARCHIVE) {
55-
return;
56-
}
57-
// Don't backfill when determining whether to archive
58-
const [match, metadata] = await getMatchDataFromBlobWithMetadata(
59-
matchId,
60-
false,
61-
);
62-
if (!match) {
63-
// Invalid/not found, skip
64-
return;
65-
}
66-
if (metadata?.has_api && !metadata?.has_gcdata && !metadata?.has_parsed) {
67-
// if it only contains API data, delete?
68-
// If the match is old we might not be able to get back ability builds, HD/TD/HH
69-
// We might also drop gcdata, identity, and ranks here
70-
// await deleteMatch(matchId);
71-
// console.log('DELETE match %s, apionly', matchId);
72-
return;
73-
}
74-
if (metadata?.has_parsed) {
75-
const isArchived = Boolean(
76-
(
77-
await db.raw(
78-
'select match_id from parsed_matches where match_id = ? and is_archived IS TRUE',
79-
[matchId],
80-
)
81-
).rows[0],
82-
);
83-
if (isArchived) {
84-
console.log('ALREADY ARCHIVED match %s', matchId);
85-
await deleteParsed(matchId);
86-
return;
87-
}
88-
// check data completeness with isDataComplete
89-
if (!isDataComplete(match as ParsedMatch)) {
90-
redisCount(redis, 'incomplete_archive');
91-
console.log('INCOMPLETE match %s', matchId);
92-
return;
93-
}
94-
redisCount(redis, 'match_archive_write');
95-
// console.log('SIMULATE ARCHIVE match %s', matchId);
96-
// TODO (howard) don't actually archive until verification of data format
97-
return;
98-
// Archive the data since it's parsed. This might also contain api and gcdata
99-
const blob = Buffer.from(JSON.stringify(match));
100-
const result = await matchArchive.archivePut(matchId.toString(), blob);
101-
if (result) {
102-
// Mark the match archived
103-
await db.raw(
104-
`UPDATE parsed_matches SET is_archived = TRUE WHERE match_id = ?`,
105-
[matchId],
106-
);
107-
// Delete the parsed data (this keeps the api and gcdata around in Cassandra since it doesn't take a ton of space)
108-
await deleteParsed(matchId);
109-
console.log('ARCHIVE match %s, parsed', matchId);
110-
}
111-
return result;
112-
}
113-
// if it's something else, e.g. contains api and gcdata only, leave it for now
114-
// console.log('SKIP match %s, unparsed', matchId);
115-
return;
116-
}
117-
118-
async function deleteParsed(matchId: number) {
119-
await cassandra.execute(
120-
'DELETE parsed from match_blobs WHERE match_id = ?',
121-
[matchId],
122-
{
123-
prepare: true,
124-
},
125-
);
126-
}
127-
128-
export async function archivePostgresStream() {
129-
// Archive parsed matches that aren't archived from postgres records
130-
const max = await getCurrentMaxArchiveID();
131-
const query = new QueryStream(
132-
`
133-
SELECT match_id
134-
from parsed_matches
135-
WHERE is_archived IS NULL
136-
and match_id < ?
137-
ORDER BY match_id asc`,
138-
[max],
139-
);
140-
const pg = new Client(config.POSTGRES_URL);
141-
await pg.connect();
142-
const stream = pg.query(query);
143-
let i = 0;
144-
stream.on('readable', async () => {
145-
let row;
146-
while ((row = stream.read())) {
147-
i += 1;
148-
console.log(i);
149-
try {
150-
await doArchiveFromBlob(row.match_id);
151-
} catch (e) {
152-
console.error(e);
153-
}
154-
}
155-
});
156-
stream.on('end', async () => {
157-
await pg.end();
158-
});
159-
}
160-
161-
async function archiveSequential(start: number, max: number) {
162-
// Archive sequentially starting at a given ID (not all IDs may be valid)
163-
for (let i = start; i < max; i++) {
164-
console.log(i);
165-
try {
166-
await doArchiveFromBlob(i);
167-
} catch (e) {
168-
console.error(e);
169-
}
170-
}
171-
}
172-
173-
async function archiveRandom(max: number) {
174-
const rand = randomInt(0, max);
175-
// Bruteforce 1000 IDs starting at a random value (not all IDs may be valid)
176-
const page = [];
177-
for (let i = 0; i < 1000; i++) {
178-
page.push(rand + i);
179-
}
180-
console.log(page[0]);
181-
await Promise.allSettled(page.map((i) => doArchiveFromBlob(i)));
182-
}
183-
184-
export async function archiveToken(max: number) {
185-
// Archive random matches from Cassandra using token range (not all may be parsed)
186-
let page = await getTokenRange(1000);
187-
page = page.filter((id) => id < max);
188-
console.log(page[0]);
189-
await Promise.allSettled(page.map((i) => doArchiveFromBlob(i)));
190-
}
191-
192-
function randomBigInt(byteCount: number) {
193-
return BigInt(`0x${crypto.randomBytes(byteCount).toString('hex')}`);
194-
}
195-
196-
function randomInt(min: number, max: number) {
197-
return Math.floor(Math.random() * (max - min) + min);
198-
}
199-
200-
export async function getCurrentMaxArchiveID() {
201-
// Get the current max_match_id from postgres, subtract 200000000
202-
const max = (await db.raw('select max(match_id) from public_matches'))
203-
?.rows?.[0]?.max;
204-
const limit = max - 100000000;
205-
return limit;
206-
}
207-
208-
async function getTokenRange(size: number) {
209-
// Convert to signed 64-bit integer
210-
const signedBigInt = BigInt.asIntN(64, randomBigInt(8));
211-
// Get a page of matches (effectively random, but guaranteed sequential read on one node)
212-
const result = await cassandra.execute(
213-
'select match_id, token(match_id) from match_blobs where token(match_id) >= ? limit ? ALLOW FILTERING;',
214-
[signedBigInt.toString(), size],
215-
{
216-
prepare: true,
217-
fetchSize: size,
218-
autoPage: true,
219-
},
220-
);
221-
return result.rows.map((row) => Number(row.match_id));
222-
}
223-
224-
export async function readArchivedPlayerMatches(
10+
export async function tryReadArchivedPlayerMatches(
22511
accountId: number,
22612
): Promise<ParsedPlayerMatch[]> {
13+
if (!playerArchive) {
14+
return [];
15+
}
22716
console.time('archive:' + accountId);
22817
const blob = await playerArchive.archiveGet(accountId.toString());
22918
const arr = blob ? JSON.parse(blob.toString()) : [];
@@ -240,7 +29,7 @@ export async function tryReadArchivedMatch(
24029
matchId: number,
24130
): Promise<ParsedMatch | null> {
24231
try {
243-
if (!config.ENABLE_MATCH_ARCHIVE) {
32+
if (!matchArchive) {
24433
return null;
24534
}
24635
// Check if the parsed data is archived

0 commit comments

Comments
 (0)