본문 바로가기
카테고리 없음

[SSE] nest에서 SSE 알림을 주는 방법

by hbIncoding 2024. 11. 14.

0. 예시 설명

  • /api/mail/check/1 로 SSE 알림을 받을 수 있도록 등록한다.
  • /api/mail/call/1 로 알림을 주도록 한다.

 

1. express의 Response를 이용해서 만들어 보자!

 

// 컨트롤러 코드

@Controller('api/mail')
export class MailController {
  constructor(
    private readonly mailService: MailService,
    private readonly eventEmitter: EventEmitter2
  ) {}

  @Get('check/:memberId')
  @ApiOperation({ summary: '알림 연결 요청 API' })
  @ApiResponse({
    status: 200,
    description: 'Alarm response example',
    type: MailCheckResponseDto
  })
  initialCheckAndConnectSse(@Param('memberId') memberId: string, @Res() res: Response) {
    this.mailService.checkMailAndConnectSse(res, memberId);
  }

  @Get('call/:memberId')
  @ApiOperation({ summary: '알림 발생 요청 API' })
  @ApiResponse({
    status: 200,
    description: 'Alarm response example',
    type: MailCheckResponseDto
  })
  triggerAlarm(@Param('memberId') memberId: string) {
    this.eventEmitter.emit('sendAlarm', memberId);
  }
}

 

// 서비스 코드

@Injectable()
export class MailService {
  constructor(private readonly databaseService: DatabaseService) {}
  private connectedClients: Map<string, Response> = new Map();

  async checkMailAndConnectSse(res: Response, memberId: string): Promise<void> {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');

    const checkUnread = await this.databaseService.query(mailQueries.checkUnreadQuery, [memberId]);
    const data = {
      check: checkUnread,
      time: new Date()
    };
    const body = successhandler(successMessage.GET_MAIL_ALARM_SUCCESS, data);
    res.write(JSON.stringify(body));
    res.write('\r\n');

    this.connectedClients.set(memberId, res);

    res.on('close', () => {
      this.connectedClients.delete(memberId);
      res.end();
    });
  }

  @OnEvent('sendAlarm')
  handleAlarmEvent(memberId: string): void {
    const clientRes = this.connectedClients.get(memberId);

    if (clientRes) {
      const data = {
        check: true,
        time: new Date()
      };
      const body = successhandler(successMessage.GET_MAIL_ALARM_SUCCESS, data);
      clientRes.write(JSON.stringify(body));
      clientRes.write('\r\n');
    } else {
      console.log('없다닌까요!');
    }
  }
}

 

  • 위와 같이 Express의 Response를 이용하면은 접속의 끊김 처리와 알림을 보다 쉽게 처리할 수 있다.
  • 그리고 결과는 아래와 같다.
{"code":200,"message":"Catch alarm!.","data":{"check":true,"time":"2024-11-14T08:52:23.463Z"}}
{"code":200,"message":"Catch alarm!.","data":{"check":true,"time":"2024-11-14T08:52:40.776Z"}}
{"code":200,"message":"Catch alarm!.","data":{"check":true,"time":"2024-11-14T08:52:42.583Z"}}
  • 제일 첫번째 줄은 첫 연결시 보낸 데이터고, 이후는 /call 요청을 통해 생성한 것이다.
  • 위와 같이 chrome에서 바로 get요청으로 연결했을 때는 chrome 브라우저에 데이터가 띄워진다.
  • 하지만 postMan으로 연결을 시도했을 때는 body에 보여주면서 아무것도 알 수 없었다.

 

 

 

2. rxjs의 Observable을 이용해서 만들어 보자!

  • 코드를 controller 단에 몰아 넣었다.
import { Controller, Get, Param, Sse } from '@nestjs/common';
import { Observable, Subject, map } from 'rxjs';

@Controller('sse')
export class SseController {
  private sseSubject = new Subject<string>();

  @Sse('check')
  checkSse(): Observable<{ data: string }> {
    return this.sseSubject
      .asObservable()
      .pipe(map((message) => ({ data: message })));
  }

  @Get('call/')
  sendAlarm() {
    this.sseSubject.next('send Alarm Ok');
    return { message: '알림이 전송되었습니다.' };
  }
}

 

  • 코드가 비교적 간단해 졌다.
id: 1
data: send Alarm Ok

id: 2
data: send Alarm Ok

id: 3
data: send Alarm Ok

 

  • 크롬을 이용하면 위와 같이 나온다.

  • 포스트맨을 이용했을 때도 위와 같이 알림을 확인할 수 있다.
  • 단점은 첫 SSE 연결시에 연결만 시켜줄 뿐 데이터를 주기 어렵다.
    • 대대적인 코드 수정이 필요하며 아래와 같이 코드를 수정해도 first Ok는 전달 되지 않았다.
import { Controller, Get, Param, Sse } from '@nestjs/common';
import { Observable, Subject, map } from 'rxjs';

@Controller('sse')
export class SseController {
  private sseSubject = new Subject<string>();

  @Sse('check')
  checkSse(): Observable<{ data: string }> {
    this.sseSubject.next('first OK');
    return this.sseSubject
      .asObservable()
      .pipe(map((message) => ({ data: message })));
  }

  @Get('call/')
  sendAlarm() {
    this.sseSubject.next('send Alarm Ok');
    return { message: '알림이 전송되었습니다.' };
  }
}

2-1. 개선 방법

const body = "테스트하는 스트링 예시"
const newSubject = new BehaviorSubject<string>(body);

return userSubject.asObservable().pipe(map(message => ({ data: message })));

 

  • 앞선 Subject를 이용한 것과 달리 BehaviorSubject를 이용하면 SSE 연결 전에 담겨있던 데이터를 바로 전송해주게 된다.

3. 그외의 방법들

  • @nestjs/platform-fastify를 활용하여 Fastify 기반 SSE 구현
    • Express와 방식은 유사하다. 직접 헤더를 작성해줘야하고, socket에 데이터를 보내는 것처럼 하나하나 다 작성해서 보내줘야한다. 
    • 높은 성능과 낮은 리소스 소비가 장점이여서 대규모 트래픽을 다룰 때는 효과적이다.
    • 하지만 Express 보다 설정이 조금 복잡하다.
  • 이 외에도 여러 방법이 있지만 대체로 위 2가지와 비슷하며 유사한 모듈을 사용하게 된다.

 

4. Express와 RxJS의 차이점

특성 Express(response.write) RxJS(Subject)
클라이언트 관리 연결 상태를 수동으로 관리해야 함 클라이언트 구독을 통해 자동 관리 가능
확장성 멀티클라이언트 처리가 번거롭고 상태 관리가 복잡 멀티클라이언트를 쉽게 처리 가능
데이터 흐름 처리 데이터 흐름을 수동으로 제어해야 함 RxJS 연산자를 활용한 데이터 흐름 제어 가능
의존성 별도의 라이브러리가 필요하지 않음 RxJS 의존성이 추가됨
복잡성 간단한 이벤트 전송에 적합 복잡한 데이터 처리와 확장에 적합

 

  • response.write 방식
    • 클라이언트로 데이터를 스트림 형태로 전송한다.
      • 이 방식은 HTTP 응답의 본문(body) 대신, 지속적으로 데이터를 보내기 위한 스트림 처리를 한다.
  • Subject 방식
    • 서버에서 Observable 형태로 데이터를 처리하여 메시지 형태로 변환한 뒤 전송한다.
    • @Sse 데코레이터는 데이터를 이벤트 스트림 형식으로 변환하되, JSON 데이터를 명확하게 포맷팅하여 클라이언트로 보낸다.