controller侧与agent侧分布式通讯实现-睿云智合

[复制链接]

103

主题

0

回帖

15

积分

游客

积分
15
传新创技 发表于 2018-11-18 17:47:35 | 显示全部楼层 |阅读模式
    一.背景

    在k8s分布式系统中,通讯是重要的组成部分。本文分享一下如何使用通讯中间件。

    本文代码相关技术如下:

    rabbitmq

    redis

    golang

    k8s集群与集群之间通讯,我们都可以使用相同的中间件rabbitmq。

    本文使用两种最简单的模式:LB(负载均衡)模式的RPC调用和单实例模式的RPC调用。

    二.分布式调用结构
    2.1 rabbitmq LB模式调用

    · agent1、agent2、agent3同时上报自己在线时,rabbitmq自动调用controller1或controller2中的一个实例,再由controllerX写入redis。当controller1、controller2需要所有agent的状态时,都是从redis读取数据,所以结果是一致的。

    · agent1、agent2、agent3获取配置信息时,rabbitmq也自动调用controller1或controller2中的一个实例,再由controllerX读取redis或者mysql数据并返回给agent。不论调用到controller1还是controller2,返回的数据都是一致的。

    2.2 rabbitmq单实例模式调用
    controller实例下发配置信息时:

    step1. 获得当前在线的agent。

    step2. 单实例模式rpc调用,向所有的agent发送配置信息。

    step3. 可以明确了解是否有agent下发配置失败。如果全部失败,则本次调用失败。只要有一个失败,就可以认为需要重发一次命令。

    三.代码实现

    3.1. rabbitmq rpc调用客户端实现

    packageingress

    import(

    "fmt"

    "time"

    "context"

    "github.com/wzhliang/xing"

    "wise2c/wisecloud-ingress-agent/communicate"

    "wise2c/wisecloud-ingress-agent/log"

    "wise2c/wisecloud-ingress-agent/common"

    )

    typeControllerClientstruct{

    Producer*xing.Client

    Clientcommunicate.ControllerHelperClient

    }

    funcNewControllerClient()*ControllerClient{

    agent:=&ControllerClient{}

    //amqp_url:="amqp://guest:guest@localhost:5672/"

    amqp_url:=fmt.Sprintf("amqp://%s:%s@%s:%d",

    common.MQUser,

    common.MQPassword,

    common.MQHost,

    common.MQPort,

    )

    varerrerror

    agent.Producer,err=xing.NewClient(

    globalRPCAgentName,

    amqp_url,

    xing.SetIdentifier(&xing.NoneIdentifier{}),

    xing.SetSerializer(&xing.JSONSerializer{}),

    )

    iferr!=nil{

    log.Error("xing.NewClient()isfailed.%s",err.Error())

    returnagent

    }

    //LBRPC

    target:=fmt.Sprint("ingress.controller")

    agent.Client=communicate.NewControllerHelperClient(target,agent.Producer)

    returnagent

    }

    func(this*ControllerClient)Close(){

    ifthis.Producer==nil{

    return

    }

    this.Producer.Close()

    //this.closed=true

    }

    func(this*ControllerClient)OnlineAgent(namestring)error{

    ctx,cancel:=context.WithTimeout(context.Background(),5000*time.Millisecond)

    defercancel()

    log.Debug("OnlineAgent(%s)",name)

    _,err:=this.Client.OnlineAgent(ctx,

    &communicate.OnlineAgentRequest{Name:name,})

    iferr!=nil{

    returnerr

    }

    returnnil

    }

    func(this*ControllerClient)GetIngressConfigs(uuidstring)(string,error){

    ctx,cancel:=context.WithTimeout(context.Background(),5000*time.Millisecond)

    defercancel()

    response,err:=this.Client.GetIngressConfigs(ctx,

    &communicate.GetIngressConfigsRequest{

    Uuid:uuid,

    })

    iferr!=nil{

    return"error",err

    }

    returnresponse.Content,err

    }

    funcAgentHeartbeatToController(){

    ifglobalControllerClient==nil{

    return

    }

    varerrerror

    content:=""

    for{

    err=globalControllerClient.OnlineAgent(globalRPCAgentName)

    iferr!=nil{

    log.Error(err.Error())

    }

    //1time/2second.

    time.Sleep(time.Millisecond*2000)

    }

    }

    3.2. rabbitmq rpc调用服务端实现

    packageingress

    import(

    "fmt"

    "context"

    "github.com/wzhliang/xing"

    "wise2c/wisecloud-ingress-controller/communicate"

    "wise2c/wisecloud-ingress-controller/log"

    "wise2c/wisecloud-ingress-controller/common"

    )

    typeControllerServerImpstruct{}

    func(g*ControllerServerImp)OnlineAgent(ctxcontext.Context,req*communicate.OnlineAgentRequest,rsp*communicate.Void)error{

    log.Debug("OnlineAgent(%s)",req.Name)

    err:=globalAgentClient.manager.OnlineAgent(req.Name)

    iferr!=nil{

    returnerr

    }

    returnnil

    }

    func(g*ControllerServerImp)GetIngressConfigs(ctxcontext.Context,req*communicate.GetIngressConfigsRequest,rsp*communicate.GetIngressConfigsResponse)error{

    log.Info("GetIngressConfigs(%s)",req.Uuid)

    rsp.Content="ok"

    returnnil

    }

    funcRunRPCServer(){

    //globalRPCControllerName=fmt.Sprintf("host.controller.%s",common.GetGuid())

    //LBRPC.

    globalRPCControllerName=fmt.Sprintf("ingress.controller")

    //amqp_url:="amqp://guest:guest@localhost:5672/"

    amqp_url:=fmt.Sprintf("amqp://%s:%s@%s:%d",

    common.MQUser,

    common.MQPassword,

    common.MQHost,

    common.MQPort,

    )

    svc,err:=xing.NewService(

    globalRPCControllerName,

    amqp_url,

    xing.SetSerializer(&xing.JSONSerializer{}),

    xing.SetBrokerTimeout(15,5),

    )

    iferr!=nil{

    log.Error(fmt.Sprintf("MQURL=%sNewServiceisfailed.%s",amqp_url,err.Error()))

    }

    communicate.RegisterControllerHelperHandler(svc,&ControllerServerImp{})

    log.Info("RPCServerisstarting.Connecttotherabbitmq[%s].",amqp_url)

    err=svc.Run()

    iferr!=nil{

    log.Error(err.Error())

    }

    }

    3.3. rabbitmq单实例rpc调用客户端实现

    1. rpc LB调用实时上报agent是否在线,实现了类似consul的服务发现功能。

    2. ClientManager可以通过redis中的实时数据,管理所有的rpc client。当agent下线,或者3秒之内没有上报状态,则清除指定的rpc client。

    3. 这样每次下发配置时,可以实时发送到每个rpc单实例服务端实例。

    packageingress

    import(

    "fmt"

    "sync"

    "time"

    "errors"

    "context"

    "github.com/wzhliang/xing"

    "github.com/astaxie/beego/utils"

    "wise2c/wisecloud-ingress-controller/communicate"

    "wise2c/wisecloud-ingress-controller/log"

    "wise2c/wisecloud-ingress-controller/common"

    )

    typeAgentHelperstruct{

    mutex*sync.Mutex

    closedbool

    Helpercommunicate.AgentHelperClient

    Producer*xing.Client

    }

    funcNewAgentHelper(agent_namestring)*AgentHelper{

    varerrerror

    agent:=&AgentHelper{

    mutex:new(sync.Mutex),

    closed:false,

    }

    //amqp_url:="amqp://guest:guest@localhost:5672/"

    amqp_url:=fmt.Sprintf("amqp://%s:%s@%s:%d",

    common.MQUser,

    common.MQPassword,

    common.MQHost,

    common.MQPort,

    )

    agent.Producer,err=xing.NewClient(

    globalRPCControllerName,

    amqp_url,

    xing.SetIdentifier(&xing.NoneIdentifier{}),

    xing.SetSerializer(&xing.JSONSerializer{}),

    )

    iferr!=nil{

    log.Error(fmt.Sprintf("MQURL=%sNewClientisfailed.%s",

    amqp_url,

    err.Error()))

    returnagent

    }

    //target:=fmt.Sprint("ingress.agent.%s",agent_name)

    agent.Helper=communicate.NewAgentHelperClient(agent_name,agent.Producer)

    returnagent

    }

    func(this*AgentHelper)Close(){

    this.mutex.Lock()

    deferthis.mutex.Unlock()

    ifthis.Producer!=nil{

    this.Producer.Close()

    }

    this.closed=true

    }

    func(this*AgentHelper)SetIngressConfig(contentstring)error{

    this.mutex.Lock()

    deferthis.mutex.Unlock()

    ifthis.closed{

    returnerrors.New("theclientisclosed.")

    }

    ctx,cancel:=context.WithTimeout(context.Background(),5000*time.Millisecond)

    defercancel()

    log.Info("SetIngressConfig(%s)",content)

    _,err:=this.Helper.SetIngressConfig(ctx,

    &communicate.SetIngressConfigRequest{

    content,

    })

    returnerr

    }

    func(this*AgentHelper)DelIngressConfig(uuidstring)error{

    this.mutex.Lock()

    deferthis.mutex.Unlock()

    ifthis.closed{

    returnerrors.New("theclientisclosed.")

    }

    ctx,cancel:=context.WithTimeout(context.Background(),5000*time.Millisecond)

    defercancel()

    log.Info("DelIngressConfig(%s)",uuid)

    _,err:=this.Helper.DelIngressConfig(ctx,

    &communicate.DelIngressConfigRequest{

    uuid,

    })

    returnerr

    }

    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

    typeClientManagerstruct{

    mutex*sync.Mutex

    Pool*utils.BeeMap

    }

    funcNewClientManager()*ClientManager{

    return&ClientManager{

    mutex:new(sync.Mutex),

    Pool:utils.NewBeeMap(),

    }

    }

    func(p*ClientManager)Init(){

    gop.RunConnect()

    }

    func(p*ClientManager)RunConnect(){

    for{

    names,err:=globalRedisClient.GetAgentNames()

    iferr!=nil{

    log.Error(err.Error())

    }

    name_map:=map[interface{}]int{}

    for_,name:=rangenames{

    name_map[name]=1

    }

    forkey,v:=rangep.Pool.Items(){

    //log.Error("key=%s",key)

    _,ok:=name_map[key]

    ifok{

    //log.Warning("findthe%s",key)

    continue

    }

    ifv!=nil{

    log.Warning("ClosetheAgentHelper%s",key)

    v.(*AgentHelper).Close()

    }

    log.Warning("DeletethePool%s",key)

    p.Pool.Delete(key)

    }

    for_,name:=rangenames{

    if!p.Pool.Check(name){

    log.Warning("NewtheAgentHelper%s",name)

    p.Pool.Set(name,NewAgentHelper(name))

    }

    }

    time.Sleep(time.Second*1)

    }

    }

    func(p*ClientManager)GetClients()map[interface{}]interface{}{

    returnp.Pool.Items()

    }

    func(p*ClientManager)OnlineAgent(agent_namestring)error{

    p.mutex.Lock()

    deferp.mutex.Unlock()

    globalRedisClient.OnlineAgent(agent_name,"1")

    if!p.Pool.Check(agent_name){

    p.Pool.Set(agent_name,NewAgentHelper(agent_name))

    }

    returnnil

    }

    func(p*ClientManager)OfflineAgent(agent_namestring)error{

    p.mutex.Lock()

    deferp.mutex.Unlock()

    globalRedisClient.OfflineAgent(agent_name)

    agent_helper:=p.Pool.Get(agent_name)

    ifagent_helper!=nil{

    agent_helper.(*AgentHelper).Close()

    p.Pool.Delete(agent_name)

    }

    returnnil

    }

    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

    typeAgentClientstruct{

    manager*ClientManager

    }

    funcNewAgentClient()*AgentClient{

    return&AgentClient{

    manager:NewClientManager(),

    }

    }

    func(client*AgentClient)Init(){

    client.manager.Init()

    }

    typeAgentHandlerCallbackfunc(request,responseinterface{},helper*AgentHelper)error

    func(client*AgentClient)AgentHandler(request,responseinterface{},callbackAgentHandlerCallback)(errerror){

    maps:=client.manager.GetClients()

    count:=0

    fork,v:=rangemaps{

    name:=k.(string)

    ifv==nil{

    continue

    }

    helper:=v.(*AgentHelper)

    ifcallback==nil{

    continue

    }

    err=callback(request,response,helper)

    iferr!=nil{

    returnerrors.New(fmt.Sprintf("%s%s",name,err.Error()))

    }

    count+=1

    }

    ifcount>0{

    returnnil

    }

    returnerrors.New("noagentonline.")

    }

    func(this*AgentClient)SetIngressConfig(request,responseinterface{})error{

    returnthis.AgentHandler(request,response,func(request,responseinterface{},helper*AgentHelper)error{

    returnhelper.SetIngressConfig(request.(string))

    })

    }

    func(this*AgentClient)DelIngressConfig(request,responseinterface{})error{

    returnthis.AgentHandler(request,response,func(request,responseinterface{},helper*AgentHelper)error{

    returnhelper.DelIngressConfig(request.(string))

    })

    }

    3.4. rabbitmq单实例rpc调用服务端实现

    packageingress

    import(

    "fmt"

    "context"

    "github.com/wzhliang/xing"

    "wise2c/wisecloud-ingress-agent/communicate"

    "wise2c/wisecloud-ingress-agent/log"

    "wise2c/wisecloud-ingress-agent/common"

    )

    typeAgentServerImpstruct{}

    func(g*AgentServerImp)SetIngressConfig(ctxcontext.Context,req*communicate.SetIngressConfigRequest,rsp*communicate.Void)error{

    log.Info("SetIngressConfig(%s)",req.Content)

    config:=&Wise2cIngressConfig{}

    err:=config.Parse([]byte(req.Content))

    iferr!=nil{

    log.Error(err.Error())

    returnerr

    }

    globalIngressProcess.SetIngressConfig(config)

    returnnil

    }

    func(g*AgentServerImp)DelIngressConfig(ctxcontext.Context,req*communicate.DelIngressConfigRequest,rsp*communicate.Void)error{

    log.Info("DelIngressConfig(%s)",req.Uuid)

    globalIngressProcess.DelIngressConfig(req.Uuid)

    returnnil

    }

    funcRunRPCServer(){

    //amqp_url:="amqp://guest:guest@localhost:5672/"

    amqp_url:=fmt.Sprintf("amqp://%s:%s@%s:%d",

    common.MQUser,

    common.MQPassword,

    common.MQHost,

    common.MQPort,

    )

    svc,err:=xing.NewService(

    globalRPCAgentName,

    amqp_url,

    xing.SetSerializer(&xing.JSONSerializer{}),

    )

    iferr!=nil{

    log.Error(fmt.Sprintf("MQURL=%s,NewService()isfailed.%s",amqp_url,err.Error()))

    }

    communicate.RegisterAgentHelperHandler(svc,&AgentServerImp{})

    log.Info("RPCServerisstarting.Connecttotherabbitmq[%s].",amqp_url)

    goLoopRPC(svc)

    }

    funcLoopRPC(svc*xing.Client){

    err:=svc.Run()

    iferr!=nil{

    log.Error(err.Error())

    }

    }

    四.总结

    ● 通过rabbitmq LB调用方式,可以实现从agent侧上报数据到controller侧,或者由agent侧拉取需要的数据。

    ● 通过rabbitmq单实例调用方式,借助之前LB上报的agent状态(或者使用第三方consul、etcd的服务发现功能),我们可以实现从controller侧下发配置到每一个agent,在每个agent实例中完成相同的功能。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
你喜欢看