Skip to content

Implement release connection in batch #1062

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ The client is tested against the currently [supported versions](https://github.c
* Connection pool
* Failover and load balancing
* [Bulk write support](examples/clickhouse_api/batch.go) (for `database/sql` [use](examples/std/batch.go) `begin->prepare->(in loop exec)->commit`)
* [PrepareBatch options](#preparebatch-options)
* [AsyncInsert](benchmark/v2/write-async/main.go) (more details in [Async insert](#async-insert) section)
* Named and numeric placeholders support
* LZ4/ZSTD compression support
Expand Down Expand Up @@ -281,6 +282,11 @@ HTTP protocol supports batching. It can be enabled by setting `async_insert` whe

For more details please see [asynchronous inserts](https://clickhouse.com/docs/en/optimize/asynchronous-inserts#enabling-asynchronous-inserts) documentation.

## PrepareBatch options

Available options:
- [WithReleaseConnection](examples/clickhouse_api/batch_release_connection.go) - after PrepareBatch connection will be returned to the pool. It can help you make a long-lived batch.

## Benchmark

| [V1 (READ)](benchmark/v1/read/main.go) | [V2 (READ) std](benchmark/v2/read/main.go) | [V2 (READ) clickhouse API](benchmark/v2/read-native/main.go) |
Expand All @@ -305,6 +311,7 @@ go get -u github.com/ClickHouse/clickhouse-go/v2
### native interface

* [batch](examples/clickhouse_api/batch.go)
* [batch with release connection](examples/clickhouse_api/batch_release_connection.go)
* [async insert](examples/clickhouse_api/async.go)
* [batch struct](examples/clickhouse_api/append_struct.go)
* [columnar](examples/clickhouse_api/columnar_insert.go)
Expand Down
14 changes: 12 additions & 2 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,28 @@ func (ch *clickhouse) Exec(ctx context.Context, query string, args ...any) error
return nil
}

func (ch *clickhouse) PrepareBatch(ctx context.Context, query string) (driver.Batch, error) {
func (ch *clickhouse) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
conn, err := ch.acquire(ctx)
if err != nil {
return nil, err
}
batch, err := conn.prepareBatch(ctx, query, ch.release, ch.acquire)
batch, err := conn.prepareBatch(ctx, query, getPrepareBatchOptions(opts...), ch.release, ch.acquire)
if err != nil {
return nil, err
}
return batch, nil
}

func getPrepareBatchOptions(opts ...driver.PrepareBatchOption) driver.PrepareBatchOptions {
var options driver.PrepareBatchOptions

for _, opt := range opts {
opt(&options)
}

return options
}

func (ch *clickhouse) AsyncInsert(ctx context.Context, query string, wait bool) error {
conn, err := ch.acquire(ctx)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions clickhouse_std.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ type stdConnect interface {
query(ctx context.Context, release func(*connect, error), query string, args ...any) (*rows, error)
exec(ctx context.Context, query string, args ...any) error
ping(ctx context.Context) (err error)
prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (ldriver.Batch, error)
prepareBatch(ctx context.Context, query string, options ldriver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (ldriver.Batch, error)
asyncInsert(ctx context.Context, query string, wait bool) error
}

Expand Down Expand Up @@ -273,7 +273,7 @@ func (std *stdDriver) Prepare(query string) (driver.Stmt, error) {
}

func (std *stdDriver) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
batch, err := std.conn.prepareBatch(ctx, query, func(*connect, error) {}, func(context.Context) (*connect, error) { return nil, nil })
batch, err := std.conn.prepareBatch(ctx, query, ldriver.PrepareBatchOptions{}, func(*connect, error) {}, func(context.Context) (*connect, error) { return nil, nil })
if err != nil {
if isConnBrokenError(err) {
std.debugf("PrepareContext got a fatal error, resetting connection: %v\n", err)
Expand Down
66 changes: 39 additions & 27 deletions conn_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
var splitInsertRe = regexp.MustCompile(`(?i)\sVALUES\s*\(`)
var columnMatch = regexp.MustCompile(`.*\((?P<Columns>.+)\)$`)

func (c *connect) prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) {
func (c *connect) prepareBatch(ctx context.Context, query string, opts driver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) {
//defer func() {
// if err := recover(); err != nil {
// fmt.Printf("panic occurred on %d:\n", c.num)
Expand Down Expand Up @@ -74,7 +74,8 @@ func (c *connect) prepareBatch(ctx context.Context, query string, release func(*
if err = block.SortColumns(columns); err != nil {
return nil, err
}
return &batch{

b := &batch{
ctx: ctx,
query: query,
conn: c,
Expand All @@ -83,16 +84,22 @@ func (c *connect) prepareBatch(ctx context.Context, query string, release func(*
connRelease: release,
connAcquire: acquire,
onProcess: onProcess,
}, nil
}

if opts.ReleaseConnection {
b.release(b.closeQuery())
}

return b, nil
}

type batch struct {
err error
ctx context.Context
query string
conn *connect
sent bool
released bool
sent bool // sent signalize that batch is send to ClickHouse.
released bool // released signalize that conn was returned to pool and can't be used.
block *proto.Block
connRelease func(*connect, error)
connAcquire func(context.Context) (*connect, error)
Expand Down Expand Up @@ -175,47 +182,35 @@ func (b *batch) Send() (err error) {
b.sent = true
b.release(err)
}()
if b.sent {
return b.retry()
}
if b.err != nil {
return b.err
}
if b.sent || b.released {
if err = b.resetConnection(); err != nil {
return err
}
}
if b.block.Rows() != 0 {
if err = b.conn.sendData(b.block, ""); err != nil {
return err
}
}
if err = b.conn.sendData(&proto.Block{}, ""); err != nil {
return err
}
if err = b.conn.process(b.ctx, b.onProcess); err != nil {
if err = b.closeQuery(); err != nil {
return err
}
return nil
}

func (b *batch) retry() (err error) {
// exit early if Send() hasn't been attepted
if !b.sent {
return ErrBatchNotSent
}

if err = b.resetConnection(); err != nil {
return err
}

b.sent = false
b.released = false
return b.Send()
}

func (b *batch) resetConnection() (err error) {
// acquire a new conn
if b.conn, err = b.connAcquire(b.ctx); err != nil {
return err
}

defer func() {
b.released = false
}()

options := queryOptions(b.ctx)
if deadline, ok := b.ctx.Deadline(); ok {
b.conn.conn.SetDeadline(deadline)
Expand All @@ -242,6 +237,11 @@ func (b *batch) Flush() error {
if b.err != nil {
return b.err
}
if b.released {
if err := b.resetConnection(); err != nil {
return err
}
}
if b.block.Rows() != 0 {
if err := b.conn.sendData(b.block, ""); err != nil {
return err
Expand All @@ -251,6 +251,18 @@ func (b *batch) Flush() error {
return nil
}

func (b *batch) closeQuery() error {
if err := b.conn.sendData(&proto.Block{}, ""); err != nil {
return err
}

if err := b.conn.process(b.ctx, b.onProcess); err != nil {
return err
}

return nil
}

type batchColumn struct {
err error
batch driver.Batch
Expand Down
5 changes: 3 additions & 2 deletions conn_http_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ import (
// \x60 represents a backtick
var httpInsertRe = regexp.MustCompile(`(?i)^INSERT INTO\s+\x60?([\w.^\(]+)\x60?\s*(\([^\)]*\))?`)

// release is ignored, because http used by std with empty release function
func (h *httpConnect) prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) {
// release is ignored, because http used by std with empty release function.
// Also opts ignored because all options unused in http batch.
func (h *httpConnect) prepareBatch(ctx context.Context, query string, opts driver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) {
matches := httpInsertRe.FindStringSubmatch(query)
if len(matches) < 3 {
return nil, errors.New("cannot get table name from query")
Expand Down
125 changes: 125 additions & 0 deletions examples/clickhouse_api/batch_release_connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package clickhouse_api

import (
"context"
"errors"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

func BatchWithReleaseConnection() error {
conn, err := GetNativeConnection(nil, nil, nil)
if err != nil {
return err
}
ctx := context.Background()
defer func() {
conn.Exec(ctx, "DROP TABLE example")
}()
if err := conn.Exec(ctx, `DROP TABLE IF EXISTS example`); err != nil {
return err
}
err = conn.Exec(ctx, `
CREATE TABLE IF NOT EXISTS example (
Col1 UInt64,
Col2 String
) engine=Memory
`)

batch, err := New(ctx, conn, "INSERT INTO example")
if err != nil {
return err
}

if err = batch.Append(1, "test-1"); err != nil {
return err
}

if err = batch.Send(); err != nil {
return err
}

if err = batch.Append(2, "test-2"); err != nil {
return err
}

if err = batch.Send(); err != nil {
return err
}

var count uint64
if err = conn.QueryRow(context.Background(), `SELECT COUNT(*) FROM example`).Scan(&count); err != nil {
return err
}

if count != uint64(2) {
return errors.New("count must be 2")
}

return nil
}

type YourBatch struct {
ctx context.Context

insertStatement string

conn driver.Conn
batch driver.Batch
}

func New(ctx context.Context, conn driver.Conn, insertStatement string) (*YourBatch, error) {
batch, err := conn.PrepareBatch(ctx, insertStatement, driver.WithReleaseConnection())
if err != nil {
return nil, err
}

return &YourBatch{
ctx: ctx,
insertStatement: insertStatement,
conn: conn,
batch: batch,
}, nil
}

func (b *YourBatch) Append(col1 uint64, col2 string) error {
return b.batch.Append(
col1,
col2,
)
}

func (b *YourBatch) Send() error {
if err := b.batch.Send(); err != nil {
return err
}

return b.reset()
}

func (b *YourBatch) reset() error {
batch, err := b.conn.PrepareBatch(b.ctx, b.insertStatement, driver.WithReleaseConnection())
if err != nil {
return err
}

b.batch = batch

return nil
}
4 changes: 4 additions & 0 deletions examples/clickhouse_api/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func TestBatchInsert(t *testing.T) {
require.NoError(t, BatchInsert())
}

func TestBatchWithReleaseConnection(t *testing.T) {
require.NoError(t, BatchWithReleaseConnection())
}

func TestAuthConnect(t *testing.T) {
require.NoError(t, Auth())
}
Expand Down
2 changes: 1 addition & 1 deletion lib/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type (
Select(ctx context.Context, dest any, query string, args ...any) error
Query(ctx context.Context, query string, args ...any) (Rows, error)
QueryRow(ctx context.Context, query string, args ...any) Row
PrepareBatch(ctx context.Context, query string) (Batch, error)
PrepareBatch(ctx context.Context, query string, opts ...PrepareBatchOption) (Batch, error)
Exec(ctx context.Context, query string, args ...any) error
AsyncInsert(ctx context.Context, query string, wait bool) error
Ping(context.Context) error
Expand Down
13 changes: 13 additions & 0 deletions lib/driver/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package driver

type PrepareBatchOptions struct {
ReleaseConnection bool
}

type PrepareBatchOption func(options *PrepareBatchOptions)

func WithReleaseConnection() PrepareBatchOption {
return func(options *PrepareBatchOptions) {
options.ReleaseConnection = true
}
}
Loading