Связывание компонентов приложения при помощи RxJS

(Reactive-Extensions)

Связывание компонентов

a=5
b=a*a //25
a=7
b //25

1)

2)

Black border puzzle piece top Layer 1

Распространение изменений

let a = 3
let b = 5
let sum = a + b
sum // 8

b = 10
sum // ?
sum == 8
sum == 13

Реактивная парадигма

Императивная парадигма

Реактивная парадигма

let a = 3
let b = 5
let sum = a + b
sum // 8

b = 10
sum // 13

a и b

  • коллекции значений или "потоки"
  • распространяют свои изменения
  • реализуют паттерн Observable

sum

  • "слушает" "потоки" a и b

Пример реактивного поведения

Redux же есть - зачем потоки?

Когда всё состояние приложения можно описать 20 - 50-ю значениями, хранить все эти значения в одном месте отличная идея

{
  name: value,
  name: value,
  name: value,
  name: value,
  name: value,
  name: value,
  name: value,
  name: value
}

state

Redux же есть - зачем потоки?

Когда для описания состояния приложения требудется от 100 до 100500 значений, разбираться со всеми сразу становится сложно - проще декомпозировать

{
  name: value,
  name: value,
  name: value,
  name: value,
  name: value,
}
{
  name: value,
  name: value,
  name: value,
  name: value,
  name: value,
}
{
  name: value,
  name: value,
  name: value,
  name: value,
  name: value,
}
{
  name: value,
  name: value,
  name: value,
  name: value,
  name: value,
}
{
  name: value,
  name: value,
  name: value,
  name: value,
  name: value,
}

Обмен изменениями

{
  name: value,
  name: value,
  name: value,
  name: value,
  name: value,
}
{
  name: value,
  name: value,
  name: value,
  name: value,
  name: value,
}

Комонент 1

Комонент 2

сообщение с новыми значениями полей

поток значений

Соблюдайте тишину

Соблюдайте тишину

Потоки в RxJs

  let a = new Rx.Subject() // let a 
  let b = new Rx.Subject() // let b 

  let sum = a.combineLatest(b, (aV, bV) => aV + bV) // let sum = a + b 

  sum.subscribe(sumVal => console.log(sumVal)) // console.log(sum) 

  a.next(3)  // a = 3 
 
  b.next(5)  // b = 5  // console -> 8
  b.next(10) // b = 10 // console -> 13  

Попробуем переписать предыдущий пример, используя потоки

Subject

  let a = new Rx.Subject()

Создаем новый поток

  a.subscribe(aV => console.log(aV))

Подписываемся на новые знаения

  a.next('new Value')

Добавляем ("присваиваем") новое значение

Изменения значений во времени будем изображать как точки на оси

Как еще можно создать поток?

let clicks = Rx.Observable.fromEvent(document, 'click')
clicks.subscribe(x => console.log(x))

Из любых DOM событий

let numbers = Rx.Observable.interval(1000)
numbers.subscribe(x => console.log(x))

Как альтернатива setInterval

1000ms

Как еще можно создать поток?

Как еще можно создать поток?

А так же из массивов, promise'ов, асинхронных запросов и вообще любых коллекций

Модификация потоков

let b = a.map(x => x * 10)

map

let b = a.delay(1000)

delay

Модификация потоков

Фильтрация потоков

let b = a.filter(x => x > 10)

filter

let b = a.skip(2)

skip

Фильтрация потоков

Комбинирование потоков

let sum = a.combineLatest(b, (aV, bV) => '' + aV + bV)

combineLatest

Комбинирование потоков

let sum = a.zip(b, (aV, bV) => '' + aV + bV)

zip

 Не-Rx против Rx

Rx.Observable.fromEvent(document, 'click')
    .skip(2)
    .take(2)
    .timeInterval()
    .subscribe(x => console.log(x))
let i = 0
let lastTs = Date()
let nowTs
document.addEventListener('click', () => {
  i++
  if(i <= 2 || i >= 4) return
  nowTs = new Date()
  console.log(diff(lastTs, nowTs))
})

Потоки между компонентами

{date: '05.04.2011'}
{name: 'Имя'}
{
  name: 'Имя',
  date: '05.04.2011'
}

 Имя

Потоки между компонентами

let userName = Rx.Observable
   .fromEvent(input, 'keyup')
   .map(e => e.target.value)

return { userName }
let date = datePicker
    .changesStream()

return { date }
let user = Rx.Observable.combineLatest(
    streams.userName,
    streams.date,
    (name, date) => {name, date})

return { user }

Вход и выход это потоки

Component(inputStreams) {
  //логика и render
  return {outputStreams}
}

Специализация компонентов

Компоненты можно разделить на

 

Компоненты представления

  • отвечают за отображение информации
  • возвращают потоки с UI событиями

 

Компоненты логики

  • принимают все UI события как потоки
  • инициируют запросы к серверу

Наглядно кто с кем и чем

Тестирование

Типичный тест любого компонента

Кидаем что-то во входящий поток - смотрим что в выходящем

let inputStreams = {
  name:          new Rx.Subject(),
  clicks:        new Rx.Subject(),
  restResponses: new Rx.Subject()
}

let outputStreams = new UserInfo(inputStreams)

it('sends one userInfo message on name change', () => {
  let userInfoMessages = []
  outputStreams.userInfo.subscribe(mes => userInfoMessages.push(mes))
  inputStreams.name.next('Peter')
  userInfoMessages[0].should.be.eql({name: 'Peter', date: ''})
})

Плюсы

  • Унификация - всё есть поток (Observable)
  • Удобство работы с асинхронными вызовами
  • Явность
  • Ленивость вычислений
  • Идентичное API на 16-ти платформах

Минусы

  • сложно отлаживать
  • комбинирование большого числа потоков в одном месте станет может стать проблемой
const activityStream = streams.routeActivities
    .combineLatest(clubOneTriggerStream, RxUtil.selectLeft)
    .wrapAs('bookingModel')
    .combineLatest(clubOnePaymentStream, _.merge)
    .select(appendClubOneDataToPassengers)
    .combineLatest(streams.entertainmentSelectionStream.startWith({}), (options, entertainmentSelection) => {
      return {...options, bookingModel: options.bookingModel.withAttributes({ entertainment: entertainmentSelection }) }
    })
    .combineLatest(streams.landServicesSelectionStream.startWith(undefined), (options, landServicesSelection) => {
      return {...options, bookingModel: options.bookingModel.withAttributes({ landServices: landServicesSelection }) }
    })
    .combineLatest(streams.vehiclesSelection.selectProperty('selection').startWith([]), (options, vehiclesSelection) => {
      return {...options, bookingModel: options.bookingModel.withAttributes({ vehiclesSelection: vehiclesSelection }) }
    })
    .combineLatest(extraServicesSelection, addExtraServicesSelection)
    .combineLatest(travelClassServicesSelectionStream.distinctUntilChanged(), (options, travelClassServices) => {
      return {...options, bookingModel: options.bookingModel.withAttributes({ travelClassServices: travelClassServices }) }
    })
    .throttle(Config.throttlingPeriodsInMs.immediate)

Кто использует?

Ссылки

The introduction to Reactive Programming you've been missing

https://gist.github.com/staltz/868e7e9bc2a7b8c1f754

от André Staltz

Официальная документация

http://reactivex.io/rxjs/

Тонна туториалов по Rx на все платформы

http://reactivex.io/tutorials.html

Спасибо.

Вопросы?