RxJava Nedir?

Hasan Kadir Demircan
5 min readFeb 17, 2020

--

13 Şubat 2020 tarihinde iyzico’da gerçekleşen Hüseyin Akdoğan hocamızın güzel anlatımıyla ilgimi daha çok çeken RxJava konusunu biraz daha araştırıp, bu konu hakkında blog yazmak istedim :)

RxJava Nedir?

RxJava, ReactiveX’in Java VM uygulaması, gözlemlenebilir sekanslar kullanarak asenkron ve olay tabanlı programlar oluşturmak için bir kütüphanedir. Asenkron akışları non-blocking backpressure ile işleyen Reaktif Akış spesifikasyonunun programlama modelini uygular.

ReactiveX Nedir?

  • Observable değişkenlerini kullanarak asenkron ve olay tabanlı programlar oluşturmamızı sağlayan programlama paradigmasıdır.
  • Kod akışını takip etmek yerine olayları(events) takip ederler.
  • Aynı anda birden çok olayı takip ederek aksiyon alabilmektedir.(multi threading).
  • Observer design pattern yapısını kullanmaktadır.

Asenkron Nedir?

  • Aynı zaman diliminde işletilen iş parçacıklarıdır.
  • Bir iş diğer işi bloke etmez.
  • Bir iş diğer işe bağımlı değildir.

Olay Tabanlı (Event-based) Nedir?

  • Geleneksel yazılım geliştirme yaklaşımlarında, uygulama kodun başlangıcıyla çalışır ve uygulamanın ne zaman ne yapacağına yazılan kod karar verilir.
  • İşlenebilecek olayların belirlenip, bu olaylar karşısında nasıl cevap verileceğinin kodlanmasıdır.
  • 201Kodun başlangıcıyla başlayıp bitmesi yerine, belirli bir olay olduğunda çalışır, işi bitirir ve sonucu iletir.

RxJava Örnek Kodlar ve Yapı Taşları

Burada örneklerden önce yapı taşları hakkında açıklama yapıp meetup’da yapılan örnek kodlamalar şeklinde gideceğiz.

Observable: Observer(gözlemcilere) veri veren(öğe yayan) kaynaktır.

Observer: Observable(gözlemcileri) dinleyen kaynaktır.

Örnek kodumuza geçmeden önce içerisinde kullanılan fonksiyonları açıklayalım,

subscribe, Observable’a bir abone, abone olduğunda gerçekleştirilecek eylemi tanımlayan bir arabirimdir. Abone olma yöntemi yalnızca bir Observer Observable’e abone olduğunda çalışır.

onNext, Observable’ın gönderdiği her element tek tek onNext metoduna düşer. Bir sonraki öğeyi yaymak için kullanılır.

onComplete, Observable’ın gönderdiği son öğe gönderildikten sonra çağrılır.

Kullanacağımız fonksiyonlar hakkında bilgi verecek olursak;

just, Sadece basitçe diziyi ya da yinelenebilir ya da sahip olduğunuz ya da ne varsa, değişmeden , tek bir öğe olarak o öğeyi yayan bir Observable’a dönüştürür.

fromCallable, Sıfırdan bir Observable oluşturur ancak yalnızca bir öğe yayabileceği anlamına gelir.

range, , aralığın başlangıcını ve uzunluğunu seçtiğiniz sırayla bir dizi sıralı tamsayı yayar.

interval, emisyonlar arasında seçtiğiniz sabit bir zaman aralığı ile sonsuz bir artan tamsayı dizisi yayan bir Observable döndürür.

FactoryMethods.java

Çalıştırdığımızda ekran çıktımız;

FactoryMethods.java — Output

Single, Maybe ve Completable Kavramları

Single, yalnızca bir değeri ileten ya da hata veren Observable’dır. Bir kere tetikleneceği için api çağırımlarında kullanabilirsiniz.

Yalnızca bir değeri iletmek için kullanıldığından onNext() ve onComplete(), onSuccess() ile birleştirilmiştir.

SingleObservable.java

Çalıştırdığımızda ekran çıktımız;

SingleObservable.java — Output

