Skip to content

Commit fa356b9

Browse files
committed
Implements pre/post-load routine in the new ETL
1 parent 9eb82bc commit fa356b9

File tree

1 file changed

+68
-19
lines changed

1 file changed

+68
-19
lines changed

transformnext/transform.go

Lines changed: 68 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strings"
1111
"time"
1212

13+
"github.com/cuducos/minha-receita/download"
1314
"github.com/schollz/progressbar/v3"
1415
"golang.org/x/sync/errgroup"
1516
)
@@ -24,8 +25,22 @@ const (
2425
MaxParallelDBQueries = 8
2526
)
2627

28+
// extraIndexes lists the field names that postLoad hands to
// database.CreateExtraIndexes after the load has finished.
// NOTE(review): the dotted entries (e.g. "qsa.cnpj_cpf_do_socio") presumably
// address a field nested inside a sub-document; confirm against the database
// implementation.
var extraIndexes = [...]string{
29+
"cnae_fiscal",
30+
"cnaes_secundarios.codigo",
31+
"codigo_municipio",
32+
"codigo_municipio_ibge",
33+
"codigo_natureza_juridica",
34+
"qsa.cnpj_cpf_do_socio",
35+
"uf",
36+
}
37+
2738
// database is the storage backend contract this package depends on. Transform
// drives it in order: PreLoad, the company writes, PostLoad,
// CreateExtraIndexes and finally MetaSave.
type database interface {
39+
// PreLoad is the first thing Transform calls, before any source is read; an
// error aborts the whole run.
PreLoad() error
2840
// CreateCompanies receives a batch of rows, each row being a slice of
// strings. NOTE(review): it is not called in the visible code; presumably
// writeJSONs is the caller, confirm there.
CreateCompanies([][]string) error
41+
// PostLoad is called by postLoad once writeJSONs has returned successfully.
PostLoad() error
42+
// CreateExtraIndexes is called by postLoad, right after PostLoad, with the
// contents of extraIndexes.
CreateExtraIndexes([]string) error
43+
// MetaSave stores a key/value pair; saveUpdatedAt uses it with the key
// "updated-at" and the contents of the Federal Revenue updated-at file.
MetaSave(string, string) error
2944
}
3045

3146
func sources() map[string]*source { // all but Estabelecimentos (this one is loaded later on)
@@ -65,27 +80,35 @@ func newProgressBar(label string, srcs int) (*progressbar.ProgressBar, error) {
6580
return bar, bar.RenderBlank()
6681
}
6782

68-
func Cleanup() error {
69-
return filepath.WalkDir(os.TempDir(), func(pth string, d fs.DirEntry, err error) error {
70-
if !d.IsDir() {
71-
return nil
72-
}
73-
if !strings.HasPrefix(d.Name(), "minha-receita-") {
74-
return nil
75-
}
76-
part := strings.Split(d.Name(), "-")
77-
if len(part) != 4 {
78-
return nil
79-
}
80-
if _, err := time.Parse("20060102150405", part[2]); err != nil {
81-
return nil
82-
}
83-
fmt.Printf("Removing %s\n", pth)
84-
return os.RemoveAll(pth)
85-
})
83+
func saveUpdatedAt(db database, dir string) error {
84+
slog.Info("Saving the updated at date to the database…")
85+
p := filepath.Join(dir, download.FederalRevenueUpdatedAt)
86+
v, err := os.ReadFile(p)
87+
if err != nil {
88+
return fmt.Errorf("error reading %s: %w", p, err)
89+
90+
}
91+
return db.MetaSave("updated-at", string(v))
92+
}
93+
94+
func postLoad(db database) error {
95+
slog.Info("Consolidating the database…")
96+
if err := db.PostLoad(); err != nil {
97+
return err
98+
}
99+
slog.Info("Database consolidated!")
100+
slog.Info("Creating indexes…")
101+
if err := db.CreateExtraIndexes(extraIndexes[:]); err != nil {
102+
return err
103+
}
104+
slog.Info("Indexes created!")
105+
return nil
86106
}
87107

88108
func Transform(dir string, db database, batch, maxDB int, privacy bool) error {
109+
if err := db.PreLoad(); err != nil {
110+
return err
111+
}
89112
srcs := sources()
90113
tmp, err := os.MkdirTemp("", fmt.Sprintf("minha-receita-%s-*", time.Now().Format("20060102150405")))
91114
if err != nil {
@@ -121,5 +144,31 @@ func Transform(dir string, db database, batch, maxDB int, privacy bool) error {
121144
if err := g.Wait(); err != nil {
122145
return err
123146
}
124-
return writeJSONs(ctx, srcs, kv, db, maxDB, batch, dir, privacy)
147+
if err := writeJSONs(ctx, srcs, kv, db, maxDB, batch, dir, privacy); err != nil {
148+
return err
149+
}
150+
if err := postLoad(db); err != nil {
151+
return err
152+
}
153+
return saveUpdatedAt(db, dir)
154+
}
155+
156+
// Cleanup walks the system temporary directory (os.TempDir) recursively and
// removes every directory whose name looks like one created by Transform,
// i.e. "minha-receita-<timestamp>-<suffix>" where <timestamp> parses with the
// layout "20060102150405". Each removed path is reported on standard output.
//
// It returns an error when the temporary directory itself cannot be inspected
// or when a matching directory cannot be removed. Directories that merely
// cannot be listed are skipped, keeping the walk best-effort.
func Cleanup() error {
	return filepath.WalkDir(os.TempDir(), func(pth string, d fs.DirEntry, err error) error {
		if err != nil {
			if d == nil {
				// WalkDir passes a nil entry when the initial Lstat of the
				// root fails; touching d here would panic.
				return err
			}
			// Reading this directory failed; skip it and keep walking.
			return nil
		}
		if !d.IsDir() {
			return nil
		}
		if !strings.HasPrefix(d.Name(), "minha-receita-") {
			return nil
		}
		part := strings.Split(d.Name(), "-")
		if len(part) != 4 {
			return nil
		}
		if _, err := time.Parse("20060102150405", part[2]); err != nil {
			return nil
		}
		fmt.Printf("Removing %s\n", pth)
		if err := os.RemoveAll(pth); err != nil {
			return err
		}
		// The directory is gone: tell WalkDir not to descend into it.
		// Otherwise it fails to read the directory, calls back with that
		// error, and the path is reported and removed a second time.
		return fs.SkipDir
	})
}

0 commit comments

Comments
 (0)