Skip to content

Commit 360d7a8

Browse files
authored
Send raft pending logs events on any subscribed channels. (#1157)
* Send raft pending logs events on any subscribed channels. Fixes bug introduced with recent upstream geth merges. * Add tests verifying raft sends events to correct feed and subscribers receive those events
1 parent 5131682 commit 360d7a8

File tree

7 files changed

+141
-19
lines changed

7 files changed

+141
-19
lines changed

core/events.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,6 @@ import (
2424
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
2525
type NewTxsEvent struct{ Txs []*types.Transaction }
2626

27-
// PendingLogsEvent is posted pre mining and notifies of pending logs.
28-
type PendingLogsEvent struct {
29-
Logs []*types.Log
30-
}
31-
3227
// PendingStateEvent is posted pre mining and notifies of pending state changes.
3328
type PendingStateEvent struct{}
3429

eth/api_backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ func (b *EthAPIBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven
249249
}
250250

251251
func (b *EthAPIBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
252-
return b.eth.miner.SubscribePendingLogs(ch)
252+
return b.eth.SubscribePendingLogs(ch) // Quorum
253253
}
254254

255255
func (b *EthAPIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {

eth/api_backend_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package eth
2+
3+
import (
4+
"testing"
5+
6+
"github.com/ethereum/go-ethereum/core/types"
7+
"github.com/ethereum/go-ethereum/node"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestEthAPIBackend_SubscribePendingLogsEvent(t *testing.T) {
12+
conf := &Config{
13+
RaftMode: false,
14+
}
15+
stack, err := node.New(&node.Config{})
16+
if err != nil {
17+
t.Fatalf("failed to create node, err = %v", err)
18+
}
19+
eth, err := New(stack, conf)
20+
if err != nil {
21+
t.Fatalf("failed to create eth service, err = %v", err)
22+
}
23+
24+
b := &EthAPIBackend{
25+
eth: eth,
26+
}
27+
28+
ch := make(chan []*types.Log, 1)
29+
30+
_ = b.SubscribePendingLogsEvent(ch)
31+
32+
recipientCount := eth.ConsensusServicePendingLogsFeed().Send([]*types.Log{})
33+
34+
require.Zero(t, recipientCount, "not using consensus service so its event feed should not have subscribers")
35+
require.Zero(t, len(ch), "not using consensus service so subscribed channel should not have received event")
36+
}
37+
38+
func TestEthAPIBackend_SubscribePendingLogsEvent_SubscribesToConsensusServiceFeed(t *testing.T) {
39+
conf := &Config{
40+
RaftMode: true,
41+
}
42+
stack, err := node.New(&node.Config{})
43+
if err != nil {
44+
t.Fatalf("failed to create node, err = %v", err)
45+
}
46+
eth, err := New(stack, conf)
47+
if err != nil {
48+
t.Fatalf("failed to create eth service, err = %v ", err)
49+
}
50+
51+
b := &EthAPIBackend{
52+
eth: eth,
53+
}
54+
55+
ch := make(chan []*types.Log, 1)
56+
57+
_ = b.SubscribePendingLogsEvent(ch)
58+
59+
recipientCount := eth.ConsensusServicePendingLogsFeed().Send([]*types.Log{})
60+
61+
require.NotZero(t, recipientCount, "consensus service in use so its event feed should have subscribers")
62+
require.Equal(t, 1, len(ch), "consensus service in use so subscribed channel should have received event")
63+
}

eth/backend.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ type Ethereum struct {
9494
// Quorum - Multitenancy
9595
// contractAuthzProvider is set after node starts instead in New()
9696
contractAuthzProvider multitenancy.ContractAuthorizationProvider
97+
98+
// Quorum - consensus as eth-service (e.g. raft)
99+
consensusServicePendingLogsFeed *event.Feed
97100
}
98101

99102
// Quorum
@@ -153,18 +156,19 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) {
153156
}
154157

155158
eth := &Ethereum{
156-
config: config,
157-
chainDb: chainDb,
158-
eventMux: stack.EventMux(),
159-
accountManager: stack.AccountManager(),
160-
engine: CreateConsensusEngine(stack, chainConfig, config, config.Miner.Notify, config.Miner.Noverify, chainDb),
161-
closeBloomHandler: make(chan struct{}),
162-
networkID: config.NetworkId,
163-
gasPrice: config.Miner.GasPrice,
164-
etherbase: config.Miner.Etherbase,
165-
bloomRequests: make(chan chan *bloombits.Retrieval),
166-
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
167-
p2pServer: stack.Server(),
159+
config: config,
160+
chainDb: chainDb,
161+
eventMux: stack.EventMux(),
162+
accountManager: stack.AccountManager(),
163+
engine: CreateConsensusEngine(stack, chainConfig, config, config.Miner.Notify, config.Miner.Noverify, chainDb),
164+
closeBloomHandler: make(chan struct{}),
165+
networkID: config.NetworkId,
166+
gasPrice: config.Miner.GasPrice,
167+
etherbase: config.Miner.Etherbase,
168+
bloomRequests: make(chan chan *bloombits.Retrieval),
169+
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
170+
p2pServer: stack.Server(),
171+
consensusServicePendingLogsFeed: new(event.Feed),
168172
}
169173

170174
// Quorum: Set protocol Name/Version
@@ -637,3 +641,18 @@ func (s *Ethereum) Stop() error {
637641
func (s *Ethereum) CalcGasLimit(block *types.Block) uint64 {
638642
return core.CalcGasLimit(block, s.config.Miner.GasFloor, s.config.Miner.GasCeil)
639643
}
644+
645+
// (Quorum)
646+
// ConsensusServicePendingLogsFeed returns an event.Feed. When the consensus protocol does not use eth.worker (e.g. raft), the event.Feed should be used to send logs from transactions included in the pending block
647+
func (s *Ethereum) ConsensusServicePendingLogsFeed() *event.Feed {
648+
return s.consensusServicePendingLogsFeed
649+
}
650+
651+
// (Quorum)
652+
// SubscribePendingLogs starts delivering logs from transactions included in the consensus engine's pending block to the given channel.
653+
func (s *Ethereum) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {
654+
if s.config.RaftMode {
655+
return s.consensusServicePendingLogsFeed.Subscribe(ch)
656+
}
657+
return s.miner.SubscribePendingLogs(ch)
658+
}

raft/backend.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ type RaftService struct {
3535
minter *minter
3636
nodeKey *ecdsa.PrivateKey
3737
calcGasLimitFunc func(block *types.Block) uint64
38+
39+
pendingLogsFeed *event.Feed
3840
}
3941

4042
func New(stack *node.Node, chainConfig *params.ChainConfig, raftId, raftPort uint16, joinExisting bool, blockTime time.Duration, e *eth.Ethereum, startPeers []*enode.Node, datadir string, useDns bool) (*RaftService, error) {
@@ -48,6 +50,7 @@ func New(stack *node.Node, chainConfig *params.ChainConfig, raftId, raftPort uin
4850
startPeers: startPeers,
4951
nodeKey: stack.GetNodeKey(),
5052
calcGasLimitFunc: e.CalcGasLimit,
53+
pendingLogsFeed: e.ConsensusServicePendingLogsFeed(),
5154
}
5255

5356
service.minter = newMinter(chainConfig, service, blockTime)

raft/backend_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package raft
2+
3+
import (
4+
"io/ioutil"
5+
"os"
6+
"testing"
7+
"time"
8+
9+
"github.com/ethereum/go-ethereum/eth"
10+
"github.com/ethereum/go-ethereum/node"
11+
"github.com/ethereum/go-ethereum/params"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
func Test_New_RegistersEthServicePendingLogsFeed(t *testing.T) {
16+
conf := &eth.Config{
17+
RaftMode: true,
18+
}
19+
stack, err := node.New(&node.Config{})
20+
if err != nil {
21+
t.Fatalf("failed to create node, err = %v", err)
22+
}
23+
ethService, err := eth.New(stack, conf)
24+
if err != nil {
25+
t.Fatalf("failed to create eth service, err = %v", err)
26+
}
27+
28+
tmpWorkingDir, err := ioutil.TempDir("", "")
29+
if err != nil {
30+
t.Fatal(err)
31+
}
32+
defer func() {
33+
_ = os.RemoveAll(tmpWorkingDir)
34+
}()
35+
36+
raftService, err := New(stack, &params.ChainConfig{}, 0, 0, false, time.Second, ethService, nil, tmpWorkingDir, false)
37+
if err != nil {
38+
t.Fatalf("failed to create raft service, err = %v", err)
39+
}
40+
41+
require.Equal(t, ethService.ConsensusServicePendingLogsFeed(), raftService.pendingLogsFeed, "raft service has not been set up with Ethereum service's consensusServicePendingLogsFeed")
42+
}

raft/minter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ func (minter *minter) firePendingBlockEvents(logs []*types.Log) {
301301
}
302302

303303
go func() {
304-
minter.mux.Post(core.PendingLogsEvent{Logs: copiedLogs})
304+
minter.eth.pendingLogsFeed.Send(copiedLogs)
305305
minter.mux.Post(core.PendingStateEvent{})
306306
}()
307307
}

0 commit comments

Comments
 (0)