728x90
Kafka 라이브러리 추가
build.gradle에 추가
implementation 'org.springframework.kafka:spring-kafka'
Kafka 설정
application.yml 설정
데이터를 주고받을 때는 같은 데이터 타입으로 주고받아야 한다.
string으로 보냈으면 string으로, JSON으로 보냈으면 JSON으로
bootstrap-servers에는 카프카 서버 IP 주소를 적어준다.
# String으로 받기
spring:
kafka:
bootstrap-servers: 10.10.10.123:9092
consumer:
group-id: psy-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# JSON일 때는
# org.springframework.kafka.support.serializer.JsonDeserializer
# org.springframework.kafka.support.serializer.JsonSerializer
실시간 채팅 구현 (WebSocket, Kafka)
웹 소켓과 카프카를 연동하여 실시간 채팅을 구현한다.
카프카로 객체를 보낼 수 있게 의존성을 주입하고,
카프카로부터 메시지를 받는 기능을 만든다.
클라이언트가 웹 소켓에 연결하면 sessions 리스트에 저장한다.
클라이언트가 메시지를 보내면 카프카 'chat' 토픽으로 전달한다.
카프카에서 메시지를 수신하면, 모든 웹 소켓 세션에 메시지를 전송하여 실시간 채팅을 구현한다.
@Component
@RequiredArgsConstructor
public class MessageHandler extends TextWebSocketHandler {
// 카프카로 객체 보낼 수 있게 의존성 주입
private final KafkaTemplate<String, String> kafkaTemplate;
private final Set<WebSocketSession> sessions = new HashSet<>();
// 웹 소켓 연결 관리
@Override
public void afterConnectionEstablished(WebSocketSession session)throws Exception {
sessions.add(session);
System.out.println("클라이언트가 연결했다");
}
// 웹 소켓에서 받은 메시지를 Kafka로 전송
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
kafkaTemplate.send("chat", message.getPayload());
System.out.println(message + "메시지를 받았다.");
}
// 카프카로부터 메시지를 받아 웹 소켓으로 전송
@KafkaListener(topics = "chat", groupId = "psy-group")
public void getMessageFromKafka(ConsumerRecord<String, String> record) throws IOException {
for(WebSocketSession session : sessions) {
session.sendMessage(new TextMessage(record.value()));
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
System.out.println("클라이언트가 연결을 종료했다.");
}
}
728x90
'BE > Spring Boot' 카테고리의 다른 글
[Spring Boot] Spring Cloud란? / Spring Cloud Gateway (0) | 2025.03.18 |
---|---|
[Spring Boot] STOMP란? / 간단한 채팅방 구현 (0) | 2025.03.18 |
[Spring Boot] 웹 소켓(Web Socket)을 사용해 실시간 채팅 구현 (0) | 2025.03.17 |
[Spring Boot] 자주 쓰는 라이브러리 정리 (build.gradle) (0) | 2025.03.13 |
[Spring Boot] 로컬 환경에 파일 업로드 (multipart) (0) | 2025.03.03 |