Maybe, Single ve Completable birleşimi gibi düşünebiliriz.

Yalnızca bir değeri iletmek için kullanılır. Fakat kesinlikle bir sonuç dönmek zorunda değildir. Hata alırsa onError metoduna düşer. Değer dönerse onSuccess methoduna düşer.

Bir datanın olup olmadığını kontrol etmek için kullanabiliriz.

MaybeObservable.java
MaybeObservable.java — Output

Completable, yalnızca görevin başarılı şekilde tamamlanmış onSuccess() veya bir hata oluşması durumunda onError() olayları ile ilgilenir

Ek olarak bir değer dönmesini beklemeyiz.

Örnek olarak gelen veriyi .txt dosyasına yazıp yazmadığını kontrol etmek isteyebiliriz.

CompletableObservable.java
CompletableObservable.java — Output

Operatörler

RxJava kütüphanesindeki operatörlere ve kullanımlarına java8 ile beraber aşine olmuş haldeyiz.

Observable tarafından yayılan datalarda çeşitli şekillerde işleyebileceğimiz işlemler yapabiliriz.

map, Observable kaynağı tarafından yayılan her öğeye seçtiğiniz bir işlevi uygular ve bu işlev uygulamalarının sonuçlarını yayan bir Observable öğesini döndürür.

repeat, Bir öğeyi tekrar tekrar yayar. Bu operatörün bazı uygulamaları bir dizi öğeyi tekrarlamanıza ve bazıları da tekrar sayısını sınırlamanıza izin verir.

distinct, Yalnızca daha önce yayınlanmamış öğelere izin vererek bir Observable filtreyi filtreler.

flatMap, Bir Observable’ın sonucunda oluşan değer tekrar başka bir observable’a bağlanıyorsa yararlı bir çözümdür.

Yani birden çok Observable işlemini tek bir değere indirgemiş olmaktadır.

flatMap’in diğer bir özelliği ise veriyi asenkron şekilde yaymaktadır.

Yani 100 ayrı akışta Observable’a sahip olur ve concurrency’e de böylelikle sahip oluruz.

Tüm işlemler ayrı ayrı işlenmiş olur.

zip, Birden çok Observable’ı birleştirmek için kullanırız.

En önemli özelliği ise her Observable’ın aynı indisli elemanlar birbirini beklemektedir.

Örnek olarak,

İki tane Observable’ımız olsun bunlardan birinde isimler, diğerinde soyisimler bulunsun. Aynı indisli elemanlan birbirini bekleyeceği için isim soyisim şeklinde yazdırabiliriz.

Not olarak şunu belirtmeliyim ki, eleman sayısı daha az olana göre işlem yapılmaktadır. Yani bir Observable’da 10 diğer Observable’da 9 eleman varsa, 9 elemana göre işlem yapılır.

zip için bir başka örnek vermek gerekirse,

Üç farklı servisimiz olduğunu düşünelim,

Bir tanesi hava durumunu getiren servis,

Bir tanesi otelleri getiren servis,

Bir tanesi şehirleri getiren servis,

Biz hem sıcaklık 20 derece üzeri, otel yıldızı 4 ve üzeri hemde İstanbul’u istiyoruz.

Burada hangi servisten önce cevap dönecek gibi bir sıkıntı oluyor. Oysa ki zip operatörü ile aynı anda çağrılan servisler için yeni thread açılır ve aynı anda başlar. Bir servis diğer servisi bekler ve 3 servisten cevap gelince bizim verdiğimiz bilgilerle eşleşenleri bize döner.

Operators.java
Operators.java — Output

Schedulers

Son olarak RxJava’da bulunan Schedulers türlerinden bahsetmek istiyorum.

Farklı thread’lerde Observable işlemlerini yapmaktan sorumludurlar.

Observable değişkeninizde Schedulers’ları observeOn ya da subscribeOn ile kullanabilirsiniz.

.subscribeOn ile Observable’daki verinin hangi thread’de işlem göreceğini belirleyebilirsiniz.

.observeOn ile ise Observable’da işlem gören verinin dinleyici konumunda olan Observer’a gönderildikten sonra hangi thread’de yayınlanacağını belirleyebilirsiniz.

