본문 바로가기
Dev/[프로젝트] 2022 Winter Dev Camp

2022 스마일게이트 윈터데브캠프를 통해 성장하기 4 - 구현 : 접속상태 서비스

by javapp 자바앱 2023. 3. 1.
728x90

 

 

개요

 

2022 스마일게이트 윈터데브캠프를 통해 성장하기 3 - 구현 : 채팅 서비스 (tistory.com)

 

앞 글에서 채팅 서비스를 구현했는데요!

이번에는 접속상태 서비스를 구현하여 제가 구상한 아키텍처를 참고하여 서비스들간 통신을 하려고 합니다.

 

Flow는 다음과 같습니다.

1. 애플리케이션에 접속시 자신의 접속 상태를 업데이트 시킵니다. (on & off)

2. 자신의 접속상태를 자신과 친구관계인 유저에게 자신이 접속했다는 정보를 소켓을 통해 전달합니다.

3. 채팅 메시지 전송시 채팅 서버에서 접속상태를 접속상태서버를 확인하고 경우에 따라 처리합니다.

    3.1. "online" 소켓을 통해 채팅메시지 전달

    3.2. "offline" 푸시서버에 채팅 메시지 전달

 

 

1. 접속상태 서비스

1.1. 웹소켓 메시지 전달을 위한 카프카 설정

접속상태 서비스에도 카프카 사용을 위해 설정을 따로 합니다.

 

채팅서비스에서는 Producer와 Consumer 설정을 클래스에서 설정했다면 이번에는

application.yml 파일에서 설정을 했습니다.

spring:
  application:
    name: presence-service
  profiles:
    default: dev
  kafka:
    consumer:
      group-id: presence-group
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: '*'

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

kafka:
  topic:
    name: presence

 

토픽설정

KafkaTopicConfig

@Configuration
public class KafkaTopicConfig  {
    @Value("${kafka.topic.name}")
    private String preTopicName;

    @Bean
    public NewTopic getTopic(){
        return TopicBuilder.name(preTopicName).build();
    }
}

 

브로커로 전달되는 메시지 포맷

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class PresenceMessage {
    private String user_id;
    private String status;
    private List<String> friends;
}

 

1.2. 웹소켓 메시지 브로커 설정

@EnableWebSocketMessageBroker
@Configuration
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/presence/user-sub");
        registry.setApplicationDestinationPrefixes("/presence/pub");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws-presence")
                .setAllowedOrigins("*");
        registry.addEndpoint("/ws-presence")
                .setAllowedOrigins("*").withSockJS();
    }
}

 

1.3. 접속상태 메시지 전송

 

사용자에게 전달되는 메시지

@Schema(description = "사용자 접속 상태 정보")
@Getter
@Setter
@NoArgsConstructor
public class PresenceUserDto {
    @Schema(description = "자신 id")
    private String user_id;
    @Schema(description = "접속상태: online, offline")
    private String status;

    @Builder
    public PresenceUserDto(String user_id, String status) {
        this.user_id = user_id;
        this.status = status;
    }
}

 

컨트롤러

PresenceController

@Slf4j
@RestController
@RequestMapping("/presence")
@RequiredArgsConstructor
public class PresenceController {

    private final JwtTokenProvider jwtTokenProvider;
    private final PresenceService presenceService;
    private final FriendService friendService;
    private final Producer producer;

    private String getTokenToUserId(String jwt){
        return jwtTokenProvider.getUserInfo(jwtTokenProvider.removeBearer(jwt)).getUserId();
    }

    @Operation(summary = "접속상태가 online인 유저id리스트 조회", description = "상태가 online인 나의 친구 id 리스트 조회")
    @GetMapping("/v1/users")
    public ResponseEntity<ResponsePresenceUsers> presenceUsers(@RequestHeader("Authorization") String jwt){
        /**
         * 유저서버를 통해 친구리스트API 요청
          */
        List<String> friends = friendService.getMyFriends(jwt);
        if(friends.isEmpty()) return new ResponseEntity<>(new ResponsePresenceUsers(), HttpStatus.OK);
        return new ResponseEntity<>(presenceService.getOnlineUsersPresence(friends), HttpStatus.OK);
    }

    @Operation(summary = "접속상태가 offline인 유저id리스트 조회", description = "유저 리스트를 받아 offline인 유저 id 리스트 조회")
    @PostMapping("/v1/offline-users")
    public ResponseEntity<ResponsePresenceUsers> offlineUsers(@RequestBody RequestUsers requestUsers){
        return new ResponseEntity<>(presenceService.getOfflineUsersPresence(requestUsers.getMembers()), HttpStatus.OK);
    }

