Skip to content

Commit 1d7e2cc

Browse files
committed
add simple message queue
1 parent 8d57f07 commit 1d7e2cc

File tree

7 files changed

+117
-13
lines changed

7 files changed

+117
-13
lines changed

packages/background/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
"ledger-bitcoin": "^0.2.3",
5757
"long": "^4.0.0",
5858
"miscreant": "0.3.2",
59+
"p-queue": "^6.6.2",
5960
"pbkdf2": "^3.1.2",
6061
"utility-types": "^3.10.0"
6162
},

packages/background/src/index.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,12 +293,16 @@ export function init(
293293
keyRingBitcoinService
294294
);
295295

296+
const txExecutableMQ =
297+
BackgroundTxExecutor.createMessageQueue<BackgroundTxExecutor.TxExecutableEvent>();
298+
296299
const recentSendHistoryService =
297300
new RecentSendHistory.RecentSendHistoryService(
298301
storeCreator("recent-send-history"),
299302
chainsService,
300303
backgroundTxService,
301-
notification
304+
notification,
305+
txExecutableMQ.publisher
302306
);
303307

304308
const settingsService = new Settings.SettingsService(
@@ -323,7 +327,8 @@ export function init(
323327
backgroundTxService,
324328
backgroundTxEthereumService,
325329
analyticsService,
326-
recentSendHistoryService
330+
recentSendHistoryService,
331+
txExecutableMQ.subscriber
327332
);
328333

329334
Interaction.init(router, interactionService);

packages/background/src/recent-send-history/service.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import {
3838
import { CoinPretty } from "@keplr-wallet/unit";
3939
import { simpleFetch } from "@keplr-wallet/simple-fetch";
4040
import { id } from "@ethersproject/hash";
41+
import { Publisher, TxExecutableEvent } from "../tx-executor/internal";
4142

4243
const SWAP_API_ENDPOINT = process.env["KEPLR_API_ENDPOINT"] ?? "";
4344

@@ -70,7 +71,8 @@ export class RecentSendHistoryService {
7071
protected readonly kvStore: KVStore,
7172
protected readonly chainsService: ChainsService,
7273
protected readonly txService: BackgroundTxService,
73-
protected readonly notification: Notification
74+
protected readonly notification: Notification,
75+
protected readonly publisher: Publisher<TxExecutableEvent> // TODO: publish tx executable event when 트래킹 인덱스가 증가되었을 때
7476
) {
7577
makeObservable(this);
7678
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
export * from "./service";
22
export * from "./init";
3+
export * from "./message-queue";
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import PQueue from "p-queue";
2+
3+
class MessageQueueCore<T = unknown> {
4+
public subscriber: ((msg: T) => Promise<void> | void) | null = null;
5+
public buffer: T[] = [];
6+
public queue: PQueue;
7+
private isFlushing = false;
8+
9+
constructor(concurrency = 1) {
10+
this.queue = new PQueue({ concurrency });
11+
}
12+
13+
enqueue(message: T) {
14+
this.buffer.push(message);
15+
this.flush();
16+
}
17+
18+
/**
19+
* subscriber 설정
20+
*/
21+
setSubscriber(handler: (msg: T) => Promise<void> | void) {
22+
this.subscriber = handler;
23+
this.flush();
24+
}
25+
26+
private flush() {
27+
if (this.isFlushing) return;
28+
if (!this.subscriber) return;
29+
if (this.buffer.length === 0) return;
30+
31+
this.isFlushing = true;
32+
33+
try {
34+
console.log("[MessageQueueCore] flush start", this.buffer.length);
35+
36+
while (this.buffer.length > 0) {
37+
const msg = this.buffer.shift()!;
38+
this.queue.add(async () => {
39+
try {
40+
await this.subscriber!(msg);
41+
} catch (e) {
42+
console.error("[MessageQueueCore] handler error:", e);
43+
}
44+
});
45+
}
46+
} finally {
47+
this.isFlushing = false;
48+
}
49+
}
50+
}
51+
export class Publisher<T = unknown> {
52+
constructor(private core: MessageQueueCore<T>) {}
53+
54+
publish(message: T) {
55+
this.core.enqueue(message);
56+
}
57+
}
58+
59+
export class Subscriber<T = unknown> {
60+
constructor(private core: MessageQueueCore<T>) {}
61+
62+
subscribe(handler: (msg: T) => Promise<void> | void) {
63+
this.core.setSubscriber(handler);
64+
}
65+
}
66+
67+
export interface TxExecutableEvent {
68+
readonly executionId: string;
69+
readonly executableChainIds: string[];
70+
}

packages/background/src/tx-executor/service.ts

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import {
4646
calculateCosmosStdFee,
4747
} from "./utils/cosmos";
4848
import { fillUnsignedEVMTx } from "./utils/evm";
49+
import { Subscriber, TxExecutableEvent } from "./internal";
4950
export class BackgroundTxExecutorService {
5051
@observable
5152
protected recentTxExecutionSeq: number = 0;
@@ -61,7 +62,8 @@ export class BackgroundTxExecutorService {
6162
protected readonly backgroundTxService: BackgroundTxService,
6263
protected readonly backgroundTxEthereumService: BackgroundTxEthereumService,
6364
protected readonly analyticsService: AnalyticsService,
64-
protected readonly recentSendHistoryService: RecentSendHistoryService
65+
protected readonly recentSendHistoryService: RecentSendHistoryService,
66+
protected readonly subscriber: Subscriber<TxExecutableEvent>
6567
) {
6668
makeObservable(this);
6769
}
@@ -103,16 +105,31 @@ export class BackgroundTxExecutorService {
103105
);
104106
});
105107

106-
// TODO: 간단한 메시지 큐를 구현해서 recent send history service에서 multi tx를 처리할 조건이 만족되었을 때
107-
// 이 서비스로 메시지를 보내 트랜잭션을 자동으로 실행할 수 있도록 한다. 굳
108+
this.subscriber.subscribe(async ({ executionId, executableChainIds }) => {
109+
const execution = this.getTxExecution(executionId);
110+
if (!execution) {
111+
return;
112+
}
113+
114+
const newExecutableChainIds = executableChainIds.filter((chainId) =>
115+
execution.executableChainIds.includes(chainId)
116+
);
108117

109-
// CHECK: 현재 활성화되어 있는 vault에서만 실행할 수 있으면 좋을 듯, how? vaultId 변경 감지? how?
110-
// CHECK: 굳이 이걸 백그라운드에서 자동으로 실행할 필요가 있을까?
111-
// 불러왔는데 pending 상태거나 오래된 실행이면 사실상 이 작업을 이어가는 것이 의미가 있는지 의문이 든다.
112-
// for (const execution of this.getRecentDirectTxsExecutions()) {
118+
if (newExecutableChainIds.length === 0) {
119+
return;
120+
}
121+
122+
// update the executable chain ids
123+
for (const chainId of newExecutableChainIds) {
124+
execution.executableChainIds.push(chainId);
125+
}
113126

114-
// this.executeDirectTxs(execution.id);
115-
// }
127+
// cause new executable chain ids are available, resume the execution
128+
129+
// CHECK: 현재 활성화되어 있는 vault에서만 실행할 수 있으면 좋을 듯, how? vaultId 변경 감지? how?
130+
// 불러왔는데 pending 상태거나 오래된 실행이면 사실상 이 작업을 이어가는 것이 의미가 있는지 의문이 든다.
131+
await this.executeTxs(executionId);
132+
});
116133
}
117134

118135
/**
@@ -225,10 +242,17 @@ export class BackgroundTxExecutorService {
225242
throw new KeplrError("direct-tx-executor", 105, "Execution not found");
226243
}
227244

245+
if (execution.status === TxExecutionStatus.PROCESSING) {
246+
throw new KeplrError(
247+
"direct-tx-executor",
248+
108,
249+
"Execution is already processing"
250+
);
251+
}
252+
228253
// Only pending/processing/blocked executions can be executed
229254
const needResume =
230255
execution.status === TxExecutionStatus.PENDING ||
231-
execution.status === TxExecutionStatus.PROCESSING ||
232256
execution.status === TxExecutionStatus.BLOCKED;
233257
if (!needResume) {
234258
return execution.status;

yarn.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5857,6 +5857,7 @@ __metadata:
58575857
ledger-bitcoin: ^0.2.3
58585858
long: ^4.0.0
58595859
miscreant: 0.3.2
5860+
p-queue: ^6.6.2
58605861
pbkdf2: ^3.1.2
58615862
utility-types: ^3.10.0
58625863
peerDependencies:

0 commit comments

Comments
 (0)