From 6dc24e35762c6cb692927d7baa05ba3273d61b72 Mon Sep 17 00:00:00 2001 From: Alkorin Date: Sun, 10 Feb 2019 22:53:42 +0100 Subject: [PATCH] Send activities asynchronously via another goroutine Closes: #59 --- conversation.go | 37 +++++++++++++++++++++++++++++++------ space.go | 16 +--------------- 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/conversation.go b/conversation.go index 829e2b5..960ed0b 100644 --- a/conversation.go +++ b/conversation.go @@ -3,6 +3,7 @@ package main import ( "bytes" "encoding/json" + "io" "io/ioutil" "net/http" "sync" @@ -25,6 +26,8 @@ type Conversation struct { newSpaceEventHandlers []func(*Space) newActivityEventHandlers []func(*Space, *Activity) + activityQueue chan io.Reader + logger *log.Entry } @@ -35,18 +38,20 @@ type Team struct { func NewConversation(device *Device, mercury *Mercury, kms *KMS) *Conversation { c := &Conversation{ - device: device, - mercury: mercury, - kms: kms, - spaces: make(map[string]*Space), - teams: make(map[string]*Team), - logger: log.WithField("type", "Conversation"), + device: device, + mercury: mercury, + kms: kms, + spaces: make(map[string]*Space), + teams: make(map[string]*Team), + logger: log.WithField("type", "Conversation"), + activityQueue: make(chan io.Reader, 64), } mercury.RegisterHandler("conversation.activity", c.ParseActivity) // Fetch current spaces go c.FetchAllSpaces() + go c.HandleActivityQueue() return c } @@ -279,6 +284,26 @@ func (c *Conversation) AddNewActivityEventHandler(f func(*Space, *Activity)) { c.newActivityEventHandlers = append(c.newActivityEventHandlers, f) } +func (c *Conversation) HandleActivityQueue() { + for data := range c.activityQueue { + response, err := c.device.RequestService("POST", "conversationServiceUrl", "/activities", data) + if err != nil { + log.WithError(err).Error("Failed to create request") + continue + } + defer response.Body.Close() + + if response.StatusCode != http.StatusOK { + responseError, err := ioutil.ReadAll(response.Body) + if err != nil { + c.logger.WithError(err).Error("Failed to read error response") + } else { + c.logger.WithError(errors.New(string(responseError))).Error("Failed to send message") + } + } + } +} + func (c *Conversation) CreateSpace(name string) { logger := c.logger.WithField("func", "CreateSpace").WithField("spaceName", name) diff --git a/space.go b/space.go index 1df389b..6cdf827 100644 --- a/space.go +++ b/space.go @@ -3,8 +3,6 @@ package main import ( "bytes" "encoding/json" - "io/ioutil" - "net/http" "strings" "sync" @@ -157,19 +155,7 @@ func (s *Space) SendMessage(msg string) error { } logger.Trace("Send message") - response, err := s.conversation.device.RequestService("POST", "conversationServiceUrl", "/activities", bytes.NewReader(data)) - if err != nil { - return errors.Wrap(err, "failed to create request") - } - - if response.StatusCode != http.StatusOK { - responseError, err := ioutil.ReadAll(response.Body) - if err != nil { - return errors.Wrap(err, "failed to read error response") - } - return errors.Errorf("failed to send message: %s", responseError) - } - + s.conversation.activityQueue <- bytes.NewReader(data) return nil }