Commit 2599bbaf authored by Stefan Dähling's avatar Stefan Dähling
Browse files

agency client

parent c3af3514
Pipeline #405074 passed with stages
in 3 minutes and 54 seconds
......@@ -51,13 +51,14 @@ import (
"net/http"
"strconv"
agencyclient "git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/agency/client"
agencycli "git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/agency/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
)
// remoteAgency holds the channel used for sending messages to remot agency
type remoteAgency struct {
msgIn chan schemas.ACLMessage // ACL message inbox
msgIn chan schemas.ACLMessage // ACL message inbox
agencyClient *agencycli.Client
// agents map[int]*agent.Agent
}
......@@ -113,7 +114,8 @@ func (agency *Agency) aclLookup(agentID int) (acl *ACL, err error) {
agency.logInfo.Println("New remote agent ", agentID, " in unknown agency ", address.Agency)
// create new remote agency
remAgency = &remoteAgency{
msgIn: make(chan schemas.ACLMessage, 1000),
msgIn: make(chan schemas.ACLMessage, 1000),
agencyClient: agency.agencyClient,
}
agency.mutex.Lock()
agency.remoteAgencies[address.Agency] = remAgency
......@@ -172,14 +174,14 @@ func (remAgency *remoteAgency) sendMsgs(remName string, localName string, logErr
msgs[i+1].AgencySender = localName
msgs[i+1].AgencyReceiver = remName
}
stat, err = agencyclient.PostMsgs(ip, msgs)
stat, err = remAgency.agencyClient.PostMsgs(ip, msgs)
if err != nil || stat != http.StatusCreated {
ip, err = getIP(remName)
if err != nil {
logErr.Println(err)
return
}
stat, err = agencyclient.PostMsgs(ip, msgs)
stat, err = remAgency.agencyClient.PostMsgs(ip, msgs)
if err != nil {
logErr.Println(err)
}
......@@ -219,14 +221,14 @@ func (agency *Agency) receiveMsgs() {
if ok {
err := ag.ACL.newIncomingMessage(msgs[i])
if err != nil {
_, err = agencyclient.ReturnMsg(msgs[i].AgencySender, msgs[i])
_, err = agency.agencyClient.ReturnMsg(msgs[i].AgencySender, msgs[i])
if err != nil {
agency.logError.Println(err)
return
}
}
} else {
_, err := agencyclient.ReturnMsg(msgs[i].AgencySender, msgs[i])
_, err := agency.agencyClient.ReturnMsg(msgs[i].AgencySender, msgs[i])
if err != nil {
agency.logError.Println(err)
return
......
......@@ -58,6 +58,7 @@ import (
"syscall"
"time"
agencycli "git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/agency/client"
amscli "git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/ams/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/status"
......@@ -76,6 +77,7 @@ type Agency struct {
logger *loggerClient
mqtt *mqttClient
amsClient *amscli.Client
agencyClient *agencycli.Client
logInfo *log.Logger // logger for info logging
logError *log.Logger // logger for error logging
}
......@@ -90,6 +92,7 @@ func StartAgency(task func(*Agent) error) (err error) {
remoteAgencies: make(map[string]*remoteAgency),
msgIn: make(chan []schemas.ACLMessage, 1000),
amsClient: amscli.New(time.Second*60, time.Second*1, 4),
agencyClient: agencycli.New(time.Second*60, time.Second*1, 4),
logError: log.New(os.Stderr, "[ERROR] ", log.LstdFlags),
}
err = agency.init()
......
......@@ -56,14 +56,19 @@ import (
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/status"
)
var httpClient = &http.Client{Timeout: time.Second * 10}
var delay = time.Second * 1
var numRetries = 4
// Client is the ams client
type Client struct {
httpClient *http.Client // http client
Port int // ams port
delay time.Duration // delay between two retries
numRetries int // number of retries
}
// GetInfo requests the agency info
func GetInfo(agency string) (agencyInfo schemas.AgencyInfo, httpStatus int, err error) {
func (cli *Client) GetInfo(agency string) (agencyInfo schemas.AgencyInfo, httpStatus int,
err error) {
var body []byte
body, httpStatus, err = httpretry.Get(httpClient, "http://"+agency+":10000/api/agency",
body, httpStatus, err = httpretry.Get(cli.httpClient, cli.prefix(agency)+"/api/agency",
time.Second*2, 2)
if err != nil {
return
......@@ -76,9 +81,10 @@ func GetInfo(agency string) (agencyInfo schemas.AgencyInfo, httpStatus int, err
}
// GetAgents requests the agents running in agency
func GetAgents(agency string) (agentInfo []schemas.AgentInfo, httpStatus int, err error) {
func (cli *Client) GetAgents(agency string) (agentInfo []schemas.AgentInfo, httpStatus int,
err error) {
var body []byte
body, httpStatus, err = httpretry.Get(httpClient, "http://"+agency+":10000/api/agency/agents",
body, httpStatus, err = httpretry.Get(cli.httpClient, cli.prefix(agency)+"/api/agency/agents",
time.Second*2, 2)
if err != nil {
return
......@@ -91,10 +97,10 @@ func GetAgents(agency string) (agentInfo []schemas.AgentInfo, httpStatus int, er
}
// GetAgent requests one agent running in agency
func GetAgent(agency string, agentID int) (agentInfo schemas.AgentInfo, httpStatus int,
err error) {
func (cli *Client) GetAgent(agency string, agentID int) (agentInfo schemas.AgentInfo,
httpStatus int, err error) {
var body []byte
body, httpStatus, err = httpretry.Get(httpClient, "http://"+agency+":10000/api/agency/agents/"+
body, httpStatus, err = httpretry.Get(cli.httpClient, cli.prefix(agency)+"/api/agency/agents/"+
strconv.Itoa(agentID), time.Second*2, 2)
if err != nil {
return
......@@ -107,26 +113,26 @@ func GetAgent(agency string, agentID int) (agentInfo schemas.AgentInfo, httpStat
}
// PostAgent post an agent to agency
func PostAgent(agency string, agent schemas.AgentInfo) (httpStatus int, err error) {
func (cli *Client) PostAgent(agency string, agent schemas.AgentInfo) (httpStatus int, err error) {
js, _ := json.Marshal(agent)
_, httpStatus, err = httpretry.Post(httpClient, "http://"+agency+":10000/api/agency/agents",
_, httpStatus, err = httpretry.Post(cli.httpClient, cli.prefix(agency)+"/api/agency/agents",
"application/json", js, time.Second*2, 2)
return
}
// DeleteAgent requests an agent to terminate
func DeleteAgent(agency string, agentID int) (httpStatus int, err error) {
httpStatus, err = httpretry.Delete(httpClient, "http://"+agency+":10000/api/agency/agents/"+
func (cli *Client) DeleteAgent(agency string, agentID int) (httpStatus int, err error) {
httpStatus, err = httpretry.Delete(cli.httpClient, cli.prefix(agency)+"/api/agency/agents/"+
strconv.Itoa(agentID), nil, time.Second*2, 2)
return
}
// GetAgentStatus requests status from agent and returns it
func GetAgentStatus(agency string, agentID int) (agentStatus schemas.Status,
func (cli *Client) GetAgentStatus(agency string, agentID int) (agentStatus schemas.Status,
httpStatus int, err error) {
var temp schemas.Status
var body []byte
body, httpStatus, err = httpretry.Get(httpClient, "http://"+agency+":10000/api/agency/agents/"+
body, httpStatus, err = httpretry.Get(cli.httpClient, cli.prefix(agency)+"/api/agency/agents/"+
strconv.Itoa(agentID)+"/status", time.Second*2, 2)
if err != nil {
agentStatus.Code = status.Error
......@@ -142,24 +148,33 @@ func GetAgentStatus(agency string, agentID int) (agentStatus schemas.Status,
}
// PostMsgs post an agent message to the agent
func PostMsgs(agency string, msgs []schemas.ACLMessage) (httpStatus int, err error) {
func (cli *Client) PostMsgs(agency string, msgs []schemas.ACLMessage) (httpStatus int, err error) {
js, _ := json.Marshal(msgs)
_, httpStatus, err = httpretry.Post(httpClient, "http://"+agency+":10000/api/agency/msgs",
_, httpStatus, err = httpretry.Post(cli.httpClient, cli.prefix(agency)+"/api/agency/msgs",
"application/json", js, time.Second*2, 2)
return
}
// ReturnMsg return undeliverable msg
func ReturnMsg(agency string, msg schemas.ACLMessage) (httpStatus int, err error) {
func (cli *Client) ReturnMsg(agency string, msg schemas.ACLMessage) (httpStatus int, err error) {
js, _ := json.Marshal(msg)
_, httpStatus, err = httpretry.Post(httpClient, "http://"+agency+":10000/api/agency/msgundeliv",
_, httpStatus, err = httpretry.Post(cli.httpClient, cli.prefix(agency)+"/api/agency/msgundeliv",
"application/json", js, time.Second*2, 2)
return
}
// Init initializes the client
func Init(timeout time.Duration, del time.Duration, numRet int) {
httpClient.Timeout = timeout
delay = del
numRetries = numRet
func (cli *Client) prefix(agency string) (ret string) {
ret = "http://" + agency + ":" + strconv.Itoa(cli.Port)
return
}
// New creates a new AMS client
func New(timeout time.Duration, del time.Duration, numRet int) (cli *Client) {
cli = &Client{
httpClient: &http.Client{Timeout: timeout},
Port: 10000,
delay: del,
numRetries: numRet,
}
return
}
......@@ -63,16 +63,20 @@ import (
// AMS contains storage and deployment object
type AMS struct {
stor storage // interface for local or distributed storage
depl deployment // interface for local or cloud deployment
logInfo *log.Logger // logger for info logging
logError *log.Logger // logger for error logging
stor storage // interface for local or distributed storage
depl deployment // interface for local or cloud deployment
logInfo *log.Logger // logger for info logging
logError *log.Logger // logger for error logging
agencyClient *agcli.Client
}
// StartAMS starts an AMS instance. It initializes the cluster and storage object and starts API
// server.
func StartAMS() (err error) {
ams := &AMS{logError: log.New(os.Stderr, "[ERROR] ", log.LstdFlags)}
ams := &AMS{
logError: log.New(os.Stderr, "[ERROR] ", log.LstdFlags),
agencyClient: agcli.New(time.Second*60, time.Second*1, 4),
}
// create storage and deployment object according to specified deployment type
err = ams.init()
if err != nil {
......@@ -455,7 +459,7 @@ func (ams *AMS) removeAgent(masID int, agentID int) (err error) {
if err != nil {
return
}
_, err = agcli.DeleteAgent(addr.Agency, agentID)
_, err = ams.agencyClient.DeleteAgent(addr.Agency, agentID)
return
}
......@@ -463,7 +467,7 @@ func (ams *AMS) removeAgent(masID int, agentID int) (err error) {
// postAgentToAgency sends a post request to agency with info about agent to start
func (ams *AMS) postAgentToAgency(agentInfo schemas.AgentInfo) (err error) {
var httpStatus int
httpStatus, err = agcli.PostAgent(agentInfo.Address.Agency, agentInfo)
httpStatus, err = ams.agencyClient.PostAgent(agentInfo.Address.Agency, agentInfo)
if err != nil {
return
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment