Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/gofmt.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
GOEXPERIMENT: jsonv2
steps:
- uses: actions/checkout@v4
- uses: WillAbides/setup-go[email protected]
- uses: actions/setup-go@v6
with:
go-version: "1.25.x"
- run: if [ "$(gofmt -s -l . | wc -l)" -gt 0 ]; then exit 1; fi
2 changes: 1 addition & 1 deletion .github/workflows/golint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
GOEXPERIMENT: jsonv2
steps:
- uses: actions/checkout@v4
- uses: WillAbides/setup-go[email protected]
- uses: actions/setup-go@v6
with:
go-version: "1.25.x"
- uses: golangci/golangci-lint-action@v8
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: ikalnytskyi/action-setup-postgres@v7
with:
postgres-version: 16
- uses: ankane/setup-mongodb@v1
- uses: WillAbides/setup-go[email protected]
- uses: actions/setup-go@v6
with:
go-version: ${{ matrix.go }}
- run: go test ./...
Expand Down
2 changes: 1 addition & 1 deletion cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func CLI() *cobra.Command {
sampleCLI(),
)
if os.Getenv("DEBUG") != "" {
rootCmd.AddCommand(addDataDir(transformNextCmd))
rootCmd.AddCommand(addDataDir(transformNextCLI()))
rootCmd.AddCommand(addDataDir(cleanupTempCmd))
}
return rootCmd
Expand Down
31 changes: 30 additions & 1 deletion cmd/transform_next.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cmd

import (
"fmt"

"github.com/cuducos/minha-receita/transformnext"
"github.com/spf13/cobra"
)
Expand All @@ -9,7 +11,20 @@ var transformNextCmd = &cobra.Command{
Use: "transform-next",
Short: "Experimental ETL, work in progress, NOT recommended",
RunE: func(_ *cobra.Command, _ []string) error {
return transformnext.Transform(dir)
db, err := loadDatabase()
if err != nil {
return fmt.Errorf("could not find database: %w", err)
}
defer db.Close()
if cleanUp {
if err := db.Drop(); err != nil {
return err
}
if err := db.Create(); err != nil {
return err
}
}
return transformnext.Transform(dir, db, batchSize, maxParallelDBQueries, !noPrivacy)
},
}

Expand All @@ -20,3 +35,17 @@ var cleanupTempCmd = &cobra.Command{
return transformnext.Cleanup()
},
}

func transformNextCLI() *cobra.Command {
transformNextCmd.Flags().IntVarP(
&maxParallelDBQueries,
"max-parallel-db-queries",
"m",
transformnext.MaxParallelDBQueries,
"maximum parallel database queries",
)
transformNextCmd.Flags().BoolVarP(&cleanUp, "clean-up", "c", cleanUp, "drop & recreate the database table before starting")
transformNextCmd.Flags().IntVarP(&batchSize, "batch-size", "b", transformnext.BatchSize, "size of the batch to save to the database")
transformNextCmd.Flags().BoolVarP(&noPrivacy, "no-privacy", "p", noPrivacy, "include email addresses, CPF and other PII in the JSON data")
return transformNextCmd
}
Binary file modified testdata/Qualificacoes.zip
Binary file not shown.
2 changes: 1 addition & 1 deletion testutils/arrays.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ func AssertArraysHaveSameItems(t *testing.T, a1, a2 []string) {
}

