lordvlad
8/22/2014 - 10:41 AM

// source http://jsbin.com/vilotuyuwugi/1

/**
 * Minimal push-based event stream.
 *
 * Every instance owns an ordered list of listener callbacks; the prototype
 * methods register, remove and invoke them.
 *
 * @constructor
 */
function Stream() {
  // Callbacks in registration order.
  this.listeners = [];
}

/**
 * Registers a callback at the end of this stream's listener list.
 *
 * @param {Function} fn Callback to register.
 * @returns {Stream} This stream, for chaining.
 */
Stream.prototype.listen = function (fn) {
  var registered = this.listeners;
  registered.push(fn);
  return this;
};

/**
 * Unregisters a callback.
 *
 * Only the first occurrence of `fn` in the listener list is removed; a
 * callback that is not registered is silently ignored.
 *
 * @param {Function} fn Callback to remove.
 * @returns {Stream} This stream, for chaining.
 */
Stream.prototype.ignore = function (fn) {
  var position = this.listeners.indexOf(fn);
  if (position === -1) {
    return this;
  }
  this.listeners.splice(position, 1);
  return this;
};

/**
 * Pushes a value to every registered listener, in registration order.
 *
 * Dispatch runs over a copy of the listener list, so a listener that
 * unregisters itself (or registers others) while it is running cannot make
 * a sibling listener be skipped for the current value.
 *
 * @param {*} d Value handed to each listener.
 * @returns {Stream} This stream, for chaining.
 */
Stream.prototype.tell = function (d) {
  var snapshot = this.listeners.slice();
  snapshot.forEach(function (fn) { fn(d); });
  return this;
};

/**
 * Derives a stream whose values are the source values transformed by `fn`.
 *
 * @param {Function|*} fn Transformation applied to every value. Any
 *     non-function argument (including `null` and `undefined`) is treated
 *     as a constant that replaces every value.
 * @returns {Stream} The derived stream.
 */
Stream.prototype.map = function (fn) {
  var s = new Stream();
  // `typeof` is safe for null/undefined, where probing a property would throw.
  var project = typeof fn === 'function' ? fn : function () { return fn; };
  this.listen(function (d) { s.tell(project(d)); });
  return s;
};

/**
 * Maps every value to an inner stream and forwards the values of all inner
 * streams into a single result stream.
 *
 * @param {Function} fn Called with each source value; must return a stream.
 * @returns {Stream} Stream receiving the values of every inner stream.
 */
Stream.prototype.flatMap = function (fn) {
  var flattened = new Stream();
  function attachInner(d) {
    var inner = fn(d);
    inner.pipe(flattened);
  }
  this.listen(attachInner);
  return flattened;
};

/**
 * Maps every value to an inner stream but forwards only the values of the
 * most recently created one; the previous inner stream is detached as soon
 * as a new source value arrives.
 *
 * @param {Function} fn Called with each source value; must return a stream.
 * @returns {Stream} Stream receiving the values of the latest inner stream.
 */
Stream.prototype.flatMapLatest = function (fn) {
  var output = new Stream();
  var forward = output.tell.bind(output);
  var current;
  this.listen(function (d) {
    if (current) {
      current.ignore(forward);
    }
    current = fn(d);
    current.listen(forward);
  });
  return output;
};

/**
 * Forwards every value of this stream into another stream.
 *
 * @param {Stream} s Destination whose `tell` receives each value.
 * @returns {Stream} This (source) stream, for chaining.
 */
Stream.prototype.pipe = function (s) {
  var forward = s.tell.bind(s);
  this.listen(forward);
  return this;
};

/**
 * Emits a running accumulation of the source values.
 *
 * The first value seeds the accumulator and is emitted unchanged; every
 * later value emits `fn(accumulator, value)`.
 *
 * @param {Function} fn Reducer called as `fn(acc, d)`.
 * @returns {Stream} Stream of accumulated values.
 */
