From d66e71e2dec9637cc015d2769185e8716d5cae68 Mon Sep 17 00:00:00 2001 From: Stepan Rabotkin Date: Mon, 7 Aug 2023 14:31:39 +0300 Subject: [PATCH 1/4] Implement release connetion --- clickhouse.go | 3 +- conn_batch.go | 77 ++++++++++++++++++-------- conn_http_batch.go | 5 ++ lib/driver/driver.go | 1 + tests/batch_release_connection_test.go | 76 +++++++++++++++++++++++++ tests/utils.go | 11 ++++ 6 files changed, 148 insertions(+), 25 deletions(-) create mode 100644 tests/batch_release_connection_test.go diff --git a/clickhouse.go b/clickhouse.go index 8b18d2ddad..679e959b77 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -44,12 +44,11 @@ type ( var ( ErrBatchInvalid = errors.New("clickhouse: batch is invalid. check appended data is correct") ErrBatchAlreadySent = errors.New("clickhouse: batch has already been sent") - ErrBatchNotSent = errors.New("clickhouse: invalid retry, batch not sent yet") + ErrBatchUnexpectedRelease = errors.New("clickhouse: unexpected connection release") ErrAcquireConnTimeout = errors.New("clickhouse: acquire conn timeout. you can increase the number of max open conn or the dial timeout") ErrUnsupportedServerRevision = errors.New("clickhouse: unsupported server revision") ErrBindMixedParamsFormats = errors.New("clickhouse [bind]: mixed named, numeric or positional parameters") ErrAcquireConnNoAddress = errors.New("clickhouse: no valid address supplied") - ErrServerUnexpectedData = errors.New("code: 101, message: Unexpected packet Data received from client") ) type OpError struct { diff --git a/conn_batch.go b/conn_batch.go index c9c272b624..87134e2319 100644 --- a/conn_batch.go +++ b/conn_batch.go @@ -78,6 +78,7 @@ func (c *connect) prepareBatch(ctx context.Context, query string, release func(* ctx: ctx, query: query, conn: c, + structMap: c.structMap, block: block, released: false, connRelease: release, @@ -93,7 +94,9 @@ type batch struct { conn *connect sent bool released bool + flushed bool block *proto.Block + structMap *structMap connRelease func(*connect, error) connAcquire func(context.Context) (*connect, error) onProcess *onProcess @@ -136,7 +139,7 @@ func (b *batch) AppendStruct(v any) error { if b.err != nil { return b.err } - values, err := b.conn.structMap.Map("AppendStruct", b.block.ColumnsNames(), v, false) + values, err := b.structMap.Map("AppendStruct", b.block.ColumnsNames(), v, false) if err != nil { return err } @@ -175,47 +178,35 @@ func (b *batch) Send() (err error) { b.sent = true b.release(err) }() - if b.sent { - return b.retry() - } if b.err != nil { return b.err } + if b.sent || b.released { + if err = b.resetConnection(); err != nil { + return err + } + } if b.block.Rows() != 0 { if err = b.conn.sendData(b.block, ""); err != nil { return err } } - if err = b.conn.sendData(&proto.Block{}, ""); err != nil { - return err - } - if err = b.conn.process(b.ctx, b.onProcess); err != nil { + if err = b.closeQuery(); err != nil { return err } return nil } -func (b *batch) retry() (err error) { - // exit early if Send() hasn't been attepted - if !b.sent { - return ErrBatchNotSent - } - - if err = b.resetConnection(); err != nil { - return err - } - - b.sent = false - b.released = false - return b.Send() -} - func (b *batch) resetConnection() (err error) { // acquire a new conn if b.conn, err = b.connAcquire(b.ctx); err != nil { return err } + defer func() { + b.released = false + }() + options := queryOptions(b.ctx) if deadline, ok := b.ctx.Deadline(); ok { b.conn.conn.SetDeadline(deadline) @@ -242,15 +233,55 @@ func (b *batch) Flush() error { if b.err != nil { return b.err } + if b.released { + if err := b.resetConnection(); err != nil { + return err + } + } if b.block.Rows() != 0 { if err := b.conn.sendData(b.block, ""); err != nil { return err } + + b.flushed = true } b.block.Reset() return nil } +func (b *batch) ReleaseConnection() error { + if b.sent { + return ErrBatchAlreadySent + } + if b.err != nil { + return b.err + } + if b.released || b.flushed { + return ErrBatchUnexpectedRelease + } + + if err := b.closeQuery(); err != nil { + b.release(err) + return err + } + + b.release(nil) + + return nil +} + +func (b *batch) closeQuery() error { + if err := b.conn.sendData(&proto.Block{}, ""); err != nil { + return err + } + + if err := b.conn.process(b.ctx, b.onProcess); err != nil { + return err + } + + return nil +} + type batchColumn struct { err error batch driver.Batch diff --git a/conn_http_batch.go b/conn_http_batch.go index 900b3c8717..eaa08575c1 100644 --- a/conn_http_batch.go +++ b/conn_http_batch.go @@ -118,6 +118,11 @@ func (b *httpBatch) Flush() error { return nil } +// ReleaseConnection doesn't support by HTTP batch. +func (b *httpBatch) ReleaseConnection() error { + return nil +} + func (b *httpBatch) Abort() error { defer func() { b.sent = true diff --git a/lib/driver/driver.go b/lib/driver/driver.go index 9b102460ef..8993b5c5ac 100644 --- a/lib/driver/driver.go +++ b/lib/driver/driver.go @@ -84,6 +84,7 @@ type ( Flush() error Send() error IsSent() bool + ReleaseConnection() error } BatchColumn interface { Append(any) error diff --git a/tests/batch_release_connection_test.go b/tests/batch_release_connection_test.go new file mode 100644 index 0000000000..9170d0f1e3 --- /dev/null +++ b/tests/batch_release_connection_test.go @@ -0,0 +1,76 @@ +package tests + +import ( + "context" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/stretchr/testify/require" + "testing" +) + +func TestBatchReleaseConnection(t *testing.T) { + conn, err := GetNativeConnection(nil, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + ctx := context.Background() + require.NoError(t, err) + + const tableName = "test_release_connection" + + var ddl = fmt.Sprintf(` + CREATE TABLE %s ( + Col1 UInt64 + , Col2 String + ) Engine MergeTree() ORDER BY tuple() + `, tableName) + defer func() { + dropTable(conn, tableName) + }() + require.NoError(t, conn.Exec(ctx, ddl)) + batch, err := conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s", tableName)) + require.NoError(t, err) + require.NoError(t, batch.Append(uint64(1), "test")) + + require.NoError(t, batch.ReleaseConnection()) + require.Error(t, batch.ReleaseConnection()) + + require.NoError(t, batch.Send()) + require.Equal(t, uint64(1), getRowsCount(t, conn, tableName)) + + require.NoError(t, batch.Send()) + require.Equal(t, uint64(2), getRowsCount(t, conn, tableName)) + + deduplicateTable(t, conn, tableName) + require.Equal(t, uint64(1), getRowsCount(t, conn, tableName)) +} + +func TestBatchReleaseConnectionFlush(t *testing.T) { + conn, err := GetNativeConnection(nil, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + ctx := context.Background() + require.NoError(t, err) + + const tableName = "test_release_connection_flush" + + var ddl = fmt.Sprintf(` + CREATE TABLE %s ( + Col1 UInt64 + , Col2 String + ) Engine MergeTree() ORDER BY tuple() + `, tableName) + defer func() { + dropTable(conn, tableName) + }() + require.NoError(t, conn.Exec(ctx, ddl)) + batch, err := conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s", tableName)) + require.NoError(t, err) + require.NoError(t, batch.Append(uint64(1), "test")) + + require.NoError(t, batch.ReleaseConnection()) + require.NoError(t, batch.Flush()) + require.Error(t, batch.ReleaseConnection()) + require.NoError(t, batch.Send()) + + require.Equal(t, uint64(1), getRowsCount(t, conn, tableName)) +} diff --git a/tests/utils.go b/tests/utils.go index e3b291d670..18159e643f 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -446,6 +446,17 @@ func getDatabaseName(testSet string) string { return fmt.Sprintf("clickhouse-go-%s-%s-%d", testSet, testUUID, testTimestamp) } +func getRowsCount(t *testing.T, conn driver.Conn, table string) uint64 { + var count uint64 + err := conn.QueryRow(context.Background(), fmt.Sprintf(`SELECT COUNT(*) FROM %s`, table)).Scan(&count) + require.NoError(t, err) + return count +} + +func deduplicateTable(t *testing.T, conn driver.Conn, table string) { + require.NoError(t, conn.Exec(context.Background(), fmt.Sprintf(`OPTIMIZE TABLE %s DEDUPLICATE`, table))) +} + func GetEnv(key, fallback string) string { if value, ok := os.LookupEnv(key); ok { return value From 2ab0f52d21a55c85774e815698d80c62dd045347 Mon Sep 17 00:00:00 2001 From: Stepan Rabotkin Date: Mon, 7 Aug 2023 16:50:30 +0300 Subject: [PATCH 2/4] Add example --- conn_batch.go | 10 +- .../batch_release_connection.go | 133 ++++++++++++++++++ examples/clickhouse_api/main_test.go | 4 + 3 files changed, 141 insertions(+), 6 deletions(-) create mode 100644 examples/clickhouse_api/batch_release_connection.go diff --git a/conn_batch.go b/conn_batch.go index 87134e2319..691ddc6704 100644 --- a/conn_batch.go +++ b/conn_batch.go @@ -78,7 +78,6 @@ func (c *connect) prepareBatch(ctx context.Context, query string, release func(* ctx: ctx, query: query, conn: c, - structMap: c.structMap, block: block, released: false, connRelease: release, @@ -92,11 +91,10 @@ type batch struct { ctx context.Context query string conn *connect - sent bool - released bool - flushed bool + sent bool // sent signalize that batch is send to ClickHouse. + released bool // released signalize that conn was returned to pool and can't be used. + flushed bool // flushed signalize that Flush operation was called and user can't return conn to pool using ReleaseConnection. block *proto.Block - structMap *structMap connRelease func(*connect, error) connAcquire func(context.Context) (*connect, error) onProcess *onProcess @@ -139,7 +137,7 @@ func (b *batch) AppendStruct(v any) error { if b.err != nil { return b.err } - values, err := b.structMap.Map("AppendStruct", b.block.ColumnsNames(), v, false) + values, err := b.conn.structMap.Map("AppendStruct", b.block.ColumnsNames(), v, false) if err != nil { return err } diff --git a/examples/clickhouse_api/batch_release_connection.go b/examples/clickhouse_api/batch_release_connection.go new file mode 100644 index 0000000000..9fdff4027e --- /dev/null +++ b/examples/clickhouse_api/batch_release_connection.go @@ -0,0 +1,133 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package clickhouse_api + +import ( + "context" + "errors" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" +) + +func BatchWithReleaseConnection() error { + conn, err := GetNativeConnection(nil, nil, nil) + if err != nil { + return err + } + ctx := context.Background() + defer func() { + conn.Exec(ctx, "DROP TABLE example") + }() + if err := conn.Exec(ctx, `DROP TABLE IF EXISTS example`); err != nil { + return err + } + err = conn.Exec(ctx, ` + CREATE TABLE IF NOT EXISTS example ( + Col1 UInt64, + Col2 String + ) engine=Memory + `) + + batch, err := New(ctx, conn, "INSERT INTO example") + if err != nil { + return err + } + + if err = batch.Append(1, "test-1"); err != nil { + return err + } + + if err = batch.Send(); err != nil { + return err + } + + if err = batch.Append(2, "test-2"); err != nil { + return err + } + + if err = batch.Send(); err != nil { + return err + } + + var count uint64 + if err = conn.QueryRow(context.Background(), `SELECT COUNT(*) FROM example`).Scan(&count); err != nil { + return err + } + + if count != uint64(2) { + return errors.New("count must be 2") + } + + return nil +} + +type YourBatch struct { + ctx context.Context + + insertStatement string + + conn driver.Conn + batch driver.Batch +} + +func New(ctx context.Context, conn driver.Conn, insertStatement string) (*YourBatch, error) { + batch, err := conn.PrepareBatch(ctx, insertStatement) + if err != nil { + return nil, err + } + + if err = batch.ReleaseConnection(); err != nil { + return nil, err + } + + return &YourBatch{ + ctx: ctx, + insertStatement: insertStatement, + conn: conn, + batch: batch, + }, nil +} + +func (b *YourBatch) Append(col1 uint64, col2 string) error { + return b.batch.Append( + col1, + col2, + ) +} + +func (b *YourBatch) Send() error { + if err := b.batch.Send(); err != nil { + return err + } + + return b.reset() +} + +func (b *YourBatch) reset() error { + batch, err := b.conn.PrepareBatch(b.ctx, b.insertStatement) + if err != nil { + return err + } + + if err = batch.ReleaseConnection(); err != nil { + return err + } + + b.batch = batch + + return nil +} diff --git a/examples/clickhouse_api/main_test.go b/examples/clickhouse_api/main_test.go index 0037099e5c..a9f62e4617 100644 --- a/examples/clickhouse_api/main_test.go +++ b/examples/clickhouse_api/main_test.go @@ -87,6 +87,10 @@ func TestBatchInsert(t *testing.T) { require.NoError(t, BatchInsert()) } +func TestBatchWithReleaseConnection(t *testing.T) { + require.NoError(t, BatchWithReleaseConnection()) +} + func TestAuthConnect(t *testing.T) { require.NoError(t, Auth()) } From 2ce7d490fdebaa93b9161ac5d34b9809b9487364 Mon Sep 17 00:00:00 2001 From: Stepan Rabotkin Date: Tue, 8 Aug 2023 23:37:29 +0300 Subject: [PATCH 3/4] Improve API and fix code review issues --- clickhouse.go | 17 +++++++-- clickhouse_std.go | 4 +- conn_batch.go | 37 +++++-------------- conn_http_batch.go | 10 ++--- .../batch_release_connection.go | 12 +----- lib/driver/driver.go | 3 +- lib/driver/options.go | 13 +++++++ tests/batch_release_connection_test.go | 14 +++---- 8 files changed, 50 insertions(+), 60 deletions(-) create mode 100644 lib/driver/options.go diff --git a/clickhouse.go b/clickhouse.go index 679e959b77..ce3157d8ec 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -44,11 +44,12 @@ type ( var ( ErrBatchInvalid = errors.New("clickhouse: batch is invalid. check appended data is correct") ErrBatchAlreadySent = errors.New("clickhouse: batch has already been sent") - ErrBatchUnexpectedRelease = errors.New("clickhouse: unexpected connection release") + ErrBatchNotSent = errors.New("clickhouse: invalid retry, batch not sent yet") ErrAcquireConnTimeout = errors.New("clickhouse: acquire conn timeout. you can increase the number of max open conn or the dial timeout") ErrUnsupportedServerRevision = errors.New("clickhouse: unsupported server revision") ErrBindMixedParamsFormats = errors.New("clickhouse [bind]: mixed named, numeric or positional parameters") ErrAcquireConnNoAddress = errors.New("clickhouse: no valid address supplied") + ErrServerUnexpectedData = errors.New("code: 101, message: Unexpected packet Data received from client") ) type OpError struct { @@ -152,18 +153,28 @@ func (ch *clickhouse) Exec(ctx context.Context, query string, args ...any) error return nil } -func (ch *clickhouse) PrepareBatch(ctx context.Context, query string) (driver.Batch, error) { +func (ch *clickhouse) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) { conn, err := ch.acquire(ctx) if err != nil { return nil, err } - batch, err := conn.prepareBatch(ctx, query, ch.release, ch.acquire) + batch, err := conn.prepareBatch(ctx, query, getPrepareBatchOptions(opts...), ch.release, ch.acquire) if err != nil { return nil, err } return batch, nil } +func getPrepareBatchOptions(opts ...driver.PrepareBatchOption) driver.PrepareBatchOptions { + var options driver.PrepareBatchOptions + + for _, opt := range opts { + opt(&options) + } + + return options +} + func (ch *clickhouse) AsyncInsert(ctx context.Context, query string, wait bool) error { conn, err := ch.acquire(ctx) if err != nil { diff --git a/clickhouse_std.go b/clickhouse_std.go index 03eb5a5ecc..3ebf2efbee 100644 --- a/clickhouse_std.go +++ b/clickhouse_std.go @@ -170,7 +170,7 @@ type stdConnect interface { query(ctx context.Context, release func(*connect, error), query string, args ...any) (*rows, error) exec(ctx context.Context, query string, args ...any) error ping(ctx context.Context) (err error) - prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (ldriver.Batch, error) + prepareBatch(ctx context.Context, query string, options ldriver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (ldriver.Batch, error) asyncInsert(ctx context.Context, query string, wait bool) error } @@ -273,7 +273,7 @@ func (std *stdDriver) Prepare(query string) (driver.Stmt, error) { } func (std *stdDriver) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { - batch, err := std.conn.prepareBatch(ctx, query, func(*connect, error) {}, func(context.Context) (*connect, error) { return nil, nil }) + batch, err := std.conn.prepareBatch(ctx, query, ldriver.PrepareBatchOptions{}, func(*connect, error) {}, func(context.Context) (*connect, error) { return nil, nil }) if err != nil { if isConnBrokenError(err) { std.debugf("PrepareContext got a fatal error, resetting connection: %v\n", err) diff --git a/conn_batch.go b/conn_batch.go index 691ddc6704..e1ffddbeef 100644 --- a/conn_batch.go +++ b/conn_batch.go @@ -35,7 +35,7 @@ import ( var splitInsertRe = regexp.MustCompile(`(?i)\sVALUES\s*\(`) var columnMatch = regexp.MustCompile(`.*\((?P.+)\)$`) -func (c *connect) prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) { +func (c *connect) prepareBatch(ctx context.Context, query string, opts driver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) { //defer func() { // if err := recover(); err != nil { // fmt.Printf("panic occurred on %d:\n", c.num) @@ -74,7 +74,8 @@ func (c *connect) prepareBatch(ctx context.Context, query string, release func(* if err = block.SortColumns(columns); err != nil { return nil, err } - return &batch{ + + b := &batch{ ctx: ctx, query: query, conn: c, @@ -83,7 +84,13 @@ func (c *connect) prepareBatch(ctx context.Context, query string, release func(* connRelease: release, connAcquire: acquire, onProcess: onProcess, - }, nil + } + + if opts.ReleaseConnection { + b.release(b.closeQuery()) + } + + return b, nil } type batch struct { @@ -93,7 +100,6 @@ type batch struct { conn *connect sent bool // sent signalize that batch is send to ClickHouse. released bool // released signalize that conn was returned to pool and can't be used. - flushed bool // flushed signalize that Flush operation was called and user can't return conn to pool using ReleaseConnection. block *proto.Block connRelease func(*connect, error) connAcquire func(context.Context) (*connect, error) @@ -240,34 +246,11 @@ func (b *batch) Flush() error { if err := b.conn.sendData(b.block, ""); err != nil { return err } - - b.flushed = true } b.block.Reset() return nil } -func (b *batch) ReleaseConnection() error { - if b.sent { - return ErrBatchAlreadySent - } - if b.err != nil { - return b.err - } - if b.released || b.flushed { - return ErrBatchUnexpectedRelease - } - - if err := b.closeQuery(); err != nil { - b.release(err) - return err - } - - b.release(nil) - - return nil -} - func (b *batch) closeQuery() error { if err := b.conn.sendData(&proto.Block{}, ""); err != nil { return err diff --git a/conn_http_batch.go b/conn_http_batch.go index eaa08575c1..022996fbf1 100644 --- a/conn_http_batch.go +++ b/conn_http_batch.go @@ -33,8 +33,9 @@ import ( // \x60 represents a backtick var httpInsertRe = regexp.MustCompile(`(?i)^INSERT INTO\s+\x60?([\w.^\(]+)\x60?\s*(\([^\)]*\))?`) -// release is ignored, because http used by std with empty release function -func (h *httpConnect) prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) { +// release is ignored, because http used by std with empty release function. +// Also opts ignored because all options unused in http batch. +func (h *httpConnect) prepareBatch(ctx context.Context, query string, opts driver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) { matches := httpInsertRe.FindStringSubmatch(query) if len(matches) < 3 { return nil, errors.New("cannot get table name from query") @@ -118,11 +119,6 @@ func (b *httpBatch) Flush() error { return nil } -// ReleaseConnection doesn't support by HTTP batch. -func (b *httpBatch) ReleaseConnection() error { - return nil -} - func (b *httpBatch) Abort() error { defer func() { b.sent = true diff --git a/examples/clickhouse_api/batch_release_connection.go b/examples/clickhouse_api/batch_release_connection.go index 9fdff4027e..1690c60805 100644 --- a/examples/clickhouse_api/batch_release_connection.go +++ b/examples/clickhouse_api/batch_release_connection.go @@ -85,15 +85,11 @@ type YourBatch struct { } func New(ctx context.Context, conn driver.Conn, insertStatement string) (*YourBatch, error) { - batch, err := conn.PrepareBatch(ctx, insertStatement) + batch, err := conn.PrepareBatch(ctx, insertStatement, driver.WithReleaseConnection()) if err != nil { return nil, err } - if err = batch.ReleaseConnection(); err != nil { - return nil, err - } - return &YourBatch{ ctx: ctx, insertStatement: insertStatement, @@ -118,15 +114,11 @@ func (b *YourBatch) Send() error { } func (b *YourBatch) reset() error { - batch, err := b.conn.PrepareBatch(b.ctx, b.insertStatement) + batch, err := b.conn.PrepareBatch(b.ctx, b.insertStatement, driver.WithReleaseConnection()) if err != nil { return err } - if err = batch.ReleaseConnection(); err != nil { - return err - } - b.batch = batch return nil diff --git a/lib/driver/driver.go b/lib/driver/driver.go index 8993b5c5ac..51d0f44ec6 100644 --- a/lib/driver/driver.go +++ b/lib/driver/driver.go @@ -54,7 +54,7 @@ type ( Select(ctx context.Context, dest any, query string, args ...any) error Query(ctx context.Context, query string, args ...any) (Rows, error) QueryRow(ctx context.Context, query string, args ...any) Row - PrepareBatch(ctx context.Context, query string) (Batch, error) + PrepareBatch(ctx context.Context, query string, opts ...PrepareBatchOption) (Batch, error) Exec(ctx context.Context, query string, args ...any) error AsyncInsert(ctx context.Context, query string, wait bool) error Ping(context.Context) error @@ -84,7 +84,6 @@ type ( Flush() error Send() error IsSent() bool - ReleaseConnection() error } BatchColumn interface { Append(any) error diff --git a/lib/driver/options.go b/lib/driver/options.go new file mode 100644 index 0000000000..d81760c987 --- /dev/null +++ b/lib/driver/options.go @@ -0,0 +1,13 @@ +package driver + +type PrepareBatchOptions struct { + ReleaseConnection bool +} + +type PrepareBatchOption func(options *PrepareBatchOptions) + +func WithReleaseConnection() PrepareBatchOption { + return func(options *PrepareBatchOptions) { + options.ReleaseConnection = true + } +} diff --git a/tests/batch_release_connection_test.go b/tests/batch_release_connection_test.go index 9170d0f1e3..ba083e99f3 100644 --- a/tests/batch_release_connection_test.go +++ b/tests/batch_release_connection_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/stretchr/testify/require" "testing" ) @@ -27,13 +28,9 @@ func TestBatchReleaseConnection(t *testing.T) { dropTable(conn, tableName) }() require.NoError(t, conn.Exec(ctx, ddl)) - batch, err := conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s", tableName)) + batch, err := conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s", tableName), driver.WithReleaseConnection()) require.NoError(t, err) require.NoError(t, batch.Append(uint64(1), "test")) - - require.NoError(t, batch.ReleaseConnection()) - require.Error(t, batch.ReleaseConnection()) - require.NoError(t, batch.Send()) require.Equal(t, uint64(1), getRowsCount(t, conn, tableName)) @@ -63,13 +60,12 @@ func TestBatchReleaseConnectionFlush(t *testing.T) { dropTable(conn, tableName) }() require.NoError(t, conn.Exec(ctx, ddl)) - batch, err := conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s", tableName)) + batch, err := conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s", tableName), driver.WithReleaseConnection()) require.NoError(t, err) - require.NoError(t, batch.Append(uint64(1), "test")) - require.NoError(t, batch.ReleaseConnection()) + require.NoError(t, batch.Append(uint64(1), "test")) require.NoError(t, batch.Flush()) - require.Error(t, batch.ReleaseConnection()) + require.NoError(t, batch.Send()) require.Equal(t, uint64(1), getRowsCount(t, conn, tableName)) From 2008d01ab10c9ed1827b50772fa5c33fac5b89f5 Mon Sep 17 00:00:00 2001 From: Stepan Rabotkin Date: Wed, 9 Aug 2023 20:15:20 +0300 Subject: [PATCH 4/4] Add prepare batch section to README.md --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index a93da40822..3cd0ca0f8c 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ The client is tested against the currently [supported versions](https://github.c * Connection pool * Failover and load balancing * [Bulk write support](examples/clickhouse_api/batch.go) (for `database/sql` [use](examples/std/batch.go) `begin->prepare->(in loop exec)->commit`) +* [PrepareBatch options](#preparebatch-options) * [AsyncInsert](benchmark/v2/write-async/main.go) (more details in [Async insert](#async-insert) section) * Named and numeric placeholders support * LZ4/ZSTD compression support @@ -281,6 +282,11 @@ HTTP protocol supports batching. It can be enabled by setting `async_insert` whe For more details please see [asynchronous inserts](https://clickhouse.com/docs/en/optimize/asynchronous-inserts#enabling-asynchronous-inserts) documentation. +## PrepareBatch options + +Available options: +- [WithReleaseConnection](examples/clickhouse_api/batch_release_connection.go) - after PrepareBatch connection will be returned to the pool. It can help you make a long-lived batch. + ## Benchmark | [V1 (READ)](benchmark/v1/read/main.go) | [V2 (READ) std](benchmark/v2/read/main.go) | [V2 (READ) clickhouse API](benchmark/v2/read-native/main.go) | @@ -305,6 +311,7 @@ go get -u github.com/ClickHouse/clickhouse-go/v2 ### native interface * [batch](examples/clickhouse_api/batch.go) +* [batch with release connection](examples/clickhouse_api/batch_release_connection.go) * [async insert](examples/clickhouse_api/async.go) * [batch struct](examples/clickhouse_api/append_struct.go) * [columnar](examples/clickhouse_api/columnar_insert.go)