lettuce初探

1. 起因

由于对Springboot不够熟悉,由于对lettuce不够熟悉,出现了各种匪夷所思的问题
比如线程泄露问题,比如command reject问题,等等

2. 进展

查阅相关源码,从片段中看出一些端倪,修改了相关配置,
比如修改clientResource,disconnectedBehavior等
果然正常了,但是始终心中不放心

3. 实验

花费了大量时间,进行实验,分为三个部分

1. 使用lettuce直接连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 创建RedisClient
*/
@Bean(destroyMethod = "shutdown")
RedisClient redisClient(@Qualifier("commonClientResources") ClientResources commonClientResources,
@Qualifier("commonClientOptions") ClientOptions commonClientOptions) {
RedisURI uri = RedisURI.builder()
.withHost(redisConfigProperty.getHost())
.withPassword(redisConfigProperty.getPassword())
.withPort(redisConfigProperty.getPort())
.withTimeout(redisConfigProperty.getTimeout())
.withDatabase(redisConfigProperty.getDatabase())
.build();
RedisClient client = RedisClient.create(commonClientResources, uri);
client.setOptions(commonClientOptions);
return client;
}

1
2
3
4
StatefulRedisConnection<String, String> lettuceConnection = redisClient.connect();
lettuceConnection.async().set("123", "123");
String string = lettuceConnection.async().get("123").get(3, TimeUnit.SECONDS);
lettuceConnection.close();

通过这种方式,将单次连接的流程完全梳理清楚,也理解了每个client配置的属性影响范围

  • clientResource

