lettuce初探
1. 起因
由于对Springboot不够熟悉,由于对lettuce不够熟悉,出现了各种匪夷所思的问题
比如线程泄露问题,比如command reject问题,等等
2. 进展
查阅相关源码,从片段中看出一些端倪,修改了相关配置,
比如修改clientResource,disconnectedBehavior等
果然正常了,但是始终心中不放心
3. 实验
花费了大量时间,进行实验,分为三个部分
1. 使用lettuce直接连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 /** * 创建RedisClient */ @Bean(destroyMethod = "shutdown") RedisClient redisClient(@Qualifier("commonClientResources") ClientResources commonClientResources, @Qualifier("commonClientOptions") ClientOptions commonClientOptions) { RedisURI uri = RedisURI.builder() .withHost(redisConfigProperty.getHost()) .withPassword(redisConfigProperty.getPassword()) .withPort(redisConfigProperty.getPort()) .withTimeout(redisConfigProperty.getTimeout()) .withDatabase(redisConfigProperty.getDatabase()) .build(); RedisClient client = RedisClient.create(commonClientResources, uri); client.setOptions(commonClientOptions); return client; }
1 2 3 4 StatefulRedisConnection<String, String> lettuceConnection = redisClient.connect(); lettuceConnection.async().set("123", "123"); String string = lettuceConnection.async().get("123").get(3, TimeUnit.SECONDS); lettuceConnection.close();
通过这种方式,将单次连接的流程完全梳理清楚,也理解了每个client配置的属性影响范围
Implementations of {@link ClientResources} are stateful and must be {@link #shutdown()} after they are no longer in use.
源码已经注明,这个对象是一个client的运行环境,并且是有状态的,用完要关闭,不过可以多个client共享一个,所以在Springboot应用中,声明为Bean,保证是单例,避免出现每次创建之后不销毁;
这个也是最初线程泄漏的原因,创建连接的时候都创建了一个该对象,而其中的nioEventLoop线程池自然也就创建了很多,由于业务需要,需要创建很多连接,从而导致线程数量超出控制,后改为单例解决
Client Options to control the behavior of {@link RedisClient}
这是控制具体的client行为的配置
1 2 3 4 5 6 7 8 9 10 private final boolean pingBeforeActivateConnection; // 在连接建立,激活之前,ping或者auth(有密码)确认连接可用 private final boolean autoReconnect;//顾名思义,连接失效之后会自动重新连接,是通过一个watchDog线程完成 private final boolean cancelCommandsOnReconnectFailure; private final boolean publishOnScheduler; private final boolean suspendReconnectOnProtocolFailure; private final int requestQueueSize; private final DisconnectedBehavior disconnectedBehavior;//当前连接失效之后的行为,有default(如果autoReconnect=true则accept,否则reject),reject(直接拒绝当前命令),accept(如果连接不可用,等待直到超时) private final SocketOptions socketOptions; private final SslOptions sslOptions; private final TimeoutOptions timeoutOptions;
其他选项细节请参照官网,除非特殊需要,用默认就好
以上选项是单个连接层面的,很好理解,下面继续配合连接池,看一下这些又有什么影响
2. 使用连接池连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Bean public GenericObjectPoolConfig getRedisConfig() { GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); genericObjectPoolConfig.setMaxIdle(redisConfigProperty.getLettuce().getPool().getMaxIdle()); genericObjectPoolConfig.setMinIdle(redisConfigProperty.getLettuce().getPool().getMinIdle()); genericObjectPoolConfig.setMaxTotal(redisConfigProperty.getLettuce().getPool().getMaxActive()); //连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true genericObjectPoolConfig.setBlockWhenExhausted(true); genericObjectPoolConfig.setMaxWaitMillis(redisConfigProperty.getLettuce().getPool().getMaxWait().toMillis()); //在borrow一个实例时,是否提前进行validate操作;如果为true,则得到的实例均是可用的 genericObjectPoolConfig.setTestOnBorrow(true); //调用returnObject方法时,是否进行有效检查 genericObjectPoolConfig.setTestOnReturn(true); //在空闲时检查有效性, 默认false genericObjectPoolConfig.setTestWhileIdle(true); //由于默认会启用jmx,导致出现重名bean,无法注册 genericObjectPoolConfig.setJmxEnabled(false); //表示idle object evitor两次扫描之间要sleep的毫秒数; genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis(3000); //表示一个对象至少停留在idle状态的最短时间, //然后才能被idle object evitor扫描并驱逐;这一项只有在timeBetweenEvictionRunsMillis大于0时才有意义; genericObjectPoolConfig.setMinEvictableIdleTimeMillis(10000); return genericObjectPoolConfig; }
这是一个连接池的配置,配置项已经注明其意义
1 2 3 4 5 @Bean(name = "lettucePool") public GenericObjectPool<StatefulRedisConnection<String, String>> lettucePool(GenericObjectPoolConfig getRedisConfig){ return ConnectionPoolSupport.createGenericObjectPool( () -> redisClient.connect(), getRedisConfig, false); }
1 2 3 4 StatefulRedisConnection<String, String> lettuceConnection = lettucePool.borrowObject(); lettuceConnection.async().set("123", "123"); string = lettuceConnection.async().get("123").get(3, TimeUnit.SECONDS); lettucePool.returnObject(lettuceConnection);
注意到,连接池有几个配置
1 2 3 4 private boolean testOnBorrow = DEFAULT_TEST_ON_BORROW; private boolean testOnReturn = DEFAULT_TEST_ON_RETURN;
默认值都是false,如果设置为true,在对应时间就会触发调用validate方法,对于lettuce,就是进行一次ping请求来检验连接是否可用,并决定是否要重新创建;
对比lettuce的autoReconnect配置
autoReconnect:从命令执行层面理解,该配置会确保命令一定会被执行一次,除非超时,
validate:确保池中有可用连接,取出的连接是有效的,放回的也是有效的,但已经active的就不能保证了,此时autoReconnect会保证连接的有效性
minEvictableIdleTimeMillis和softMinEvictableIdleTimeMillis和TestWhileIdle
这几项配置需要timeBetweenEvictionRunsMillis > 0 ,才会生效,会一直循环驱逐空闲(或者是失效,具体看配置)的连接,从而替代频繁的TestOnBorrow等配置,确保连接池内连接的有效性,可以根据具体情况进行配置
但是如果使用redisTemplate的话,TestOnBorrow这些操作就会不那么频繁了,见下文
3. 使用redisTemplate
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 /** * 生成连接池 * * @return DefaultLettucePool */ @Bean public LettucePoolingClientConfiguration lettucePoolingClientConfiguration(GenericObjectPoolConfig getRedisConfig) { return LettucePoolingClientConfiguration.builder() .commandTimeout(redisConfigProperty.getTimeout()) .poolConfig(getRedisConfig).clientResources(commonClientResources).clientOptions(commonClientOptions).build(); } /** * redis单机配置 * * @return RedisStandaloneConfiguration */ @Bean public RedisStandaloneConfiguration redisStandaloneConfiguration() { RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(); redisStandaloneConfiguration.setDatabase(redisConfigProperty.getDatabase()); redisStandaloneConfiguration.setHostName(redisConfigProperty.getHost()); redisStandaloneConfiguration.setPort(redisConfigProperty.getPort()); redisStandaloneConfiguration.setPassword(RedisPassword.of(redisConfigProperty.getPassword())); return redisStandaloneConfiguration; } /** * lettuce 连接工厂配置 * * @return LettuceConnectionFactory implement RedisConnectionFactory */ @Bean public LettuceConnectionFactory lettuceConnectionFactory(RedisStandaloneConfiguration redisStandaloneConfiguration, LettucePoolingClientConfiguration lettucePoolingClientConfiguration) { LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(redisStandaloneConfiguration, lettucePoolingClientConfiguration); lettuceConnectionFactory.setValidateConnection(true); return lettuceConnectionFactory; } @Bean(name = "stringRedisTemplate") public StringRedisTemplate getStringRedisTemplate(RedisConnectionFactory lettuceConnectionFactory) { StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(); stringRedisTemplate.setDefaultSerializer(new StringRedisSerializer()); //事务支持 // stringRedisTemplate.setEnableTransactionSupport(true); stringRedisTemplate.setConnectionFactory(lettuceConnectionFactory); return stringRedisTemplate; }
1 2 stringRedisTemplate.opsForValue().set("123", "12345"); String string = stringRedisTemplate.opsForValue().get("123");
这种方式使用Spring提供的封装模版,实现方式相当于在连接池基础上封装一层连接工厂,每次调用opsForValue方法时都是从工厂中调用getConnection()
方法,获取连接,该方法在lettuce中实现如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public RedisConnection getConnection() { if (isClusterAware()) { return getClusterConnection(); } LettuceConnection connection; if (pool != null) { connection = new LettuceConnection(getSharedConnection(), getTimeout(), null, pool, getDatabase()); } else { connection = new LettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase()); } connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults); return connection; }
这里的getSharedConnection()
,是获取了一个线程安全的共享连接,,这个共享连接逻辑是
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 StatefulConnection<E, E> getConnection() { synchronized (this.connectionMonitor) { if (this.connection == null) { this.connection = getNativeConnection(); } if (getValidateConnection()) { validateConnection(); } return this.connection; } }
只有当connection为null时,才会调用getNativeConnection()
,实际上这是从连接池中borrowObject,并且默认getValidateConnection()
是false,也就是说,这条连接的有效性默认是不会进行验证的,而这样处理的原因是,lettuce有上文提到的autoReconnect,一旦把autoReconnect设置为false,那这条连接的有效性就无法保证了,即使连接池中都是有效的连接,但工厂中缓存的这条连接始终不会被验证。
高级特性
在Spring中使用lettuce,使用RedisTemplate是能够比较便捷的操作,满足绝大多数场景,但由于RedisTemplate还是太重,逻辑复杂,并且对于lettuce来说,屏蔽了一些高级特性,尤其是异步请求(Asynchronous API),响应式编程(Reactive API), 流式API(Streaming API)等用法,
1 2 3 4 5 6 7 8 9 10 11 12 StatefulRedisConnection<String, String> lettuceConnection = lettucePool.borrowObject(); lettuceConnection.async().set("123", "123"); RedisStringAsyncCommands<String, String> async = lettuceConnection.async(); RedisFuture<String> future = async.get("123"); try { string = future.get(60, TimeUnit.SECONDS); System.out.println(string); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); }
Lambda表达式
1 future.thenAccept(System.out::println);
另外的线程处理
1 2 Executor sharedExecutor = Executors.newFixedThreadPool(1); future.thenAcceptAsync(System.out::println, sharedExecutor);
链式操作
1 2 future.thenApply(String::length) .thenAccept(integer -> System.out.println("Got value: " + integer));
Reactive
1 2 3 4 5 RedisStringReactiveCommands<String, String> reactive = lettuceConnection.reactive(); lettuceConnection.async().set("123", "123"); reactive.get("123").subscribe(System.out::println);
Streaming
1 2 3 4 5 6 7 8 9 10 Long count = redis.lrange(new ValueStreamingChannel<String, String>() { @Override public void onValue(String value) { System.out.println("Value: " + value); } }, "key", 0, -1); System.out.println("Count: " + count);
Reactive
这个概念单独可以讲成一本书,但是这里记录一些便于理解的东西,从而方便理解lettuce能做些什么,不会深入探讨
1. Publisher and Subscriber
All commands return a Flux, Mono or Mono to which a Subscriber can subscribe to
首先抛出两个概念,Publisher和Subscriber
1 2 3 4 5 6 RedisClient client = RedisClient.create("redis://localhost"); RedisStringReactiveCommands<String, String> commands = client.connect().reactive(); commands .get("key") .subscribe(value -> System.out.println(value));
这里,commands的get方法就是返回了Mono,也就是一个Publisher,而subscribe的参数就是Subscriber,可以理解为发布订阅模式
1 2 3 4 Flux.just("Ben", "Michael", "Mark") .flatMap(commands::get) .flatMap(value -> commands.rpush("result", value)) .subscribe();
这里Flux.just也是产生一个Publisher,进而可以进行链式操作,最后调用subscribe,
这种用法与异步的future的用法区别是,如果没有订阅者,那么什么事都不会发生,而future其实是在创建的时候就执行了
1 2 3 4 5 6 Flux.just("Ben", "Michael", "Mark") .doOnNext(value -> { throw new IllegalStateException("Takes way too long"); }) .onErrorReturn("Default value") .subscribe();
异常处理的相关方法