-
Notifications
You must be signed in to change notification settings - Fork 0
/
instance_group.go
101 lines (82 loc) · 2.52 KB
/
instance_group.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package main
import (
"log"
"time"
)
type InstanceGroup struct {
instanceGroupID int
node *Node
tm *timerMgr
nextInstanceID int
learner *learner
acceptor *acceptor
proposer *proposer
}
func newInstanceGroup(node *Node, instanceGroupID int, sm statemachine) *InstanceGroup {
instanceGroup := &InstanceGroup{node: node, instanceGroupID: instanceGroupID, nextInstanceID: 1}
instanceGroup.tm = newTimerMgr()
instanceGroup.acceptor = newAcceptor(instanceGroup)
instanceGroup.proposer = newProposer(instanceGroup)
instanceGroup.learner = newLearner(instanceGroup, sm)
go instanceGroup.run()
return instanceGroup
}
func (instanceGroup *InstanceGroup) commit(val string) (string, error) {
return instanceGroup.proposer.commit(val)
}
func (instanceGroup *InstanceGroup) getNodeID() int {
return instanceGroup.node.getNodeID()
}
func (instanceGroup *InstanceGroup) getNodeCount() int {
return instanceGroup.node.getNodeCount()
}
func (instanceGroup *InstanceGroup) getNextInstanceID() int {
return instanceGroup.nextInstanceID
}
func (instanceGroup *InstanceGroup) updateNextInstanceID() {
instanceGroup.nextInstanceID++
}
func (instanceGroup *InstanceGroup) send(id int, m message) {
instanceGroup.node.network.send(id, m)
}
func (instanceGroup *InstanceGroup) response(id int, m message) {
instanceGroup.node.network.response(id, m)
}
func (instanceGroup *InstanceGroup) broadcast(m message, self bool) {
for k := range instanceGroup.node.network.nodeConns1 {
if !self && k == instanceGroup.node.getNodeID() {
continue
}
instanceGroup.node.network.send(k, m)
}
}
func (instanceGroup *InstanceGroup) run() {
for {
m, ok := instanceGroup.node.network.recv(time.Millisecond * 10)
if ok {
switch m.typ {
case Prepare:
instanceGroup.acceptor.onPrepare(m)
case Propose:
instanceGroup.acceptor.onAccept(m)
case Promised:
instanceGroup.proposer.onPromised(m)
case Accepted:
instanceGroup.proposer.onAccepted(m)
case PushLearn:
instanceGroup.learner.leanValue(m)
case PullLearnRequest:
instanceGroup.learner.onPullLearnRequest(m)
case PullLearnResponse:
instanceGroup.learner.onPullLearnResponse(m)
default:
log.Printf("node: %d unexpected message type: %d\n", instanceGroup.node.getNodeID(), m.typ)
}
}
instanceGroup.tm.update()
instanceGroup.proposer.update(true)
select {
case <-time.After(time.Microsecond):
}
}
}