Implementations of {@link ClientResources} are stateful and must be {@link #shutdown()} after they are no longer in use.

源码已经注明,这个对象是一个client的运行环境,并且是有状态的,用完要关闭,不过可以多个client共享一个,所以在Springboot应用中,声明为Bean,保证是单例,避免出现每次创建之后不销毁;

这个也是最初线程泄漏的原因,创建连接的时候都创建了一个该对象,而其中的nioEventLoop线程池自然也就创建了很多,由于业务需要,需要创建很多连接,从而导致线程数量超出控制,后改为单例解决

  • clientOptions

Client Options to control the behavior of {@link RedisClient}

这是控制具体的client行为的配置

1
2
3
4
5
6
7
8
9
10
private final boolean pingBeforeActivateConnection; // 在连接建立,激活之前,ping或者auth(有密码)确认连接可用
private final boolean autoReconnect;//顾名思义,连接失效之后会自动重新连接,是通过一个watchDog线程完成
private final boolean cancelCommandsOnReconnectFailure;
private final boolean publishOnScheduler;
private final boolean suspendReconnectOnProtocolFailure;
private final int requestQueueSize;
private final DisconnectedBehavior disconnectedBehavior;//当前连接失效之后的行为,有default(如果autoReconnect=true则accept,否则reject),reject(直接拒绝当前命令),accept(如果连接不可用,等待直到超时)
private final SocketOptions socketOptions;
private final SslOptions sslOptions;
private final TimeoutOptions timeoutOptions;

其他选项细节请参照官网,除非特殊需要,用默认就好

以上选项是单个连接层面的,很好理解,下面继续配合连接池,看一下这些又有什么影响

2. 使用连接池连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
 @Bean
public GenericObjectPoolConfig getRedisConfig() {
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setMaxIdle(redisConfigProperty.getLettuce().getPool().getMaxIdle());
genericObjectPoolConfig.setMinIdle(redisConfigProperty.getLettuce().getPool().getMinIdle());
genericObjectPoolConfig.setMaxTotal(redisConfigProperty.getLettuce().getPool().getMaxActive());
//连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
genericObjectPoolConfig.setBlockWhenExhausted(true);
genericObjectPoolConfig.setMaxWaitMillis(redisConfigProperty.getLettuce().getPool().getMaxWait().toMillis());
//在borrow一个实例时,是否提前进行validate操作;如果为true,则得到的实例均是可用的
genericObjectPoolConfig.setTestOnBorrow(true);
//调用returnObject方法时,是否进行有效检查
genericObjectPoolConfig.setTestOnReturn(true);
//在空闲时检查有效性, 默认false
genericObjectPoolConfig.setTestWhileIdle(true);
//由于默认会启用jmx,导致出现重名bean,无法注册
genericObjectPoolConfig.setJmxEnabled(false);
//表示idle object evitor两次扫描之间要sleep的毫秒数;
genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis(3000);
//表示一个对象至少停留在idle状态的最短时间,
//然后才能被idle object evitor扫描并驱逐;这一项只有在timeBetweenEvictionRunsMillis大于0时才有意义;
genericObjectPoolConfig.setMinEvictableIdleTimeMillis(10000);

return genericObjectPoolConfig;
}

这是一个连接池的配置,配置项已经注明其意义

1
2
3
4
5
@Bean(name = "lettucePool")
public GenericObjectPool<StatefulRedisConnection<String, String>> lettucePool(GenericObjectPoolConfig getRedisConfig){
return ConnectionPoolSupport.createGenericObjectPool(
() -> redisClient.connect(), getRedisConfig, false);
}
1
2
3
4
StatefulRedisConnection<String, String> lettuceConnection = lettucePool.borrowObject();
lettuceConnection.async().set("123", "123");
string = lettuceConnection.async().get("123").get(3, TimeUnit.SECONDS);
lettucePool.returnObject(lettuceConnection);

注意到,连接池有几个配置

1
2
3
4
private boolean testOnBorrow = DEFAULT_TEST_ON_BORROW;

private boolean testOnReturn = DEFAULT_TEST_ON_RETURN;

默认值都是false,如果设置为true,在对应时间就会触发调用validate方法,对于lettuce,就是进行一次ping请求来检验连接是否可用,并决定是否要重新创建;
对比lettuce的autoReconnect配置

  • autoReconnect:从命令执行层面理解,该配置会确保命令一定会被执行一次,除非超时,
  • validate:确保池中有可用连接,取出的连接是有效的,放回的也是有效的,但已经active的就不能保证了,此时autoReconnect会保证连接的有效性

minEvictableIdleTimeMillis和softMinEvictableIdleTimeMillis和TestWhileIdle
这几项配置需要timeBetweenEvictionRunsMillis > 0 ,才会生效,会一直循环驱逐空闲(或者是失效,具体看配置)的连接,从而替代频繁的TestOnBorrow等配置,确保连接池内连接的有效性,可以根据具体情况进行配置

但是如果使用redisTemplate的话,TestOnBorrow这些操作就会不那么频繁了,见下文

3. 使用redisTemplate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
 /**
* 生成连接池
*
* @return DefaultLettucePool
*/
@Bean
public LettucePoolingClientConfiguration lettucePoolingClientConfiguration(GenericObjectPoolConfig getRedisConfig) {
return LettucePoolingClientConfiguration.builder()
.commandTimeout(redisConfigProperty.getTimeout())
.poolConfig(getRedisConfig).clientResources(commonClientResources).clientOptions(commonClientOptions).build();
}

/**
* redis单机配置
*
* @return RedisStandaloneConfiguration
*/
@Bean
public RedisStandaloneConfiguration redisStandaloneConfiguration() {
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
redisStandaloneConfiguration.setDatabase(redisConfigProperty.getDatabase());
redisStandaloneConfiguration.setHostName(redisConfigProperty.getHost());
redisStandaloneConfiguration.setPort(redisConfigProperty.getPort());
redisStandaloneConfiguration.setPassword(RedisPassword.of(redisConfigProperty.getPassword()));
return redisStandaloneConfiguration;
}

/**
* lettuce 连接工厂配置
*
* @return LettuceConnectionFactory implement RedisConnectionFactory
*/
@Bean
public LettuceConnectionFactory lettuceConnectionFactory(RedisStandaloneConfiguration redisStandaloneConfiguration,
LettucePoolingClientConfiguration lettucePoolingClientConfiguration) {
LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(redisStandaloneConfiguration, lettucePoolingClientConfiguration);
lettuceConnectionFactory.setValidateConnection(true);
return lettuceConnectionFactory;
}


@Bean(name = "stringRedisTemplate")
public StringRedisTemplate getStringRedisTemplate(RedisConnectionFactory lettuceConnectionFactory) {
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
stringRedisTemplate.setDefaultSerializer(new StringRedisSerializer());
//事务支持
// stringRedisTemplate.setEnableTransactionSupport(true);
stringRedisTemplate.setConnectionFactory(lettuceConnectionFactory);
return stringRedisTemplate;
}

1
2
stringRedisTemplate.opsForValue().set("123", "12345");
String string = stringRedisTemplate.opsForValue().get("123");

这种方式使用Spring提供的封装模版,实现方式相当于在连接池基础上封装一层连接工厂,每次调用opsForValue方法时都是从工厂中调用getConnection()方法,获取连接,该方法在lettuce中实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public RedisConnection getConnection() {

if (isClusterAware()) {
return getClusterConnection();
}

LettuceConnection connection;

if (pool != null) {
connection = new LettuceConnection(getSharedConnection(), getTimeout(), null, pool, getDatabase());
} else {
connection = new LettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase());
}

connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
return connection;
}

