TIL

24/01/29 TIL __ 레디스 stream타입 데이터 벌크로 영구저장.

GABOJOK 2024. 1. 29. 23:56

 

 

오늘은 레디스를 사용한 방법에 대해 한가지 적어보려 한다. 

 

실시간 채팅기능 구현을 맡은 나는, 채팅을 저장하고자 했다.

신고기능이나, 다른 부분들에 있어서 데이터를 보관하고 있으면 좋을것 같다고 생각했기 때문이다

(카카오 톡의 채팅 보관기능을 떠올렸던 것도 있다. )

 

그래서 redis를 이용해 데이터를 모은 다음, 벌크인서트 로 레디스에서 mongodb로 넣어주는 방향을 생각했다.

 

지난번 레디스를 hash타입으로 사용하고자 했으나, 

다시 알아보니 stream타입도 시간 순서대로 진행되어 좋을 것 같아 변경했다.

채팅데이터에 시간이 뒤죽박죽이라면, 의미가 떨어지기 때문이다.

 

처음에는 단순히 20개가 넘어가면 레디스에서 mongodb로 이동하고, 방금 넣은 그 데이터들은 삭제하도록 했었다. 

 

이를 좀더 입체적으로 바꾸게 됬는데,

현재 채팅 도배방지 기능을 염두에 두고 있었던 터라

20개의 채팅 데이터가 만약 1초만에 올라온다면? 

 

1초에 한번씩 10번의 같은 내용을 입력해 도배를 하는 유저는

도배방지 기능에서 잡을수 없을것이다. 

 

따라서 단순히 개수만으로 캐시db의 내용을 넘기는 것이 아니라, 

사간과 개수 모두 살펴보며 내용을 넘기도록 구현햇다.

 

예를들면 10초동안 1개의 데이터가 들어올 수 도 있고,

10초동안 1,000,000개의 데이터가 들어올 수도 있는 부분인데,

단순히 시간으로만 처리를 하게되면 트래픽에 대응할 수 없기 때문에,

10초의 인터벌 내에 들어온 데이터 개수가 일정 개수 미만이면, 이번 주기를 넘기도록 설정했다. 

또한 10초의 인터벌 내에 들어온 데이터 개수가 일정 개수 이상이라면 바로 mongodb로 넘기고 캐시를 삭제하도록 구현했다. 

 

//캐시 저장.
async setStreamCache(channelId: string, cacheData: object) {
    await this.redis.xAdd(channelId, '*', { ...cacheData });
    const channelIdDataSize = await this.redis.xLen(channelId);
    console.log('channelIdDataSize 길이캐쉬', channelIdDataSize);

    if (channelIdDataSize >= 10) {
        //100으로 변경예정
        this.dataPushMongo(channelId);
        return true;
    }

    setInterval(async () => {
        this.timeOutIsRunning = true;
        await this.dataPushMongo(channelId);
        this.timeOutIsRunning = false;
        return true;
    }, 5000); //10초로 변경예정
    if (!this.timeOutIsRunning) return false;
    return false;
}

 

이렇게 정리하고 나니, 좀더 안정적으로 데이터를 처리하는듯 하다.

아직 서버분리라던지 트래픽 대응을 위해 다른 부분들을 많이 고려해 보아야 겠지만 말이다.