diff --git a/.github/workflows/gofmt.yaml b/.github/workflows/gofmt.yaml index 8e3e5c36..42c75854 100644 --- a/.github/workflows/gofmt.yaml +++ b/.github/workflows/gofmt.yaml @@ -7,7 +7,7 @@ jobs: GOEXPERIMENT: jsonv2 steps: - uses: actions/checkout@v4 - - uses: WillAbides/setup-go-faster@v1.14.0 + - uses: actions/setup-go@v6 with: go-version: "1.25.x" - run: if [ "$(gofmt -s -l . | wc -l)" -gt 0 ]; then exit 1; fi diff --git a/.github/workflows/golint.yaml b/.github/workflows/golint.yaml index 4f11b539..db5a4e57 100644 --- a/.github/workflows/golint.yaml +++ b/.github/workflows/golint.yaml @@ -7,7 +7,7 @@ jobs: GOEXPERIMENT: jsonv2 steps: - uses: actions/checkout@v4 - - uses: WillAbides/setup-go-faster@v1.14.0 + - uses: actions/setup-go@v6 with: go-version: "1.25.x" - uses: golangci/golangci-lint-action@v8 diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index a0fa477d..328f79f5 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -19,8 +19,10 @@ jobs: steps: - uses: actions/checkout@v4 - uses: ikalnytskyi/action-setup-postgres@v7 + with: + postgres-version: 16 - uses: ankane/setup-mongodb@v1 - - uses: WillAbides/setup-go-faster@v1.14.0 + - uses: actions/setup-go@v6 with: go-version: ${{ matrix.go }} - run: go test ./... diff --git a/cmd/cmd.go b/cmd/cmd.go index cc10db89..57431e48 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -109,7 +109,7 @@ func CLI() *cobra.Command { sampleCLI(), ) if os.Getenv("DEBUG") != "" { - rootCmd.AddCommand(addDataDir(transformNextCmd)) + rootCmd.AddCommand(addDataDir(transformNextCLI())) rootCmd.AddCommand(addDataDir(cleanupTempCmd)) } return rootCmd diff --git a/cmd/transform_next.go b/cmd/transform_next.go index c31f760b..e0beb333 100644 --- a/cmd/transform_next.go +++ b/cmd/transform_next.go @@ -1,6 +1,8 @@ package cmd import ( + "fmt" + "github.com/cuducos/minha-receita/transformnext" "github.com/spf13/cobra" ) @@ -9,7 +11,20 @@ var transformNextCmd = &cobra.Command{ Use: "transform-next", Short: "Experimental ETL, work in progress, NOT recommended", RunE: func(_ *cobra.Command, _ []string) error { - return transformnext.Transform(dir) + db, err := loadDatabase() + if err != nil { + return fmt.Errorf("could not find database: %w", err) + } + defer db.Close() + if cleanUp { + if err := db.Drop(); err != nil { + return err + } + if err := db.Create(); err != nil { + return err + } + } + return transformnext.Transform(dir, db, batchSize, maxParallelDBQueries, !noPrivacy) }, } @@ -20,3 +35,17 @@ var cleanupTempCmd = &cobra.Command{ return transformnext.Cleanup() }, } + +func transformNextCLI() *cobra.Command { + transformNextCmd.Flags().IntVarP( + &maxParallelDBQueries, + "max-parallel-db-queries", + "m", + transformnext.MaxParallelDBQueries, + "maximum parallel database queries", + ) + transformNextCmd.Flags().BoolVarP(&cleanUp, "clean-up", "c", cleanUp, "drop & recreate the database table before starting") + transformNextCmd.Flags().IntVarP(&batchSize, "batch-size", "b", transformnext.BatchSize, "size of the batch to save to the database") + transformNextCmd.Flags().BoolVarP(&noPrivacy, "no-privacy", "p", noPrivacy, "include email addresses, CPF and other PII in the JSON data") + return transformNextCmd +} diff --git a/testdata/Qualificacoes.zip b/testdata/Qualificacoes.zip index 96089fc3..04c19cea 100644 Binary files a/testdata/Qualificacoes.zip and b/testdata/Qualificacoes.zip differ diff --git a/testutils/arrays.go b/testutils/arrays.go index 8278a584..0d194c50 100644 --- a/testutils/arrays.go +++ b/testutils/arrays.go @@ -30,6 +30,6 @@ func AssertArraysHaveSameItems(t *testing.T, a1, a2 []string) { } for k := range diff { - t.Errorf("%q appears %d in the first array, but %d in the second array", k, c1[k], c2[k]) + t.Errorf("%q appears %d in the first array, but %d in the second array\nFirst array: %#v\nSecond array:%#v", k, c1[k], c2[k], c1, c2) } } diff --git a/transform/kv_test.go b/transform/kv_test.go index 1156afde..66d4cedc 100644 --- a/transform/kv_test.go +++ b/transform/kv_test.go @@ -110,14 +110,14 @@ func TestLoad(t *testing.T) { prefix string value []string }{ - {"p-19131243", []string{`{"identificador_de_socio":2,"nome_socio":"FERNANDA CAMPAGNUCCI PEREIRA","cnpj_cpf_do_socio":"***690948**","codigo_qualificacao_socio":16,"qualificacao_socio":"Presidente","data_entrada_sociedade":"2019-10-25","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":4,"faixa_etaria":"Entre 31 a 40 anos"}`}}, + {"p-19131243", []string{`{"identificador_de_socio":2,"nome_socio":"FERNANDA CAMPAGNUCCI PEREIRA","cnpj_cpf_do_socio":"***690948**","codigo_qualificacao_socio":16,"qualificacao_socio":"Presidente","data_entrada_sociedade":"2019-10-25","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":4,"faixa_etaria":"Entre 31 a 40 anos"}`}}, {"p-33683111", []string{ - `{"identificador_de_socio":2,"nome_socio":"ANDRE DE CESERO","cnpj_cpf_do_socio":"***220050**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2016-06-16","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":6,"faixa_etaria":"Entre 51 a 60 anos"}`, - `{"identificador_de_socio":2,"nome_socio":"ANTONIO DE PADUA FERREIRA PASSOS","cnpj_cpf_do_socio":"***595901**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2016-12-08","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":7,"faixa_etaria":"Entre 61 a 70 anos"}`, - `{"identificador_de_socio":2,"nome_socio":"WILSON BIANCARDI COURY","cnpj_cpf_do_socio":"***414127**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2019-06-18","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":8,"faixa_etaria":"Entre 71 a 80 anos"}`, - `{"identificador_de_socio":2,"nome_socio":"GILENO GURJAO BARRETO","cnpj_cpf_do_socio":"***099595**","codigo_qualificacao_socio":16,"qualificacao_socio":"Presidente","data_entrada_sociedade":"2020-02-03","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":5,"faixa_etaria":"Entre 41 a 50 anos"}`, - `{"identificador_de_socio":2,"nome_socio":"RICARDO CEZAR DE MOURA JUCA","cnpj_cpf_do_socio":"***989951**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2020-05-12","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":5,"faixa_etaria":"Entre 41 a 50 anos"}`, - `{"identificador_de_socio":2,"nome_socio":"ANTONINO DOS SANTOS GUERRA NETO","cnpj_cpf_do_socio":"***073447**","codigo_qualificacao_socio":5,"qualificacao_socio":"Administrador","data_entrada_sociedade":"2019-02-11","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":null,"codigo_faixa_etaria":7,"faixa_etaria":"Entre 61 a 70 anos"}`, + `{"identificador_de_socio":2,"nome_socio":"ANDRE DE CESERO","cnpj_cpf_do_socio":"***220050**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2016-06-16","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":6,"faixa_etaria":"Entre 51 a 60 anos"}`, + `{"identificador_de_socio":2,"nome_socio":"ANTONIO DE PADUA FERREIRA PASSOS","cnpj_cpf_do_socio":"***595901**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2016-12-08","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":7,"faixa_etaria":"Entre 61 a 70 anos"}`, + `{"identificador_de_socio":2,"nome_socio":"WILSON BIANCARDI COURY","cnpj_cpf_do_socio":"***414127**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2019-06-18","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":8,"faixa_etaria":"Entre 71 a 80 anos"}`, + `{"identificador_de_socio":2,"nome_socio":"GILENO GURJAO BARRETO","cnpj_cpf_do_socio":"***099595**","codigo_qualificacao_socio":16,"qualificacao_socio":"Presidente","data_entrada_sociedade":"2020-02-03","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":5,"faixa_etaria":"Entre 41 a 50 anos"}`, + `{"identificador_de_socio":2,"nome_socio":"RICARDO CEZAR DE MOURA JUCA","cnpj_cpf_do_socio":"***989951**","codigo_qualificacao_socio":10,"qualificacao_socio":"Diretor","data_entrada_sociedade":"2020-05-12","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":5,"faixa_etaria":"Entre 41 a 50 anos"}`, + `{"identificador_de_socio":2,"nome_socio":"ANTONINO DOS SANTOS GUERRA NETO","cnpj_cpf_do_socio":"***073447**","codigo_qualificacao_socio":5,"qualificacao_socio":"Administrador","data_entrada_sociedade":"2019-02-11","codigo_pais":null,"pais":null,"cpf_representante_legal":"***000000**","nome_representante_legal":"","codigo_qualificacao_representante_legal":0,"qualificacao_representante_legal":"Não informada","codigo_faixa_etaria":7,"faixa_etaria":"Entre 61 a 70 anos"}`, }}, } { assertKeyValues(t, kv, tc.prefix, tc.value) diff --git a/transformnext/badger.go b/transformnext/badger.go index 7d5f0c78..53df0ce8 100644 --- a/transformnext/badger.go +++ b/transformnext/badger.go @@ -12,58 +12,60 @@ import ( "github.com/dgraph-io/badger/v4" ) +// As of 2025-11 the longest sequence we've got was 257, so setting it to 512 to +// have some room — maybe this could be set from a CLI flag to avoid recompiling +// when source data changes and needs more space. +const defaultPoolSize = 512 + type kv struct { - db *badger.DB - buf sync.Pool - bytes sync.Pool + db *badger.DB + pool sync.Pool } -func (kv *kv) serialize(row []string) ([]byte, error) { - buf := kv.buf.Get().(*bytes.Buffer) - defer func() { - buf.Reset() - kv.buf.Put(buf) - }() +func (kv *kv) serialize(b []byte, row []string) ([]byte, error) { + var err error for _, v := range row { s := uint32(len(v)) // used to deserialize later on - if err := binary.Write(buf, binary.LittleEndian, s); err != nil { - return nil, err - } - if _, err := buf.Write([]byte(v)); err != nil { + b, err = binary.Append(b, binary.LittleEndian, s) + if err != nil { return nil, err } + b = append(b, v...) } - return buf.Bytes(), nil + return b, nil } -func (kv *kv) deserialize(b []byte) ([]string, error) { - if b == nil { +func (kv *kv) deserialize(val []byte) ([]string, error) { + if val == nil { return nil, nil } var out []string - r := bytes.NewReader(b) + r := bytes.NewReader(val) for r.Len() > 0 { - var s uint32 - if err := binary.Read(r, binary.LittleEndian, &s); err != nil { - return nil, fmt.Errorf("error reading size: %w", err) - } - raw := kv.bytes.Get().(*[]byte) - if cap(*raw) < int(s) { - return nil, fmt.Errorf("buffer from pool too small (%d): needs %d", cap(*raw), s) - } else { - *raw = (*raw)[:s] - } - n, err := io.ReadFull(r, *raw) + err := func() error { + var s uint32 + if err := binary.Read(r, binary.LittleEndian, &s); err != nil { + return fmt.Errorf("error reading size: %w", err) + } + b := kv.pool.Get().(*[]byte) + *b = (*b)[:s] + defer kv.pool.Put(b) + if cap(*b) < int(s) { + return fmt.Errorf("buffer from pool too small (%d): needs %d", cap(*b), s) + } + n, err := io.ReadFull(r, *b) + if err != nil { + return fmt.Errorf("could not deserialize value: %w", err) + } + if n != int(s) { + return fmt.Errorf("expected to read %d bytes, got %d", s, n) + } + out = append(out, string(*b)) + return nil + }() if err != nil { - kv.bytes.Put(raw) - return nil, fmt.Errorf("could not deserialize value: %w", err) - } - if n != int(s) { - kv.bytes.Put(raw) - return nil, fmt.Errorf("expected to read %d bytes, got %d", s, n) + return nil, err } - out = append(out, string(*raw)) - kv.bytes.Put(raw) } return out, nil } @@ -72,18 +74,23 @@ func (kv *kv) put(src *source, id string, row []string) error { if len(row) == 0 { return nil } - k := src.keyFor(id) - v, err := kv.serialize(row) + key := src.keyFor(id) + b := kv.pool.Get().(*[]byte) + *b = (*b)[:0] + defer kv.pool.Put(b) + val, err := kv.serialize(*b, row) if err != nil { return fmt.Errorf("could not serialize row %v: %w", row, err) } return kv.db.Update(func(txn *badger.Txn) error { - return txn.Set(k, v) + return txn.Set(key, val) }) } func (kv *kv) get(k []byte) ([]string, error) { - var b []byte + val := kv.pool.Get().(*[]byte) + *val = (*val)[:0] + defer kv.pool.Put(val) err := kv.db.View(func(txn *badger.Txn) error { item, err := txn.Get(k) if err != nil { @@ -92,7 +99,7 @@ func (kv *kv) get(k []byte) ([]string, error) { } return fmt.Errorf("could not get key: %w", err) } - b, err = item.ValueCopy(nil) + *val, err = item.ValueCopy(*val) if err != nil { return fmt.Errorf("could not read value: %w", err) } @@ -101,7 +108,7 @@ func (kv *kv) get(k []byte) ([]string, error) { if err != nil { return nil, fmt.Errorf("could not get key %s: %w", string(k), err) } - return kv.deserialize(b) + return kv.deserialize(*val) } func (kv *kv) getPrefix(k []byte) ([][]string, error) { @@ -141,27 +148,20 @@ func (*noLogger) Debugf(string, ...any) {} func newBadger(dir string, ro bool) (*kv, error) { opt := badger.DefaultOptions(dir).WithReadOnly(ro).WithBypassLockGuard(true).WithDetectConflicts(false) slog.Debug("Creating temporary key-value storage", "path", dir) - if os.Getenv("DEBUG") == "" { + if os.Getenv("DEBUG") != "badger" { // TODO: remove that after moving transformnext into transform opt = opt.WithLogger(&noLogger{}) } db, err := badger.Open(opt) if err != nil { return nil, fmt.Errorf("could not open badger at %s: %w", dir, err) } - kv := &kv{db: db} - kv.buf = sync.Pool{ - New: func() any { - return &bytes.Buffer{} - }, - } - kv.bytes = sync.Pool{ - New: func() any { - // as of 2025-11 the longest sequence we've got was 159, so setting - // it to 256 to have some room — but this could be set from a cli - // flag to avoid recompiling when source data changes and needs - // more space - b := make([]byte, 0, 256) - return &b + kv := &kv{ + db: db, + pool: sync.Pool{ + New: func() any { + b := make([]byte, defaultPoolSize) + return &b + }, }, } return kv, nil diff --git a/transformnext/badger_test.go b/transformnext/badger_test.go index 9b2c31d9..2c7c54ef 100644 --- a/transformnext/badger_test.go +++ b/transformnext/badger_test.go @@ -22,7 +22,8 @@ func TestSerializeDeserialize(t *testing.T) { {"empty", []string{}}, } { t.Run(tc.name, func(t *testing.T) { - s, err := kv.serialize(tc.row) + var b []byte + s, err := kv.serialize(b, tc.row) if err != nil { t.Errorf("expected no error serializing, got %s", err) } diff --git a/transformnext/cast.go b/transformnext/cast.go new file mode 100644 index 00000000..28694cde --- /dev/null +++ b/transformnext/cast.go @@ -0,0 +1,128 @@ +package transformnext + +import ( + "fmt" + "strconv" + "strings" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/bsontype" + "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" +) + +const ( + dateInputFormat = "20060102" + dateOutputFormat = "2006-01-02" +) + +func toInt(v string) (*int, error) { + if v == "" { + return nil, nil + } + i, err := strconv.Atoi(v) + if err != nil { + return nil, fmt.Errorf("error converting %s to int: %w", v, err) + } + return &i, nil +} + +func toFloat(v string) (*float32, error) { + if v == "" { + return nil, nil + } + f, err := strconv.ParseFloat(strings.ReplaceAll(v, ",", "."), 32) + if err != nil { + return nil, fmt.Errorf("error converting %s to float32: %w", v, err) + } + f32 := float32(f) + return &f32, nil +} + +func toBool(v string) *bool { + v = strings.ToUpper(v) + var b bool + switch v { + case "S": + b = true + case "N": + b = false + default: + return nil + } + return &b +} + +type date time.Time + +func (d *date) UnmarshalJSON(b []byte) error { + s := strings.Trim(string(b), `"`) + if s == "" { + return nil + } + t, err := time.Parse(dateOutputFormat, s) + if err != nil { + return err + } + *d = date(t) + return nil +} + +func (d *date) MarshalJSON() ([]byte, error) { + t := time.Time(*d) + return []byte(`"` + t.Format(dateOutputFormat) + `"`), nil +} + +func (d date) MarshalBSONValue() (bsontype.Type, []byte, error) { + t := time.Time(d) + return bson.TypeString, bsoncore.AppendString(nil, t.Format(dateOutputFormat)), nil +} + +func (d *date) UnmarshalBSONValue(t bsontype.Type, v []byte) error { + switch t { + case bson.TypeString: + s, _, ok := bsoncore.ReadString(v) + if !ok { + return fmt.Errorf("invalid bson string") + } + if s == "" { + return nil + } + p, err := time.Parse(dateOutputFormat, s) + if err != nil { + return fmt.Errorf("invalid date parse: %s", err) + } + *d = date(p) + return nil + case bson.TypeDateTime: + i, _, ok := bsoncore.ReadDateTime(v) + if !ok { + return fmt.Errorf("invalid bson datetime") + } + *d = date(time.UnixMilli(i)) + return nil + default: + return fmt.Errorf("unsupported bson type for date: %v", t) + } +} + +// toDate expects a date as string in the format YYYYMMDD (that is the format +// used by the Federal Revenue in their CSV files). +func toDate(v string) (*date, error) { + onlyZeros := func(s string) bool { + v, err := strconv.Atoi(s) + if err != nil { + return false + } + return v == 0 + } + if v == "" || onlyZeros(v) { + return nil, nil + } + t, err := time.Parse(dateInputFormat, v) + if err != nil { + return nil, fmt.Errorf("error converting %s to Time: %w", v, err) + } + d := date(t) + return &d, nil +} diff --git a/transformnext/cast_test.go b/transformnext/cast_test.go new file mode 100644 index 00000000..993b5fa3 --- /dev/null +++ b/transformnext/cast_test.go @@ -0,0 +1,186 @@ +package transformnext + +import ( + "encoding/json/v2" + "testing" + "time" +) + +func TestToInt(t *testing.T) { + t.Run("successful *int casting", func(t *testing.T) { + n := 42 + tc := []struct { + value string + expected *int + }{ + {"42", &n}, + {"", nil}, + } + for _, c := range tc { + got, err := toInt(c.value) + if err != nil { + t.Errorf("expected no errors when converting %s to *int, got %s", c.value, err) + } + if c.expected != nil { + if *got != *c.expected { + t.Errorf("got %d, expected %d", *got, *c.expected) + } + } else { + if got != c.expected { + t.Errorf("got %d, expected nil", *got) + } + } + } + }) + + t.Run("unsuccessful *int casting", func(t *testing.T) { + tc := []string{"4.2", "foobar"} + for _, v := range tc { + _, err := toInt(v) + if err == nil { + t.Errorf("expected a error when converting %s to *int, got ni", v) + } + } + }) +} + +func TestToFloat(t *testing.T) { + t.Run("successful *float32 casting", func(t *testing.T) { + n1 := float32(42) + n2 := float32(0.42) + tc := []struct { + value string + expected *float32 + }{ + {"42", &n1}, + {"0.42", &n2}, + {"", nil}, + } + for _, c := range tc { + got, err := toFloat(c.value) + if err != nil { + t.Errorf("expected no errors when converting %s to *float32, got %s", c.value, err) + } + if c.expected != nil { + if *got != *c.expected { + t.Errorf("got %f, expected %f", *got, *c.expected) + } + } else { + if got != c.expected { + t.Errorf("got %f, expected nil", *got) + } + } + } + }) + + t.Run("unsuccessful *float32 casting", func(t *testing.T) { + _, err := toFloat("foobar") + if err == nil { + t.Errorf("expected a error when converting foobar to *float32, got nil") + } + }) +} + +func TestToBool(t *testing.T) { + expectedTrue := true + expectedFalse := false + tc := []struct { + value string + expected *bool + }{ + {"S", &expectedTrue}, + {"s", &expectedTrue}, + {"N", &expectedFalse}, + {"n", &expectedFalse}, + {"", nil}, + {" ", nil}, + {"42", nil}, + } + for _, c := range tc { + got := toBool(c.value) + if got == nil && c.expected != nil { + t.Errorf("expected %s to be nil, got nil", c.value) + } + if got != nil && *got != *c.expected { + t.Errorf("expected %s to be %t, got %t", c.value, *c.expected, *got) + } + } +} + +func TestToDate(t *testing.T) { + t.Run("successful date casting", func(t *testing.T) { + v := "19940717" + d, err := time.Parse(dateInputFormat, v) + if err != nil { + t.Errorf("could not create a date for the test") + } + expected := date(d) + + tc := []struct { + value string + expected *date + }{ + {v, &expected}, + {"", nil}, + {"00000000", nil}, + } + for _, c := range tc { + got, err := toDate(c.value) + if err != nil { + t.Errorf("expected no errors when converting %s to date, got %s", c.value, err) + } + if c.expected != nil { + if *got != *c.expected { + t.Errorf("got %q, expected %q", time.Time(*got), time.Time(*c.expected)) + } + } else { + if got != c.expected { + t.Errorf("got %q, expected nil", time.Time(*got)) + } + } + } + }) + + t.Run("unsuccessful date casting", func(t *testing.T) { + got, err := toDate("foobar") + if err == nil { + t.Errorf("expected a error when converting foobar to date, got nil") + } + if got != nil { + t.Errorf("expected nil, got %s", time.Time(*got)) + } + if err == nil { + t.Error("expected an error, got nil") + } + }) +} + +func TestDate(t *testing.T) { + t.Run("successful unmarshal and marshal", func(t *testing.T) { + var d date + err := json.Unmarshal([]byte(`"1967-06-30"`), &d) + if err != nil { + t.Errorf("expected no error on date Unmarshal, got %s", err) + } + + got := time.Time(d) + if got.Year() != 1967 { + t.Errorf("expected year to be 1967, got %d", got.Year()) + } + if got.Month() != 6 { + t.Errorf("expected year to be 6, got %d", got.Month()) + } + if got.Day() != 30 { + t.Errorf("expected year to be 30, got %d", got.Day()) + } + + b, err := d.MarshalJSON() + s := string(b) + if err != nil { + t.Errorf("expected no error on marshal %v, s %s", d, err) + } + if s != "\"1967-06-30\"" { + t.Errorf("expected result to be \"1967-06-30\", s %s", s) + } + }) +} diff --git a/transformnext/company.go b/transformnext/company.go new file mode 100644 index 00000000..576c392a --- /dev/null +++ b/transformnext/company.go @@ -0,0 +1,256 @@ +package transformnext + +import ( + "bytes" + "encoding/json/v2" + "fmt" + "log/slog" + "strings" + "sync" + + "golang.org/x/sync/errgroup" +) + +func maskCPF(name string) string { + if len(name) < 11 { + return name + } + tail := name[len(name)-11:] + for _, c := range tail { + if c < '0' || c > '9' { + return name + } + } + if len(name) > 11 { + prev := name[len(name)-12] + if prev >= '0' && prev <= '9' { + return name + } + } + return name[:len(name)-11] + "***" + tail[3:8] + "***" +} + +type CNAE struct { + Codigo int `json:"codigo" bson:"codigo"` + Descricao string `json:"descricao" bson:"descricao"` +} + +type TaxRegime struct { + Ano int `json:"ano" bson:"ano"` + CNPJDaSCP *string `json:"cnpj_da_scp" bson:"cnpj_da_scp"` + FormaDeTributação string `json:"forma_de_tributacao" bson:"forma_de_tributacao"` + QuantidadeDeEscrituracoes int `json:"quantidade_de_escrituracoes" bson:"quantidade_de_escrituracoes"` +} + +type Partner struct { + IdentificadorDeSocio *int `json:"identificador_de_socio" bson:"identificador_de_socio"` + NomeSocio string `json:"nome_socio" bson:"nome_socio"` + CNPJCPFDoSocio string `json:"cnpj_cpf_do_socio" bson:"cnpj_cpf_do_socio"` + CodigoQualificacaoSocio *int `json:"codigo_qualificacao_socio" bson:"codigo_qualificacao_socio"` + QualificaoSocio *string `json:"qualificacao_socio" bson:"qualificacao_socio"` + DataEntradaSociedade *date `json:"data_entrada_sociedade" bson:"data_entrada_sociedade"` + CodigoPais *int `json:"codigo_pais" bson:"codigo_pais"` + Pais *string `json:"pais" bson:"pais"` + CPFRepresentanteLegal string `json:"cpf_representante_legal" bson:"cpf_representante_legal"` + NomeRepresentanteLegal string `json:"nome_representante_legal" bson:"nome_representante_legal"` + CodigoQualificacaoRepresentanteLegal *int `json:"codigo_qualificacao_representante_legal" bson:"codigo_qualificacao_representante_legal"` + QualificacaoRepresentanteLegal *string `json:"qualificacao_representante_legal" bson:"qualificacao_representante_legal"` + CodigoFaixaEtaria *int `json:"codigo_faixa_etaria" bson:"codigo_faixa_etaria"` + FaixaEtaria *string `json:"faixa_etaria" bson:"faixa_etaria"` +} + +type Company struct { + CNPJ string `json:"cnpj" bson:"cnpj"` + IdentificadorMatrizFilial *int `json:"identificador_matriz_filial" bson:"identificador_matriz_filial"` + DescricaoMatrizFilial *string `json:"descricao_identificador_matriz_filial" bson:"descricao_identificador_matriz_filial"` + NomeFantasia string `json:"nome_fantasia" bson:"nome_fantasia"` + SituacaoCadastral *int `json:"situacao_cadastral" bson:"situacao_cadastral"` + DescricaoSituacaoCadastral *string `json:"descricao_situacao_cadastral" bson:"descricao_situacao_cadastral"` + DataSituacaoCadastral *date `json:"data_situacao_cadastral" bson:"data_situacao_cadastral"` + MotivoSituacaoCadastral *int `json:"motivo_situacao_cadastral" bson:"motivo_situacao_cadastral"` + DescricaoMotivoSituacaoCadastral *string `json:"descricao_motivo_situacao_cadastral" bson:"descricao_motivo_situacao_cadastral"` + NomeCidadeNoExterior string `json:"nome_cidade_no_exterior" bson:"nome_cidade_no_exterior"` + CodigoPais *int `json:"codigo_pais" bson:"codigo_pais"` + Pais *string `json:"pais" bson:"pais"` + DataInicioAtividade *date `json:"data_inicio_atividade" bson:"data_inicio_atividade"` + CNAEFiscal *int `json:"cnae_fiscal" bson:"cnae_fiscal"` + CNAEFiscalDescricao *string `json:"cnae_fiscal_descricao" bson:"cnae_fiscal_descricao"` + DescricaoTipoDeLogradouro string `json:"descricao_tipo_de_logradouro" bson:"descricao_tipo_de_logradouro"` + Logradouro string `json:"logradouro" bson:"logradouro"` + Numero string `json:"numero" bson:"numero"` + Complemento string `json:"complemento" bson:"complemento"` + Bairro string `json:"bairro" bson:"bairro"` + CEP string `json:"cep" bson:"cep"` + UF string `json:"uf" bson:"uf"` + CodigoMunicipio *int `json:"codigo_municipio" bson:"codigo_municipio"` + CodigoMunicipioIBGE *int `json:"codigo_municipio_ibge" bson:"codigo_municipio_ibge"` + Municipio *string `json:"municipio" bson:"municipio"` + Telefone1 string `json:"ddd_telefone_1" bson:"ddd_telefone_1"` + Telefone2 string `json:"ddd_telefone_2" bson:"ddd_telefone_2"` + Fax string `json:"ddd_fax" bson:"ddd_fax"` + Email *string `json:"email" bson:"email"` + SituacaoEspecial string `json:"situacao_especial" bson:"situacao_especial"` + DataSituacaoEspecial *date `json:"data_situacao_especial" bson:"data_situacao_especial"` + OpcaoPeloSimples *bool `json:"opcao_pelo_simples" bson:"opcao_pelo_simples"` + DataOpcaoPeloSimples *date `json:"data_opcao_pelo_simples" bson:"data_opcao_pelo_simples"` + DataExclusaoDoSimples *date `json:"data_exclusao_do_simples" bson:"data_exclusao_do_simples"` + OpcaoPeloMEI *bool `json:"opcao_pelo_mei" bson:"opcao_pelo_mei"` + DataOpcaoPeloMEI *date `json:"data_opcao_pelo_mei" bson:"data_opcao_pelo_mei"` + DataExclusaoDoMEI *date `json:"data_exclusao_do_mei" bson:"data_exclusao_do_mei"` + RazaoSocial string `json:"razao_social" bson:"razao_social"` + CodigoNaturezaJuridica *int `json:"codigo_natureza_juridica" bson:"codigo_natureza_juridica"` + NaturezaJuridica *string `json:"natureza_juridica" bson:"natureza_juridica"` + QualificacaoDoResponsavel *int `json:"qualificacao_do_responsavel" bson:"qualificacao_do_responsavel"` + CapitalSocial *float32 `json:"capital_social" bson:"capital_social"` + CodigoPorte *int `json:"codigo_porte" bson:"codigo_porte"` + Porte *string `json:"porte" bson:"porte"` + EnteFederativoResponsavel string `json:"ente_federativo_responsavel" bson:"ente_federativo_responsavel"` + QuadroSocietario []Partner `json:"qsa" bson:"qsa"` + CNAESecundarios []CNAE `json:"cnaes_secundarios" bson:"cnaes_secundarios"` + RegimeTributario []TaxRegime `json:"regime_tributario" bson:"regime_tributario"` +} + +func (c *Company) withPrivacy() { + c.NomeFantasia = strings.TrimSpace(maskCPF(c.NomeFantasia)) + c.Email = nil + if c.CodigoNaturezaJuridica != nil && c.NaturezaJuridica != nil && strings.Contains(strings.ToLower(*c.NaturezaJuridica), "individual") { + c.DescricaoTipoDeLogradouro = "" + c.Logradouro = "" + c.Numero = "" + c.Complemento = "" + c.Telefone1 = "" + c.Telefone2 = "" + c.Fax = "" + } +} + +func (c *Company) JSON(p *sync.Pool) (string, error) { + b := p.Get().(*bytes.Buffer) + defer func() { + b.Reset() + p.Put(b) + }() + if err := json.MarshalWrite(b, c); err != nil { + return "", fmt.Errorf("error while mashaling company JSON: %w", err) + } + return b.String(), nil +} + +func newCompany(srcs map[string]*source, kv *kv, row []string) (*Company, error) { + var c Company + var err error + var g errgroup.Group + c.CNPJ = strings.Join(row[:3], "") + c.IdentificadorMatrizFilial, err = toInt(row[3]) + if err != nil { + return nil, fmt.Errorf("could not parse IdentificadorMatrizFilial for %s: %w", c.CNPJ, err) + } + if err := c.descricaoMatrizFilial(); err != nil { + return nil, fmt.Errorf("could not parse IdentificadorMatrizFilial for %s: %w", c.CNPJ, err) + } + c.NomeFantasia = row[4] + c.SituacaoCadastral, err = toInt(row[5]) + if err != nil { + return nil, fmt.Errorf("could not parse SituacaoCadastral for %s: %w", c.CNPJ, err) + } + if err := c.descricaoSituacaoCadastral(); err != nil { + return nil, fmt.Errorf("could not get DescricaoSituacaoCadastral for %s: %w", c.CNPJ, err) + } + c.DataSituacaoCadastral, err = toDate(row[6]) + if err != nil { + return nil, fmt.Errorf("could not parse DataSituacaoCadastral for %s: %w", c.CNPJ, err) + } + c.MotivoSituacaoCadastral, err = toInt(row[7]) + if err != nil { + return nil, fmt.Errorf("could not parse MotivoSituacaoCadastral for %s: %w", c.CNPJ, err) + } + g.Go(func() error { + var err error + c.DescricaoMotivoSituacaoCadastral, err = stringFromKV(srcs, kv, "mot", row[7], 0) + if err != nil { + slog.Warn("unknown MotivoSituacaoCadastral", "code", row[7], "cnpj", c.CNPJ) + } + return nil + }) + c.NomeCidadeNoExterior = row[8] + c.CodigoPais, err = toInt(row[9]) + if err != nil { + return nil, fmt.Errorf("could not parse CodigoPais for %s: %w", c.CNPJ, err) + } + g.Go(func() error { + var err error + c.Pais, err = stringFromKV(srcs, kv, "pai", row[9], 0) + if err != nil { + slog.Warn("unknown CodigoPais", "code", row[9], "cnpj", c.CNPJ) + } + return nil + }) + c.DataInicioAtividade, err = toDate(row[10]) + if err != nil { + return nil, fmt.Errorf("could not parse DataInicioAtividade for %s: %w", c.CNPJ, err) + } + c.CNAEFiscal, err = toInt(row[11]) + if err != nil { + return nil, fmt.Errorf("could not parse CNAEFiscal for %s: %w", c.CNPJ, err) + } + g.Go(func() error { + var err error + c.CNAEFiscalDescricao, err = stringFromKV(srcs, kv, "cna", row[11], 0) + if err != nil { + return fmt.Errorf("could not parse CNAEFiscalDescricao for %s: %w", c.CNPJ, err) + } + return nil + }) + c.DescricaoTipoDeLogradouro = row[13] + c.Logradouro = row[14] + c.Numero = row[15] + c.Complemento = row[16] + c.Bairro = row[17] + c.CEP = row[18] + c.UF = row[19] + c.CodigoMunicipio, err = toInt(row[20]) + if err != nil { + return nil, fmt.Errorf("could not parse CodigoMunicipio for %s: %w", c.CNPJ, err) + } + if c.CodigoMunicipio != nil && *c.CodigoMunicipio != 9707 { // overseas city code + g.Go(func() error { + ibge, err := stringFromKV(srcs, kv, "tab", row[20], 3) + if err != nil { + slog.Debug("unknown CodigoMunicipioIBGE", "code", row[20], "cnpj", c.CNPJ) + return nil + } + c.CodigoMunicipioIBGE, err = toInt(*ibge) + if err != nil { + return fmt.Errorf("could not parse CodigoMunicipioIBGE number for %s: %w", c.CNPJ, err) + } + return nil + }) + } + g.Go(func() error { + var err error + c.Municipio, err = stringFromKV(srcs, kv, "mun", row[20], 0) + if err != nil { + slog.Warn("unknown Municipio", "code", row[20], "cnpj", c.CNPJ) + return nil + } + return nil + }) + c.Telefone1 = row[21] + row[22] + c.Telefone2 = row[23] + row[24] + c.Fax = row[25] + row[26] + c.Email = &row[27] + c.SituacaoEspecial = row[28] + c.DataSituacaoEspecial, err = toDate(row[29]) + if err != nil { + return nil, fmt.Errorf("could not parse DataSituacaoEspecial for %s: %w", c.CNPJ, err) + } + g.Go(func() error { return c.base(srcs, kv) }) + g.Go(func() error { return c.simples(srcs, kv) }) + g.Go(func() error { return c.cnaes(srcs, kv, row[12]) }) + g.Go(func() error { return c.partners(srcs, kv) }) + g.Go(func() error { return c.taxes(srcs, kv) }) + if err := g.Wait(); err != nil { + return nil, err + } + return &c, nil +} diff --git a/transformnext/company_test.go b/transformnext/company_test.go new file mode 100644 index 00000000..6204c033 --- /dev/null +++ b/transformnext/company_test.go @@ -0,0 +1,374 @@ +package transformnext + +import ( + "context" + "testing" + "time" +) + +var dataSituacaoCadastral = date(time.Date(2004, 5, 22, 0, 0, 0, 0, time.UTC)) + +func TestMaskCPF(t *testing.T) { + for _, tc := range []struct { + name string + want string + }{ + // MEI patterns (company name + CPF) + {"João Silva 12345678901", "João Silva ***45678***"}, + {"Maria Santos ME 98765432109", "Maria Santos ME ***65432***"}, + {"JOSE DA SILVA 11122233344", "JOSE DA SILVA ***22233***"}, + {"COMERCIO DE ALIMENTOS LTDA 55566677788", "COMERCIO DE ALIMENTOS LTDA ***66677***"}, + // Edge cases with non-digit before CPF + {"Empresa-12345678901", "Empresa-***45678***"}, + {"Nome 12345678901", "Nome ***45678***"}, + {"A12345678901", "A***45678***"}, + // Should NOT mask: 12 consecutive digits (not CPF pattern) + {"Empresa123456789012", "Empresa123456789012"}, + {"000012345678901", "000012345678901"}, + // Should NOT mask: too short + {"1234567890", "1234567890"}, + {"Short", "Short"}, + // Should NOT mask: non-digits in tail + {"NomeEmpresa1234567890X", "NomeEmpresa1234567890X"}, + {"Empresa 1234567890a", "Empresa 1234567890a"}, + {"Test 123456-78901", "Test 123456-78901"}, + // Exactly 11 chars (all digits) + {"12345678901", "***45678***"}, + // UTF-8 cases + {"João José 12345678901", "João José ***45678***"}, + {"Quitanda São Miguel 99988877766", "Quitanda São Miguel ***88877***"}, + {"Café é Bom 12312312312", "Café é Bom ***12312***"}, + } { + t.Run(tc.name, func(t *testing.T) { + got := maskCPF(tc.name) + if got != tc.want { + t.Errorf("expected masked %s to be %s, got %s", tc.name, tc.want, got) + } + }) + } +} + +func TestNewCompany(t *testing.T) { + row := []string{ + "33683111", // 0 CNPJ Base + "0002", // 1 CNPJ Ordem + "80", // 2 CNPJ DV + "2", // 3 Indentificador Matriz/Filial + "REGIONAL BRASILIA-DF", // 4 Nome Fantasia + "02", // 5 Situação Cadastral + "20040522", // 6 Data Situação Cadastral + "00", // 7 Motivo Situação Cadastral + "", // 8 Nome da cidade no exterior + "", // 9 Pais + "19670630", // 10 Data de Início da Ativiade + "6204000", // 11 CNAE Fiscal + "6201501,6202300,6203100,6209100,6311900", // 12 CNAEs Secundários + "AVENIDA", // 13 Tipo de Logradouro + "L2 SGAN", // 14 Logradouro + "601", // 15 Número + "MODULO G", // 16 Complemento + "ASA NORTE", // 17 Bairro + "70836900", // 18 CEP + "DF", // 19 UF + "9701", // 20 Município + "", // 21 DDD 1 + "", // 22 Telefone 1 + "", // 23 DDD 2 + "", // 24 Telefone 2 + "", // 25 DDD Fax + "", // 26 Fax + "test@ser.pro", // 27 Email + "", // 28 Situação Especial + "", // 29 Data Situação Especial + } + kv, err := newBadger(t.TempDir(), false) + if err != nil { + t.Fatalf("expected no error creatinh kv, got %s", err) + } + defer func() { + if err := kv.db.Close(); err != nil { + t.Errorf("expected no error closing badger, got %s", err) + } + }() + srcs := sources() + ctx := context.Background() + for key, src := range srcs { + if key == "est" { + continue + } + if err := loadCSVs(ctx, "../testdata", src, nil, kv); err != nil { + t.Fatalf("expected no error loading %s data, got %s", key, err) + } + } + got, err := newCompany(srcs, kv, row) + if err != nil { + t.Fatalf("expected no error creating a company, got %s", err) + } + if got.CNPJ != "33683111000280" { + t.Errorf("expected cnpj to be 33683111000280, got %s", got.CNPJ) + } + if *got.IdentificadorMatrizFilial != 2 { + t.Errorf("expected IdentificadorMatrizFilial to be 2, got %v", got.IdentificadorMatrizFilial) + } + if *got.DescricaoMatrizFilial != "FILIAL" { + t.Errorf("expected DescricaoMatrizFilial to be FILIAL, got %s", *got.DescricaoMatrizFilial) + } + if got.NomeFantasia != "REGIONAL BRASILIA-DF" { + t.Errorf("expected NomeFantasia to be REGIONAL BRASILIA-DF, got %s", got.NomeFantasia) + } + if *got.SituacaoCadastral != 2 { + t.Errorf("expected SituacaoCadastral to be 2, got %d", *got.SituacaoCadastral) + } + if *got.DataSituacaoCadastral != dataSituacaoCadastral { + t.Errorf("expected SituacaoCadastral to be %v, got %v", dataSituacaoCadastral, *got.DataSituacaoCadastral) + } + if *got.DescricaoSituacaoCadastral != "ATIVA" { + t.Errorf("expected DescricaoSituacaoCadastral to be ATIVA, got %s", *got.DescricaoSituacaoCadastral) + } + if *got.MotivoSituacaoCadastral != 0 { + t.Errorf("expected MotivoSituacaoCadastral to be 0, got %d", *got.MotivoSituacaoCadastral) + } + if got.DescricaoMotivoSituacaoCadastral == nil || *got.DescricaoMotivoSituacaoCadastral != "SEM MOTIVO" { + t.Errorf("expected DescricaoMotivoSituacaoCadastral to be SEM MOTIVO, got %v", got.DescricaoMotivoSituacaoCadastral) + } + if got.NomeCidadeNoExterior != "" { + t.Errorf("expected NomeCidadeNoExterior to be empty, got %s", got.NomeCidadeNoExterior) + } + if got.CodigoPais != nil { + t.Errorf("expected CodigoPais to be nil, got %v", got.CodigoPais) + } + if *got.DataInicioAtividade != date(time.Date(1967, 6, 30, 0, 0, 0, 0, time.UTC)) { + t.Errorf("expected DataInicioAtividade to be 1967-06-30, got %v", *got.DataInicioAtividade) + } + if *got.CNAEFiscal != 6204000 { + t.Errorf("expected CNAEFiscal to be 6204000, got %d", *got.CNAEFiscal) + } + if *got.CNAEFiscalDescricao != "Consultoria em tecnologia da informação" { + t.Errorf("expected CNAEFiscalDescricao to be Consultoria em tecnologia da informação, got %s", *got.CNAEFiscalDescricao) + } + if got.DescricaoTipoDeLogradouro != "AVENIDA" { + t.Errorf("expected DescricaoTipoDeLogradouro to be AVENIDA, got %s", got.DescricaoTipoDeLogradouro) + } + if got.Logradouro != "L2 SGAN" { + t.Errorf("expected Logradouro to be L2 SGAN, got %s", got.Logradouro) + } + if got.Numero != "601" { + t.Errorf("expected Numero to be 601, got %s", got.Numero) + } + if got.Complemento != "MODULO G" { + t.Errorf("expected Complemento to be MODULO G, got %s", got.Complemento) + } + if got.Bairro != "ASA NORTE" { + t.Errorf("expected Bairro to be ASA NORTE, got %s", got.Bairro) + } + if got.CEP != "70836900" { + t.Errorf("expected CEP to be 70836900, got %s", got.CEP) + } + if got.UF != "DF" { + t.Errorf("expected UF to be DF, got %s", got.UF) + } + if *got.CodigoMunicipio != 9701 { + t.Errorf("expected CodigoMunicipio to be 9701, got %d", *got.CodigoMunicipio) + } + if *got.CodigoMunicipioIBGE != 5300108 { + t.Errorf("expected CodigoMunicipioIBGE to be 5300108, got %d", *got.CodigoMunicipioIBGE) + } + if *got.Municipio != "BRASILIA" { + t.Errorf("expected Municipio to be BRASILIA, got %s", *got.Municipio) + } + if got.Telefone1 != "" { + t.Errorf("expected Telefone1 to be empty, got %s", got.Telefone1) + } + if got.Telefone2 != "" { + t.Errorf("expected Telefone2 to be empty, got %s", got.Telefone2) + } + if got.Fax != "" { + t.Errorf("expected Fax to be empty, got %s", got.Fax) + } + if got.Email == nil || *got.Email != "test@ser.pro" { + t.Errorf("expected Email to be empty string, got %v", got.Email) + } + if got.SituacaoEspecial != "" { + t.Errorf("expected SituacaoEspecial to be empty, got %s", got.SituacaoEspecial) + } + if got.DataSituacaoEspecial != nil { + t.Errorf("expected DataSituacaoEspecial to be nil, got %v", got.DataSituacaoEspecial) + } + if len(got.CNAESecundarios) != 5 { + t.Errorf("expected CNAESecundarios to have 5 items, got %d", len(got.CNAESecundarios)) + } + if got.RazaoSocial != "SERVICO FEDERAL DE PROCESSAMENTO DE DADOS (SERPRO)" { + t.Errorf("expected RazaoSocial to be SERVICO FEDERAL DE PROCESSAMENTO DE DADOS (SERPRO), got %s", got.RazaoSocial) + } + if *got.CodigoNaturezaJuridica != 2011 { + t.Errorf("expected CodigoNaturezaJuridica to be 2011, got %d", *got.CodigoNaturezaJuridica) + } + if *got.NaturezaJuridica != "Empresa Pública" { + t.Errorf("expected NaturezaJuridica to be Empresa Pública, got %s", *got.NaturezaJuridica) + } + if *got.QualificacaoDoResponsavel != 16 { + t.Errorf("expected QualificacaoDoResponsavel to be 16, got %d", *got.QualificacaoDoResponsavel) + } + if *got.CapitalSocial != 1061004829.23 { + t.Errorf("expected CapitalSocial to be 1061004829.23, got %f", *got.CapitalSocial) + } + if *got.CodigoPorte != 5 { + t.Errorf("expected CodigoPorte to be 5, got %d", *got.CodigoPorte) + } + if *got.Porte != "DEMAIS" { + t.Errorf("expected Porte to be DEMAIS, got %s", *got.Porte) + } + if got.EnteFederativoResponsavel != "" { + t.Errorf("expected EnteFederativoResponsavel to be empty, got %s", got.EnteFederativoResponsavel) + } + if len(got.QuadroSocietario) != 6 { + t.Errorf("expected QuadroSocietario to have 6 items, got %d items", len(got.QuadroSocietario)) + } + if len(got.RegimeTributario) != 1 { + t.Errorf("expected RegimeTributario to have 1 item, got %d items", len(got.RegimeTributario)) + } + if *got.OpcaoPeloSimples != true { + t.Errorf("expected OpcaoPeloSimples to be true, got %v", *got.OpcaoPeloSimples) + } + if *got.DataOpcaoPeloSimples != date(time.Date(2014, 1, 1, 0, 0, 0, 0, time.UTC)) { + t.Errorf("expected DataOpcaoPeloSimples to be 2014-01-01, got %v", *got.DataOpcaoPeloSimples) + } + if got.DataExclusaoDoSimples != nil { + t.Errorf("expected DataExclusaoDoSimples to be nil, got %v", got.DataExclusaoDoSimples) + } + if *got.OpcaoPeloMEI != false { + t.Errorf("expected OpcaoPeloMEI to be false, got %v", *got.OpcaoPeloMEI) + } + if got.DataOpcaoPeloMEI != nil { + t.Errorf("expected DataOpcaoPeloMEI to be nil, got %v", got.DataOpcaoPeloMEI) + } + if got.DataExclusaoDoMEI != nil { + t.Errorf("expected DataExclusaoDoMEI to be nil, got %v", got.DataExclusaoDoMEI) + } + if got.Pais != nil { + t.Errorf("expected Pais to be nil, got %v", got.Pais) + } + if len(got.QuadroSocietario) != 6 { + t.Errorf("expected QuadroSocietario to have 6 items, got %d", len(got.QuadroSocietario)) + } + // Partners are sorted alphabetically by name + if got.QuadroSocietario[0].NomeSocio != "ANDRE DE CESERO" { + t.Errorf("expected first partner to be ANDRE DE CESERO, got %s", got.QuadroSocietario[0].NomeSocio) + } + if got.QuadroSocietario[0].CNPJCPFDoSocio != "***220050**" { + t.Errorf("expected first partner CNPJ/CPF to be ***220050**, got %s", got.QuadroSocietario[0].CNPJCPFDoSocio) + } + if *got.QuadroSocietario[0].CodigoQualificacaoSocio != 10 { + t.Errorf("expected partner qualification code to be 10, got %d", *got.QuadroSocietario[0].CodigoQualificacaoSocio) + } + if *got.QuadroSocietario[0].QualificaoSocio != "Diretor" { + t.Errorf("expected partner qualification to be Diretor, got %s", *got.QuadroSocietario[0].QualificaoSocio) + } + if *got.QuadroSocietario[0].CodigoFaixaEtaria != 6 { + t.Errorf("expected partner age range code to be 6, got %d", *got.QuadroSocietario[0].CodigoFaixaEtaria) + } + if *got.QuadroSocietario[0].FaixaEtaria != "Entre 51 a 60 anos" { + t.Errorf("expected partner age range to be Entre 51 a 60 anos, got %s", *got.QuadroSocietario[0].FaixaEtaria) + } + if len(got.RegimeTributario) != 1 { + t.Errorf("expected RegimeTributario to have 1 item, got %d", len(got.RegimeTributario)) + } + if got.RegimeTributario[0].Ano != 2018 { + t.Errorf("expected tax regime year to be 2018, got %d", got.RegimeTributario[0].Ano) + } + if got.RegimeTributario[0].FormaDeTributação != "LUCRO PRESUMIDO" { + t.Errorf("expected tax regime type to be LUCRO PRESUMIDO, got %s", got.RegimeTributario[0].FormaDeTributação) + } + if got.RegimeTributario[0].QuantidadeDeEscrituracoes != 1 { + t.Errorf("expected tax regime quantity to be 1, got %d", got.RegimeTributario[0].QuantidadeDeEscrituracoes) + } +} + +func TestNewCompanyWithPrivacy(t *testing.T) { + kv, err := newBadger(t.TempDir(), false) + if err != nil { + t.Fatalf("expected no error creating kv, got %s", err) + } + defer func() { + if err := kv.db.Close(); err != nil { + t.Errorf("expected no error closing badger, got %s", err) + } + }() + srcs := sources() + ctx := context.Background() + for key, src := range srcs { + if key == "est" { + continue + } + if err := loadCSVs(ctx, "../testdata", src, nil, kv); err != nil { + t.Fatalf("expected no error loading %s data, got %s", key, err) + } + } + row := []string{ + "33683111", // 0 CNPJ Base + "0002", // 1 CNPJ Ordem + "80", // 2 CNPJ DV + "1", // 3 Indentificador Matriz/Filial (MATRIZ) + "João Silva 12345678901", // 4 Nome Fantasia with CPF + "02", // 5 Situação Cadastral + "20040522", // 6 Data Situação Cadastral + "00", // 7 Motivo Situação Cadastral + "", // 8 Nome da cidade no exterior + "", // 9 Pais + "19670630", // 10 Data de Início da Ativiade + "6204000", // 11 CNAE Fiscal + "", // 12 CNAEs Secundários + "RUA", // 13 Tipo de Logradouro + "L2 SGAN", // 14 Logradouro + "601", // 15 Número + "MODULO G", // 16 Complemento + "ASA NORTE", // 17 Bairro + "70836900", // 18 CEP + "DF", // 19 UF + "9701", // 20 Município + "61", // 21 DDD 1 + "12345678", // 22 Telefone 1 + "", // 23 DDD 2 + "87654321", // 24 Telefone 2 + "11", // 25 DDD Fax + "", // 26 Fax + "test@example.com", // 27 Email + "", // 28 Situação Especial + "", // 29 Data Situação Especial + } + got, err := newCompany(srcs, kv, row) + if err != nil { + t.Fatalf("expected no error creating a company, got %s", err) + } + got.withPrivacy() + if got.Email != nil { + t.Errorf("expected Email to be nil after privacy, got %v", got.Email) + } + // SERPRO is a public company (Empresa Pública), not an individual + // So address fields should NOT be cleared + if got.DescricaoTipoDeLogradouro != "RUA" { + t.Errorf("expected DescricaoTipoDeLogradouro to be RUA for public company, got %s", got.DescricaoTipoDeLogradouro) + } + if got.Logradouro != "L2 SGAN" { + t.Errorf("expected Logradouro to be L2 SGAN for public company, got %s", got.Logradouro) + } + if got.Numero != "601" { + t.Errorf("expected Numero to be 601 for public company, got %s", got.Numero) + } + if got.Complemento != "MODULO G" { + t.Errorf("expected Complemento to be MODULO G for public company, got %s", got.Complemento) + } + if got.Telefone1 != "6112345678" { + t.Errorf("expected Telefone1 to be 6112345678 for public company, got %s", got.Telefone1) + } + if got.Telefone2 != "87654321" { + t.Errorf("expected Telefone2 to be 87654321 for public company, got %s", got.Telefone2) + } + if got.Fax != "11" { + t.Errorf("expected Fax to be 11 for public company, got %s", got.Fax) + } + want := "João Silva ***45678***" + if got.NomeFantasia != want { + t.Errorf("expected NomeFantasia to be %s after privacy, got %s", want, got.NomeFantasia) + } +} diff --git a/transformnext/enrich.go b/transformnext/enrich.go new file mode 100644 index 00000000..33f84b56 --- /dev/null +++ b/transformnext/enrich.go @@ -0,0 +1,370 @@ +package transformnext + +import ( + "errors" + "fmt" + "sort" + "strings" + + "github.com/dgraph-io/badger/v4" + "golang.org/x/sync/errgroup" +) + +func stringsFromKV(srcs map[string]*source, kv *kv, prefix string, id string) ([]string, error) { + src, ok := srcs[prefix] + if !ok { + return nil, fmt.Errorf("could not find lookup %s", prefix) + } + k := src.keyFor(id) + v, err := kv.get(k) + if err != nil { + return nil, fmt.Errorf("could not find %s", string(k)) + } + return v, nil +} + +func stringFromKV(srcs map[string]*source, kv *kv, prefix string, id string, idx uint) (*string, error) { + if id == "" { + return nil, nil + } + v, err := stringsFromKV(srcs, kv, prefix, id) + if err != nil { + return nil, err + } + if len(v) <= int(idx) { + return nil, fmt.Errorf("value for id=%s prefix=%s has %d items, cannot load index %d: %v", id, prefix, len(v), idx, v) + } + return &v[idx], nil +} + +func (c *Company) base(srcs map[string]*source, kv *kv) error { + var err error + row, err := stringsFromKV(srcs, kv, "emp", c.CNPJ[:8]) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return nil + } + return err + } + if row == nil { + return nil + } + if len(row) != 6 { + return fmt.Errorf("expected exactly 6 columns for base data, got %d: %v", len(row), row) + } + c.RazaoSocial = row[0] + c.CodigoNaturezaJuridica, err = toInt(row[1]) + if err != nil { + return fmt.Errorf("could not parse CodigoNaturezaJuridica for %s: %w", c.CNPJ, err) + } + c.NaturezaJuridica, err = stringFromKV(srcs, kv, "nat", row[1], 0) + if err != nil { + return fmt.Errorf("could not parse NaturezaJuridica for %s: %w", c.CNPJ, err) + } + c.QualificacaoDoResponsavel, err = toInt(row[2]) + if err != nil { + return fmt.Errorf("could not parse QualificacaoDoResponsavel for %s: %w", c.CNPJ, err) + } + c.CapitalSocial, err = toFloat(row[3]) + if err != nil { + return fmt.Errorf("could not parse CapitalSocial for %s: %w", c.CNPJ, err) + } + c.CodigoPorte, err = toInt(row[4]) + if err != nil { + return fmt.Errorf("could not parse CodigoParse for %s: %w", c.CNPJ, err) + } + if err := c.porte(); err != nil { + return err + } + c.EnteFederativoResponsavel = row[5] + return nil +} + +func (c *Company) simples(srcs map[string]*source, kv *kv) error { + var err error + row, err := stringsFromKV(srcs, kv, "sim", c.CNPJ[:8]) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return nil + } + return err + } + if row == nil { + return nil + } + if len(row) != 6 { + return fmt.Errorf("expected exactly 6 columns for simples data, got %d: %v", len(row), row) + } + c.OpcaoPeloSimples = toBool(row[0]) + c.DataOpcaoPeloSimples, err = toDate(row[1]) + if err != nil { + return fmt.Errorf("could not parse DataOpcaoPeloSimples for %s: %w", c.CNPJ, err) + } + c.DataExclusaoDoSimples, err = toDate(row[2]) + if err != nil { + return fmt.Errorf("could not parse DataExclusaoDoSimples for %s: %w", c.CNPJ, err) + } + c.OpcaoPeloMEI = toBool(row[3]) + c.DataOpcaoPeloMEI, err = toDate(row[4]) + if err != nil { + return fmt.Errorf("could not parse DataOpcaoPeloMEI for %s: %w", c.CNPJ, err) + } + c.DataExclusaoDoMEI, err = toDate(row[5]) + if err != nil { + return fmt.Errorf("could not parse DataExclusaoDoMEI for %s: %w", c.CNPJ, err) + } + return nil +} + +func (c *Company) cnaes(srcs map[string]*source, kv *kv, codes string) error { + ch := make(chan CNAE) + done := make(chan struct{}, 1) + var g errgroup.Group + for code := range strings.SplitSeq(codes, ",") { + g.Go(func() error { + d, err := stringFromKV(srcs, kv, "cna", code, 0) + if err != nil { + return err + } + if d == nil { + return nil + } + n, err := toInt(code) + if err != nil { + return fmt.Errorf("could not parse CNAESecundarios for %s: %w", c.CNPJ, err) + } + ch <- CNAE{*n, *d} + return nil + }) + } + go func() { + for p := range ch { + c.CNAESecundarios = append(c.CNAESecundarios, p) + } + done <- struct{}{} + }() + err := g.Wait() + close(ch) + <-done + return err +} + +func (c *Company) partners(srcs map[string]*source, kv *kv) error { + src, ok := srcs["soc"] + if !ok { + return errors.New("could not find lookup soc") + } + k := src.keyPrefixFor(c.CNPJ[:8]) + rows, err := kv.getPrefix(k) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return nil + } + return fmt.Errorf("could not find %s", string(k)) + } + ch := make(chan Partner) + done := make(chan struct{}, 1) + var g errgroup.Group + for _, row := range rows { + g.Go(func() error { + var p Partner + var err error + p.IdentificadorDeSocio, err = toInt(row[0]) + if err != nil { + return fmt.Errorf("could not parse IdentificadorDeSocio for %s: %w", c.CNPJ, err) + } + p.NomeSocio = row[1] + p.CNPJCPFDoSocio = row[2] + p.CodigoQualificacaoSocio, err = toInt(row[3]) + if err != nil { + return fmt.Errorf("could not parse CodigoQualificacaoSocio for %s: %w", c.CNPJ, err) + } + p.QualificaoSocio, err = stringFromKV(srcs, kv, "qua", row[3], 0) + if err != nil { + return fmt.Errorf("could not parse QualificaoSocio for %s: %w", c.CNPJ, err) + } + p.DataEntradaSociedade, err = toDate(row[4]) + if err != nil { + return fmt.Errorf("could not parse DataEntradaSociedade for %s: %w", c.CNPJ, err) + } + p.CodigoPais, err = toInt(row[5]) + if err != nil { + return fmt.Errorf("could not parse CodigoPais for %s: %w", c.CNPJ, err) + } + p.Pais, err = stringFromKV(srcs, kv, "pai", row[5], 0) + if err != nil { + return fmt.Errorf("could not parse Pais for %s: %w", c.CNPJ, err) + } + p.CPFRepresentanteLegal = row[6] + p.NomeRepresentanteLegal = row[7] + p.CodigoQualificacaoRepresentanteLegal, err = toInt(row[8]) + if err != nil { + return fmt.Errorf("could not parse CodigoQualificacaoRepresentanteLegal for %s: %w", c.CNPJ, err) + } + p.QualificacaoRepresentanteLegal, err = stringFromKV(srcs, kv, "qua", row[8], 0) + if err != nil { + return fmt.Errorf("could not parse QualificacaoRepresentanteLegal for %s: %w", c.CNPJ, err) + } + p.CodigoFaixaEtaria, err = toInt(row[9]) + if err != nil { + return fmt.Errorf("could not parse CodigoFaixaEtaria for %s: %w", c.CNPJ, err) + } + if p.CodigoFaixaEtaria != nil { + var f string + switch *p.CodigoFaixaEtaria { + case 1: + f = "Entre 0 a 12 anos" + case 2: + f = "Entre 13 a 20 ano" + case 3: + f = "Entre 21 a 30 anos" + case 4: + f = "Entre 31 a 40 anos" + case 5: + f = "Entre 41 a 50 anos" + case 6: + f = "Entre 51 a 60 anos" + case 7: + f = "Entre 61 a 70 anos" + case 8: + f = "Entre 71 a 80 anos" + case 9: + f = "Maiores de 80 anos" + case 0: + f = "Não se aplica" + default: + return fmt.Errorf("unknown CodigoFaixaEtaria for %s: %d", c.CNPJ, *p.CodigoFaixaEtaria) + } + p.FaixaEtaria = &f + } + ch <- p + return nil + }) + } + go func() { + for p := range ch { + c.QuadroSocietario = append(c.QuadroSocietario, p) + } + sort.Slice(c.QuadroSocietario, func(i, j int) bool { + return c.QuadroSocietario[i].NomeSocio < c.QuadroSocietario[j].NomeSocio + }) + done <- struct{}{} + }() + err = g.Wait() + close(ch) + <-done + return err +} + +func (c *Company) taxes(srcs map[string]*source, kv *kv) error { + var g errgroup.Group + ch := make(chan TaxRegime) + done := make(chan struct{}, 1) + for _, p := range []string{"arb", "imu", "pre", "rea"} { + g.Go(func() error { + src, ok := srcs[p] + if !ok { + return fmt.Errorf("could not find lookup %s", p) + } + k := src.keyPrefixFor(c.CNPJ[:8]) + rows, err := kv.getPrefix(k) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return nil + } + return fmt.Errorf("could not find %s", string(k)) + } + for _, row := range rows { + var t TaxRegime + y, err := toInt(row[0]) + if err != nil { + return fmt.Errorf("could not parse Ano for %s: %w", string(k), err) + } + t.Ano = *y + t.CNPJDaSCP = &row[1] + t.FormaDeTributação = row[2] + q, err := toInt(row[3]) + if err != nil { + return fmt.Errorf("could not parse QuantidadeDeEscrituracoes for %s: %w", string(k), err) + } + t.QuantidadeDeEscrituracoes = *q + ch <- t + } + return nil + }) + } + go func() { + for t := range ch { + c.RegimeTributario = append(c.RegimeTributario, t) + } + sort.Slice(c.RegimeTributario, func(i, j int) bool { + return c.RegimeTributario[i].Ano < c.RegimeTributario[j].Ano + }) + done <- struct{}{} + }() + err := g.Wait() + close(ch) + <-done + return err +} + +func (c *Company) descricaoMatrizFilial() error { + if c.IdentificadorMatrizFilial == nil { + return fmt.Errorf("company %s missing IdentificadorMatrizFilial", c.CNPJ) + } + var d string + switch *c.IdentificadorMatrizFilial { + case 1: + d = "MATRIZ" + case 2: + d = "FILIAL" + default: + return fmt.Errorf("unknown IdentificadorMatrizFilial for %s: %d", c.CNPJ, *c.IdentificadorMatrizFilial) + } + c.DescricaoMatrizFilial = &d + return nil +} + +func (c *Company) descricaoSituacaoCadastral() error { + if c.SituacaoCadastral == nil { + return fmt.Errorf("company %s missing SituacaoCadastral", c.CNPJ) + } + var d string + switch *c.SituacaoCadastral { + case 1: + d = "NULA" + case 2: + d = "ATIVA" + case 3: + d = "SUSPENSA" + case 4: + d = "INAPTA" + case 8: + d = "BAIXADA" + default: + return fmt.Errorf("unknown IdentificadorMatrizFilial for %s: %d", c.CNPJ, *c.IdentificadorMatrizFilial) + } + c.DescricaoSituacaoCadastral = &d + return nil +} + +func (c *Company) porte() error { + if c.CodigoPorte == nil { + return fmt.Errorf("company %s missing CodigoPorte", c.CNPJ) + } + var p string + switch *c.CodigoPorte { + case 0: + p = "NÃO INFORMADO" + case 1: + p = "MICRO EMPRESA" + case 3: + p = "EMPRESA DE PEQUENO PORTE" + case 5: + p = "DEMAIS" + default: + return fmt.Errorf("unknown CodigoPorte for %s: %d", c.CNPJ, c.CodigoPorte) + } + c.Porte = &p + return nil +} diff --git a/transformnext/reader.go b/transformnext/reader.go index 8f3f55f7..d56ded75 100644 --- a/transformnext/reader.go +++ b/transformnext/reader.go @@ -13,6 +13,7 @@ import ( "regexp" "strings" + "github.com/cuducos/go-cnpj" "github.com/schollz/progressbar/v3" "golang.org/x/sync/errgroup" "golang.org/x/text/encoding/charmap" @@ -77,7 +78,13 @@ func (c *reader) readFromReader(ctx context.Context, f io.Reader, bar *progressb for n := range row { row[n] = cleanupColumn(row[n]) } - if err := kv.put(c.src, row[0], row[1:]); err != nil { + key := row[0] + val := row[1:] + if c.src.key == "imu" || c.src.key == "arb" || c.src.key == "pre" || c.src.key == "rea" { + key = cnpj.Base(row[1]) + val = append([]string{row[0]}, row[2:]...) + } + if err := kv.put(c.src, key, val); err != nil { return fmt.Errorf("could not save %s line %v to badger: %w", c.src.prefix, row, err) } s := b.read - prev diff --git a/transformnext/reader_test.go b/transformnext/reader_test.go index d295ef6c..f1746c3e 100644 --- a/transformnext/reader_test.go +++ b/transformnext/reader_test.go @@ -7,23 +7,27 @@ import ( func TestLoadCSVs(t *testing.T) { srcs := sources() - for idx, exp := range [][]string{ // expected value is the first column of each row - {"6204000", "6201501", "6202300", "6203100", "6209100", "6311900"}, - {"33683111", "19131243"}, - {"2023"}, - {"2023"}, - {"2018"}, - {"2023"}, - {"00", "01"}, - {"9701"}, - {"2011"}, - {"105"}, - {"05", "10", "16"}, - {"33683111"}, - {"33683111", "33683111", "33683111", "33683111", "33683111", "33683111", "19131243"}, - {"9701"}, + + for _, tc := range []struct { + key string + exp []string + }{ // expected value is the first column of each row + {"cna", []string{"6204000", "6201501", "6202300", "6203100", "6209100", "6311900"}}, + {"emp", []string{"33683111", "19131243"}}, + {"imu", []string{"2023"}}, + {"arb", []string{"2023"}}, + {"pre", []string{"2018"}}, + {"rea", []string{"2023"}}, + {"mot", []string{"00", "01"}}, + {"mun", []string{"9701"}}, + {"nat", []string{"2011"}}, + {"pai", []string{"105"}}, + {"qua", []string{"05", "10", "16"}}, + {"sim", []string{"33683111"}}, + {"soc", []string{"33683111", "33683111", "33683111", "33683111", "33683111", "33683111", "19131243"}}, + {"tab", []string{"9701"}}, } { - src := srcs[idx] + src := srcs[tc.key] t.Run(src.prefix, func(t *testing.T) { ctx := context.Background() kv, err := newBadger(t.TempDir(), false) @@ -38,7 +42,7 @@ func TestLoadCSVs(t *testing.T) { if err := loadCSVs(ctx, "../testdata", src, nil, kv); err != nil { t.Errorf("expected no error loading csvs, got %s", err) } - for _, id := range exp { + for _, id := range tc.exp { key := src.keyPrefixFor(id) got, err := kv.getPrefix(key) if err != nil { diff --git a/transformnext/sources.go b/transformnext/sources.go index d3f03fa3..9b9567c9 100644 --- a/transformnext/sources.go +++ b/transformnext/sources.go @@ -8,6 +8,7 @@ import ( type source struct { prefix string + key string sep rune hasHeader bool isCumulative bool @@ -15,22 +16,21 @@ type source struct { } func (s *source) keyFor(id string) []byte { - k := strings.ToLower(strings.TrimPrefix(s.prefix, "Lucro ")[0:3]) if !s.isCumulative { - return fmt.Appendf([]byte{}, "%s::%s", id, k) + return fmt.Appendf([]byte{}, "%s::%s", id, s.key) } c := s.counter.Add(1) - return fmt.Appendf([]byte{}, "%s::%s::%d", id, k, c) + return fmt.Appendf([]byte{}, "%s::%s::%d", id, s.key, c) } func (s *source) keyPrefixFor(id string) []byte { if !s.isCumulative { return s.keyFor(id) } - k := strings.ToLower(strings.TrimPrefix(s.prefix, "Lucro ")[0:3]) - return fmt.Appendf([]byte{}, "%s::%s", id, k) + return fmt.Appendf([]byte{}, "%s::%s", id, s.key) } func newSource(prefix string, sep rune, hasHeader, isCumulative bool) *source { - return &source{prefix: prefix, sep: sep, hasHeader: hasHeader, isCumulative: isCumulative} + key := strings.ToLower(strings.TrimPrefix(prefix, "Lucro ")[0:3]) + return &source{prefix: prefix, key: key, sep: sep, hasHeader: hasHeader, isCumulative: isCumulative} } diff --git a/transformnext/sources_test.go b/transformnext/sources_test.go index 39aa01af..5ed34425 100644 --- a/transformnext/sources_test.go +++ b/transformnext/sources_test.go @@ -1,10 +1,13 @@ package transformnext -import "testing" +import ( + "strings" + "testing" +) func TestSourceKey(t *testing.T) { srcs := sources() - for idx, exp := range []string{ + for _, exp := range []string{ "42::cna", "42::emp", "42::imu::1", @@ -20,7 +23,11 @@ func TestSourceKey(t *testing.T) { "42::soc::1", "42::tab", } { - src := srcs[idx] + key := strings.TrimSuffix(strings.TrimPrefix(exp, "42::"), "::1") + src, ok := srcs[key] + if !ok { + t.Fatalf("expected source %s in %v, got nil", key, srcs) + } t.Run(src.prefix, func(t *testing.T) { got := src.keyFor("42") if string(got) != exp { diff --git a/transformnext/transform.go b/transformnext/transform.go index c871143e..7b694ffb 100644 --- a/transformnext/transform.go +++ b/transformnext/transform.go @@ -10,12 +10,72 @@ import ( "strings" "time" + "github.com/cuducos/minha-receita/download" "github.com/schollz/progressbar/v3" "golang.org/x/sync/errgroup" ) -func sources() []*source { // all but Estabelecimentos (this one is loaded later on) - return []*source{ +const ( + // BatchSize determines the size of the batches used to create the initial JSON + // data in the database. + BatchSize = 8192 + + // MaxParallelDBQueries is the default for maximum number of parallels save + // queries sent to the database + MaxParallelDBQueries = 8 +) + +var extraIndexes = [...]string{ + "cnae_fiscal", + "cnaes_secundarios.codigo", + "codigo_municipio", + "codigo_municipio_ibge", + "codigo_natureza_juridica", + "qsa.cnpj_cpf_do_socio", + "uf", +} + +// In Oct. 2025 the Federal Revenue started using the country code 367. which is +// not present in Paises.zip. The issue was officially reported to them via +// Fala.BR. They replied but did not seem to care about updating the dataset. +// +// It seems safe to assume this is England: +// 1. Other official documents from the institution uses 367 for England, eg.: +// https://balanca.economia.gov.br/balanca/bd/tabelas/PAIS.csv or +// https://www.cenofisco.com.br/arquivos/BDFlash/IR_IN_RFB_1076.pdf +// 2. Paises.zip contains a CSV ordered by country name and “Inglaterra” would +// match this ordering +// +// The same logic was used to other unmatched country codes: +var extraCounties = map[int]string{ + 15: "Aland, Ilhas", + 150: "Canal, Ilhas do (Guernsey)", + 151: "Canárias, Ilhas", + 200: "Curaçao", + 321: "Guernsey", + 359: "Ilha de Man", + 367: "Inglaterra", + 393: "Jersey", + 449: "Macedônia", + 452: "Madeira, Ilha da", + 498: "Montenegro", + 578: "Palestina", + 678: "Saint Kitts e Nevis", + 699: "Sint Maarten", + 737: "Sérvia", + 994: "A Designar", +} + +type database interface { + PreLoad() error + CreateCompanies([][]string) error + PostLoad() error + CreateExtraIndexes([]string) error + MetaSave(string, string) error +} + +func sources() map[string]*source { // all but Estabelecimentos (this one is loaded later on) + srcs := []*source{ newSource("Cnaes", ';', false, false), newSource("Empresas", ';', false, false), newSource("Imunes e Isentas", ',', true, true), @@ -31,42 +91,56 @@ func sources() []*source { // all but Estabelecimentos (this one is loaded later newSource("Socios", ';', false, true), newSource("tabmun", ';', false, false), } + m := make(map[string]*source) + for _, src := range srcs { + m[src.key] = src + } + return m } -func newProgressBar(label string, srcs []*source) (*progressbar.ProgressBar, error) { +func newProgressBar(label string, srcs int) (*progressbar.ProgressBar, error) { bar := progressbar.NewOptions( - len(srcs), // it has a bug starting at zero, so we compensate for it later + srcs, // it has a bug starting At zero, so we compensate for it later progressbar.OptionFullWidth(), progressbar.OptionSetDescription(label), - progressbar.OptionUseANSICodes(true), progressbar.OptionShowBytes(true), progressbar.OptionShowCount(), + progressbar.OptionShowElapsedTimeOnFinish(), progressbar.OptionShowTotalBytes(true), + progressbar.OptionUseANSICodes(true), ) return bar, bar.RenderBlank() } -func Cleanup() error { - return filepath.WalkDir(os.TempDir(), func(pth string, d fs.DirEntry, err error) error { - if !d.IsDir() { - return nil - } - if !strings.HasPrefix(d.Name(), "minha-receita-") { - return nil - } - part := strings.Split(d.Name(), "-") - if len(part) != 4 { - return nil - } - if _, err := time.Parse("20060102150405", part[2]); err != nil { - return nil - } - fmt.Printf("Removing %s\n", pth) - return os.RemoveAll(pth) - }) +func saveUpdatedAt(db database, dir string) error { + slog.Info("Saving the updated at date to the database…") + p := filepath.Join(dir, download.FederalRevenueUpdatedAt) + v, err := os.ReadFile(p) + if err != nil { + return fmt.Errorf("error reading %s: %w", p, err) + + } + return db.MetaSave("updated-at", string(v)) +} + +func postLoad(db database) error { + slog.Info("Consolidating the database…") + if err := db.PostLoad(); err != nil { + return err + } + slog.Info("Database consolidated!") + slog.Info("Creating indexes…") + if err := db.CreateExtraIndexes(extraIndexes[:]); err != nil { + return err + } + slog.Info("Indexes created!") + return nil } -func Transform(dir string) error { +func Transform(dir string, db database, batch, maxDB int, privacy bool) error { + if err := db.PreLoad(); err != nil { + return err + } srcs := sources() tmp, err := os.MkdirTemp("", fmt.Sprintf("minha-receita-%s-*", time.Now().Format("20060102150405"))) if err != nil { @@ -86,7 +160,7 @@ func Transform(dir string) error { slog.Warn("could not close badger database", "error", err) } }() - bar, err := newProgressBar("[Step 1 of 2] Loading data to key-value storage", srcs) + bar, err := newProgressBar("[Step 1 of 2] Loading data to key-value storage", len(srcs)) if err != nil { return fmt.Errorf("could not create a progress bar: %w", err) } @@ -94,10 +168,44 @@ func Transform(dir string) error { defer cancel() var g errgroup.Group for _, src := range srcs { - src := src + s := src + g.Go(func() error { + return loadCSVs(ctx, dir, s, bar, kv) + }) + } + for k, v := range extraCounties { g.Go(func() error { - return loadCSVs(ctx, dir, src, bar, kv) + return kv.put(srcs["pai"], fmt.Sprintf("%d", k), []string{v}) }) } - return g.Wait() + if err := g.Wait(); err != nil { + return err + } + if err := writeJSONs(ctx, srcs, kv, db, maxDB, batch, dir, privacy); err != nil { + return err + } + if err := postLoad(db); err != nil { + return err + } + return saveUpdatedAt(db, dir) +} + +func Cleanup() error { + return filepath.WalkDir(os.TempDir(), func(pth string, d fs.DirEntry, err error) error { + if !d.IsDir() { + return nil + } + if !strings.HasPrefix(d.Name(), "minha-receita-") { + return nil + } + part := strings.Split(d.Name(), "-") + if len(part) != 4 { + return nil + } + if _, err := time.Parse("20060102150405", part[2]); err != nil { + return nil + } + fmt.Printf("Removing %s\n", pth) + return os.RemoveAll(pth) + }) } diff --git a/transformnext/writer.go b/transformnext/writer.go new file mode 100644 index 00000000..ae158d40 --- /dev/null +++ b/transformnext/writer.go @@ -0,0 +1,165 @@ +package transformnext + +import ( + "archive/zip" + "bytes" + "context" + "encoding/csv" + "errors" + "fmt" + "io" + "log/slog" + "os" + "path/filepath" + "strings" + "sync" + + "golang.org/x/sync/errgroup" + "golang.org/x/text/encoding/charmap" +) + +func worker(ctx context.Context, db database, s int, ch <-chan []string) error { + var b [][]string + for { + select { + case <-ctx.Done(): + return nil + case row, ok := <-ch: + if !ok { + if len(b) > 0 { + return db.CreateCompanies(b) + } + return nil + } + b = append(b, row) + if len(b) >= s { + if err := db.CreateCompanies(b); err != nil { + return err + } + b = [][]string{} + } + } + } +} + +func writeJSONs(ctx context.Context, srcs map[string]*source, kv *kv, db database, maxDB, batch int, dir string, privacy bool) error { // TODO: test + bar, err := newProgressBar("[Step 2 of 2] Writing JSONs", 1) + if err != nil { + return fmt.Errorf("could not create a progress bar: %w", err) + } + defer func() { + bar.AddMax(-1) // compensate for the extra byte added when creating the bar + }() + pths, err := os.ReadDir(dir) + if err != nil { + return fmt.Errorf("could not read directory %s: %w", dir, err) + } + src := newSource("Estabelecimentos", ';', false, false) + buf := &sync.Pool{ + New: func() any { + return &bytes.Buffer{} + }, + } + ch := make(chan []string) + var consumers errgroup.Group + for range maxDB { + consumers.Go(func() error { + return worker(ctx, db, batch, ch) + }) + } + var producers errgroup.Group + for _, pth := range pths { + if !strings.HasPrefix(pth.Name(), src.prefix) { + continue + } + p := pth + producers.Go(func() error { + pth := filepath.Join(dir, p.Name()) + a, err := zip.OpenReader(pth) + if err != nil { + return fmt.Errorf("could not open archive %s: %w", pth, err) + } + defer func() { + if err := a.Close(); err != nil { + slog.Warn("could not close %s reader", "path", pth, "error", err) + } + }() + var g errgroup.Group + for _, z := range a.File { + g.Go(func() error { + bar.AddMax64(int64(z.UncompressedSize64)) + st := z.FileInfo() + if st.IsDir() { + return nil + } + f, err := z.Open() + if err != nil { + return fmt.Errorf("could not read %s from %s: %w", z.Name, pth, err) + } + defer func() { + if err := f.Close(); err != nil { + slog.Warn("Could not close csv reader", "path", pth, "name", z.Name, "error", err) + } + }() + b := countReader{f, 0} + r := csv.NewReader(charmap.ISO8859_15.NewDecoder().Reader(&b)) + r.Comma = src.sep + var prev int64 + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + row, err := r.Read() + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return fmt.Errorf("error reading %s: %w", pth, err) + } + if len(row) < 2 { + return fmt.Errorf("unexpected row with %d columns in %s", len(row), src.prefix) + } + for n := range row { + row[n] = cleanupColumn(row[n]) + } + c, err := newCompany(srcs, kv, row) + if err != nil { + return fmt.Errorf("could not create company %v: %w", row[:3], err) + } + if privacy { + c.withPrivacy() + } + j, err := c.JSON(buf) + if err != nil { + return err + } + ch <- []string{c.CNPJ, j} + s := b.read - prev + if s > 0 { + if err := bar.Add64(s); err != nil { + slog.Warn("could not update the progress bar", "error", err) + } + } + prev = b.read + } + } + }) + } + return g.Wait() + }) + } + err1 := producers.Wait() + close(ch) + err2 := consumers.Wait() + if err1 != nil && err2 != nil { + return fmt.Errorf("errors writing json: (producer error) %w, (connsumer error) %w", err1, err2) + } + if err1 != nil { + return err1 + } + if err2 != nil { + return err2 + } + return nil +} diff --git a/transformnext/writer_test.go b/transformnext/writer_test.go new file mode 100644 index 00000000..3d7d59fc --- /dev/null +++ b/transformnext/writer_test.go @@ -0,0 +1,82 @@ +package transformnext + +import ( + "context" + "sync" + "testing" +) + +type testDB struct { + lock sync.Mutex + data map[string]string +} + +func (db *testDB) PreLoad() error { + db.lock.Lock() + defer db.lock.Unlock() + db.data = make(map[string]string) + return nil +} + +func (db *testDB) CreateCompanies(companies [][]string) error { + db.lock.Lock() + defer db.lock.Unlock() + for _, c := range companies { + db.data[c[0]] = c[1] + } + return nil +} + +func (db *testDB) PostLoad() error { + return nil +} + +func (db *testDB) CreateExtraIndexes(indexes []string) error { + return nil +} + +func (db *testDB) MetaSave(key, value string) error { + db.lock.Lock() + defer db.lock.Unlock() + db.data[key] = value + return nil +} + +func TestWriteJSONs(t *testing.T) { + ctx := context.Background() + srcs := sources() + kv, err := newBadger(t.TempDir(), false) + if err != nil { + t.Fatalf("expected no error creating badger, got %s", err) + } + defer func() { + if err := kv.db.Close(); err != nil { + t.Errorf("expected no error closing badger, got %s", err) + } + }() + for key, src := range srcs { + if key == "est" { + continue + } + if err := loadCSVs(ctx, "../testdata", src, nil, kv); err != nil { + t.Fatalf("expected no error loading %s data, got %s", key, err) + } + } + db := &testDB{} + if err := db.PreLoad(); err != nil { + t.Fatalf("expected no error calling PreLoad, got %s", err) + } + err = writeJSONs(ctx, srcs, kv, db, 16, 8192, "../testdata", false) + if err != nil { + t.Fatalf("expected no error processing test data, got %s", err) + } + db.lock.Lock() + defer db.lock.Unlock() + if len(db.data) != 1 { + t.Errorf("expected 1 company to be persisted, got %d", len(db.data)) + } + exp := "33683111000280" + if _, ok := db.data[exp]; !ok { + t.Errorf("expected CNPJ %s to be persisted, got nil", exp) + } +}