Skip to content

Commit 966be5d

Browse files
committed
Batch: add abort method #469
1 parent 137f8b6 commit 966be5d

File tree

3 files changed

+67
-0
lines changed

3 files changed

+67
-0
lines changed

conn_batch.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package clickhouse
2020
import (
2121
"context"
2222
"fmt"
23+
"os"
2324
"regexp"
2425
"strings"
2526
"time"
@@ -74,6 +75,17 @@ type batch struct {
7475
onProcess *onProcess
7576
}
7677

78+
func (b *batch) Abort() error {
79+
defer func() {
80+
b.sent = true
81+
b.release(os.ErrProcessDone)
82+
}()
83+
if b.sent {
84+
return ErrBatchAlreadySent
85+
}
86+
return nil
87+
}
88+
7789
func (b *batch) Append(v ...interface{}) error {
7890
if b.sent {
7991
return ErrBatchAlreadySent

lib/driver/driver.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ type (
6969
Err() error
7070
}
7171
Batch interface {
72+
Abort() error
7273
Append(v ...interface{}) error
7374
AppendStruct(v interface{}) error
7475
Column(int) BatchColumn

tests/abort_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package tests
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/ClickHouse/clickhouse-go/v2"
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestAbort(t *testing.T) {
12+
var (
13+
ctx = context.Background()
14+
conn, err = clickhouse.Open(&clickhouse.Options{
15+
Addr: []string{"127.0.0.1:9000"},
16+
Auth: clickhouse.Auth{
17+
Database: "default",
18+
Username: "default",
19+
Password: "",
20+
},
21+
Compression: &clickhouse.Compression{
22+
Method: clickhouse.CompressionLZ4,
23+
},
24+
MaxOpenConns: 1,
25+
})
26+
)
27+
if assert.NoError(t, err) {
28+
const ddl = `
29+
CREATE TABLE test_abort (
30+
Col1 UInt8
31+
) Engine Memory
32+
`
33+
defer func() {
34+
conn.Exec(ctx, "DROP TABLE test_abort")
35+
}()
36+
if err := conn.Exec(ctx, ddl); assert.NoError(t, err) {
37+
if batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_abort"); assert.NoError(t, err) {
38+
if assert.NoError(t, batch.Abort()) {
39+
if err := batch.Abort(); assert.Error(t, err) {
40+
assert.Equal(t, clickhouse.ErrBatchAlreadySent, err)
41+
}
42+
}
43+
}
44+
if batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_abort"); assert.NoError(t, err) {
45+
if assert.NoError(t, batch.Append(uint8(1))) && assert.NoError(t, batch.Send()) {
46+
var col1 uint8
47+
if err := conn.QueryRow(ctx, "SELECT * FROM test_abort").Scan(&col1); assert.NoError(t, err) {
48+
assert.Equal(t, uint8(1), col1)
49+
}
50+
}
51+
}
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)