iOS のアプリ開発を始めて 1 年が経過しました.RxSwift との付き合いも 1 年が経過しました.最近は仕事で再び Android の担当になりました.

この記事は RxSwift と Schedulers について書きました.GCD の話は出て来ませんので真剣に勉強した方には以下の解説がとても素晴らしいです.


Observable とスレッド

Swift は 3.1 です.

まず,RxSwift でてきとうな Observable を作ります.

let observable = Observable<String>.create { observer -> Disposable in
  observer.onNext("YO!😎")
  observer.onCompleted()
}
observable.subscribe(onNext: { (message: String) in
  print(message)
})
YO!😎

次に,5 秒後に「YO!」を受け取るように Observable を変更します.

let observable = Observable<String>.create { observer -> Disposable in
  Thread.sleep(forTimeInterval: 5) // Blocking
  observer.onNext("YO!😎")
  observer.onCompleted()
  return Disposables.create()
}

Observable の処理は,通常はsubscribeを呼び出したスレッドで実行されます. なのでThread.sleepは main スレッドをブロックしています.main スレッドをブロックするのはイケないことなので,明示的に別のスレッドで処理するように変更します.

let observable = Observable<String>.create { observer -> Disposable in

  DispatchQueue.global(qos: .default).async {

    Thread.sleep(forTimeInterval: 5) // ...zzZ

    observer.onNext("YO!😎")
    observer.onCompleted()
  }
  return Disposables.create()
}

「YO!」がどのスレッドから来ているのか知りたいのでヘルパー関数を作ってsubscribeする側を少し変更します.

func which() -> String {
  return Thread.isMainThread ? "main" : "background"
}
observable.subscribe(onNext: { (message: String) in
  print("\(message) on \(which())")
})
YO!😎 on background

バックグラウンドから「YO!」が届きました.

main スレッド以外から UI に触ろうとすると System に怒られてしまいます.なので別スレッドで眠ってから「YO!」の送信だけを main スレッドで行うように変更します.

let observable = Observable<String>.create { observer -> Disposable in

  DispatchQueue.global(qos: .default).async {

    Thread.sleep(forTimeInterval: 5)

    DispatchQueue.main.async {
      observer.onNext("YO!😎")
      observer.onCompleted()
    }
  }
  return Disposables.create()
}
YO!😎 on main

うまくいきました.これでノンブロッキング YO!YO!が可能になりました.

さて,ここまでの「YO!」を振り返ってみます.

let observable = Observable<String>.create { observer -> Disposable in

  observer.onNext("YO!😎") // where subscribe() is called

  DispatchQueue.global(qos: .default).async {

    Thread.sleep(forTimeInterval: 5)
    observer.onNext("YO!😎") // background

    DispatchQueue.main.async {
      observer.onNext("YO!😎") // main
      observer.onCompleted()
    }
  }
  return Disposables.create()
}
YO!😎 on main
YO!😎 on background
YO!😎 on main

いい感じです.

Schedulers

次はいよいよ Scheduler を使います.Observable のobserveOnMainSchedulerを指定します.

observable
  .observeOn(MainScheduler.instance)
  .subscribe(onNext: { (message: String) in
    print("\(message) (Here is \(which()))")
  })
YO!😎 (Here is main)
YO!😎 (Here is main)
YO!😎 (Here is main)

subscribeの中が全て main スレッドで実行されるようになりました.MainSchedulerを指定したおかげでDispatchQueue.main.asyncは不要になりました.

「YO!」がどこから来ているのかわからなくなってしまったので少し変更を加えます.

let observable = Observable<String>.create { observer -> Disposable in

  observer.onNext("YO!😎 from \(which()).")

  DispatchQueue.global(qos: .default).async {

    Thread.sleep(forTimeInterval: 5)

    observer.onNext("YO!😎 from \(which()).")
    observer.onCompleted()
  }
  return Disposables.create()
}
YO!😎 from main. (Here is main)
YO!😎 from background. (Here is main)

observeOnの振る舞いがわかりました.

次に,subscribeOnで Observable の処理を実行するスレッドを指定します.先ほど述べた通り,Observable の処理は,通常はsubscribeを呼び出したスレッドで実行されます. しかしsubscribeOnを使うとこれを変更することができます.

observable
  .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .default))
  .observeOn(MainScheduler.instance)
  .subscribe(onNext: { (message: String) in
    print("\(message) (Here is \(which()))")
  })
YO!😎 from background. (Here is main)
YO!😎 from background. (Here is main)

スリープの処理を DispatchQueue で指定する必要が無くなりますので,Observable はこうなります.

let observable = Observable<String>.create { observer -> Disposable in
  Thread.sleep(forTimeInterval: 5)
  observer.onNext("YO!😎 from \(which()).")
  observer.onCompleted()
  return Disposables.create()
}

これは冒頭に出て来た Observable です.Schedulers を利用すれば簡単にノンブロッキング YO!YO!が出来ます.以上です.

ソースコードはGist に置いておきます.

参考