Stream.prototype.scan = function (fn) {
  var s = new Stream();
  var acc;
  // Explicit flag: a falsy first value (0, '', false) must still count as
  // "seeded", otherwise the next value would reset the accumulator.
  var seeded = false;
  this.listen(function (d) {
    if (seeded) {
      acc = fn(acc, d);
    } else {
      seeded = true;
      acc = d;
    }
    s.tell(acc);
  });
  return s;
};

/**
 * Merges this stream with another one via `Stream.merge`.
 *
 * @param {Stream} s The other stream.
 * @returns {Stream} Whatever `Stream.merge([this, s])` returns.
 */
Stream.prototype.merge = function (s) {
  var sources = [this, s];
  return Stream.merge(sources);
};

/**
 * Combines this stream with another one via `Stream.combine`.
 *
 * @param {Stream} s The other stream.
 * @returns {Stream} Whatever `Stream.combine([this, s])` returns.
 */
Stream.prototype.combine = function (s) {
  var sources = [this, s];
  return Stream.combine(sources);
};

/**
 * Lets a value through, then drops every following value until a timer of
 * `t` milliseconds (started when the value was let through) has fired.
 *
 * @param {number} t Delay passed to `setTimeout`.
 * @returns {Stream} The throttled stream.
 */
Stream.prototype.throttle = function (t) {
  var throttled = new Stream();
  // Holds the timer handle while values are being dropped, false otherwise.
  var cooldown = false;
  function reopen() {
    cooldown = false;
  }
  this.listen(function (d) {
    if (!cooldown) {
      cooldown = setTimeout(reopen, t);
      throttled.tell(d);
    }
  });
  return throttled;
};

/**
 * Emits a value only after `t` milliseconds have passed without a newer
 * value arriving; each new value cancels the pending emission.
 *
 * @param {number} t Delay passed to `setTimeout`.
 * @returns {Stream} The debounced stream.
 */
Stream.prototype.debounce = function (t) {
  var debounced = new Stream();
  var pending;
  this.listen(function (d) {
    if (pending) {
      clearTimeout(pending);
    }
    var emit = debounced.tell.bind(debounced, d);
    pending = setTimeout(emit, t);
  });
  return debounced;
};

/**
 * Collects values that arrive in quick succession and emits them as one
 * array once `t` milliseconds have passed without a new value.
 *
 * Every emission is a fresh array holding only the values received since
 * the previous emission.
 *
 * @param {number} t Quiet period passed to `setTimeout`.
 * @returns {Stream} Stream of value batches (arrays).
 */
Stream.prototype.buffer = function (t) {
  var s = new Stream();
  var timer;
  var pending = [];
  this.listen(function (d) {
    if (timer) {
      clearTimeout(timer);
    }
    pending.push(d);
    timer = setTimeout(function () {
      // Start a new batch before emitting so that later values can neither
      // leak into nor mutate the array handed to listeners.
      var batch = pending;
      pending = [];
      timer = undefined;
      s.tell(batch);
    }, t);
  });
  return s;
};

/**
 * Groups values into arrays and emits an array as soon as it holds at
 * least `l` values, then starts a new, empty array.
 *
 * @param {number} l Number of values per emitted array.
 * @returns {Stream} Stream of value chunks (arrays).
 */
Stream.prototype.bufferBySize = function (l) {
  var chunks = new Stream();
  var chunk = [];
  this.listen(function (d) {
    chunk.push(d);
    if (chunk.length >= l) {
      chunks.tell(chunk);
      chunk = [];
    }
  });
  return chunks;
};

/**
 * Derives a stream that only carries the values for which `fn` returns a
 * truthy result.
 *
 * @param {Function} fn Predicate called with each value.
 * @returns {Stream} The filtered stream.
 */
Stream.prototype.filter = function (fn) {
  var accepted = new Stream();
  this.listen(function (d) {
    if (!fn(d)) {
      return;
    }
    accepted.tell(d);
  });
  return accepted;
};

/**
 * Derives a stream that, for every source value, emits the value that came
 * before it (`undefined` for the very first one).
 *
 * @returns {Stream} Stream of previous values.
 */