    @Operation(summary = "접속상태를 'online'으로 변경", description = "나의 접속상태를 'online'으로 변경하고 웹소켓으로 해당 정보 친구들에게 전달")
    @PutMapping("/v1/on")
    public ResponseEntity<String> sendPresenceOn(@RequestHeader("Authorization") String jwt){
        String userId = getTokenToUserId(jwt);
        PresenceUserDto presenceUserDto = presenceService.presenceOn(userId);

        List<String> friends = friendService.getMyFriends(jwt);

        PresenceMessage presenceMessage = PresenceMessage.builder()
                        .user_id(presenceUserDto.getUser_id())
                        .status(presenceUserDto.getStatus())
                        .friends(friends)
                        .build();

        producer.sendMessage(presenceMessage);

        return ResponseEntity.ok("success");
    }

    @Operation(summary = "접속상태를 'offline'으로 변경", description = "나의 접속상태를 'offline'으로 변경하고 웹소켓으로 해당 정보 친구들에게 전달")
    @PutMapping("/v1/off")
    public ResponseEntity<String> sendPresenceOff(@RequestHeader("Authorization") String jwt){
        String userId = getTokenToUserId(jwt);
        PresenceUserDto presenceUserDto = presenceService.presenceOff(userId);

        List<String> friends = friendService.getMyFriends(jwt);
        PresenceMessage presenceMessage = PresenceMessage.builder()
                .user_id(presenceUserDto.getUser_id())
                .status(presenceUserDto.getStatus())
                .friends(friends)
                .build();

        producer.sendMessage(presenceMessage);

        return ResponseEntity.ok("success");
    }
}
url summary
GET /presence/v1/users 접속상태가 online인 유저 id리스트 조회
POST /presence/v1/offline-users 접속상태가 offline인 유저 id리스트 조회
PUT /presence/v1/on 접속상태를 online으로 변경과 상태 전달
PUT /presence/v1/off 접속상태를 offline으로 변경과 상태 전달

 

 

1.3.1 접속상태변경과 전달 on - off

 

접속상태업데이트

 

@Operation(summary = "접속상태를 'online'으로 변경", description = "나의 접속상태를 'online'으로 변경하고 웹소켓으로 해당 정보 친구들에게 전달")
@PutMapping("/v1/on")
public ResponseEntity<String> sendPresenceOn(@RequestHeader("Authorization") String jwt){
    String userId = getTokenToUserId(jwt);
    PresenceUserDto presenceUserDto = presenceService.presenceOn(userId);
    ...
    
@Operation(summary = "접속상태를 'offline'으로 변경", description = "나의 접속상태를 'offline'으로 변경하고 웹소켓으로 해당 정보 친구들에게 전달")
@PutMapping("/v1/off")
public ResponseEntity<String> sendPresenceOff(@RequestHeader("Authorization") String jwt){
    String userId = getTokenToUserId(jwt);
    PresenceUserDto presenceUserDto = presenceService.presenceOff(userId);
    ...

 

PresenceService

public PresenceUserDto presenceOn(String userId) {
    return changePresence(userId, "online");
}

public PresenceUserDto presenceOff(String userId) {
    return changePresence(userId, "offline");
}

public PresenceUserDto changePresence(String userId, String status){
    PresenceCollection presenceCollection = presenceRepository.findByUserId(userId);
    if(presenceCollection == null){
        PresenceCollection saved = presenceRepository.save(PresenceCollection.builder()
                .userId(userId)
                .status(status)
                .lastActiveAt(LocalDateTime.now())
                .build()
        );
        return convertToDto(saved);
    }
    presenceCollection.setStatus(status);
    presenceCollection.setLastActiveAt(LocalDateTime.now());
    return convertToDto(presenceRepository.save(presenceCollection));
}

MongoDB에 상태정보 저장

 

 

접속상태 전달

PresenceController

sendPresenceOn sendPresenceOff

PresenceMessage presenceMessage = PresenceMessage.builder()
                .user_id(presenceUserDto.getUser_id())
                .status(presenceUserDto.getStatus())
                .friends(friends)
                .build();

producer.sendMessage(presenceMessage);

producer 의 sendMessage를 통해 카프카에 전달

 

Producer

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Slf4j
@Service
@RequiredArgsConstructor
public class Producer {

    @Value("${kafka.topic.name}")
    private String preTopicName;

    private final KafkaTemplate<String, PresenceMessage> kafkaTemplate;

    public void sendMessage(PresenceMessage presenceMessage){
        Message<PresenceMessage> message = MessageBuilder
                .withPayload(presenceMessage)
                .setHeader(KafkaHeaders.TOPIC, preTopicName)
                .build();
        ListenableFuture<SendResult<String, PresenceMessage>> send = kafkaTemplate.send(message);
        send.addCallback(new ListenableFutureCallback<SendResult<String, PresenceMessage>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.info("{}에게 전송 실패",presenceMessage.getUser_id());
            }

            @Override
            public void onSuccess(SendResult<String, PresenceMessage> result) {}
        });
    }
}

