BigFrontEnd Category 14 Observable Implementation Questions

Author: Yinhuan Yuan

Introduction

This blog post summarizes the Observable implementation related questions found on BigFrontEnd.Dev.

1.create an Observable

57.https://bigfrontend.dev/problem/create-an-Observable

Have you ever used RxJS before? The most important concepts in it are Observable and Observer.

Observable defines how values are delivered to Observer. Observer is just a set of callbacks.

Let's look at the code.

const observer = {
  next: (value) => {
    console.log('we got a value', value)
  },
  error: (error) => {
    console.log('we got an error', error)
  },
  complete: () => {
    console.log('ok, no more values')
  },
}

Above is an Observer which is pretty clear about what it is doing.

Then we could attach this Observer to some Observable. Observable feeds this observer with values or errors.

const observable = new Observable((subscriber) => {
  subscriber.next(1)
  subscriber.next(2)
  setTimeout(() => {
    subscriber.next(3)
    subscriber.next(4)
    subscriber.complete()
  }, 100)
})

The code plainly says that when a subscriber is attached:

  1. subscriber is fed with a value 1
  2. subscriber is then fed with a value 2
  3. wait 100 ms
  4. subscriber is fed with 3
  5. subscriber is fed with 4
  6. no more values for subscriber

Now if we attach the above observer to the observable, next and complete of the observer are called in order. (Be aware that there is a delay between 2 and 3.)

const sub = observable.subscribe(observer)
// we got a value 1
// we got a value 2

// we got a value 3
// we got a value 4
// ok, no more values

Notice that subscribe() returns a Subscription, which can be used to stop listening to the value delivery.

const sub = observable.subscribe(observer)
setTimeout(() => {
  // ok we only subscribe for 100ms
  sub.unsubscribe()
}, 100)

So this is the basic idea of Observable and Observer. There will be a few more interesting follow-up problems.

Now you are asked to implement a basic Observable class that makes the above possible.

Some extra requirements are listed here.

  1. error and complete can only be delivered once; next/error/complete after error/complete should not work
  2. for a subscriber object, the next/error/complete callbacks are all optional. If a function is passed as the observer, it is treated as next.
  3. should support multiple subscriptions

Further Reading

https://github.com/tc39/proposal-observable

Solution:

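The subscriber function takes an observer as input. An observer may contain next: (value) => ..., error: (error) => ..., and complete: () => .... The solution wraps the caller's observer in a safe observer that stops forwarding once error, complete, or unsubscribe has happened.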

class Observable {
  constructor(subscriber) {
    this.subscriber = subscriber
  }

  subscribe(observer) {
    // If observer is a function, treat it as the next method
    if (typeof observer === 'function') {
      observer = { next: observer }
    }

    let isUnsubscribed = false

    const safeObserver = {
      next: (value) => {
        if (!isUnsubscribed && observer.next) {
          observer.next(value)
        }
      },
      error: (error) => {
        if (!isUnsubscribed && observer.error) {
          observer.error(error)
        }
        isUnsubscribed = true
      },
      complete: () => {
        if (!isUnsubscribed && observer.complete) {
          observer.complete()
        }
        isUnsubscribed = true
      },
    }

    // Execute the subscriber function
    this.subscriber(safeObserver)

    return {
      unsubscribe: () => {
        isUnsubscribed = true
      },
    }
  }
}
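
As a quick check of the three extra requirements, here is a small sketch. It uses a condensed variant of the class above so that the snippet runs on its own; the deliver helper and the log array are illustrative names, not part of the problem.

```javascript
// Condensed variant of the Observable above, restated so this snippet is self-contained.
class Observable {
  constructor(subscriber) {
    this.subscriber = subscriber
  }

  subscribe(observer) {
    if (typeof observer === 'function') observer = { next: observer }
    let closed = false
    // deliver at most one terminal notification, then ignore everything
    const deliver = (name, isTerminal) => (value) => {
      if (closed) return
      if (isTerminal) closed = true
      if (observer[name]) observer[name](value)
    }
    this.subscriber({
      next: deliver('next', false),
      error: deliver('error', true),
      complete: deliver('complete', true),
    })
    return {
      unsubscribe: () => {
        closed = true
      },
    }
  }
}

const log = []
const source = new Observable((subscriber) => {
  subscriber.next(1)
  subscriber.complete()
  subscriber.next(2) // ignored: the subscription already completed
  subscriber.complete() // ignored: complete is delivered only once
})

// Requirement 2: a bare function is treated as next
source.subscribe((value) => log.push(`fn ${value}`))
// Requirement 3: a second subscription re-runs the subscriber function
source.subscribe({ complete: () => log.push('done') })

console.log(log) // [ 'fn 1', 'done' ]
```

The first subscription only has next, so it records the value; the second only has complete, so it records the completion. Neither sees anything after complete.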

2.implement Observable.from()

70.https://bigfrontend.dev/problem/implement-Observable-from

This is a follow-up on 57. create an Observable.

Suppose you have solved 57. create an Observable, here you are asked to implement a creation operator from().

According to the documentation, from()

Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object.

Your from() should accept all above types.

from([1, 2, 3]).subscribe(console.log)
// 1
// 2
// 3

Note

Observable is already given for you; there is no need to create it. For this problem, Observable-like means an Observable instance, though in real-world code you should check Symbol.observable.
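
To illustrate the Symbol.observable check mentioned in the note, here is a minimal sketch. Symbol.observable is not built into JavaScript engines, so a common convention (used by RxJS, for example) is to fall back to the string key '@@observable'. The names observableKey and isObservableLike are hypothetical helpers for this example.

```javascript
// Symbol.observable is not a built-in; fall back to the '@@observable' string key.
const observableKey =
  (typeof Symbol === 'function' && Symbol.observable) || '@@observable'

// Hypothetical helper: true when input exposes an interop method under that key.
function isObservableLike(input) {
  return input != null && typeof input[observableKey] === 'function'
}

// An object that opts in to the interop convention
const interop = {
  [observableKey]() {
    return this
  },
}

console.log(isObservableLike(interop)) // true
console.log(isObservableLike([1, 2, 3])) // false
console.log(isObservableLike(null)) // false
```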

Solution:

/**
 * @param {Array | ArrayLike | Promise | Iterable | Observable} input
 * @return {Observable}
 */
function from(input) {
  if (input instanceof Observable) return input

  const isIterable = typeof input[Symbol.iterator] === 'function'
  const isArrayLike = input.length !== undefined

  if (isIterable || isArrayLike) {
    return new Observable((subscriber) => {
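      try {
        // Array.from handles array-likes that are not iterable themselves
        const values = isIterable ? input : Array.from(input)

        for (const value of values) {
          subscriber.next(value)
        }
        // complete only when every value was delivered without throwing
        subscriber.complete()
      } catch (err) {
        subscriber.error(err)
      }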
    })
  }

  if (input instanceof Promise) {
    return new Observable((subscriber) => {
      input
        .then((result) => {
          subscriber.next(result)
          // complete only after the resolved value was delivered
          subscriber.complete()
        })
        .catch((err) => {
          subscriber.error(err)
        })
    })
  }

  throw new Error('You can provide an Observable, Promise, Array, or Iterable.')
}

3.implement Observable Subject

71.https://bigfrontend.dev/problem/implement-Observable-Subject

This is a follow-up on 57. create an Observable.

Plain Observables are unicast, meaning every subscription is independent. To create multicast, you need to use Subject.

The following code makes this easier to understand.

// default behavior with plain Observable
const observable = from([1, 2, 3])
observable.subscribe(console.log)
observable.subscribe(console.log)
// 1
// 2
// 3
// 1
// 2
// 3

You can see that two subscriptions are independent so the logs are grouped by subscription.

With Subject, it works like event listeners in the DOM world.

const subject = new Subject()
subject.subscribe(console.log)
subject.subscribe(console.log)

const observable = from([1, 2, 3])
observable.subscribe(subject)

// 1
// 1
// 2
// 2
// 3
// 3

Now the logs are different! That is because Subject first works as an observer and receives the values, then works as an Observable and dispatches each value to all of its observers.

Cool right? Ok, you are asked to implement a simple Subject Class.

  1. Observable is given for you, you can just use it.
  2. you can use new Observer({next,error,complete}) or new Observer(function) to create an observer.

Solution:

// You can use Observer which is bundled to your code

// class Observer {
//   // subscriber can be a single next function or a handler object {next, error, complete}
//   constructor(subscriber) { }
//   next(value) { }
//   error(error) { }
//   complete() {}
// }

class Subject {
  constructor() {
    this.observers = []
    this.next = this.next.bind(this)
    this.error = this.error.bind(this)
    this.complete = this.complete.bind(this)
  }

  subscribe(subscriber) {
    const observer = new Observer(subscriber)
    this.observers.push(observer)
    return observer
  }

  next(value) {
    for (const observer of this.observers) {
      observer.next(value)
    }
  }

  error(err) {
    for (const observer of this.observers) {
      observer.error(err)
    }
  }

  complete() {
    for (const observer of this.observers) {
      observer.complete()
    }
  }
}
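
The solution above returns the Observer itself from subscribe(). Whether that object can be used to stop listening depends on the bundled Observer class, which is not shown here. As a hedged sketch of one alternative, the hypothetical SimpleSubject below handles only next and returns a handle with unsubscribe() that removes just that observer.

```javascript
// Hypothetical SimpleSubject: accepts plain functions or { next } objects only.
class SimpleSubject {
  constructor() {
    this.observers = new Set()
  }

  subscribe(observer) {
    const target = typeof observer === 'function' ? { next: observer } : observer
    this.observers.add(target)
    // the returned handle removes only this observer
    return { unsubscribe: () => this.observers.delete(target) }
  }

  next(value) {
    // iterate over a copy so unsubscribing during delivery is safe
    for (const observer of [...this.observers]) {
      if (observer.next) observer.next(value)
    }
  }
}

const received = []
const subject = new SimpleSubject()
const first = subject.subscribe((value) => received.push(`a${value}`))
subject.subscribe((value) => received.push(`b${value}`))

subject.next(1) // delivered to both observers
first.unsubscribe()
subject.next(2) // delivered to the remaining observer only

console.log(received) // [ 'a1', 'b1', 'b2' ]
```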

4.implement Observable interval()

72.https://bigfrontend.dev/problem/implement-Observable-interval

This is a follow-up on 57. create an Observable.

Suppose you have solved 57. create an Observable, here you are asked to implement a creation operator interval().

According to the documentation, interval()

Creates an Observable that emits sequential numbers every specified interval of time

interval(1000).subscribe(console.log)

The above code prints 0, 1, 2, ... at an interval of 1 second.

Note: Observable is already given for you; there is no need to create it.

Solution:

/**
 * @param {number} period
 * @return {Observable}
 */
function interval(period) {
  return new Observable((subscriber) => {
    // keep the counter per subscription so every subscriber starts from 0
    let val = 0
    setInterval(() => {
      subscriber.next(val)
      val += 1
    }, period)
  })
}
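
The solution above never clears its timer, because the Observable from problem 57 ignores whatever the subscriber function returns. In the TC39 proposal and in RxJS, the subscriber function may return a cleanup function that runs on unsubscribe. The sketch below assumes a stand-in class with that behavior; TeardownObservable and intervalWithTeardown are hypothetical names, and only next is supported.

```javascript
// Stand-in Observable (an assumption): if the subscriber function returns a
// function, it is called once when the subscription is unsubscribed.
class TeardownObservable {
  constructor(subscriber) {
    this.subscriber = subscriber
  }

  subscribe(next) {
    let closed = false
    const teardown = this.subscriber({
      next: (value) => {
        if (!closed) next(value)
      },
    })
    return {
      unsubscribe: () => {
        if (closed) return
        closed = true
        if (typeof teardown === 'function') teardown()
      },
    }
  }
}

function intervalWithTeardown(period) {
  return new TeardownObservable((subscriber) => {
    let count = 0 // each subscription starts from 0
    const id = setInterval(() => subscriber.next(count++), period)
    return () => clearInterval(id) // stop the timer on unsubscribe
  })
}

const ticks = []
const subscription = intervalWithTeardown(10).subscribe((value) => {
  ticks.push(value)
  if (ticks.length === 3) {
    subscription.unsubscribe() // the timer is cleared, so the process can exit
    console.log(ticks) // [ 0, 1, 2 ]
  }
})
```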

5.implement Observable fromEvent()

73.https://bigfrontend.dev/problem/implement-Observable-fromEvent

This is a follow-up on 57. create an Observable.

Suppose you have solved 57. create an Observable. Here you are asked to implement a creation operator fromEvent() (for DOM events).

According to the documentation, fromEvent()

Creates an Observable that emits events of a specific type coming from the given event target.

Simply put, it is a utility for attaching an event listener in Observable fashion.

const source = fromEvent(node, 'click')
source.subscribe((e) => console.log(e))

When node is clicked, the event is logged.

Note

Observable is already given for you; there is no need to create it. The event listener removal is handled by add(), which is beyond our scope here, so you can ignore it.

Solution:

/**
 * @param {HTMLElement} element
 * @param {string} eventName
 * @param {boolean} capture
 * @return {Observable}
 */
function fromEvent(element, eventName, capture = false) {
  return new Observable((subscriber) => {
    element.addEventListener(
      eventName,
      (e) => {
        subscriber.next(e)
      },
      capture
    )
  })
}
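
To try fromEvent() without a browser page, here is a small sketch that uses the EventTarget and Event globals, which are available in browsers and in recent Node.js versions. The next-only Observable is a minimal stand-in (an assumption), and fromEvent is restated in the same shape as the solution so the snippet runs on its own.

```javascript
// Minimal stand-in for the given Observable (an assumption), next-only.
class Observable {
  constructor(subscriber) {
    this.subscriber = subscriber
  }

  subscribe(next) {
    this.subscriber({ next })
  }
}

// Same shape as the solution above.
function fromEvent(target, eventName, capture = false) {
  return new Observable((subscriber) => {
    target.addEventListener(eventName, (e) => subscriber.next(e), capture)
  })
}

const target = new EventTarget()
const seen = []
const clicks = fromEvent(target, 'click')

target.dispatchEvent(new Event('click')) // nobody subscribed yet: not recorded
clicks.subscribe((e) => seen.push(e.type))
target.dispatchEvent(new Event('click'))
target.dispatchEvent(new Event('click'))

console.log(seen) // [ 'click', 'click' ]
```

The listener is attached only when subscribe() is called, which is why the first dispatched event is not recorded.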

6.implement Observable Transformation Operators

74.https://bigfrontend.dev/problem/implement-Observable-transformation-operators

This is a follow-up on 57. create an Observable.

There are a lot of operators for Observable. If we think of an Observable as an event stream, then modifying the stream is a common task, and transformation operators are useful for this.

In this problem, you are asked to implement map(). As the name indicates, it maps each value to another value, thus creating a new event stream.

Here is an example.

const source = Observable.from([1, 2, 3])
map((x) => x * x)(source) // this transformer squares numbers and creates a new stream
  .subscribe(console.log)
// 1
// 4
// 9

Observable has a pipe() method, which can make this more readable.

const source = Observable.from([1, 2, 3])
source.pipe(map((x) => x * x)).subscribe(console.log)
// 1
// 4
// 9

Note: Observable is already given for you; there is no need to create it.

Solution:

/**
 * @param {(value: any) => any} transform
 * @return {(observable: Observable) => Observable}
 * returns a function which transforms an Observable
 */
function map(transform) {
  return (source) => {
    return new Observable((subscriber) => {
      // forward through a fresh observer instead of mutating the subscriber
      source.subscribe({
        next: (value) => subscriber.next(transform(value)),
        error: (err) => subscriber.error(err),
        complete: () => subscriber.complete(),
      })
    })
  }
}
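
The pipe() method mentioned in the problem statement can be thought of as left-to-right function composition over operators. The sketch below is one possible way to write it, not the actual implementation of the given Observable; the next-only Observable is a stand-in (an assumption) and map is reduced to its next path.

```javascript
// Minimal stand-in for the given Observable (an assumption), next-only.
class Observable {
  constructor(subscriber) {
    this.subscriber = subscriber
  }

  subscribe(next) {
    this.subscriber({ next })
  }

  // hypothetical pipe(): feed this Observable through each operator in turn
  pipe(...operators) {
    return operators.reduce((source, operator) => operator(source), this)
  }
}

// next-only version of map, enough to demonstrate chaining
const map = (transform) => (source) =>
  new Observable((subscriber) => {
    source.subscribe((value) => subscriber.next(transform(value)))
  })

const results = []
const numbers = new Observable((subscriber) => {
  for (const value of [1, 2, 3]) subscriber.next(value)
})

numbers
  .pipe(
    map((x) => x * x),
    map((x) => x + 1)
  )
  .subscribe((value) => results.push(value))

console.log(results) // [ 2, 5, 10 ]
```

Each operator receives the Observable produced by the previous one, so the values are squared first and then incremented.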