Stream.prototype.lookbehind = function () {
  var delayed = new Stream();
  var previous;
  this.listen(function (d) {
    delayed.tell(previous);
    previous = d;
  });
  return delayed;
};

/**
 * Derives a stream that drops a value when it is strictly equal (`===`) to
 * the value emitted immediately before it.
 *
 * The first value is always emitted, even when it is `undefined`.
 *
 * @returns {Stream} Stream without consecutive duplicates.
 */
Stream.prototype.skipDuplicates = function () {
  var s = new Stream();
  var last;
  // Separate flag so an initial `undefined` is not mistaken for a repeat of
  // the still-unset `last`.
  var hasLast = false;
  this.listen(function (d) {
    if (hasLast && last === d) {
      return;
    }
    hasLast = true;
    last = d;
    s.tell(d);
  });
  return s;
};

/**
 * Creates a stream that receives the values of all given streams.
 *
 * @param {Stream[]} a Source streams; each one is piped into the result.
 * @returns {Stream} The merged stream.
 */
Stream.merge = function (a) {
  var merged = new Stream();
  a.forEach(function (source) {
    source.pipe(merged);
  });
  return merged;
};

/**
 * Creates a stream that waits until every source has delivered at least
 * one value, emits an array with the latest value of each source (indexed
 * like `a`), and then starts collecting again from scratch.
 *
 * @param {Stream[]} a Source streams.
 * @returns {Stream} Stream of combined value arrays.
 */
Stream.combine = function (a) {
  var combined = new Stream();
  var values = [];
  var seen = [];
  // Number of sources that have reported since the last emission.
  var reported = 0;
  a.forEach(function (source, index) {
    source.listen(function (d) {
      values[index] = d;
      if (!seen[index]) {
        seen[index] = true;
        reported += 1;
      }
      if (reported === a.length) {
        combined.tell(values);
        values = [];
        seen = [];
        reported = 0;
      }
    });
  });
  return combined;
};

/**
 * Creates a stream that emits a value every `t` milliseconds.
 *
 * @param {number} t Interval passed to `setInterval`.
 * @param {Function|*} [v] When a function, it is called on every tick and
 *     its result is emitted; any other value is emitted as is. When omitted
 *     (or `null`), `true` is emitted.
 * @returns {Stream} The ticking stream; its `kill()` method clears the
 *     interval.
 */
Stream.beat = function (t, v) {
  var s = new Stream();
  var i = setInterval(function () {
    var value;
    // Only a missing value falls back to `true`; falsy constants such as
    // 0, '' or false are emitted unchanged.
    if (v === undefined || v === null) {
      value = true;
    } else if (typeof v === 'function') {
      value = v();
    } else {
      value = v;
    }
    s.tell(value);
  }, t);
  s.kill = clearInterval.bind(null, i);
  return s;
};

/**
 * Creates a stream that emits a single value after `t` milliseconds.
 *
 * @param {number} t Delay passed to `setTimeout`.
 * @param {Function|*} [v] When a function, it is called when the timer
 *     fires and its result is emitted; any other value is emitted as is.
 *     When omitted (or `null`), `true` is emitted.
 * @returns {Stream} The stream; its `kill()` method clears the timer.
 */
Stream.timeout = function (t, v) {
  var s = new Stream();
  var i = setTimeout(function () {
    var value;
    // Only a missing value falls back to `true`; falsy constants such as
    // 0, '' or false are emitted unchanged.
    if (v === undefined || v === null) {
      value = true;
    } else if (typeof v === 'function') {
      value = v();
    } else {
      value = v;
    }
    s.tell(value);
  }, t);
  s.kill = clearTimeout.bind(null, i);
  return s;
};


/**
 * Creates a stream of "click" events via `Stream.fromEvent`.
 *
 * @param {Element|string} el Forwarded unchanged to `Stream.fromEvent`.
 * @returns {Stream} Whatever `Stream.fromEvent(el, "click")` returns.
 */
Stream.fromClick = function (el) {
  var eventName = "click";
  return Stream.fromEvent(el, eventName);
};