IO

  • En çok kullanılan scheduler türlerinden biridir.
  • Genellikle ağ istekleri, dosya işlemleri, api istekleri gibi işlemlerde kullanılır.
  • Thread-pool özelliğine sahiptir. Böylelikle ihtiyaca göre thread oluşturabilir. Bu avantaj bazen sorunda olabilir çünkü sınırsızdır ve performans anlamında ciddi bir etkiye sahip olabilir.
  • Aynı thread başka işlemler içinde kullanılabilir.
  • Eğer 60 saniye boyunca kullanılmıyorsa thread silinir.
  • Kullanım şekli;
observable.subscribeOn(Schedulers.io())

Computation

  • Thread-pool özelliği olduğu için IO ‘a oldukça benzerdir.
  • Kullanılabilecek thread sayısı sistemde mevcut olan çekirdek sayısı kadardır. (IO’da olduğu gibi sınırsız değildir.)
  • Örneğin telefonunuzda 2 çekirdek varsa thread-pool’da da 2 thread olacaktır.
  • Aynı zamanda bu iki thread meşgulse işlemin kullanılabilir olmalarını beklemesi gerekmektedir.
  • Bu sınırlama, IO’ya göre dezavantaj oluşturmasının yanında, küçük hesaplamalar yapmak için uygundur ve genellikle işlemi hızlıdır.
  • Kullanım şekli,

observable.subscribeOn(Schedulers.computation())

NewThread

  • Observable için yeni bir thread oluşturur.
  • mainThread’den diğer thread’e zaman harcayan işlemler için kullanılır.(Yani uzun sürecek işlemleri arkaplanda yapmak için.)
  • Bu thread’i oluşturmak maliyetli bir işlemdir.
  • Kullanım şekli,
observable.subscribeOn(Schedulers.newThread())

Single

  • Tek bir thread destekler ve oldukça basittir.
  • Kaç tane Observable olursa olsun, yalnızca bir thread’de çalışır.
  • mainThread yerinede kullanılabilir.
  • Kullanım şekli,
observable.subscribeOn(Schedulers.single())

Trampoline

  • Kodu geçerli thread’de çalıştırır. (Yeni bir thread oluşturmaz.)
  • mainThread’de çalışan bir kod varsa trampoline mainThread kuyruğuna eklenir.
  • Birden fazla Observable olduğunda ve işlemlerin sırayla yürütülmelerini istediğimizde kullanışlı olur.

Örnek kod,

Observable.just(1,2,3,4)
.subscribeOn(Schedulers.trampoline())
.subscribe(onNext);
Observable.just( 5,6, 7,8, 9)
.subscribeOn(Schedulers.trampoline())
.subscribe(onNext); Output:
Number = 1
Number = 2
Number = 3
Number = 4
Number = 5
Number = 6
Number = 7
Number = 8
Number = 9
  • Kullanım şekli,
observable.subscribeOn(Schedulers.trampoline())

Executor

  • Custom(özel) bir IO Scheduler’dır.
  • Thread-pool boyutunu biz belirleyip kendimize thread-pool oluşturmamızı sağlar.
  • Observable sayısının IO Scheduler için büyük olabileceği senaryolarda kullanılabilir.
  • Kullanım şekli,
//Executer ile thread sayısı belirtme
ExecutorService executor = Executors.newFixedThreadPool(10);
//Java içerisinde kullanım
Scheduler scheduler = Schedulers.from(executor);
Observable.range(1,1000)
.flatMap(i -> Observable.just(i)
.subscribeOn(scheduler)
).subscribe(System.out::println);

Android Scheduler

  • rxAndriod kütüphanesi tarafından sağlanır.
  • IO modifikasyonu yapılabilmesi için yani arkaplanda yapılani için ui mainThread’de geri getirmek için kullanılır.
  • Genellikle observeOn yöntemi kullanılır.
  • Kullanım şekli,
observable.observeOn(AndroidSchedulers.io())

Anlatacaklarım bukadardı :)

Başka bir blogda görüşmek üzere:)

--

--

Hasan Kadir Demircan
Hasan Kadir Demircan

No responses yet