Skip to content

Commit 9d79baa

Browse files
fixup! Batcher runner implementation
Signed-off-by: Alexandros Filios <[email protected]>
1 parent 1eb8edc commit 9d79baa

File tree

3 files changed

+60
-22
lines changed

3 files changed

+60
-22
lines changed

pkg/runner/batch.go

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,32 +11,16 @@ import (
1111
"sync/atomic"
1212
"time"
1313

14-
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
1514
"github.com/pkg/errors"
1615
)
1716

18-
var logger = flogging.MustGetLogger("batch-executor")
19-
20-
type BatchExecutor[I any, O any] interface {
21-
Execute(input I) (O, error)
22-
}
23-
24-
type BatchRunner[V any] interface {
25-
Run(v V) error
26-
}
27-
28-
type Output[O any] struct {
29-
Val O
30-
Err error
31-
}
32-
3317
type batcher[I any, O any] struct {
3418
idx uint32
3519
inputs []chan I
3620
outputs []chan O
3721
locks []sync.Mutex
3822
len uint32
39-
executor func([]I) []O
23+
executor ExecuteFunc[I, O]
4024
timeout time.Duration
4125
}
4226

@@ -120,7 +104,7 @@ type batchExecutor[I any, O any] struct {
120104
*batcher[I, Output[O]]
121105
}
122106

123-
func NewBatchExecutor[I any, O any](executor func([]I) []Output[O], capacity int, timeout time.Duration) BatchExecutor[I, O] {
107+
func NewBatchExecutor[I any, O any](executor ExecuteFunc[I, Output[O]], capacity int, timeout time.Duration) BatchExecutor[I, O] {
124108
return &batchExecutor[I, O]{batcher: newBatcher(executor, capacity, timeout)}
125109
}
126110

@@ -133,10 +117,6 @@ type batchRunner[V any] struct {
133117
*batcher[V, error]
134118
}
135119

136-
func NewSerialRunner[V any](runner func([]V) []error) BatchRunner[V] {
137-
return NewBatchRunner(runner, 1, 1*time.Hour)
138-
}
139-
140120
func NewBatchRunner[V any](runner func([]V) []error, capacity int, timeout time.Duration) BatchRunner[V] {
141121
return &batchRunner[V]{batcher: newBatcher(runner, capacity, timeout)}
142122
}

pkg/runner/runner.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package runner
8+
9+
import "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
10+
11+
var logger = flogging.MustGetLogger("batch-executor")
12+
13+
type BatchExecutor[I any, O any] interface {
14+
Execute(input I) (O, error)
15+
}
16+
17+
type BatchRunner[V any] interface {
18+
Run(v V) error
19+
}
20+
21+
type Output[O any] struct {
22+
Val O
23+
Err error
24+
}
25+
26+
type ExecuteFunc[I any, O any] func([]I) []O

pkg/runner/serial.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package runner
8+
9+
func NewSerialRunner[V any](runner ExecuteFunc[V, error]) BatchRunner[V] {
10+
return &serialRunner[V]{executor: runner}
11+
}
12+
13+
type serialRunner[V any] struct {
14+
executor ExecuteFunc[V, error]
15+
}
16+
17+
func (r *serialRunner[V]) Run(val V) error {
18+
return r.executor([]V{val})[0]
19+
}
20+
21+
func NewSerialExecutor[I any, O any](executor ExecuteFunc[I, Output[O]]) BatchExecutor[I, O] {
22+
return &serialExecutor[I, O]{executor: executor}
23+
}
24+
25+
type serialExecutor[I any, O any] struct {
26+
executor ExecuteFunc[I, Output[O]]
27+
}
28+
29+
func (r *serialExecutor[I, O]) Execute(input I) (O, error) {
30+
res := r.executor([]I{input})[0]
31+
return res.Val, res.Err
32+
}

0 commit comments

Comments
 (0)