Skip to content

Commit 2f81496

Browse files
committed
Uses source.sendTo in KV storage loading
1 parent d83bcf1 commit 2f81496

File tree

11 files changed

+287
-376
lines changed

11 files changed

+287
-376
lines changed

cmd/transform.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import (
88
)
99

1010
const transformHelper = `
11-
Convert the CSV files from the Federal Revenue for venues (ESTABELE group of
12-
files) into records in the database, 1 record per CNPJ, joining information
13-
from all other source CSV files.
11+
Convert the CSV files from the Federal Revenue for venues (Estabelecimentos*.zip
12+
group of files) into records in the database, 1 record per CNPJ, joining
13+
information from all other source CSV files.
1414
1515
The transformation process is divided into two steps:
1616
1. Load relational data to a key-value store
@@ -19,6 +19,7 @@ The transformation process is divided into two steps:
1919

2020
var (
2121
maxParallelDBQueries int
22+
maxParallelKVWrites int
2223
batchSize int
2324
cleanUp bool
2425
noPrivacy bool
@@ -47,7 +48,7 @@ var transformCmd = &cobra.Command{
4748
return err
4849
}
4950
}
50-
return transform.Transform(dir, db, maxParallelDBQueries, batchSize, !noPrivacy)
51+
return transform.Transform(dir, db, maxParallelDBQueries, maxParallelKVWrites, batchSize, !noPrivacy)
5152
},
5253
}
5354

@@ -61,6 +62,13 @@ func transformCLI() *cobra.Command {
6162
transform.MaxParallelDBQueries,
6263
"maximum parallel database queries",
6364
)
65+
transformCmd.Flags().IntVarP(
66+
&maxParallelKVWrites,
67+
"max-parallel-kv-writes",
68+
"k",
69+
transform.MaxParallelKVWrites,
70+
"the default is optimized for high throughput SATA SSD. Recommended values are between 64 and 128 for HDD, 256 and 1,024 for SSD, and 4,096 and 16,384 for NVMe SSD.",
71+
)
6472
transformCmd.Flags().IntVarP(&batchSize, "batch-size", "b", transform.BatchSize, "size of the batch to save to the database")
6573
transformCmd.Flags().BoolVarP(&cleanUp, "clean-up", "c", cleanUp, "drop & recreate the database table before starting")
6674
transformCmd.Flags().BoolVarP(&noPrivacy, "no-privacy", "p", noPrivacy, "include email addresses, CPF and other PII in the JSON data")

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
github.com/schollz/progressbar/v3 v3.18.0
1515
github.com/spf13/cobra v1.9.1
1616
go.mongodb.org/mongo-driver v1.17.3
17+
golang.org/x/sync v0.14.0
1718
golang.org/x/text v0.25.0
1819
)
1920

@@ -46,7 +47,6 @@ require (
4647
go.opentelemetry.io/otel/trace v1.35.0 // indirect
4748
golang.org/x/crypto v0.36.0 // indirect
4849
golang.org/x/net v0.38.0 // indirect
49-
golang.org/x/sync v0.14.0 // indirect
5050
golang.org/x/sys v0.31.0 // indirect
5151
golang.org/x/term v0.30.0 // indirect
5252
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect

transform/badger.go

Lines changed: 40 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,6 @@ func keyForBase(n string) string { return fmt.Sprintf("b-%s", n) }
1515
func keyForSimpleTaxes(n string) string { return fmt.Sprintf("st-%s", n) }
1616
func keyForTaxRegime(n string) string { return fmt.Sprintf("tr-%s", cnpj.Unmask(n)) }
1717

18-
// functions to read data from Badger
19-
20-
func partnersOf(db *badger.DB, n string) ([]PartnerData, error) {
21-
p := []PartnerData{}
22-
err := db.View(func(txn *badger.Txn) error {
23-
i, err := txn.Get([]byte(keyForPartners(n)))
24-
if errors.Is(err, badger.ErrKeyNotFound) {
25-
return nil
26-
}
27-
if err != nil {
28-
return fmt.Errorf("could not get key %s: %w", keyForPartners(n), err)
29-
}
30-
v, err := i.ValueCopy(nil)
31-
if err != nil {
32-
return fmt.Errorf("could not read value for key %s: %w", keyForPartners(n), err)
33-
}
34-
if err := json.Unmarshal(v, &p); err != nil {
35-
return fmt.Errorf("could not parse partners: %w", err)
36-
}
37-
return nil
38-
})
39-
if err != nil {
40-
return nil, fmt.Errorf("error getting partners for %s: %w", n, err)
41-
}
42-
return p, nil
43-
}
44-
4518
func baseOf(db *badger.DB, n string) (baseData, error) {
4619
var d baseData
4720
err := db.View(func(txn *badger.Txn) error {
@@ -93,119 +66,52 @@ func simpleTaxesOf(db *badger.DB, n string) (simpleTaxesData, error) {
9366
}
9467

9568
func taxRegimeOf(db *badger.DB, n string) (TaxRegimes, error) {
96-
var d TaxRegimes
97-
err := db.View(func(txn *badger.Txn) error {
98-
i, err := txn.Get([]byte(keyForTaxRegime(n)))
99-
if errors.Is(err, badger.ErrKeyNotFound) {
100-
return nil
101-
}
102-
if err != nil {
103-
return fmt.Errorf("could not get key %s: %w", keyForSimpleTaxes(n), err)
104-
}
105-
v, err := i.ValueCopy(nil)
106-
if err != nil {
107-
return fmt.Errorf("could not read value for key %s: %w", keyForSimpleTaxes(n), err)
108-
}
109-
if err := json.Unmarshal(v, &d); err != nil {
110-
return fmt.Errorf("could not parse taxes: %w", err)
69+
var ts TaxRegimes
70+
pre := []byte(keyForTaxRegime(n))
71+
db.View(func(txn *badger.Txn) error {
72+
it := txn.NewIterator(badger.DefaultIteratorOptions)
73+
defer it.Close()
74+
for it.Seek(pre); it.ValidForPrefix(pre); it.Next() {
75+
var t TaxRegime
76+
i := it.Item()
77+
err := i.Value(func(v []byte) error {
78+
if err := json.Unmarshal(v, &t); err != nil {
79+
return fmt.Errorf("could not parse tax regime: %w", err)
80+
}
81+
ts = append(ts, t)
82+
return nil
83+
})
84+
if err != nil {
85+
return err
86+
}
11187
}
11288
return nil
11389
})
114-
if err != nil {
115-
return TaxRegimes{}, fmt.Errorf("error getting tax regimes for %s: %w", n, err)
116-
}
117-
sort.Sort(TaxRegimes(d))
118-
return d, nil
90+
sort.Sort(TaxRegimes(ts))
91+
return ts, nil
11992
}
12093

121-
// functions to write data to Badger
122-
123-
func mergePartners(db *badger.DB, k, b []byte) ([]byte, error) {
124-
curr := []byte("[]")
125-
err := db.View(func(tx *badger.Txn) error {
126-
i, err := tx.Get(k)
127-
if errors.Is(err, badger.ErrKeyNotFound) {
128-
return nil
129-
}
130-
if err != nil {
131-
return fmt.Errorf("error getting partner key: %w", err)
132-
}
133-
curr, err = i.ValueCopy(nil)
134-
if err != nil {
135-
return fmt.Errorf("error reading partner value: %w", err)
136-
}
137-
return nil
138-
})
139-
if err != nil {
140-
return nil, fmt.Errorf("error getting current partners: %w", err)
141-
}
142-
qsa := []PartnerData{}
143-
if curr != nil {
144-
if err := json.Unmarshal(curr, &qsa); err != nil {
145-
return nil, fmt.Errorf("could not parse partners: %w", err)
146-
}
147-
}
148-
var p PartnerData
149-
if err := json.Unmarshal(b, &p); err != nil {
150-
return nil, fmt.Errorf("could not parse partner: %w", err)
151-
}
152-
qsa = append(qsa, p)
153-
j, err := json.Marshal(&qsa)
154-
if err != nil {
155-
return nil, fmt.Errorf("could not convert partner to json: %w", err)
156-
}
157-
return j, nil
158-
}
159-
160-
func mergeTaxRegimes(db *badger.DB, k, b []byte) ([]byte, error) {
161-
curr := []byte("[]")
162-
err := db.View(func(tx *badger.Txn) error {
163-
i, err := tx.Get(k)
164-
if errors.Is(err, badger.ErrKeyNotFound) {
165-
return nil
166-
}
167-
if err != nil {
168-
return fmt.Errorf("error getting tax regime key: %w", err)
169-
}
170-
curr, err = i.ValueCopy(nil)
171-
if err != nil {
172-
return fmt.Errorf("error reading tax regime value: %w", err)
94+
func partnersOf(db *badger.DB, n string) ([]PartnerData, error) {
95+
var ps []PartnerData
96+
pre := []byte(keyForPartners(n))
97+
db.View(func(txn *badger.Txn) error {
98+
it := txn.NewIterator(badger.DefaultIteratorOptions)
99+
defer it.Close()
100+
for it.Seek(pre); it.ValidForPrefix(pre); it.Next() {
101+
var p PartnerData
102+
i := it.Item()
103+
err := i.Value(func(v []byte) error {
104+
if err := json.Unmarshal(v, &p); err != nil {
105+
return fmt.Errorf("could not parse parter: %w", err)
106+
}
107+
ps = append(ps, p)
108+
return nil
109+
})
110+
if err != nil {
111+
return err
112+
}
173113
}
174114
return nil
175115
})
176-
if err != nil {
177-
return nil, fmt.Errorf("error getting current tax regimes: %w", err)
178-
}
179-
ts := TaxRegimes{}
180-
if curr != nil {
181-
if err := json.Unmarshal(curr, &ts); err != nil {
182-
return nil, fmt.Errorf("could not parse tax regimes: %w", err)
183-
}
184-
}
185-
var t TaxRegime
186-
if err := json.Unmarshal(b, &t); err != nil {
187-
return nil, fmt.Errorf("could not parse tax regime: %w", err)
188-
}
189-
ts = append(ts, t)
190-
j, err := json.Marshal(&ts)
191-
if err != nil {
192-
return nil, fmt.Errorf("could not convert tax regime to json: %w", err)
193-
}
194-
return j, nil
195-
}
196-
197-
func saveItem(db *badger.DB, s sourceType, k, v []byte) (err error) {
198-
if s == partners {
199-
v, err = mergePartners(db, k, v)
200-
if err != nil {
201-
return fmt.Errorf("error merging partners: %w", err)
202-
}
203-
}
204-
if s == realProfit || s == presumedProfit || s == arbitratedProfit || s == noTaxes {
205-
v, err = mergeTaxRegimes(db, k, v)
206-
if err != nil {
207-
return fmt.Errorf("error merging taxes: %w", err)
208-
}
209-
}
210-
return db.Update(func(tx *badger.Txn) error { return tx.Set(k, v) })
116+
return ps, nil
211117
}

0 commit comments

Comments
 (0)