Gerçek Zamanlı Veri İşleme Uygulaması-I

İbrahim Çağrı Artun
5 min readJul 1, 2020

--

Yeni bir yazı serisine başlıyoruz. Bu serimizde daha önce anlattığım Gerçek Zamanlı Veri İşleme yazımızın uygulamasını yapacağız. Gerçek Zamanlı Veri İşleme ile ilgili görüşlerim için tıklayınız.

Günümüzde büyük veriden değer yaratabilme, hız, performans, ölçeklendirme, yatayda genişleme gibi terimler çok önemli hale gelmiştir. Bu kapsamda verinin anlık olarak işlenebilmesi ilk hedeflerimizden biri olmalıdır. Bu yazı serisinin amacı da verinin gerçek zamanlı olarak işlenebilmesi ve analiz edilebilmesi için genel bir bakış açısı vermektir.

Bu uygulamalı yazı serisinde temel olarak aşağıdaki başlıklar ve teknolojiler olacaktır.

· Gerçek Zamanlı Veri İşleme Uygulaması — I

· Spring Boot ile Rest Servisler yazacağız. Sonrasında Docker Container üzerinde Cassandra DB ayağa kaldıracağız. Rest servisler ile Cassandra DB’ye kaydetme ve listeleme gibi işlemleri yapacağız.

· Kullanılan Teknolojiler : Spring Boot, Apache Cassandra, Docker

· Gerçek Zamanlı Veri İşleme Uygulaması — II

· Docker Container üzerinde Apache Kafka ‘yı ayağa kaldıracağız. Apache Kafka ile kuyruk yapımızı oluşturacağız. Spring Boot ile yazdığımız Rest Servislerimizi genişleteceğiz ve Kafka’ya verimizi göndereceğiz.

· Kullanılan Teknolojiler : Spring Boot, Apache Kafka, Docker

· Gerçek Zamanlı Veri İşleme Uygulaması — III

· Apache Flink ile anlık olarak verimizi işleyeceğiz. Apache Kafka’ya anlık olarak gelen verilerimizi Apache Flink ile anlık olarak inceleyeceğiz. Apache Flink ile yazacağımız kod, ilgili kriterlere uymayan kitleye alarm üretecek ve Cassandra DB’ye kaydedeceğiz.

· Kullanılan Teknolojiler : Spring Boot, Apache Flink, Apache Kafka, Apache Cassandra, Docker

Yol haritasında belirttiğim üzere ilk uygulamamız ile başlayalım.

İlk projenin amacı; veriler için gerçek zamanlı bir akış oluşturmak ve gerçek zamanlı olarak işlenebilmesi için gerekli altyapıyı hazırlamaktır.

Projemiz Voice verisinin gelmesi ile tetiklenmektedir.

Peki Voice verisi nedir? Voice verisini temelde bir kişinin bir arama kaydı olarak düşünebiliriz. İlgili arama kaydı dahilinde telefon numarasının tutulduğu phoneNumber, konuşma süresinin tutulduğu duration, konuşmanın başlangıç zamanının tutulduğu callStartDate ve aktifliğin tutulduğu active alanlarından oluşmaktadır.

Bir kişiye ait arama kaydı(Voice verisi) tamamlandığında, bu verilerin sistemimizi beslediğini düşünelim. Bu besleme sonrasında ;

· Bir akış oluşturacağız.

· Bu akışta tetiklenen servislerimiz üzerinden Apache Kafka ile kuyruğa verimizi göndereceğiz. Buna paralel olarak proje dahilinde beslenen verimizi veritabanına kaydedeceğiz.

· Kafka topic’lerini sürekli ve anlık olarak dinleyen Apache Flink ile yazdığımız proje vasıtası ile, bu verileri belirlediğimiz kriterlere göre analiz ederek işleyeceğiz.

· Bu işleme sonunda kriterlerimize uygun verileri Cassandra Db’ye kaydedeceğiz. (Cassandra Db’ye kaydettiğimiz veriler akış içerisinde kriterlere uyan verilerdir. Bu veriler bir nevi alarm ürettiğimiz verilerdir.)

Şimdi projemizin teknik detaylarına geçelim.

Projeyi spring initializr üzerinde oluşturdum. IDE olarak IntelliJ kullanacağım, Spring Initializr üzerinden generate ettiğim projeyi IntelliJ’de açıyorum.

Proje Yapısı

Proje temel anlamda 4 paketten oluşmaktadır;

· POJO sınıflarının olduğu Model paketi

· Rest API’lerin yazıldığı Controller paketi

· İş mantığının yazıldığı Service paketi

· Veritabanı işlemlerini yaptığımız Repository paketi

Dependencies

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

Application

@SpringBootApplication
public class Demo1RestApiApplication {
public static void main(String[] args) {
SpringApplication.run(Demo1RestApiApplication.class, args);
}
}

VoiceStream

