Skip to content
3 changes: 3 additions & 0 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/*
* ingester is used as the template for livestore/instance_search.go any changes here should be reflected there and vice versa.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree with the fork. hopefully we will delete this soon.

*/
package ingester

import (
Expand Down
14 changes: 11 additions & 3 deletions modules/livestore/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package livestore

import (
"context"
"flag"
"sync"
"time"

Expand Down Expand Up @@ -29,7 +30,7 @@ type instance struct {
enc encoding.VersionedEncoding

// Block management
blocksMtx sync.Mutex
blocksMtx sync.RWMutex
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed more fine grained mutex when dealing with blocks.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rwmutex's are slower than plain ol' mutexes. unless it's heavy on reads a lot of times a plain mutex is faster. no idea what the ratio is.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ingester code used RWMutex so assuming we thought about that then :)

headBlock common.WALBlock
walBlocks map[uuid.UUID]common.WALBlock
completeBlocks map[uuid.UUID]*ingester.LocalBlock
Expand Down Expand Up @@ -72,7 +73,7 @@ func newInstance(instanceID string, wal *wal.WAL, overrides Overrides, logger lo
}

func (i *instance) pushBytes(ts time.Time, req *tempopb.PushBytesRequest) {
if len(req.Traces) >= len(req.Ids) {
if len(req.Traces) != len(req.Ids) {
level.Error(i.logger).Log("msg", "mismatched traces and ids length", "IDs", len(req.Ids), "traces", len(req.Traces))
return
}
Expand Down Expand Up @@ -228,6 +229,13 @@ func (i *instance) cutBlocks(immediate bool) (uuid.UUID, error) {
return id, nil
}

var blockConfig = common.BlockConfig{}

func init() {
// TODO MRD this is a hack until we roll the config into the livestore
blockConfig.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we dont do this bloom filters try to apply int64 max transformations.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

odd we're passing in a defaulted block config. should we not add that to the config struct in config.go?

}

func (i *instance) completeBlock(ctx context.Context, id uuid.UUID) error {
i.blocksMtx.Lock()
walBlock := i.walBlocks[id]
Expand All @@ -249,7 +257,7 @@ func (i *instance) completeBlock(ctx context.Context, id uuid.UUID) error {
}
defer iter.Close()

newMeta, err := i.enc.CreateBlock(ctx, &common.BlockConfig{}, walBlock.BlockMeta(), iter, reader, writer)
newMeta, err := i.enc.CreateBlock(ctx, &blockConfig, walBlock.BlockMeta(), iter, reader, writer)
if err != nil {
level.Error(i.logger).Log("msg", "failed to create complete block", "id", id, "err", err)
return err
Expand Down
Loading
Loading