DOM EventStreams

Last updated:

DOM Futures are a recently-added feature that captures a very common and useful callback pattern - a single task that will either complete in the future or has already completed, or possibly results in an error - in an easy-to-use and idiomatic way.

While trying to convert some upcoming DOM APIs to use Futures, I kept running into some particular patterns that could benefit from the Futures treatment (they don't need the baggage of DOM Events), but that don't fit the Futures model, because they resolve to multiple values or don't even "finish" in any meaningful way.

By looking at my DOM-related use-cases and similar APIs in other languages, I've come up with the following three classes I'd like to add to DOM/JS:

EventStream - represents a series of events as a first-class value, which means you can apply combinators to it, etc. Multi-listener, lossy by default. Should probably have a switch to auto-buffer results before the first listener is attached. I expect to define a generic way to create an EventStream from any element/event combo, which pushes the event object as the update value whenever the event reaches the element without being cancelled by an ancestor.

ReactiveValue - represents an underlying value, and updates every time the value changes. The "current value" (value of the most recent update) is remembered, so that whenever a new listener is attached, it replays that update for that listener. Also lets you access the "current value" directly. Otherwise identical to EventStream. I expect to define a generic way to create a ReactiveValue from any object property (this is easy to define on top of Object.observe).

??? - a more general "lazy/async ordered container" data structure. Single-listener, lossless. I haven't yet figured out the API for this, though.

Note that I am explicitly ignoring the use-case of "binary" or "IO" streams, like Node's Stream class. These are similar in nature to event streams, but have their own quirks and needs that make it not a great idea to try and bodge them together with the rest. There's a good chance that a binary stream API will end up with a similar API surface, though, so keeping that in mind when figuring out all the names is probably a good idea.

EventStream API

An EventStream is an object representing a value changing over time. That's it - it's an extremely simple, general concept, but also extremely powerful and useful when applied widely.

The EventStream API looks like this in WebIDL:

callback StreamInit = void (EventStreamResolver resolver);
callback AnyCallback = any (optional any value);
typedef (EventStream or Future or Iterable) StreamLike;

[Constructor(StreamInit init)]
interface EventStream {
  EventStream then(optional AnyCallback? updateCB = null, optional AnyCallback? rejectCB = null, optional AnyCallback? completeCB = null);
  EventStream catch(optional AnyCallback rejectCB);
  EventStream complete(optional AnyCallback completeCB);
}

Event streams have a then method, which you use to get updates from the stream, or catch any errors the stream throws. Additionally, some streams can become complete and not push any more updates. This function takes up to three callbacks: one that listens to updates, one that listens for errors, and one that is called if the stream ever ends without error (not all streams do so). Any of the callback arguments may be null instead, which just doesn't register a callback for that purpose. Any of the callbacks may be called multiple times, though in common cases you'll get multiple calls of updateCB and at most one call to rejectCB or completeCB.

then returns a brand new event stream, one constructed from the very callbacks you provided, just like Futures. The exact behavior depends on the callback.

For updateCB:

  • If you return a value, the output stream pushes that value as an update.
  • If you return a stream-like, the output stream adopts that stream. The meaning of "adopt a stream" is defined below.
  • If you throw an error, the output stream is rejected with that error.

For rejectCB and completeCB:

  • If you return a value, the output stream pushes that value as a final update, then completes.
  • If you return a stream-like, the output stream adopts the stream, then completes.
  • If you throw an error, the output stream is rejected with that error.

(The "return a value" semantics are all exactly equivalent to instead returning a stream-like with Future.of(value).)

Note that completeCB being called does not automatically complete the output stream. The output stream is only completed when the input stream and all adopted streams are completed.

When a stream "adopts" another stream, it partially merges its state with the adopted stream. Whenever the adopted stream pushes an update, the parent stream pushes the same value as an update. If the adopted stream is rejected, the parent stream is rejected (which can be caught by the rejectCB on the parent stream). The adopted stream also prevents the parent stream from "completing" normally - instead, the parent stream completes only when it would complete normally and all of its adopted streams have completed.
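To make the adoption rule concrete, here is a minimal sketch. It assumes a tiny hand-rolled stream (createStream) and a free function then, both hypothetical stand-ins rather than the proposed API. Delivery is synchronous for simplicity, and only updateCB is modelled.

```javascript
// Hypothetical mini-stream: synchronous delivery, no buffering.
function createStream() {
  const listeners = [];
  let ended = false;
  const emit = (kind, value) =>
    listeners.forEach(l => l[kind] && l[kind](value));
  return {
    listen: l => listeners.push(l),
    push(v) { if (!ended) emit('update', v); },
    complete() { if (!ended) { ended = true; emit('complete'); } },
    reject(r) { if (!ended) { ended = true; emit('reject', r); } },
  };
}

// then(input, updateCB): the output adopts any stream returned by updateCB
// and completes only after the input and every adopted stream have completed.
function then(input, updateCB) {
  const output = createStream();
  let open = 1; // the input itself, plus one per adopted stream
  const finishOne = () => { if (--open === 0) output.complete(); };
  const adopt = stream => {
    open++;
    stream.listen({
      update: v => output.push(v),
      reject: r => output.reject(r),
      complete: finishOne,
    });
  };
  input.listen({
    update(v) {
      let result;
      try { result = updateCB(v); } catch (e) { output.reject(e); return; }
      if (result && typeof result.listen === 'function') adopt(result);
      else output.push(result);
    },
    reject: r => output.reject(r),
    complete: finishOne,
  });
  return output;
}

const input = createStream();
const children = {};
const output = then(input, key => (children[key] = createStream()));
const log = [];
output.listen({ update: v => log.push(v), complete: () => log.push('complete') });

input.push('a');        // output adopts children.a
input.push('b');        // output adopts children.b
children.a.push('a1');
children.b.push('b1');
input.complete();       // output stays open: adopted streams are still live
children.a.push('a2');
children.a.complete();
children.b.complete();  // last adopted stream is done, so output completes
console.log(log);
```

The counter is the whole trick: the output stream is only allowed to complete once the input and every adopted stream have reported completion.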

(Right now, updates that get pushed in a tick prior to when you register a then callback get lost. Should we have a flag that causes the stream to buffer until it gets its first listener?)

For convenience, EventStream#catch and EventStream#complete are provided, and are equivalent to calling then and only providing the given callback.

Creating Streams

Most of the time, streams will be returned by other operations, so you don't have to worry about creating them yourself. When you do, though, there are several methods to do so:

partial interface EventStream {
  static EventStream of(any... values);
  static EventStream from(any value);
  static EventStream reject(any reason);
  static EventStream listen(EventTarget target, DOMString event);
}

The EventStream.from function converts a stream-like into a stream, just like the Array.from convenience function does for iterables. If the passed value isn't stream-like, this is identical to EventStream.of. Any functions that expect a stream should pass their argument through EventStream.from first, just in case.

(A Future becomes a stream that either pushes one update and completes (when the future accepts) or pushes no updates and rejects (when the future rejects). A ProgressFuture is the same, but progress updates are pushed as stream updates as well. An iterable becomes a stream that pulls all the values out of the iterable and pushes them as updates one-by-one, then completes.)

EventStream.of creates a completed stream out of its arguments, just like Array.of does for iterables. (It's the monadic "return/pure/point" operation.) It creates an event stream, pushes all the arguments into it in order, and then immediately completes.

EventStream.reject returns an event stream which immediately rejects with the given reason.
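The conversions above could look roughly like the following sketch. Everything here is a hypothetical stand-in: a "stream" is just an object with a subscribe() method, rejected plays the role of EventStream.reject, and delivery is synchronous except for the Future-like branch.

```javascript
// Hypothetical, simplified stand-ins for EventStream.of / .reject / .from.
const noop = () => {};
const makeStream = subscribe => ({
  isStream: true,
  subscribe: ({ update = noop, reject = noop, complete = noop }) =>
    subscribe({ update, reject, complete }),
});

// of: push every argument in order, then complete.
const of = (...values) => makeStream(({ update, complete }) => {
  values.forEach(v => update(v));
  complete();
});

// rejected: reject straight away with the given reason.
const rejected = reason => makeStream(({ reject }) => reject(reason));

function from(value) {
  if (value && value.isStream) return value;          // already a stream
  if (value && typeof value.then === 'function') {    // Future-like
    return makeStream(({ update, reject, complete }) => {
      value.then(v => { update(v); complete(); }, reject);
    });
  }
  if (value != null && typeof value[Symbol.iterator] === 'function') {
    return makeStream(({ update, complete }) => {     // iterable
      for (const v of value) update(v);
      complete();
    });
  }
  return of(value);                                   // plain value
}

// Small helper for the demo: record everything a stream does.
const collect = stream => {
  const log = [];
  stream.subscribe({
    update: v => log.push(v),
    reject: r => log.push(`rejected: ${r}`),
    complete: () => log.push('done'),
  });
  return log;
};

const fromArray = collect(from([1, 2, 3]));
const fromPlain = collect(from(42));
const fromReject = collect(rejected('nope'));
console.log(fromArray, fromPlain, fromReject);
```

Note that from falls through to of for anything that is not stream-like, which is why a function can safely pass its argument through it "just in case".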

EventStream.listen automatically converts DOM Events into an event stream. Just specify an object that events are fired at to listen to, and the event you want, and every time that type of event is fired on the object (or bubbles to the object from a descendant), the event object will be pushed as an update to the output stream. This has no effect on the actual event, so you still need to use actual Events if you want to cancel an event/etc.
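As a rough illustration, listen can be approximated on top of the standard EventTarget interface. The listen function and its subscribe method below are hypothetical names for this sketch only. It delivers synchronously, never removes its listener, and does not model completion or rejection.

```javascript
// Hypothetical sketch of EventStream.listen built on EventTarget
// (a global in browsers and in recent Node.js versions).
function listen(target, type) {
  const listeners = new Set();
  target.addEventListener(type, event => {
    // Lossy: an event fired while nobody is subscribed is simply dropped.
    for (const fn of listeners) fn(event);
  });
  return {
    subscribe(updateCB) { listeners.add(updateCB); },
  };
}

const target = new EventTarget();
const seen = [];
listen(target, 'ping').subscribe(event => seen.push(event.type));

target.dispatchEvent(new Event('ping'));
target.dispatchEvent(new Event('pong')); // different type, not delivered
target.dispatchEvent(new Event('ping'));
console.log(seen);
```

The stream only observes events. Cancelling still has to go through a regular event listener, as described above.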

If none of these suffice, an event stream can be created manually with new EventStream(resolverCB).

interface EventStreamResolver {
  void push(any value);
  void complete(optional any value);
  void adopt(StreamLike value);
  void continueWith(StreamLike value);
  void reject(any value);
};

The constructor immediately returns an event stream, which is then controlled by the resolverCB function.

The resolverCB is called immediately, and is passed a resolver object, which represents the ability to update the stream. This can be passed around to other functions if desired, or just used in the resolverCB.

The resolver's methods work as follows:

  • push takes a value, and pushes it to the stream as an update.
  • complete completes the stream. It optionally takes a value to push as a final update before completing.
  • adopt adopts the stream-like.
  • continueWith adopts the stream-like, and immediately completes the parent stream as well. (It's syntax sugar for r.adopt(s); r.complete();.)
  • reject rejects the stream, with the given reason.

If the resolverCB throws an error, that also immediately rejects the stream.

Once complete or reject has been called, the resolver object becomes inert - calling any of the functions silently does nothing. This allows you to, for example, pass the resolver object to multiple functions, allowing them all to update the stream and race to complete/reject - once the first one ends the stream, the rest also lose the ability to update.
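The "inert after ending" rule is easy to sketch. The makeResolver helper below is hypothetical and wraps plain callbacks instead of a real stream. It covers only push, complete and reject.

```javascript
// Hypothetical helper: a resolver-like object around plain callbacks, so the
// "inert once complete or reject has been called" rule can be observed.
function makeResolver({ onUpdate, onComplete, onReject }) {
  let settled = false;
  // Wrap each method so that calls after the stream has ended do nothing.
  const guard = fn => (...args) => {
    if (settled) return;
    fn(...args);
  };
  return {
    push: guard(value => onUpdate(value)),
    complete: guard((...args) => {
      if (args.length > 0) onUpdate(args[0]); // optional final update
      settled = true;
      onComplete();
    }),
    reject: guard(reason => {
      settled = true;
      onReject(reason);
    }),
  };
}

const log = [];
const resolver = makeResolver({
  onUpdate: v => log.push(['update', v]),
  onComplete: () => log.push(['complete']),
  onReject: r => log.push(['reject', r]),
});

// Imagine these calls coming from several functions sharing the resolver.
resolver.push(1);
resolver.complete(2);    // pushes 2 as a final update, then completes
resolver.push(3);        // ignored: the stream has already ended
resolver.reject('late'); // ignored as well
console.log(log);
```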

Event Stream Combinators

The real value of streams is the ability to manipulate and combine them, something which is non-trivial to do with callback interfaces or DOM Events. There are tons of possibilities for API here, a lot of which we'll have to leave to user-land libraries to fill in.

Before I get into explaining those, though, I'll point out that the basic stream listening operation - then - is also a great stream combiner. If you have a stream of streams, just calling s.then() on it, with no arguments, will "flatten" the event stream for you, producing a single stream that combines the updates from all the streams into one. (If you've got an array of streams, just pass it through EventStream.from first to turn it into a stream.) If you've got a stream of normal values, and a transformation function that'll turn those values into streams (or stream-likes, like Futures or arrays), calling then with the transformation function will similarly make a new stream that pushes updates from all the transformed values.

For example, let's assume that XHR grows a Future-based interface, called getJSON. If you've got an array of values that you want to send to the server and get back results from, you can just run EventStream.from(valArray).then(x=>getJSON(url, x)) and get back a stream that'll push all the results as updates as they come in, then complete itself. If you add a reject callback, you can get informed when any of the XHRs hit an error, too, and choose between making the entire stream fail or gracefully recovering! For example, to provide a default value, just do return EventStream.of({...});. To omit the failing one from the results, do return EventStream.of();, which makes an empty stream.

(If you're familiar with monads, this is because then is the monadic operation for event streams. I like the developing convention that of and then are the names for the monad functions in JS.)

Here's a start at some more interesting things that might belong in the core spec, though:

partial interface EventStream {
  EventStream filter(AnyCallback filterCB);
  EventStream switch(optional AnyCallback filterCB);
  EventStream map(AnyCallback mapCB);
  EventStream forEach(AnyCallback listenCB);
  EventStream throttle(number delay, optional number timeout = Infinity);

  Future next(optional AnyCallback filterCB);
  Future last();
}

For all of the functions defined here that return a new EventStream, throwing an error in the callback will automatically reject the output stream, just like in then.

EventStream#filter() takes a filter function, and returns a new stream that only contains the updates that the filter returned true for.

EventStream#map() is similar to then, but specialized for functions that don't return a stream-like. That is, no matter what you return, it just gets emitted as a single value in the output stream. This makes it easy to return arrays or Futures, which will get interpreted as a stream-like by then unless you manually wrap them in a dummy stream. (This is the functor operation for streams.)

EventStream#forEach() is used to "tap into" an existing stream, without modifying it - the results of its callback are completely ignored, so the output stream updates with the exact same values as the input stream. The only effect the callback can have on the output stream is to cause it to reject, by throwing an error.
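Here is a small sketch of how filter, map and forEach relate to each other. It uses a hypothetical hand-rolled stream with synchronous delivery and writes the combinators as free functions rather than methods. Completion is left out.

```javascript
// Hypothetical minimal stream: synchronous delivery, update and reject only.
function createStream() {
  const listeners = [];
  let rejected = false;
  return {
    listen: (update, reject = () => {}) => listeners.push({ update, reject }),
    push(v) { if (!rejected) listeners.forEach(l => l.update(v)); },
    reject(r) {
      if (rejected) return;
      rejected = true;
      listeners.forEach(l => l.reject(r));
    },
  };
}

// Shared plumbing: `step` decides what (if anything) to emit for an update.
// An error thrown by the callback rejects the output stream.
function derive(input, step) {
  const output = createStream();
  input.listen(v => {
    let result;
    try { result = step(v); } catch (e) { output.reject(e); return; }
    if (result.emit) output.push(result.value);
  }, r => output.reject(r));
  return output;
}

const filter = (input, cb) =>
  derive(input, v => ({ emit: Boolean(cb(v)), value: v }));
const map = (input, cb) =>
  derive(input, v => ({ emit: true, value: cb(v) }));
const forEach = (input, cb) =>
  derive(input, v => { cb(v); return { emit: true, value: v }; });

const source = createStream();
const tapped = [];
const results = [];
const evens = filter(source, n => n % 2 === 0);
// map emits the returned array as one value instead of treating it as a stream-like.
const pairs = map(forEach(evens, n => tapped.push(n)), n => [n, n * n]);
pairs.listen(v => results.push(v));

[1, 2, 3, 4].forEach(n => source.push(n));
console.log(tapped, results);
```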

EventStream#switch() is useful when you want to ignore "earlier" event streams once "later" ones have started giving out data. It's designed to take a stream of streams (or take a stream of values, plus a conversion function that returns streams, like then() does, defaulting to EventStream.from), and do a different type of interleaving - it stops listening to earlier streams once later streams have started putting out values.

This ACM article (scroll down to the part starting with Figure 3) has a great example of switch() - they want to watch a text input's value for changes, and as the user types, fire off XHRs for autocomplete suggestions and use them. However, if one XHR is slow, and the XHR for the next text input event is fast, they want to make sure they ignore the slow one, because it's obsolete now (if the user types "a", then "ab", you don't want to first show "ab" autocomplete results, then interrupt yourself and start showing "a" autocomplete results). This is complex given today's code, but with streams it's simple and easy (assuming a few convenience functions): watchTextInput(input).switch(t=>getJSON(url,t)).forEach(updateAutocomplete). DONE. (watchTextInput is a stand-in for some way to get a stream out of certain kinds of events. getJSON is an assumed future addition to the XHR spec that returns a Future for the result, which is a stream-like.)

(That article is great for multiple reasons - the figures are great visual illustrations of the effects of several functions: its "Where" is our "filter", its "Select" is our "map", its "SelectMany" is our "then".)
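The switch() behaviour can be sketched as follows. switchLatest is a hypothetical name (switch is a reserved word for a plain function), and the streams are hand-rolled with synchronous delivery. Completion and rejection are not modelled.

```javascript
// Hypothetical minimal stream: synchronous delivery, updates only.
function createStream() {
  const listeners = [];
  return {
    listen: fn => listeners.push(fn),
    push: v => listeners.forEach(fn => fn(v)),
  };
}

// Forward updates from inner streams, but once a later inner stream has
// produced a value, ignore everything from the earlier ones.
function switchLatest(outer) {
  const output = createStream();
  let count = 0;   // how many inner streams have arrived so far
  let newest = -1; // index of the latest inner stream that has emitted
  outer.listen(inner => {
    const index = count++;
    inner.listen(v => {
      if (index < newest) return; // obsolete: a later stream already spoke
      newest = index;
      output.push(v);
    });
  });
  return output;
}

const requests = createStream();
const forA = createStream();  // stands in for the slow "a" lookup
const forAB = createStream(); // stands in for the fast "ab" lookup
const shown = [];
switchLatest(requests).listen(v => shown.push(v));

requests.push(forA);
requests.push(forAB);
forAB.push('results for ab');
forA.push('results for a'); // arrives late, so it is ignored
console.log(shown);
```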

EventStream#throttle() slows a stream down, filtering updates that come in too fast. Whenever its input stream updates, it holds that update for "delay" milliseconds before pushing it to its output stream; if a new update comes in before the time is up, it throws away the held update and restarts the clock for the new one. If "timeout" milliseconds pass while it is still waiting for the input to slow down, it goes ahead and pushes the most recent pending update. For example, the previous switch() example could probably use a throttle, placed between the watchTextInput() and the switch(), so you won't kick off 10 XHRs in quick succession when the user is typing fast; instead you'll just get the 10th update and XHR that.
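To check the timing rules without real timers, here is a sketch that replays timestamped updates through them. simulateThrottle is a hypothetical helper, not part of the proposal. It assumes the timeout is measured from the first update of a burst, which is one reading of the description above.

```javascript
// Hypothetical helper: replays timestamped updates through the throttle rules
// and returns what the output stream would push, and when.
// `events` must be sorted by time; all times are in milliseconds.
function simulateThrottle(events, delay, timeout = Infinity) {
  const output = [];
  let pending = null; // { value, fireAt, burstStart }
  const flushUpTo = now => {
    if (pending && pending.fireAt <= now) {
      output.push({ time: pending.fireAt, value: pending.value });
      pending = null;
    }
  };
  for (const { time, value } of events) {
    flushUpTo(time);
    const burstStart = pending ? pending.burstStart : time;
    // Restart the delay clock, but never wait past the timeout for this burst.
    const fireAt = Math.min(time + delay, burstStart + timeout);
    pending = { value, fireAt, burstStart };
  }
  flushUpTo(Infinity); // the input has gone quiet, so the last update gets out
  return output;
}

const typing = [
  { time: 0, value: 'a' },
  { time: 50, value: 'ab' },
  { time: 100, value: 'abc' },
  { time: 150, value: 'abcd' },
];

const patient = simulateThrottle(typing, 100);       // no timeout
const impatient = simulateThrottle(typing, 100, 120); // give up after 120ms
console.log(patient, impatient);
```

With no timeout only the last keystroke gets through, 100ms after the typing stops. With a 120ms timeout the pending "abc" is pushed at 120ms, and "abcd" follows once its own delay has elapsed.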

ReactiveValue API

A ReactiveValue is just like an EventStream, except it maintains a memory of the last update as its "current value". The updates, thus, represent updates to the underlying value.

interface ReactiveValue {
  /* all of the EventStream methods */

  attribute any value;
  EventStream updates();
  EventStream diffs();
  ReactiveValue squash();

  static ReactiveValue watch(any object, DOMString property);
}

ReactiveValue is nearly identical to EventStream, with all the same methods, except for a few small differences. The first is that, obviously, all of the methods return ReactiveValues, not EventStreams.

The next change is that a ReactiveValue, whenever it's used as an input stream, pushes as its first update the current value. This happens automatically, even if a ReactiveValue is created manually, even if it's used in an EventStream function.

Finally, if using the explicit constructor, the return value of the resolverCB is taken as the stream's initial value. (In an EventStream, the return value of the resolverCB is ignored.)

It also adds a few things. First, the value attribute on the ReactiveValue holds the current value of the stream. This is a normal attribute, and is automatically changed by the ReactiveValue's handling of updates.
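The replay behaviour and the value attribute can be sketched together. MiniReactiveValue is a hypothetical class with synchronous delivery. It takes the initial value directly instead of a resolverCB.

```javascript
// Hypothetical minimal ReactiveValue: remembers the latest update and
// replays it to each new listener.
class MiniReactiveValue {
  constructor(initial) {
    this.value = initial; // the "current value"
    this.listeners = [];
  }
  push(v) {
    this.value = v;
    this.listeners.forEach(fn => fn(v));
  }
  listen(fn) {
    this.listeners.push(fn);
    fn(this.value); // replay the current value to the newcomer only
  }
}

const temperature = new MiniReactiveValue(20);
const early = [];
const late = [];

temperature.listen(v => early.push(v)); // sees 20 straight away
temperature.push(21);
temperature.listen(v => late.push(v));  // joins late, but still sees 21
temperature.push(22);

console.log(early, late, temperature.value);
```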

(Does this mean that the value changes synchronously, even though you're not able to respond to the updates normally until a future tick? Or should this only update when update listeners are handled?)

The ReactiveValue#updates function returns a new EventStream that is identical to the ReactiveValue it was called on, except without the special "current value" handling. That is, if you call a combinator on it, you won't get the special "first update is the current value" behavior.

The ReactiveValue#diffs function is similar, but still gives you information about the previous value. Whenever the input stream updates, the output stream updates with an object containing an old key holding the old value, and a new key holding the input stream's update value.
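The shape of a diffs update can be sketched with a small wrapper. makeDiffer is a hypothetical helper. It takes the starting value explicitly because there is no real ReactiveValue behind it.

```javascript
// Hypothetical helper: wraps a listener so that each update arrives as
// an object with `old` and `new` keys.
function makeDiffer(initialValue, onDiff) {
  let previous = initialValue;
  return next => {
    const diff = { old: previous, new: next };
    previous = next;
    onDiff(diff);
  };
}

const seen = [];
const update = makeDiffer('idle', diff => seen.push(diff));
['loading', 'loaded'].forEach(update);
console.log(seen);
```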

The ReactiveValue#squash function avoids accidentally doing work on stale data. It returns a new ReactiveValue that mirrors the input stream, but with some special behavior for its updateCB - whenever an update would be pushed to a listener, it first checks to see if there are any pending updates still waiting on the event queue. If so, rather than queueing a new one, it just updates the old one to the new value. In this way, if multiple updates happen between successive calls to your updateCB, you'll only get the latest value, rather than stale data.
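A sketch of the squashing idea, with an explicit array standing in for the event queue so the effect is visible without real asynchrony. taskQueue, runQueue and this squash function are all hypothetical.

```javascript
// Stand-in for the event queue: deliveries wait here until runQueue() is called.
const taskQueue = [];
const runQueue = () => { while (taskQueue.length) taskQueue.shift()(); };

// Hypothetical sketch: if a delivery is already waiting, just refresh its value.
function squash(deliver) {
  let queued = false;
  let latest;
  return value => {
    latest = value;     // overwrite any stale pending value
    if (queued) return; // a delivery is already waiting on the queue
    queued = true;
    taskQueue.push(() => {
      queued = false;
      deliver(latest);
    });
  };
}

const received = [];
const update = squash(v => received.push(v));

update(1);
update(2);
update(3);  // three changes before the listener gets a chance to run
runQueue(); // the listener sees only the latest value
update(4);
runQueue();
console.log(received);
```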

(Should this be the default, with a function that instead opts out? Or maybe we just rely on updates to cover the "I want every single change" use-cases?)

Finally, the static watch function lets you turn properties on arbitrary objects into value streams. For example, in my proposal for Font Load Events using Futures, I just expose a simple loadStatus property for whether the document is currently loading any fonts or not. If you want to watch this (for example, to provide some UI telling the user fonts are currently loading), you would just do:

ReactiveValue.watch(document.fonts, 'loadStatus')
  .then(updateLoadingUI);

The returned stream updates immediately with the current value of the property, and then updates again any time the value changes. If the property is removed, the stream updates to undefined. If the object is deleted, the stream rejects.
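For a rough feel of watch, here is a sketch that intercepts writes with an accessor property defined through Object.defineProperty. That is a substitute technique chosen for this illustration. The watch function here takes a listener directly rather than returning a stream, it is a hypothetical helper, and the fonts object is a plain stand-in for document.fonts. Property removal and object deletion are not handled.

```javascript
// Hypothetical sketch of ReactiveValue.watch using an accessor property.
function watch(object, property, listener) {
  let current = object[property];
  Object.defineProperty(object, property, {
    configurable: true,
    enumerable: true,
    get: () => current,
    set(next) {
      if (Object.is(next, current)) return; // only report real changes
      current = next;
      listener(next);
    },
  });
  listener(current); // update immediately with the current value
}

const fonts = { loadStatus: 'loaded' }; // stand-in for document.fonts
const statuses = [];
watch(fonts, 'loadStatus', status => statuses.push(status));

fonts.loadStatus = 'loading';
fonts.loadStatus = 'loading'; // unchanged, so no update
fonts.loadStatus = 'loaded';
console.log(statuses);
```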

This can be implemented on top of Mutation Observers without a ton of trouble, but this is super-simple and easy for the common case.

DOM Futures does not spec out a "progress" handler, as some other libraries do (see: jQuery Deferred, Q, or Promise.js). How does EventStream#listen differ from sending "progress" notifications in a compatible Futures API?

I'd also be interested to see how this API might interact with Object.observe (ES7?).

I feel Futures (or Promises, Deferreds, etc.) should be spec'ed as part of ES and not the DOM, as some new ES6+ proposals such as module loaders could make use of Futures in the API. Baking Futures into ES would also provide a consistent path forward for C#-async/await-style or Q.spawn style "asynchronous" development without the need for explicit ".then"-able continuation passing.

It doesn't spec out a progress event yet, but there's a proposal in the github repo, and Anne is considering it.

The difference between a ProgressFuture and an EventStream is low if you only look at the basic API, but big when you look at their theoretical thrust and the combinators that develop as a result. Futures are focused on completing, with progress updates being an incidental useful feature sometimes. Streams are focused on updating, with completion being an incidental useful feature sometimes.

Note, for example, the difference in their monad operations (the .then() methods). When futures chain, they depend on the input future's completion or rejection. When streams chain, they depend on the input stream's updates or rejection. This gives a fairly different result.

I think Object.observe fits in great with Streams, and intend to propose that it returns an event stream.

The reason we specced Futures in the DOM spec was largely a procedural issue - we have a decent need for Futures now, while for TC39 it wasn't an urgent proposal. That said, TC39 people were very involved with the development of the feature - see my other recent blog post about Futures for some background.

Hello!

From es-discuss I see that you are familiar with bacon.js and the similarity between this and behaviors/signals from FRP.

Another piece of work that relates futures/promises and behaviors/signals is Push-Pull Functional Reactive Programming (http://conal.net/papers/push-pull-frp/) and the related Reactive library for Haskell (http://hackage.haskell.org/cgi-bin/hackage-scripts/package/reactive).

However you feel about types or functional programming, the paper and library are great sources to read for how the pieces fit together and how they fit with widespread structures. Hence they would be a treasure trove of combinators that are essentially guaranteed to work and be useful for your development.

  • Kenn

#4 - Emil Pirfält:

For convenience, EventStream#reject and EventStream#complete are provided, and are equivalent to calling then and only providing the given callback.

interface EventStream {
  EventStream then(optional AnyCallback? updateCB = null, optional AnyCallback? rejectCB = null, optional AnyCallback? completeCB = null);
  EventStream catch(optional AnyCallback rejectCB);
  EventStream complete(optional AnyCallback completeCB);
}

One of them seems to be a typo; personally I like the .catch way more. Again the API unification: .catch(onError) == .then(null, onError).

About .complete(cb) == .then(null, null, cb): I first read it as "end the stream", that is, resolver.complete() rather than stream.onComplete(). I don't know if onWhatever names are popular anymore, but something like after/last/lastly would reduce my confusion.

Lastly: Streams, I like it. Thanks :)
