Skip to content

Commit d748bd1

Browse files
committed
test out query stream
1 parent 41a1ac0 commit d748bd1

File tree

1 file changed

+45
-28
lines changed

1 file changed

+45
-28
lines changed

svc/rater.ts

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import { gcFetcher } from './fetcher/getGcData.ts';
22
import db from './store/db.ts';
33
import { average, eachLimitPromise, getRetrieverCapacity, isRadiant, redisCount } from './util/utility.ts';
4+
import QueryStream from 'pg-query-stream';
5+
import pg from 'pg';
6+
import config from '../config.ts';
47

58
const DEFAULT_RATING = 4000;
69
const kFactor = 32;
@@ -93,38 +96,52 @@ async function doRate() {
9396
}
9497
}
9598

99+
type DataRow = {
100+
match_seq_num: number; match_id: number; pgroup: PGroup;
101+
};
102+
const Client = pg.Client;
103+
104+
async function processRow(row: DataRow) {
105+
const { data } = await gcFetcher.getOrFetchData(row.match_id, {
106+
pgroup: row.pgroup,
107+
});
108+
if (data) {
109+
// If successful, update
110+
await db.raw(
111+
'UPDATE rating_queue SET gcdata = ? WHERE match_seq_num = ?',
112+
[JSON.stringify(data), row.match_seq_num],
113+
);
114+
}
115+
// else {
116+
// // Match can't be rated due to lack of data (community prediction?)
117+
// await db.raw(
118+
// 'DELETE FROM rating_queue WHERE match_seq_num = ?',
119+
// row.match_seq_num,
120+
// );
121+
// redisCount('rater_skip');
122+
// }
123+
}
124+
96125
async function prefetchGcData() {
97126
while (true) {
98-
const capacity = await getRetrieverCapacity();
99-
const { rows } = await db.raw<{
100-
rows: { match_seq_num: number; match_id: number; pgroup: PGroup }[];
101-
}>(
102-
'SELECT match_seq_num, match_id, pgroup from rating_queue WHERE gcdata IS NULL ORDER BY match_seq_num LIMIT ?',
103-
[100]
127+
const query = new QueryStream(
128+
`SELECT match_seq_num, match_id, pgroup from rating_queue WHERE gcdata IS NULL ORDER BY match_seq_num`
104129
);
105-
if (rows.length) {
106-
await eachLimitPromise(rows, async (row) => {
107-
const { data } = await gcFetcher.getOrFetchDataWithRetry(row.match_id, {
108-
pgroup: row.pgroup,
109-
}, 500);
110-
if (data) {
111-
// If successful, update
112-
await db.raw(
113-
'UPDATE rating_queue SET gcdata = ? WHERE match_seq_num = ?',
114-
[JSON.stringify(data), row.match_seq_num],
115-
);
116-
} else {
117-
// Match can't be rated due to lack of data (community prediction?)
118-
await db.raw(
119-
'DELETE FROM rating_queue WHERE match_seq_num = ?',
120-
row.match_seq_num,
121-
);
122-
redisCount('rater_skip');
123-
}
124-
}, capacity);
125-
} else {
130+
const pg = new Client(config.POSTGRES_URL);
131+
await pg.connect();
132+
const stream = pg.query(query);
133+
stream.on('readable', async () => {
134+
let row: DataRow;
135+
while ((row = stream.read())) {
136+
await processRow(row);
137+
const capacity = await getRetrieverCapacity();
138+
await new Promise((resolve) => setTimeout(resolve, 1000 / capacity));
139+
}
140+
});
141+
stream.on('end', async () => {
142+
await pg.end();
126143
await new Promise((resolve) => setTimeout(resolve, 1000));
127-
}
144+
});
128145
}
129146
}
130147

0 commit comments

Comments
 (0)