Gerçek Zamanlı Veri İşleme Uygulaması-III

İbrahim Çağrı Artun
5 min readJul 8, 2020

--

Merhaba,

Sıra, önceki yazımızda akış dahilinde gelen verileri işlemeye geldi. Verileri Apache’nin açık kaynak çözümü olan Flink teknoloji vasıtası ile işleyeceğiz.

Apache Flink ile verileri belirlediğimiz kriterlere göre filtreleyebilir, birden fazla akışı birleştirebilir, gruplayabilir ve birçok kompleks işlemi gerçekleştirebiliriz. Biz de bu yazımızda akan data üzerinden işlemlerimizi yapacağız. Uygulamamız sürekli ayakta olacak ve Kafka topic’leri sürekli dinleyecek. Topic’lere verinin gelmesi ile uygulamamız tetiklenecek ve belirlediğimiz kriterlere uygun ve gerçek zamanlı olarak işleyeceğiz.

Hadi uygulamaya geçelim.

Bu projenin amacı; bir akış dahilinde gelen verileri gerçek zamanlı olarak işlemek ve işlediğimiz verileri kalıcı olarak tutmaktır.

Projemiz Kafka topic’lerin beslenmesi ile tetiklenecektir. Kafka topic’leri anlık olarak sürekli dinleyeceğiz ve kriterlerimize uygun olanlar için veritabanına kayıt ekleyecek, bir nevi alarm üretmiş olacağız.

Şimdi projemizin teknik detaylarına geçelim.

Projeyi spring initializr üzerinde oluşturdum. IDE olarak IntelliJ kullanacağım, Spring Initializr üzerinden generate ettiğim projeyi IntelliJ’de açıyorum.

Proje Yapısı

Proje temel anlamda 3 paketten oluşmaktadır;

· POJO sınıflarımızın olduğu Model paketi

· Rest API’lerin yazıldığı Controller paketi

· İş mantığının yazıldığı Service paketi

Dependencies

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

Application

@SpringBootApplication
public class Demo2FlinkApiApplication {

public static void main(String[] args) {
SpringApplication.run(Demo2FlinkApiApplication.class, args);
}

}

VoiceStream

/**
*
* Created by I.C.ARTUN
*/

@Data
public class VoiceStream implements Serializable {

private String id;
private String phoneNumber;
private Long duration;
private Date callStartDate;
private Boolean active;

public VoiceStream(String phoneNumber, Long duration, Date callStartDate, Boolean active) {
this.phoneNumber = phoneNumber;
this.duration = duration;
this.callStartDate = callStartDate;
this.active = active;
}
}

VoiceStreamResult

// Lombok kutuphanesini kullanarak olusturuyoruz
@Data
@EqualsAndHashCode(of = {"id"}) // equals ve hashCode motodlarini olustururken id field'ini kullanarak olusturuyoruz.
@NoArgsConstructor // hicbir parametrenin olmadigi constructor olusturuyoruz
@AllArgsConstructor // tum parametrenin olmadigi constructor olusturuyoruz
@Table(value = "voice_stream_result") // cassandra db de tablo olusturuyoruz
public class VoiceStreamResult implements Serializable {

@PrimaryKey
private String id = UUID.randomUUID().toString(); // cassandra da time based UUID dir. Bu nesne olusturuldugunda UUID nin random hali bu nesneye esit olsun.

@Column(value = "phone_number")
private String phoneNumber;

@Column(value = "duration")
private Long duration;

@Column(value = "call_start_date")
private Date callStartDate;

@Column(value = "created_at")
private Date createdAt;
}

FlinkController

@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping(value = "/flink")
public class FlinkController {

private final FlinkService flinkService;

@GetMapping(value = "/processTopicAndCassandra")
public void processTopicAndCassandra() throws Exception {
flinkService.processTopicAndCassandraService();
}
}

FlinkService

import com.artun.demo2FlinkAPI.model.VoiceStream;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.Properties;
import java.util.UUID;

/**
*
* Created by I.C.ARTUN
*/

@Slf4j
@Service
public class FlinkService {

public void processTopicAndCassandraService() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("group.id", "voice-group");
properties.setProperty("auto.offset.reset", "latest");

DataStream<String> streamVoice = env
.addSource(new FlinkKafkaConsumer<>("voice-topic", new SimpleStringSchema(), properties));

DataStream<Tuple5<String, String, Long, Date, Date>> result = streamVoice
.map(new MapToVoiceStream())
.keyBy(new KeySelector<VoiceStream, String>() {
@Override
public String getKey(VoiceStream voiceStream) throws Exception {
return voiceStream.getPhoneNumber();
}
})
.timeWindow(Time.seconds(1))
.apply(new WindowFunction<VoiceStream, Tuple5<String, String, Long, Date, Date>, String, TimeWindow>() {
@Override
public void apply(String phoneNumber, TimeWindow timeWindow, Iterable<VoiceStream> iterable, Collector<Tuple5<String, String, Long, Date, Date>> collector) throws Exception {

for (VoiceStream voiceStream : iterable) {

if (voiceStream.getActive() && voiceStream.getDuration() >= 50L) {
collector.collect(new Tuple5<>(UUID.randomUUID().toString(), voiceStream.getPhoneNumber(), voiceStream.getDuration(), voiceStream.getCallStartDate(), new Date()));
}
}
}
})
.setParallelism(5);

CassandraSink.addSink(result)
.setQuery("INSERT INTO sample.voice_stream_result(id, phone_number, duration, call_start_date, created_at) values (?, ?, ?, ?, ?);")
.setHost("127.0.0.1")
.build();

env.execute();
}

