博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊spring cloud的AsyncLoadBalancerAutoConfiguration
阅读量:6221 次
发布时间:2019-06-21

本文共 12846 字,大约阅读时间需要 42 分钟。

  hot3.png

本文主要研究一下AsyncLoadBalancerAutoConfiguration

AsyncLoadBalancerAutoConfiguration

spring-cloud-commons-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/client/loadbalancer/AsyncLoadBalancerAutoConfiguration.java

/** * Auto configuration for Ribbon (client side load balancing). * * @author Rob Worsnop */@Configuration@ConditionalOnBean(LoadBalancerClient.class)@ConditionalOnClass(AsyncRestTemplate.class)public class AsyncLoadBalancerAutoConfiguration {	@Configuration	static class AsyncRestTemplateCustomizerConfig {		@LoadBalanced		@Autowired(required = false)		private List
restTemplates = Collections.emptyList(); @Bean public SmartInitializingSingleton loadBalancedAsyncRestTemplateInitializer( final List
customizers) { return new SmartInitializingSingleton() { @Override public void afterSingletonsInstantiated() { for (AsyncRestTemplate restTemplate : AsyncRestTemplateCustomizerConfig.this.restTemplates) { for (AsyncRestTemplateCustomizer customizer : customizers) { customizer.customize(restTemplate); } } } }; } } @Configuration static class LoadBalancerInterceptorConfig { @Bean public AsyncLoadBalancerInterceptor asyncLoadBalancerInterceptor(LoadBalancerClient loadBalancerClient) { return new AsyncLoadBalancerInterceptor(loadBalancerClient); } @Bean public AsyncRestTemplateCustomizer asyncRestTemplateCustomizer( final AsyncLoadBalancerInterceptor loadBalancerInterceptor) { return new AsyncRestTemplateCustomizer() { @Override public void customize(AsyncRestTemplate restTemplate) { List
list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); } }; } }}
  • 这里创建一个AsyncRestTemplateCustomizerConfig,用于加载AsyncRestTemplateCustomizer
  • 还有一个LoadBalancerInterceptorConfig,配置了AsyncLoadBalancerInterceptor及AsyncRestTemplateCustomizer

AsyncRestTemplateCustomizer

spring-cloud-commons-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/client/loadbalancer/AsyncRestTemplateCustomizer.java

public interface AsyncRestTemplateCustomizer {	void customize(AsyncRestTemplate restTemplate);}
  • 这里采用匿名类实现,主要就是设置AsyncClientHttpRequestInterceptor

AsyncClientHttpRequestInterceptor

spring-cloud-commons-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/client/loadbalancer/AsyncLoadBalancerInterceptor.java

public class AsyncLoadBalancerInterceptor implements AsyncClientHttpRequestInterceptor {	private LoadBalancerClient loadBalancer;	public AsyncLoadBalancerInterceptor(LoadBalancerClient loadBalancer) {		this.loadBalancer = loadBalancer;	}	@Override	public ListenableFuture
intercept(final HttpRequest request, final byte[] body, final AsyncClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); return this.loadBalancer.execute(serviceName, new LoadBalancerRequest
>() { @Override public ListenableFuture
apply(final ServiceInstance instance) throws Exception { HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer); return execution.executeAsync(serviceRequest, body); } }); }}
  • 这个拦截器从url获取serviceName,然后调用loadBalancer.execute方法
  • 这里构造的LoadBalancerRequest,采用ServiceRequestWrapper,调用的是execution.executeAsync

AbstractAsyncClientHttpRequest

spring-web-5.0.7.RELEASE-sources.jar!/org/springframework/http/client/AbstractAsyncClientHttpRequest.java

/** * Abstract base for {@link AsyncClientHttpRequest} that makes sure that headers and body * are not written multiple times. * * @author Arjen Poutsma * @since 4.0 * @deprecated as of Spring 5.0, in favor of {@link org.springframework.http.client.reactive.AbstractClientHttpRequest} */@Deprecatedabstract class AbstractAsyncClientHttpRequest implements AsyncClientHttpRequest {	private final HttpHeaders headers = new HttpHeaders();	private boolean executed = false;	@Override	public final HttpHeaders getHeaders() {		return (this.executed ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);	}	@Override	public final OutputStream getBody() throws IOException {		assertNotExecuted();		return getBodyInternal(this.headers);	}	@Override	public ListenableFuture
executeAsync() throws IOException { assertNotExecuted(); ListenableFuture
result = executeInternal(this.headers); this.executed = true; return result; } /** * Asserts that this request has not been {@linkplain #executeAsync() executed} yet. * @throws IllegalStateException if this request has been executed */ protected void assertNotExecuted() { Assert.state(!this.executed, "ClientHttpRequest already executed"); } /** * Abstract template method that returns the body. * @param headers the HTTP headers * @return the body output stream */ protected abstract OutputStream getBodyInternal(HttpHeaders headers) throws IOException; /** * Abstract template method that writes the given headers and content to the HTTP request. * @param headers the HTTP headers * @return the response object for the executed request */ protected abstract ListenableFuture
executeInternal(HttpHeaders headers) throws IOException;}
  • 这个executeAsync的委托给子类的executeInternal实现
  • 主要有SimpleStreamingAsyncClientHttpRequest、Netty4ClientHttpRequest两个实现

