Warm tip: This article is reproduced from serverfault.com, please click

How to keep redis connection open when reading from reactive API

发布于 2020-11-25 13:33:53

I am continuously listening on redis streams using the spring reactive api(using lettuce driver). I am using a standalone connection. It seems like the reactor's event loop opens a new connection every time it reads the messages instead of keeping the connection open. I see a lot of TIME_WAIT ports in my machine when i run my program. Is this normal? Is there a way to let lettuce know to re-use the connection instead of reconnecting every time?

This is my code:

StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(factory);
return receiver
    .receive(Consumer.from(keyCacheStreamsConfig.getConsumerGroup(), keyCacheStreamsConfig.getConsumer()),
        StreamOffset.create(keyCacheStreamsConfig.getStreamName(), ReadOffset.lastConsumed()))//
    // flatMap reads 256 messages by default and processes them in the given scheduler
    .flatMap(record -> Mono.fromCallable(() -> consumer.consume(record)).subscribeOn(Schedulers.boundedElastic()))//
    .doOnError(t -> {
      log.error("Error processing.", t);
      streamConnections.get(nodeName).setDirty(true);
    })//
    .onErrorContinue((err, elem) -> log.error("Error processing message. Continue listening."))//
    .subscribe();
Questioner
falcon
Viewed
0
falcon 2020-12-10 02:42:55

Looks like the spring-data-redis library re-uses the connection only if the poll timeout is set to '0' in the stream receiver options and pass it as the second argument in StreamReceiver.create(factory, options). Figured by looking into spring-data-redis' source code.