Есть такой кусочек, который отслеживает события каталога:
Observable( observer => {
scheduler.scheduleRec( self=>{
Try(watcher.take()) match {
case Success(key) => {
for(event <- key.pollEvents.asScala) {
observer.onNext(event.asInstanceOf[WatchEvent[Path]])
}
key.reset
self
}
case Failure(error) => observer.onError(error); observer.onCompleted()
}
})
})
если на него подписаться:
val obs = ObservablePathWatchEx
.fromPathEvents(path).observeOn(rx.lang.scala.concurrency.Schedulers.threadPoolForIO)
import context.dispatcher
context.system.scheduler.scheduleOnce(30 seconds)(
obs
.subscribe(
event => { sendToSubscribers(event) },
error => { log.error(error.toString); context.stop(self) }
)
)
То поведение его такое: либо подписчик ожидает первого элемента в Observable, обрабатывает его и все; либо если значения в Observable были до подписки, то они обрабатываются и тоже все. //вариантов с несколькими подписчиками не рассматриваем
Такое поведение хоть и не особо очевидное, но вполне документировано. Чтобы получить что я хочу, мне нужно использовать PublishSubject.
Внезапно вопросы: 1) я правильно понимаю проблему? 2) Ну и как мне чертов PublishSubject использовать? Документация по rxJava скудная, а по адаптеру для scala так вообще не очень, может кто подскажет
Перемещено maxcom из talks