/**
 * Creates a stream that is told every event of type `ev` dispatched on a
 * target.
 *
 * @param {Object|string} el Event target, or anything with a `match`
 *     property (such as a selector string), which is first resolved
 *     through `document.querySelector`.
 * @param {string} ev Event type passed to `addEventListener`.
 * @returns {Stream} Stream of event objects.
 */
Stream.fromEvent = function (el, ev) {
  var events = new Stream();
  var target = el.match ? document.querySelector(el) : el;
  target.addEventListener(ev, events.tell.bind(events));
  return events;
};

/**
 * Creates a stream that is told the fulfilment value of a thenable.
 *
 * Only a fulfilment handler is attached; rejections are not forwarded.
 *
 * @param {{then: Function}} p Promise or other thenable.
 * @returns {Stream} Stream receiving the fulfilment value.
 */
Stream.fromPromise = function (p) {
  var fulfilled = new Stream();
  var deliver = fulfilled.tell.bind(fulfilled);
  p.then(deliver);
  return fulfilled;
};

/**
 * Requests a URL through jQuery's `$.getJSON` and wraps the returned
 * promise in a stream.
 *
 * @param {string} url Address handed to `$.getJSON`.
 * @returns {Stream} Result of `Stream.fromPromise` for the request.
 */
function getJSON(url) {
  var request = $.getJSON(url);
  return Stream.fromPromise(request);
}

/**
 * Returns `Math.random()` scaled by `x` and rounded down.
 *
 * @param {number} x Scale factor (exclusive upper bound for positive `x`).
 * @returns {number} The floored product.
 */
function rand(x) {
  var scaled = Math.random() * x;
  return Math.floor(scaled);
}

/**
 * Applies the `+` operator to two operands (numeric sum or string
 * concatenation, depending on the operand types).
 *
 * @param {*} a Left operand.
 * @param {*} b Right operand.
 * @returns {*} `a + b`.
 */
function add(a, b) {
  var result = a + b;
  return result;
}
/**
 * Returns the `length` property of its argument.
 *
 * @param {{length: number}} a Array, string or other object with `length`.
 * @returns {number} `a.length`.
 */
function count(a) {
  var size = a.length;
  return size;
}


// Stream whose values are printed with console.log.
// Declared with `var`: the bare assignment created an implicit global and
// throws a ReferenceError in strict mode or in an ES module.
var logStream = new Stream();
logStream.listen(console.log.bind(console));

// Every <img> in the document, copied from the NodeList into a real array
// so that array methods such as `map` are available.
var buttons = [].slice.call(
  document.querySelectorAll("img")
);

/**
 * Updates the image `buttons[i]` and the element that follows it.
 *
 * With user data, the first entry's `login` becomes the label text and its
 * `avatar_url` the image source. Without usable data (falsy `d`, an empty
 * array, or a payload with no first entry) the placeholder text
 * 'loading...' is shown and the image source is removed.
 *
 * @param {number} i Index into the global `buttons` array.
 * @param {Array<{login: string, avatar_url: string}>|null} d Response data.
 */
function render(i, d){
  var img = buttons[i];
  var label = img.nextElementSibling;
  // Guard the lookup: an empty or non-array response has no first entry.
  var user = d && d[0];

  if (user) {
    // textContent, not innerHTML: the login comes from the network and
    // must never be parsed as markup.
    label.textContent = user.login;
    img.src = user.avatar_url;
  } else {
    label.textContent = 'loading...';
    // Assigning null to `src` would set the attribute to the string "null"
    // and make the browser request that URL.
    img.removeAttribute('src');
  }
}

// Wire up every image: a burst of more than one click loads a random page
// of GitHub users and renders its first entry next to the image.
buttons
.map(Stream.fromClick)
// forEach, not map: the callback only subscribes and returns nothing.
.forEach(function(stream, i){
  stream
  .buffer(300)                                 // group clicks separated by less than 300 ms
  .filter(function(b){return b.length > 1;})   // keep groups of two or more clicks
  .debounce(300)
  .map(rand.bind(null, 500))                   // replace the group by rand(500, group)
  .map(add.bind(null, 'https://api.github.com/users?since='))
  .listen(render.bind(null, i, null))          // show the placeholder while loading
  .flatMapLatest(getJSON)                      // only the latest request is forwarded
  .listen(render.bind(null, i));
});

