目录动手实现一个分布式注册中心日志服务log/Server.golog/Client.go主启动程序LogService服务启动与注册service/service.go服务注册与发现registry/client.goregistry/registration.goregistry/server.go
动手实现一个分布式注册中心
以一个日志微服务为例,将日志服务注册到注册中心展开!
日志服务
log/Server.go
其实这一个日志类的功能就是有基本的写文件功能,然后就是注册一个http的接口去写日志进去
package log import ( "io/ioutil" stlog "log" "net/http" "os" ) var log *stlog.Logger type fileLog string // 编写日志的方法 func (fl fileLog) Write(data []byte) (int, error) { f, err := os.OpenFile(string(fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) if err != nil { return 0, err } defer f.Close() return f.Write(data) } // 启动一个日志对象 参数为日志文件名 func Run(destination string) { log = stlog.New(fileLog(destination), "[go] - ", stlog.LstdFlags) } // 自身注册的一个服务方法 func RegisterHandlers() { http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodPost: msg, err := ioutil.ReadAll(r.Body) if err != nil || len(msg) == 0 { w.WriteHeader(http.StatusBadRequest) return } write(string(msg)) default: w.WriteHeader(http.StatusMethodNotAllowed) return } }) } func write(message string) { log.Printf("%v\n", message) }
log/Client.go 提供给外部服务的接口,定义好日志的命名格式,来显示调用接口去使用已经注册好的日志接口并且返回状态
package log import ( "bytes" "distributed/registry" "fmt" "net/http" stlog "log" ) func SetClientLogger(serviceURL string, clientService registry.ServiceName) { stlog.SetPrefix(fmt.Sprintf("[%v] - ", clientService)) stlog.SetFlags(0) stlog.SetOutput(&clientLogger{url: serviceURL}) } type clientLogger struct { url string } func (cl clientLogger) Write(data []byte) (int, error) { b := bytes.NewBuffer([]byte(data)) res, err := http.Post(cl.url+"/log", "text/plain", b) if err != nil { return 0, err } if res.StatusCode != http.StatusOK { return 0, fmt.Errorf("Failed to send log message. Service responded with %d - %s", res.StatusCode, res.Status) } return len(data), nil }
主启动程序LogService 启动服务
Logservice
,主要执行start方法,里面有细节实现服务注册与服务发现package main import ( "context" "distributed/log" "distributed/registry" "distributed/service" "fmt" stlog "log" ) func main() { // 初始化启动一个日志文件对象 log.Run("./distributed.log") // 日志服务注册的端口和地址 host, port := "localhost", "4000" serviceAddress := fmt.Sprintf("http://%s:%s", host, port) // 初始化注册对象 r := registry.Registration{ ServiceName: registry.LogService, // 自身服务名 ServiceURL: serviceAddress, // 自身服务地址 RequiredServices: make([]registry.ServiceName, 0),// 依赖服务 ServiceUpdateURL: serviceAddress + "/services", // 服务列表 HeartbeatURL: serviceAddress + "/heartbeat", // 心跳 } // 启动日志服务包含服务注册,发现等细节 ctx, err := service.Start( context.Background(), host, port, r, log.RegisterHandlers, ) // 异常写入到日志中 if err != nil { stlog.Fatalln(err) } // 超时停止退出服务 <-ctx.Done() fmt.Println("Shutting down log service.") }
服务启动与注册
service/service.go Start 启动服务的主方法
/* host: 地址 port: 端口号 reg: 注册的服务对象 registerHandlersFunc: 注册方法 */ func Start(ctx context.Context, host, port string, reg registry.Registration, registerHandlersFunc func()) (context.Context, error) { registerHandlersFunc() // 启动注册方法 // 启动服务 ctx = startService(ctx, reg.ServiceName, host, port) // 注册服务 err := registry.RegisterService(reg) if err != nil { return ctx, err } return ctx, nil }startService
func startService(ctx context.Context, serviceName registry.ServiceName, host, port string) context.Context { ctx, cancel := context.WithCancel(ctx) var srv http.Server srv.Addr = ":" + port // 该协程为监听http服务,并且停止服务的时候cancel go func() { log.Println(srv.ListenAndServe()) // 删除对应的服务 err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port)) if err != nil { log.Println(err) } cancel() }() // 该协程为监听手动停止服务的信号 go func() { fmt.Printf("%v started. Press any key to stop. \n", serviceName) var s string fmt.Scanln(&s) err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port)) if err != nil { log.Println(err) } srv.Shutdown(ctx) cancel() }() return ctx }
服务注册与发现
registry/client.go 注册服务的时候会连着心跳以及服务更新的方法一起注册!
而服务更新里面的细节就是自己自定义了一个Handler然后ServeHttp方法里面去
update
全局的服务提供对象,update主要是更新服务和删除服务的最新消息
然后就是提供一个注销服务的方法
package registry import ( "bytes" "encoding/json" "fmt" "log" "math/rand" "net/http" "net/url" "sync" ) // 注册服务 func RegisterService(r Registration) error { // 获得心跳地址并注册 heartbeatURL, err := url.Parse(r.HeartbeatURL) if err != nil { return err } http.HandleFunc(heartbeatURL.Path, func (w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) // 获得服务更新地址,并且自定义http服务的handler,因为每次更新服务的时候,可以在ServeHttp方法里面去维护 serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL) if err != nil { return err } http.Handle(serviceUpdateURL.Path, &serviceUpdateHanlder{}) // 写入buf值将服务对象发送给注册中心的services地址 buf := new(bytes.Buffer) enc := json.NewEncoder(buf) err = enc.Encode(r) if err != nil { return err } res, err := http.Post(ServicesURL, "application/json", buf) if err != nil { return err } if res.StatusCode != http.StatusOK { return fmt.Errorf("Failed to register service. Registry service "+ "responded with code %v", res.StatusCode) } return nil } type serviceUpdateHanlder struct{} func (suh serviceUpdateHanlder) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } dec := json.NewDecoder(r.Body) var p patch err := dec.Decode(&p) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } fmt.Printf("Updated received %v\n", p) prov.Update(p) // 更新服务提供对象 } // 删除对应注册中心的服务地址 func ShutdownService(url string) error { req, err := http.NewRequest(http.MethodDelete, ServicesURL, bytes.NewBuffer([]byte(url))) if err != nil { return err } req.Header.Add("Content-Type", "text/plain") res, err := http.DefaultClient.Do(req) if err != nil { return err } if res.StatusCode != http.StatusOK { return fmt.Errorf("Failed to deregister service. Registry "+ "service responded with code %v", res.StatusCode) } return nil } // 更新服务列表 func (p *providers) Update(pat patch) { p.mutex.Lock() defer p.mutex.Unlock() // 将patch中有新增的进行添加 for _, patchEntry := range pat.Added { if _, ok := p.services[patchEntry.Name]; !ok { p.services[patchEntry.Name] = make([]string, 0) } p.services[patchEntry.Name] = append(p.services[patchEntry.Name], patchEntry.URL) } // 将patch中被标记删除的 for _, patchEntry := range pat.Removed { if providerURLs, ok := p.services[patchEntry.Name]; ok { for i := range providerURLs { if providerURLs[i] == patchEntry.URL { p.services[patchEntry.Name] = append(providerURLs[:i], providerURLs[i+1:]...) } } } } } // 根据服务名负载均衡随机获取服务地址 func (p providers) get(name ServiceName) (string, error) { providers, ok := p.services[name] if !ok { return "", fmt.Errorf("No providers available for service %v", name) } idx := int(rand.Float32() * float32(len(providers))) return providers[idx], nil } // 对外暴露生产者的方法 func GetProvider(name ServiceName) (string, error) { return prov.get(name) } type providers struct { services map[ServiceName][]string mutex *sync.RWMutex } // 服务提供对象 var prov = providers{ services: make(map[ServiceName][]string), // 服务列表 服务名->集群地址集合 mutex: new(sync.RWMutex), // 锁 防止服务注册更新时的并发情况 }
registry/registration.go 主要是一些关于服务使用到的参数以及对象!
package registry type Registration struct { ServiceName ServiceName // 服务名 ServiceURL string // 服务地址 RequiredServices []ServiceName // 依赖的服务 ServiceUpdateURL string // 服务更新的地址 HeartbeatURL string // 心跳地址 } type ServiceName string // 服务名集合 const ( LogService = ServiceName("LogService") GradingService = ServiceName("GradingService") PortalService = ServiceName("Portald") ) // 服务对象参数 type patchEntry struct { Name ServiceName URL string } // 更新的服务对象参数 type patch struct { Added []patchEntry Removed []patchEntry }
registry/server.go 服务端的注册中心服务的增删改查管理以及心跳检测,及时将最新的更新的服务消息通知回给客户端
package registry import ( "bytes" "encoding/json" "fmt" "io/ioutil" "log" "net/http" "sync" "time" ) const ServerPort = ":3000" const ServicesURL = "http://localhost" + ServerPort + "/services" // 注册中心地址 // 服务对象集合 type registry struct { registrations []Registration mutex *sync.RWMutex } // 添加服务 func (r *registry) add(reg Registration) error { r.mutex.Lock() r.registrations = append(r.registrations, reg) r.mutex.Unlock() err := r.sendRequiredServices(reg) r.notify(patch{ Added: []patchEntry{ patchEntry{ Name: reg.ServiceName, URL: reg.ServiceURL, }, }, }) return err } // 通知服务接口请求去刷新改变后到服务 func (r registry) notify(fullPatch patch) { r.mutex.RLock() defer r.mutex.RUnlock() for _, reg := range r.registrations { go func(reg Registration) { for _, reqService := range reg.RequiredServices { p := patch{Added: []patchEntry{}, Removed: []patchEntry{}} sendUpdate := false for _, added := range fullPatch.Added { if added.Name == reqService { p.Added = append(p.Added, added) sendUpdate = true } } for _, removed := range fullPatch.Removed { if removed.Name == reqService { p.Removed = append(p.Removed, removed) sendUpdate = true } } if sendUpdate { err := r.sendPatch(p, reg.ServiceUpdateURL) if err != nil { log.Println(err) return } } } }(reg) } } // 更新每个服务的依赖服务 func (r registry) sendRequiredServices(reg Registration) error { r.mutex.RLock() defer r.mutex.RUnlock() var p patch for _, serviceReg := range r.registrations { for _, reqService := range reg.RequiredServices { if serviceReg.ServiceName == reqService { p.Added = append(p.Added, patchEntry{ Name: serviceReg.ServiceName, URL: serviceReg.ServiceURL, }) } } } err := r.sendPatch(p, reg.ServiceUpdateURL) if err != nil { return err } return nil } // 告诉客户端更新,最新的服务列表是这个 func (r registry) sendPatch(p patch, url string) error { d, err := json.Marshal(p) if err != nil { return err } _, err = http.Post(url, "application/json", bytes.NewBuffer(d)) if err != nil { return err } return nil } // 注册中心删除服务对象 func (r *registry) remove(url string) error { for i := range reg.registrations { if reg.registrations[i].ServiceURL == url { // 通知客户端更新对象信息 r.notify(patch{ Removed: []patchEntry{ { Name: r.registrations[i].ServiceName, URL: r.registrations[i].ServiceURL, }, }, }) r.mutex.Lock() reg.registrations = append(reg.registrations[:i], reg.registrations[i+1:]...) r.mutex.Unlock() return nil } } return fmt.Errorf("Service at URL %s not found", url) } // 心跳检测 func (r *registry) heartbeat(freq time.Duration) { for { var wg sync.WaitGroup for _, reg := range r.registrations { wg.Add(1) go func(reg Registration) { defer wg.Done() success := true for attemps := 0; attemps < 3; attemps++ { res, err := http.Get(reg.HeartbeatURL) if err != nil { log.Println(err) } else if res.StatusCode == http.StatusOK { log.Printf("Heartbeat check passed for %v", reg.ServiceName) // 如果心跳恢复了,把服务重新注册回来 if !success { r.add(reg) } break; } // 如果执行到这就代表着心跳没有响应,那就代表着需要回收注销该服务了 log.Printf("Heartbeat check failed for %v", reg.ServiceName) if success { success = false r.remove(reg.ServiceURL) } time.Sleep(1 * time.Second) } }(reg) wg.Wait() time.Sleep(freq) } } } var once sync.Once func SetupRegistryService() { // 保证执行一次进行服务到心跳 每三秒循环一遍 once.Do(func() { go reg.heartbeat(3 * time.Second) }) } var reg = registry{ registrations: make([]Registration, 0), mutex: new(sync.RWMutex), } type RegistryService struct{} func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Println("Request received") switch r.Method { case http.MethodPost: dec := json.NewDecoder(r.Body) var r Registration err := dec.Decode(&r) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } log.Printf("Adding service: %v with URL: %s\n", r.ServiceName, r.ServiceURL) err = reg.add(r) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } case http.MethodDelete: payload, err := ioutil.ReadAll(r.Body) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) return } url := string(payload) log.Printf("Removing service at URL: %s", url) err = reg.remove(url) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) return } default: w.WriteHeader(http.StatusMethodNotAllowed) return } }到此这篇关于Golang分布式注册中心实现流程讲解的文章就介绍到这了,更多相关Golang分布式注册中心内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!