개요
2022 스마일게이트 윈터데브캠프를 통해 성장하기 3 - 구현 : 채팅 서비스 (tistory.com)
앞 글에서 채팅 서비스를 구현했는데요!
이번에는 접속상태 서비스를 구현하여 제가 구상한 아키텍처를 참고하여 서비스들간 통신을 하려고 합니다.
Flow는 다음과 같습니다.
1. 애플리케이션에 접속시 자신의 접속 상태를 업데이트 시킵니다. (on & off)
2. 자신의 접속상태를 자신과 친구관계인 유저에게 자신이 접속했다는 정보를 소켓을 통해 전달합니다.
3. 채팅 메시지 전송시 채팅 서버에서 접속상태를 접속상태서버를 확인하고 경우에 따라 처리합니다.
3.1. "online" 소켓을 통해 채팅메시지 전달
3.2. "offline" 푸시서버에 채팅 메시지 전달
1. 접속상태 서비스
1.1. 웹소켓 메시지 전달을 위한 카프카 설정
접속상태 서비스에도 카프카 사용을 위해 설정을 따로 합니다.
채팅서비스에서는 Producer와 Consumer 설정을 클래스에서 설정했다면 이번에는
application.yml 파일에서 설정을 했습니다.
spring:
application:
name: presence-service
profiles:
default: dev
kafka:
consumer:
group-id: presence-group
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
trusted:
packages: '*'
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
kafka:
topic:
name: presence
토픽설정
KafkaTopicConfig
@Configuration
public class KafkaTopicConfig {
@Value("${kafka.topic.name}")
private String preTopicName;
@Bean
public NewTopic getTopic(){
return TopicBuilder.name(preTopicName).build();
}
}
브로커로 전달되는 메시지 포맷
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class PresenceMessage {
private String user_id;
private String status;
private List<String> friends;
}
1.2. 웹소켓 메시지 브로커 설정
@EnableWebSocketMessageBroker
@Configuration
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/presence/user-sub");
registry.setApplicationDestinationPrefixes("/presence/pub");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-presence")
.setAllowedOrigins("*");
registry.addEndpoint("/ws-presence")
.setAllowedOrigins("*").withSockJS();
}
}
1.3. 접속상태 메시지 전송
사용자에게 전달되는 메시지
@Schema(description = "사용자 접속 상태 정보")
@Getter
@Setter
@NoArgsConstructor
public class PresenceUserDto {
@Schema(description = "자신 id")
private String user_id;
@Schema(description = "접속상태: online, offline")
private String status;
@Builder
public PresenceUserDto(String user_id, String status) {
this.user_id = user_id;
this.status = status;
}
}
컨트롤러
PresenceController
@Slf4j
@RestController
@RequestMapping("/presence")
@RequiredArgsConstructor
public class PresenceController {
private final JwtTokenProvider jwtTokenProvider;
private final PresenceService presenceService;
private final FriendService friendService;
private final Producer producer;
private String getTokenToUserId(String jwt){
return jwtTokenProvider.getUserInfo(jwtTokenProvider.removeBearer(jwt)).getUserId();
}
@Operation(summary = "접속상태가 online인 유저id리스트 조회", description = "상태가 online인 나의 친구 id 리스트 조회")
@GetMapping("/v1/users")
public ResponseEntity<ResponsePresenceUsers> presenceUsers(@RequestHeader("Authorization") String jwt){
/**
* 유저서버를 통해 친구리스트API 요청
*/
List<String> friends = friendService.getMyFriends(jwt);
if(friends.isEmpty()) return new ResponseEntity<>(new ResponsePresenceUsers(), HttpStatus.OK);
return new ResponseEntity<>(presenceService.getOnlineUsersPresence(friends), HttpStatus.OK);
}
@Operation(summary = "접속상태가 offline인 유저id리스트 조회", description = "유저 리스트를 받아 offline인 유저 id 리스트 조회")
@PostMapping("/v1/offline-users")
public ResponseEntity<ResponsePresenceUsers> offlineUsers(@RequestBody RequestUsers requestUsers){
return new ResponseEntity<>(presenceService.getOfflineUsersPresence(requestUsers.getMembers()), HttpStatus.OK);
}
@Operation(summary = "접속상태를 'online'으로 변경", description = "나의 접속상태를 'online'으로 변경하고 웹소켓으로 해당 정보 친구들에게 전달")
@PutMapping("/v1/on")
public ResponseEntity<String> sendPresenceOn(@RequestHeader("Authorization") String jwt){
String userId = getTokenToUserId(jwt);
PresenceUserDto presenceUserDto = presenceService.presenceOn(userId);
List<String> friends = friendService.getMyFriends(jwt);
PresenceMessage presenceMessage = PresenceMessage.builder()
.user_id(presenceUserDto.getUser_id())
.status(presenceUserDto.getStatus())
.friends(friends)
.build();
producer.sendMessage(presenceMessage);
return ResponseEntity.ok("success");
}
@Operation(summary = "접속상태를 'offline'으로 변경", description = "나의 접속상태를 'offline'으로 변경하고 웹소켓으로 해당 정보 친구들에게 전달")
@PutMapping("/v1/off")
public ResponseEntity<String> sendPresenceOff(@RequestHeader("Authorization") String jwt){
String userId = getTokenToUserId(jwt);
PresenceUserDto presenceUserDto = presenceService.presenceOff(userId);
List<String> friends = friendService.getMyFriends(jwt);
PresenceMessage presenceMessage = PresenceMessage.builder()
.user_id(presenceUserDto.getUser_id())
.status(presenceUserDto.getStatus())
.friends(friends)
.build();
producer.sendMessage(presenceMessage);
return ResponseEntity.ok("success");
}
}
url | summary |
GET /presence/v1/users | 접속상태가 online인 유저 id리스트 조회 |
POST /presence/v1/offline-users | 접속상태가 offline인 유저 id리스트 조회 |
PUT /presence/v1/on | 접속상태를 online으로 변경과 상태 전달 |
PUT /presence/v1/off | 접속상태를 offline으로 변경과 상태 전달 |
1.3.1 접속상태변경과 전달 on - off
접속상태업데이트
@Operation(summary = "접속상태를 'online'으로 변경", description = "나의 접속상태를 'online'으로 변경하고 웹소켓으로 해당 정보 친구들에게 전달")
@PutMapping("/v1/on")
public ResponseEntity<String> sendPresenceOn(@RequestHeader("Authorization") String jwt){
String userId = getTokenToUserId(jwt);
PresenceUserDto presenceUserDto = presenceService.presenceOn(userId);
...
@Operation(summary = "접속상태를 'offline'으로 변경", description = "나의 접속상태를 'offline'으로 변경하고 웹소켓으로 해당 정보 친구들에게 전달")
@PutMapping("/v1/off")
public ResponseEntity<String> sendPresenceOff(@RequestHeader("Authorization") String jwt){
String userId = getTokenToUserId(jwt);
PresenceUserDto presenceUserDto = presenceService.presenceOff(userId);
...
PresenceService
public PresenceUserDto presenceOn(String userId) {
return changePresence(userId, "online");
}
public PresenceUserDto presenceOff(String userId) {
return changePresence(userId, "offline");
}
public PresenceUserDto changePresence(String userId, String status){
PresenceCollection presenceCollection = presenceRepository.findByUserId(userId);
if(presenceCollection == null){
PresenceCollection saved = presenceRepository.save(PresenceCollection.builder()
.userId(userId)
.status(status)
.lastActiveAt(LocalDateTime.now())
.build()
);
return convertToDto(saved);
}
presenceCollection.setStatus(status);
presenceCollection.setLastActiveAt(LocalDateTime.now());
return convertToDto(presenceRepository.save(presenceCollection));
}
MongoDB에 상태정보 저장
접속상태 전달
PresenceController
sendPresenceOn sendPresenceOff
PresenceMessage presenceMessage = PresenceMessage.builder()
.user_id(presenceUserDto.getUser_id())
.status(presenceUserDto.getStatus())
.friends(friends)
.build();
producer.sendMessage(presenceMessage);
producer 의 sendMessage를 통해 카프카에 전달
Producer
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
@Slf4j
@Service
@RequiredArgsConstructor
public class Producer {
@Value("${kafka.topic.name}")
private String preTopicName;
private final KafkaTemplate<String, PresenceMessage> kafkaTemplate;
public void sendMessage(PresenceMessage presenceMessage){
Message<PresenceMessage> message = MessageBuilder
.withPayload(presenceMessage)
.setHeader(KafkaHeaders.TOPIC, preTopicName)
.build();
ListenableFuture<SendResult<String, PresenceMessage>> send = kafkaTemplate.send(message);
send.addCallback(new ListenableFutureCallback<SendResult<String, PresenceMessage>>() {
@Override
public void onFailure(Throwable ex) {
log.info("{}에게 전송 실패",presenceMessage.getUser_id());
}
@Override
public void onSuccess(SendResult<String, PresenceMessage> result) {}
});
}
}
메세지 객체로 메시징하여 카프카로 전달
Consumer
@Slf4j
@Component
@RequiredArgsConstructor
public class Consumers {
private final SimpMessagingTemplate template;
@KafkaListener(groupId = "${spring.kafka.chat-consumer.group-id}" ,topics="${kafka.topic.chat-name}")
public void listenChat(ChatMessageDto chatMessageDto){
template.convertAndSend("/chatting/topic/room/"+chatMessageDto.getRoom_id(), chatMessageDto);
}
@KafkaListener(groupId = "${spring.kafka.room-consumer.group-id}",topics = "${kafka.topic.room-name}", containerFactory = "kafkaListenerContainerFactory")
public void listenGroupCreation(RoomMessageDto roomMessageDto){
RespRoomDto respRoomDto = roomMessageDto.getRespRoomDto();
for(String userId : roomMessageDto.getReceivers()){
template.convertAndSend("/chatting/topic/new-room/"+userId,respRoomDto);
}
}
}
for(String userId : presenceMessage.getFriends()) template.convertAndSend("/presence/user-sub/"+userId, presenceUserDto);
해당 메소드를 통해 STOMP로 자신이 접속했다는 정보를 소켓을 통해 전달된다.
1.3.2. 접속상태가 online인 유저 id리스트 조회
PresenceController
@Operation(summary = "접속상태가 online인 유저id리스트 조회", description = "상태가 online인 나의 친구 id 리스트 조회")
@GetMapping("/v1/users")
public ResponseEntity<ResponsePresenceUsers> presenceUsers(@RequestHeader("Authorization") String jwt){
/**
* 유저서버를 통해 친구리스트API 요청
*/
List<String> friends = friendService.getMyFriends(jwt);
if(friends.isEmpty()) return new ResponseEntity<>(new ResponsePresenceUsers(), HttpStatus.OK);
return new ResponseEntity<>(presenceService.getOnlineUsersPresence(friends), HttpStatus.OK);
}
우선 요청한 유저의 친구 정보가 필요하다.
그래서 유저서비스에 요청하여 친구 정보를 받아보자
List<String> friends = friendService.getMyFriends(jwt);
FriendService
@Slf4j
@Service
@RequiredArgsConstructor
public class FriendService {
private final UserProxy userProxy;
public List<String> getMyFriends(String jwt){
List<String> friends;
try{
ResponseDto responseDtos = userProxy.getMyFriends(jwt);
friends = responseDtos.getData().stream().map(ResponseProfile::getUserId).collect(Collectors.toList());
}catch (FeignException exception){
log.error("getMyFriends, 유저서버와 연결되지 않음");
return new ArrayList<>();
}catch (NullPointerException exception){
log.info("getMyFriends, 친구정보가 없음");
return new ArrayList<>();
}
return friends;
}
}
마이크로서비스 통신을 위해 프록시객체를 만들어 해당 객체에 Feign을 통해 요청
@FeignClient(name = "user-service", url = "localhost:8000")
public interface UserProxy {
/**
* 헤더에 토큰을 담아 자신의 친구 리스트를
* 유저서버에 요청하여 값을 받는다.
*/
@GetMapping("/user/v1/friend")
ResponseDto getMyFriends(@RequestHeader("Authorization") String jwt);
}
☆ 만약 디스커버리 서버에 요청하고자 하는 서비스가 등록되어 있는 경우
등록된 서비스명을 통해 url을 하드코딩하지 않고도 요청 가능!
다시 컨트롤러 presenceUsers 메소드에서 가져온 친구 리스트를 통해 서비스 단에 조회메소드 호출
presenceService.getOnlineUsersPresence(friends)
PresenceService
public class PresenceService {
private final PresenceRepository presenceRepository;
public ResponsePresenceUsers getOnlineUsersPresence(List<String> users){
List<PresenceCollection> presenceCollections = presenceRepository.findByUserIdInUsers(users);
List<String> members = presenceCollections.stream().map(PresenceCollection::getUserId).collect(Collectors.toList());
return new ResponsePresenceUsers(members);
}
public ResponsePresenceUsers getOfflineUsersPresence(List<String> users){
List<PresenceCollection> presenceCollections = presenceRepository.findOfflineByUserIdInUsers(users);
List<String> members = presenceCollections.stream().map(PresenceCollection::getUserId).collect(Collectors.toList());
return new ResponsePresenceUsers(members);
}
...
MongoDB 조회 메소드는 이후 포스팅을 통해 공개 예정
2. 채팅 서비스에서 요청
채팅 서비스에서 메시지를 받으면 상대방의 접속 여부에 따라 채팅메시지를 전송할지 푸시알림을 보낼지 결정을 합니다.
이 때 필요한 접속여부를 접속상태서비스를 통해 정보를 받습니다.
채팅서비스
@Operation(summary = "메시지 전송")
@PostMapping(value="/v1/message", consumes = "application/json",produces = "application/json")
public void sendMessage(@Valid @RequestBody ChatMessageDto chatMessageDto){
if(!chatRoomService.existsRoom(chatMessageDto.getRoom_id())){
throw new CustomAPIException(ErrorCode.ROOM_NOT_FOUND_ERROR, "채팅방이 없음-"+chatMessageDto.getRoom_id());
}
ChatMessageDto savedMessage = chatMessageService.saveChatMessage(chatMessageDto);
producers.sendMessage(savedMessage);
// 비동기 푸시 알림
pushService.pushMessageToUsers(chatMessageDto);
}
pushService.pushMessageToUsers(chatMessageDto);
PushService
import feign.FeignException;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import smilegate.plop.chat.controller.PushProxy;
@Slf4j
@Service
@Async("push") // 비동기 추가
@RequiredArgsConstructor
public class PushService {
private final PresenceService presenceService;
private final PushProxy pushProxy;
public void pushMessageToUsers(ChatMessageDto chatMessageDto){
// 유저 접속상태 확인후 미접속일 때 알림서버에 요청
ResponsePresenceUsers offlineUsers = presenceService.getOfflineUsers(chatMessageDto.getRoom_id());
if(offlineUsers.getMembers().size() >= 1){
try
{
pushProxy.sendNotification(RequestMessage.builder()
.title(chatMessageDto.getSender_id())
.body(chatMessageDto.getContent())
.roomId(chatMessageDto.getRoom_id())
.target(offlineUsers.getMembers())
.build());
}catch (FeignException e){
log.error("pushProxy.sendNotification, 푸시 서버 연결 안됨");
}
log.info("오프라인 유저에게 푸시알림 요청");
}
}
}
presenceService.getOfflineUsers(chatMessageDto.getRoom_id())
PresenceService
@Slf4j
@Service
@RequiredArgsConstructor
public class PresenceService {
private final PresenceProxy presenceProxy;
private final ChatRoomService chatRoomService;
/**
* 채팅방의 멤버들의 id 리스트를 가져와서
* "접속상태 서버"를 통해 online 상태인 유저들을 조회(PresenceProxy)
*/
public ResponsePresenceUsers getOfflineUsers(String roomId){
RespRoomDto chatRoomInfo = chatRoomService.getChatRoomInfo(roomId);
List<String> members = chatRoomInfo.getMembers().stream().map(Member::getUserId).collect(Collectors.toList());
try{
return presenceProxy.offlineUsers(new RequestUsers(members));
}catch (Exception ex){ // ConnectException
log.warn("ConnectException - 접속상태 서버 off: {}", ex.getMessage());
return new ResponsePresenceUsers(new ArrayList<>());
}
}
}
프록시를 통해 접속상태 서비스에 요청
PresenceProxy
@FeignClient(name = "PRESENCE-SERVICE")
public interface PresenceProxy {
@PostMapping("/presence/v1/offline-users")
ResponsePresenceUsers offlineUsers(@RequestBody RequestUsers requestUsers);
}
디스커버리 서버에 등록을 했기때문에 등록된 서비스명만 name에 작성하면 요청 가능
ResponsePresenceUsers
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class ResponsePresenceUsers {
private List<String> members;
}
이렇게 해서 접속상태 서비스를 구현해보았습니다.
더 디테일한 것도 있지만 생략하였습니다.
기존에는 redis에 정보를 넣기로 했지만 이미 채팅 서비스를 구현하며 MongoDB를 사용하고 있었기 때문에 DB를 하나 더 두지 않고 MongoDB에 통합하여 사용하기로 하였습니다.
Redis 를 사용하여 속도에 좀 더 이점을 가져올 수 있을 것 같습니다.
'Dev > [프로젝트] 2022 Winter Dev Camp' 카테고리의 다른 글
2022 스마일게이트 윈터데브캠프를 통해 성장하기 6 - 프로젝트 배포 (0) | 2023.03.04 |
---|---|
2022 스마일게이트 윈터데브캠프를 통해 성장하기 5 - MongoDB 쿼리 자유롭게 사용하기 (0) | 2023.03.03 |
2022 스마일게이트 윈터데브캠프를 통해 성장하기 3 - 구현 : 채팅 서비스 (0) | 2023.02.28 |
2022 스마일게이트 윈터데브캠프를 통해 성장하기 2 - 프로젝트 설계와 마음가짐 (0) | 2023.02.26 |
2022 스마일게이트 윈터데브캠프를 통해 성장하기 1 - 좋은 목표는 좋은 실행에서 나온다. (0) | 2023.02.25 |
댓글