// Log the text of the first <input> once typing pauses for 300 ms, skipping
// values identical to the one logged last.
(function () {
  var keyups = Stream.fromEvent('input', 'keyup');
  var settled = keyups.debounce(300);
  var values = settled.map(function (e) {
    return e.target.value;
  });
  var changes = values.skipDuplicates();
  changes.listen(console.log.bind(console));
}());
/* Images: 30px squares with rounded corners and a thin light-grey outline. */
img {
  height: 30px;
  width: 30px;
  border: 1px solid #ccc;
  border-radius: 15px;
}

/* Lists are shown without bullet markers. */
ul {
  list-style-type: none;
}
<!DOCTYPE html>
<html>
<head>
  <!-- Encoding declaration first, so it precedes all other head content. -->
  <meta charset="utf-8">
  <title>JS Bin</title>
<!-- jQuery supplies $.getJSON for the page script. Loaded over HTTPS so the
     request is not blocked as mixed content when the page is served securely. -->
<script src="https://code.jquery.com/jquery-2.1.1.min.js"></script>
<style id="jsbin-css">
img {
  width: 30px;
  height: 30px;
  border-radius: 15px;
  border: solid 1px #ccc;
}
ul {
  list-style-type: none;
}
</style>
</head>
<body>
  
  <!-- One list item per slot. The page script attaches a click stream to each
       <img> and writes into the element that directly follows it, so every
       <img> must stay immediately before its <span>. -->
  <ul>
    <li class="a">
      <img>
      <span></span>
    </li>
    <li class="b">
      <img>
      <span></span>
    </li>
    <li class="c">
      <img>
      <span></span>
    </li>
  </ul>
  <!-- The page script listens for keyup events on the first <input>. -->
  <input type="text">
  <pre></pre>
  
<script id="jsbin-javascript">
/**
 * Minimal push-based event stream.
 *
 * Every instance owns an ordered list of listener callbacks. Values are
 * pushed with `tell`; the other prototype methods derive new streams.
 *
 * @constructor
 */
function Stream(){this.listeners = [];}

/**
 * Registers a callback at the end of the listener list.
 * @param {Function} fn Callback to register.
 * @returns {Stream} This stream, for chaining.
 */
Stream.prototype.listen = function(fn){
  this.listeners.push(fn);
  return this;
};

/**
 * Removes the first occurrence of a callback; unknown callbacks are ignored.
 * @param {Function} fn Callback to remove.
 * @returns {Stream} This stream, for chaining.
 */
Stream.prototype.ignore = function(fn){
  var id = this.listeners.indexOf(fn);
  if (id !== -1)
    this.listeners.splice(id, 1);
  return this;
};

/**
 * Pushes a value to every registered listener, in registration order.
 *
 * Dispatch runs over a copy of the list, so a listener that unregisters
 * itself while running cannot make a sibling listener be skipped.
 *
 * @param {*} d Value handed to each listener.
 * @returns {Stream} This stream, for chaining.
 */
Stream.prototype.tell = function(d){
  this.listeners.slice().forEach(function(fn){fn(d);});
  return this;
};

/**
 * Derives a stream of transformed values.
 * @param {Function|*} fn Transformation; any non-function argument
 *     (including null/undefined) is a constant replacing every value.
 * @returns {Stream} The derived stream.
 */
Stream.prototype.map = function(fn){
  var s = new Stream();
  // `typeof` is safe for null/undefined, where probing a property would throw.
  var project = typeof fn === 'function' ? fn : function(){return fn;};
  this.listen(function(d){s.tell(project(d));});
  return s;
};

/**
 * Maps every value to an inner stream and forwards the values of all inner
 * streams into one result stream.
 * @param {Function} fn Called with each value; must return a stream.
 * @returns {Stream} The flattened stream.
 */
