Skip to content
This repository was archived by the owner on May 6, 2023. It is now read-only.

Commit

Permalink
refactor: Improving bulk load by reducing memory footprint
Browse files (browse the repository at this point in the history)
  • Loading branch information
ViBiOh committed Jul 20, 2020
1 parent 10ede32 commit aef6588
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 48 deletions.
18 changes: 1 addition & 17 deletions pkg/enedis/db.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/ViBiOh/httputils/v3/pkg/db"
"github.com/ViBiOh/httputils/v3/pkg/logger"
)

const lastFetch = `
Expand All @@ -28,23 +27,8 @@ func (a app) getLastFetch(ctx context.Context) (time.Time, error) {
return output, db.Get(ctx, a.db, scanner, lastFetch, a.name)
}

func (a app) save(ctx context.Context, datas []Value) error {
logger.Info("Saving %d records", len(datas))

func (a app) save(ctx context.Context, feeder func(stmt *sql.Stmt) error) error {
return db.DoAtomic(ctx, a.db, func(ctx context.Context) error {
var index int
feeder := func(stmt *sql.Stmt) error {
if index == len(datas) {
return db.ErrBulkEnded
}

data := datas[index]
index++

_, err := stmt.Exec(a.name, data.Timestamp, data.Valeur)
return err
}

return db.Bulk(ctx, feeder, "spurf", "enedis_value", "name", "ts", "value")
})
}
56 changes: 27 additions & 29 deletions pkg/enedis/enedis.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -12,14 +12,11 @@ import (
"strings"
"time"

"github.com/ViBiOh/httputils/v3/pkg/db"
"github.com/ViBiOh/httputils/v3/pkg/flags"
"github.com/ViBiOh/httputils/v3/pkg/logger"
)

const (
commitSize = 10000
)

// App of package
type App interface {
Start()
Expand Down Expand Up @@ -76,65 +73,66 @@ func (a app) handleFile(filename string) error {
}
}()

return a.handleLines(bufio.NewScanner(file))
}

func (a app) handleLines(scanner *bufio.Scanner) error {
lastInsert, err := a.getLastFetch(context.Background())
if err != nil {
return fmt.Errorf("unable to get last fetch: %s", err)
}

scanner := bufio.NewScanner(file)

var datas []Value
for scanner.Scan() {
datas = handleLine(datas, lastInsert, strings.TrimSpace(scanner.Text()))

if len(datas) == commitSize {
if err := a.save(context.Background(), datas); err != nil {
return fmt.Errorf("error while saving intermediate datas: %s", err)
}
feedLine := func(stmt *sql.Stmt) error {
if !scanner.Scan() {
return db.ErrBulkEnded
}

datas = nil
value := handleLine(lastInsert, strings.TrimSpace(scanner.Text()))
if value == emptyValue {
return nil
}

_, err := stmt.Exec(a.name, value.Timestamp, value.Valeur)
return err
}

if len(datas) != 0 {
if err := a.save(context.Background(), datas); err != nil {
return fmt.Errorf("error while saving final datas: %s", err)
}
if err := a.save(context.Background(), feedLine); err != nil {
return fmt.Errorf("unable to save datas: %s", err)
}

if err := scanner.Err(); err != nil {
return fmt.Errorf("error while checking scanner: %s", err)
return fmt.Errorf("error while reading line-by-line: %s", err)
}

return nil
}

func handleLine(datas []Value, lastInsert time.Time, line string) []Value {
func handleLine(lastInsert time.Time, line string) value {
parts := strings.Split(line, ";")
if len(parts) != 2 {
logger.Warn("ignoring line `%s`: invalid format", line)
return datas
return emptyValue
}

timestamp, err := time.Parse(time.RFC3339, parts[0])
if err != nil {
logger.Warn("ignoring line `%s`: invalid date format", line)
return datas
return emptyValue
}

if timestamp.Before(lastInsert) || timestamp.Equal(lastInsert) {
logger.Warn("ignoring line `%s`: timestamp already inserted", line)
return datas
return emptyValue
}

value, err := strconv.ParseFloat(parts[1], 64)
valeur, err := strconv.ParseFloat(parts[1], 64)
if err != nil {
logger.Warn("ignoring line `%s`: invalid value format", line)
return datas
return emptyValue
}

return append(datas, Value{
Valeur: value / 1000,
return value{
Valeur: valeur / 1000,
Timestamp: parts[0],
})
}
}
7 changes: 5 additions & 2 deletions pkg/enedis/model.go
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,10 @@
package enedis

// Value describes data point
type Value struct {
var (
emptyValue = value{}
)

type value struct {
Valeur float64
Timestamp string
}

0 comments on commit aef6588

Please sign in to comment.