Skip to content

Commit 083a047

Browse files
committed
Fixed the noDPs bug by waiting for the query to be broadcast to all servers
1 parent 1a9d38a commit 083a047

File tree

2 files changed

+80
-88
lines changed

2 files changed

+80
-88
lines changed

services/service.go

+79-86
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package servicesunlynx
22

33
import (
44
"strconv"
5-
"time"
65

76
"github.com/Knetic/govaluate"
87
"github.com/btcsuite/goleveldb/leveldb/errors"
@@ -39,6 +38,8 @@ type SurveyCreationQuery struct {
3938
MapDPs map[string]int64
4039
Proofs bool
4140
AppFlag bool
41+
IntraMessage bool
42+
Source *network.ServerIdentity
4243

4344
// query statement
4445
Sum []string
@@ -67,9 +68,10 @@ type Survey struct {
6768

6869
// MsgTypes defines the Message Type ID for all the service's intra-messages.
6970
type MsgTypes struct {
70-
msgSurveyCreationQuery network.MessageTypeID
71-
msgSurveyResultsQuery network.MessageTypeID
72-
msgDDTfinished network.MessageTypeID
71+
msgSurveyCreationQuery network.MessageTypeID
72+
msgSurveyResultsQuery network.MessageTypeID
73+
msgDDTfinished network.MessageTypeID
74+
msgQueryBroadcastFinished network.MessageTypeID
7375
}
7476

7577
var msgTypes = MsgTypes{}
@@ -79,14 +81,20 @@ func init() {
7981
log.ErrFatal(err)
8082

8183
msgTypes.msgSurveyCreationQuery = network.RegisterMessage(&SurveyCreationQuery{})
82-
network.RegisterMessage(&SurveyResponseQuery{})
8384
msgTypes.msgSurveyResultsQuery = network.RegisterMessage(&SurveyResultsQuery{})
8485
msgTypes.msgDDTfinished = network.RegisterMessage(&DDTfinished{})
86+
msgTypes.msgQueryBroadcastFinished = network.RegisterMessage(&QueryBroadcastFinished{})
8587

88+
network.RegisterMessage(&SurveyResponseQuery{})
8689
network.RegisterMessage(&ServiceState{})
8790
network.RegisterMessage(&ServiceResult{})
8891
}
8992

93+
// QueryBroadcastFinished is used to ensure that all servers have received the query/survey
94+
type QueryBroadcastFinished struct {
95+
SurveyID SurveyID
96+
}
97+
9098
// DDTfinished is used to ensure that all servers perform the shuffling+DDT before collectively aggregating the results
9199
type DDTfinished struct {
92100
SurveyID SurveyID
@@ -122,17 +130,10 @@ type Service struct {
122130
Survey *concurrent.ConcurrentMap
123131
}
124132

125-
func castToSurvey(object interface{}, err error) Survey {
126-
if err != nil || object == nil {
127-
log.Fatal("Error reading map:", err)
128-
}
129-
return object.(Survey)
130-
}
131-
132-
func (s *Service) getSurvey(sid SurveyID, step int) Survey {
133+
func (s *Service) getSurvey(sid SurveyID) Survey {
133134
surv, err := s.Survey.Get(string(sid))
134135
if err != nil || surv == nil {
135-
log.Fatalf("Error '%s' while getting surveyID %x %d, %s", err, sid, step, s.ServerIdentity().String())
136+
log.Fatal("Error", err, "while getting surveyID", sid)
136137
}
137138
return surv.(Survey)
138139
}
@@ -161,10 +162,14 @@ func NewService(c *onet.Context) (onet.Service, error) {
161162
if cerr = newUnLynxInstance.RegisterHandler(newUnLynxInstance.HandleDDTfinished); cerr != nil {
162163
log.Fatal("Wrong Handler.", cerr)
163164
}
165+
if cerr = newUnLynxInstance.RegisterHandler(newUnLynxInstance.HandleQueryBroadcastFinished); cerr != nil {
166+
log.Fatal("Wrong Handler.", cerr)
167+
}
164168

165169
c.RegisterProcessor(newUnLynxInstance, msgTypes.msgSurveyCreationQuery)
166170
c.RegisterProcessor(newUnLynxInstance, msgTypes.msgSurveyResultsQuery)
167171
c.RegisterProcessor(newUnLynxInstance, msgTypes.msgDDTfinished)
172+
c.RegisterProcessor(newUnLynxInstance, msgTypes.msgQueryBroadcastFinished)
168173
return newUnLynxInstance, cerr
169174
}
170175

@@ -178,6 +183,10 @@ func (s *Service) Process(msg *network.Envelope) {
178183
tmp := (msg.Msg).(*SurveyResultsQuery)
179184
_, err := s.HandleSurveyResultsQuery(tmp)
180185
log.ErrFatal(err)
186+
} else if msg.MsgType.Equal(msgTypes.msgQueryBroadcastFinished) {
187+
tmp := (msg.Msg).(*QueryBroadcastFinished)
188+
_, err := s.HandleQueryBroadcastFinished(tmp)
189+
log.ErrFatal(err)
181190
} else if msg.MsgType.Equal(msgTypes.msgDDTfinished) {
182191
tmp := (msg.Msg).(*DDTfinished)
183192
_, err := s.HandleDDTfinished(tmp)
@@ -187,7 +196,7 @@ func (s *Service) Process(msg *network.Envelope) {
187196

188197
// PushData is used to store incoming data by servers
189198
func (s *Service) PushData(resp *SurveyResponseQuery, proofs bool) {
190-
survey := s.getSurvey(resp.SurveyID, 1)
199+
survey := s.getSurvey(resp.SurveyID)
191200
for _, v := range resp.Responses {
192201
dr := libunlynx.DpResponse{}
193202
dr.FromDpResponseToSend(v)
@@ -207,20 +216,12 @@ func (s *Service) HandleSurveyCreationQuery(recq *SurveyCreationQuery) (network.
207216
log.Lvl1(s.ServerIdentity().String(), " received a Survey Creation Query")
208217

209218
// if this server is the one receiving the query from the client
210-
if recq.SurveyID == "" {
219+
if recq.IntraMessage == false {
211220
id := uuid.NewV4()
212221
newID := SurveyID(id.String())
213222
recq.SurveyID = newID
214-
215223
log.Lvl1(s.ServerIdentity().String(), " handles this new survey ", recq.SurveyID)
216224

217-
// broadcasts the query
218-
err := libunlynxtools.SendISMOthers(s.ServiceProcessor, &recq.Roster, recq)
219-
if err != nil {
220-
log.Error("broadcasting error ", err)
221-
}
222-
log.Lvl1(s.ServerIdentity(), " initiated the survey ", newID)
223-
224225
}
225226

226227
// chooses an ephemeral secret for this survey
@@ -244,8 +245,25 @@ func (s *Service) HandleSurveyCreationQuery(recq *SurveyCreationQuery) (network.
244245
if err != nil {
245246
return nil, err
246247
}
248+
log.Lvl1(s.ServerIdentity(), " initiated the survey ", recq.SurveyID)
249+
250+
if recq.IntraMessage == false {
251+
recq.IntraMessage = true
252+
recq.Source = s.ServerIdentity()
253+
// broadcasts the query
254+
err := libunlynxtools.SendISMOthers(s.ServiceProcessor, &recq.Roster, recq)
255+
if err != nil {
256+
log.Error("broadcasting error ", err)
257+
}
258+
recq.IntraMessage = false
259+
} else {
260+
// warn 'root' node that it has received the query
261+
err := s.SendRaw(recq.Source, &QueryBroadcastFinished{SurveyID: recq.SurveyID})
262+
if err != nil {
263+
return nil, err
264+
}
265+
}
247266

248-
log.Lvl1(s.ServerIdentity(), " created the survey ", recq.SurveyID)
249267
// if it is a app download the data from the test file
250268
if recq.AppFlag {
251269
index := 0
@@ -259,63 +277,33 @@ func (s *Service) HandleSurveyCreationQuery(recq *SurveyCreationQuery) (network.
259277
s.PushData(resp, recq.Proofs)
260278

261279
//number of data providers who have already pushed the data
262-
s.getSurvey(resp.SurveyID, 2).DpChannel <- 1
280+
s.getSurvey(resp.SurveyID).DpChannel <- 1
263281
}
264282

265-
// update surveyChannel so that the server knows he can start to process data from DPs
266-
s.getSurvey(recq.SurveyID, 3).SurveyChannel <- 1
283+
if recq.IntraMessage == false {
284+
counter := len(recq.Roster.List) - 1
285+
for counter > 0 {
286+
counter = counter - (<-s.getSurvey(recq.SurveyID).SurveyChannel)
287+
}
288+
}
267289
return &ServiceState{recq.SurveyID}, nil
268290
}
269291

270292
// HandleSurveyResponseQuery handles a survey answers submission by a subject.
271293
func (s *Service) HandleSurveyResponseQuery(resp *SurveyResponseQuery) (network.Message, error) {
272-
var el interface{}
273-
el = nil
274-
for el == nil {
275-
el, _ = s.Survey.Get((string)(resp.SurveyID))
276-
277-
if el != nil {
278-
break
279-
}
280-
281-
time.Sleep(time.Millisecond * 100)
282-
}
283-
284-
survey := el.(Survey)
285-
if survey.Query.SurveyID == resp.SurveyID {
286-
<-s.getSurvey(resp.SurveyID, 3).SurveyChannel
287-
288-
s.PushData(resp, survey.Query.Proofs)
294+
survey := s.getSurvey(resp.SurveyID)
295+
s.PushData(resp, survey.Query.Proofs)
289296

290-
//unblock the channel to allow another DP to send its data
291-
s.getSurvey(resp.SurveyID, 4).SurveyChannel <- 1
292-
//number of data providers who have already pushed the data
293-
s.getSurvey(resp.SurveyID, 5).DpChannel <- 1
294-
295-
return &ServiceState{"1"}, nil
296-
}
297-
298-
log.Lvl1(s.ServerIdentity(), " does not know about this survey!")
299-
return &ServiceState{resp.SurveyID}, nil
297+
//number of data providers who have already pushed the data
298+
s.getSurvey(resp.SurveyID).DpChannel <- 1
299+
return &ServiceState{"1"}, nil
300300
}
301301

302302
// HandleSurveyResultsQuery handles the survey result query by the surveyor.
303303
func (s *Service) HandleSurveyResultsQuery(resq *SurveyResultsQuery) (network.Message, error) {
304-
var el interface{}
305-
el = nil
306-
for el == nil {
307-
el, _ = s.Survey.Get((string)(resq.SurveyID))
308-
309-
if el != nil {
310-
break
311-
}
312-
313-
time.Sleep(time.Millisecond * 100)
314-
}
315-
316304
log.Lvl1(s.ServerIdentity(), " received a survey result query")
317305

318-
survey := s.getSurvey(resq.SurveyID, 6)
306+
survey := s.getSurvey(resq.SurveyID)
319307
survey.Query.ClientPubKey = resq.ClientPublic
320308
err := s.putSurvey(resq.SurveyID, survey)
321309
if err != nil {
@@ -337,7 +325,7 @@ func (s *Service) HandleSurveyResultsQuery(resq *SurveyResultsQuery) (network.Me
337325

338326
log.Lvl1(s.ServerIdentity(), " completed the query processing...")
339327

340-
survey := s.getSurvey(resq.SurveyID, 7)
328+
survey := s.getSurvey(resq.SurveyID)
341329
results := survey.PullDeliverableResults(false, libunlynx.CipherText{})
342330
err = s.putSurvey(resq.SurveyID, survey)
343331
if err != nil {
@@ -350,9 +338,16 @@ func (s *Service) HandleSurveyResultsQuery(resq *SurveyResultsQuery) (network.Me
350338
return nil, s.StartService(resq.SurveyID, false)
351339
}
352340

353-
// HandleDDTfinished handles the message
341+
// HandleDDTfinished handles the message DDTfinished: one of the nodes is ready to perform a collective aggregation
354342
func (s *Service) HandleDDTfinished(recq *DDTfinished) (network.Message, error) {
355-
s.getSurvey(recq.SurveyID, 8).DDTChannel <- 1
343+
s.getSurvey(recq.SurveyID).DDTChannel <- 1
344+
return nil, nil
345+
}
346+
347+
// HandleQueryBroadcastFinished handles the message QueryBroadcastFinished: one of the nodes has already received the query
348+
func (s *Service) HandleQueryBroadcastFinished(recq *QueryBroadcastFinished) (network.Message, error) {
349+
log.LLvl1(recq.SurveyID)
350+
s.getSurvey(recq.SurveyID).SurveyChannel <- 1
356351
return nil, nil
357352
}
358353

@@ -369,7 +364,7 @@ func (s *Service) NewProtocol(tn *onet.TreeNodeInstance, conf *onet.GenericConfi
369364
var pi onet.ProtocolInstance
370365

371366
target := SurveyID(string(conf.Data))
372-
survey := castToSurvey(s.Survey.Get(string(conf.Data)))
367+
survey := s.getSurvey(SurveyID(conf.Data))
373368

374369
switch tn.ProtocolName() {
375370
case protocolsunlynx.ShufflingProtocolName:
@@ -449,7 +444,7 @@ func (s *Service) NewProtocol(tn *onet.TreeNodeInstance, conf *onet.GenericConfi
449444

450445
counter := len(tn.Roster().List) - 1
451446
for counter > 0 {
452-
counter = counter - (<-castToSurvey(s.Survey.Get(string(conf.Data))).DDTChannel)
447+
counter = counter - (<-s.getSurvey(SurveyID(conf.Data)).DDTChannel)
453448
}
454449

455450
case protocolsunlynx.DROProtocolName:
@@ -519,7 +514,7 @@ func (s *Service) NewProtocol(tn *onet.TreeNodeInstance, conf *onet.GenericConfi
519514

520515
// StartProtocol starts a specific protocol (Pipeline, Shuffling, etc.)
521516
func (s *Service) StartProtocol(name string, targetSurvey SurveyID) (onet.ProtocolInstance, error) {
522-
tmp := s.getSurvey(targetSurvey, 9)
517+
tmp := s.getSurvey(targetSurvey)
523518
tree := tmp.Query.Roster.GenerateNaryTreeWithRoot(2, s.ServerIdentity())
524519

525520
var tn *onet.TreeNodeInstance
@@ -551,22 +546,20 @@ func (s *Service) StartProtocol(name string, targetSurvey SurveyID) (onet.Protoc
551546

552547
// StartService starts the service (with all its different steps/protocols)
553548
func (s *Service) StartService(targetSurvey SurveyID, root bool) error {
554-
555549
log.Lvl1(s.ServerIdentity(), " is waiting on channel")
556-
<-s.getSurvey(targetSurvey, 10).SurveyChannel
557550

558-
survey := s.getSurvey(targetSurvey, 11)
551+
survey := s.getSurvey(targetSurvey)
559552

560553
counter := survey.Query.MapDPs[s.ServerIdentity().String()]
561554
for counter > int64(0) {
562555
log.Lvl1(s.ServerIdentity(), " is waiting for ", counter, " data providers to send their data")
563-
counter = counter - int64(<-s.getSurvey(targetSurvey, 12).DpChannel)
556+
counter = counter - int64(<-s.getSurvey(targetSurvey).DpChannel)
564557
}
565558
log.Lvl1("All data providers (", survey.Query.MapDPs[s.ServerIdentity().String()], ") for server ", s.ServerIdentity(), " have sent their data")
566559

567560
log.Lvl1(s.ServerIdentity(), " starts a UnLynx Protocol for survey ", targetSurvey)
568561

569-
target := s.getSurvey(targetSurvey, 13)
562+
target := s.getSurvey(targetSurvey)
570563

571564
// Shuffling Phase
572565
start := libunlynx.StartTimer(s.ServerIdentity().String() + "_ShufflingPhase")
@@ -635,7 +628,7 @@ func (s *Service) StartService(targetSurvey SurveyID, root bool) error {
635628

636629
// ShufflingPhase performs the shuffling of the ClientResponses
637630
func (s *Service) ShufflingPhase(targetSurvey SurveyID) error {
638-
survey := s.getSurvey(targetSurvey, 14)
631+
survey := s.getSurvey(targetSurvey)
639632

640633
if len(survey.DpResponses) == 0 && len(survey.DpResponsesAggr) == 0 {
641634
log.Lvl1(s.ServerIdentity(), " no data to shuffle")
@@ -647,7 +640,7 @@ func (s *Service) ShufflingPhase(targetSurvey SurveyID) error {
647640
return err
648641
}
649642
tmpShufflingResult := <-pi.(*protocolsunlynx.ShufflingProtocol).FeedbackChannel
650-
shufflingResult := protocolsunlynx.MatrixCipherTextToProcessResponse(tmpShufflingResult, s.getSurvey(targetSurvey, 14).Lengths)
643+
shufflingResult := protocolsunlynx.MatrixCipherTextToProcessResponse(tmpShufflingResult, s.getSurvey(targetSurvey).Lengths)
651644

652645
survey.PushShuffledProcessResponses(shufflingResult)
653646
err = s.putSurvey(targetSurvey, survey)
@@ -656,7 +649,7 @@ func (s *Service) ShufflingPhase(targetSurvey SurveyID) error {
656649

657650
// TaggingPhase performs the private grouping on the currently collected data.
658651
func (s *Service) TaggingPhase(targetSurvey SurveyID) error {
659-
survey := s.getSurvey(targetSurvey, 15)
652+
survey := s.getSurvey(targetSurvey)
660653

661654
if len(survey.ShuffledProcessResponses) == 0 {
662655
log.Lvl1(s.ServerIdentity(), " for survey ", survey.Query.SurveyID, " has no data to det tag")
@@ -669,7 +662,7 @@ func (s *Service) TaggingPhase(targetSurvey SurveyID) error {
669662
}
670663

671664
tmpDeterministicTaggingResult := <-pi.(*protocolsunlynx.DeterministicTaggingProtocol).FeedbackChannel
672-
deterministicTaggingResult := protocolsunlynx.DeterCipherVectorToProcessResponseDet(tmpDeterministicTaggingResult, s.getSurvey(targetSurvey, 16).TargetOfSwitch)
665+
deterministicTaggingResult := protocolsunlynx.DeterCipherVectorToProcessResponseDet(tmpDeterministicTaggingResult, s.getSurvey(targetSurvey).TargetOfSwitch)
673666

674667
var queryWhereTag []libunlynx.WhereQueryAttributeTagged
675668
for i, v := range deterministicTaggingResult[:len(survey.Query.Where)] {
@@ -698,7 +691,7 @@ func (s *Service) AggregationPhase(targetSurvey SurveyID) error {
698691
}
699692
cothorityAggregatedData := <-pi.(*protocolsunlynx.CollectiveAggregationProtocol).FeedbackChannel
700693

701-
survey := s.getSurvey(targetSurvey, 16)
694+
survey := s.getSurvey(targetSurvey)
702695
survey.PushCothorityAggregatedFilteredResponses(cothorityAggregatedData.GroupedData)
703696
err = s.putSurvey(targetSurvey, survey)
704697
return err
@@ -711,7 +704,7 @@ func (s *Service) DROPhase(targetSurvey SurveyID) error {
711704
return err
712705
}
713706

714-
survey := s.getSurvey(targetSurvey, 17)
707+
survey := s.getSurvey(targetSurvey)
715708

716709
tmpShufflingResult := <-pi.(*protocolsunlynx.ShufflingProtocol).FeedbackChannel
717710
shufflingResult := protocolsunlynx.MatrixCipherTextToProcessResponse(tmpShufflingResult, survey.Lengths)
@@ -728,7 +721,7 @@ func (s *Service) KeySwitchingPhase(targetSurvey SurveyID) error {
728721
return err
729722
}
730723

731-
survey := s.getSurvey(targetSurvey, 18)
724+
survey := s.getSurvey(targetSurvey)
732725

733726
tmpKeySwitchedAggregatedResponses := <-pi.(*protocolsunlynx.KeySwitchingProtocol).FeedbackChannel
734727
keySwitchedAggregatedResponses := protocolsunlynx.CipherVectorToFilteredResponse(tmpKeySwitchedAggregatedResponses, survey.Lengths)

services/service_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -770,9 +770,8 @@ func TestAllServersNoDPs(t *testing.T) {
770770
}
771771

772772
surveyID, err := client.SendSurveyCreationQuery(el, servicesunlynx.SurveyID(""), nil, nbrDPs, proofsService, false, sum, count, whereQueryValues, predicate, groupBy)
773-
774773
if err != nil {
775-
t.Fatal("Service did not start.")
774+
t.Fatal("Service did not start:", err)
776775
}
777776

778777
//save values in a map to verify them at the end

0 commit comments

Comments
 (0)