Both deal with collections of values delivered over time
Collection of values
An Array!
We know how to work with arrays
.map/.filter/.reduce/.etc
We also have a term for "values delivered over time"
A Stream!
How do you .map a stream?
/**
 * Transform step that re-chunks incoming data into individual lines.
 *
 * The chunk is converted to a string, prefixed with any partial line kept
 * from the previous call, and split on '\n'. Every complete line is pushed
 * downstream; the text after the last '\n' (possibly '') is stored on
 * `this._lastLineData` until the next chunk arrives.
 *
 * NOTE(review): each chunk is decoded on its own via `toString()`, so a
 * multi-byte character split across two Buffer chunks would presumably be
 * mangled — confirm whether callers can send such input.
 *
 * @param {*} chunk - Incoming data; only its `toString()` result is used.
 * @param {*} encoding - Unused by this implementation.
 * @param {Function} done - Invoked with no arguments once the chunk is handled.
 */
liner._transform = function (chunk, encoding, done) {
  let data = chunk.toString();
  if (this._lastLineData) {
    data = this._lastLineData + data;
  }

  const lines = data.split('\n');
  // split() always yields at least one element; the last one is the
  // (possibly empty) fragment that is not yet terminated by '\n'.
  this._lastLineData = lines.pop();

  // Push exactly one argument per line. Passing this.push directly to
  // forEach would also forward the array index as push()'s second argument.
  for (const line of lines) {
    this.push(line);
  }
  done();
};
There has to be a better way!
Observables!
You found a key point!
Observables represent a collection of values delivered over time.
What are Observables and why should I care?
Everything wants your attention
Promise
// Promise version: calling this function invokes fetch(url) right away.
(url) => fetch(url);
Observable
// Observable version: someObservableHTTPGet is a placeholder name not defined
// here — presumably any HTTP helper that returns an Observable.
(url) => someObservableHTTPGet(url);
You found a key point!
Observables only execute once someone's subscribed to them. A new instance is created for each subscription.
Let's make a stopwatch
Constructor
/**
 * Counter stream for the stopwatch: emits 0 as soon as it is subscribed to,
 * then 1, 2, 3, … once every 100 ms.
 *
 * The subscribe function runs per subscription, so each subscriber gets its
 * own counter and timer. The returned teardown function clears that timer.
 */
const tenthSecond$ = new Rx.Observable(o => {
  let i = 0;
  // Emit the starting value immediately, advancing the counter once.
  o.next(i++);
  // Post-increment here as well so the sequence continues 1, 2, 3, …;
  // pre-incrementing after the line above would skip straight from 0 to 2.
  const interv = setInterval(() => o.next(i++), 100);
  return () => clearInterval(interv);
});
// Start control: the button element and an Observable of its click events.
const startButton = document.querySelector('#start-button');
const startClick$ = Rx.Observable.fromEvent(startButton, 'click');

// Stop control: the button element and an Observable of its click events.
const stopButton = document.querySelector('#stop-button');
const stopClick$ = Rx.Observable.fromEvent(stopButton, 'click');
All together now!
// Stopwatch wiring: a start click begins a timer that is rendered into the
// output element; a stop click ends it.
const startButton = document.querySelector('#start-button');
const stopButton = document.querySelector('#stop-button');
const resultsArea = document.querySelector('.output');

const startClick$ = Rx.Observable.fromEvent(startButton, 'click');
const stopClick$ = Rx.Observable.fromEvent(stopButton, 'click');
const tenthSecond$ = Rx.Observable.interval(100);

// One timer run: each tick count is divided by 10 and formatted with one
// decimal place ("0.0", "0.1", …); the run ends at the next stop click.
const elapsed$ = tenthSecond$
  .map(item => (item / 10).toFixed(1))
  .takeUntil(stopClick$);

// switchMap unsubscribes from the previous run whenever start is clicked
// again, so repeated clicks restart the display instead of stacking several
// timers that all write to resultsArea at once.
startClick$
  .switchMap(() => elapsed$)
  .subscribe((seconds) => {
    resultsArea.innerText = seconds;
  });
// subscribe() hands back a subscription object; keep it so the stream can be
// cancelled later.
const subscription = Rx.Observable
  .interval(100)
  .subscribe((datum) => {
    console.log(datum);
  });

// Cancel the subscription.
subscription.unsubscribe();
Subject
// A Subject can be pushed to by hand with next(); two subscribers are
// attached here, each logging under its own label.
const subj = new Rx.Subject();

// Builds a next-handler that logs each value prefixed with the given label.
const logAs = (label) => (datum) => {
  console.log(label, datum);
};

const observer1 = subj.subscribe(logAs('observer 1'));
const observer2 = subj.subscribe(logAs('observer 2'));

// Both subscriptions are active for these two values.
subj.next(4);
subj.next(17);

// observer1 is unsubscribed before 42 is sent.
observer1.unsubscribe();
subj.next(42);

observer2.unsubscribe();
I heard you liked subscribing
// Attach two logging subscribers to a Subject, then pass the Subject itself
// to another Observable's subscribe().
const subject = new Rx.Subject();

for (const label of ['observer 1', 'observer 2']) {
  subject.subscribe((datum) => {
    console.log(label, datum);
  });
}

const observable = Rx.Observable.from([4, 17, 42]);

// The Subject is used as the observer here.
observable.subscribe(subject);
A brief mention of refcount
// refCount demo: two subscribers, attached 500 ms apart, subscribe to the
// same multicast + refCount stream built on one 100 ms interval.
const source = Rx.Observable.interval(100);
const subject = new Rx.Subject();
const refCounted$ = source.multicast(subject).refCount();

// observer2 is assigned later from a timer callback, so it is declared up
// front. observer1 is declared exactly once, at its assignment below —
// declaring it here as well would be a duplicate-declaration SyntaxError.
let observer2;

// interval starts now
const observer1 = refCounted$.subscribe(
  datum => console.log('observer 1', datum)
);

setTimeout(() => {
  // still operating off a single stream
  observer2 = refCounted$.subscribe(
    datum => console.log('observer 2', datum)
  );
}, 500);

// still around!
setTimeout(() => observer1.unsubscribe(), 1000);

// finally dispose of observable
setTimeout(() => observer2.unsubscribe(), 1500);
You found a key point!
There are multiple ways to multiplex an Observable. They're all complicated.
import {Http} from 'angular2/http';
// Poll carStatus.json every 5 seconds and hand each parsed response to
// updateMap. `http` and `updateMap` are not defined in this snippet —
// presumably an injected Http instance and a rendering helper.
const ticks$ = Observable.interval(5000);

const responses$ = ticks$
  // switchMap is the RxJS 5 name for RxJS 4's flatMapLatest: when a new tick
  // arrives, the previous request's Observable is unsubscribed. retry(3)
  // re-attempts a failing request up to three times.
  .switchMap(() => http.get('carStatus.json').retry(3))
  .map(res => res.json());

const carStatus = responses$.subscribe(res => updateMap(res));

// When the driver arrives
carStatus.unsubscribe();