메세지 객체로 메시징하여 카프카로 전달

 

Consumer

@Slf4j
@Component
@RequiredArgsConstructor
public class Consumers {
    private final SimpMessagingTemplate template;

    @KafkaListener(groupId = "${spring.kafka.chat-consumer.group-id}" ,topics="${kafka.topic.chat-name}")
    public void listenChat(ChatMessageDto chatMessageDto){
        template.convertAndSend("/chatting/topic/room/"+chatMessageDto.getRoom_id(), chatMessageDto);
    }

    @KafkaListener(groupId = "${spring.kafka.room-consumer.group-id}",topics = "${kafka.topic.room-name}", containerFactory = "kafkaListenerContainerFactory")
    public void listenGroupCreation(RoomMessageDto roomMessageDto){
        RespRoomDto respRoomDto = roomMessageDto.getRespRoomDto();
        for(String userId : roomMessageDto.getReceivers()){
            template.convertAndSend("/chatting/topic/new-room/"+userId,respRoomDto);
        }
    }
}

for(String userId : presenceMessage.getFriends()) template.convertAndSend("/presence/user-sub/"+userId, presenceUserDto);

해당 메소드를 통해 STOMP로 자신이 접속했다는 정보를 소켓을 통해 전달된다.

 

 

1.3.2. 접속상태가 online인 유저 id리스트 조회

PresenceController

@Operation(summary = "접속상태가 online인 유저id리스트 조회", description = "상태가 online인 나의 친구 id 리스트 조회")
@GetMapping("/v1/users")
public ResponseEntity<ResponsePresenceUsers> presenceUsers(@RequestHeader("Authorization") String jwt){
    /**
     * 유저서버를 통해 친구리스트API 요청
      */
    List<String> friends = friendService.getMyFriends(jwt);
    if(friends.isEmpty()) return new ResponseEntity<>(new ResponsePresenceUsers(), HttpStatus.OK);
    return new ResponseEntity<>(presenceService.getOnlineUsersPresence(friends), HttpStatus.OK);
}

우선 요청한 유저의 친구 정보가 필요하다.

 

그래서 유저서비스에 요청하여 친구 정보를 받아보자

List<String> friends = friendService.getMyFriends(jwt);

 

FriendService

@Slf4j
@Service
@RequiredArgsConstructor
public class FriendService {
    private final UserProxy userProxy;

    public List<String> getMyFriends(String jwt){
        List<String> friends;
        try{
            ResponseDto responseDtos = userProxy.getMyFriends(jwt);
            friends = responseDtos.getData().stream().map(ResponseProfile::getUserId).collect(Collectors.toList());
        }catch (FeignException exception){
            log.error("getMyFriends, 유저서버와 연결되지 않음");
            return new ArrayList<>();
        }catch (NullPointerException exception){
            log.info("getMyFriends, 친구정보가 없음");
            return new ArrayList<>();
        }

        return friends;
    }
}

 

마이크로서비스 통신을 위해 프록시객체를 만들어 해당 객체에 Feign을 통해 요청

@FeignClient(name = "user-service", url = "localhost:8000")
public interface UserProxy {
    /**
     * 헤더에 토큰을 담아 자신의 친구 리스트를
     * 유저서버에 요청하여 값을 받는다.
     */
    @GetMapping("/user/v1/friend")
    ResponseDto getMyFriends(@RequestHeader("Authorization") String jwt);
}

☆ 만약 디스커버리 서버에 요청하고자 하는 서비스가 등록되어 있는 경우

등록된 서비스명을 통해 url을 하드코딩하지 않고도 요청 가능!

 

 

다시 컨트롤러 presenceUsers 메소드에서 가져온 친구 리스트를 통해 서비스 단에 조회메소드 호출

presenceService.getOnlineUsersPresence(friends)

 

PresenceService

public class PresenceService {

    private final PresenceRepository presenceRepository;

    public ResponsePresenceUsers getOnlineUsersPresence(List<String> users){
        List<PresenceCollection> presenceCollections = presenceRepository.findByUserIdInUsers(users);
        List<String> members = presenceCollections.stream().map(PresenceCollection::getUserId).collect(Collectors.toList());
        return new ResponsePresenceUsers(members);
    }

