(精华)2020年9月22日 微服务 Consul工具层封装和使用
发布日期:2021-06-29 15:12:19 浏览次数:3 分类:技术文章

本文共 19812 字,大约阅读时间需要 66 分钟。

封装

Cluster文件夹

///     /// 负载均衡抽象实现    ///     public abstract class AbstractLoadBalance : ILoadBalance    {
public ServiceUrl Select(IList
serviceUrls) {
if (serviceUrls == null || serviceUrls.Count ==0) return null; if (serviceUrls.Count == 1) return serviceUrls[0]; return DoSelect(serviceUrls); } ///
/// 子类去实现 /// ///
///
public abstract ServiceUrl DoSelect(IList
serviceUrls); }
///     /// 服务负载均衡    ///     public interface ILoadBalance    {
/// /// 服务选择 /// /// ///
ServiceUrl Select(IList
serviceUrls); }
///     /// 随机负载均衡    /// 1、还可以实现加权轮询    ///     public class RandomLoadBalance : AbstractLoadBalance    {
private readonly Random random = new Random(); public override ServiceUrl DoSelect(IList
serviceUrls) {
// 1、获取随机数 var index = random.Next(serviceUrls.Count); // 2、选择一个服务进行连接 return serviceUrls[index]; } }

HttpClientConsul文件夹

///     /// consul httpclient扩展    ///    public class ConsulHttpClient   {
private readonly IServiceDiscovery serviceDiscovery; private readonly ILoadBalance loadBalance; private readonly IHttpClientFactory httpClientFactory; public ConsulHttpClient(IServiceDiscovery serviceDiscovery, ILoadBalance loadBalance, IHttpClientFactory httpClientFactory) {
this.serviceDiscovery = serviceDiscovery; this.loadBalance = loadBalance; this.httpClientFactory = httpClientFactory; } /// /// Get方法 /// ///
/// param name="ServiceSchme">服务名称:(http/https) /// 服务名称 /// 服务路径 ///
public async Task
GetAsync
(string Serviceshcme, string ServiceName,string serviceLink) {
// 1、获取服务 IList
serviceUrls = await serviceDiscovery.Discovery(ServiceName); // 2、负载均衡服务 ServiceUrl serviceUrl = loadBalance.Select(serviceUrls); // 3、建立请求 HttpClient httpClient = httpClientFactory.CreateClient("mrico"); HttpResponseMessage response = await httpClient.GetAsync(serviceUrl.Url + serviceLink); // 3.1json转换成对象 if (response.StatusCode == HttpStatusCode.OK) {
string json = await response.Content.ReadAsStringAsync(); return JsonConvert.DeserializeObject
(json); } else {
throw new Exception($"{ServiceName}服务调用错误"); } } }
///     /// HttpClientFactory conusl下的扩展    ///     public static class ConsulHttpClientServiceCollectionExtensions    {
/// /// 添加consul /// ///
/// ///
public static IServiceCollection AddHttpClientConsul
(this IServiceCollection services) where ConsulHttpClient : class {
// 1、注册consul services.AddConsulDiscovery(); // 2、注册服务负载均衡 services.AddSingleton
(); // 3、注册httpclient services.AddSingleton
(); return services; } }

HttpClientPolly文件夹

