Skip to content

Commit 06d64b3

Browse files
Merge pull request #1566 from ClickHouse/fix_cancel_query
Add `Close` function to batch interface
2 parents 71e3975 + a0e6569 commit 06d64b3

File tree

7 files changed

+125
-0
lines changed

7 files changed

+125
-0
lines changed

clickhouse.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ func (ch *clickhouse) Exec(ctx context.Context, query string, args ...any) error
147147
if err != nil {
148148
return err
149149
}
150+
conn.debugf("[acquired] connection [%d]", conn.id)
151+
150152
if err := conn.exec(ctx, query, args...); err != nil {
151153
ch.release(conn, err)
152154
return err
@@ -345,6 +347,8 @@ func (ch *clickhouse) release(conn *connect, err error) {
345347
return
346348
}
347349
conn.released = true
350+
conn.debugf("[released] connection [%d]", conn.id)
351+
348352
select {
349353
case <-ch.open:
350354
default:

conn_batch.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,26 @@ func (b *batch) closeQuery() error {
320320
return nil
321321
}
322322

323+
// Close will end the current INSERT without sending the currently buffered rows, and release the connection.
324+
// This may result in zero row inserts if no rows were appended.
325+
// If a batch was already sent this does nothing.
326+
// This should be called via defer after a batch is opened to prevent
327+
// batches from falling out of scope and timing out.
328+
func (b *batch) Close() error {
329+
if b.sent || b.released {
330+
return nil
331+
}
332+
333+
if err := b.closeQuery(); err != nil {
334+
return err
335+
}
336+
b.sent = true
337+
338+
b.release(nil)
339+
340+
return nil
341+
}
342+
323343
type batchColumn struct {
324344
err error
325345
batch driver.Batch

conn_http_batch.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ func (b *httpBatch) Flush() error {
109109
return nil
110110
}
111111

112+
func (b *httpBatch) Close() error {
113+
b.sent = true
114+
return nil
115+
}
116+
112117
func (b *httpBatch) Abort() error {
113118
defer func() {
114119
b.sent = true

examples/clickhouse_api/batch.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ func BatchInsert() error {
5353
if err != nil {
5454
return err
5555
}
56+
defer batch.Close()
57+
5658
for i := 0; i < 1000; i++ {
5759
err := batch.Append(
5860
uint8(42),

lib/driver/driver.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ type (
8787
IsSent() bool
8888
Rows() int
8989
Columns() []column.Interface
90+
Close() error
9091
}
9192
BatchColumn interface {
9293
Append(any) error

tests/abort_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,25 @@ func TestAbort(t *testing.T) {
5656
}
5757
}
5858
}
59+
60+
func TestBatchClose(t *testing.T) {
61+
conn, err := GetNativeConnection(nil, nil, &clickhouse.Compression{
62+
Method: clickhouse.CompressionLZ4,
63+
})
64+
require.NoError(t, err)
65+
ctx := context.Background()
66+
67+
batch, err := conn.PrepareBatch(ctx, "INSERT INTO function null('x UInt64') VALUES (1)")
68+
require.NoError(t, err)
69+
require.NoError(t, batch.Close())
70+
require.NoError(t, batch.Close()) // No error on multiple calls
71+
72+
batch, err = conn.PrepareBatch(ctx, "INSERT INTO function null('x UInt64') VALUES (1)")
73+
require.NoError(t, err)
74+
if assert.NoError(t, batch.Append(uint8(1))) && assert.NoError(t, batch.Send()) {
75+
var col1 uint8
76+
if err := conn.QueryRow(ctx, "SELECT 1").Scan(&col1); assert.NoError(t, err) {
77+
assert.Equal(t, uint8(1), col1)
78+
}
79+
}
80+
}

tests/batch_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package tests
1919

2020
import (
2121
"testing"
22+
"time"
2223

2324
"github.com/ClickHouse/clickhouse-go/v2"
2425
"github.com/stretchr/testify/require"
@@ -51,3 +52,73 @@ func TestBatchContextCancellation(t *testing.T) {
5152
// assert if connection is properly released after context cancellation
5253
require.NoError(t, conn.Exec(context.Background(), "SELECT 1"))
5354
}
55+
56+
func TestBatchCloseConnectionReleased(t *testing.T) {
57+
te, err := GetTestEnvironment(testSet)
58+
require.NoError(t, err)
59+
opts := ClientOptionsFromEnv(te, clickhouse.Settings{}, false)
60+
opts.MaxOpenConns = 1
61+
conn, err := GetConnectionWithOptions(&opts)
62+
require.NoError(t, err)
63+
64+
b, err := conn.PrepareBatch(context.Background(), "INSERT INTO function null('x UInt64')")
65+
require.NoError(t, err)
66+
for i := 0; i < 100; i++ {
67+
require.NoError(t, b.Append(i))
68+
}
69+
70+
err = b.Close()
71+
require.NoError(t, err)
72+
73+
// assert if connection is properly released after close called
74+
require.NoError(t, conn.Exec(context.Background(), "SELECT 1"))
75+
}
76+
77+
func TestBatchSendConnectionReleased(t *testing.T) {
78+
te, err := GetTestEnvironment(testSet)
79+
require.NoError(t, err)
80+
opts := ClientOptionsFromEnv(te, clickhouse.Settings{}, false)
81+
opts.MaxOpenConns = 1
82+
conn, err := GetConnectionWithOptions(&opts)
83+
require.NoError(t, err)
84+
85+
b, err := conn.PrepareBatch(context.Background(), "INSERT INTO function null('x UInt64')")
86+
require.NoError(t, err)
87+
for i := 0; i < 100; i++ {
88+
require.NoError(t, b.Append(i))
89+
}
90+
91+
err = b.Send()
92+
require.NoError(t, err)
93+
94+
// Close should be deferred after the batch is opened
95+
// Validate that it can be called after Send
96+
err = b.Close()
97+
require.NoError(t, err)
98+
99+
// assert if connection is properly released after Send called
100+
require.NoError(t, conn.Exec(context.Background(), "SELECT 1"))
101+
}
102+
103+
// This test validates that connections are blocked if a batch is not properly
104+
// cleaned up. This isn't required behavior, but this test confirms it happens.
105+
func TestBatchCloseConnectionHold(t *testing.T) {
106+
te, err := GetTestEnvironment(testSet)
107+
require.NoError(t, err)
108+
opts := ClientOptionsFromEnv(te, clickhouse.Settings{}, false)
109+
opts.MaxOpenConns = 1
110+
opts.DialTimeout = 2 * time.Second // Lower timeout for faster acquire error
111+
conn, err := GetConnectionWithOptions(&opts)
112+
require.NoError(t, err)
113+
114+
b, err := conn.PrepareBatch(context.Background(), "INSERT INTO function null('x UInt64')")
115+
require.NoError(t, err)
116+
for i := 0; i < 100; i++ {
117+
require.NoError(t, b.Append(i))
118+
}
119+
120+
// batch.Close() should be called here
121+
122+
// assert if connection is blocked if close is not called.
123+
require.ErrorIs(t, conn.Exec(context.Background(), "SELECT 1"), clickhouse.ErrAcquireConnTimeout)
124+
}

0 commit comments

Comments
 (0)