Skip to content

Commit cac0b4d

Browse files
committed
benchmark e2e latency
1 parent 1762106 commit cac0b4d

File tree

4 files changed

+253
-34
lines changed

4 files changed

+253
-34
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ build: ## Build the HTTP server
2626
go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/sender-proxy cmd/sender-proxy/main.go
2727
go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/receiver-proxy cmd/receiver-proxy/main.go
2828
go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/test-orderflow-sender cmd/test-tx-sender/main.go
29+
go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/test-e2e-latency cmd/test-e2e-latency/main.go
2930

3031
.PHONY: build-receiver-proxy
3132
build-receiver-proxy: ## Build only the receiver-proxy

cmd/test-e2e-latency/main.go

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/binary"
6+
"encoding/json"
7+
"errors"
8+
"fmt"
9+
"io"
10+
"log"
11+
"log/slog"
12+
"net/http"
13+
"os"
14+
"slices"
15+
"sync"
16+
"time"
17+
18+
"github.com/ethereum/go-ethereum/common/hexutil"
19+
"github.com/flashbots/go-utils/rpcclient"
20+
"github.com/flashbots/go-utils/signature"
21+
"github.com/urfave/cli/v2" // imports as package "cli"
22+
)
23+
24+
var flags []cli.Flag = []cli.Flag{
25+
// input and output
26+
&cli.StringFlag{
27+
Name: "local-orderflow-endpoint",
28+
Value: "http://127.0.0.1",
29+
Usage: "address to send orderflow to",
30+
EnvVars: []string{"LOCAL_ORDERPLOW_ENDPOINT"},
31+
},
32+
&cli.IntFlag{
33+
Name: "local-receiver-server-port",
34+
Value: 8646,
35+
Usage: "address to send orderflow to",
36+
EnvVars: []string{"LOCAL_RECEIVER_SERVER__PORT"},
37+
},
38+
&cli.IntFlag{
39+
Name: "num-senders",
40+
Value: 50,
41+
Usage: "Number of senders",
42+
EnvVars: []string{"NUM_SENDERS"},
43+
},
44+
&cli.IntFlag{
45+
Name: "num-requests",
46+
Value: 50000,
47+
Usage: "Number of requests",
48+
EnvVars: []string{"NUM_REQUESTS"},
49+
},
50+
}
51+
52+
// test tx
53+
54+
func main() {
55+
app := &cli.App{
56+
Name: "test-tx-sender",
57+
Usage: "send test transactions",
58+
Flags: flags,
59+
Action: func(cCtx *cli.Context) error {
60+
orderflowSigner, err := signature.NewRandomSigner()
61+
if err != nil {
62+
return err
63+
}
64+
slog.Info("Ordeflow signing address", "address", orderflowSigner.Address())
65+
66+
localOrderflowEndpoint := cCtx.String("local-orderflow-endpoint")
67+
client := rpcclient.NewClientWithOpts(localOrderflowEndpoint, &rpcclient.RPCClientOpts{
68+
Signer: orderflowSigner,
69+
})
70+
slog.Info("Created client")
71+
72+
receiverPort := cCtx.Int("local-receiver-server-port")
73+
74+
senders := cCtx.Int("num-senders")
75+
requests := cCtx.Int("num-requests")
76+
77+
return runE2ELatencyTest(client, receiverPort, senders, requests)
78+
79+
},
80+
}
81+
82+
if err := app.Run(os.Args); err != nil {
83+
log.Fatal(err)
84+
}
85+
}
86+
87+
type sharedState struct {
88+
sentAt map[uint64]time.Time
89+
receivedAt map[uint64]time.Time
90+
mu sync.Mutex
91+
}
92+
93+
func (s *sharedState) ServeHTTP(w http.ResponseWriter, r *http.Request) {
94+
receivedAt := time.Now()
95+
body, _ := io.ReadAll(r.Body)
96+
97+
// serve builderhub API
98+
if r.URL.Path == "/api/l1-builder/v1/register_credentials/orderflow_proxy" {
99+
w.WriteHeader(http.StatusOK)
100+
return
101+
} else if r.URL.Path == "/api/l1-builder/v1/builders" {
102+
res, err := json.Marshal([]int{})
103+
if err != nil {
104+
w.WriteHeader(http.StatusInternalServerError)
105+
return
106+
}
107+
w.WriteHeader(http.StatusOK)
108+
_, _ = w.Write(res)
109+
return
110+
}
111+
112+
resp, err := json.Marshal(struct{}{})
113+
if err != nil {
114+
w.WriteHeader(http.StatusInternalServerError)
115+
return
116+
}
117+
_, _ = w.Write(resp)
118+
119+
// forwarded request received
120+
type jsonRPCRequest struct {
121+
Params []hexutil.Bytes `json:"params"`
122+
}
123+
124+
var request jsonRPCRequest
125+
err = json.Unmarshal(body, &request)
126+
if err != nil {
127+
return
128+
}
129+
if len(request.Params) != 1 {
130+
return
131+
}
132+
133+
decoded := binary.BigEndian.Uint64(request.Params[0])
134+
135+
s.mu.Lock()
136+
s.receivedAt[decoded] = receivedAt
137+
s.mu.Unlock()
138+
139+
}
140+
141+
func (s *sharedState) RunSender(client rpcclient.RPCClient, start, count int, wg *sync.WaitGroup) {
142+
defer wg.Done()
143+
for i := start; i < start+count; i += 1 {
144+
b := make([]byte, 8)
145+
//nolint:gosec
146+
binary.BigEndian.PutUint64(b, uint64(i))
147+
request := hexutil.Bytes(b)
148+
149+
s.mu.Lock()
150+
//nolint:gosec
151+
s.sentAt[uint64(i)] = time.Now()
152+
s.mu.Unlock()
153+
// send eth_sendRawTransactions
154+
resp, err := client.Call(context.Background(), "eth_sendRawTransaction", request)
155+
if err != nil {
156+
slog.Error("RPC request failed", "error", err)
157+
continue
158+
}
159+
if resp.Error != nil {
160+
slog.Error("RPC returned error", "error", resp.Error)
161+
continue
162+
}
163+
}
164+
}
165+
166+
func runE2ELatencyTest(client rpcclient.RPCClient, receiverPort int, senders, requests int) error {
167+
state := &sharedState{
168+
sentAt: make(map[uint64]time.Time),
169+
receivedAt: make(map[uint64]time.Time),
170+
mu: sync.Mutex{},
171+
}
172+
173+
//nolint:gosec
174+
receiverServer := &http.Server{
175+
Addr: fmt.Sprintf("0.0.0.0:%d", receiverPort),
176+
Handler: state,
177+
}
178+
179+
go func() {
180+
if err := receiverServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
181+
slog.Error("Failed while listening to server", "error", err)
182+
os.Exit(1)
183+
}
184+
}()
185+
186+
countPerSender := requests / senders
187+
188+
slog.Info("Waiting for startup")
189+
time.Sleep(time.Second * 5)
190+
slog.Info("Sending started")
191+
192+
start := time.Now()
193+
offset := 0
194+
var wg sync.WaitGroup
195+
for range senders {
196+
wg.Add(1)
197+
go state.RunSender(client, offset, countPerSender, &wg)
198+
offset += countPerSender
199+
}
200+
wg.Wait()
201+
sendingTime := time.Since(start)
202+
203+
slog.Info("Waiting to finish")
204+
time.Sleep(time.Second * 1)
205+
206+
state.mu.Lock()
207+
defer state.mu.Unlock()
208+
209+
missedRequests := 0
210+
totalRequests := len(state.sentAt)
211+
rps := float64(totalRequests) / (float64(sendingTime.Milliseconds()) / 1000.0)
212+
213+
values := make([]float64, 0, totalRequests)
214+
for request, sentAt := range state.sentAt {
215+
receivedAt, ok := state.receivedAt[request]
216+
if !ok {
217+
missedRequests += 1
218+
continue
219+
}
220+
diff := float64(receivedAt.Sub(sentAt).Microseconds()) / 1000.0
221+
values = append(values, diff)
222+
}
223+
slices.Sort(values)
224+
p50 := values[(len(values)*50)/100]
225+
p99 := values[(len(values)*99)/100]
226+
p100 := values[len(values)-1]
227+
228+
missedPercentage := float64(missedRequests) / float64(totalRequests) * 100.0
229+
230+
slog.Info("Results", "p50", p50, "p99", p99, "p100", p100, "totalReq", totalRequests, "missedRequests", missedRequests, "missedPercent", missedPercentage, "averageRPS", rps)
231+
232+
return nil
233+
}