Stream.prototype.flatMap = function(fn){
  var s = new Stream();
  this.listen(function(d){fn(d).pipe(s);});
  return s;
};

/**
 * Like `flatMap`, but only the most recently created inner stream is
 * forwarded; the previous one is detached when a new value arrives.
 * @param {Function} fn Called with each value; must return a stream.
 * @returns {Stream} Stream of the latest inner stream's values.
 */
Stream.prototype.flatMapLatest = function(fn){
  var s = new Stream(), p = s.tell.bind(s), l;
  this.listen(function(d){
    if (l) l.ignore(p);
    (l=fn(d)).listen(p);
  });
  return s;
};

/**
 * Forwards every value of this stream into another stream.
 * @param {Stream} s Destination stream.
 * @returns {Stream} This (source) stream, for chaining.
 */
Stream.prototype.pipe = function(s){
  this.listen(s.tell.bind(s));
  return this;
};

/**
 * Emits a running accumulation: the first value seeds the accumulator and
 * is emitted unchanged, later values emit `fn(accumulator, value)`.
 * @param {Function} fn Reducer called as `fn(acc, d)`.
 * @returns {Stream} Stream of accumulated values.
 */
Stream.prototype.scan = function(fn){
  var s = new Stream(), acc, seeded = false;
  this.listen(function(d){
    if (seeded) {
      acc = fn(acc, d);
    } else {
      // Explicit flag: a falsy first value (0, '', false) must still count
      // as the seed instead of being re-seeded by the next value.
      seeded = true;
      acc = d;
    }
    s.tell(acc);
  });
  return s;
};

/**
 * Merges this stream with another one via `Stream.merge`.
 * @param {Stream} s The other stream.
 * @returns {Stream} The merged stream.
 */
Stream.prototype.merge = function(s){
  return Stream.merge([this, s]);
};

/**
 * Combines this stream with another one via `Stream.combine`.
 * @param {Stream} s The other stream.
 * @returns {Stream} The combined stream.
 */
Stream.prototype.combine = function(s){
  return Stream.combine([this, s]);
};

/**
 * Lets a value through, then drops values until a `t` ms timer has fired.
 * @param {number} t Delay passed to `setTimeout`.
 * @returns {Stream} The throttled stream.
 */
Stream.prototype.throttle = function(t){
  var s = new Stream(), l = false;
  this.listen(function(d){
    if (l) return;
    l = setTimeout(function(){l=false;}, t); s.tell(d);
  });
  return s;
};

/**
 * Emits a value only after `t` ms have passed without a newer value.
 * @param {number} t Delay passed to `setTimeout`.
 * @returns {Stream} The debounced stream.
 */
Stream.prototype.debounce = function(t){
  var s = new Stream(), b;
  this.listen(function(d){
    if (b) clearTimeout(b);
    b = setTimeout(s.tell.bind(s, d), t);
  });
  return s;
};

/**
 * Collects values arriving in quick succession and emits them as one array
 * once `t` ms have passed without a new value. Every emission is a fresh
 * array holding only the values received since the previous emission.
 * @param {number} t Quiet period passed to `setTimeout`.
 * @returns {Stream} Stream of value batches (arrays).
 */
Stream.prototype.buffer = function(t){
  var s = new Stream(), b, p = [];
  this.listen(function(d){
    if (b) clearTimeout(b);
    p.push(d);
    b = setTimeout(function(){
      // Start a new batch before emitting so later values can neither leak
      // into nor mutate the array handed to listeners.
      var batch = p;
      p = [];
      b = undefined;
      s.tell(batch);
    }, t);
  });
  return s;
};

/**
 * Groups values into arrays and emits an array once it holds at least `l`
 * values, then starts a new one.
 * @param {number} l Number of values per emitted array.
 * @returns {Stream} Stream of value chunks (arrays).
 */
Stream.prototype.bufferBySize = function(l){
  var s = new Stream(), p = [];
  this.listen(function(d){
    p.push(d);if (p.length >= l) {s.tell(p); p = [];}
  });
  return s;
};

