- JAVA에서 Consumer 개발
- Consumer 을 통하여 로그 가공 후 es에 저장.
-
@Service public class KafkaConsumerService { public static void consumerService(){ try{ long unique = uniqueKey; // set kafka properties Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "~.~.~.~:9092"); // kafka cluster props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// KEY_SERIALIZER props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // VALUE_SERIALIZER props.put(ConsumerConfig.GROUP_ID_CONFIG,"testGroup"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,500000); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,700); // init KafkaConsumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("testTopic")); // topic list while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // polling interval for (ConsumerRecord<String, String> record : records) { JSONParser jsonparse = new JSONParser(); JSONObject obj = (JSONObject) jsonparse.parse(record.value()); //이 레코드의 timestamp long dateData = record.timestamp(); } } }catch(Exception e){ e.printStackTrace(); } } }
- Pom.xml
<!-- json -->
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>