cmd/test-tx-sender/main.go

Lines changed: 5 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@ package main
33
import (
44
"context"
55
"errors"
6-
"io"
76
"log"
87
"log/slog"
9-
"net/http"
108
"os"
119

1210
"github.com/ethereum/go-ethereum/common/hexutil"
11+
"github.com/flashbots/go-utils/rpcclient"
1312
"github.com/flashbots/go-utils/rpctypes"
1413
"github.com/flashbots/go-utils/signature"
1514
"github.com/flashbots/tdx-orderflow-proxy/proxy"
@@ -21,16 +20,10 @@ var flags []cli.Flag = []cli.Flag{
2120
// input and output
2221
&cli.StringFlag{
2322
Name: "local-orderflow-endpoint",
24-
Value: "https://127.0.0.1:443",
23+
Value: "http://127.0.0.1",
2524
Usage: "address to send orderflow to",
2625
EnvVars: []string{"LOCAL_ORDERPLOW_ENDPOINT"},
2726
},
28-
&cli.StringFlag{
29-
Name: "cert-endpoint",
30-
Value: "http://127.0.0.1:14727",
31-
Usage: "address that serves certifiate on /cert endpoint",
32-
EnvVars: []string{"CERT_ENDPOINT"},
33-
},
3427
&cli.StringFlag{
3528
Name: "signer-private-key",
3629
Value: "0x52da2727dd1180b547258c9ca7deb7f9576b2768f3f293b67f36505c85b2ddd0",
@@ -57,7 +50,6 @@ func main() {
5750
Flags: flags,
5851
Action: func(cCtx *cli.Context) error {
5952
localOrderflowEndpoint := cCtx.String("local-orderflow-endpoint")
60-
certEndpoint := cCtx.String("cert-endpoint")
6153
signerPrivateKey := cCtx.String("signer-private-key")
6254

6355
orderflowSigner, err := signature.NewSignerFromHexPrivateKey(signerPrivateKey)
@@ -66,16 +58,9 @@ func main() {
6658
}
6759
slog.Info("Ordeflow signing address", "address", orderflowSigner.Address())
6860

69-
cert, err := fetchCertificate(certEndpoint + "/cert")
70-
if err != nil {
71-
return err
72-
}
73-
slog.Info("Fetched certificate")
74-
75-
client, err := proxy.RPCClientWithCertAndSigner(localOrderflowEndpoint, cert, orderflowSigner, 1)
76-
if err != nil {
77-
return err
78-
}
61+
client := rpcclient.NewClientWithOpts(localOrderflowEndpoint, &rpcclient.RPCClientOpts{
62+
Signer: orderflowSigner,
63+
})
7964
slog.Info("Created client")
8065

8166
rpcEndpoint := cCtx.String("rpc-endpoint")
@@ -188,17 +173,3 @@ func main() {
188173
log.Fatal(err)
189174
}
190175
}
191-
192-
func fetchCertificate(endpoint string) ([]byte, error) {
193-
resp, err := http.Get(endpoint) //nolint:gosec
194-
if err != nil {
195-
return nil, err
196-
}
197-
defer resp.Body.Close()
198-
199-
body, err := io.ReadAll(resp.Body)
200-
if err != nil {
201-
return nil, err
202-
}
203-
return body, nil
204-
}

e2e-latenc-test.sh

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#!/bin/bash
2+
3+
make build
4+
5+
./build/receiver-proxy --pprof --user-listen-addr 0.0.0.0:9976 --builder-endpoint http://127.0.0.1:7890 --builder-confighub-endpoint http://127.0.0.1:7890 --orderflow-archive-endpoint http://127.0.0.1:7890 --connections-per-peer 100 > /tmp/log.txt 2>&1 &
6+
PROXY_PID=$!
7+
8+
./build/test-e2e-latency --local-orderflow-endpoint http://127.0.0.1:9976 --local-receiver-server-port 7890 --num-requests 100000 --num-senders 1000
9+
10+
# uncomment to send orderflow without proxy to see test setup overhead
11+
#./build/test-e2e-latency --local-orderflow-endpoint http://127.0.0.1:7890 --local-receiver-server-port 7890 --num-requests 100000 --num-senders 1000
12+
13+
#sleep 10
14+
kill $PROXY_PID

0 commit comments

Comments
 (0)