/**
 * Derives a stream carrying only values for which `fn` returns truthy.
 * @param {Function} fn Predicate called with each value.
 * @returns {Stream} The filtered stream.
 */
Stream.prototype.filter = function(fn){
  var s = new Stream();
  this.listen(function(d){
    if (fn(d)) s.tell(d);
  });
  return s;
};

/**
 * For every value, emits the value that came before it (`undefined` for
 * the very first one).
 * @returns {Stream} Stream of previous values.
 */
Stream.prototype.lookbehind = function(){
  var l, s = new Stream();
  this.listen(function(d){
    s.tell(l); l = d;
  });
  return s;
};

/**
 * Drops a value when it is strictly equal to the value emitted immediately
 * before it. The first value is always emitted, even when `undefined`.
 * @returns {Stream} Stream without consecutive duplicates.
 */
Stream.prototype.skipDuplicates = function(){
  var l, seen = false, s = new Stream();
  this.listen(function(d){
    if (seen && l === d) return;
    seen = true;
    s.tell(l=d);
  });
  return s;
};

/**
 * Creates a stream that receives the values of all given streams.
 * @param {Stream[]} a Source streams.
 * @returns {Stream} The merged stream.
 */
Stream.merge = function(a){
  var s = new Stream();
  a.forEach(function(b){b.pipe(s);});
  return s;
};

/**
 * Creates a stream that waits until every source has delivered a value,
 * emits an array of the latest values (indexed like `a`), then starts
 * collecting again from scratch.
 * @param {Stream[]} a Source streams.
 * @returns {Stream} Stream of combined value arrays.
 */
Stream.combine = function(a){
  var b = new Stream(), c = [], p = [];
  function check(){
    if (c.filter(Boolean).length !== a.length)
      return;
    b.tell(p); c = []; p = [];
  }
  a.forEach(function(s,i){
    s.listen(function(d){
      p[i] = d; c[i] = true; check();
    });
  });
  return b;
};

/**
 * Creates a stream that emits a value every `t` milliseconds.
 * @param {number} t Interval passed to `setInterval`.
 * @param {Function|*} [v] Function whose result is emitted on each tick, or
 *     a constant to emit; `true` is emitted when omitted (or null).
 * @returns {Stream} The stream; `kill()` clears the interval.
 */
Stream.beat = function(t, v){
  var s = new Stream(), i = setInterval(function(){
    // Only a missing value falls back to `true`; falsy constants such as
    // 0, '' or false are emitted unchanged.
    s.tell(v === undefined || v === null ? true :
           (typeof v === 'function' ? v() : v));
  }, t);
  s.kill = clearInterval.bind(null, i);
  return s;
};

/**
 * Creates a stream that emits a single value after `t` milliseconds.
 * @param {number} t Delay passed to `setTimeout`.
 * @param {Function|*} [v] Function whose result is emitted, or a constant
 *     to emit; `true` is emitted when omitted (or null).
 * @returns {Stream} The stream; `kill()` clears the timer.
 */
Stream.timeout = function(t, v){
  var s = new Stream(), i = setTimeout(function(){
    // Same fallback rule as Stream.beat: falsy constants are kept.
    s.tell(v === undefined || v === null ? true :
           (typeof v === 'function' ? v() : v));
  }, t);
  s.kill = clearTimeout.bind(null, i);
  return s;
};


/**
 * Creates a stream of "click" events via `Stream.fromEvent`.
 * @param {Element|string} el Forwarded unchanged to `Stream.fromEvent`.
 * @returns {Stream} Stream of click events.
 */
Stream.fromClick = function(el){
  return Stream.fromEvent(el, "click");
};

/**
 * Creates a stream that is told every event of type `ev` on a target.
 * @param {Object|string} el Event target, or anything with a `match`
 *     property (such as a selector string), which is resolved through
 *     `document.querySelector` first.
 * @param {string} ev Event type passed to `addEventListener`.
 * @returns {Stream} Stream of event objects.
 */
