Database/Redis
PipeLine
최-코드
2024. 8. 26. 09:49
상황
- redis을 사용할 때 캐싱 전략으로 write back 사용하다보니 일괄적으로 redis에 존재하는 데이터를 삭제해야 할 필요가 있었다.
- redis는 TCP 기반의 클라이언트-서버 모델을 따르므로 클라이언트가 보낸 쿼리는 소켓을 통해 읽혀지고, 보통 blocking 형태로 response된다. 이후 쿼리를 처리한 후 이에 대한 응답을 클라이언트에 다시 보낸다.
- 따라서 물리적인 네트워크 지연 또는 3-way handshake 등의 이유로 병목 현상이 발생할 수 있다.
해결법 : 이러한 병목을 개선하기 위해 redis에서는 PipeLining api를 제공한다. 이를 통해 클라이언트는 복수 개의 작업을 쌓아놓고 한 번에 전송할 수 있게 된다. 따라서 응답을 기다리지 않고, 여러 명령어를 한 번에 서버에 보낼 수 있고, 응답 또한 한 번에 받아올 수 있게 된다. 즉, 응답은 목록이
주의사항
- 파이프라이닝을 하는 동안에는 클라이언트의 메모리가 소모되므로 파이프라이닝을 할 때에는 한 번의 파이프라이닝 대기열에 포함시킬 작업의 합리적인 수를 가장 먼저 산정해야 한다. 보통 100~1000개 사이이다.
- 이 때 명령어마다 TCP 패킷으로 나뉘기 때문에 일부는 실패할 수도 있다.
@Scheduled(fixedDelay = 300000)
public void removeKeysUsingPipeLine() {
Set<String> keys = stringLongRedisTemplate.keys(
RedisKeyPrefixEnum.DISCONNECTED + "*");
if (keys != null) {
List<byte[]> byteKeys = keys.stream().filter(key -> {
Long baseKey = Long.valueOf(key.split(":")[1]);
return studyGrupIdSessionsMap.get(baseKey).isEmpty();
}).map(String::getBytes).toList();
int batchSize = 100;
int keySize = byteKeys.size();
for (int i = 0; i < (keySize + batchSize - 1) / batchSize; i++) {
List<byte[]> tempByteKeys = byteKeys.subList(
i, i += i + batchSize < keySize ? batchSize : keySize % batchSize);
stringLongRedisTemplate.executePipelined(
(RedisCallback<?>) redisConnection -> {
tempByteKeys.forEach(key -> redisConnection.keyCommands().del(key));
return null;
});
}
}
}
- stringLongRedisTemplate.executePipelined의 반환값은 각 명령어의 결과에 해당하는 리스트 값이다.
- executePipelined()에서 인자안에 있는 return 값을 필요없으므로 항상 null로 해도 무방하다.