Commit 01279baa authored by Stefan Dähling's avatar Stefan Dähling
Browse files

change storage structure

parent 82f4cbe3
Pipeline #266352 passed with stages
in 3 minutes and 20 seconds
......@@ -168,8 +168,8 @@ func (agency *Agency) terminate(gracefulStop chan os.Signal) {
func (agency *Agency) startAgents() (err error) {
// request configuration
var agencyInfoFull schemas.AgencyInfoFull
agencyInfoFull, _, err = amsclient.GetContainerAgencyInfoFull(agency.info.MASID,
agency.info.ImageGroupID, agency.info.ID)
agencyInfoFull, _, err = amsclient.GetAgencyInfo(agency.info.MASID, agency.info.ImageGroupID,
agency.info.ID)
agency.mutex.Lock()
agency.info.ID = agencyInfoFull.ID
agency.info.Logger = agencyInfoFull.Logger
......
......@@ -187,14 +187,9 @@ func (ams *AMS) getAgencies(masID int) (ret schemas.Agencies, err error) {
}
// getAgencyInfoFull returns status of one agency
func (ams *AMS) getAgencyInfoFull(masID int, agencyID int) (ret schemas.AgencyInfoFull, err error) {
ret, err = ams.stor.getAgencyInfoFull(masID, agencyID)
return
}
func (ams *AMS) getContainerAgencyInfoFull(masID int, imID int,
agencyID int) (ret schemas.AgencyInfoFull, err error) {
ret, err = ams.stor.getContainerAgencyInfoFull(masID, imID, agencyID)
func (ams *AMS) getAgencyInfoFull(masID int, imID int, agencyID int) (ret schemas.AgencyInfoFull,
err error) {
ret, err = ams.stor.getAgencyInfoFull(masID, imID, agencyID)
return
}
......@@ -255,18 +250,18 @@ func (ams *AMS) configureMAS(masSpec schemas.MASSpec) (masInfo schemas.MASInfo,
// extract all image groups
imageTemp := make(map[string]interface{})
for i := range masSpec.ImageGroups {
if _, ok := imageTemp[masSpec.ImageGroups[i].Image]; ok {
if _, ok := imageTemp[masSpec.ImageGroups[i].Config.Image]; ok {
// image already exists
err = errors.New("invalid masSpec; two or more groups with same image")
return
}
imGroupInfo := schemas.ImageGroupInfo{
Image: masSpec.ImageGroups[i].Image,
PullSecret: masSpec.ImageGroups[i].PullSecret,
ID: i,
Config: masSpec.ImageGroups[i].Config,
ID: i,
}
masInfo.ImageGroups = append(masInfo.ImageGroups, imGroupInfo)
imageTemp[masSpec.ImageGroups[i].Image] = nil
masInfo.ImageGroups.Instances = append(masInfo.ImageGroups.Instances, imGroupInfo)
masInfo.ImageGroups.Counter++
imageTemp[masSpec.ImageGroups[i].Config.Image] = nil
}
// MAS configuration
......@@ -275,20 +270,19 @@ func (ams *AMS) configureMAS(masSpec schemas.MASSpec) (masInfo schemas.MASInfo,
// total number of agents and total number of agencies
masInfo.Agents.Counter = 0
numAgencies = make([]int, len(masSpec.ImageGroups), len(masSpec.ImageGroups))
masInfo.Agencies.Counter = 0
for i := range masSpec.ImageGroups {
masInfo.Agents.Counter += len(masSpec.ImageGroups[i].Agents)
numAgencies[i] = len(masSpec.ImageGroups[i].Agents) / masSpec.Config.NumAgentsPerAgency
num := len(masSpec.ImageGroups[i].Agents) / masSpec.Config.NumAgentsPerAgency
if len(masSpec.ImageGroups[i].Agents)%masSpec.Config.NumAgentsPerAgency > 0 {
numAgencies[i] = numAgencies[i] + 1
num++
}
masInfo.Agencies.Counter += numAgencies[i]
masInfo.ImageGroups.Instances[i].Agencies.Instances = make([]schemas.AgencyInfo, num,
num)
masInfo.ImageGroups.Instances[i].Agencies.Counter = num
numAgencies[i] = num
}
masInfo.Agents.Instances = make([]schemas.AgentInfo, masInfo.Agents.Counter,
masInfo.Agents.Counter)
masInfo.Agencies.Instances = make([]schemas.AgencyInfo, masInfo.Agencies.Counter,
masInfo.Agencies.Counter)
// empty graph?
if len(masInfo.Graph.Node) == 0 {
......@@ -297,13 +291,11 @@ func (ams *AMS) configureMAS(masSpec schemas.MASSpec) (masInfo schemas.MASInfo,
// agent configuration
agentID := 0
agencyIDOffset := 0
for i := range masSpec.ImageGroups {
for j := range masSpec.ImageGroups[i].Agents {
masInfo.Agents.Instances[agentID].Spec = masSpec.ImageGroups[i].Agents[j]
masInfo.Agents.Instances[agentID].ID = agentID
masInfo.Agents.Instances[agentID].AgencyID = agencyIDOffset +
j/masSpec.Config.NumAgentsPerAgency
masInfo.Agents.Instances[agentID].AgencyID = j / masSpec.Config.NumAgentsPerAgency
masInfo.Agents.Instances[agentID].ImageGroupID = i
masInfo.Agents.Instances[agentID].Address.Agency = "-im-" + strconv.Itoa(i) + "-agency-" +
strconv.Itoa(j/masSpec.Config.NumAgentsPerAgency)
......@@ -316,18 +308,16 @@ func (ams *AMS) configureMAS(masSpec schemas.MASSpec) (masInfo schemas.MASInfo,
}
agentID++
}
agencyIDOffset += numAgencies[i]
}
// agency configuration
agencyID := 0
agentCounterTot := 0
for i := range masSpec.ImageGroups {
agentCounter := 0
for j := 0; j < numAgencies[i]; j++ {
agencyInfo := schemas.AgencyInfo{
ImageGroupID: i,
ID: agencyID,
ID: j,
Logger: masSpec.Config.Logger,
Name: "-im-" + strconv.Itoa(i) + "-agency-" + strconv.Itoa(j),
}
......@@ -339,9 +329,7 @@ func (ams *AMS) configureMAS(masSpec schemas.MASSpec) (masInfo schemas.MASInfo,
agentCounter++
agentCounterTot++
}
masInfo.Agencies.Instances[agencyID] = agencyInfo
masInfo.ImageGroups[i].Agencies = append(masInfo.ImageGroups[i].Agencies, agencyID)
agencyID++
masInfo.ImageGroups.Instances[i].Agencies.Instances[j] = agencyInfo
}
}
return
......
......@@ -136,8 +136,10 @@ func dummyClient(s *http.Server, t *testing.T) {
},
ImageGroups: []schemas.ImageGroupSpec{
schemas.ImageGroupSpec{
Image: "agent",
PullSecret: "",
Config: schemas.ImageGroupConfig{
Image: "agent",
PullSecret: "",
},
Agents: []schemas.AgentSpec{
schemas.AgentSpec{
Name: "test1",
......@@ -207,7 +209,7 @@ func dummyClient(s *http.Server, t *testing.T) {
t.Error("Error GetAgencies " + strconv.Itoa(httpStatus))
}
_, httpStatus, err = amsclient.GetAgencyInfoFull(0, 0)
_, httpStatus, err = amsclient.GetAgencyInfo(0, 0, 0)
if err != nil {
t.Error(err)
}
......
......@@ -190,30 +190,12 @@ func GetAgencies(masID int) (agencies schemas.Agencies, httpStatus int, err erro
return
}
// GetAgencyInfoFull requests agency information
func GetAgencyInfoFull(masID int, agencyID int) (agency schemas.AgencyInfoFull, httpStatus int,
err error) {
var body []byte
body, httpStatus, err = httpretry.Get(httpClient, "http://"+Host+":"+strconv.Itoa(Port)+
"/api/clonemap/mas/"+strconv.Itoa(masID)+"/agencies/"+strconv.Itoa(agencyID),
time.Second*2, 2)
if err != nil {
return
}
//fmt.Println(string(body))
err = json.Unmarshal(body, &agency)
if err != nil {
agency = schemas.AgencyInfoFull{}
}
return
}
// GetContainerAgencyInfoFull requests agency information
func GetContainerAgencyInfoFull(masID int, imID int, agencyID int) (agency schemas.AgencyInfoFull,
// GetAgencyInfo requests agency information
func GetAgencyInfo(masID int, imID int, agencyID int) (agency schemas.AgencyInfoFull,
httpStatus int, err error) {
var body []byte
body, httpStatus, err = httpretry.Get(httpClient, "http://"+Host+":"+strconv.Itoa(Port)+
"/api/clonemap/mas/"+strconv.Itoa(masID)+"/container/"+strconv.Itoa(imID)+"/"+
"/api/clonemap/mas/"+strconv.Itoa(masID)+"/imgroup/"+strconv.Itoa(imID)+"/"+
strconv.Itoa(agencyID), time.Second*2, 2)
if err != nil {
return
......
......@@ -61,7 +61,7 @@ import (
// deployment interface for interaction with storage
type deployment interface {
newMAS(masID int, images []schemas.ImageGroupInfo, logging bool, mqtt bool,
newMAS(masID int, images schemas.ImageGroups, logging bool, mqtt bool,
df bool) (err error)
scaleMAS(masID int, deltaAgencies int) (err error)
deleteMAS(masID int) (err error)
......@@ -73,15 +73,15 @@ type localDeployment struct {
}
// newMAS triggers the cluster manager to start new agency containers
func (localdepl *localDeployment) newMAS(masID int, images []schemas.ImageGroupInfo,
func (localdepl *localDeployment) newMAS(masID int, images schemas.ImageGroups,
logging bool, mqtt bool, df bool) (err error) {
for i := range images {
for j := 0; j < len(images[i].Agencies); j++ {
for i := range images.Instances {
for j := 0; j < len(images.Instances[i].Agencies.Instances); j++ {
temp := schemas.StubAgencyConfig{
MASID: masID,
AgencyID: j,
ImageGroupID: i,
Image: images[i].Image,
Image: images.Instances[i].Config.Image,
Logging: logging,
MQTT: mqtt,
DF: df,
......
......@@ -50,11 +50,12 @@ THE SOFTWARE.
// ams/mas/counter: int (masCounter)
// ams/mas/<masID>/config schemas.MASConfig
// ams/mas/<masID>/status schemas.Status
// ams/mas/<masID>/groups []schemas.ImageGroupInfo
// ams/mas/<masID>/imcounter int (imCounter)
// ams/mas/<masID>/im/<imID>/config ImageGroupConfig
// ams/mas/<masID>/im/<imID>/agencycounter int(agencyCounter)
// ams/mas/<masID>/im/<imID>/agency/<agency>/status
// ams/mas/<masID>/agentcounter int (agentCounter)
// ams/mas/<masID>/agents/<agentID>: schemas.AgentInfo
// ams/mas/<masID>/agencycounter: int (agencyCounter)
// ams/mas/<masID>/agencies/<agencyID>: AgencyInfo
//
// df/graph/<masID>: schemas.Graph
......@@ -89,14 +90,20 @@ type etcdStorage struct {
// version of mas keys in etcd
type masVersion struct {
status int
status int
config int
groupCounter int
imGroups []imGroupVersion
agentCounter int
agents []int
graph int
}
// version of image group
type imGroupVersion struct {
config int
groups int
agentCounter int
agents []int
agencyCounter int
agencies []int
graph int
}
// setCloneMAPInfo sets info specific to running clonemap instance
......@@ -160,7 +167,8 @@ func (stor *etcdStorage) storeMAS(masID int, masInfo schemas.MASInfo) (err error
if err != nil {
return
}
err = stor.etcdPutResource("ams/mas/"+strconv.Itoa(masID)+"/groups", newMAS.ImageGroups)
err = stor.etcdPutResource("ams/mas/"+strconv.Itoa(masID)+"/imcount",
newMAS.ImageGroups.Counter)
if err != nil {
return
}
......@@ -169,11 +177,6 @@ func (stor *etcdStorage) storeMAS(masID int, masInfo schemas.MASInfo) (err error
if err != nil {
return
}
err = stor.etcdPutResource("ams/mas/"+strconv.Itoa(masID)+"/agencycounter",
newMAS.Agencies.Counter)
if err != nil {
return
}
err = stor.etcdPutResource("df/graph/"+strconv.Itoa(masID),
newMAS.Graph)
if err != nil {
......@@ -185,7 +188,7 @@ func (stor *etcdStorage) storeMAS(masID int, masInfo schemas.MASInfo) (err error
return
}
err = stor.uploadAgencyInfo(newMAS)
err = stor.uploadImGroupInfo(newMAS)
if err != nil {
return
}
......@@ -228,38 +231,50 @@ func (stor *etcdStorage) uploadAgentInfo(newMAS schemas.MASInfo) (err error) {
return
}
// uploadAgencyInfo puts all AgencyInfo of a newly created MAS to etcd
func (stor *etcdStorage) uploadAgencyInfo(newMAS schemas.MASInfo) (err error) {
agencyIndex := 0
for {
numAgInTrans := 100
if newMAS.Agencies.Counter-agencyIndex < numAgInTrans {
numAgInTrans = newMAS.Agencies.Counter - agencyIndex
}
Ops := make([]clientv3.Op, numAgInTrans, numAgInTrans)
// put all agencies structs together
for i := 0; i < numAgInTrans; i++ {
var res []byte
res, err = json.Marshal(newMAS.Agencies.Instances[agencyIndex])
if err != nil {
return
}
Ops[i] = clientv3.OpPut("ams/mas/"+strconv.Itoa(newMAS.ID)+"/agency/"+
strconv.Itoa(agencyIndex), string(res))
agencyIndex++
// uploadImGroupInfo puts all AgencyInfo of a newly created MAS to etcd
func (stor *etcdStorage) uploadImGroupInfo(newMAS schemas.MASInfo) (err error) {
for i := range newMAS.ImageGroups.Instances {
err = stor.etcdPutResource("ams/mas/"+strconv.Itoa(newMAS.ID)+"/im/"+strconv.Itoa(i)+
"/config", newMAS.ImageGroups.Instances[i].Config)
if err != nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
cond := clientv3.Compare(clientv3.Version("ams/mas/"+strconv.Itoa(newMAS.ID)+
"/agencycounter"), ">", 0)
_, err = stor.client.Txn(ctx).If(cond).Then(Ops...).Commit()
cancel()
err = stor.etcdPutResource("ams/mas/"+strconv.Itoa(newMAS.ID)+"/im/"+strconv.Itoa(i)+
"/agencycounter", newMAS.ImageGroups.Instances[i].Agencies.Counter)
if err != nil {
return
}
if agencyIndex >= newMAS.Agencies.Counter {
break
agencyIndex := 0
for {
numAgInTrans := 100
if newMAS.ImageGroups.Instances[i].Agencies.Counter-agencyIndex < numAgInTrans {
numAgInTrans = newMAS.ImageGroups.Instances[i].Agencies.Counter - agencyIndex
}
Ops := make([]clientv3.Op, numAgInTrans, numAgInTrans)
// put all agencies structs together
for j := 0; j < numAgInTrans; j++ {
var res []byte
res, err = json.Marshal(newMAS.ImageGroups.Instances[i].Agencies.Instances[agencyIndex].Status)
if err != nil {
return
}
Ops[j] = clientv3.OpPut("ams/mas/"+strconv.Itoa(newMAS.ID)+"/im/"+strconv.Itoa(i)+"/agency/"+
strconv.Itoa(agencyIndex)+"/status", string(res))
agencyIndex++
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
cond := clientv3.Compare(clientv3.Version("ams/mas/"+strconv.Itoa(newMAS.ID)+"/im/"+
strconv.Itoa(i)+"/agencycounter"), ">", 0)
_, err = stor.client.Txn(ctx).If(cond).Then(Ops...).Commit()
cancel()
if err != nil {
return
}
if agencyIndex >= newMAS.ImageGroups.Instances[i].Agencies.Counter {
break
}
time.Sleep(50 * time.Millisecond)
}
time.Sleep(50 * time.Millisecond)
}
return
}
......@@ -348,12 +363,6 @@ func (stor *etcdStorage) initCache() (err error) {
if err != nil {
return
}
// MAS groups
stor.verMAS[i].status, err = stor.etcdGetResource("ams/mas/"+strconv.Itoa(i)+"/groups",
&stor.mas[i].ImageGroups)
if err != nil {
return
}
// MAS graph
stor.verMAS[i].graph, err = stor.etcdGetResource("df/graph/"+strconv.Itoa(i),
&stor.mas[i].Graph)
......@@ -366,7 +375,7 @@ func (stor *etcdStorage) initCache() (err error) {
return
}
// MAS agencies
err = stor.initMASAgencies(i)
err = stor.initMASImGroups(i)
if err != nil {
return
}
......@@ -416,45 +425,79 @@ func (stor *etcdStorage) initMASAgents(masID int) (err error) {
return
}
// initMASAgencies retrieves data of agencies in a mas from etcd
func (stor *etcdStorage) initMASAgencies(masID int) (err error) {
// MAS agency counter
stor.verMAS[masID].agencyCounter, err = stor.etcdGetResource("ams/mas/"+strconv.Itoa(masID)+
"/agencycounter", &stor.mas[masID].Agencies.Counter)
// initMASImGroups retrieves data of agencies in a mas from etcd
func (stor *etcdStorage) initMASImGroups(masID int) (err error) {
// MAS im group counter
stor.verMAS[masID].groupCounter, err = stor.etcdGetResource("ams/mas/"+strconv.Itoa(masID)+"/imCount",
&stor.mas[masID].ImageGroups.Counter)
if err != nil {
return
}
stor.mas[masID].Agencies.Instances = make([]schemas.AgencyInfo, stor.mas[masID].Agencies.Counter,
stor.mas[masID].Agencies.Counter)
stor.verMAS[masID].agencies = make([]int, stor.mas[masID].Agencies.Counter,
stor.mas[masID].Agencies.Counter)
stor.mas[masID].ImageGroups.Instances = make([]schemas.ImageGroupInfo,
stor.mas[masID].ImageGroups.Counter, stor.mas[masID].ImageGroups.Counter)
stor.verMAS[masID].imGroups = make([]imGroupVersion, stor.mas[masID].ImageGroups.Counter,
stor.mas[masID].ImageGroups.Counter)
// get info of all agencies and loop through
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
resp := &clientv3.GetResponse{}
resp, err = stor.client.Get(ctx, "ams/mas/"+strconv.Itoa(masID)+"/agency")
if err == nil {
for i := range resp.Kvs {
temp := strings.Split(string(resp.Kvs[i].Key), "/")
if len(temp) != 5 {
continue
}
var agencyID int
agencyID, err = strconv.Atoi(temp[4])
if err != nil {
continue
}
if agencyID >= stor.mas[masID].Agencies.Counter {
continue
}
err = json.Unmarshal(resp.Kvs[i].Value, &stor.mas[masID].Agencies.Instances[agencyID])
if err != nil {
continue
for i := 0; i < stor.mas[masID].ImageGroups.Counter; i++ {
stor.verMAS[masID].imGroups[i].agencyCounter, err = stor.etcdGetResource("ams/mas/"+
strconv.Itoa(masID)+"/im/"+strconv.Itoa(i)+"/agencycounter",
&stor.mas[masID].ImageGroups.Instances[i].Agencies.Counter)
if err != nil {
return
}
stor.verMAS[masID].imGroups[i].config, err = stor.etcdGetResource("ams/mas/"+
strconv.Itoa(masID)+"/im/"+strconv.Itoa(i)+"/config",
&stor.mas[masID].ImageGroups.Instances[i].Config)
if err != nil {
return
}
stor.mas[masID].ImageGroups.Instances[i].Agencies.Instances = make([]schemas.AgencyInfo,
stor.mas[masID].ImageGroups.Instances[i].Agencies.Counter,
stor.mas[masID].ImageGroups.Instances[i].Agencies.Counter)
stor.verMAS[masID].imGroups[i].agencies = make([]int,
stor.mas[masID].ImageGroups.Instances[i].Agencies.Counter,
stor.mas[masID].ImageGroups.Instances[i].Agencies.Counter)
// get info of all agencies and loop through
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
resp := &clientv3.GetResponse{}
resp, err = stor.client.Get(ctx, "ams/mas/"+strconv.Itoa(masID)+"/im/"+strconv.Itoa(i)+
"/agency")
if err == nil {
for j := range resp.Kvs {
temp := strings.Split(string(resp.Kvs[j].Key), "/")
if len(temp) != 7 {
continue
}
var agencyID int
agencyID, err = strconv.Atoi(temp[6])
if err != nil {
continue
}
if agencyID >= stor.mas[masID].ImageGroups.Instances[i].Agencies.Counter {
continue
}
err = json.Unmarshal(resp.Kvs[j].Value,
&stor.mas[masID].ImageGroups.Instances[i].Agencies.Instances[agencyID].Status)
if err != nil {
continue
}
stor.verMAS[masID].imGroups[i].agencies[agencyID] = int(resp.Kvs[j].Version)
stor.mas[masID].ImageGroups.Instances[i].Agencies.Instances[agencyID].MASID = masID
stor.mas[masID].ImageGroups.Instances[i].Agencies.Instances[agencyID].ID = agencyID
stor.mas[masID].ImageGroups.Instances[i].Agencies.Instances[agencyID].ImageGroupID =
i
stor.mas[masID].ImageGroups.Instances[i].Agencies.Instances[agencyID].Logger =
stor.mas[masID].Config.Logger
stor.mas[masID].ImageGroups.Instances[i].Agencies.Instances[agencyID].Name =
"mas-" + strconv.Itoa(masID) + "-im-" + strconv.Itoa(i) +
"-agency-" + strconv.Itoa(agencyID) + ".mas" + strconv.Itoa(masID) +
"agencies"
}
stor.verMAS[masID].agencies[agencyID] = int(resp.Kvs[i].Version)
}
cancel()
}
cancel()
return
}
......@@ -485,33 +528,54 @@ func (stor *etcdStorage) handleAMSEvents() {
stor.mas = append(stor.mas, schemas.MASInfo{})
}
}
stor.mutex.Unlock()
if len(stor.verMAS) <= masID {
for i := 0; i < masID-len(stor.verMAS)+1; i++ {
stor.verMAS = append(stor.verMAS, masVersion{})
}
}
stor.mutex.Unlock()
if len(path) == 4 {
err = stor.handleMASEvents(event.Kv, masID, path[3])
} else if len(path) == 5 {
if strings.HasPrefix(key, "ams/mas/") {
if path[3] == "agent" {
var agentID int
agentID, err = strconv.Atoi(path[4])
if err != nil {
stor.logError.Println(err)
continue
}
err = stor.handleAgentEvents(event.Kv, masID, agentID)
} else if path[3] == "agency" {
var agencyID int
agencyID, err = strconv.Atoi(path[4])
if err != nil {
stor.logError.Println(err)
continue
}
err = stor.handleAgencyEvents(event.Kv, masID, agencyID)
} else if len(path) == 5 && path[3] == "agent" {
var agentID int
agentID, err = strconv.Atoi(path[4])
if err != nil {
stor.logError.Println(err)
continue
}
err = stor.handleAgentEvents(event.Kv, masID, agentID)
} else if len(path) >= 6 && path[3] == "im" {
var imID int
imID, err = strconv.Atoi(path[4])
if err != nil {
stor.logError.Println(err)
continue
}
// create imGroup storage object, if number of stored im groups is lower than imID
stor.mutex.Lock()
if len(stor.mas[masID].ImageGroups.Instances) <= imID {
for i := 0; i < imID-len(stor.mas[masID].ImageGroups.Instances)+1; i++ {
stor.mas[masID].ImageGroups.Instances = append(stor.mas[masID].ImageGroups.Instances,
schemas.ImageGroupInfo{})
}
}
if len(stor.verMAS[masID].imGroups) <= imID {
for i := 0; i < imID-len(stor.verMAS[masID].imGroups)+1; i++ {
stor.verMAS[masID].imGroups = append(stor.verMAS[masID].imGroups, imGroupVersion{})
}
}
stor.mutex.Unlock()
if len(path) == 6 {
err = stor.handleImGroupEvents(event.Kv, masID, imID, path[5])
} else if len(path) == 8 {
var agencyID int
agencyID, err = strconv.Atoi(path[6])
if err != nil {
stor.logError.Println(err)
continue
}
err = stor.handleAgencyEvents(event.Kv, masID, imID, agencyID)
}
}
}
......@@ -527,7 +591,9 @@ func (stor *etcdStorage) handleAMSEvents() {
// handleMASCounterEvents is the handler function for events of the ams/mas/counter path
func (stor *etcdStorage) handleMASCounterEvents(kv *mvccpb.KeyValue) (err error) {
if stor.verMASCounter < int(kv.Version) {
stor.mutex.Lock()
err = json.Unmarshal(kv.Value, &stor.masCounter)
stor.mutex.Unlock()
if err != nil {
return
}
......@@ -540,45 +606,49 @@ func (stor *etcdStorage) handleMASCounterEvents(kv *mvccpb.KeyValue) (err error)
func (stor *etcdStorage) handleMASEvents(kv *mvccpb.KeyValue, masID int, key string) (err error) {
switch key {
case "config":
stor.mutex.Lock()
if stor.verMAS[masID].config < int(kv.Version) {
err = json.Unmarshal(kv.Value, &stor.mas[masID].Config)
if err != nil {
stor.mutex.Unlock()
return
}
stor.verMAS[masID].config = int(kv.Version)
}
stor.mutex.Unlock