Skip to content

Commit 7be63da

Browse files
authored
Merge pull request #432 from cuducos/etl-nxt/writer
Carregando dados no banco de dados
2 parents e047f6e + 6aad855 commit 7be63da

File tree

19 files changed

+1840
-123
lines changed

19 files changed

+1840
-123
lines changed

cmd/cmd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func CLI() *cobra.Command {
109109
sampleCLI(),
110110
)
111111
if os.Getenv("DEBUG") != "" {
112-
rootCmd.AddCommand(addDataDir(transformNextCmd))
112+
rootCmd.AddCommand(addDataDir(transformNextCLI()))
113113
rootCmd.AddCommand(addDataDir(cleanupTempCmd))
114114
}
115115
return rootCmd

cmd/transform_next.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package cmd
22

33
import (
4+
"fmt"
5+
46
"github.com/cuducos/minha-receita/transformnext"
57
"github.com/spf13/cobra"
68
)
@@ -9,7 +11,20 @@ var transformNextCmd = &cobra.Command{
911
Use: "transform-next",
1012
Short: "Experimental ETL, work in progress, NOT recommended",
1113
RunE: func(_ *cobra.Command, _ []string) error {
12-
return transformnext.Transform(dir)
14+
db, err := loadDatabase()
15+
if err != nil {
16+
return fmt.Errorf("could not find database: %w", err)
17+
}
18+
defer db.Close()
19+
if cleanUp {
20+
if err := db.Drop(); err != nil {
21+
return err
22+
}
23+
if err := db.Create(); err != nil {
24+
return err
25+
}
26+
}
27+
return transformnext.Transform(dir, db, batchSize, maxParallelDBQueries, !noPrivacy)
1328
},
1429
}
1530

@@ -20,3 +35,17 @@ var cleanupTempCmd = &cobra.Command{
2035
return transformnext.Cleanup()
2136
},
2237
}
38+
39+
func transformNextCLI() *cobra.Command {
40+
transformNextCmd.Flags().IntVarP(
41+
&maxParallelDBQueries,
42+
"max-parallel-db-queries",
43+
"m",
44+
transformnext.MaxParallelDBQueries,
45+
"maximum parallel database queries",
46+
)
47+
transformNextCmd.Flags().BoolVarP(&cleanUp, "clean-up", "c", cleanUp, "drop & recreate the database table before starting")
48+
transformNextCmd.Flags().IntVarP(&batchSize, "batch-size", "b", transformnext.BatchSize, "size of the batch to save to the database")
49+
transformNextCmd.Flags().BoolVarP(&noPrivacy, "no-privacy", "p", noPrivacy, "include email addresses, CPF and other PII in the JSON data")
50+
return transformNextCmd
51+
}

testdata/Qualificacoes.zip

18 Bytes
Binary file not shown.

testutils/arrays.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,6 @@ func AssertArraysHaveSameItems(t *testing.T, a1, a2 []string) {
3030
}
3131

3232
for k := range diff {
33-
t.Errorf("%q appears %d in the first array, but %d in the second array", k, c1[k], c2[k])
33+
t.Errorf("%q appears %d in the first array, but %d in the second array\nFirst array: %#v\nSecond array:%#v", k, c1[k], c2[k], c1, c2)
3434
}
3535
}