    public ResponsePresenceUsers getOfflineUsersPresence(List<String> users){
        List<PresenceCollection> presenceCollections = presenceRepository.findOfflineByUserIdInUsers(users);
        List<String> members = presenceCollections.stream().map(PresenceCollection::getUserId).collect(Collectors.toList());
        return new ResponsePresenceUsers(members);
    }
    ...

 

MongoDB 조회 메소드는 이후 포스팅을 통해 공개 예정

 

 

2. 채팅 서비스에서 요청

채팅 서비스에서 메시지를 받으면 상대방의 접속 여부에 따라 채팅메시지를 전송할지 푸시알림을 보낼지 결정을 합니다.

이 때 필요한 접속여부를 접속상태서비스를 통해 정보를 받습니다.

 

채팅서비스

@Operation(summary = "메시지 전송")
@PostMapping(value="/v1/message", consumes = "application/json",produces = "application/json")
public void sendMessage(@Valid @RequestBody ChatMessageDto chatMessageDto){
    if(!chatRoomService.existsRoom(chatMessageDto.getRoom_id())){
        throw new CustomAPIException(ErrorCode.ROOM_NOT_FOUND_ERROR, "채팅방이 없음-"+chatMessageDto.getRoom_id());
    }
    ChatMessageDto savedMessage = chatMessageService.saveChatMessage(chatMessageDto);
    producers.sendMessage(savedMessage);

    // 비동기 푸시 알림
    pushService.pushMessageToUsers(chatMessageDto);
}

pushService.pushMessageToUsers(chatMessageDto);

 

 

PushService

import feign.FeignException;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import smilegate.plop.chat.controller.PushProxy;

@Slf4j
@Service
@Async("push") // 비동기 추가
@RequiredArgsConstructor
public class PushService {
    private final PresenceService presenceService;
    private final PushProxy pushProxy;

    public void pushMessageToUsers(ChatMessageDto chatMessageDto){

        // 유저 접속상태 확인후 미접속일 때 알림서버에 요청
        ResponsePresenceUsers offlineUsers = presenceService.getOfflineUsers(chatMessageDto.getRoom_id());

        if(offlineUsers.getMembers().size() >= 1){
            try
            {
                pushProxy.sendNotification(RequestMessage.builder()
                        .title(chatMessageDto.getSender_id())
                        .body(chatMessageDto.getContent())
                        .roomId(chatMessageDto.getRoom_id())
                        .target(offlineUsers.getMembers())
                        .build());
            }catch (FeignException e){
                log.error("pushProxy.sendNotification, 푸시 서버 연결 안됨");
            }
            log.info("오프라인 유저에게 푸시알림 요청");
        }
    }
}

presenceService.getOfflineUsers(chatMessageDto.getRoom_id())

 

 

PresenceService

@Slf4j
@Service
@RequiredArgsConstructor
public class PresenceService {
    private final PresenceProxy presenceProxy;
    private final ChatRoomService chatRoomService;

    /**
     * 채팅방의 멤버들의 id 리스트를 가져와서
     * "접속상태 서버"를 통해 online 상태인 유저들을 조회(PresenceProxy)
     */
    public ResponsePresenceUsers getOfflineUsers(String roomId){
        RespRoomDto chatRoomInfo = chatRoomService.getChatRoomInfo(roomId);
        List<String> members = chatRoomInfo.getMembers().stream().map(Member::getUserId).collect(Collectors.toList());

        try{
            return presenceProxy.offlineUsers(new RequestUsers(members));
        }catch (Exception ex){ // ConnectException
            log.warn("ConnectException - 접속상태 서버 off: {}", ex.getMessage());
            return new ResponsePresenceUsers(new ArrayList<>());
        }
    }
}

프록시를 통해 접속상태 서비스에 요청

 

PresenceProxy

@FeignClient(name = "PRESENCE-SERVICE")
public interface PresenceProxy {
    @PostMapping("/presence/v1/offline-users")
    ResponsePresenceUsers offlineUsers(@RequestBody RequestUsers requestUsers);
}

디스커버리 서버에 등록을 했기때문에 등록된 서비스명만 name에 작성하면 요청 가능

 

 

ResponsePresenceUsers

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class ResponsePresenceUsers {
    private List<String> members;
}

 

 

이렇게 해서 접속상태 서비스를 구현해보았습니다.

더 디테일한 것도 있지만 생략하였습니다.

 

기존에는 redis에 정보를 넣기로 했지만 이미 채팅 서비스를 구현하며 MongoDB를 사용하고 있었기 때문에 DB를 하나 더 두지 않고 MongoDB에 통합하여 사용하기로 하였습니다.

Redis 를 사용하여 속도에 좀 더 이점을 가져올 수 있을 것 같습니다.

 

 

댓글