Malicious emails

Web Crawls

Honeypot hits

Malware

What do these have in common?

Both deal with collections values delivered over time

Collection of values

An Array!

We know how to work with arrays .map/.filter/.reduce/.etc

We also have a term for "values delivered over time"

A Stream!

How do you .map a stream?

liner._transform = function (chunk, encoding, done) {
 var data = chunk.toString();
 if (this._lastLineData) { data = this._lastLineData + data; }

 var lines = data.split('\n');
 this._lastLineData = lines.splice(lines.length-1,1)[0];

 lines.forEach(this.push.bind(this));
 done();
};

There has to be a better way!

Observables!

You found a key point!

Observables represent a collection of values delivered over time.

What are Observables and why should I care?

Everything wants your attention

Promise


url => fetch(url);
    

Observable


url => someObservableHTTPGet(url);
    

You found a key point!

Observables only execute once someone's subscribed to them. A new instance is created for each subscription.

Let's make a stopwatch

Constructor


let tenthSecond$ = new Rx.Observable(o => {
  let i = 0;
  let interv = setInterval(() => o.next(++i), 100);
  o.next(i++);

  return () => clearInterval(interv);
});
  

Constructor v2


let tenthSecond$ = Rx.Observable.interval(100);
  

Convert


tenthSecond$
.map(item => (item / 10).toFixed(1));
  

Results


let resultsArea = document.querySelector('.output');

tenthSecond$
.map(item => (item / 10).toFixed(1))
.subscribe((int) => resultsArea.innerText = int);
  

Handle events


let startButton = document.querySelector('#start-button');
let stopButton = document.querySelector('#stop-button');

let startClick$ = Rx.Observable.fromEvent(startButton, 'click');
let stopClick$ = Rx.Observable.fromEvent(stopButton, 'click');
  

All together now!


let startButton = document.querySelector('#start-button');
let stopButton = document.querySelector('#stop-button');
let resultsArea = document.querySelector('.output');
let startClick$ = Rx.Observable.fromEvent(startButton, 'click');
let stopClick$ = Rx.Observable.fromEvent(stopButton, 'click');
let tenthSecond$ = Rx.Observable.interval(100);

startClick$.subscribe(() => {
  tenthSecond$
  .map(item => (item / 10).toFixed(1))
  .takeUntil(stopClick$)
  .subscribe((int) => resultsArea.innerText = int);
});
    

Observer


{
  next: (datum) => {/*...*/},
  err: (err) => {/*...*/},
  done: () => {/*...*/}
}
  

Subscription


let subscription = Rx.Observable.interval(100)
.subscribe(datum => console.log(datum));

subscription.unsubscribe();
  

Subject


let subj = new Rx.Subject();

let observer1 = subj.subscribe(datum => {
  console.log('observer 1', datum);
});
let observer2 = subj.subscribe(datum => {
  console.log('observer 2', datum);
});

subj.next(4);
subj.next(17);

observer1.unsubscribe();

subj.next(42);

observer2.unsubscribe();
  

I heard you liked subscribing


let subject = new Rx.Subject();

subject.subscribe(datum => console.log('observer 1', datum));
subject.subscribe(datum => console.log('observer 2', datum));

let observable = Rx.Observable.from([4, 17, 42]);

observable.subscribe(subject);
  

A brief mention of refcount


let source = Rx.Observable.interval(100);
let subject = new Rx.Subject();
let refCounted$ = source.multicast(subject).refCount();
let observer1, observer2;

// interval starts now
let observer1 = refCounted$.subscribe(
  datum => console.log('observer 1', datum)
);

setTimeout(() => {
  // still operating off a single stream
  observer2 = refCounted$.subscribe(
    datum => console.log('observer 2', datum)
  );
}, 500);

// still around!
setTimeout(() => observer1.unsubscribe(), 1000);

// finally dispose of observable
setTimeout(() => observer2.unsubscribe(), 1500);
    

You found a key point!

There are multiple ways to multiplex an Observable. They're all complicated.


Rx.Observable.fromEvent(document.querySelector('#myInput'), 'keyup')
  .map(e => e.target.value)
  .filter(val => val.length > 3)
  .debounceTime(333)
  .distinctUntilChanged()
// Search results
.mergeMap(e => {
  d.innerHTML = '';
  return fetch(stackOverflowApiUrl + e)
  .then(resp => resp.json());
})
.catch(e => ({items:[]}))
.mergeMap(val => val.items)
.map(item => item.title.link(item.link))
.subscribe(
  val => d.innerHTML = val + '
' + d.innerHTML, err => alert(err.message), () => console.log('done') );

import {Http} from 'angular2/http';

let ticks$ = Observable.interval(5000);

let responses$ =
  ticks$
    .flatMapLatest(() => http.get('carStatus.json').retry(3))
    .map(res => res.json());

let carStatus = responses$.subscribe(res => updateMap(res));

// When the driver arrives
carStatus.unsubscribe();
Let's build a data processing pipeline

Top Editors

Edits Editor name/IP

Top Pages

Edits Page name
let wikiStream$ = new Rx.Observable(o => {
  let socket = ioClient.connect('stream.wikimedia.org/rc');
  socket.on('connect_error', (objData) => o.error(objData));
  socket.on('connect', () => socket.emit('subscribe', '*'));

  socket.on('change', (objData) => o.next(objData);
})
.map(edit => ({
  id: edit.id,
  type: edit.type,
  source: edit.server_name,
  users: [edit.user],
  urls: [edit.server_url + '/wiki/' + edit.title.replace(/\s/g, '_')]
}))
.publish().connect();

$wikiStream.subscribe(edit => objStats.qData(edit));
$wikiStream
.bufferTime(updateInterval)
.subscribe(updateGraphs);

Who's using Observables?

  • Netflix & Microsoft
    • Frontend events
    • Stream processing, analytics & BI
    • Realtime Data Visualizations
  • Angular 2
    • HTTP
    • Inputs
    • Validity
    • WebWorker Message bus
    • async pipe
  • You?

Questions?

Further Reading

Me

@rkoutnik

rkoutnik.com

[email protected]

github.com/SomeKittens

STICKERS!