diff --git a/README.md b/README.md index a93da40822..3cd0ca0f8c 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ The client is tested against the currently [supported versions](https://github.c * Connection pool * Failover and load balancing * [Bulk write support](examples/clickhouse_api/batch.go) (for `database/sql` [use](examples/std/batch.go) `begin->prepare->(in loop exec)->commit`) +* [PrepareBatch options](#preparebatch-options) * [AsyncInsert](benchmark/v2/write-async/main.go) (more details in [Async insert](#async-insert) section) * Named and numeric placeholders support * LZ4/ZSTD compression support @@ -281,6 +282,11 @@ HTTP protocol supports batching. It can be enabled by setting `async_insert` whe For more details please see [asynchronous inserts](https://clickhouse.com/docs/en/optimize/asynchronous-inserts#enabling-asynchronous-inserts) documentation. +## PrepareBatch options + +Available options: +- [WithReleaseConnection](examples/clickhouse_api/batch_release_connection.go) - returns the connection to the pool as soon as `PrepareBatch` completes, instead of holding it for the lifetime of the batch. This lets you keep a long-lived batch without tying up a pooled connection; a connection is acquired again when the batch is flushed or sent. 
+ ## Benchmark | [V1 (READ)](benchmark/v1/read/main.go) | [V2 (READ) std](benchmark/v2/read/main.go) | [V2 (READ) clickhouse API](benchmark/v2/read-native/main.go) | @@ -305,6 +311,7 @@ go get -u github.com/ClickHouse/clickhouse-go/v2 ### native interface * [batch](examples/clickhouse_api/batch.go) +* [batch with release connection](examples/clickhouse_api/batch_release_connection.go) * [async insert](examples/clickhouse_api/async.go) * [batch struct](examples/clickhouse_api/append_struct.go) * [columnar](examples/clickhouse_api/columnar_insert.go) diff --git a/clickhouse.go b/clickhouse.go index 98be00a921..2766107c54 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -153,18 +153,28 @@ func (ch *clickhouse) Exec(ctx context.Context, query string, args ...any) error return nil } -func (ch *clickhouse) PrepareBatch(ctx context.Context, query string) (driver.Batch, error) { +func (ch *clickhouse) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) { conn, err := ch.acquire(ctx) if err != nil { return nil, err } - batch, err := conn.prepareBatch(ctx, query, ch.release, ch.acquire) + batch, err := conn.prepareBatch(ctx, query, getPrepareBatchOptions(opts...), ch.release, ch.acquire) if err != nil { return nil, err } return batch, nil } +func getPrepareBatchOptions(opts ...driver.PrepareBatchOption) driver.PrepareBatchOptions { + var options driver.PrepareBatchOptions + + for _, opt := range opts { + opt(&options) + } + + return options +} + func (ch *clickhouse) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error { conn, err := ch.acquire(ctx) if err != nil { diff --git a/clickhouse_std.go b/clickhouse_std.go index 1742633418..24e294c7bf 100644 --- a/clickhouse_std.go +++ b/clickhouse_std.go @@ -170,7 +170,7 @@ type stdConnect interface { query(ctx context.Context, release func(*connect, error), query string, args ...any) (*rows, error) exec(ctx context.Context, query string, args ...any) 
error ping(ctx context.Context) (err error) - prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (ldriver.Batch, error) + prepareBatch(ctx context.Context, query string, options ldriver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (ldriver.Batch, error) asyncInsert(ctx context.Context, query string, wait bool, args ...any) error } @@ -270,7 +270,7 @@ func (std *stdDriver) Prepare(query string) (driver.Stmt, error) { } func (std *stdDriver) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { - batch, err := std.conn.prepareBatch(ctx, query, func(*connect, error) {}, func(context.Context) (*connect, error) { return nil, nil }) + batch, err := std.conn.prepareBatch(ctx, query, ldriver.PrepareBatchOptions{}, func(*connect, error) {}, func(context.Context) (*connect, error) { return nil, nil }) if err != nil { if isConnBrokenError(err) { std.debugf("PrepareContext got a fatal error, resetting connection: %v\n", err) diff --git a/conn_batch.go b/conn_batch.go index ae436391bd..cf0217e53d 100644 --- a/conn_batch.go +++ b/conn_batch.go @@ -35,7 +35,7 @@ import ( var splitInsertRe = regexp.MustCompile(`(?i)\sVALUES\s*\(`) var columnMatch = regexp.MustCompile(`.*\((?P.+)\)$`) -func (c *connect) prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) { +func (c *connect) prepareBatch(ctx context.Context, query string, opts driver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) { //defer func() { // if err := recover(); err != nil { // fmt.Printf("panic occurred on %d:\n", c.num) @@ -74,7 +74,8 @@ func (c *connect) prepareBatch(ctx context.Context, query string, release func(* if err = block.SortColumns(columns); err != nil { return nil, err } - return &batch{ 
+ + b := &batch{ ctx: ctx, query: query, conn: c, @@ -83,7 +84,13 @@ func (c *connect) prepareBatch(ctx context.Context, query string, release func(* connRelease: release, connAcquire: acquire, onProcess: onProcess, - }, nil + } + + if opts.ReleaseConnection { + b.release(b.closeQuery()) + } + + return b, nil } type batch struct { @@ -91,8 +98,8 @@ type batch struct { ctx context.Context query string conn *connect - sent bool - released bool + sent bool // sent signalize that batch is send to ClickHouse. + released bool // released signalize that conn was returned to pool and can't be used. block *proto.Block connRelease func(*connect, error) connAcquire func(context.Context) (*connect, error) @@ -175,47 +182,35 @@ func (b *batch) Send() (err error) { b.sent = true b.release(err) }() - if b.sent { - return b.retry() - } if b.err != nil { return b.err } + if b.sent || b.released { + if err = b.resetConnection(); err != nil { + return err + } + } if b.block.Rows() != 0 { if err = b.conn.sendData(b.block, ""); err != nil { return err } } - if err = b.conn.sendData(&proto.Block{}, ""); err != nil { - return err - } - if err = b.conn.process(b.ctx, b.onProcess); err != nil { + if err = b.closeQuery(); err != nil { return err } return nil } -func (b *batch) retry() (err error) { - // exit early if Send() hasn't been attepted - if !b.sent { - return ErrBatchNotSent - } - - if err = b.resetConnection(); err != nil { - return err - } - - b.sent = false - b.released = false - return b.Send() -} - func (b *batch) resetConnection() (err error) { // acquire a new conn if b.conn, err = b.connAcquire(b.ctx); err != nil { return err } + defer func() { + b.released = false + }() + options := queryOptions(b.ctx) if deadline, ok := b.ctx.Deadline(); ok { b.conn.conn.SetDeadline(deadline) @@ -242,6 +237,11 @@ func (b *batch) Flush() error { if b.err != nil { return b.err } + if b.released { + if err := b.resetConnection(); err != nil { + return err + } + } if b.block.Rows() != 0 { if 
err := b.conn.sendData(b.block, ""); err != nil { return err @@ -255,6 +255,18 @@ func (b *batch) Rows() int { return b.block.Rows() } +func (b *batch) closeQuery() error { + if err := b.conn.sendData(&proto.Block{}, ""); err != nil { + return err + } + + if err := b.conn.process(b.ctx, b.onProcess); err != nil { + return err + } + + return nil +} + type batchColumn struct { err error batch driver.Batch diff --git a/conn_http_batch.go b/conn_http_batch.go index 891d36e8ad..96eb71162d 100644 --- a/conn_http_batch.go +++ b/conn_http_batch.go @@ -33,8 +33,9 @@ import ( // \x60 represents a backtick var httpInsertRe = regexp.MustCompile(`(?i)^INSERT INTO\s+\x60?([\w.^\(]+)\x60?\s*(\([^\)]*\))?`) -// release is ignored, because http used by std with empty release function -func (h *httpConnect) prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) { +// release is ignored, because http used by std with empty release function. +// Also opts ignored because all options unused in http batch. +func (h *httpConnect) prepareBatch(ctx context.Context, query string, opts driver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) { matches := httpInsertRe.FindStringSubmatch(query) if len(matches) < 3 { return nil, errors.New("cannot get table name from query") diff --git a/examples/clickhouse_api/batch_release_connection.go b/examples/clickhouse_api/batch_release_connection.go new file mode 100644 index 0000000000..1690c60805 --- /dev/null +++ b/examples/clickhouse_api/batch_release_connection.go @@ -0,0 +1,125 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. 
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package clickhouse_api + +import ( + "context" + "errors" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" +) + +func BatchWithReleaseConnection() error { + conn, err := GetNativeConnection(nil, nil, nil) + if err != nil { + return err + } + ctx := context.Background() + defer func() { + conn.Exec(ctx, "DROP TABLE example") + }() + if err := conn.Exec(ctx, `DROP TABLE IF EXISTS example`); err != nil { + return err + } + err = conn.Exec(ctx, ` + CREATE TABLE IF NOT EXISTS example ( + Col1 UInt64, + Col2 String + ) engine=Memory + `) + + batch, err := New(ctx, conn, "INSERT INTO example") + if err != nil { + return err + } + + if err = batch.Append(1, "test-1"); err != nil { + return err + } + + if err = batch.Send(); err != nil { + return err + } + + if err = batch.Append(2, "test-2"); err != nil { + return err + } + + if err = batch.Send(); err != nil { + return err + } + + var count uint64 + if err = conn.QueryRow(context.Background(), `SELECT COUNT(*) FROM example`).Scan(&count); err != nil { + return err + } + + if count != uint64(2) { + return errors.New("count must be 2") + } + + return nil +} + +type YourBatch struct { + ctx context.Context + + insertStatement string + + conn driver.Conn + batch driver.Batch +} + +func New(ctx context.Context, conn driver.Conn, insertStatement string) (*YourBatch, error) { + batch, err := conn.PrepareBatch(ctx, insertStatement, 
driver.WithReleaseConnection()) + if err != nil { + return nil, err + } + + return &YourBatch{ + ctx: ctx, + insertStatement: insertStatement, + conn: conn, + batch: batch, + }, nil +} + +func (b *YourBatch) Append(col1 uint64, col2 string) error { + return b.batch.Append( + col1, + col2, + ) +} + +func (b *YourBatch) Send() error { + if err := b.batch.Send(); err != nil { + return err + } + + return b.reset() +} + +func (b *YourBatch) reset() error { + batch, err := b.conn.PrepareBatch(b.ctx, b.insertStatement, driver.WithReleaseConnection()) + if err != nil { + return err + } + + b.batch = batch + + return nil +} diff --git a/examples/clickhouse_api/main_test.go b/examples/clickhouse_api/main_test.go index 0037099e5c..a9f62e4617 100644 --- a/examples/clickhouse_api/main_test.go +++ b/examples/clickhouse_api/main_test.go @@ -87,6 +87,10 @@ func TestBatchInsert(t *testing.T) { require.NoError(t, BatchInsert()) } +func TestBatchWithReleaseConnection(t *testing.T) { + require.NoError(t, BatchWithReleaseConnection()) +} + func TestAuthConnect(t *testing.T) { require.NoError(t, Auth()) } diff --git a/lib/driver/driver.go b/lib/driver/driver.go index acb183a4c7..f88bb43e14 100644 --- a/lib/driver/driver.go +++ b/lib/driver/driver.go @@ -54,7 +54,7 @@ type ( Select(ctx context.Context, dest any, query string, args ...any) error Query(ctx context.Context, query string, args ...any) (Rows, error) QueryRow(ctx context.Context, query string, args ...any) Row - PrepareBatch(ctx context.Context, query string) (Batch, error) + PrepareBatch(ctx context.Context, query string, opts ...PrepareBatchOption) (Batch, error) Exec(ctx context.Context, query string, args ...any) error AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error Ping(context.Context) error diff --git a/lib/driver/options.go b/lib/driver/options.go new file mode 100644 index 0000000000..d81760c987 --- /dev/null +++ b/lib/driver/options.go @@ -0,0 +1,13 @@ +package driver + +type 
PrepareBatchOptions struct { + ReleaseConnection bool +} + +type PrepareBatchOption func(options *PrepareBatchOptions) + +func WithReleaseConnection() PrepareBatchOption { + return func(options *PrepareBatchOptions) { + options.ReleaseConnection = true + } +} diff --git a/tests/batch_release_connection_test.go b/tests/batch_release_connection_test.go new file mode 100644 index 0000000000..ba083e99f3 --- /dev/null +++ b/tests/batch_release_connection_test.go @@ -0,0 +1,72 @@ +package tests + +import ( + "context" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/stretchr/testify/require" + "testing" +) + +func TestBatchReleaseConnection(t *testing.T) { + conn, err := GetNativeConnection(nil, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + ctx := context.Background() + require.NoError(t, err) + + const tableName = "test_release_connection" + + var ddl = fmt.Sprintf(` + CREATE TABLE %s ( + Col1 UInt64 + , Col2 String + ) Engine MergeTree() ORDER BY tuple() + `, tableName) + defer func() { + dropTable(conn, tableName) + }() + require.NoError(t, conn.Exec(ctx, ddl)) + batch, err := conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s", tableName), driver.WithReleaseConnection()) + require.NoError(t, err) + require.NoError(t, batch.Append(uint64(1), "test")) + require.NoError(t, batch.Send()) + require.Equal(t, uint64(1), getRowsCount(t, conn, tableName)) + + require.NoError(t, batch.Send()) + require.Equal(t, uint64(2), getRowsCount(t, conn, tableName)) + + deduplicateTable(t, conn, tableName) + require.Equal(t, uint64(1), getRowsCount(t, conn, tableName)) +} + +func TestBatchReleaseConnectionFlush(t *testing.T) { + conn, err := GetNativeConnection(nil, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + ctx := context.Background() + require.NoError(t, err) + + const tableName = "test_release_connection_flush" + + var ddl = fmt.Sprintf(` + CREATE TABLE %s ( 
+ Col1 UInt64 + , Col2 String + ) Engine MergeTree() ORDER BY tuple() + `, tableName) + defer func() { + dropTable(conn, tableName) + }() + require.NoError(t, conn.Exec(ctx, ddl)) + batch, err := conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s", tableName), driver.WithReleaseConnection()) + require.NoError(t, err) + + require.NoError(t, batch.Append(uint64(1), "test")) + require.NoError(t, batch.Flush()) + + require.NoError(t, batch.Send()) + + require.Equal(t, uint64(1), getRowsCount(t, conn, tableName)) +} diff --git a/tests/utils.go b/tests/utils.go index e3b291d670..18159e643f 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -446,6 +446,17 @@ func getDatabaseName(testSet string) string { return fmt.Sprintf("clickhouse-go-%s-%s-%d", testSet, testUUID, testTimestamp) } +func getRowsCount(t *testing.T, conn driver.Conn, table string) uint64 { + var count uint64 + err := conn.QueryRow(context.Background(), fmt.Sprintf(`SELECT COUNT(*) FROM %s`, table)).Scan(&count) + require.NoError(t, err) + return count +} + +func deduplicateTable(t *testing.T, conn driver.Conn, table string) { + require.NoError(t, conn.Exec(context.Background(), fmt.Sprintf(`OPTIMIZE TABLE %s DEDUPLICATE`, table))) +} + func GetEnv(key, fallback string) string { if value, ok := os.LookupEnv(key); ok { return value