-
[뉴스파이프라인 #2] Spring Boot + Spring Batch + Jsoup에 Kafka Streams 로 파이프라이닝 해보기!프로젝트 노트 2023. 11. 8. 10:16
📌 들어가기에 앞서..
어제는 이런걸 해보았습니다.
[뉴스파이프라인 #1] Spring Boot + Spring Batch + JSOUP을 활용한 뉴스 크롤링
📌 만드는 이유 우선 저는 경제관련 뉴스 보는 것을 굉장히 즐깁니다. 그간 뉴스를 보려면 사이트에 접속을 하고 제가 직접 찾아봐야 함이 너무 귀찮더군요.. 누군가 제게 재깍재깍 알맞은 시간
deguruv.tistory.com
Spring Batch를 통해 뉴스를 긁어와서 텔레그램으로 전송해주는 프로젝트를 가볍게(?) 만들어 보았습니다.사실 이걸로도 충분히 경제와 부동산 뉴스를 텔레그램으로 볼 수 있게 되었습니다.
근데 막연하게 뇌리에 'Kafka Streams'를 공부했던 기억이 났습니다.
활용해보고 싶다는 생각이 강하게 들었습니다.
'한 번 써보긴 해야하는데...'
그럼 이왕 만들고 공부해본거니까, 도입해보죠.
📌 프로젝트 업그레이드...!!
프로젝트의 업그레이드가 필요합니다. 카프카를 탑재해야 하는 것이죠.
pom.xml에 다음과 같이 kafka 관련 의존성들을 추가해줍니다.
... <!-- Kafka를 사용하기 위한 Library 추가 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId> <version>4.0.4</version> </dependency> <!-- Kafka를 사용하기 위한 Library 추가 --> ...
spring-kafka와 kafka-streams, 그리고 spring-cloud의 stream-binder-kafka-streams를 통해 카프카를 직접적으로 사용해보고, 토픽에 데이터를 보내고 consuming하는 것까지 해보도록 하겠습니다!
📌 아키텍쳐랄 것도 없지만..
프로젝트 구조를 다시 그려보면 아래와 같습니다.
Docker를 통해 Zookeeper와 Kafka를 1:1로 띄울 것이구요, 각각의 토픽들은 파티션을 2개씩 갖도록 구성을 해볼 예정입니다.
Kafka Stream Topology를 통해 자동으로 News Topic으로 데이터가 쌓이면 Eco와 Land 토픽으로 분산되도록 처리할 것이고, Consumer를 통하여 각 Eco와 Land Topic에 있는 레코드들을 꺼내와서 Console로 Comsuming하는 로직을 구현해볼 것입니다.
기본적으로 카프카에 대한 스터디가 어느정도 진행이 되셨다는 전제하에 진행하오니, 이 점 참고 부탁드립니다!
📌 프로젝트 변경
이전의 프로젝트 구조에 비하면 뭐가 많이 추가되었습니다.
Batch 설정과 더불어 Kafka 설정이 추가되다보니 프로젝트가 생각보다 비대해 진 느낌이 드네요..
일단 다 때려박아 모놀리식으로 구성을 한 뒤에 마이크로 서비스로 쪼개는 방법도 구상을 해봐야겠습니다.
참고로 텔레그램 전송 로직은 잠시 주석처리를 하였습니다.
우선 주키퍼와 카프카를 설정해야겠지요.
위 그림에서 보실 수 있듯, Docker를 이용해 컨테이너 모듈로 띄울 생각입니다.
version: '2' services: zookeeper: container_name: zookeeper image: wurstmeister/zookeeper ports: - "2181:2181" kafka: container_name: kafka image: wurstmeister/kafka depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_ADVERTISED_PORT: 9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock
주키퍼 컨테이너의 이름은 zookeeper, 카프카 컨테이너의 이름은 kafka로 띄울 것이고 포트는 각각 2181과 9092로 포워딩해줍니다.
docker-compose up -d 옵션으로 docker-compose.yml을 실행시켜 주시면 zookeeper와 kafka가 정상적으로 구동된 것을 확인하실 수 있습니다.
docker ps -a 로 확인해보죠.
> docker ps -a CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 15dbef0d5164 wurstmeister/kafka "start-kafka.sh" 17 hours ago Up About an hour 0.0.0.0:9092->9092/tcp kafka b338aa374259 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 17 hours ago Up About an hour 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp zookeeper
참고로 IntelliJ의 Services 탭을 활용하면 보다 손쉽게 도커 컨테이너를 관리하실 수 있습니다!
카프카 사용 준비는 다 끝났습니다.
이제 프로젝트와 연결을 해줄 차례입니다.
가장 먼저 설정할 것은 Kafka Configuration입니다.
StreamsConfiguration이라는 Configuration 클래스를 만들어 설정정보들과 Topic, 그리고 KafkaTemplate을 만들 수 있는 Factory를 구현합니다.
@Configuration public class StreamsConfiguration { // Kafka Streams Configuration public static Properties getConfiguration() { Properties props = new Properties(); // Streams Application ID 지정 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "news-pipeline"); // Kafka 서버 지정 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Default Key Serde를 Long으로 선언 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); // Default Value Serde를 String으로 선언(JSON) props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // Kafka 초기 Offset이 없거나 범위를 벗어난 경우에 대한 설정으로 earliest는 가장 처음 offset부터 consuming props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } // Topic 및 Kafka Template 생성 @Bean public NewTopic newsTopic() { return TopicBuilder.name(TopicEnum.NEWS_TOPIC.getType()) .partitions(2) .replicas(1) .build(); } @Bean public NewTopic ecoNewsTopic() { return TopicBuilder.name(TopicEnum.ECO_TOPIC.getType()) .partitions(2) .replicas(1) .build(); } @Bean public NewTopic landNewsTopic() { return TopicBuilder.name(TopicEnum.LAND_TOPIC.getType()) .partitions(2) .replicas(1) .build(); } // Kafka Template 설정 @Bean public KafkaTemplate<Long, News> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } /** PRIVATE METHODS **/ // Kafka Template 을 만들기 위한 ProducerFactory private ProducerFactory<Long, News> producerFactory() { // Kafka Producer 설정 정보를 여기서 로드합니다. Map<String, Object> configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); // Key가 Long 타입이기 때문에 LongSerializer를 선언 configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // Value가 Object형이기 때문에 JsonSerializer를 선언 // Kafka Template 사용 시 DefaultKafkaProducerFactory 와 함께 사용하면 // Thread safe 하게 기능을 구현할 수 있습니다. return new DefaultKafkaProducerFactory<>(configs); } }
토픽을 만들었으니, 토픽 이름을 상수로 관리할 수 있는 Enum 클래스를 선언해서 사용하도록 하겠습니다.
@Getter @RequiredArgsConstructor public enum TopicEnum { NEWS_TOPIC("news"), ECO_TOPIC("eco-news"), LAND_TOPIC("land-news") ; private final String type; }
그리고 Kafka Streams에서 가장 중요한 Stream Topology를 만들어보도록 하겠습니다.
public class TopologyBuilder { // 마지막 경제 뉴스 ID private static Long LAST_ECO_NEWS_ID = 0L; // 마지막 부동산 뉴스 ID private static Long LAST_LAND_NEWS_ID = 0L; // Kafka Streams Topology public static Topology build() { // Data Streams 를 위한 StreamsBuilder StreamsBuilder streamsBuilder = new StreamsBuilder(); // Source Processor // News Topic 의 데이터를 꺼내와서 KStream을 만듭니다. KStream<Long, News> sourceStream = streamsBuilder.stream(TopicEnum.NEWS_TOPIC.getType(), Consumed.with(Serdes.Long(), new JsonSerde<>(News.class))); // sourceStream에 각각의 Topic Topology 를 구현합니다. ecoNewsStream(sourceStream); landNewsStream(sourceStream); return streamsBuilder.build(); } private static void ecoNewsStream(KStream<Long, News> sourceStream) { // Streams Processor : 본격적인 데이터 처리를 하며 새로운 KStream을 만들어냅니다. KStream<Long, News> ecoNewsStream = sourceStream.filter((key, value) -> { // Recored의 Type이 eco이고, newsId가 최종 뉴스 ID보다 작으면 Consuming 합니다. if (value.type().equals("eco") && value.newsId() > LAST_ECO_NEWS_ID) { LAST_ECO_NEWS_ID = value.newsId(); return true; } return false; }); // Sink Processor : Streams Processor 에서 가공된 데이터를 원하는 목적지 Topic 으로 전송합니다. // News Topic -> 데이터 가공 -> Eco News Topic 으로 전송합니다. ecoNewsStream.to(TopicEnum.ECO_TOPIC.getType(), Produced.with(Serdes.Long(), new JsonSerde<>(News.class))); } private static void landNewsStream(KStream<Long, News> sourceStream) { // Streams Processor : 본격적인 데이터 처리를 하며 새로운 KStream을 만들어냅니다. KStream<Long, News> landNewsStream = sourceStream.filter((key, value) -> { // Recored의 Type이 land이고, newsId가 최종 뉴스 ID보다 작으면 Consuming 합니다. if (value.type().equals("land") && value.newsId() > LAST_LAND_NEWS_ID) { LAST_LAND_NEWS_ID = value.newsId(); return true; } return false; }); // Sink Processor : Streams Processor 에서 가공된 데이터를 원하는 목적지 Topic 으로 전송합니다. // News Topic -> 데이터 가공 -> Land News Topic 으로 전송합니다. landNewsStream.to(TopicEnum.LAND_TOPIC.getType(), Produced.with(Serdes.Long(), new JsonSerde<>(News.class))); } }
카프카 스트림즈를 공부해보시면 아시겠지만 비즈니스의 흐름은 Source Processor 에서 Streams Processor로 전달되고 Streams Processor에서 연산이 된 데이터를 Sink Processor를 통해 목적지로 전송을 해주게 됩니다.
.to()가 그것입니다.
카프카 스트림즈를 이용해서 Crawling을 하는 Service를 만들어보도록 하죠.
StreamsService라는 클래스를 만들어 crawl() 메소드를 선언하고 가볍게 각각의 뉴스를 긁어와서 forEach로 kafkaTemplate.send()를 호출하는 로직입니다.
@Service @RequiredArgsConstructor public class StreamsService { private final KafkaTemplate<Long, News> kafkaTemplate; private static final EcoCrawler ecoCrawler = new EcoCrawler(); private static final LandCrawler landCrawler = new LandCrawler(); public void crawl() { List<News> newsList = ecoCrawler.getEcoNews(); newsList.addAll(landCrawler.getLandNews()); newsList.forEach(news -> { kafkaTemplate.send(TopicEnum.NEWS_TOPIC.getType(), news); System.out.println("News ID = [" + news.type() + "] " + news.newsId()); }); } }
매우 간단하죠? 별도의 로그를 사용하지 않아서 그냥 콘솔로그를 활용하였습니다.
추후에는 Slf4j로 정식 logging을 추가해야겠다는 생각이 문득 드네요..
자, 이제 Kafka Streams 구성은 모두 끝났습니다.
뉴스를 크롤링해서 가져온 뒤 내부 도메인인 News로 정제해서 Kafka의 news 토픽에 담아두면 스트림즈 로직에 따라 eco-news와 land-news 토픽으로 분기되어 처리됩니다.
이렇게만 만들어두면 각 토픽에 데이터가 쌓이기만 하겠죠?
그럼 이제 토픽의 데이터를 사용하는 Consumer를 만들어줍니다.
여기서는 Consumer의 처리 메소드들을 Bean으로 등록하여 WAS 구동시 카프카 스트림즈에 의해 처리가 완료되어 적재된 eco-news와 land-news의 데이터를 바로 Consuming 할 수 있도록 설정을 해주게 됩니다.
@Configuration public class NewsConsumer { @Bean public Consumer<KStream<Long, News>> ecoService() { return newsStream -> newsStream.foreach((key, news) -> { System.out.println(String.format("Eco NEWS Url[%s] Type[%s] Title[%s]", news.url(), news.type(), news.title())); }); } @Bean public Consumer<KStream<Long, News>> landService() { return newsStream -> newsStream.foreach((key, news) -> { System.out.println(String.format("Land NEWS Url[%s] Type[%s] Title[%s]", news.url(), news.type(), news.title())); }); } }
Configuration 클래스를 통해 eco-news 토픽의 데이터를 소진하는 ecoService와 land-news 토픽의 데이터를 소진하는 landService 메소드를 정의해줍니다.
현재는 Console Log로 소진하는데, 추후에는 이 부분이 실제 데이터베이스에 insert하는 부분으로
(언젠가는..)변경될 예정입니다.그럼 이제 컨슈머를 만들었으니 실제 토픽과의 Binding 작업을 해주어야 합니다. 이는 application.yml 파일을 활용합니다.
앞선 내용에서 수정했던 application.yml에 Consumer에 대한 Binding 설정을 작업해줍니다.
# Spring Cloud Kafka Streams 설정 spring.cloud: function: definition: ecoService;landService stream: bindings: ecoService-in-0: destination: eco-news landService-in-0: destination: land-news kafka: bindings: ecoService-in-0: consumer: configuration: value: deserializer: org.springframework.kafka.support.serializer.JsonDeserializer landService-in-0: consumer: configuration: value: deserializer: org.springframework.kafka.support.serializer.JsonDeserializer binder: brokers: - localhost:9092 spring: batch: job: enabled: false # 서버 재시작시 배치 자동실행 방지 datasource: # batch를 사용하기 위해 h2 database 사용 driver-class-name: org.h2.Driver url: jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;
경제 뉴스와 부동산 뉴스 컨슈머를 각각 따로 두고 토픽을 소비를 하게 됩니다.
📌 와..완료!완료했다!!!
프로젝트 업그레이드가 완료되었습니다.
이제는 텔레그램으로 메시지를 보냄과 동시에 토픽에 데이터를 자동으로 적재하는 스트림즈 구성까지 해보았습니다.
그냥 막무가내로 해본 것이기에 설명이 매우 부족하지만..그래도 구성은 다 해보았으니 원하는데로 동작하는지 확인하기 위해 프로젝트를 실행을 해볼까요?
...(중략)... 2023-11-08T10:03:35.890+09:00 INFO 16396 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=sendNewsJob]] launched with the following parameters: [{'datetime':'{value=2023-11-08T10:03:35.888877200, type=class java.lang.String, identifying=true}'}] 2023-11-08T10:03:35.892+09:00 INFO 16396 --- [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [sendNewsStep] News ID = [eco] 20231108095815544 News ID = [eco] 20231108100007632 News ID = [eco] 20231108100024684 News ID = [eco] 20231108100126808 News ID = [eco] 20231108100153832 News ID = [eco] 20231108100205840 News ID = [eco] 20231108100212845 News ID = [eco] 20231108100213847 News ID = [eco] 20231108100213848 News ID = [eco] 20231108100214849 News ID = [eco] 20231108100218852 News ID = [eco] 20231108100227858 News ID = [eco] 20231108100240864 News ID = [eco] 20231108100300887 News ID = [eco] 20231108100303898 News ID = [eco] 20231108100311913 News ID = [eco] 20231108100313916 News ID = [eco] 20231108100314919 News ID = [land] 20231108094213868 News ID = [land] 20231108094216872 News ID = [land] 20231108094314909 News ID = [land] 20231108094542983 News ID = [land] 20231108094614001 News ID = [land] 20231108094615003 News ID = [land] 20231108094615004 News ID = [land] 20231108094710034 News ID = [land] 20231108095034161 News ID = [land] 20231108095056177 News ID = [land] 20231108095514403 News ID = [land] 20231108095613452 News ID = [land] 20231108095713504 News ID = [land] 20231108095730510 News ID = [land] 20231108095921588 News ID = [land] 20231108095953608 News ID = [land] 20231108100051749 News ID = [land] 20231108100100758 News ID = [land] 20231108100151829 News ID = [land] 20231108100240864 2023-11-08T10:03:36.658+09:00 INFO 16396 --- [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [sendNewsStep] executed in 765ms 2023-11-08T10:03:36.660+09:00 INFO 16396 --- [ scheduling-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=sendNewsJob]] completed with the following parameters: [{'datetime':'{value=2023-11-08T10:03:35.888877200, type=class java.lang.String, identifying=true}'}] and the following status: [COMPLETED] in 768ms Land NEWS Url[https://v.daum.net/v/20231108100240864] Type[land] Title[서울 편입 추진에 들썩이는 김포... '풍무역 푸르지오 시티' 등 문의 ..] Eco NEWS Url[https://v.daum.net/v/20231108100227858] Type[eco] Title[셀트리온·셀트리온헬스케어, 5000억원 규모 자사주 취득] Eco NEWS Url[https://v.daum.net/v/20231108100240864] Type[eco] Title[서울 편입 추진에 들썩이는 김포... '풍무역 푸르지오 시티' 등 문의 이어져] Eco NEWS Url[https://v.daum.net/v/20231108100300887] Type[eco] Title[ITC 영어, 12월 2일 사업설명회 개최…전국으로 가맹점 확대] Eco NEWS Url[https://v.daum.net/v/20231108100303898] Type[eco] Title[[르포]"미래 네이버의 심장" 제2 데이터센터 '각 세종' 가보니] Eco NEWS Url[https://v.daum.net/v/20231108100311913] Type[eco] Title[LG전자, 협력사에 국내대학∙공공연구기관 보유 우수기술 소개] Eco NEWS Url[https://v.daum.net/v/20231108100313916] Type[eco] Title[삼성전자, 자체 개발 생성형 AI '삼성 가우스' 공개] Eco NEWS Url[https://v.daum.net/v/20231108100314919] Type[eco] Title[LGD 3세대OLED TV 패널, 글로벌 '눈건강' 인증 잇달아 획득]
로그를 보면 크롤링된 News들의 카테고리와 ID를 확인할 수 있습니다.
그리고 크롤링이 모두 완료되면 ID를 비교하며 최신 뉴스를 선별해내고 그 뒤에 스트림즈 로직을 통해 각각의 토픽에 데이터가 쌓이고, 각각의 토픽은 연결된 컨슈머에 의해 데이터가 소진되게 됩니다.
그럼 각각의 컨슈머 그룹의 상태를 한 번 볼까요?
> kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list ecoService-applicationId landService-applicationId news-pipeline
ecoService-applicationId와 landService-applicationId라는 컨슈머 그룹을 컨슈머 상태를 확인할 수 있습니다.
각각의 상태를 확인해보도록 하겠습니다.
> kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group ecoService-applicationId --describe GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID ecoService-applicationId eco-news 0 126 126 0 ecoService-applicationId-81cb54e7-e0f7-43b8-9e02-687e6529d849-StreamThread-1-consumer-354ac4ff-2628-4213-96de-759c8186579c /172.19.0.1 ecoService-applicationId-81cb54e7-e0f7-43b8-9e02-687e6529d849-StreamThread-1-consumer ecoService-applicationId eco-news 1 97 97 0 ecoService-applicationId-81cb54e7-e0f7-43b8-9e02-687e6529d849-StreamThread-1-consumer-354ac4ff-2628-4213-96de-759c8186579c /172.19.0.1 ecoService-applicationId-81cb54e7-e0f7-43b8-9e02-687e6529d849-StreamThread-1-consumer
> kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group landService-applicationId --describe GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID landService-applicationId land-news 0 25 25 0 landService-applicationId-769c691a-3b97-49b5-9a8f-97774df2c17c-StreamThread-1-consumer-f7f9138c-736c-4cae-a2fe-76524ff75b87 /172.19.0.1 landService-applicationId-769c691a-3b97-49b5-9a8f-97774df2c17c-StreamThread-1-consumer landService-applicationId land-news 1 47 47 0 landService-applicationId-769c691a-3b97-49b5-9a8f-97774df2c17c-StreamThread-1-consumer-f7f9138c-736c-4cae-a2fe-76524ff75b87 /172.19.0.1 landService-applicationId-769c691a-3b97-49b5-9a8f-97774df2c17c-StreamThread-1-consumer
📌 마치며
이번에는 지난 내용에 이어서 뉴스 파이프라인의 고도화 작업을 해보았습니다.
예제 코드는 깃헙에 업로드 되어있습니다.
GitHub - timulys/newspipeline
Contribute to timulys/newspipeline development by creating an account on GitHub.
github.com
그냥 크롤링 뿐 아니라 이제는 카프카 스트림즈를 통해 데이터 파이프라이닝의 기본 골격을 만든 것이라 의미가 있다고 생각이 드네요.
앞으로 더 다양하게 프로젝트가 업그레이드가 될 것입니다.
생각에는 NoSQL DB를 도입해보면 어떨가 싶고, 그 DB와 함께 ElasticSearch나 OpenSearch를 통해 검색 인덱스 구조도 만들어 볼까 합니다.
Vue.js나 React를 통해 가볍게 프론트 작업도 해볼 예정이구요.
이러게 1차적으로 프로젝트가 다~끝나면 각각의 컴포넌트들을 뜯어내서 마이크로 서비스로도 변경해볼 예정입니다.
일단은..예정만 해보고 있습니다.
혹시라도 읽어주시는 분이 계신다면 감사드립니다.
'프로젝트 노트' 카테고리의 다른 글