diff --git a/clickhouse.go b/clickhouse.go index f5399f8717..2dcd1081f8 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -147,6 +147,8 @@ func (ch *clickhouse) Exec(ctx context.Context, query string, args ...any) error if err != nil { return err } + conn.debugf("[acquired] connection [%d]", conn.id) + if err := conn.exec(ctx, query, args...); err != nil { ch.release(conn, err) return err @@ -345,6 +347,8 @@ func (ch *clickhouse) release(conn *connect, err error) { return } conn.released = true + conn.debugf("[released] connection [%d]", conn.id) + select { case <-ch.open: default: diff --git a/conn_batch.go b/conn_batch.go index 41090d24db..510a9b9850 100644 --- a/conn_batch.go +++ b/conn_batch.go @@ -320,6 +320,26 @@ func (b *batch) closeQuery() error { return nil } +// Close will end the current INSERT without sending the currently buffered rows, and release the connection. +// This may result in zero row inserts if no rows were appended. +// If a batch was already sent this does nothing. +// This should be called via defer after a batch is opened to prevent +// batches from falling out of scope and timing out. +func (b *batch) Close() error { + if b.sent || b.released { + return nil + } + + if err := b.closeQuery(); err != nil { + return err + } + b.sent = true + + b.release(nil) + + return nil +} + type batchColumn struct { err error batch driver.Batch diff --git a/conn_http_batch.go b/conn_http_batch.go index 90cd254e02..21f3bbf4be 100644 --- a/conn_http_batch.go +++ b/conn_http_batch.go @@ -109,6 +109,11 @@ func (b *httpBatch) Flush() error { return nil } +func (b *httpBatch) Close() error { + b.sent = true + return nil +} + func (b *httpBatch) Abort() error { defer func() { b.sent = true diff --git a/examples/clickhouse_api/batch.go b/examples/clickhouse_api/batch.go index 5ab6be8c24..41d5a8a7b1 100644 --- a/examples/clickhouse_api/batch.go +++ b/examples/clickhouse_api/batch.go @@ -53,6 +53,8 @@ func BatchInsert() error { if err != nil { return err } + defer batch.Close() + for i := 0; i < 1000; i++ { err := batch.Append( uint8(42), diff --git a/lib/driver/driver.go b/lib/driver/driver.go index a3a9c35ff6..354295f75e 100644 --- a/lib/driver/driver.go +++ b/lib/driver/driver.go @@ -87,6 +87,7 @@ type ( IsSent() bool Rows() int Columns() []column.Interface + Close() error } BatchColumn interface { Append(any) error diff --git a/tests/abort_test.go b/tests/abort_test.go index f47d7bcd06..5fdf977cc1 100644 --- a/tests/abort_test.go +++ b/tests/abort_test.go @@ -56,3 +56,25 @@ func TestAbort(t *testing.T) { } } } + +func TestBatchClose(t *testing.T) { + conn, err := GetNativeConnection(nil, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + require.NoError(t, err) + ctx := context.Background() + + batch, err := conn.PrepareBatch(ctx, "INSERT INTO function null('x UInt64') VALUES (1)") + require.NoError(t, err) + require.NoError(t, batch.Close()) + require.NoError(t, batch.Close()) // No error on multiple calls + + batch, err = conn.PrepareBatch(ctx, "INSERT INTO function null('x UInt64') VALUES (1)") + require.NoError(t, err) + if assert.NoError(t, batch.Append(uint8(1))) && assert.NoError(t, batch.Send()) { + var col1 uint8 + if err := conn.QueryRow(ctx, "SELECT 1").Scan(&col1); assert.NoError(t, err) { + assert.Equal(t, uint8(1), col1) + } + } +} diff --git a/tests/batch_test.go b/tests/batch_test.go index f0c813832e..f2df01b554 100644 --- a/tests/batch_test.go +++ b/tests/batch_test.go @@ -19,6 +19,7 @@ package tests import ( "testing" + "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/stretchr/testify/require" @@ -51,3 +52,73 @@ func TestBatchContextCancellation(t *testing.T) { // assert if connection is properly released after context cancellation require.NoError(t, conn.Exec(context.Background(), "SELECT 1")) } + +func TestBatchCloseConnectionReleased(t *testing.T) { + te, err := GetTestEnvironment(testSet) + require.NoError(t, err) + opts := ClientOptionsFromEnv(te, clickhouse.Settings{}, false) + opts.MaxOpenConns = 1 + conn, err := GetConnectionWithOptions(&opts) + require.NoError(t, err) + + b, err := conn.PrepareBatch(context.Background(), "INSERT INTO function null('x UInt64')") + require.NoError(t, err) + for i := 0; i < 100; i++ { + require.NoError(t, b.Append(i)) + } + + err = b.Close() + require.NoError(t, err) + + // assert if connection is properly released after close called + require.NoError(t, conn.Exec(context.Background(), "SELECT 1")) +} + +func TestBatchSendConnectionReleased(t *testing.T) { + te, err := GetTestEnvironment(testSet) + require.NoError(t, err) + opts := ClientOptionsFromEnv(te, clickhouse.Settings{}, false) + opts.MaxOpenConns = 1 + conn, err := GetConnectionWithOptions(&opts) + require.NoError(t, err) + + b, err := conn.PrepareBatch(context.Background(), "INSERT INTO function null('x UInt64')") + require.NoError(t, err) + for i := 0; i < 100; i++ { + require.NoError(t, b.Append(i)) + } + + err = b.Send() + require.NoError(t, err) + + // Close should be deferred after the batch is opened + // Validate that it can be called after Send + err = b.Close() + require.NoError(t, err) + + // assert if connection is properly released after Send called + require.NoError(t, conn.Exec(context.Background(), "SELECT 1")) +} + +// This test validates that connections are blocked if a batch is not properly +// cleaned up. This isn't required behavior, but this test confirms it happens. +func TestBatchCloseConnectionHold(t *testing.T) { + te, err := GetTestEnvironment(testSet) + require.NoError(t, err) + opts := ClientOptionsFromEnv(te, clickhouse.Settings{}, false) + opts.MaxOpenConns = 1 + opts.DialTimeout = 2 * time.Second // Lower timeout for faster acquire error + conn, err := GetConnectionWithOptions(&opts) + require.NoError(t, err) + + b, err := conn.PrepareBatch(context.Background(), "INSERT INTO function null('x UInt64')") + require.NoError(t, err) + for i := 0; i < 100; i++ { + require.NoError(t, b.Append(i)) + } + + // batch.Close() should be called here + + // assert if connection is blocked if close is not called. + require.ErrorIs(t, conn.Exec(context.Background(), "SELECT 1"), clickhouse.ErrAcquireConnTimeout) +}