///     /// HttpClient熔断降级策略选项    ///    public class PollyHttpClientOptions   {
/// /// 超时时间设置,单位为秒 /// public int TimeoutTime {
set; get; } /// /// 失败重试次数 /// public int RetryCount {
set; get; } /// /// 执行多少次异常,开启短路器(例:失败2次,开启断路器) /// public int CircuitBreakerOpenFallCount {
set; get; } /// /// 断路器开启的时间(例如:设置为2秒,短路器两秒后自动由开启到关闭) /// public int CircuitBreakerDownTime {
set; get; } /// /// 降级处理(将异常消息封装成为正常消息返回,然后进行响应处理,例如:系统正在繁忙,请稍后处理.....) /// public HttpResponseMessage httpResponseMessage {
set; get; } }
///    /// 微服务中HttpClient熔断,降级策略扩展   ///    public static class PollyHttpClientServiceCollectionExtensions   {
/// /// Httpclient扩展方法 /// /// ioc容器 /// HttpClient 名称(针对不同的服务进行熔断,降级) /// 熔断降级配置 /// 降级处理错误的结果 ///
public static IServiceCollection AddPollyHttpClient(this IServiceCollection services, string name,Action
action) {
// 1、创建选项配置类 PollyHttpClientOptions options = new PollyHttpClientOptions(); action(options); // 2、配置httpClient,熔断降级策略 services.AddHttpClient(name) //1.1 降级策略 .AddPolicyHandler(Policy
.HandleInner
().FallbackAsync(options.httpResponseMessage, async b => {
// 1、降级打印异常 Console.WriteLine($"服务{name}开始降级,异常消息:{b.Exception.Message}"); // 2、降级后的数据 Console.WriteLine($"服务{name}降级内容响应:{options.httpResponseMessage.Content.ToString()}"); await Task.CompletedTask; })) // 1.2 断路器策略 .AddPolicyHandler(Policy
.Handle
().CircuitBreakerAsync(options.CircuitBreakerOpenFallCount, TimeSpan.FromSeconds(options.CircuitBreakerDownTime), (ex, ts) => {
Console.WriteLine($"服务{name}断路器开启,异常消息:{ex.Exception.Message}"); Console.WriteLine($"服务{name}断路器开启时间:{ts.TotalSeconds}s"); }, () => {
Console.WriteLine($"服务{name}断路器关闭"); }, () => {
Console.WriteLine($"服务{name}断路器半开启(时间控制,自动开关)"); })) // 1.3 重试策略 .AddPolicyHandler(Policy
.Handle
() .RetryAsync(options.RetryCount) ) // 1.4 超时策略 .AddPolicyHandler(Policy.TimeoutAsync
(TimeSpan.FromSeconds(options.TimeoutTime))); return services; } }

Registry文件夹

///     /// consul服务发现实现    ///     public class ConsulServiceDiscovery : IServiceDiscovery    {
private readonly IConfiguration configuration; public ConsulServiceDiscovery(IConfiguration configuration) {
this.configuration = configuration; } public async Task
> Discovery(string serviceName) {
ServiceDiscoveryConfig serviceDiscoveryConfig = configuration.GetSection("ConsulDiscovery").Get
(); // 1、创建consul客户端连接 var consulClient = new ConsulClient(configuration => {
//1.1 建立客户端和服务端连接 configuration.Address = new Uri(serviceDiscoveryConfig.RegistryAddress); }); // 2、consul查询服务,根据具体的服务名称查询 var queryResult = await consulClient.Catalog.Service(serviceName); // 3、将服务进行拼接 var list = new List
(); foreach (var service in queryResult.Response) {
list.Add(new ServiceUrl {
Url = service.ServiceAddress + ":" + service.ServicePort }); } return list; } }
///     /// consul服务注册实现    ///     public class ConsulServiceRegistry : IServiceRegistry    {
/// /// 注册服务 /// /// public void Register(ServiceRegistryConfig serviceNode) {
// 1、创建consul客户端连接 var consulClient = new ConsulClient(configuration => {
//1.1 建立客户端和服务端连接 configuration.Address = new Uri(serviceNode.RegistryAddress); }); // 2、获取服务内部地址 // 3、创建consul服务注册对象 var registration = new AgentServiceRegistration() {
ID = serviceNode.Id, Name = serviceNode.Name, Address = serviceNode.Address, Port = serviceNode.Port, Tags = serviceNode.Tags, Check = new AgentServiceCheck {
// 3.1、consul健康检查超时间 Timeout = TimeSpan.FromSeconds(10), // 3.2、服务停止5秒后注销服务 DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5), // 3.3、consul健康检查地址 HTTP = serviceNode.HealthCheckAddress, // 3.4 consul健康检查间隔时间 Interval = TimeSpan.FromSeconds(10), } }; // 4、注册服务 consulClient.Agent.ServiceRegister(registration).Wait(); // 5、关闭连接 consulClient.Dispose(); } /// /// 注销服务 /// /// public void Deregister(ServiceRegistryConfig serviceNode) {
// 1、创建consul客户端连接 var consulClient = new ConsulClient(configuration => {
//1.1 建立客户端和服务端连接 configuration.Address = new Uri(serviceNode.RegistryAddress); }); // 2、注销服务 consulClient.Agent.ServiceDeregister(serviceNode.Id); // 3、关闭连接 consulClient.Dispose(); } }
///    /// 服务发现   ///    public interface IServiceDiscovery   {
/// /// 服务发现 /// /// 服务名称 ///
Task
> Discovery(string serviceName); }
///     /// 服务注册    ///    public interface IServiceRegistry   {
/// /// 注册服务 /// void Register(ServiceRegistryConfig serviceNode); /// /// 撤销服务 /// void Deregister(ServiceRegistryConfig serviceNode); }
///     /// 服务发现配置    ///     public class ServiceDiscoveryConfig    {
/// /// 服务注册地址 /// public string RegistryAddress {
set; get; } }
///    /// 服务注册节点   ///    public class ServiceRegistryConfig   {
// 服务ID public string Id {
get; set; } // 服务名称 public string Name {
get; set; } // 服务标签(版本) public string[] Tags {
set; get; } // 服务地址(可以选填 === 默认加载启动路径) public string Address {
set; get; } // 服务端口号(可以选填 === 默认加载启动路径端口) public int Port {
set;get; } // 服务注册地址 public string RegistryAddress {
get; set; } // 服务健康检查地址 public string HealthCheckAddress {
get; set; } }
///     /// 服务url    ///     public class ServiceUrl    {
public string Url {
set; get; } }

