Skip to content

Commit d16c293

Browse files
Allow parallel block processing
Signed-off-by: Alexandros Filios <[email protected]>
1 parent f1d9927 commit d16c293

File tree

1 file changed

+39
-19
lines changed

1 file changed

+39
-19
lines changed

platform/fabric/core/generic/finality/listenermanager.go

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,11 @@ type ListenerManager[T TxInfo] interface {
4545
type TxInfoCallback[T TxInfo] func(T) error
4646

4747
type DeliveryListenerManagerConfig struct {
48-
MapperParallelism int
49-
ListenerTimeout time.Duration
50-
LRUSize int
51-
LRUBuffer int
48+
MapperParallelism int
49+
BlockProcessParallelism int
50+
ListenerTimeout time.Duration
51+
LRUSize int
52+
LRUBuffer int
5253
}
5354

5455
func NewListenerManager[T TxInfo](config DeliveryListenerManagerConfig, delivery *fabric.Delivery, tracer trace.Tracer, mapper TxInfoMapper[T]) (*listenerManager[T], error) {
@@ -71,12 +72,13 @@ func NewListenerManager[T TxInfo](config DeliveryListenerManagerConfig, delivery
7172
txInfos = cache.NewMapCache[driver2.TxID, T]()
7273
}
7374
flm := &listenerManager[T]{
74-
mapper: &parallelBlockMapper[T]{cap: max(config.MapperParallelism, 1), mapper: mapper},
75-
tracer: tracer,
76-
listeners: listeners,
77-
txInfos: txInfos,
78-
ignoreBlockErrors: true,
79-
delivery: delivery,
75+
mapper: &parallelBlockMapper[T]{cap: max(config.MapperParallelism, 1), mapper: mapper},
76+
tracer: tracer,
77+
listeners: listeners,
78+
txInfos: txInfos,
79+
ignoreBlockErrors: true,
80+
blockProcessingParallelism: config.BlockProcessParallelism,
81+
delivery: delivery,
8082
}
8183
logger.Infof("Starting delivery service...")
8284
go flm.start()
@@ -110,22 +112,40 @@ func fetchTxs[T TxInfo](evicted map[driver2.TxID][]ListenerEntry[T], mapper TxIn
110112

111113
func (m *listenerManager[T]) start() {
112114
// In case the delivery service fails, it will try to reconnect automatically.
113-
err := m.delivery.ScanBlock(context.Background(), func(ctx context.Context, block *common.Block) (bool, error) {
114-
err := m.onBlock(ctx, block)
115-
return !m.ignoreBlockErrors && err != nil, err
116-
})
115+
err := m.delivery.ScanBlock(context.Background(), m.newBlockCallback())
117116
logger.Errorf("failed running delivery: %v", err)
118117
}
119118

119+
func (m *listenerManager[T]) newBlockCallback() fabric.BlockCallback {
120+
if m.blockProcessingParallelism <= 1 {
121+
return func(ctx context.Context, block *common.Block) (bool, error) {
122+
err := m.onBlock(ctx, block)
123+
return !m.ignoreBlockErrors && err != nil, err
124+
}
125+
}
126+
eg := errgroup.Group{}
127+
eg.SetLimit(m.blockProcessingParallelism)
128+
return func(ctx context.Context, block *common.Block) (bool, error) {
129+
eg.Go(func() error {
130+
if err := m.onBlock(ctx, block); err != nil {
131+
logger.Warnf("Mapping block [%d] errored: %v", block.Header.Number, err)
132+
}
133+
return nil
134+
})
135+
return false, nil
136+
}
137+
}
138+
120139
type listenerManager[T TxInfo] struct {
121140
tracer trace.Tracer
122141
mapper *parallelBlockMapper[T]
123142

124-
mu sync.RWMutex
125-
listeners cache.Map[driver2.TxID, []ListenerEntry[T]]
126-
txInfos cache.Map[driver2.TxID, T]
127-
delivery *fabric.Delivery
128-
ignoreBlockErrors bool
143+
mu sync.RWMutex
144+
listeners cache.Map[driver2.TxID, []ListenerEntry[T]]
145+
txInfos cache.Map[driver2.TxID, T]
146+
delivery *fabric.Delivery
147+
blockProcessingParallelism int
148+
ignoreBlockErrors bool
129149
}
130150

131151
func (m *listenerManager[T]) onBlock(ctx context.Context, block *common.Block) error {

0 commit comments

Comments
 (0)