LINUX.ORG.RU

rxjava-scala, внезапные события и PublishSubject

 , ,


0

1

Есть такой кусочек, который отслеживает события каталога:

Observable( observer => {
      scheduler.scheduleRec( self=>{
        Try(watcher.take()) match {
          case Success(key) => {
            for(event <- key.pollEvents.asScala) {
              observer.onNext(event.asInstanceOf[WatchEvent[Path]])
            }
            key.reset
            self
          }
          case Failure(error) => observer.onError(error); observer.onCompleted()
        }
      })
    })

если на него подписаться:

  val obs = ObservablePathWatchEx
.fromPathEvents(path).observeOn(rx.lang.scala.concurrency.Schedulers.threadPoolForIO)

  import context.dispatcher
  context.system.scheduler.scheduleOnce(30 seconds)(
  obs
    .subscribe(
      event => { sendToSubscribers(event) },
      error => { log.error(error.toString); context.stop(self) }
    )
  )

То поведение его такое: либо подписчик ожидает первого элемента в Observable, обрабатывает его и все; либо если значения в Observable были до подписки, то они обрабатываются и тоже все. //вариантов с несколькими подписчиками не рассматриваем

Такое поведение хоть и не особо очевидное, но вполне документировано. Чтобы получить что я хочу, мне нужно использовать PublishSubject.

Внезапно вопросы: 1) я правильно понимаю проблему? 2) Ну и как мне чертов PublishSubject использовать? Документация по rxJava скудная, а по адаптеру для scala так вообще не очень, может кто подскажет

Перемещено maxcom из talks

★★★★★

Последнее исправление: RedPossum (всего исправлений: 1)

блин, тов. модераторы, перетащите в Development, пожалуйста, я случайно совершенно

RedPossum ★★★★★
() автор топика
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.