본문 바로가기
카테고리 없음

[Redis] Redis Pub/Sub

by hbIncoding 2024. 11. 27.

0. 요약

  • 아래 코드를 통해 아주 간단히 Redis를 이용한 Publisher, subsciber를 이용한 방법을 알 수 있다.
  • 세부적인 방법은 코드 다음 글을 참고하자.
import { createClient, RedisClientType } from 'redis';
import { ConfigService } from '@nestjs/config';

@Injectable()
export class MailService implements OnModuleInit, OnModuleDestroy {
  private subscriber: RedisClientType;
  private publisher: RedisClientType;
  private sseSubjects: Map<number, BehaviorSubject<string>> = new Map();
  private myIp: string;
  private intervalConnect: NodeJS.Timeout;

  constructor(
    private readonly databaseService: DatabaseService,
    private readonly configService: ConfigService
  ) {}

  onModuleDestroy() {
    const keys = this.sseSubjects.keys();
    for (const key in keys) {
      this.publisher.del(key);
    }

    if (this.intervalConnect) {
      clearInterval(this.intervalConnect);
    }
  }

  async onModuleInit() {
    const redisUrl = this.configService.get<string>('REDIS_URL');
    this.subscriber = createClient({ url: redisUrl });
    this.publisher = createClient({ url: redisUrl });

    await this.subscriber.connect();
    await this.publisher.connect();

    const networkInterfaces = os.networkInterfaces();
    for (const interfaceName in networkInterfaces) {
      const networkInfo = networkInterfaces[interfaceName];
      if (networkInfo) {
        const ipv4 = networkInfo.find(info => info.family === 'IPv4' && !info.internal);
        if (ipv4) {
          this.myIp = String(ipv4.address);
          break;
        }
      }
    }

    this.subscriber.subscribe('notifications', message => {
      const parsedMessage = JSON.parse(message);
      const memberId = parsedMessage.memberId;
      // 그외 실행 코드 생략

    });
  }

  async connectSse(memberId: number, res: Response) {
	// 실행 코드 축약, pubsub 관련 코드만 잔류
    await this.publisher.set(`sseRedisMember:${memberId}`, this.myIp);

    res.on('close', () => {
      this.sseSubjects.delete(memberId);
      this.publisher.del(`sseRedisMember:${memberId}`);
      res.end();
    });
  }

  async sendMessage(memberId: number) {
    const serverInfo = await this.publisher.get(`sseRedisMember:${memberId}`);
    if (!serverInfo) {
      return;
    }

    if (serverInfo === this.myIp) {
      const subject = this.sseSubjects.get(memberId);
      if (subject) {
        //다중서버 이지만 현재 서버에서 진행
      }
    } else {
    	//다중서버 상황에서 다른 서버에서 진행
      await this.publisher.publish('notifications', JSON.stringify({ memberId }));
    }
  }
}

1. 어떻게 사용해야하는가?

  • 단계적으로 보면 매우 간단하다.

1) 우선 아래와 같이 publisher와 subscriber를 등록한다.

const redisUrl = 'redis://localhost:6379'
this.subscriber = createClient({ url: redisUrl });
this.publisher = createClient({ url: redisUrl });

await this.subscriber.connect();
await this.publisher.connect();
  • 여기서 어짜피 같은 주소로 한다면 1개만 만들어도 될텐데, sub/pub 나눠서 만드는지는 좀 더 아래에 설명하겠다.

2) 구독을 시행한다.

this.subscriber.subscribe('notifications', message => {
      const parsedMessage = JSON.parse(message);
      const memberId = parsedMessage.memberId;
      console.log(memberId)
});
  • 서버에 'notifications' 라는 이벤트와 함께 message 라는 데이터가 들어온다.
  • 이 때 message를 받아서 어떻게 해야할지를 작성한다.

3) 이벤트를 발행한다.

await this.publisher.publish('notifications', JSON.stringify({ memberId }));
  • 이렇게 message에 해당하는 부분을 string으로 만들어 발행시킨다.

4) 그외 redis 사용하기.

this.publisher.set(`sseRedisMember:${memberId}`, this.myIp);
this.publisher.del(`sseRedisMember:${memberId}`);
  • publisher로 명명해서 그렇지만, 사실은 일반 redis 처럼 데이터를 저장하고 삭제, 조회가 가능하다.

 

2. 사용시 신경써야할 점

  • 당연하게도 subsribe를 먼저 선언해주어야 한다.
    • 그래서 NestJS를 사용할때 service코드의 onModuleInit에서 작성해주었다.
  • message 값은 string만 지원한다. 따라서 그래서 JSON,stringify 등을 이용해서 보내야할 데이터가 복잡하다면 가공이 한번 들어가면 좋다.

 

3. 왜 publisher와 subscriber 2개를 선언했는가

  • 제일 큰 이유는 역할 분리와 독립성을 보장하기 위해서이다.
  • 역할의 분리
    • 해당 객체의 역할을 확실히 구분함으로써 코드 작성 시 오류를 방지할 수 있다.
    • 발행자와 구독자가 서로 독립적이기 때문에 시스템 설계가 간단하고, 확장이나 유지보수가 용이하다.
      • 즉, 각자의 코드 변경이 영향을 주지 않는다.
  • 비동기 작업 처리
    • 발행자는 구독자가 메시지를 처리 중인지, 연결이 되어 있는지 신경 쓰지 않고 메시지만 보낸다.
    • 구독자는 자신이 관심있는 체널에 구독한 후 메시지가 발행 되었을 때만 반응 한다.

 

4. 참조

1)공식 문서 : https://redis.io/docs/latest/develop/interact/pubsub/

 

Redis Pub/Sub

How to use pub/sub channels in Redis

redis.io

 

2) node.js에서 Redis PubSub 구현하기 : https://inpa.tistory.com/entry/REDIS-%F0%9F%93%9A-Nodejs-%EC%97%90%EC%84%9C-PubSub-%EA%B8%B0%EB%8A%A5-%EA%B5%AC%ED%98%84%ED%95%98%EA%B8%B0