for k := range diff {
t.Errorf("%q appears %d in the first array, but %d in the second array", k, c1[k], c2[k])
t.Errorf("%q appears %d in the first array, but %d in the second array\nFirst array: %#v\nSecond array:%#v", k, c1[k], c2[k], c1, c2)
}
}
14 changes: 7 additions & 7 deletions transform/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ func TestLoad(t *testing.T) {
prefix string
value []string
}{
{"p-19131243", []string{`{"identificador_de_socio":2,"nome_socio":"FERNANDA CAMPAGNUCCI PEREIRA","cnpj_cpf_do_socio":"***690948**","codigo_qualificacao_socio":16,"qualificacao_socio":"Presidente","data_entrada_sociedade":"2019-10-25","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":4,"faixa_etaria":"Entre 31 a 40 anos"}`}},
{"p-19131243", []string{`{"identificador_de_socio":2,"nome_socio":"FERNANDA CAMPAGNUCCI PEREIRA","cnpj_cpf_do_socio":"***690948**","codigo_qualificacao_socio":16,"qualificacao_socio":"Presidente","data_entrada_sociedade":"2019-10-25","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":4,"faixa_etaria":"Entre 31 a 40 anos"}`}},
{"p-33683111", []string{
`{"identificador_de_socio":2,"nome_socio":"ANDRE DE CESERO","cnpj_cpf_do_socio":"***220050**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2016-06-16","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":6,"faixa_etaria":"Entre 51 a 60 anos"}`,
`{"identificador_de_socio":2,"nome_socio":"ANTONIO DE PADUA FERREIRA PASSOS","cnpj_cpf_do_socio":"***595901**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2016-12-08","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":7,"faixa_etaria":"Entre 61 a 70 anos"}`,
`{"identificador_de_socio":2,"nome_socio":"WILSON BIANCARDI COURY","cnpj_cpf_do_socio":"***414127**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2019-06-18","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":8,"faixa_etaria":"Entre 71 a 80 anos"}`,
`{"identificador_de_socio":2,"nome_socio":"GILENO GURJAO BARRETO","cnpj_cpf_do_socio":"***099595**","codigo_qualificacao_socio":16,"qualificacao_socio":"Presidente","data_entrada_sociedade":"2020-02-03","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":5,"faixa_etaria":"Entre 41 a 50 anos"}`,
`{"identificador_de_socio":2,"nome_socio":"RICARDO CEZAR DE MOURA JUCA","cnpj_cpf_do_socio":"***989951**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2020-05-12","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":5,"faixa_etaria":"Entre 41 a 50 anos"}`,
`{"identificador_de_socio":2,"nome_socio":"ANTONINO DOS SANTOS GUERRA NETO","cnpj_cpf_do_socio":"***073447**","codigo_qualificacao_socio":5,"qualificacao_socio":"Administrador","data_entrada_sociedade":"2019-02-11","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":7,"faixa_etaria":"Entre 61 a 70 anos"}`,
`{"identificador_de_socio":2,"nome_socio":"ANDRE DE CESERO","cnpj_cpf_do_socio":"***220050**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2016-06-16","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":6,"faixa_etaria":"Entre 51 a 60 anos"}`,
`{"identificador_de_socio":2,"nome_socio":"ANTONIO DE PADUA FERREIRA PASSOS","cnpj_cpf_do_socio":"***595901**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2016-12-08","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":7,"faixa_etaria":"Entre 61 a 70 anos"}`,
`{"identificador_de_socio":2,"nome_socio":"WILSON BIANCARDI COURY","cnpj_cpf_do_socio":"***414127**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2019-06-18","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":8,"faixa_etaria":"Entre 71 a 80 anos"}`,
`{"identificador_de_socio":2,"nome_socio":"GILENO GURJAO BARRETO","cnpj_cpf_do_socio":"***099595**","codigo_qualificacao_socio":16,"qualificacao_socio":"Presidente","data_entrada_sociedade":"2020-02-03","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":5,"faixa_etaria":"Entre 41 a 50 anos"}`,
`{"identificador_de_socio":2,"nome_socio":"RICARDO CEZAR DE MOURA JUCA","cnpj_cpf_do_socio":"***989951**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2020-05-12","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":5,"faixa_etaria":"Entre 41 a 50 anos"}`,
`{"identificador_de_socio":2,"nome_socio":"ANTONINO DOS SANTOS GUERRA NETO","cnpj_cpf_do_socio":"***073447**","codigo_qualificacao_socio":5,"qualificacao_socio":"Administrador","data_entrada_sociedade":"2019-02-11","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":7,"faixa_etaria":"Entre 61 a 70 anos"}`,
}},
} {
assertKeyValues(t, kv, tc.prefix, tc.value)
Expand Down
114 changes: 57 additions & 57 deletions transformnext/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,58 +12,60 @@ import (
"github.com/dgraph-io/badger/v4"
)

// As of 2025-11 the longest sequence we've got was 257, so setting it to 512 to
// have some room — maybe this could be set from a CLI flag to avoid recompiling
// when source data changes and needs more space.
const defaultPoolSize = 512

type kv struct {
db *badger.DB
buf sync.Pool
bytes sync.Pool
db *badger.DB
pool sync.Pool
}

func (kv *kv) serialize(row []string) ([]byte, error) {
buf := kv.buf.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
kv.buf.Put(buf)
}()
func (kv *kv) serialize(b []byte, row []string) ([]byte, error) {
var err error
for _, v := range row {
s := uint32(len(v)) // used to deserialize later on
if err := binary.Write(buf, binary.LittleEndian, s); err != nil {
return nil, err
}
if _, err := buf.Write([]byte(v)); err != nil {
b, err = binary.Append(b, binary.LittleEndian, s)
if err != nil {
return nil, err
}
b = append(b, v...)
}
return buf.Bytes(), nil
return b, nil
}

func (kv *kv) deserialize(b []byte) ([]string, error) {
if b == nil {
func (kv *kv) deserialize(val []byte) ([]string, error) {
if val == nil {
return nil, nil
}
var out []string
r := bytes.NewReader(b)
r := bytes.NewReader(val)
for r.Len() > 0 {
var s uint32
if err := binary.Read(r, binary.LittleEndian, &s); err != nil {
return nil, fmt.Errorf("error reading size: %w", err)
}
raw := kv.bytes.Get().(*[]byte)
if cap(*raw) < int(s) {
return nil, fmt.Errorf("buffer from pool too small (%d): needs %d", cap(*raw), s)
} else {
*raw = (*raw)[:s]
}
n, err := io.ReadFull(r, *raw)
err := func() error {
var s uint32
if err := binary.Read(r, binary.LittleEndian, &s); err != nil {
return fmt.Errorf("error reading size: %w", err)
}
b := kv.pool.Get().(*[]byte)
*b = (*b)[:s]
defer kv.pool.Put(b)
if cap(*b) < int(s) {
return fmt.Errorf("buffer from pool too small (%d): needs %d", cap(*b), s)
}
n, err := io.ReadFull(r, *b)
if err != nil {
return fmt.Errorf("could not deserialize value: %w", err)
}
if n != int(s) {
return fmt.Errorf("expected to read %d bytes, got %d", s, n)
}
out = append(out, string(*b))
return nil
}()
if err != nil {
kv.bytes.Put(raw)
return nil, fmt.Errorf("could not deserialize value: %w", err)
}
if n != int(s) {
kv.bytes.Put(raw)
return nil, fmt.Errorf("expected to read %d bytes, got %d", s, n)
return nil, err
}
out = append(out, string(*raw))
kv.bytes.Put(raw)
}
return out, nil
}
Expand All @@ -72,18 +74,23 @@ func (kv *kv) put(src *source, id string, row []string) error {
if len(row) == 0 {
return nil
}
k := src.keyFor(id)
v, err := kv.serialize(row)
key := src.keyFor(id)
b := kv.pool.Get().(*[]byte)
*b = (*b)[:0]
defer kv.pool.Put(b)
val, err := kv.serialize(*b, row)
if err != nil {
return fmt.Errorf("could not serialize row %v: %w", row, err)
}
return kv.db.Update(func(txn *badger.Txn) error {
return txn.Set(k, v)
return txn.Set(key, val)
})
}

func (kv *kv) get(k []byte) ([]string, error) {
var b []byte
val := kv.pool.Get().(*[]byte)
*val = (*val)[:0]
defer kv.pool.Put(val)
err := kv.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(k)
if err != nil {
Expand All @@ -92,7 +99,7 @@ func (kv *kv) get(k []byte) ([]string, error) {
}
return fmt.Errorf("could not get key: %w", err)
}
b, err = item.ValueCopy(nil)
*val, err = item.ValueCopy(*val)
if err != nil {
return fmt.Errorf("could not read value: %w", err)
}
Expand All @@ -101,7 +108,7 @@ func (kv *kv) get(k []byte) ([]string, error) {
if err != nil {
return nil, fmt.Errorf("could not get key %s: %w", string(k), err)
}
return kv.deserialize(b)
return kv.deserialize(*val)
}

func (kv *kv) getPrefix(k []byte) ([][]string, error) {
Expand Down Expand Up @@ -141,27 +148,20 @@ func (*noLogger) Debugf(string, ...any) {}
func newBadger(dir string, ro bool) (*kv, error) {
opt := badger.DefaultOptions(dir).WithReadOnly(ro).WithBypassLockGuard(true).WithDetectConflicts(false)
slog.Debug("Creating temporary key-value storage", "path", dir)
if os.Getenv("DEBUG") == "" {
if os.Getenv("DEBUG") != "badger" { // TODO: remove that after moving transformnext into transform
opt = opt.WithLogger(&noLogger{})
}
db, err := badger.Open(opt)
if err != nil {
return nil, fmt.Errorf("could not open badger at %s: %w", dir, err)
}
kv := &kv{db: db}
kv.buf = sync.Pool{
New: func() any {
return &bytes.Buffer{}
},
}
kv.bytes = sync.Pool{
New: func() any {
// as of 2025-11 the longest sequence we've got was 159, so setting
// it to 256 to have some room — but this could be set from a cli
// flag to avoid recompiling when source data changes and needs
// more space
b := make([]byte, 0, 256)
return &b
kv := &kv{
db: db,
pool: sync.Pool{
New: func() any {
b := make([]byte, defaultPoolSize)
return &b
},
},
}
return kv, nil
Expand Down
3 changes: 2 additions & 1 deletion transformnext/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ func TestSerializeDeserialize(t *testing.T) {
{"empty", []string{}},
} {
t.Run(tc.name, func(t *testing.T) {
s, err := kv.serialize(tc.row)
var b []byte
s, err := kv.serialize(b, tc.row)
if err != nil {
t.Errorf("expected no error serializing, got %s", err)
}
Expand Down
Loading
Loading