// Lombok kutuphanesini kullanarak olusturuyoruz
//@Getter // getter metodlarini anotasyon kullanarak olusturduk
//@ToString // toString metodlarini anotasyon kullanarak olusturduk
@Data
// @ToString, @EqualsAndHashCode, @Getter on all fields, @Setter on all non-final fields, and @RequiredArgsConstructor
@EqualsAndHashCode(of = {"id"}) // equals ve hashCode motodlarini olustururken id field'ini kullanarak olusturuyoruz.
@NoArgsConstructor // hicbir parametrenin olmadigi constructor olusturuyoruz
@AllArgsConstructor // tum parametrenin olmadigi constructor olusturuyoruz
@Table(value = "voice_stream") // cassandra db de tablo olusturuyoruz
public class VoiceStream implements Serializable {
@PrimaryKey
private String id = UUID.randomUUID().toString(); // cassandra da time based UUID dir. Bu nesne olusturuldugunda UUID nin random hali bu nesneye esit olsun.

@Column(value = "phone_number")
private String phoneNumber;
@Column(value = "duration")
private Long duration;
@Column(value = "call_start_date")
private Date callStartDate;
@Column(value = "active")
private Boolean active;
}

VoiceStreamController

Http isteklerimizi yakalamamızı sağlayan endpoint’leri tanımlıyoruz.

@Slf4j // lombok kutuphanesi log anotasyonu
@RequiredArgsConstructor // lombok kutuphanesi anotasyonu ile voiceStreamService nesnesinin constructor unu olusturuyorum ve bunun uzerinden inject yapiyorum
@RestController
@RequestMapping(value = "/voiceStream")
public class VoiceStreamController {

private final VoiceStreamService voiceStreamService; // final dedim, constructor uzerinden inject islemini yapacagim.

@GetMapping("/{id}")
public ResponseEntity<VoiceStream> findVoiceStreamById(@PathVariable String id) {
log.info("Incoming parameter id : " + id);
return ResponseEntity.ok(voiceStreamService.findVoiceStreamByIdService(id));
}
@PostMapping(value = "/findAllVoiceStream")
public ResponseEntity<List<VoiceStream>> findAllVoiceStream() {
return ResponseEntity.ok(voiceStreamService.findAllVoiceStreamService());
}
@PostMapping(value = "/saveVoiceStream")
public ResponseEntity<VoiceStream> saveVoiceStream(@RequestBody VoiceStream voiceStream) {
return ResponseEntity.ok(voiceStreamService.saveVoiceStreamService(voiceStream));
}
}

VoiceStreamService

@RequiredArgsConstructor // voiceStreamRepository nesnesinin constructor unu olusturuyorum ve bunun uzerinden inject yapiyorum
@Service
public class VoiceStreamService {

private final VoiceStreamRepository voiceStreamRepository; // final dedim, constructor uzerinden inject islemini yapacagim.

public VoiceStream findVoiceStreamByIdService(String id){
return voiceStreamRepository.findById(id).orElseThrow(() -> new IllegalArgumentException());
}

public List<VoiceStream> findAllVoiceStreamService(){
return voiceStreamRepository.findAll();
}

public VoiceStream saveVoiceStreamService(VoiceStream voiceStream){
// cassandra kayit islemi
return voiceStreamRepository.save(voiceStream);
}
}

VoiceStreamRepository

@Repository
public interface VoiceStreamRepository extends CassandraRepository<VoiceStream, String> {
}

Şimdi Docker üzerinde Cassandra kurulumu yapalım. Official Sitesi üzerinden ilgili komutlar ile ilerleyebiliriz.

Komut satırı üzerinden “docker pull cassandra” komutunu çalıştırarak Apache Cassandra imajını oluştururuz.

docker run -p 9042:9042 --name my-cassandra -d cassandra:latest

my-cassandra isminde container ayağa kaldırdık. Varsayılan olarak 9042 portunu kullanır. Bu portu local’de erişilebilir hale getirdik ve latest diyerek son versiyonu ile çalışmasını istedik.

IntelliJ üzerinden aşağıdaki gibi Cassandra Db ekleyebilir ve kontrolleri sağlayabiliriz.

Sonrasında aşağıdaki gibi yeni bir Console açarak Cassandra Db için sorguları yazabiliriz.

Console üzerinde aşağıdaki komutu çalıştırarak “sample” isminde bir keyspace oluşturuyoruz.

CREATE KEYSPACE sample
WITH replication = {‘class’:’SimpleStrategy’, ‘replication_factor’ : 3};

Application.properties

Son olarak projemizin application.properties dosyasını güncelleyerek son adımı tamamlıyoruz.

server.port=9210
#cassandra properies
spring.data.cassandra.keyspace-name=sample
spring.data.cassandra.contact-points=127.0.0.1
spring.data.cassandra.port=9042
spring.data.cassandra.username=cassandra
spring.data.cassandra.password=cassandra
spring.data.cassandra.schema-action=create_if_not_exists
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration

Sonrasında projemizi çalıştırıyoruz.

Proje ayağa kalkarken sample keyspace’i altında voice_stream adında bir tablonun oluştuğunu görebilirsiniz.

Bu aşamada test amaçlı aşağıdaki gibi yeni bir voice ekliyoruz.

Bunun üzerinde sorgulama yaptığımızda yeni kayıtın eklendiğini görüyoruz.

Eklenen tüm kayıtları görüntüleyelim.

Böylece gerçek zamanlı veri akışına hazırlık yapmış olduk. Bu noktaya kadar servislerimizi yazdık ve test ettik. Devam niteliğindeki uygulamalı yazımızda veriyi gerçek zamanlı olarak akıtacağız.

Sonraki yazıda görüşmek üzere.

Sağlıcakla..

--

--

No responses yet