这里的getSharedConnection(),是获取了一个线程安全的共享连接,,这个共享连接逻辑是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
StatefulConnection<E, E> getConnection() {

synchronized (this.connectionMonitor) {

if (this.connection == null) {
this.connection = getNativeConnection();
}

if (getValidateConnection()) {
validateConnection();
}

return this.connection;
}
}

只有当connection为null时,才会调用getNativeConnection(),实际上这是从连接池中borrowObject,并且默认getValidateConnection()是false,也就是说,这条连接的有效性默认是不会进行验证的,而这样处理的原因是,lettuce有上文提到的autoReconnect,一旦把autoReconnect设置为false,那这条连接的有效性就无法保证了,即使连接池中都是有效的连接,但工厂中缓存的这条连接始终不会被验证。

高级特性

在Spring中使用lettuce,使用RedisTemplate是能够比较便捷的操作,满足绝大多数场景,但由于RedisTemplate还是太重,逻辑复杂,并且对于lettuce来说,屏蔽了一些高级特性,尤其是异步请求(Asynchronous API),响应式编程(Reactive API), 流式API(Streaming API)等用法,

1
2
3
4
5
6
7
8
9
10
11
12
StatefulRedisConnection<String, String> lettuceConnection = lettucePool.borrowObject();
lettuceConnection.async().set("123", "123");
RedisStringAsyncCommands<String, String> async = lettuceConnection.async();

RedisFuture<String> future = async.get("123");

try {
string = future.get(60, TimeUnit.SECONDS);
System.out.println(string);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}

Lambda表达式

1
future.thenAccept(System.out::println);

另外的线程处理

1
2
Executor sharedExecutor = Executors.newFixedThreadPool(1);
future.thenAcceptAsync(System.out::println, sharedExecutor);

链式操作

1
2
future.thenApply(String::length)
.thenAccept(integer -> System.out.println("Got value: " + integer));

Reactive

1
2
3
4
5
RedisStringReactiveCommands<String, String> reactive = lettuceConnection.reactive();
lettuceConnection.async().set("123", "123");

reactive.get("123").subscribe(System.out::println);

Streaming

1
2
3
4
5
6
7
8
9
10
Long count = redis.lrange(new ValueStreamingChannel<String, String>()
{
@Override
public void onValue(String value)
{
System.out.println("Value: " + value);
}
}, "key", 0, -1);

System.out.println("Count: " + count);

Reactive

这个概念单独可以讲成一本书,但是这里记录一些便于理解的东西,从而方便理解lettuce能做些什么,不会深入探讨

1. Publisher and Subscriber

All commands return a Flux, Mono or Mono to which a Subscriber can subscribe to

首先抛出两个概念,Publisher和Subscriber

1
2
3
4
5
6
RedisClient client = RedisClient.create("redis://localhost");
RedisStringReactiveCommands<String, String> commands = client.connect().reactive();

commands
.get("key")
.subscribe(value -> System.out.println(value));

这里,commands的get方法就是返回了Mono,也就是一个Publisher,而subscribe的参数就是Subscriber,可以理解为发布订阅模式

1
2
3
4
Flux.just("Ben", "Michael", "Mark")
.flatMap(commands::get)
.flatMap(value -> commands.rpush("result", value))
.subscribe();

这里Flux.just也是产生一个Publisher,进而可以进行链式操作,最后调用subscribe,

这种用法与异步的future的用法区别是,如果没有订阅者,那么什么事都不会发生,而future其实是在创建的时候就执行了

1
2
3
4
5
6
Flux.just("Ben", "Michael", "Mark")
.doOnNext(value -> {
throw new IllegalStateException("Takes way too long");
})
.onErrorReturn("Default value")
.subscribe();

异常处理的相关方法