Stream.fromEvent = function(el, ev){
  var s = new Stream();
  if (el.match)
    el = document.querySelector(el);
  el.addEventListener(ev, s.tell.bind(s));
  return s;
};

/**
 * Creates a stream that is told the fulfilment value of a thenable.
 * Rejections are not forwarded.
 * @param {{then: Function}} p Promise or other thenable.
 * @returns {Stream} Stream receiving the fulfilment value.
 */
Stream.fromPromise = function(p){
  var s = new Stream();
  p.then(s.tell.bind(s));
  return s;
};

/**
 * Requests a URL through jQuery's `$.getJSON` and wraps the returned
 * promise in a stream.
 *
 * @param {string} url Address handed to `$.getJSON`.
 * @returns {Stream} Result of `Stream.fromPromise` for the request.
 */
function getJSON(url) {
  var request = $.getJSON(url);
  return Stream.fromPromise(request);
}

/**
 * Returns `Math.random()` scaled by `x` and rounded down.
 *
 * @param {number} x Scale factor (exclusive upper bound for positive `x`).
 * @returns {number} The floored product.
 */
function rand(x) {
  var scaled = Math.random() * x;
  return Math.floor(scaled);
}

/**
 * Applies the `+` operator to two operands (numeric sum or string
 * concatenation, depending on the operand types).
 *
 * @param {*} a Left operand.
 * @param {*} b Right operand.
 * @returns {*} `a + b`.
 */
function add(a, b) {
  var result = a + b;
  return result;
}
/**
 * Returns the `length` property of its argument.
 *
 * @param {{length: number}} a Array, string or other object with `length`.
 * @returns {number} `a.length`.
 */
function count(a) {
  var size = a.length;
  return size;
}


// Stream whose values are printed with console.log.
// Declared with `var`: the bare assignment created an implicit global and
// throws a ReferenceError in strict mode.
var logStream = new Stream();
logStream.listen(console.log.bind(console));

// Every <img> in the document, copied from the NodeList into a real array.
var buttons = Array.prototype.slice.call(document.querySelectorAll("img"));

/**
 * Updates the image `buttons[i]` and the element that follows it.
 *
 * With user data, the first entry's `login` becomes the label text and its
 * `avatar_url` the image source. Without usable data (falsy `d`, an empty
 * array, or a payload with no first entry) the placeholder text
 * 'loading...' is shown and the image source is removed.
 *
 * @param {number} i Index into `buttons`.
 * @param {Array<{login: string, avatar_url: string}>|null} d Response data.
 */
function render(i, d){
  var img = buttons[i];
  var label = img.nextElementSibling;
  // Guard the lookup: an empty or non-array response has no first entry.
  var user = d && d[0];

  if (user) {
    // textContent, not innerHTML: the login comes from the network and
    // must never be parsed as markup.
    label.textContent = user.login;
    img.src = user.avatar_url;
  } else {
    label.textContent = 'loading...';
    // Assigning null to `src` would set the attribute to the string "null"
    // and make the browser request that URL.
    img.removeAttribute('src');
  }
}

// Wire up every image: a burst of more than one click loads a random page
// of GitHub users and renders its first entry next to the image.
buttons
.map(Stream.fromClick)
// forEach, not map: the callback only subscribes and returns nothing.
.forEach(function(stream, i){
  stream
  .buffer(300)                                 // group clicks separated by less than 300 ms
  .filter(function(b){return b.length > 1;})   // keep groups of two or more clicks
  .debounce(300)
  .map(rand.bind(null, 500))                   // replace the group by rand(500, group)
  .map(add.bind(null, 'https://api.github.com/users?since='))
  .listen(render.bind(null, i, null))          // show the placeholder while loading
  .flatMapLatest(getJSON)                      // only the latest request is forwarded
  .listen(render.bind(null, i));
});

// Log the text of the first <input> once typing pauses for 300 ms, skipping
// values identical to the one logged last.
Stream.fromEvent('input', 'keyup')
.debounce(300)
.map(function(e){return e.target.value;})
.skipDuplicates()
.listen(console.log.bind(console));

</script>


</body>
</html>