使用的扩展方法

///     /// 微服务注册发现使用扩展    ///    public static class MicroServiceConsulApplicationBuilderExtensions   {
public static IApplicationBuilder UseConsulRegistry(this IApplicationBuilder app) {
// 1、从IOC容器中获取Consul服务注册配置 var serviceNode = app.ApplicationServices.GetRequiredService
>().Value; // 2、获取应用程序生命周期 var lifetime = app.ApplicationServices.GetRequiredService
(); // 2.1 获取服务注册实例 var serviceRegistry = app.ApplicationServices.GetRequiredService
(); // 3、获取服务地址 var features = app.Properties["server.Features"] as FeatureCollection; var address = features.Get
().Addresses.First(); var uri = new Uri(address); // 4、注册服务 serviceNode.Id = Guid.NewGuid().ToString(); serviceNode.Address = $"{uri.Scheme}://{uri.Host}"; serviceNode.Port = uri.Port; serviceNode.HealthCheckAddress = $"{uri.Scheme}://{uri.Host}:{uri.Port}{serviceNode.HealthCheckAddress}"; serviceRegistry.Register(serviceNode); // 5、服务器关闭时注销服务 lifetime.ApplicationStopping.Register(() => {
serviceRegistry.Deregister(serviceNode); }); return app; } }
///     /// Console 注册中心扩展(加载配置)    ///     public static class MicroServiceConsulServiceCollectionExtensions    {
// consul服务注册 public static IServiceCollection AddConsulRegistry(this IServiceCollection services, IConfiguration configuration) {
// 1、加载Consul服务注册配置 services.Configure
(configuration.GetSection("ConsulRegistry")); // 2、注册consul注册 services.AddSingleton
(); return services; } // consul服务发现 public static IServiceCollection AddConsulDiscovery(this IServiceCollection services) {
// 1、加载Consul服务发现配置 // services.Configure
(configuration.GetSection("ConsulDiscovery")); // 2、注册consul服务发现 services.AddSingleton
(); return services; } }

服务注册

首先在配置文件中添加如下配置

"ConsulRegistry": {
"Name": "TeamService", "RegistryAddress": "http://127.0.0.1:8500", "HealthCheckAddress": "/HealthCheck" }
public class Startup    {
public Startup(IConfiguration configuration) {
Configuration = configuration; } public IConfiguration Configuration {
get; } // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) {
// 1、注册上下文到IOC容器 services.AddDbContext
(options => {
options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")); }); // 2、注册团队service services.AddScoped
(); // 3、注册团队仓储 services.AddScoped
(); // 4、添加映射 //services.AddAutoMapper(); // 5、添加服务注册条件 services.AddConsulRegistry(Configuration); services.AddControllers(); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IWebHostEnvironment env) {
if (env.IsDevelopment()) {
app.UseDeveloperExceptionPage(); } app.UseHttpsRedirection(); // 1、consul服务注册 app.UseConsulRegistry(); app.UseRouting(); app.UseAuthorization(); app.UseEndpoints(endpoints => {
endpoints.MapControllers(); }); } }

聚合层发现服务

