0. 요약
- 아래 코드를 통해 아주 간단히 Redis를 이용한 Publisher, subsciber를 이용한 방법을 알 수 있다.
- 세부적인 방법은 코드 다음 글을 참고하자.
import { createClient, RedisClientType } from 'redis';
import { ConfigService } from '@nestjs/config';
@Injectable()
export class MailService implements OnModuleInit, OnModuleDestroy {
private subscriber: RedisClientType;
private publisher: RedisClientType;
private sseSubjects: Map<number, BehaviorSubject<string>> = new Map();
private myIp: string;
private intervalConnect: NodeJS.Timeout;
constructor(
private readonly databaseService: DatabaseService,
private readonly configService: ConfigService
) {}
onModuleDestroy() {
const keys = this.sseSubjects.keys();
for (const key in keys) {
this.publisher.del(key);
}
if (this.intervalConnect) {
clearInterval(this.intervalConnect);
}
}
async onModuleInit() {
const redisUrl = this.configService.get<string>('REDIS_URL');
this.subscriber = createClient({ url: redisUrl });
this.publisher = createClient({ url: redisUrl });
await this.subscriber.connect();
await this.publisher.connect();
const networkInterfaces = os.networkInterfaces();
for (const interfaceName in networkInterfaces) {
const networkInfo = networkInterfaces[interfaceName];
if (networkInfo) {
const ipv4 = networkInfo.find(info => info.family === 'IPv4' && !info.internal);
if (ipv4) {
this.myIp = String(ipv4.address);
break;
}
}
}
this.subscriber.subscribe('notifications', message => {
const parsedMessage = JSON.parse(message);
const memberId = parsedMessage.memberId;
// 그외 실행 코드 생략
});
}
async connectSse(memberId: number, res: Response) {
// 실행 코드 축약, pubsub 관련 코드만 잔류
await this.publisher.set(`sseRedisMember:${memberId}`, this.myIp);
res.on('close', () => {
this.sseSubjects.delete(memberId);
this.publisher.del(`sseRedisMember:${memberId}`);
res.end();
});
}
async sendMessage(memberId: number) {
const serverInfo = await this.publisher.get(`sseRedisMember:${memberId}`);
if (!serverInfo) {
return;
}
if (serverInfo === this.myIp) {
const subject = this.sseSubjects.get(memberId);
if (subject) {
//다중서버 이지만 현재 서버에서 진행
}
} else {
//다중서버 상황에서 다른 서버에서 진행
await this.publisher.publish('notifications', JSON.stringify({ memberId }));
}
}
}
1. 어떻게 사용해야하는가?
- 단계적으로 보면 매우 간단하다.
1) 우선 아래와 같이 publisher와 subscriber를 등록한다.
const redisUrl = 'redis://localhost:6379'
this.subscriber = createClient({ url: redisUrl });
this.publisher = createClient({ url: redisUrl });
await this.subscriber.connect();
await this.publisher.connect();
- 여기서 어짜피 같은 주소로 한다면 1개만 만들어도 될텐데, sub/pub 나눠서 만드는지는 좀 더 아래에 설명하겠다.
2) 구독을 시행한다.
this.subscriber.subscribe('notifications', message => {
const parsedMessage = JSON.parse(message);
const memberId = parsedMessage.memberId;
console.log(memberId)
});
- 서버에 'notifications' 라는 이벤트와 함께 message 라는 데이터가 들어온다.
- 이 때 message를 받아서 어떻게 해야할지를 작성한다.
3) 이벤트를 발행한다.
await this.publisher.publish('notifications', JSON.stringify({ memberId }));
- 이렇게 message에 해당하는 부분을 string으로 만들어 발행시킨다.
4) 그외 redis 사용하기.
this.publisher.set(`sseRedisMember:${memberId}`, this.myIp);
this.publisher.del(`sseRedisMember:${memberId}`);
- publisher로 명명해서 그렇지만, 사실은 일반 redis 처럼 데이터를 저장하고 삭제, 조회가 가능하다.
2. 사용시 신경써야할 점
- 당연하게도 subsribe를 먼저 선언해주어야 한다.
- 그래서 NestJS를 사용할때 service코드의 onModuleInit에서 작성해주었다.
- message 값은 string만 지원한다. 따라서 그래서 JSON,stringify 등을 이용해서 보내야할 데이터가 복잡하다면 가공이 한번 들어가면 좋다.
3. 왜 publisher와 subscriber 2개를 선언했는가
- 제일 큰 이유는 역할 분리와 독립성을 보장하기 위해서이다.
- 역할의 분리
- 해당 객체의 역할을 확실히 구분함으로써 코드 작성 시 오류를 방지할 수 있다.
- 발행자와 구독자가 서로 독립적이기 때문에 시스템 설계가 간단하고, 확장이나 유지보수가 용이하다.
- 즉, 각자의 코드 변경이 영향을 주지 않는다.
- 비동기 작업 처리
- 발행자는 구독자가 메시지를 처리 중인지, 연결이 되어 있는지 신경 쓰지 않고 메시지만 보낸다.
- 구독자는 자신이 관심있는 체널에 구독한 후 메시지가 발행 되었을 때만 반응 한다.
4. 참조
1)공식 문서 : https://redis.io/docs/latest/develop/interact/pubsub/
Redis Pub/Sub
How to use pub/sub channels in Redis
redis.io
2) node.js에서 Redis PubSub 구현하기 : https://inpa.tistory.com/entry/REDIS-%F0%9F%93%9A-Nodejs-%EC%97%90%EC%84%9C-PubSub-%EA%B8%B0%EB%8A%A5-%EA%B5%AC%ED%98%84%ED%95%98%EA%B8%B0