Skip to content

Commit 1432f59

Browse files
committed
Send to builder without share queue
1 parent f35b99c commit 1432f59

File tree

5 files changed

+133
-101
lines changed

5 files changed

+133
-101
lines changed

cmd/receiver-proxy/main.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -212,16 +212,19 @@ func runMain(cCtx *cli.Context) error {
212212
maxUserRPS := cCtx.Int(flagMaxUserRPS)
213213

214214
proxyConfig := &proxy.ReceiverProxyConfig{
215-
ReceiverProxyConstantConfig: proxy.ReceiverProxyConstantConfig{Log: log, FlashbotsSignerAddress: flashbotsSignerAddress},
216-
BuilderConfigHubEndpoint: builderConfigHubEndpoint,
217-
ArchiveEndpoint: archiveEndpoint,
218-
ArchiveConnections: connectionsPerPeer,
219-
LocalBuilderEndpoint: builderEndpoint,
220-
EthRPC: rpcEndpoint,
221-
MaxRequestBodySizeBytes: maxRequestBodySizeBytes,
222-
ConnectionsPerPeer: connectionsPerPeer,
223-
MaxUserRPS: maxUserRPS,
224-
ArchiveWorkerCount: archiveWorkerCount,
215+
ReceiverProxyConstantConfig: proxy.ReceiverProxyConstantConfig{
216+
Log: log,
217+
FlashbotsSignerAddress: flashbotsSignerAddress,
218+
LocalBuilderEndpoint: builderEndpoint,
219+
},
220+
BuilderConfigHubEndpoint: builderConfigHubEndpoint,
221+
ArchiveEndpoint: archiveEndpoint,
222+
ArchiveConnections: connectionsPerPeer,
223+
EthRPC: rpcEndpoint,
224+
MaxRequestBodySizeBytes: maxRequestBodySizeBytes,
225+
ConnectionsPerPeer: connectionsPerPeer,
226+
MaxUserRPS: maxUserRPS,
227+
ArchiveWorkerCount: archiveWorkerCount,
225228
}
226229

227230
instance, err := proxy.NewReceiverProxy(*proxyConfig)

proxy/receiver_api.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func (prx *ReceiverProxy) UserJSONRPCHandler(maxRequestBodySizeBytes int64) (*rp
8080

8181
// readyHandler calls /readyz on rbuilder
8282
func (prx *ReceiverProxy) readyHandler(w http.ResponseWriter, r *http.Request) error {
83-
resp, err := http.Get(prx.localBuilderEndpoint + "/readyz")
83+
resp, err := http.Get(prx.LocalBuilderEndpoint + "/readyz")
8484
if err != nil {
8585
prx.Log.Warn("Failed to check builder readiness", slog.Any("error", err))
8686
http.Error(w, "not ready", http.StatusServiceUnavailable)
@@ -419,10 +419,13 @@ func (prx *ReceiverProxy) HandleParsedRequest(ctx context.Context, parsedRequest
419419
incRequestDurationStep(time.Since(startAt), parsedRequest.method, "", "serialize_parsed_request")
420420
startAt = time.Now()
421421

422-
select {
423-
case <-ctx.Done():
424-
prx.Log.Error("Shared queue is stalling")
425-
case prx.shareQueue <- &parsedRequest:
422+
// since we send to local builder while handling the request we can skip sharing request
423+
if !parsedRequest.systemEndpoint {
424+
select {
425+
case <-ctx.Done():
426+
prx.Log.Error("Shared queue is stalling")
427+
case prx.shareQueue <- &parsedRequest:
428+
}
426429
}
427430

428431
incRequestDurationStep(time.Since(startAt), parsedRequest.method, "", "share_queue")
@@ -437,5 +440,14 @@ func (prx *ReceiverProxy) HandleParsedRequest(ctx context.Context, parsedRequest
437440
}
438441

439442
incRequestDurationStep(time.Since(startAt), parsedRequest.method, "", "archive_queue")
443+
startAt = time.Now()
444+
// since we always send to local builder we do it here to avoid queue entirery
445+
446+
err = prx.localBuilderSender.SendRequest(&parsedRequest)
447+
if err != nil {
448+
prx.Log.Debug("Failed to send request to a local builder", slog.Any("error", err))
449+
}
450+
451+
incRequestDurationStep(time.Since(startAt), parsedRequest.method, "", "local_builder")
440452
return nil
441453
}

proxy/receiver_proxy.go

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,6 @@ type ReceiverProxy struct {
3939

4040
OrderflowSigner *signature.Signer
4141

42-
localBuilder rpcclient.RPCClient
43-
localBuilderEndpoint string
44-
4542
UserHandler http.Handler
4643
SystemHandler http.Handler
4744

@@ -61,13 +58,16 @@ type ReceiverProxy struct {
6158
peerUpdaterClose chan struct{}
6259

6360
userAPIRateLimiter *rate.Limiter
61+
62+
localBuilderSender LocalBuilderSender
6463
}
6564

6665
type ReceiverProxyConstantConfig struct {
6766
Log *slog.Logger
6867
// Name is optional field and it used to distringuish multiple proxies when running in the same process in tests
6968
Name string
7069
FlashbotsSignerAddress common.Address
70+
LocalBuilderEndpoint string
7171
}
7272

7373
type ReceiverProxyConfig struct {
@@ -76,7 +76,6 @@ type ReceiverProxyConfig struct {
7676
BuilderConfigHubEndpoint string
7777
ArchiveEndpoint string
7878
ArchiveConnections int
79-
LocalBuilderEndpoint string
8079

8180
// EthRPC should support eth_blockNumber API
8281
EthRPC string
@@ -94,26 +93,23 @@ func NewReceiverProxy(config ReceiverProxyConfig) (*ReceiverProxy, error) {
9493
return nil, err
9594
}
9695

97-
localCl := HTTPClientLocalhost(DefaultLocalhostMaxIdleConn)
98-
99-
localBuilder := rpcclient.NewClientWithOpts(config.LocalBuilderEndpoint, &rpcclient.RPCClientOpts{
100-
HTTPClient: localCl,
101-
})
102-
10396
limit := rate.Limit(config.MaxUserRPS)
10497
if config.MaxUserRPS == 0 {
10598
limit = rate.Inf
10699
}
107100
userAPIRateLimiter := rate.NewLimiter(limit, config.MaxUserRPS)
101+
localBuilderSender, err := NewLocalBuilderSender(config.Log, config.LocalBuilderEndpoint, config.ConnectionsPerPeer)
102+
if err != nil {
103+
return nil, err
104+
}
108105
prx := &ReceiverProxy{
109106
ReceiverProxyConstantConfig: config.ReceiverProxyConstantConfig,
110107
ConfigHub: NewBuilderConfigHub(config.Log, config.BuilderConfigHubEndpoint),
111108
OrderflowSigner: orderflowSigner,
112-
localBuilder: localBuilder,
113-
localBuilderEndpoint: config.LocalBuilderEndpoint,
114109
requestUniqueKeysRLU: expirable.NewLRU[uuid.UUID, struct{}](requestsRLUSize, nil, requestsRLUTTL),
115110
replacementNonceRLU: expirable.NewLRU[replacementNonceKey, int](replacementNonceSize, nil, replacementNonceTTL),
116111
userAPIRateLimiter: userAPIRateLimiter,
112+
localBuilderSender: localBuilderSender,
117113
}
118114
maxRequestBodySizeBytes := DefaultMaxRequestBodySizeBytes
119115
if config.MaxRequestBodySizeBytes != 0 {
@@ -138,13 +134,12 @@ func NewReceiverProxy(config ReceiverProxyConfig) (*ReceiverProxy, error) {
138134
prx.updatePeers = updatePeersCh
139135

140136
queue := ShareQueue{
141-
name: prx.Name,
142-
log: prx.Log,
143-
queue: shareQeueuCh,
144-
updatePeers: updatePeersCh,
145-
localBuilderEndpoint: config.LocalBuilderEndpoint,
146-
signer: prx.OrderflowSigner,
147-
workersPerPeer: config.ConnectionsPerPeer,
137+
name: prx.Name,
138+
log: prx.Log,
139+
queue: shareQeueuCh,
140+
updatePeers: updatePeersCh,
141+
signer: prx.OrderflowSigner,
142+
workersPerPeer: config.ConnectionsPerPeer,
148143
}
149144
go queue.Run()
150145

proxy/receiver_proxy_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,11 @@ func createProxy(localBuilder, name, certPath, certKeyPath string) *ReceiverProx
252252
Log: log,
253253
Name: name,
254254
FlashbotsSignerAddress: flashbotsSigner.Address(),
255+
LocalBuilderEndpoint: localBuilder,
255256
},
256257

257258
BuilderConfigHubEndpoint: builderHub.URL,
258259
ArchiveEndpoint: archiveServer.URL,
259-
LocalBuilderEndpoint: localBuilder,
260260
EthRPC: "eth-rpc-not-set",
261261
MaxUserRPS: 10,
262262
})

proxy/sharing.go

Lines changed: 88 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"errors"
66
"log/slog"
77
"net/http"
8-
"os"
98
"time"
109

1110
"github.com/flashbots/go-utils/jsonrpc"
@@ -19,20 +18,19 @@ var (
1918
ShareWorkerQueueSize = 10000
2019
requestTimeout = time.Second * 10
2120

22-
errUnknownRequestType = errors.New("unknwon request type for sharing")
21+
errUnknownRequestType = errors.New("unknown request type for sharing")
2322
)
2423

2524
const (
2625
bigRequestSize = 50_000
2726
)
2827

2928
type ShareQueue struct {
30-
name string
31-
log *slog.Logger
32-
queue chan *ParsedRequest
33-
updatePeers chan []ConfighubBuilder
34-
localBuilderEndpoint string
35-
signer *signature.Signer
29+
name string
30+
log *slog.Logger
31+
queue chan *ParsedRequest
32+
updatePeers chan []ConfighubBuilder
33+
signer *signature.Signer
3634
// if > 0 share queue will spawn multiple senders per peer
3735
workersPerPeer int
3836
}
@@ -73,23 +71,7 @@ func (sq *ShareQueue) Run() {
7371
if sq.workersPerPeer > 0 {
7472
workersPerPeer = sq.workersPerPeer
7573
}
76-
var (
77-
localBuilder *shareQueuePeer
78-
peers []shareQueuePeer
79-
)
80-
if len(sq.localBuilderEndpoint) > 0 {
81-
client, err := NewFastHTTPClient(nil, workersPerPeer)
82-
if err != nil {
83-
sq.log.Error("Failed to intitialize local builder enpoint")
84-
os.Exit(1)
85-
}
86-
builderPeer := newShareQueuePeer("local-builder", client, ConfighubBuilder{}, sq.localBuilderEndpoint)
87-
localBuilder = &builderPeer
88-
for worker := range workersPerPeer {
89-
go sq.proxyRequests(localBuilder, worker)
90-
}
91-
defer localBuilder.Close()
92-
}
74+
var peers []shareQueuePeer
9375
for {
9476
select {
9577
case req, more := <-sq.queue:
@@ -98,9 +80,6 @@ func (sq *ShareQueue) Run() {
9880
sq.log.Info("Share queue closing, queue channel closed")
9981
return
10082
}
101-
if localBuilder != nil {
102-
localBuilder.SendRequest(sq.log, req)
103-
}
10483
if !req.systemEndpoint {
10584
for _, peer := range peers {
10685
peer.SendRequest(sq.log, req)
@@ -166,6 +145,83 @@ func (sq *ShareQueue) Run() {
166145
}
167146
}
168147

148+
type LocalBuilderSender struct {
149+
logger *slog.Logger
150+
client *fasthttp.Client
151+
endpoint string
152+
}
153+
154+
func NewLocalBuilderSender(logger *slog.Logger, endpoint string, maxOpenConnections int) (LocalBuilderSender, error) {
155+
logger = logger.With(slog.String("peer", "local-builder"))
156+
157+
client, err := NewFastHTTPClient(nil, maxOpenConnections)
158+
if err != nil {
159+
return LocalBuilderSender{}, err
160+
}
161+
162+
return LocalBuilderSender{
163+
logger, client, endpoint,
164+
}, nil
165+
}
166+
167+
func (s *LocalBuilderSender) SendRequest(req *ParsedRequest) error {
168+
request := fasthttp.AcquireRequest()
169+
request.SetRequestURI(s.endpoint)
170+
request.Header.SetMethod(http.MethodPost)
171+
request.Header.SetContentTypeBytes([]byte("application/json"))
172+
defer fasthttp.ReleaseRequest(request)
173+
174+
return sendShareRequest(s.logger, req, request, s.client, "local-builder")
175+
}
176+
177+
func sendShareRequest(logger *slog.Logger, req *ParsedRequest, request *fasthttp.Request, client *fasthttp.Client, peerName string) error {
178+
if req.serializedJSONRPCRequest == nil {
179+
logger.Debug("Skip sharing request that is not serialized properly")
180+
return nil
181+
}
182+
183+
timeInQueue := time.Since(req.receivedAt)
184+
185+
request.Header.Set(signature.HTTPHeader, req.signatureHeader)
186+
request.SetBodyRaw(req.serializedJSONRPCRequest)
187+
188+
resp := fasthttp.AcquireResponse()
189+
start := time.Now()
190+
err := client.DoTimeout(request, resp, requestTimeout)
191+
requestDuration := time.Since(start)
192+
timeE2E := timeInQueue + requestDuration
193+
194+
// in background update metrics and handle response
195+
go func() {
196+
isBig := req.size >= bigRequestSize
197+
timeShareQueuePeerQueueDuration(peerName, timeInQueue, req.method, req.systemEndpoint, isBig)
198+
timeShareQueuePeerRPCDuration(peerName, requestDuration.Milliseconds(), isBig)
199+
timeShareQueuePeerE2EDuration(peerName, timeE2E, req.method, req.systemEndpoint, isBig)
200+
201+
logSendErrorLevel := slog.LevelDebug
202+
if peerName == "local-builder" {
203+
logSendErrorLevel = slog.LevelWarn
204+
}
205+
if err != nil {
206+
logger.Log(context.Background(), logSendErrorLevel, "Error while proxying request", slog.Any("error", err))
207+
incShareQueuePeerRPCErrors(peerName)
208+
} else {
209+
var parsedResp jsonrpc.JSONRPCResponse
210+
err = json.Unmarshal(resp.Body(), &parsedResp)
211+
if err != nil {
212+
logger.Log(context.Background(), logSendErrorLevel, "Error parsing response while proxying", slog.Any("error", err))
213+
incShareQueuePeerRPCErrors(peerName)
214+
} else if parsedResp.Error != nil {
215+
logger.Log(context.Background(), logSendErrorLevel, "Error returned from target while proxying", slog.Any("error", parsedResp.Error))
216+
incShareQueuePeerRPCErrors(peerName)
217+
}
218+
}
219+
fasthttp.ReleaseResponse(resp)
220+
}()
221+
222+
return nil
223+
}
224+
169225
func (sq *ShareQueue) proxyRequests(peer *shareQueuePeer, worker int) {
170226
proxiedRequestCount := 0
171227
logger := sq.log.With(slog.String("peer", peer.name), slog.String("name", sq.name), slog.Int("worker", worker))
@@ -191,44 +247,10 @@ func (sq *ShareQueue) proxyRequests(peer *shareQueuePeer, worker int) {
191247
continue
192248
}
193249

194-
timeInQueue := time.Since(req.receivedAt)
195-
196-
request.Header.Set(signature.HTTPHeader, req.signatureHeader)
197-
request.SetBodyRaw(req.serializedJSONRPCRequest)
198-
199-
resp := fasthttp.AcquireResponse()
200-
start := time.Now()
201-
err := peer.client.DoTimeout(request, resp, requestTimeout)
202-
requestDuration := time.Since(start)
203-
timeE2E := timeInQueue + requestDuration
204-
205-
// in background update metrics and handle response
206-
go func() {
207-
isBig := req.size >= bigRequestSize
208-
timeShareQueuePeerQueueDuration(peer.name, timeInQueue, req.method, req.systemEndpoint, isBig)
209-
timeShareQueuePeerRPCDuration(peer.name, requestDuration.Milliseconds(), isBig)
210-
timeShareQueuePeerE2EDuration(peer.name, timeE2E, req.method, req.systemEndpoint, isBig)
211-
212-
logSendErrorLevel := slog.LevelDebug
213-
if peer.name == "local-builder" {
214-
logSendErrorLevel = slog.LevelWarn
215-
}
216-
if err != nil {
217-
logger.Log(context.Background(), logSendErrorLevel, "Error while proxying request", slog.Any("error", err))
218-
incShareQueuePeerRPCErrors(peer.name)
219-
} else {
220-
var parsedResp jsonrpc.JSONRPCResponse
221-
err = json.Unmarshal(resp.Body(), &parsedResp)
222-
if err != nil {
223-
logger.Log(context.Background(), logSendErrorLevel, "Error parsing response while proxying", slog.Any("error", err))
224-
incShareQueuePeerRPCErrors(peer.name)
225-
} else if parsedResp.Error != nil {
226-
logger.Log(context.Background(), logSendErrorLevel, "Error returned from target while proxying", slog.Any("error", parsedResp.Error))
227-
incShareQueuePeerRPCErrors(peer.name)
228-
}
229-
}
230-
fasthttp.ReleaseResponse(resp)
231-
}()
250+
err := sendShareRequest(logger, req, request, peer.client, peer.name)
251+
if err != nil {
252+
logger.Debug("Failed to proxy a request", slog.Any("error", err))
253+
}
232254

233255
proxiedRequestCount += 1
234256
logger.Debug("Message proxied")

0 commit comments

Comments
 (0)