Kafka
Structure
Record < Partition < Topic < Broker < Cluster
Topic
- a logical unit used to categorize messages
Partition
- every topic consists of one or more partitions laid out across the brokers; messages published to a topic are divided among and stored in its partitions
- the main reason to give a single topic multiple partitions : performance improvement through distributed processing
- message order is guaranteed within a single partition (see the key-based sketch below)
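Below is a minimal sketch (not from the original notes) of how a message key pins records to one partition, which is what gives per-key ordering; the broker address and the topic name `orders` are assumptions:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same key "user-42" -> same partition -> ordering preserved for that key
            producer.send(new ProducerRecord<>("orders", "user-42", "created"));
            producer.send(new ProducerRecord<>("orders", "user-42", "paid"));
            producer.send(new ProducerRecord<>("orders", "user-42", "shipped"));
        }
    }
}
```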
Broker
- the Kafka broker relays messages between producers and consumers, and the broker is what is usually meant by "Kafka" itself. Producers and consumers run as separate applications, whereas the broker is Kafka itself. So in phrases like "set up Kafka" or "deliver messages through Kafka", "Kafka" refers to the broker.
Producer, Consumer
- a Producer only appends to the end of a partition; a Consumer reads records one by one, tracking its position by offset
Topics
- a particular stream of data
- you can have as many topics as you want
- a topic is identified by its name
- Topics are split into partitions
- Each partition is ordered
- Each message within a partition gets an incremental id, called an offset
- messages in the same topic logically share the same context
Message
- consists of a Key and a Value
- when a message is published or consumed through a broker, the whole message is serialized/deserialized
- a message has a specific structure, its schema, which is needed so that the producer can publish and the consumer can consume the message correctly (if the producer and the consumer hold different schemas for the same message, it cannot be processed properly)
Replica
- concerns service availability and fault tolerance (Fault-Tolerance)
- a partition consists of one leader replica and zero or more follower replicas. The leader replica handles all reads and writes for the partition; the follower replicas copy the messages written to the leader as-is and stand ready to take over if the leader fails. The group of replicas ready to take over, i.e. those that have properly replicated the leader's messages and stayed in sync with it, is called the ISR (In-Sync Replicas).
- Replication-factor : the number of replicas (leader + followers) kept for each partition (see the sketch below)
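A minimal sketch (reusing Spring's TopicBuilder from the example later in this post; topic name and counts are illustrative) of declaring partitions and the replication factor together:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class ReplicatedTopicConfig {
    @Bean
    public NewTopic replicatedTopic() {
        // 3 partitions for parallelism; each partition replicated to 2 brokers.
        // A replication factor of 2 requires a cluster of at least 2 brokers.
        return TopicBuilder.name("replicated-topic").partitions(3).replicas(2).build();
    }
}
```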
Offset
- only has a meaning for a specific partition
- Ex. offset 3 in partition 0 doesn't represent the same data as offset 3 in partition 1
- Order is guaranteed only within a partition (not across partitions); the consumer sketch below prints (partition, offset) pairs to make this concrete
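A minimal sketch (assumptions: a local broker and the hypothetical `orders` topic from the producer sketch above) that prints each record's partition and offset:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "offset-demo"); // required for subscribe()
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // (partition, offset) identifies a record; offset 3 in partition 0
                // is unrelated to offset 3 in partition 1.
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}
```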
Data (Message / Record)
- Data is kept only for a limited time (default is one week)
- once the data is written to a partition, it can't be changed (immutability)
- Data is assigned randomly to a partition unless a key is provided (more on this later)
- necessary fields : offset, messageKey, messageValue
- reference : https://always-kimkim.tistory.com/entry/kafka101-message-topic-partition
Features
- good solution for large scale message processing applications
- better throughput, built-in partitioning, replication, fault-tolerance, and horizontal scalability
- messaging uses are often comparatively low-throughput, but may require low end-to-end latency and often depend on the strong durability guarantees Kafka provides
- kafka can scale to 100s of brokers
- kafka can scale to millions of messages per second
- high performance (latency of less than 10ms) - real time
- for the log history
- for decoupling of data streams & systems
Use cases
- messaging system
- activity tracking
- gather metrics from many different locations
- application logs gathering
- stream processing (with the kafka stream API or Spark for example)
- De-coupling of system dependencies
- Integration with Spark, Flink, Storm, Hadoop and many other Big Data technologies
- key point : in real-time
Kafka VS RabbitMQ
| | Kafka | RabbitMQ |
|---|---|---|
| what it is | distributed event streaming platform | open source distributed message broker |
| model | pub/sub (a message distribution pattern that lets any interested consumer subscribe to and retrieve published messages) | message broker (a software module that lets applications, services, and systems communicate and exchange information) |
| best fit | streaming at maximum throughput without relying on complex routing; processing data in multi-stage pipelines | complex routing; web servers that need fast request-response |
- reference : https://coding-nyan.tistory.com/129
- reference :
  - Apache Kafka in 5 minutes : https://www.youtube.com/watch?v=PzPXRmVHMxI
  - Apache Kafka in 6 minutes : https://www.youtube.com/watch?v=Ch5VhJzaoaI
  - https://always-kimkim.tistory.com/entry/kafka101-broker
Internal/External Structure
Example
build.gradle
```gradle
dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
}
```
application.yml
```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
```
KafkaTopicConfig.java
```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {
    @Bean
    public NewTopic amigoscodeTopic() { // NewTopic : org.apache.kafka.clients.admin.NewTopic
        return TopicBuilder.name("amigoscode").build();
    }
}
```
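Note: Spring Boot auto-configures a KafkaAdmin bean, which creates any NewTopic beans (like the one above) on the broker at application startup if they do not already exist.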
Output
- Terminal
- https://kafka.apache.org/quickstart + additional command : https://sangchul.kr/144
```sh
cd kafka_2.12-3.2.1
# Start ZooKeeper, then the Kafka broker
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

# Create a topic
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

# Read the events (topic --> amigoscode)
bin/kafka-console-consumer.sh --topic amigoscode --from-beginning --bootstrap-server localhost:9092
```
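To publish a few test events from the terminal, the quickstart's console producer can be pointed at the same topic (the topic name assumes the `amigoscode` topic from the config above):

```sh
# Write events into the topic (one message per line; Ctrl-C to stop)
bin/kafka-console-producer.sh --topic amigoscode --bootstrap-server localhost:9092
```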
KafkaProducerConfig.java
- Use KafkaTemplate to implement the producer; KafkaTemplate.send() delegates to KafkaProducer.send() internally
- reference : https://jessyt.tistory.com/142
```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfig() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return properties;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
    // KafkaTemplate<String, Object> ~~~
}
```
KafkaConsumerConfig.java
- Use ConcurrentKafkaListenerContainerFactory to implement the consumer
- reference : https://semtax.tistory.com/83
```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> consumerConfig() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Deserializers (not serializers) on the consumer side
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return properties;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> factory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
    // ConcurrentKafkaListenerContainerFactory<String, Object> ~~~
}
```
KafkaApplication.java
```java
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class KafkaexampleApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaexampleApplication.class, args);
    }

    @Bean
    CommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {
        return args -> {
            kafkaTemplate.send("amigoscode", "hello kafka"); // topic, data(message)
        };
    }
}
```
- KafkaTemplate.send() is asynchronous; the record passes through several layers (serialization, partitioning, batching) before it is actually sent to Kafka. A sketch of inspecting the result follows below.
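A sketch (assuming spring-kafka 3.x, where send() returns a CompletableFuture; in 2.x it returns a ListenableFuture instead) of checking where the record landed; it could replace the send() call in the runner above:

```java
// Sketch for the CommandLineRunner body above (assumes spring-kafka 3.x).
kafkaTemplate.send("amigoscode", "hello kafka")
        .whenComplete((result, ex) -> {
            if (ex == null) {
                // The broker-assigned partition and offset of the record
                System.out.printf("sent to partition=%d offset=%d%n",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            } else {
                ex.printStackTrace();
            }
        });
```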
KafkaListeners.java
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaListeners {
    @KafkaListener(topics = "amigoscode", groupId = "groupId")
    void listener(String data) {
        System.out.println("Listener received: " + data);
    }
}
```
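A hedged variant (not in the original notes): a @KafkaListener method can also receive the full ConsumerRecord instead of just the payload, exposing the partition and offset discussed earlier:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaRecordListeners {
    // Receiving the whole ConsumerRecord exposes partition/offset metadata
    @KafkaListener(topics = "amigoscode", groupId = "groupId")
    void listener(ConsumerRecord<String, String> record) {
        System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
    }
}
```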
MessageRequest.java
```java
// record
public record MessageRequest(String message) {
}

// =

// equivalent class
// Records provide a public constructor (with all components), read accessors
// for each field (equivalent to getters), and implementations of hashCode,
// equals, and toString.
public class MessageRequest {
    private final String message;

    public MessageRequest(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MessageRequest)) return false;
        MessageRequest other = (MessageRequest) o;
        return message == null ? other.message == null : message.equals(other.message);
    }

    @Override
    public int hashCode() {
        return message == null ? 0 : message.hashCode();
    }

    @Override
    public String toString() {
        return "MessageRequest[message=" + message + "]";
    }
}
```
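A quick usage sketch (values are hypothetical) of the record's generated members; note the accessor is message(), without a get prefix:

```java
// Record accessors have no "get" prefix
MessageRequest request = new MessageRequest("hello");
System.out.println(request.message());  // hello
System.out.println(request);            // MessageRequest[message=hello] (generated toString)
```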
MessageController.java
```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("api/v1/messages")
public class MessageController {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping
    public void publish(@RequestBody MessageRequest request) {
        // use request.message() if MessageRequest is the record version
        kafkaTemplate.send("amigoscode", request.getMessage());
    }
}
```
Output
API TEST
```http
POST http://localhost:8080/api/v1/messages
Content-Type: application/json

{
  "message": "Api With Kafka"
}
```
Output
- reference :
  - Apache Kafka in 5 minutes : https://www.youtube.com/watch?v=PzPXRmVHMxI
  - Kafka Tutorial - Spring Boot Microservices : https://www.youtube.com/watch?v=SqVfCyfCJqw
  - Kafka Topics, Partitions and Offsets Explained : https://www.youtube.com/watch?v=_q1IjK5jjyU
Kafka Stream
- a client library for analyzing and processing data that flows in continuously, using the kafka streams api
- the pattern : consume data arriving on some topic, process it through the streams api, then produce the result to another topic or stop there
- Spring Cloud Stream communicates through an intermediary implementation called a Binder, so applications can be developed without tight coupling to any single middleware
| binder | bindings (input/output) |
|---|---|
| message broker information | information about the channel messages are sent over |
| the component in charge of communication with the middleware | the bridge for communicating with the middleware |
| handles connecting producers and consumers to the middleware (kafka), receiving, routing, etc. | the Bridge that connects the binder's input/output to the middleware (kafka) |
Example
build.gradle
```gradle
dependencies {
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
}
```
Default interface
```java
public interface Sink {
    String INPUT = "process-input"; // INPUT : the topic name the consumer subscribes to

    @Input(INPUT)
    SubscribableChannel input();
}

public interface Source {
    String OUTPUT = "process-output"; // OUTPUT : the topic name the producer publishes to

    @Output(OUTPUT)
    MessageChannel output();
}

public interface Processor extends Source, Sink {
}
```
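Side note (an addition, not from the original post): the annotation-based model shown here (@Input/@Output/@EnableBinding/@StreamListener) is deprecated since Spring Cloud Stream 3.1 in favor of the functional model; a minimal sketch, with hypothetical binding names:

```java
// Functional-style sketch (Spring Cloud Stream 3.x). The bean is bound to
// destinations via properties such as:
//   spring.cloud.function.definition: process
//   spring.cloud.stream.bindings.process-in-0.destination: process-input
//   spring.cloud.stream.bindings.process-out-0.destination: process-output
import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ProcessConfig {
    @Bean
    public Function<String, String> process() {
        return message -> message.toUpperCase(); // consume, transform, produce
    }
}
```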
Custom interface
```java
public interface ProcessMessage {
    String SEND_MESSAGE = "send-message";
    String RECEIVE_MESSAGE = "receive-message";

    @Output(SEND_MESSAGE)
    MessageChannel sendMessage();

    @Input(RECEIVE_MESSAGE)
    SubscribableChannel getMessage();
}
```
```java
@ConditionalOnProperty(name = "spring.cloud.stream.enabled", havingValue = "true", matchIfMissing = true)
@EnableBinding(ProcessMessage.class) // bind the interface whose channels are used below
public class MessageConsumer {
    @StreamListener(target = ProcessMessage.RECEIVE_MESSAGE, condition = "headers['eventType'] == 'MessageEvent'")
    public void pushMessage(MessageEvent event) throws JsonProcessingException {
        ...
    }
}
```
```java
public class MessageEvent {
    // props
}
```
application.yml
```yaml
spring:
  cloud:
    stream:
      enabled: true  # checked by @ConditionalOnProperty below
      kafka:
        binder:
          brokers: localhost  # broker ip
          # replication-factor: 2  # minimum = 2
          # auto-create-topics: false
      bindings:  # input/output
        receive-message:  # channel name (input)
          destination: "${spring.cloud.stream.topic:}process-input"  # topic name
          group: "${spring.cloud.stream.consumer.group:}"  # consumer group id
        send-message:  # output
          destination: "${spring.cloud.stream.topic:}process-output"
```
Related Annotations
@EnableBinding
```java
@EnableBinding(EventSource.class)
```
@KafkaListener
```java
@KafkaListener(topics = "amigoscode", groupId = "groupId")
```
@ConditionalOnProperty(...)
enables bean registration only if an environment property is present and has a specific value.
```java
@ConditionalOnProperty(name = "spring.cloud.stream.enabled", havingValue = "true", matchIfMissing = true)
```
- condition : spring.cloud.stream.enabled:true (in application.yml)
- name (=value) : key in application.yml
- havingValue : value of key in application.yml
- matchIfMissing : whether to register the bean when the property is not set at all
@ConditionalOnMissingBean
registers the bean only if no bean of the same type has already been defined; a minimal sketch follows below.
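A minimal sketch (an assumption: reusing the ProducerFactory bean from the producer config above) of a fallback bean:

```java
// Sketch: this fallback KafkaTemplate is registered only when the application
// has not defined its own KafkaTemplate bean elsewhere.
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<String, String> defaultKafkaTemplate(ProducerFactory<String, String> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}
```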
- reference :
  - https://saramin.github.io/2019-08-28-2/
  - https://piotrminkowski.com/2021/11/11/kafka-streams-with-spring-cloud-stream/
  - http://www.chidoo.me/index.php/2016/11/06/building-a-messaging-system-with-kafka/
  - https://jaehun2841.github.io/2019/12/23/2019-12-23-kafka-streams-binder-feature/#Version-Up
  - https://sangchul.kr/144