JuHyang
RxKotlin 본문
Reactive 코틀린 #1 - 개념 및 설치
Reactive program 옵저버 패턴을 이용하여 구독자에게 변경사항을 알려주는 programming 기법입니다.
내가 어떤 데이터가 필요하여 요청하고, 그 데이터를 얻어와 가공하는게 아니라, 데이터 관리 주체쪽에 데이터가 변경 시 요청을 받겠다는 구독 신청을 해놓고, 변경사항(event)이 발생하면 이를 전달 받습니다.
보통 프로그래밍을 하는 방법과는 반대되는 설정이죠.
따라서 reactive programming은 하나의 갑슬 반환하기 보다는 data stream을 반환합니다.
ReactiveX의 개념은 iteraotr와 비교되는데, 좀더 명확하고 자세한 설명이 필요하다면 아래 페이지를 참고하시면 됩니다.
Rx의 개념은 여러 언어로 개발되고 있습니다.
여기서는 RxKotlin에 대한 사용법을 설명하고자 합니다.
설치방법 링크
RxKotlin 설치
-
RxKotlin은 RxJava에 Kotlin의 extension function을 이용하여 좀더 사용하기 편리하도록 만든 경량 library입니다.
-
Maven :
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxkotlin</artifactId>
<version>2.4.0</version>
</dependency>
-
Gradle :
implementation(“io.reactivex.rxjava2:rxkotlin:2.4.0”)
-
아래 링크에서 RxJava, RxKotlin, RxAndroid의 최신 버전을 확인할 수 있습니다.
Reactive 코틀린 #2 - Observable
Observable은 Consumer가 소비하는 값을 생성하는 역할을 합니다.
Consumer들은 Observable에 구독을 신청하고, Observable은 값을 생성한 후에 Consumer들에게 push 방식으로 값을 전달합니다.
Observable의 주요 이벤트
-
Observable은 주요하게 아래 event를 발생시킵니다.
-
onNext(item T) : 값을 전달할 때 호출하여 값을 넘겨줌
-
onError(e : Throwable) : 에러가 발생하면 호출 함
-
onSubscribed(d : Diposable) : 구독을 신청하면 호출
-
이때 넘어오는 Diposable 객체는 Observer가 구독을 해제할 때 사용함.
-
OnComplete() : 가지고 있는 값을 모두 전달하면 호출 함.
-
따라서 위 callback 함수들을 구현하여 Observable에 등록하면 Observable이 전달하는 이벤트를 받을 수 있습니다.
-
이 이벤트는 Observable interface에 정의되어 있으므로 Observable interface를 구현한 객체를 Observable에 구독신청할 수 있습니다.
val observer : Observer<String> = object : Observer<String> {
override fun onComplete () {
println (“onComplete()”)
}
override fun onNext (item : String) {
println (“onNext() - $item”)
}
override fun onError (e : Throwable) {
println (“onError() - ${e.message}”)
}
override fun onSubscribe (d : Diposable) {
println (“onSubscribe() - $d “)
}
}
Observable의 생성
-
Observable을 생성하는 방법에는 여러가지가 존재합니다.
create
-
Observable.create를 이용하여 아래와 같이 생성할 수 있습니다.
fun main (args : Array<String>) {
val observable1 = Observable.create {
it.onNext(1)
it.onNext(2)
it.onComplete()
}
val observable2 = Observable.create {
it.onNext(1)
it.onNext(2)
it.onError(Exception (“WoW ! exception “))
}
val observer : Observer = object : Observer {
override fun onComplete () = println (“onComplete()”)
override fun onNext (item : Int) = println (“onNext() - $item”)
override fun onError (e : Throwable) = println (“onError() - ${e.message}”)
override fun onSubscribe (d : Diposablee) = println (“onSubscribe() - $d”)
}
observable1.subscribe(observer)
observable2.subscribe(observer)
}
-
결과는 다음과 같습니다.
onSubscribe() - null
onNext() - 1
onNext() - 2
onComplete()
onSubscribe() - null
onNext() - 1
onNext() - 2
onError() - Wow!! exception
fromxxx
-
from을 이용하면 기존 구조체로 부터 Observable을 생성할 수 있습니다.
fun main (args : Array<String>) {
val list = listOf(1, 2, 3)
val listOb = Observable.fromIterable(list)
val call = Callable<Int> { 4 }
val callOb = Observable.fromCallable(call)
val future = object : Future<Int> {
override fun get () = 5
override fun get(timeout : Long, unit : TimeUnit) = 6
override fun isDone () = true
override fun cancel (mayInterruptIfRunning : Boolean) = false
override fun isCancelled () = false
}
val futreOb = Observable.fromFuture(future)
val observer : Observer<Int> = object : Observer<Int> {
override fun onComplete () = println (“onComplete()”)
override fun onNext (item : Int) = println (“onNext() - $item”)
override fun onError (e : Throwable) = println (“onError() - ${e.message}”)
override fun onSubscribe (d : Diposable) = println (“onSubscribe - $d”)
}
listOb.subscribe(observer)
callOb.subscribe(observer)
futerOb.subscribe(observer)
}
-
결과
onSubscribe() - io.reactivex.internal.operaotr.observable.ObservableFromIterable$~~
onNext() - 1
onNext() - 2
onNext() - 3
onComplete()
onSubscribe - 0
onNext() - 4
onComplete()
onSubscribe() - 0
onNext() - 5
onComplete()
-
fromIterable() : list같이 interable을 지원하는 instance를 Observable 형태로 변경합니다
-
각 개별 아이템이 하나씩 전달 됩니다.
-
fromCallable() : Callable객체를 Observable 형태로 변경합니다.
-
call()함수의 return 값이 전달 됩니다.
-
fromFuture() : Futre 객체를 Observable 형태로 변경합니다.
-
get() 함수의 return 값이 전달됩니다.
-
iterable의 경우 아래처럼 코틀린 확장함수인 toObservable()을 이용하여 Observable을 만들수도 있습니다. (내부적으로는 fromIterable()을 이용합니다.)
val listOb = listOf(1, 2, 3).toObservable()
just
-
just는 받은 인자를 그대로 전달합니다.
-
따라서 list를 받든 map을 받는 객체 자체를 전달하며, 여러개를 전달하려면 각각의 인자로 넣어서 호출해야 합니다.
fun main (args : Array<String>) {
val list = listOf(1, 2, 3)
val num = 3
val str = “wow!”
val map = mapOf(1 to “one”, 2 to “two”)
val justOb = Observable.just(list, num, str, map)
val observer : Observable<Any> = object : Observer<Any> {
override fun onComplete () = println (“onComplete()”)
override fun onNext (item : Any) = println (“onNext() - $item”)
override fun onError (e : Exception) = println (“onError() - ${e.message}”)
override fun onSubscribe (d : Diposable) = println (“onSubscribe() - $d”)
}
justOb.subscribe(observer)
}
-
결과
onSubscribe() - io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable@~~
onNext() - [1, 2, 3]
onNext() - 3
onNext() - wow!
onNext() - {1 = one, 2 = two}
onComplete()
-
넘긴 인자만큼 onNext()를 호출해서 해당 item을 전달하며, 받은 객체 그대로를 전달하는 것을 알 수 있습니다.
-
만약 list의 각각의 우너소를 전달해야 한다면 위에서 언급한 fromIterable()을 사용해야 한다.
-
또한 just는 모든 item의 전달이 끝나면 onComplete()를 호출해준다.
range
-
특정 범위만큼 수를 생성하여 전달합니다.
val observer = object : Observer<Int> {
override fun onComplete () = println (“onComplete()”)
override fun onNext (item : Int) = println (“onNext() - $item”)
override fun onError (e : Throwable) = println (“onError() - ${e.message}”)
override fun onSubscribe (d : Diposable) = println (“onSubscribe() - $d”)
}
Observable.range(1, 3).subscribe(observer)
-
결과
onSubcribe() - 0
onNext() - 1
onNext() - 2
onNext() - 3
onComplete()
empty
-
아무값을 전달하지는 않지만 onComplete()를 호출해 줍니다.
val observer = object : Observer {
…
}
Observable.empty().subscribe(observer)
-
결과
onSubscribe() - INSTANCE
onComplete()
interval
-
특정 시간 간격으로 0부터 숫자를 증가시키면서 반환합니다.
fun main (args : Array) {
val observer = object : Observer<Long> {
. . .
}
Thread() { Observable.interval (100, TimeUnit.MILLISECONDS).subscribe(observer) }.start()
// 0.3초간 main thread 를 대기시킨다.
val th1 = Thread() { Thread.sleep(300) }
th1.start()
th1.join()
}
-
결과
-
main 함수이기 때문에 thread sleep을 하지 않으면 process가 죽기 때문에 observer의 수행은 background에서 하고 main thread는 0.3초가 block 시킵니다
onSubscribe() - null
onNext() - 0
onNext() - 1
timer
-
주어진 시간에 한번만 값을 전달합니다.
Thread() { Observable.time(100, TimeUnit.MILLISECONDS).subscribe(observer) }.start()
// 0.3초간 대기한다.
val th2 = Thread () { Thread.sleep(300) }
th2.start()
th2.join()
-
결과
-
이 함수 역시 그냥 main함수에서 수행하면 proecess가 종료되어 버리므로, background에서 처리하고 main thread는 0.3초 대기시키도록 합니다.
onSubscribe() - null
onNext() - 0
onComplete()
Subscribe
Observable에서 방출하는 값을 받기 위해서는 subscribe() 함수를 이용해 등록해야 합니다.
위 예제에서는 Observable instance를 등록했으나, 각각의 메소드를 필요한 것만 따로 등록할 수도 있습니다.
Observable의 class를 보면 subscribe에 대해 param에 따라 다양한 정의가 되어 있습니다.
-
subscribe() : Disposable
-
subscribe(onNext : Consumer) : Disposable
-
subscribe(onNext : Consumer, onError : Consumer) : Disposable
-
subscribe(onNext : Consumer, onError : Consumer, onComplete : Action) : Disposable
-
subscribe(onNext : Consumer, onError : Consumer, onComplete : Action, onSubscribe : Consumer) : Disposable
따라서 observer의 instance를 등록해도 되지만 필요한 callback만 따로 등록해서 사용할 수도 있습니다.
fun main (args : Array<String>) {
val observable = Observable.range(1, 3)
observable.subscribe (
{ item -> println (“onNext - $item”) },
{ err -> println (“onError - $err”) },
{ println (“onComplete()”) }
)
}
-
결과
onNext - 1
onNext - 2
onNext - 3
onComplete()
구독 해지
Observer를 subscribe를 이용해 등록하면 onSubscribe(d : Disposable) = { . . . } 으로 Disposable의 instance를 전달 받습니다.
Disposable은 interface로 다음과 같습니다.
public interface Disposable {
/**
* Dispose the resource, the operation should be idempotent.
*/
void dispose ();
/**
* Returns true if this resource has been disposed.
* @return true if this resource has been disposed.
*/
boolean isDisposed ();
}
-
즉 구독이 해지 되었는지를 확인하는 함수와 구독 해지를 요청하는 함수로 구성됩니다.
fun main (args : Array) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
val observer = object : Observer<Long> {
lateinit var disposable : Disposable
override fun onComplete () = println (“onComplete()”)
override fun onNext (item : Long) {
println (“onNext() - item”)
if (item >= 5 && disposable.isDisposed == false) {
disposable.dispose()
}
}
override fun onError (e : Throwable) = println (“onError() - ${e.message}”)
override fun onSubscribe (d : Disposable) {
println (“onSubscribe() - $d”)
disposable = d
}
}
observable.subscribe(observer)
// 0.5초 대기 후 dispose() 호출
Thread () {
Thread.sleep(1000)
}.apply {
start ()
join ()
}
}
-
만약 subscribe()함수에 따로 따로 각각의 함수를 구현해 넣었다면 return 값으로 Disposable instance를 전달받을 수 있습니다.
-
따라서 이렇게 전달받은 함수로 dispose()를 호출해 주면 구독이 해지 됩니다.
fun main (args : Array) {
val observable = Observable.interval (100, TimeUnit.MILLISECONDS)
val disposable = observable.subscribe (
{ item -> println (“onNext - $item”) },
{ err -> println (“onError - ${e.message}”) },
{ println (“onComplete()”) }
)
// 0.5초 대기 후 dispose() 호출
Thread () {
Thread.sleep(500)
disposable.dispose()
Thread.sleep(100)
}.apply {
start ()
join ()
}
}
Hot & Cold Observable
앞서 언급된 Observable들은 subscribe를 신청하면 가지고 있는 데이터를 순서에 맞춰 전부 내보내 줍니다.
여러번 subscribe를 하더라도 처음부터 순서에 맞춰서 동일한 데이터를 내어줍니다.
즉, Observable의 데이터는 subscribe해서 소모되는 것이 아니라, 계속 저장되어 있다가 구독자가 추가될 때마다 데이터 전부를 내어주도록 되어 있습니다.
이를 Cold Observable이라고 합니다.
반대의 의미인 Hot Observable은 배출하는 시점이 구독시점이 아니며, 이에 따라 뒤늦게 구독하는 Observer는 이전 데이터를 받지 못하고 구독 신청 이후의 데이터 부터 받게 됩니다.
Cold Observable
-
subscribe가 호출되면 데이터를 배출하기 시작한다. (Observable이 배출하는 동작을 시작한다.)
-
처음부터 모든 데이터가 순서대로 배출된다.
-
구독할 때마다 동일한 데이터가 동일한 순서로 배출된다. (데이터가 배출되었다고 소모되지 않는다.)
Hot Observable
-
subscribe와 상관 없이 데이터를 배출한다.
-
구독시점부터 데이터를 전달 받으며 구독신청 전의 데이터는 받을 수 없다.
-
Event를 전달받는 형태로 사용함.
ConnectableObservable
Hot observable 중에 하나로 connect를 호출하면 배출을 시작합니다.
따라서 observer는 subscribe와 상관 없이 구독 신청 시점부터 데이터를 전달 받습니다.
fun main (args : Array<String>) {
val connectableObservable = (1 .. 10).toObservable().publish()
// 1번 구독자 등록
connectableObservable.subscribe{ println (“first subscribe : $it”) }
println (“Add first subscriber.”)
// 2번 구독자 등록
connectableObservable.subscribe{ println (“second scribe : $it”) }
println (“Add second subscribe.”)
// observable connect ()
connectableObservable.connect()
// 3번 구독자 등록
connectableObservable.subscribe{ println (“Subscription 3 : $it”) }
}
-
결과
Add first subscriber.
Add second subscriber.
first subscriber : 1
second subscriber : 1
first scriber : 2
second subscriber : 2
first scriber : 3
second subscriber : 3
first scriber : 4
second subscriber : 4
first scriber : 5
second subscriber : 5
first scriber : 6
second subscriber : 6
first scriber : 7
second subscriber : 7
first scriber : 8
second subscriber : 8
first scriber : 9
second subscriber : 9
first scriber : 10
second subscriber : 10
-
publish () 를 통하여 hot observable로 변경합니다.
-
subscribe () 를 호출하였으나, onNext() 가 호출되지 않습니다.
-
1번과 2번 구독자를 등록 후 Observable의 connect()를 호출하면 그때서야 데이터가 배출됩니다.
-
또한 배출이 완료된 이후에 등록된 3번은 데이터를 하나도 전달받지 못합니다.
-
아래에서는 동일한 코드에 Observable을 interval로 바꾸어 항상 출력 가능한 상태로 만들고, connect 이전과 이후에 구독 신청에 따라 어떤게 데이터를 수신 받는지 확인 해 보겠습니다.
fun main (args : Array<String>) {
val connectableObservable = Observable.interval (100, TimeUnit.MILLISECONDS).publish()
// 1번 구독자 등록
connectableObservable.subscribe { println (“1st subscriber : $it”) }
// 2번 구독자 등록
connectableObservable.map { “2nd subscriber : $it” }.subscribe { println (it) }
//observable connect()
connectableObservable.connect()
runBlocking {
delay (300)
}
// 3번 구독자 등록
connectableObservable.map { “3rd subscriber : $it” }.subscribe { println (it) }
runBlocking {
delay (300)
}
}
-
결과
1st subscriber : 0
2nd subscriber : 0
1st subscriber : 1
2nd subscriber : 1
1st subscriber : 2
2nd subscriber : 2
1st subscriber : 3
2nd subscriber : 3
3rd subscriber : 3
1st subscriber : 4
2nd subscriber : 4
3rd subscriber : 4
1st subscriber : 5
2nd subscriber : 5
3rd subscriber : 5
-
connect () 이후에 300ms 을 대기하므로 1번과 2번 구독자는 각각 세개씩 배출된 데이터를 받습니다.
-
그 이후 3번 구독자가 등록되면 3번은 등록 이후 데이터 부터 전달받습니다.
-
위에서 쓰인 runBlocking은 코틀린의 coroutine으로 0.3초동안 delay를 주며, 해당 라인에서 대기합니다. (단 해당 Thread가 block되는건 아닙니다.)
-
thread.join으로 계속 써오다가 불편하여 코루틴으로 변경하여 예제를 작성했습니다.
Reactive 코틀린 #3 - Subject
Hot observable을 구현시 publish() 이외에 subject를 이용할 수도 있습니다.
Subject는 Observer 역할을 하기 때문에 여러 Observable에 구독을 신청할 수 있고, Observable의 역할도 하기 때문에 받은 item을 재배출 하거나, 새로운 값을 배출할 수 있습니다.
Observable이면서 Observer 인거죠.
-
PublishSubject
-
PublishSubject를 등록할 경우 등록 시점부터 이후 데이터를 전달 받습니다.
-
fun main (args : Array<String>) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
val subject = PublishSubject.create<Long>()
obsevable.subscribe(subject)
runBlocking { delay(300) }
'Android > RxAndroid' 카테고리의 다른 글
Rx Just vs Create (0) | 2023.02.05 |
---|---|
RxAndroid (0) | 2020.05.18 |
RxJava (0) | 2020.01.14 |