Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#114] 실시간 알림 기능 리팩토링 #115

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import io.driver.codrive.global.constants.APIConstants;
import io.driver.codrive.global.model.BaseResponse;
import io.driver.codrive.modules.notification.model.dto.NotificationEventDto;
import io.driver.codrive.modules.notification.model.request.NotificationReadRequest;
import io.driver.codrive.modules.notification.model.response.NotificationListResponse;
import io.driver.codrive.modules.notification.service.NotificationService;
Expand All @@ -18,7 +17,6 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import reactor.core.publisher.Flux;

@Tag(name = "Notification API", description = "알림 관련 API")
@RestController
Expand All @@ -30,7 +28,7 @@ public class NotificationController {
summary = "알림 스트림 등록"
)
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<NotificationEventDto>> registerUser() {
public SseEmitter registerUser() {
return notificationService.registerUser();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package io.driver.codrive.modules.notification.service;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.http.codec.ServerSentEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import io.driver.codrive.global.exception.InternalServerErrorApplicationException;
import io.driver.codrive.global.exception.NotFoundApplicationException;
Expand All @@ -22,69 +23,86 @@
import io.driver.codrive.modules.user.service.UserService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {
private static final Long DEFAULT_TIMEOUT = 120L * 1000 * 60;
private static final String DEFAULT_MESSAGE = "message";
private final NotificationRepository notificationRepository;
private final UserService userService;
private final Map<Long, Sinks.Many<ServerSentEvent<NotificationEventDto>>> userNotificationSinks = new ConcurrentHashMap<>();
private final Map<Long, SseEmitter> userNotificationEmitters = new ConcurrentHashMap<>();

@Transactional
public Flux<ServerSentEvent<NotificationEventDto>> registerUser() {
public SseEmitter registerUser() {
Long userId = AuthUtils.getCurrentUserId();
User user = userService.getUserById(userId);
Notification notification = Notification.create(user, null, NotificationType.CONNECT_START,
String.valueOf(userId));
return userNotificationSinks.computeIfAbsent(userId, id -> Sinks.many().multicast().onBackpressureBuffer())
.asFlux()
.doOnSubscribe(subscription -> {
userNotificationSinks.get(userId).tryEmitNext(createServerSentEvent(notification));
log.info("User [{}]의 알림 스트림을 시작합니다.", userId);
});

SseEmitter emitter = createSseEmitter(userId);
sendNotification(userId, notification);
log.info("User [{}]의 알림 스트림을 시작합니다.", userId);
return emitter;
}

@Async
public void sendNotification(User user, Long dataId, NotificationType type, String... args) {
public void saveAndSendNotification(User user, Long dataId, NotificationType type, String... args) {
Notification notification = createNotification(user, dataId, type, args);
Long userId = user.getUserId();
if (userNotificationSinks.containsKey(userId)) {
userNotificationSinks.get(userId).tryEmitNext(createServerSentEvent(notification));
} else {
log.warn("User [{}]의 알림 스트림이 존재하지 않습니다.", userId);
}
sendNotification(user.getUserId(), notification);
}

public void sendFollowNotification(User user, Long dataId, NotificationType type, String... args) {
if (!notificationRepository.existsByUserAndDataIdAndNotificationType(user, dataId, type)) {
sendNotification(user, dataId, type, args);
saveAndSendNotification(user, dataId, type, args);
}
}

public void sendNotification(Long userId, Notification notification) {
SseEmitter emitter = userNotificationEmitters.get(userId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event()
.name(DEFAULT_MESSAGE)
.id(String.valueOf(notification.getNotificationId()))
.data(new NotificationEventDto(notification))
.comment(notification.getContent()));
} catch (IOException e) {
log.warn("User [{}]에게 알림을 전송하지 못했습니다. 연결을 종료합니다.", userId);
emitter.complete();
}
} else {
log.warn("User [{}]의 알림 스트림이 존재하지 않습니다.", userId);
}
}

private SseEmitter createSseEmitter(Long userId) {
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
userNotificationEmitters.put(userId, emitter);

emitter.onCompletion(() -> {
log.info("User [{}]의 알림 스트림이 종료되었습니다.", userId);
userNotificationEmitters.remove(userId);
});

emitter.onTimeout(() -> {
log.info("User [{}]의 알림 스트림이 타임아웃되었습니다.", userId);
emitter.complete();
});

return emitter;
}

protected Notification createNotification(User user, Long dataId, NotificationType type, String... args) {
Notification notification = Notification.create(user, dataId, type, args);
return notificationRepository.save(notification);
}

private ServerSentEvent<NotificationEventDto> createServerSentEvent(Notification notification) {
return ServerSentEvent.<NotificationEventDto>builder()
.event("message")
.data(new NotificationEventDto(notification))
.id(String.valueOf(notification.getNotificationId()))
.comment(notification.getContent())
.build();
}

public void unregisterUser() {
Long userId = AuthUtils.getCurrentUserId();
if (userNotificationSinks.containsKey(userId)) {
userNotificationSinks.get(userId).tryEmitComplete();
userNotificationSinks.remove(userId);
log.info("User [{}]의 알림 스트림이 종료되었습니다.", userId);
if (userNotificationEmitters.containsKey(userId)) {
userNotificationEmitters.get(userId).complete();
} else {
throw new InternalServerErrorApplicationException("알림 스트림이 존재하지 않습니다.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void changeRoomStatus(Long roomId, String status) {
room.changeRoomStatus(roomStatus);

if (roomStatus == RoomStatus.INACTIVE) {
room.getMembers().forEach(member -> notificationService.sendNotification(member, roomId,
room.getMembers().forEach(member -> notificationService.saveAndSendNotification(member, roomId,
NotificationType.ROOM_STATUS_INACTIVE, MessageUtils.changeNameFormat(room.getTitle(),
NotificationType.ROOM_STATUS_INACTIVE.getLength())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void joinPrivateRoom(Long roomId, PasswordRequest request) {
RoomRequest roomRequest = RoomRequest.toPrivateRoomRequest(room, user);
saveRoomRequest(roomRequest, room);
roomUserMappingService.createRoomUserMapping(room, user);
notificationService.sendNotification(room.getOwner(), room.getRoomId(), NotificationType.CREATED_PRIVATE_ROOM_JOIN,
notificationService.saveAndSendNotification(room.getOwner(), room.getRoomId(), NotificationType.CREATED_PRIVATE_ROOM_JOIN,
MessageUtils.changeNameFormat(room.getTitle(), NotificationType.CREATED_PRIVATE_ROOM_JOIN.getLength()),
MessageUtils.changeNameFormat(user.getNickname(), NotificationType.CREATED_PRIVATE_ROOM_JOIN.getLength()));
}
Expand All @@ -83,9 +83,9 @@ public void joinPublicRoom(Long roomId) {
}
saveRoomRequest(roomRequest, room);

notificationService.sendNotification(room.getOwner(), room.getRoomId(), NotificationType.CREATED_PUBLIC_ROOM_REQUEST,
notificationService.saveAndSendNotification(room.getOwner(), room.getRoomId(), NotificationType.CREATED_PUBLIC_ROOM_REQUEST,
MessageUtils.changeNameFormat(room.getTitle(), NotificationType.CREATED_PUBLIC_ROOM_REQUEST.getLength()));
notificationService.sendNotification(user, room.getRoomId(), NotificationType.PUBLIC_ROOM_REQUEST,
notificationService.saveAndSendNotification(user, room.getRoomId(), NotificationType.PUBLIC_ROOM_REQUEST,
MessageUtils.changeNameFormat(room.getTitle(), NotificationType.PUBLIC_ROOM_REQUEST.getLength()));
}

Expand Down Expand Up @@ -141,7 +141,7 @@ public void approveRequest(Long roomId, Long roomRequestId) {
roomRequest.changeRoomRequestStatus(UserRequestStatus.JOINED);
roomUserMappingService.createRoomUserMapping(room, roomRequest.getUser());

notificationService.sendNotification(roomRequest.getUser(), room.getRoomId(),
notificationService.saveAndSendNotification(roomRequest.getUser(), room.getRoomId(),
NotificationType.PUBLIC_ROOM_APPROVE, MessageUtils.changeNameFormat(room.getTitle(), NotificationType.PUBLIC_ROOM_APPROVE.getLength()));
}

Expand Down