public static class MapToVoiceStream implements MapFunction<String, VoiceStream> {

private static final ObjectMapper mapper = new ObjectMapper();

@Override
public VoiceStream map(String s) throws Exception {

JsonNode voiceStreamDataJson = mapper.readTree(s);
JsonNode phoneNumber = voiceStreamDataJson.get("phoneNumber");
JsonNode duration = voiceStreamDataJson.get("duration");
JsonNode callStartDate = voiceStreamDataJson.get("callStartDate");
JsonNode active = voiceStreamDataJson.get("active");

return new VoiceStream(phoneNumber.textValue(), duration.asLong(), new Date(callStartDate.longValue() * 1000), active.asBoolean());
}
}
}

Application.properties

server.port=9220

#cassandra properies
spring.data.cassandra.keyspace-name=sample
spring.data.cassandra.contact-points=127.0.0.1
spring.data.cassandra.port=9042
spring.data.cassandra.username=cassandra
spring.data.cassandra.password=cassandra
spring.data.cassandra.schema-action=create_if_not_exists

spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration

Şimdi projemizi çalıştırıyoruz.

Proje ayağa kalkarken sample keyspace’in altında voice_stream_result adında bir tablonun oluştuğunu görebilirsiniz.

IntelliJ ‘deki Cassandra console üzerinden de bunu görebilir ve sorgulama yapabiliriz.

Hadi şu ana kadar yaptıklarımızı baştan sona test edelim.

İlk olarak serimizin ilk iki yazısında hazırladığımız demo1RestAPI isimli proje ve demo2FlinkAPI isimli projeleri çalıştıralım.

Ardından Flink servisini çalıştıralım. Aşağıdaki gibi Flink servisine GET isteği gönderdiğimizde demo2FlinkAPI projemiz Kafka topic’lerini sürekli dinleyecek ve akış içerisinde ilgili kriter kontrollerini sağlamak için tetikte bekleyecektir.

Şimdi rest servislerimizi tetikleyeceğiz. Tetikleme sonunda verilerin Kafka topic’lere dolmasını ve Cassandra Db’ye kayıt edilmesini sağlayacağız. Sonrasında sürekli ayakta olan ve Kafka topic’leri dinleyen Flink projemiz, verilerin ilgili kriterleri olup olmadığı kontrolünü sağlayacak, bir nevi işleyecektir. Flink projem gelen verilerdeki duration parametresinin 50 den büyük eşit ve active parametresinin true olması kontrolünü yapacak; veriler bu kriterlere uyar ise bir sonraki aşamaya geçecektir. Bir sonraki aşama olarak da kriterlerimize uyan verileri Cassandra Db’deki farklı bir tabloya kaydedeceğiz.

Hadi rest servisleri tetikleyerek akışımızı başlatalım.

Örnek olarak 3 adet rest servis tetikleyeceğim. Aşağıda resimlerde de görüldüğü gibi.

  1. Servisin duration parametresi 60 ve active parametresi true olarak gelmekte. Sonuç olarak ise alarm üreteceğiz.

2. Servisin duration parametresi 30 ve active parametresi true olarak gelmekte. Sonuç olarak ise alarm üretmeyeceğiz.

3. Servisin duration parametresi 60 ve active parametresi false olarak gelmekte. Sonuç olarak ise alarm üretmeyeceğiz.

Bu 3 servisin tetiklenmesi sonucunda voice-stream isimli tablomuzda 3 adet kayıt olacaktır. Aynı şekilde Kafka topic’te de yine 3 adet kayıt olacaktır. Fakat bu 3 adet kaydın sadece 1 tanesi belirlediğimiz kriterlere uyduğu için voice-stream-result isimli tablomuzda 1 adet kayıt göreceğiz.

Voice-stream tablo görüntüsü;

Voice-stream-result tablo görüntüsü;

Testimizi yaptık ve projenin doğru çalıştığını görmüş olduk.

Büyük uygulamalara temel oluşturacak küçük parçalar ile gerçek zamanlı veri inceleme ve işleme mantığını uygulamalı olarak anlatmaya çalıştım. Yararlı olmuştur umarım.

Github : https://github.com/icaloji/demo2FlinkAPI

Tekrar görüşmek üzere.

Sağlıcakla..

--

--

No responses yet