|
一.背景 在k8s分布式系统中,通迅成为重要的部分。本文分享一下如何使用通迅中间件。 本文代码相关技术如下: rabbitmq redis golang k8s集群与集群之间通讯,我们都可以使用相同的中间件rabbitmq。 本文使用最简单的模式LB,单实例的RPC调用。 二.分布式调用结构 2.1rabbitmqlb模式调用 .agent1agent2agent3同时上报自己在线时,rabbitmq自动调用controller1或controller2中其中一个实例,再由controllerX写入redis中去。当controller1、controller2需要所有agent状态时,读取数据都是redis,所以都是一致的。 .agent1agent2agent3获取配置信息时,rabbitmq也自动调用controller1或controller2其中一个实例。再由controllerX读取redis或者mysql数据,再返回给agent。不论是调用到controller1还是controller2,返回的数据都是一致。 2.2rabbitmq单实例模式调用 controller实例下发配置信息时: setp1.获得当前在线的agent。 setp2.单实例模式rpc调用。向所有的agent发送配置信息。 setp3.可以明确了解有没有agent时下发配置失败的。如果都失败,则本次调用失败.。只要有一个失败,就可以认为需要重发一次命令。 三.代码实现 3.1.rabbitmqrpc调用客户端实现 packageingress import( "fmt" "time" "context" "github.com/wzhliang/xing" "wise2c/wisecloud-ingress-agent/communicate" "wise2c/wisecloud-ingress-agent/log" "wise2c/wisecloud-ingress-agent/common" ) typeControllerClientstruct{ Producer*xing.Client Clientcommunicate.ControllerHelperClient } funcNewControllerClient()*ControllerClient{ agent:=&ControllerClient{} //amqp_url:="amqp://guest:guest@localhost:5672/" amqp_url:=fmt.Sprintf("amqp://%s:%s@%s:%d", common.MQUser, common.MQPassword, common.MQHost, common.MQPort, ) varerrerror agent.Producer,err=xing.NewClient( globalRPCAgentName, amqp_url, xing.SetIdentifier(&xing.NoneIdentifier{}), xing.SetSerializer(&xing.JSONSerializer{}), ) iferr!=nil{ log.Error("xing.NewClient()isfailed.%s",err.Error()) returnagent } //LBRPC target:=fmt.Sprint("ingress.controller") agent.Client=communicate.NewControllerHelperClient(target,agent.Producer) returnagent } func(this*ControllerClient)Close(){ ifthis.Producer==nil{ return } this.Producer.Close() //this.closed=true } func(this*ControllerClient)OnlineAgent(namestring)error{ ctx,cancel:=context.WithTimeout(context.Background(),5000*time.Millisecond) defercancel() log.Debug("OnlineAgent(%s)",name) _,err:=this.Client.OnlineAgent(ctx, &communicate.OnlineAgentRequest{Name:name,}) iferr!=nil{ returnerr } returnnil } func(this*ControllerClient)GetIngressConfigs(uuidstring)(string,error){ ctx,cancel:=context.WithTimeout(context.Background(),5000*time.Millisecond) defercancel() response,err:=this.Client.GetIngressConfigs(ctx, &communicate.GetIngressConfigsRequest{ Uuid:uuid, }) iferr!=nil{ return"error",err } returnresponse.Content,err } funcAgentHeartbeatToController(){ ifglobalControllerClient==nil{ return } varerrerror content:="" for{ err=globalControllerClient.OnlineAgent(globalRPCAgentName) iferr!=nil{ log.Error(err.Error()) } //1time/2second. time.Sleep(time.Millisecond*2000) } } 3.2.rabbitmqrpc调用服务端实现 packageingress import( "fmt" "context" "github.com/wzhliang/xing" "wise2c/wisecloud-ingress-controller/communicate" "wise2c/wisecloud-ingress-controller/log" "wise2c/wisecloud-ingress-controller/common" ) typeControllerServerImpstruct{} func(g*ControllerServerImp)OnlineAgent(ctxcontext.Context,req*communicate.OnlineAgentRequest,rsp*communicate.Void)error{ log.Debug("OnlineAgent(%s)",req.Name) err:=globalAgentClient.manager.OnlineAgent(req.Name) iferr!=nil{ returnerr } returnnil } func(g*ControllerServerImp)GetIngressConfigs(ctxcontext.Context,req*communicate.GetIngressConfigsRequest,rsp*communicate.GetIngressConfigsResponse)error{ log.Info("GetIngressConfigs(%s)",req.Uuid) rsp.Content="ok" returnnil } funcRunRPCServer(){ //globalRPCControllerName=fmt.Sprintf("host.controller.%s",common.GetGuid()) //LBRPC. globalRPCControllerName=fmt.Sprintf("ingress.controller") //amqp_url:="amqp://guest:guest@localhost:5672/" amqp_url:=fmt.Sprintf("amqp://%s:%s@%s:%d", common.MQUser, common.MQPassword, common.MQHost, common.MQPort, ) svc,err:=xing.NewService( globalRPCControllerName, amqp_url, xing.SetSerializer(&xing.JSONSerializer{}), xing.SetBrokerTimeout(15,5), ) iferr!=nil{ log.Error(fmt.Sprintf("MQURL=%sNewServiceisfailed.%s",amqp_url,err.Error())) } communicate.RegisterControllerHelperHandler(svc,&ControllerServerImp{}) log.Info("RPCServerisstarting.Connecttotherabbitmq[%s].",amqp_url) err=svc.Run() iferr!=nil{ log.Error(err.Error()) } } 3.3.rabbitmq单实例rpc调用客户端实现 1.rpclb调用实时上报agent是否在线,实现了类似consul的服务发现的功能. 2.ClientManager可以通过redis中的实时数据,管理所有的rpcclient.当agent下线,或者3秒之间没有上报状态,则清除指定的rpcclient. 3.这样每次下发配置时,可以实时发送到每个rpc单实例服务器实例. packageingress import( "fmt" "sync" "time" "errors" "context" "github.com/wzhliang/xing" "github.com/astaxie/beego/utils" "wise2c/wisecloud-ingress-controller/communicate" "wise2c/wisecloud-ingress-controller/log" "wise2c/wisecloud-ingress-controller/common" ) typeAgentHelperstruct{ mutex*sync.Mutex closedbool Helpercommunicate.AgentHelperClient Producer*xing.Client } funcNewAgentHelper(agent_namestring)*AgentHelper{ varerrerror agent:=&AgentHelper{ mutex:new(sync.Mutex), closed:false, } //amqp_url:="amqp://guest:guest@localhost:5672/" amqp_url:=fmt.Sprintf("amqp://%s:%s@%s:%d", common.MQUser, common.MQPassword, common.MQHost, common.MQPort, ) agent.Producer,err=xing.NewClient( globalRPCControllerName, amqp_url, xing.SetIdentifier(&xing.NoneIdentifier{}), xing.SetSerializer(&xing.JSONSerializer{}), ) iferr!=nil{ log.Error(fmt.Sprintf("MQURL=%sNewClientisfailed.%s", amqp_url, err.Error())) returnagent } //target:=fmt.Sprint("ingress.agent.%s",agent_name) agent.Helper=communicate.NewAgentHelperClient(agent_name,agent.Producer) returnagent } func(this*AgentHelper)Close(){ this.mutex.Lock() deferthis.mutex.Unlock() ifthis.Producer!=nil{ this.Producer.Close() } this.closed=true } func(this*AgentHelper)SetIngressConfig(contentstring)error{ this.mutex.Lock() deferthis.mutex.Unlock() ifthis.closed{ returnerrors.New("theclientisclosed.") } ctx,cancel:=context.WithTimeout(context.Background(),5000*time.Millisecond) defercancel() log.Info("SetIngressConfig(%s)",content) _,err:=this.Helper.SetIngressConfig(ctx, &communicate.SetIngressConfigRequest{ content, }) returnerr } func(this*AgentHelper)DelIngressConfig(uuidstring)error{ this.mutex.Lock() deferthis.mutex.Unlock() ifthis.closed{ returnerrors.New("theclientisclosed.") } ctx,cancel:=context.WithTimeout(context.Background(),5000*time.Millisecond) defercancel() log.Info("DelIngressConfig(%s)",uuid) _,err:=this.Helper.DelIngressConfig(ctx, &communicate.DelIngressConfigRequest{ uuid, }) returnerr } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// typeClientManagerstruct{ mutex*sync.Mutex Pool*utils.BeeMap } funcNewClientManager()*ClientManager{ return&ClientManager{ mutex:new(sync.Mutex), Pool:utils.NewBeeMap(), } } func(p*ClientManager)Init(){ gop.RunConnect() } func(p*ClientManager)RunConnect(){ for{ names,err:=globalRedisClient.GetAgentNames() iferr!=nil{ log.Error(err.Error()) } name_map:=map[interface{}]int{} for_,name:=rangenames{ name_map[name]=1 } forkey,v:=rangep.Pool.Items(){ //log.Error("key=%s",key) _,ok:=name_map[key] ifok{ //log.Warning("findthe%s",key) continue } ifv!=nil{ log.Warning("ClosetheAgentHelper%s",key) v.(*AgentHelper).Close() } log.Warning("DeletethePool%s",key) p.Pool.Delete(key) } for_,name:=rangenames{ if!p.Pool.Check(name){ log.Warning("NewtheAgentHelper%s",name) p.Pool.Set(name,NewAgentHelper(name)) } } time.Sleep(time.Second*1) } } func(p*ClientManager)GetClients()map[interface{}]interface{}{ returnp.Pool.Items() } func(p*ClientManager)OnlineAgent(agent_namestring)error{ p.mutex.Lock() deferp.mutex.Unlock() globalRedisClient.OnlineAgent(agent_name,"1") if!p.Pool.Check(agent_name){ p.Pool.Set(agent_name,NewAgentHelper(agent_name)) } returnnil } func(p*ClientManager)OfflineAgent(agent_namestring)error{ p.mutex.Lock() deferp.mutex.Unlock() globalRedisClient.OfflineAgent(agent_name) agent_helper:=p.Pool.Get(agent_name) ifagent_helper!=nil{ agent_helper.(*AgentHelper).Close() p.Pool.Delete(agent_name) } returnnil } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// typeAgentClientstruct{ manager*ClientManager } funcNewAgentClient()*AgentClient{ return&AgentClient{ manager:NewClientManager(), } } func(client*AgentClient)Init(){ client.manager.Init() } typeAgentHandlerCallbackfunc(request,responseinterface{},helper*AgentHelper)error func(client*AgentClient)AgentHandler(request,responseinterface{},callbackAgentHandlerCallback)(errerror){ maps:=client.manager.GetClients() count:=0 fork,v:=rangemaps{ name:=k.(string) ifv==nil{ continue } helper:=v.(*AgentHelper) ifcallback==nil{ continue } err=callback(request,response,helper) iferr!=nil{ returnerrors.New(fmt.Sprintf("%s%s",name,err.Error())) } count+=1 } ifcount>0{ returnnil } returnerrors.New("noagentonline.") } func(this*AgentClient)SetIngressConfig(request,responseinterface{})error{ returnthis.AgentHandler(request,response,func(request,responseinterface{},helper*AgentHelper)error{ returnhelper.SetIngressConfig(request.(string)) }) } func(this*AgentClient)DelIngressConfig(request,responseinterface{})error{ returnthis.AgentHandler(request,response,func(request,responseinterface{},helper*AgentHelper)error{ returnhelper.DelIngressConfig(request.(string)) }) } 3.4.rabbitmq单实例rpc调用.服务端实现 packageingress import( "fmt" "context" "github.com/wzhliang/xing" "wise2c/wisecloud-ingress-agent/communicate" "wise2c/wisecloud-ingress-agent/log" "wise2c/wisecloud-ingress-agent/common" ) typeAgentServerImpstruct{} func(g*AgentServerImp)SetIngressConfig(ctxcontext.Context,req*communicate.SetIngressConfigRequest,rsp*communicate.Void)error{ log.Info("SetIngressConfig(%s)",req.Content) config:=&Wise2cIngressConfig{} err:=config.Parse([]byte(req.Content)) iferr!=nil{ log.Error(err.Error()) returnerr } globalIngressProcess.SetIngressConfig(config) returnnil } func(g*AgentServerImp)DelIngressConfig(ctxcontext.Context,req*communicate.DelIngressConfigRequest,rsp*communicate.Void)error{ log.Info("DelIngressConfig(%s)",req.Uuid) globalIngressProcess.DelIngressConfig(req.Uuid) returnnil } funcRunRPCServer(){ //amqp_url:="amqp://guest:guest@localhost:5672/" amqp_url:=fmt.Sprintf("amqp://%s:%s@%s:%d", common.MQUser, common.MQPassword, common.MQHost, common.MQPort, ) svc,err:=xing.NewService( globalRPCAgentName, amqp_url, xing.SetSerializer(&xing.JSONSerializer{}), ) iferr!=nil{ log.Error(fmt.Sprintf("MQURL=%s,NewService()isfailed.%s",amqp_url,err.Error())) } communicate.RegisterAgentHelperHandler(svc,&AgentServerImp{}) log.Info("RPCServerisstarting.Connecttotherabbitmq[%s].",amqp_url) goLoopRPC(svc) } funcLoopRPC(svc*xing.Client){ err:=svc.Run() iferr!=nil{ log.Error(err.Error()) } } 四.总结 ●通过rabbitmqlb调用方式,可以实现从agent侧上报数据到controller侧或者agent侧拉取需要的数据。 ●通过rabbitmq单实例调用方式,由于有了之前lb上报agent状态,或者使用第三方consul.etcd中服务发现功能。我们可以实现从controller侧下发配置到每一个agent,在每个agent实例中完成相同的功能。 | |
处理生命周期内这一阶段数据的系统通常都很复杂,但从广义层面来看它们的目标是非常一致的:通过对数据 ...
随着IT从DOS时代向Windows时代过渡,软件系统开始变得越来越复杂,程序开发变成了软件工程。软件除了开 ...