Skip to content

Commit ffb30d7

Browse files
committed
remove match_blobs
1 parent f1cc49c commit ffb30d7

File tree

8 files changed

+24
-155
lines changed

8 files changed

+24
-155
lines changed

fetcher/base.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ export abstract class MatchFetcher<T> {
33
// Read from the store without fetching
44
public abstract readData(
55
matchId: number,
6-
noBlobStore: boolean | undefined,
76
): Promise<T | null>;
87
// Read from the store, fetch it from remote and save if needed
98
public abstract getOrFetchData(

fetcher/getApiData.ts

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,13 @@ import { insertMatch } from '../util/insert';
1212
*/
1313
async function readApiData(
1414
matchId: number,
15-
noBlobStore: boolean | undefined,
1615
): Promise<ApiMatch | null> {
1716
let data = null;
18-
if (!noBlobStore) {
19-
const archive = await blobArchive.archiveGet(`${matchId}_api`);
20-
if (archive) {
21-
redisCount('blob_archive_read');
22-
}
23-
data = archive ? (JSON.parse(archive.toString()) as ApiMatch) : null;
24-
}
25-
if (!data) {
26-
const result = await cassandra.execute(
27-
'SELECT api FROM match_blobs WHERE match_id = ?',
28-
[matchId],
29-
{ prepare: true, fetchSize: 1, autoPage: true },
30-
);
31-
const row = result.rows[0];
32-
data = row?.api ? (JSON.parse(row.api) as ApiMatch) : null;
33-
if (data) {
34-
redisCount('api_cassandra_read');
35-
}
17+
const archive = await blobArchive.archiveGet(`${matchId}_api`);
18+
if (archive) {
19+
redisCount('blob_archive_read');
3620
}
21+
data = archive ? (JSON.parse(archive.toString()) as ApiMatch) : null;
3722
return data;
3823
}
3924

@@ -51,7 +36,7 @@ async function getOrFetchApiData(matchId: number): Promise<{
5136
return { data: null, error: '[APIDATA]: invalid match_id' };
5237
}
5338
// Check if we have apidata cached
54-
let saved = await readApiData(matchId, false);
39+
let saved = await readApiData(matchId);
5540
if (saved) {
5641
return { data: saved, error: null };
5742
}
@@ -77,7 +62,7 @@ async function getOrFetchApiData(matchId: number): Promise<{
7762
await insertMatch(match, {
7863
type: 'api',
7964
});
80-
saved = await readApiData(matchId, false);
65+
saved = await readApiData(matchId);
8166
if (saved) {
8267
return { data: saved, error: null };
8368
}

fetcher/getGcData.ts

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import moment from 'moment';
22
import config from '../config';
33
import db from '../store/db';
44
import redis from '../store/redis';
5-
import cassandra from '../store/cassandra';
65
import { getRandomRetrieverUrl, redisCount } from '../util/utility';
76
import axios from 'axios';
87
import retrieverMatch from '../test/data/retriever_match.json';
@@ -17,28 +16,13 @@ import { MatchFetcher } from './base';
1716
*/
1817
async function readGcData(
1918
matchId: number,
20-
noBlobStore: boolean | undefined,
2119
): Promise<GcMatch | null> {
2220
let data = null;
23-
if (!noBlobStore) {
24-
const archive = await blobArchive.archiveGet(`${matchId}_gcdata`);
25-
if (archive) {
26-
redisCount('blob_archive_read');
27-
}
28-
data = archive ? (JSON.parse(archive.toString()) as GcMatch) : null;
29-
}
30-
if (!data) {
31-
const result = await cassandra.execute(
32-
'SELECT gcdata FROM match_blobs WHERE match_id = ?',
33-
[matchId],
34-
{ prepare: true, fetchSize: 1, autoPage: true },
35-
);
36-
const row = result.rows[0];
37-
data = row?.gcdata ? (JSON.parse(row.gcdata) as GcMatch) : null;
38-
if (data) {
39-
redisCount('gcdata_cassandra_read');
40-
}
21+
const archive = await blobArchive.archiveGet(`${matchId}_gcdata`);
22+
if (archive) {
23+
redisCount('blob_archive_read');
4124
}
25+
data = archive ? (JSON.parse(archive.toString()) as GcMatch) : null;
4226
if (
4327
data?.match_id == null ||
4428
data?.cluster == null ||
@@ -164,7 +148,7 @@ async function tryFetchGcData(
164148
): Promise<GcMatch | null> {
165149
try {
166150
await saveGcData(matchId, { pgroup });
167-
return readGcData(matchId, false);
151+
return readGcData(matchId);
168152
} catch (e) {
169153
console.log(e);
170154
return null;
@@ -189,7 +173,7 @@ async function getOrFetchGcData(
189173
throw new Error('invalid match_id');
190174
}
191175
// Check if we have gcdata cached
192-
const saved = await readGcData(matchId, false);
176+
const saved = await readGcData(matchId);
193177
if (saved) {
194178
redisCount('regcdata');
195179
if (config.DISABLE_REGCDATA) {
@@ -202,7 +186,7 @@ async function getOrFetchGcData(
202186
if (error) {
203187
return { data: null, error };
204188
}
205-
const result = await readGcData(matchId, false);
189+
const result = await readGcData(matchId);
206190
return { data: result, error: null };
207191
}
208192

fetcher/getParsedData.ts

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import config from '../config';
22
import { getRandomParserUrl, redisCount } from '../util/utility';
33
import { blobArchive } from '../store/archive';
4-
import cassandra from '../store/cassandra';
54
import db from '../store/db';
65
import { insertMatch } from '../util/insert';
76
import axios from 'axios';
@@ -14,28 +13,13 @@ import { MatchFetcher } from './base';
1413
*/
1514
async function readParsedData(
1615
matchId: number,
17-
noBlobStore: boolean | undefined,
1816
): Promise<ParserMatch | null> {
1917
let data = null;
20-
if (!noBlobStore) {
21-
const archive = await blobArchive.archiveGet(`${matchId}_parsed`);
22-
if (archive) {
23-
redisCount('blob_archive_read');
24-
}
25-
data = archive ? (JSON.parse(archive.toString()) as ParserMatch) : null;
26-
}
27-
if (!data) {
28-
const result = await cassandra.execute(
29-
'SELECT parsed FROM match_blobs WHERE match_id = ?',
30-
[matchId],
31-
{ prepare: true, fetchSize: 1, autoPage: true },
32-
);
33-
const row = result.rows[0];
34-
data = row?.parsed ? (JSON.parse(row.parsed) as ParserMatch) : null;
35-
if (data) {
36-
redisCount('parsed_cassandra_read');
37-
}
18+
const archive = await blobArchive.archiveGet(`${matchId}_parsed`);
19+
if (archive) {
20+
redisCount('blob_archive_read');
3821
}
22+
data = archive ? (JSON.parse(archive.toString()) as ParserMatch) : null;
3923
return data;
4024
}
4125

@@ -95,7 +79,7 @@ async function getOrFetchParseData(
9579
skipped: boolean;
9680
error: string | null;
9781
}> {
98-
const saved = await readParsedData(matchId, false);
82+
const saved = await readParsedData(matchId);
9983
if (saved) {
10084
redisCount('reparse');
10185
if (config.DISABLE_REPARSE) {

sql/create_tables.cql

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,3 @@ CREATE TABLE IF NOT EXISTS player_caches (
6868
hero_variant text,
6969
) WITH CLUSTERING ORDER BY (match_id DESC);
7070
ALTER TABLE player_caches WITH compaction = { 'class' : 'UnifiedCompactionStrategy' };
71-
72-
CREATE TABLE IF NOT EXISTS match_blobs (
73-
PRIMARY KEY (match_id),
74-
match_id bigint,
75-
api text,
76-
gcdata text,
77-
parsed text,
78-
identity text,
79-
ranks text,
80-
);

svc/archiver.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
// Cleans up old data from Cassandra and optionally archives it
2-
import { archiveToken } from '../util/archiveUtil';
32

43
async function start() {
54
while (true) {
65
try {
7-
await archiveToken();
6+
// await archiveToken();
87
await new Promise((resolve) => setTimeout(resolve, 1000));
98
} catch (e) {
109
console.error(e);

util/archiveUtil.ts

Lines changed: 1 addition & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
11
import config from '../config';
2-
import { blobArchive, matchArchive, playerArchive } from '../store/archive';
3-
import cassandra from '../store/cassandra';
2+
import { matchArchive, playerArchive } from '../store/archive';
43
import QueryStream from 'pg-query-stream';
54
import { Client } from 'pg';
6-
import crypto from 'crypto';
75
import db from '../store/db';
8-
import { apiFetcher } from '../fetcher/getApiData';
9-
import { gcFetcher } from '../fetcher/getGcData';
10-
import { parsedFetcher } from '../fetcher/getParsedData';
116
import type { PutObjectCommandOutput } from '@aws-sdk/client-s3';
127
import { getFullPlayerMatchesWithMetadata } from './buildPlayer';
138
import { getMatchDataFromBlobWithMetadata } from './buildMatch';
@@ -25,48 +20,6 @@ async function processMatch(matchId: number) {
2520
// }
2621
// await doArchiveMatchFromBlobs(matchId);
2722
// }
28-
await doMigrateMatchToBlobStore(matchId);
29-
}
30-
31-
/**
32-
* Moves individual match blobs from Cassandra to blob store
33-
* @param matchId
34-
* @returns
35-
*/
36-
async function doMigrateMatchToBlobStore(matchId: number) {
37-
const api = await apiFetcher.readData(matchId, true);
38-
const gcdata = await gcFetcher.readData(matchId, true);
39-
const parsed = await parsedFetcher.readData(matchId, true);
40-
if (api) {
41-
// If the match is old we might not be able to get back ability builds, HD/TD/HH from Steam so we want to keep the API data
42-
await blobArchive.archivePut(
43-
matchId.toString() + '_api',
44-
Buffer.from(JSON.stringify(api)),
45-
);
46-
}
47-
if (gcdata) {
48-
await blobArchive.archivePut(
49-
matchId.toString() + '_gcdata',
50-
Buffer.from(JSON.stringify(gcdata)),
51-
);
52-
}
53-
if (parsed) {
54-
await blobArchive.archivePut(
55-
matchId.toString() + '_parsed',
56-
Buffer.from(JSON.stringify(parsed)),
57-
);
58-
}
59-
await deleteMatch(matchId);
60-
}
61-
62-
async function deleteMatch(matchId: number) {
63-
await cassandra.execute(
64-
'DELETE from match_blobs WHERE match_id = ?',
65-
[matchId],
66-
{
67-
prepare: true,
68-
},
69-
);
7023
}
7124

7225
export async function archivePostgresStream() {
@@ -123,16 +76,6 @@ async function archiveRandom(max: number) {
12376
await Promise.allSettled(page.map((i) => processMatch(i)));
12477
}
12578

126-
export async function archiveToken(max?: number) {
127-
// Archive random matches from Cassandra using token range (not all may be parsed)
128-
let page = await getTokenRange(10);
129-
if (max) {
130-
page = page.filter((id) => id < max);
131-
}
132-
console.log(page[0]);
133-
await Promise.allSettled(page.map((i) => processMatch(i)));
134-
}
135-
13679
function randomInt(min: number, max: number) {
13780
return Math.floor(Math.random() * (max - min) + min);
13881
}
@@ -144,19 +87,6 @@ export async function getCurrentMaxArchiveID() {
14487
return limit;
14588
}
14689

147-
async function getTokenRange(size: number) {
148-
const result = await cassandra.execute(
149-
'select match_id, token(match_id) from match_blobs limit ? ALLOW FILTERING;',
150-
[size],
151-
{
152-
prepare: true,
153-
fetchSize: size,
154-
autoPage: true,
155-
},
156-
);
157-
return result.rows.map((row) => Number(row.match_id));
158-
}
159-
16090
async function doArchivePlayerMatches(
16191
accountId: number,
16292
): Promise<PutObjectCommandOutput | null> {
@@ -200,8 +130,6 @@ export async function doArchiveMatchFromBlobs(matchId: number) {
200130
// Don't read from archive when determining whether to archive
201131
const [match, metadata] = await getMatchDataFromBlobWithMetadata(matchId, {
202132
noArchive: true,
203-
// TODO Remove noBlobStore once migrated
204-
noBlobStore: true,
205133
});
206134
if (match && metadata?.has_parsed) {
207135
// check data completeness with isDataComplete

util/buildMatch.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,17 +142,17 @@ export async function addPlayerBenchmarks(m: Match) {
142142

143143
export async function getMatchDataFromBlobWithMetadata(
144144
matchId: number,
145-
options?: { noArchive: boolean; noBlobStore: boolean },
145+
options?: { noArchive: boolean },
146146
): Promise<[Match | ParsedMatch | null, GetMatchDataMetadata | null]> {
147147
let [api, gcdata, parsed, archived]: [
148148
ApiMatch | null,
149149
GcMatch | null,
150150
ParserMatch | null,
151151
ParsedMatch | null,
152152
] = await Promise.all([
153-
apiFetcher.readData(matchId, options?.noBlobStore),
154-
gcFetcher.readData(matchId, options?.noBlobStore),
155-
parsedFetcher.readData(matchId, options?.noBlobStore),
153+
apiFetcher.readData(matchId),
154+
gcFetcher.readData(matchId),
155+
parsedFetcher.readData(matchId),
156156
!options?.noArchive
157157
? archivedFetcher.readData(matchId)
158158
: Promise.resolve(null),

0 commit comments

Comments
 (0)