-
[뉴스파이프라인 #3] MongoDB에 데이터를 저장해보자(Spring Boot + Spring Batch + Jsoup + Spring Cloud + Kafka Streams + MongoDB)프로젝트 노트 2023. 11. 10. 11:12
📌 들어가기에 앞서
앞서 포스팅한 게시글을 통해 프로젝트를 왜 만들었고, 어떻게 진행이 되고 있는지 공유를 드렸었습니다.
2편에서 Kafka Consumer를 통해 Console로 소비되는 데이터를 보며 뭔가 휘발성 데이터로 날라가버리는게 아쉽다는 생각이 들었죠. 이걸 효율적으로 저장하고 검색기능 및 형태소 분리 기능 등을 제공하면 좀 더 가치있는 데이터로 만들어낼 수 있을텐데 하는..그런 생각이 머리 속에 있었습니다.
사실 그리 많은 데이터는 아니지만 지속적인 DB IO 발생이 옳은 것인가 고민을 잠깐(깔짝)했습니다.
하지만 NoSQL을 안써보기도 했고, 평상시 몽고DB를 경험해보고 싶다는 생각을 했었습니다.
그럼 몽고DB로 데이터를 저장하면 모든게 해결 되겠군요?
해봅시다.
📌 아키텍쳐랄 것도 없지만...(3)
아키텍쳐랄 것도 없지만 x 3, MongoDB가 추가되었습니다.
프로젝트에서는 MongoDB에 데이터를 저장하고 조회하는 방식으로 여러가지를 상황에 따라 쓰게 되겠지만, 지금은 Repository방식과 MongoTemplate을 활용해 직접 쿼리를 만들어서 질의하는 방식 등으로 접근을 해볼 생각입니다.
몽고DB에 대해서는 과거에 스터디한 적이 있는데, 아예 아무것도 기억이 안나더군요..ㅋㅋ..
그래서 Docker를 통해 Image를 다운받고 설치하고 접속하는 방법까지 까먹지 않기 위해 기록해놓았으니 몽고DB를 아직 설치하지 않으신 분은 참고하셔도 좋을 듯 싶습니다.
그 외 데이터 및 인프라로 사용중인 Kafka나 Docker에 대한 명령어도 간단하게 포스팅 해놓았으니 참고하시면 좋을 것 같습니다.
그럼 프로젝트가 어떻게 변경되었는지 확인을 해보도록 하시죠.
참고로 해당 소스는 제 개인 Github(클릭)에 업로드 되어 있으니 필요하시면 코드를 전체적으로 확인하실 수 있습니다.
그럼 한 번 만들어 보겠습니다!
📌 어찌되었든..만들어보자!
내부적으로 뭐가 엄청 많이 생겼습니다. 없던 패키지와 클래스들이 생겨나고, 기존에 파일들의 위치 조정도 함께 진행되었지요. 가볍게 RestController를 하나 만들어 Viewing 하는 기능도 추가하였습니다.
우선 몽고DB를 활용하기 위해서 Docker기반으로 MongoDB를 설치하고, 프로젝트에서 pom.xml에 필요 의존성을 추가해줍니다.
<!-- MongoDB를 사용하기 위한 라이브러리 추가 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> <version>3.1.5</version> </dependency> <!-- MongoDB를 사용하기 위한 라이브러리 추가 -->
찾아보니 MongoDB 이외에 다른 DB를 사용하려면 mongodb-reactive를 사용해야 한다더군요..
하지만 현재는 MongoDB만 사용하기에 딱히 그에 대한 설명은 추가하지는 않겠습니다.
앞서 안내해드린 몽고DB 설치 포스팅을 참고하셨다면 저와 비슷한 환경으로 설치를 하셨을겁니다.
그러면 MongoDB를 매니지먼트 툴로 접근을 할 수 있게 되실겁니다.
저는 DataGrip을 사용하여 접속하였습니다.
데이터 그립으로 접속을 하시면 database를 만들어주셔야 합니다.
해당 프로젝트는 news를 주로 다루는 프로젝트이니 'news'라는 데이터베이스를 만들어보겠습니다.
show dbs // 존재하는 Database를 모두 확인할 수 있습니다. use news // news라는 Database가 존재하면 사용하고, 없으면 생성합니다. // 경제뉴스와 부동산 뉴스를 따로 저장하기 위해 colletion을 2개 선언해줍니다. db.createCollection("eco") db.createCollection("land")
이렇게 MongoDB의 news Database에 eco와 land Collection까지 선언해주셨다면 MongoDB를 사용할 준비는 끝입니다.
추가가 완료되었다면 application.yml에 우리가 Docker에 설치한 MongoDB를 지정해줍니다.
spring: data: mongodb: host: localhost port: 27017 authentication-database: admin username: root password: root database: news
위에서 만들었던 news database를 지정해주면 됩니다.
authentication-database : admin의 경우 우리가 접속 계정으로 등록한 root 계정이 위치한 곳을 database를 지정하는 것입니다.
접속 준비까지 모두 끝났습니다.
프로젝트에서 추가된 친구들을 한 번 살펴볼까요?
기본적으로 MongoDB를 사용하기 위해서는 직접 MongoDB가 사용하는 Entity가 @Document로 선언되어 있어야 합니다.
즉, eco Collection과 연관된 Entity인 EcoNews 클래스를 만들어주어야 하죠.
@Getter @Document(collection = "eco") @AllArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE) public class EcoNews { @Id private Long newsId; private String url; private String title; public static EcoNews of(News news) { return new EcoNews(news.newsId(), news.url(), news.title()); } }
immutable class로 선언하여 외부에서의 수정과 생성을 미연에 방지해놓습니다.
혹시 모르지요..나중에 제가 아닌 누군가가 이 프로젝트에 수정을 하게 될지도요..ㅎ..
MongoDB Collection과 Entity의 연관관계 매핑은 이렇게 끝입니다. 참 쉽죠?
그리고 MongoDB에 질의를 하기 위해 Repository를 선언해줍니다. 여기서는 eco와 land Collection을 사용하므로 두 개의 Repository Interface를 선언해주시면 됩니다.
public interface EcoNewsRepository extends MongoRepository<EcoNews, Long> { } public interface LandNewsRepository extends MongoRepository<LandNews, Long> { }
MongoRepository<Class, ID Type> 을 extends해주면 가볍게 사용할 수 있는 Repository가 만들어집니다.
이를 통해 save, findAll 등의 기본적인 질의를 할 수 있게 됩니다.
// MongoRepository @NoRepositoryBean public interface MongoRepository<T, ID> extends ListCrudRepository<T, ID>, ListPagingAndSortingRepository<T, ID>, QueryByExampleExecutor<T> { <S extends T> S insert(S entity); <S extends T> List<S> insert(Iterable<S> entities); <S extends T> List<S> findAll(Example<S> example); <S extends T> List<S> findAll(Example<S> example, Sort sort); }
MongoRepository Interface는 위와 같이 생겼습니다. Paging 처리도 매우 쉽게 할 수 있지요.
다만 현재 구조 상 Page처리는 추후 기능으로 미루어 놓았습니다.
이제는 비즈니스 로직 상 News를 각각의 Collection에 저장하는 기능과 조회하는 기능을 만들어보도록 하겠습니다.
그러기 위해선 서비스가 필요하겠죠?
public interface EcoNewsService { void saveEcoNews(News news); List<EcoNews> findAll(); } public interface LandNewsService { void saveLandNews(News news); List<LandNews> findAll(); }
Repository와 마찬가지로 Service도 Eco와 Land로 두 개 만들어줍니다.
추후에 뉴스 카테고리가 늘어난다면 다른 Service와 Repository도 추가되겠지요.
그리고 이를 구현하는 구현체를 만들어줍니다.
@Service @RequiredArgsConstructor public class EcoNewsServiceImpl implements EcoNewsService { private final MongoTemplate mongoTemplate; private final EcoNewsRepository ecoNewsRepository; @Override public void saveEcoNews(News news) { ecoNewsRepository.save(EcoNews.of(news)); } @Override public List<EcoNews> findAll() { return mongoTemplate.find(MongoQueryUtil.defaultSearch(100), EcoNews.class); } }
LandNewsServiceImpl은 EcoNewsServiceImpl과 비슷하여 제외시켰습니다.
여기서는 저장 등의 기본적인 처리는 Repository를 활용해서 진행하였습니다.
조회의 경우 각각 조건을 사용하기에는 Repository 방식보다는 MongoTemplate 방식이 더 편리할 것 같아 따로 분리도 할 겸 MongoTemplate 방식을 사용하였습니다.
MongoQueryUtil을 만들어 공통적으로 사용하는 Query는 Util성으로 빼놓을 예정입니다.
그럼 MongoQueryUtil도 한 번 볼까요?
public class MongoQueryUtil { /** * Mongo DB Default Find Query * with Sort and limit 10 * @return */ public static Query defaultSearch(int limit) { Query query = new Query(); query.with(Sort.by(Sort.Direction.DESC, "newsId")).limit(limit); return query; } }
MongoTemplate에 질의를 위해 필요한 객체인 Query객체를 return하는 메소드들이 추가될 것입니다.
여기서는 기본적인 정렬과 limit를 주기 위해 선언된 defaultSearch 메소드입니다.
limit의 경우 추후에 Page 처리를 위해 parameter로 받을 수 있도록 추가해놓았습니다.
다만 구조에 따라 변경될 수 있겠지요..그건 나중에 구현하면서 고치겠습니다 ㅎ
기본적인 쿼리까지 모두 완료되었습니다.
그럼 이제 Kafka Consumer에서 console을 통해 logging하는 로직위에 MongoDB에 저장로직을 호출하는 부분을 추가해보도록 하겠습니다.
@Configuration @RequiredArgsConstructor public class NewsConsumer { private final EcoNewsService ecoNewsService; private final LandNewsService landNewsService; @Bean public Consumer<KStream<Long, News>> ecoService() { return newsStream -> newsStream.foreach((key, news) -> { ecoNewsService.saveEcoNews(news); System.out.println(String.format("Eco NEWS Url[%s] Type[%s] Title[%s]", news.url(), news.type(), news.title())); }); } @Bean public Consumer<KStream<Long, News>> landService() { return newsStream -> newsStream.foreach((key, news) -> { landNewsService.saveLandNews(news); System.out.println(String.format("Land NEWS Url[%s] Type[%s] Title[%s]", news.url(), news.type(), news.title())); }); } }
Spring-Cloud-Kafka-Streams 를 위해 선언된 NewsConsumer에 각각 NewsService를 선언하고 save 메소드를 호출해줍니다. 그럼 각 서비스들은 MongoDB에 Record News를 EcoNews와 LandNews로 변경하여 저장하겠지요.
📌 잘 되는가?
MongoDB에서 조회를 해보도록 하겠습니다. 조회 질의는 다음과 같이 질의해주시면 됩니다.
db.getCollection("eco").find({}).sort({"_id":-1})
그러면 다음과 같이 DataGrip에서 조회가 됩니다.
저장이 매우매우 잘되는군요. 카프카도 잘 처리가 되고 있는지 확인해보도록 하겠습니다.
> kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group ecoService-applicationId --describe GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID ecoService-applicationId eco-news 0 4535 4535 0 ecoService-applicationId-fde07205-863b-4486-b904-d6df7ee0deaf-StreamThread-1-consumer-8c4fc726-b80b-4d28-ae1f-4c276fa6316c /172.19.0.1 ecoService-applicationId-fde07205-863b-4486-b904-d6df7ee0deaf-StreamThread-1-consumer ecoService-applicationId eco-news 1 4382 4382 0 ecoService-applicationId-fde07205-863b-4486-b904-d6df7ee0deaf-StreamThread-1-consumer-8c4fc726-b80b-4d28-ae1f-4c276fa6316c /172.19.0.1 ecoService-applicationId-fde07205-863b-4486-b904-d6df7ee0deaf-StreamThread-1-c
경제 뉴스 토픽만 확인해보았는데 꽤나 많은 뉴스들이 처리되었군요. 아주 좋습니다.
구동시킨지 한 4일 정도 되었으니, 하루에 대략 2000~2500건 정도 처리가 되는 데이터 파이프라인이 구축되었네요ㅎ
추후에는 각 신문사도 다 연동을 해서..하루 만건 이상의 소박한 데이터 스트림을 구성해보면 어떨까..생각이 드네요 ㅎ
그럼 간편하게 Controller를 만들어서 브라우저에서 조회를 해볼까요?
@RestController @RequestMapping("/news") @RequiredArgsConstructor public class NewsController { private final EcoNewsService ecoNewsService; private final LandNewsService landNewsService; @GetMapping("/eco-all") public ResponseEntity<List<EcoNews>> findAllEcoNews() { return ResponseEntity.ok(ecoNewsService.findAll()); } @GetMapping("/land-all") public ResponseEntity<List<LandNews>> findAllLandNews() { return ResponseEntity.ok(landNewsService.findAll()); } }
가볍게 만들어본 RestController입니다. Json형식으로 그냥 조회만 하는 Page이기 때문에 별도의 작업은 필요치 않습니다.
URI를 호출하면 다음과 같이 표현됩니다.
이것으로 웹에서도 조회가 될 수 있는 환경까지 구성이 완료되었습니다.
📌 마치며
사실 아직 근본적인 스터디가 부족한 상황이긴 합니다.
정확히 어떤 방식으로 Mongo를 사용해야 더 좋은 방식인지도 명확하진 않습니다.
조금 야성적인(?) 공부 방식으로 무작정 만들어보고 있는데..너무 모놀리식해지지 않나 싶네요.
추후에는 모듈로 쪼개서 마이크로서비스 단위로 로그인도 붙여보고..별거 다 해봐야겠습니다.
혹시라도 읽어주신 분이 계신다면 감사드립니다. 정말.
'프로젝트 노트' 카테고리의 다른 글
Ubuntu (우분투) 환경에서 OpenSearch Dashboard 설치 및 외부 접속 설정 (0) 2023.12.12 Ubuntu (우분투) 에 OpenSearch (오픈서치) 설치하기 (0) 2023.12.12 OpenSearch + Spring Boot + Java 연동 예제 (HTTPS 및 Apache HttpClient 5 활용하기!) (1) 2023.12.06 [뉴스파이프라인 #2] Spring Boot + Spring Batch + Jsoup에 Kafka Streams 로 파이프라이닝 해보기! (0) 2023.11.08 [뉴스파이프라인 #1] Spring Boot + Spring Batch + JSOUP을 활용한 뉴스 크롤링 (0) 2023.11.06