"ConsulDiscovery": {
"RegistryAddress": "http://127.0.0.1:8500" }
public class Startup    {
public Startup(IConfiguration configuration) {
Configuration = configuration; } public IConfiguration Configuration {
get; } // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) {
// 1、自定义异常处理(用缓存处理) var fallbackResponse = new HttpResponseMessage {
Content = new StringContent("系统正繁忙,请稍后重试"),// 内容,自定义内容 StatusCode = HttpStatusCode.GatewayTimeout // 504 }; //services.AddHttpClient().AddHttpClientConsul
(); #region polly配置 {
/*services.AddHttpClient("mrico") // 1.1 降级(捕获异常,进行自定义处理) .AddPolicyHandler(Policy
.Handle
().FallbackAsync(fallbackResponse, async b => { // 1、降级打印异常 Console.WriteLine($"开始降级,异常消息:{b.Exception.Message}"); // 2、降级后的数据 //Console.WriteLine($"降级内容响应:{}"); await Task.CompletedTask; })) // 1.2 熔断机制 .AddPolicyHandler(Policy
.Handle
().CircuitBreakerAsync(3, TimeSpan.FromSeconds(10), (ex, ts) => { Console.WriteLine($"断路器开启,异常消息:{ex.Exception.Message}"); Console.WriteLine($"断路器开启时间:{ts.TotalSeconds}s"); }, () => { Console.WriteLine($"断路器重置"); }, () => { Console.WriteLine($"断路器半开启(一会开,一会关)"); })) // 1.3 失败重试 .AddPolicyHandler(Policy
.Handle
() .RetryAsync(3) ) //1.4、超时 .AddPolicyHandler(Policy.TimeoutAsync
(TimeSpan.FromSeconds(2)));*/ } #endregion // 1.2 封装之后的调用PollyHttpClient services.AddPollyHttpClient("mrico", options => { options.TimeoutTime = 1; // 1、超时时间 options.RetryCount = 3;// 2、重试次数 options.CircuitBreakerOpenFallCount = 2;// 3、熔断器开启(多少次失败开启) options.CircuitBreakerDownTime = 100;// 4、熔断器开启时间 options.httpResponseMessage = fallbackResponse;// 5、降级处理 }) // 2、consul封装 .AddHttpClientConsul
(); /*// 2、注册consul服务发现 services.AddConsulDiscovery(); // 3、注册负载均衡 services.AddSingleton
();*/ // 4、注册team服务 services.AddSingleton
(); services.AddControllers(); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseHttpsRedirection(); app.UseRouting(); app.UseAuthorization(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); } }
///     /// 服务调用实现    ///     public class HttpTeamServiceClient : ITeamServiceClient    {
/*public readonly IServiceDiscovery serviceDiscovery; public readonly ILoadBalance loadBalance;*/ private readonly IHttpClientFactory httpClientFactory; private readonly string ServiceSchme = "https"; private readonly string ServiceName = "teamservice"; //服务名称 private readonly string ServiceLink = "/Teams"; //服务名称 //private readonly ConsulHttpClient consulHttpClient; public HttpTeamServiceClient(/*IServiceDiscovery serviceDiscovery, ILoadBalance loadBalance,*/ IHttpClientFactory httpClientFactory/*, ConsulHttpClient consulHttpClient*/) {
/*this.serviceDiscovery = serviceDiscovery; this.loadBalance = loadBalance;*/ this.httpClientFactory = httpClientFactory; //this.consulHttpClient = consulHttpClient; } public async Task
> GetTeams() {
// 1、获取服务 /* IList
serviceUrls = await serviceDiscovery.Discovery(ServiceName); // 2、负载均衡服务 ServiceUrl serviceUrl = loadBalance.Select(serviceUrls); // string name = "https://localhost:5001";*/ // 3、建立请求 for (int i =0;i < 100;i++) {
try {
Thread.Sleep(1000); HttpClient httpClient = httpClientFactory.CreateClient("mrico"); HttpResponseMessage response = await httpClient.GetAsync("https://localhost:5001" + ServiceLink); // 3.1json转换成对象 IList
teams = null; if (response.StatusCode == HttpStatusCode.OK) {
string json = await response.Content.ReadAsStringAsync(); teams = JsonConvert.DeserializeObject
>(json); } else { Console.WriteLine($"降级处理:{await response.Content.ReadAsStringAsync()}"); } } catch (Exception e) { Console.WriteLine($"异常捕获:{e.Message}"); } } // List
teams = await consulHttpClient.GetAsync
>(ServiceSchme, ServiceName, ServiceLink); return null; } }

转载地址:https://codeboy.blog.csdn.net/article/details/108458350 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:(精华)2020年9月22日 微服务 Consul注册服务中心详解
下一篇:(精华)2020年9月23日 微服务 熔断降级的基本介绍和使用

发表评论

最新留言

感谢大佬
[***.8.128.20]2024年04月27日 17时07分25秒