diff --git a/pkg/enedis/db.go b/pkg/enedis/db.go index a8ea058..670f0a2 100644 --- a/pkg/enedis/db.go +++ b/pkg/enedis/db.go @@ -6,7 +6,6 @@ import ( "time" "github.com/ViBiOh/httputils/v3/pkg/db" - "github.com/ViBiOh/httputils/v3/pkg/logger" ) const lastFetch = ` @@ -28,23 +27,8 @@ func (a app) getLastFetch(ctx context.Context) (time.Time, error) { return output, db.Get(ctx, a.db, scanner, lastFetch, a.name) } -func (a app) save(ctx context.Context, datas []Value) error { - logger.Info("Saving %d records", len(datas)) - +func (a app) save(ctx context.Context, feeder func(stmt *sql.Stmt) error) error { return db.DoAtomic(ctx, a.db, func(ctx context.Context) error { - var index int - feeder := func(stmt *sql.Stmt) error { - if index == len(datas) { - return db.ErrBulkEnded - } - - data := datas[index] - index++ - - _, err := stmt.Exec(a.name, data.Timestamp, data.Valeur) - return err - } - return db.Bulk(ctx, feeder, "spurf", "enedis_value", "name", "ts", "value") }) } diff --git a/pkg/enedis/enedis.go b/pkg/enedis/enedis.go index 6f8d5b7..90e9c51 100644 --- a/pkg/enedis/enedis.go +++ b/pkg/enedis/enedis.go @@ -12,14 +12,11 @@ import ( "strings" "time" + "github.com/ViBiOh/httputils/v3/pkg/db" "github.com/ViBiOh/httputils/v3/pkg/flags" "github.com/ViBiOh/httputils/v3/pkg/logger" ) -const ( - commitSize = 10000 -) - // App of package type App interface { Start() @@ -76,65 +73,66 @@ func (a app) handleFile(filename string) error { } }() + return a.handleLines(bufio.NewScanner(file)) +} + +func (a app) handleLines(scanner *bufio.Scanner) error { lastInsert, err := a.getLastFetch(context.Background()) if err != nil { return fmt.Errorf("unable to get last fetch: %s", err) } - scanner := bufio.NewScanner(file) - - var datas []Value - for scanner.Scan() { - datas = handleLine(datas, lastInsert, strings.TrimSpace(scanner.Text())) - - if len(datas) == commitSize { - if err := a.save(context.Background(), datas); err != nil { - return fmt.Errorf("error while saving intermediate datas: %s", err) - } + feedLine := func(stmt *sql.Stmt) error { + if !scanner.Scan() { + return db.ErrBulkEnded + } - datas = nil + value := handleLine(lastInsert, strings.TrimSpace(scanner.Text())) + if value == emptyValue { + return nil } + + _, err := stmt.Exec(a.name, value.Timestamp, value.Valeur) + return err } - if len(datas) != 0 { - if err := a.save(context.Background(), datas); err != nil { - return fmt.Errorf("error while saving final datas: %s", err) - } + if err := a.save(context.Background(), feedLine); err != nil { + return fmt.Errorf("unable to save datas: %s", err) } if err := scanner.Err(); err != nil { - return fmt.Errorf("error while checking scanner: %s", err) + return fmt.Errorf("error while reading line-by-line: %s", err) } return nil } -func handleLine(datas []Value, lastInsert time.Time, line string) []Value { +func handleLine(lastInsert time.Time, line string) value { parts := strings.Split(line, ";") if len(parts) != 2 { logger.Warn("ignoring line `%s`: invalid format", line) - return datas + return emptyValue } timestamp, err := time.Parse(time.RFC3339, parts[0]) if err != nil { logger.Warn("ignoring line `%s`: invalid date format", line) - return datas + return emptyValue } if timestamp.Before(lastInsert) || timestamp.Equal(lastInsert) { logger.Warn("ignoring line `%s`: timestamp already inserted", line) - return datas + return emptyValue } - value, err := strconv.ParseFloat(parts[1], 64) + valeur, err := strconv.ParseFloat(parts[1], 64) if err != nil { logger.Warn("ignoring line `%s`: invalid value format", line) - return datas + return emptyValue } - return append(datas, Value{ - Valeur: value / 1000, + return value{ + Valeur: valeur / 1000, Timestamp: parts[0], - }) + } } diff --git a/pkg/enedis/model.go b/pkg/enedis/model.go index 699473c..c691d5b 100644 --- a/pkg/enedis/model.go +++ b/pkg/enedis/model.go @@ -1,7 +1,10 @@ package enedis -// Value describes data point -type Value struct { +var ( + emptyValue = value{} +) + +type value struct { Valeur float64 Timestamp string }