transform/kv_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,14 @@ func TestLoad(t *testing.T) {
110110
prefix string
111111
value []string
112112
}{
113-
{"p-19131243", []string{`{"identificador_de_socio":2,"nome_socio":"FERNANDA CAMPAGNUCCI PEREIRA","cnpj_cpf_do_socio":"***690948**","codigo_qualificacao_socio":16,"qualificacao_socio":"Presidente","data_entrada_sociedade":"2019-10-25","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":4,"faixa_etaria":"Entre 31 a 40 anos"}`}},
113+
{"p-19131243", []string{`{"identificador_de_socio":2,"nome_socio":"FERNANDA CAMPAGNUCCI PEREIRA","cnpj_cpf_do_socio":"***690948**","codigo_qualificacao_socio":16,"qualificacao_socio":"Presidente","data_entrada_sociedade":"2019-10-25","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":4,"faixa_etaria":"Entre 31 a 40 anos"}`}},
114114
{"p-33683111", []string{
115-
`{"identificador_de_socio":2,"nome_socio":"ANDRE DE CESERO","cnpj_cpf_do_socio":"***220050**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2016-06-16","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":6,"faixa_etaria":"Entre 51 a 60 anos"}`,
116-
`{"identificador_de_socio":2,"nome_socio":"ANTONIO DE PADUA FERREIRA PASSOS","cnpj_cpf_do_socio":"***595901**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2016-12-08","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":7,"faixa_etaria":"Entre 61 a 70 anos"}`,
117-
`{"identificador_de_socio":2,"nome_socio":"WILSON BIANCARDI COURY","cnpj_cpf_do_socio":"***414127**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2019-06-18","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":8,"faixa_etaria":"Entre 71 a 80 anos"}`,
118-
`{"identificador_de_socio":2,"nome_socio":"GILENO GURJAO BARRETO","cnpj_cpf_do_socio":"***099595**","codigo_qualificacao_socio":16,"qualificacao_socio":"Presidente","data_entrada_sociedade":"2020-02-03","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":5,"faixa_etaria":"Entre 41 a 50 anos"}`,
119-
`{"identificador_de_socio":2,"nome_socio":"RICARDO CEZAR DE MOURA JUCA","cnpj_cpf_do_socio":"***989951**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2020-05-12","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":5,"faixa_etaria":"Entre 41 a 50 anos"}`,
120-
`{"identificador_de_socio":2,"nome_socio":"ANTONINO DOS SANTOS GUERRA NETO","cnpj_cpf_do_socio":"***073447**","codigo_qualificacao_socio":5,"qualificacao_socio":"Administrador","data_entrada_sociedade":"2019-02-11","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":7,"faixa_etaria":"Entre 61 a 70 anos"}`,
115+
`{"identificador_de_socio":2,"nome_socio":"ANDRE DE CESERO","cnpj_cpf_do_socio":"***220050**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2016-06-16","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":6,"faixa_etaria":"Entre 51 a 60 anos"}`,
116+
`{"identificador_de_socio":2,"nome_socio":"ANTONIO DE PADUA FERREIRA PASSOS","cnpj_cpf_do_socio":"***595901**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2016-12-08","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":7,"faixa_etaria":"Entre 61 a 70 anos"}`,
117+
`{"identificador_de_socio":2,"nome_socio":"WILSON BIANCARDI COURY","cnpj_cpf_do_socio":"***414127**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2019-06-18","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":8,"faixa_etaria":"Entre 71 a 80 anos"}`,
118+
`{"identificador_de_socio":2,"nome_socio":"GILENO GURJAO BARRETO","cnpj_cpf_do_socio":"***099595**","codigo_qualificacao_socio":16,"qualificacao_socio":"Presidente","data_entrada_sociedade":"2020-02-03","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":5,"faixa_etaria":"Entre 41 a 50 anos"}`,
119+
`{"identificador_de_socio":2,"nome_socio":"RICARDO CEZAR DE MOURA JUCA","cnpj_cpf_do_socio":"***989951**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2020-05-12","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":5,"faixa_etaria":"Entre 41 a 50 anos"}`,
120+
`{"identificador_de_socio":2,"nome_socio":"ANTONINO DOS SANTOS GUERRA NETO","cnpj_cpf_do_socio":"***073447**","codigo_qualificacao_socio":5,"qualificacao_socio":"Administrador","data_entrada_sociedade":"2019-02-11","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":7,"faixa_etaria":"Entre 61 a 70 anos"}`,
121121
}},
122122
} {
123123
assertKeyValues(t, kv, tc.prefix, tc.value)

transformnext/badger.go

Lines changed: 57 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -12,58 +12,60 @@ import (
1212
"github.com/dgraph-io/badger/v4"
1313
)
1414

15+
// As of 2025-11 the longest sequence we've got was 257, so setting it to 512 to
16+
// have some room — maybe this could be set from a CLI flag to avoid recompiling
17+
// when source data changes and needs more space.
18+
const defaultPoolSize = 512
19+
1520
type kv struct {
16-
db *badger.DB
17-
buf sync.Pool
18-
bytes sync.Pool
21+
db *badger.DB
22+
pool sync.Pool
1923
}
2024

21-
func (kv *kv) serialize(row []string) ([]byte, error) {
22-
buf := kv.buf.Get().(*bytes.Buffer)
23-
defer func() {
24-
buf.Reset()
25-
kv.buf.Put(buf)
26-
}()
25+
func (kv *kv) serialize(b []byte, row []string) ([]byte, error) {
26+
var err error
2727
for _, v := range row {
2828
s := uint32(len(v)) // used to deserialize later on
29-
if err := binary.Write(buf, binary.LittleEndian, s); err != nil {
30-
return nil, err
31-
}
32-
if _, err := buf.Write([]byte(v)); err != nil {
29+
b, err = binary.Append(b, binary.LittleEndian, s)
30+
if err != nil {
3331
return nil, err
3432
}
33+
b = append(b, v...)
3534
}
36-
return buf.Bytes(), nil
35+
return b, nil
3736
}
3837

39-
func (kv *kv) deserialize(b []byte) ([]string, error) {
40-
if b == nil {
38+
func (kv *kv) deserialize(val []byte) ([]string, error) {
39+
if val == nil {
4140
return nil, nil
4241
}
4342
var out []string
44-
r := bytes.NewReader(b)
43+
r := bytes.NewReader(val)
4544
for r.Len() > 0 {
46-
var s uint32
47-
if err := binary.Read(r, binary.LittleEndian, &s); err != nil {
48-
return nil, fmt.Errorf("error reading size: %w", err)
49-
}
50-
raw := kv.bytes.Get().(*[]byte)
51-
if cap(*raw) < int(s) {
52-
return nil, fmt.Errorf("buffer from pool too small (%d): needs %d", cap(*raw), s)
53-
} else {
54-
*raw = (*raw)[:s]
55-
}
56-
n, err := io.ReadFull(r, *raw)
45+
err := func() error {
46+
var s uint32
47+
if err := binary.Read(r, binary.LittleEndian, &s); err != nil {
48+
return fmt.Errorf("error reading size: %w", err)
49+
}
50+
b := kv.pool.Get().(*[]byte)
51+
*b = (*b)[:s]
52+
defer kv.pool.Put(b)
53+
if cap(*b) < int(s) {
54+
return fmt.Errorf("buffer from pool too small (%d): needs %d", cap(*b), s)
55+
}
56+
n, err := io.ReadFull(r, *b)
57+
if err != nil {
58+
return fmt.Errorf("could not deserialize value: %w", err)
59+
}
60+
if n != int(s) {
61+
return fmt.Errorf("expected to read %d bytes, got %d", s, n)
62+
}
63+
out = append(out, string(*b))
64+
return nil
65+
}()
5766
if err != nil {
58-
kv.bytes.Put(raw)
59-
return nil, fmt.Errorf("could not deserialize value: %w", err)
60-
}
61-
if n != int(s) {
62-
kv.bytes.Put(raw)
63-
return nil, fmt.Errorf("expected to read %d bytes, got %d", s, n)
67+
return nil, err
6468
}
65-
out = append(out, string(*raw))
66-
kv.bytes.Put(raw)
6769
}
6870
return out, nil
6971
}
@@ -72,18 +74,23 @@ func (kv *kv) put(src *source, id string, row []string) error {
7274
if len(row) == 0 {
7375
return nil
7476
}
75-
k := src.keyFor(id)
76-
v, err := kv.serialize(row)
77+
key := src.keyFor(id)
78+
b := kv.pool.Get().(*[]byte)
79+
*b = (*b)[:0]
80+
defer kv.pool.Put(b)
81+
val, err := kv.serialize(*b, row)
7782
if err != nil {
7883
return fmt.Errorf("could not serialize row %v: %w", row, err)
7984
}
8085
return kv.db.Update(func(txn *badger.Txn) error {
81-
return txn.Set(k, v)
86+
return txn.Set(key, val)
8287
})
8388
}
8489

8590
func (kv *kv) get(k []byte) ([]string, error) {
86-
var b []byte
91+
val := kv.pool.Get().(*[]byte)
92+
*val = (*val)[:0]
93+
defer kv.pool.Put(val)
8794
err := kv.db.View(func(txn *badger.Txn) error {
8895
item, err := txn.Get(k)
8996
if err != nil {
@@ -92,7 +99,7 @@ func (kv *kv) get(k []byte) ([]string, error) {
9299
}
93100
return fmt.Errorf("could not get key: %w", err)
94101
}
95-
b, err = item.ValueCopy(nil)
102+
*val, err = item.ValueCopy(*val)
96103
if err != nil {
97104
return fmt.Errorf("could not read value: %w", err)
98105
}
@@ -101,7 +108,7 @@ func (kv *kv) get(k []byte) ([]string, error) {
101108
if err != nil {
102109
return nil, fmt.Errorf("could not get key %s: %w", string(k), err)
103110
}
104-
return kv.deserialize(b)
111+
return kv.deserialize(*val)
105112
}
106113

107114
func (kv *kv) getPrefix(k []byte) ([][]string, error) {
@@ -141,27 +148,20 @@ func (*noLogger) Debugf(string, ...any) {}
141148
func newBadger(dir string, ro bool) (*kv, error) {
142149
opt := badger.DefaultOptions(dir).WithReadOnly(ro).WithBypassLockGuard(true).WithDetectConflicts(false)
143150
slog.Debug("Creating temporary key-value storage", "path", dir)
144-
if os.Getenv("DEBUG") == "" {
151+
if os.Getenv("DEBUG") != "badger" { // TODO: remove that after moving transformnext into transform
145152
opt = opt.WithLogger(&noLogger{})
146153
}
147154
db, err := badger.Open(opt)
148155
if err != nil {
149156
return nil, fmt.Errorf("could not open badger at %s: %w", dir, err)
150157
}
151-
kv := &kv{db: db}
152-
kv.buf = sync.Pool{
153-
New: func() any {
154-
return &bytes.Buffer{}
155-
},
156-
}
157-
kv.bytes = sync.Pool{
158-
New: func() any {
159-
// as of 2025-11 the longest sequence we've got was 159, so setting
160-
// it to 256 to have some room — but this could be set from a cli
161-
// flag to avoid recompiling when source data changes and needs
162-
// more space
163-
b := make([]byte, 0, 256)
164-
return &b
158+
kv := &kv{
159+
db: db,
160+
pool: sync.Pool{
161+
New: func() any {
162+
b := make([]byte, defaultPoolSize)
163+
return &b
164+
},
165165
},
166166
}
167167
return kv, nil

transformnext/badger_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ func TestSerializeDeserialize(t *testing.T) {
2222
{"empty", []string{}},
2323
} {
2424
t.Run(tc.name, func(t *testing.T) {
25-
s, err := kv.serialize(tc.row)
25+
var b []byte
26+
s, err := kv.serialize(b, tc.row)
2627
if err != nil {
2728
t.Errorf("expected no error serializing, got %s", err)
2829
}

0 commit comments

Comments
 (0)