Gerçek Zamanlı Veri İşleme Uygulaması-II

İbrahim Çağrı Artun
4 min readJul 3, 2020

--

Merhaba,

Birinci yazımızda Spring Boot ile Rest Servis örneği yapmış ve bu servisler üzerinden Cassandra Db’ye kayıt eklemiştik. Şimdi sırada veriyi Apache Kafka kullanarak kuyruk yapısına ekleme işlemi var.

Apache Kafka kullanarak kaynak sistemden hedef sisteme veri transferi yapacağız. Bu kapsamda rest servisin tetiklenmesi ile başlayan akış iş mantığının çalışması sonrasında dallanarak Cassandra Db’ye kaydedilecek, paralelde de gerçek zamanlı akış içerisinde Apache Kafka üzerinden kuyruk sistemine girecek ve işlenmek üzere diğer aşamaya geçecektir.

Hadi uygulamaya geçelim.

Projemizin yapısı aşağıdaki gibi olacaktır.

İlk bölümde yazılan demo1RestAPI uygulamamızı daha da geliştireceğiz. Apache Kafka dependecy’mizi ekleyerek başlayalım.

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

Apache Kafka konfigürasyonunu yapıyoruz.

KafkaConfiguration

package com.artun.demo1RestAPI.config;

import com.artun.demo1RestAPI.model.VoiceStream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfiguration {

@Value("${app.kafka.address}")
private String kafkaAddress;

@Value("${app.kafka.group.id}")
private String groupId;

@Bean
public KafkaTemplate<String, VoiceStream> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress); // kafka adresimizi belirtiyoruz
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // kafkaya gonderilen mesajlari key value seklinde gonderiyorum. Key, string olacak.
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // value ise java objesi olarak gonderiyoruz. Bu objeyi json a cevirip oyle gonderiyorum.
return new DefaultKafkaProducerFactory(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, VoiceStream> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, VoiceStream> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}

@Bean
public ConsumerFactory<String, VoiceStream> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, VoiceStream.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
}

Producer

package com.artun.demo1RestAPI.engine;

import com.artun.demo1RestAPI.model.VoiceStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Slf4j
@RequiredArgsConstructor
@Service
public class Producer {

@Value("${app.kafka.topic.producer}")
private String TOPIC;

private final KafkaTemplate<String, VoiceStream> kafkaTemplate;

public void sendMessage(VoiceStream voiceStreamParameter) {
log.info(String.format("#### -> Producing message -> %s", voiceStreamParameter));
this.kafkaTemplate.send(TOPIC, voiceStreamParameter).addCallback(new ListenableFutureCallback<SendResult<String, VoiceStream>>() {
@Override
public void onFailure(Throwable throwable) {
log.error("Error while producing : " + throwable.getMessage());
}

@Override
public void onSuccess(SendResult<String, VoiceStream> stringVoiceStreamSendResult) {
log.info("Received new metadata. \n" +
"Topic : " + stringVoiceStreamSendResult.getRecordMetadata().topic() + "\n" +
"Partition : " + stringVoiceStreamSendResult.getRecordMetadata().partition() + "\n" +
"Offset : " + stringVoiceStreamSendResult.getRecordMetadata().offset() + "\n" +
"Timestamp :" + stringVoiceStreamSendResult.getRecordMetadata().timestamp());
}
});
}
}

VoiceStreamService

VoiceStreamService isimli sınıfı aşağıdaki gibi değiştiriyoruz.

Burada yeni bir voice eklemek istediğimizde önce veriyi Apache Kafka’ya gönderiyoruz ve akışı devam ettiriyoruz. Paralelde de Cassandra Db’ye ekliyoruz.

package com.artun.demo1RestAPI.service;

import com.artun.demo1RestAPI.engine.Producer;
import com.artun.demo1RestAPI.model.VoiceStream;
import com.artun.demo1RestAPI.repository.VoiceStreamRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

import java.util.List;

@RequiredArgsConstructor // voiceStreamRepository nesnesinin constructor unu olusturuyorum ve bunun uzerinden inject yapiyorum
@Service
public class VoiceStreamService {

private final VoiceStreamRepository voiceStreamRepository; // final dedim, constructor uzerinden inject islemini yapacagim.
private final Producer producer;

public VoiceStream findVoiceStreamByIdService(String id){
return voiceStreamRepository.findById(id).orElseThrow(() -> new IllegalArgumentException());
}

public List<VoiceStream> findAllVoiceStreamService(){
return voiceStreamRepository.findAll();
}

public VoiceStream saveVoiceStreamService(VoiceStream voiceStream){
// send to kafka topic
producer.sendMessage(voiceStream);
// cassandra kayit islemi
return voiceStreamRepository.save(voiceStream);
}
}

Application.properties

Properties dosyasına Apache Kafka’nın adres, topic ismi gibi özelliklerini ekliyoruz.

server.port=9210

#cassandra properies
spring.data.cassandra.keyspace-name=sample
spring.data.cassandra.contact-points=127.0.0.1
spring.data.cassandra.port=9042
spring.data.cassandra.username=cassandra
spring.data.cassandra.password=cassandra
spring.data.cassandra.schema-action=create_if_not_exists

spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration

app.kafka.address=127.0.0.1:9092
app.kafka.group.id= voice-group
app.kafka.topic.producer = voice-topic

docker-compose.yml

version: '3.1'
services:
kafka:
image: spotify/kafka
ports:
- "2181:2181"
- "9092:9092"
hostname: localhost
container_name: kafka
environment:
- "ADVERTISED_HOST=localhost"
- "ADVERTISED_PORT=9092"
- "AUTO_CREATE_TOPICS=true"
command: >
bash -c
"(sleep 15s &&
/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh
--create
--zookeeper
localhost:2181 --replication-factor 1 --partitions 1
--topic voice-topic &) && (supervisord -n)"

Docker-compose dosyasını kullanarak, aşağıdaki komut ile Apache Kafka’yı Docker Container üzerinde ayağa kaldırıyoruz.

docker-compose -f docker-compose.yml up -d

(-f ile docker-compose dosyanın path’i verilmelidir.)

Projemizi çalıştırıyoruz.

Projemiz ayağa kalkarken docker-compose dosyasında da yer aldığı gibi voice-topic isimli topic oluşturdu. Apache Kafka’ya özel listeleme komutları vasıtası ile yada Kafka Tool gibi toollar vasıtası ile bunu görebiliriz.

Hadi şuana kadar yaptıklarımızı baştan sona test edelim.

Rest servisimizi aşağıdaki gibi tetikliyoruz. Voice verimizin(Arama Kaydının) sistemimize geldiğini düşünelim.

Tetiklenen yapı sonrasında Cassandra Db’de kaydımızın eklendiğini görüyoruz.

Sonrasında datanın Kafka topic ‘e de geldiğini kontrol edelim.

Komut satırı üzerinden aşağıdaki komutları çalıştırarak Kafka topic’e gönderilen veriyi görebiliriz.

docker exec -it kafka bash$KAFKA_HOME/bin/kafka-console-consumer.sh --from-beginning --bootstrap-server kafka:9092 --topic=voice-topic

Rest servisin tetiklenmesi sonrası akışımızın gerçek zamanlı topic’e beslendiğini gördük.

Böylece veriyi gerçek zamanlı olarak akıtmış olduk. Bu noktaya kadar veri akışını yazdık ve test ettik. Devam niteliğindeki uygulamalı yazımızda veriyi gerçek zamanlı olarak işleyeceğiz.

Github : https://github.com/icaloji/demo1RestAPI

Sonraki yazıda görüşmek üzere.

Sağlıcakla..

--

--

No responses yet