SimpleStreamingAsyncClientHttpRequest

spring-web-5.0.7.RELEASE-sources.jar!/org/springframework/http/client/SimpleStreamingAsyncClientHttpRequest.java

/** * {@link org.springframework.http.client.ClientHttpRequest} implementation * that uses standard Java facilities to execute streaming requests. Created * via the {@link org.springframework.http.client.SimpleClientHttpRequestFactory}. * * @author Arjen Poutsma * @since 3.0 * @see org.springframework.http.client.SimpleClientHttpRequestFactory#createRequest * @see org.springframework.http.client.support.AsyncHttpAccessor * @see org.springframework.web.client.AsyncRestTemplate * @deprecated as of Spring 5.0, with no direct replacement */@Deprecatedfinal class SimpleStreamingAsyncClientHttpRequest extends AbstractAsyncClientHttpRequest {	private final HttpURLConnection connection;	private final int chunkSize;	@Nullable	private OutputStream body;	private final boolean outputStreaming;	private final AsyncListenableTaskExecutor taskExecutor;	SimpleStreamingAsyncClientHttpRequest(HttpURLConnection connection, int chunkSize,			boolean outputStreaming, AsyncListenableTaskExecutor taskExecutor) {		this.connection = connection;		this.chunkSize = chunkSize;		this.outputStreaming = outputStreaming;		this.taskExecutor = taskExecutor;	}	@Override	public String getMethodValue() {		return this.connection.getRequestMethod();	}	@Override	public URI getURI() {		try {			return this.connection.getURL().toURI();		}		catch (URISyntaxException ex) {			throw new IllegalStateException(					"Could not get HttpURLConnection URI: " + ex.getMessage(), ex);		}	}	@Override	protected OutputStream getBodyInternal(HttpHeaders headers) throws IOException {		if (this.body == null) {			if (this.outputStreaming) {				long contentLength = headers.getContentLength();				if (contentLength >= 0) {					this.connection.setFixedLengthStreamingMode(contentLength);				}				else {					this.connection.setChunkedStreamingMode(this.chunkSize);				}			}			SimpleBufferingClientHttpRequest.addHeaders(this.connection, headers);			this.connection.connect();			this.body = this.connection.getOutputStream();		}		return StreamUtils.nonClosing(this.body);	}	@Override	protected ListenableFuture
executeInternal(final HttpHeaders headers) throws IOException { return this.taskExecutor.submitListenable(new Callable
() { @Override public ClientHttpResponse call() throws Exception { try { if (body != null) { body.close(); } else { SimpleBufferingClientHttpRequest.addHeaders(connection, headers); connection.connect(); // Immediately trigger the request in a no-output scenario as well connection.getResponseCode(); } } catch (IOException ex) { // ignore } return new SimpleClientHttpResponse(connection); } }); }}
  • 主要使用的是jdk的HttpURLConnection来实现

Netty4ClientHttpRequest

spring-web-5.0.7.RELEASE-sources.jar!/org/springframework/http/client/Netty4ClientHttpRequest.java

/** * {@link ClientHttpRequest} implementation based on Netty 4. * * 

Created via the {@link Netty4ClientHttpRequestFactory}. * * @author Arjen Poutsma * @author Rossen Stoyanchev * @author Brian Clozel * @since 4.1.2 * @deprecated as of Spring 5.0, in favor of * {@link org.springframework.http.client.reactive.ReactorClientHttpConnector} */@Deprecatedclass Netty4ClientHttpRequest extends AbstractAsyncClientHttpRequest implements ClientHttpRequest { private final Bootstrap bootstrap; private final URI uri; private final HttpMethod method; private final ByteBufOutputStream body; public Netty4ClientHttpRequest(Bootstrap bootstrap, URI uri, HttpMethod method) { this.bootstrap = bootstrap; this.uri = uri; this.method = method; this.body = new ByteBufOutputStream(Unpooled.buffer(1024)); } @Override public HttpMethod getMethod() { return this.method; } @Override public String getMethodValue() { return this.method.name(); } @Override public URI getURI() { return this.uri; } @Override public ClientHttpResponse execute() throws IOException { try { return executeAsync().get(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new IOException("Interrupted during request execution", ex); } catch (ExecutionException ex) { if (ex.getCause() instanceof IOException) { throw (IOException) ex.getCause(); } else { throw new IOException(ex.getMessage(), ex.getCause()); } } } @Override protected OutputStream getBodyInternal(HttpHeaders headers) throws IOException { return this.body; } @Override protected ListenableFuture

executeInternal(final HttpHeaders headers) throws IOException { final SettableListenableFuture
responseFuture = new SettableListenableFuture<>(); ChannelFutureListener connectionListener = future -> { if (future.isSuccess()) { Channel channel = future.channel(); channel.pipeline().addLast(new RequestExecuteHandler(responseFuture)); FullHttpRequest nettyRequest = createFullHttpRequest(headers); channel.writeAndFlush(nettyRequest); } else { responseFuture.setException(future.cause()); } }; this.bootstrap.connect(this.uri.getHost(), getPort(this.uri)).addListener(connectionListener); return responseFuture; } private FullHttpRequest createFullHttpRequest(HttpHeaders headers) { io.netty.handler.codec.http.HttpMethod nettyMethod = io.netty.handler.codec.http.HttpMethod.valueOf(this.method.name()); String authority = this.uri.getRawAuthority(); String path = this.uri.toString().substring(this.uri.toString().indexOf(authority) + authority.length()); FullHttpRequest nettyRequest = new DefaultFullHttpRequest( HttpVersion.HTTP_1_1, nettyMethod, path, this.body.buffer()); nettyRequest.headers().set(HttpHeaders.HOST, this.uri.getHost() + ":" + getPort(uri)); nettyRequest.headers().set(HttpHeaders.CONNECTION, "close"); headers.forEach((headerName, headerValues) -> nettyRequest.headers().add(headerName, headerValues)); if (!nettyRequest.headers().contains(HttpHeaders.CONTENT_LENGTH) && this.body.buffer().readableBytes() > 0) { nettyRequest.headers().set(HttpHeaders.CONTENT_LENGTH, this.body.buffer().readableBytes()); } return nettyRequest; } private static int getPort(URI uri) { int port = uri.getPort(); if (port == -1) { if ("http".equalsIgnoreCase(uri.getScheme())) { port = 80; } else if ("https".equalsIgnoreCase(uri.getScheme())) { port = 443; } } return port; } /** * A SimpleChannelInboundHandler to update the given SettableListenableFuture. */ private static class RequestExecuteHandler extends SimpleChannelInboundHandler
{ private final SettableListenableFuture
responseFuture; public RequestExecuteHandler(SettableListenableFuture
responseFuture) { this.responseFuture = responseFuture; } @Override protected void channelRead0(ChannelHandlerContext context, FullHttpResponse response) throws Exception { this.responseFuture.set(new Netty4ClientHttpResponse(context, response)); } @Override public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { this.responseFuture.setException(cause); } }}

  • 使用netty的bootstrap.connect进行请求

小结

AsyncLoadBalancerAutoConfiguration使用的AsyncClientHttpRequest及其实现类都被标记为废弃,spring 5之后推荐使用webClient。

doc

转载于:https://my.oschina.net/go4it/blog/1857739

你可能感兴趣的文章
NPOI 导出excel带图片,可控大小
查看>>
算法数据结构(一)-B树
查看>>
阿里云官方教程 Linux 系统挂载数据盘
查看>>
(数组)众数问题
查看>>
如何写一个简单的手写识别算法?
查看>>
JavaScript学习笔记——函数
查看>>
atitit.基于 Commons CLI 的命令行原理与 开发
查看>>
Blog CSS
查看>>
git workflow 原文 以及缺点
查看>>
QT对话框中show和exec的区别
查看>>
Android和C#实时视频传输Demo
查看>>
java并发编程学习:如何等待多个线程执行完成后再继续后续处理(synchronized、join、FutureTask、CyclicBarrier)...
查看>>
Mysql Binlog三种格式介绍及分析
查看>>
70、二维码生成+圆形头像
查看>>
Pazera Free Audio Extractor 中文版 - 轻松将视频背景音乐/对话音频提取出来的免费软件...
查看>>
读取spring配置文件的方法(spring读取资源文件)
查看>>
PostConstruct
查看>>
MyEclipse------快速读取特定目录下的文件的内容(字节输入流)
查看>>
Linq查询操作之排序操作
查看>>
Spring 4支持的Java 8新特性一览
查看>>