State Management & RxJS

RxJS Fundamentals

Learn the basics of Reactive Extensions for JavaScript (RxJS) — Observables, Observers, Subscriptions, and the reactive programming paradigm in Angular.

What is RxJS?

RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using Observables. Angular uses RxJS extensively for:

  • HTTP requests (HttpClient returns Observables)
  • Form value changes
  • Router events
  • Component communication
  • Real-time data streams (WebSockets, timers)

The Core Concept

Observable (Data Source)
    │
    ├─▶ emits values over time: 1, 2, 3, "hello", { name: 'John' }
    │
Observer (Consumer)
    │
    ├─▶ next(value)    — handle emitted values
    ├─▶ error(err)     — handle errors
    └─▶ complete()     — handle completion
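
To make the contract in the diagram concrete, here is a minimal, dependency-free sketch. `TinyObserver` and `TinyObservable` are hypothetical names invented for this illustration; they are not the RxJS classes and leave out most of what RxJS does (unsubscription, operators, error safety). The sketch only shows the three callbacks and the rule that nothing is delivered after `complete()` or `error()`.

```typescript
// Hypothetical, simplified types for illustration only (NOT the RxJS API).
interface TinyObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

class TinyObservable<T> {
  private producer: (observer: TinyObserver<T>) => void;

  constructor(producer: (observer: TinyObserver<T>) => void) {
    this.producer = producer;
  }

  // The producer function runs once per subscribe() call.
  subscribe(observer: TinyObserver<T>): void {
    let closed = false;
    // Wrap the observer so nothing is delivered after error() or complete().
    this.producer({
      next: value => { if (!closed) observer.next(value); },
      error: err => { if (!closed) { closed = true; observer.error(err); } },
      complete: () => { if (!closed) { closed = true; observer.complete(); } },
    });
  }
}

const received: string[] = [];

const source$ = new TinyObservable<number>(observer => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  observer.next(3); // Ignored: the stream has already completed
});

source$.subscribe({
  next: value => { received.push(`next ${value}`); },
  error: err => { received.push(`error ${String(err)}`); },
  complete: () => { received.push('complete'); },
});

console.log(received); // ['next 1', 'next 2', 'complete']
```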

Creating Observables

Using of — Emit Fixed Values

typescript
import { of } from 'rxjs';

const numbers$ = of(1, 2, 3, 4, 5);
numbers$.subscribe(value => console.log(value));
// 1, 2, 3, 4, 5, then completes

Using from — From Arrays/Promises

typescript
import { from } from 'rxjs';

// From array
const items$ = from(['Angular', 'React', 'Vue']);
items$.subscribe(item => console.log(item));

// From Promise (the fetch starts right away; from() only wraps the pending Promise)
const promise$ = from(fetch('/api/users').then(r => r.json()));
promise$.subscribe(data => console.log(data));

Using interval and timer

typescript
import { interval, timer } from 'rxjs';

// Emit every second: 0, 1, 2, 3, ...
const counter$ = interval(1000);

// Wait 2 seconds, then emit every second
const delayed$ = timer(2000, 1000);

// Emit once after 3 seconds
const oneShot$ = timer(3000);

Using new Observable — Custom Observable

typescript
import { Observable } from 'rxjs';

const custom$ = new Observable<string>(subscriber => {
  subscriber.next('First value');
  subscriber.next('Second value');

  setTimeout(() => {
    subscriber.next('Async value');
    subscriber.complete(); // Signal completion
  }, 1000);
});

custom$.subscribe({
  next: value => console.log(value),
  error: err => console.error(err),
  complete: () => console.log('Done!'),
});
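
The custom Observable above starts a timer but never cancels it if the subscriber leaves early. One way to handle that, sketched here with illustrative names (`delayed$`, `teardownLog`, `seen`), is to return a teardown function from the callback passed to `new Observable`. RxJS runs it when the subscription ends.

```typescript
import { Observable } from 'rxjs';

const teardownLog: string[] = [];

const delayed$ = new Observable<string>(subscriber => {
  subscriber.next('First value');

  const timeoutId = setTimeout(() => {
    subscriber.next('Async value');
    subscriber.complete();
  }, 1000);

  // Teardown: runs on unsubscribe(), and also after complete() or error()
  return () => {
    clearTimeout(timeoutId);
    teardownLog.push('timer cleared');
  };
});

const seen: string[] = [];
const subscription = delayed$.subscribe(value => seen.push(value));

// Unsubscribe before the timeout fires: 'Async value' is never delivered
subscription.unsubscribe();

console.log(seen);        // ['First value']
console.log(teardownLog); // ['timer cleared']
```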

Using Subject — Multicast

typescript
import { Subject } from 'rxjs';

const subject = new Subject<string>();

// Multiple subscribers
subject.subscribe(v => console.log('Sub 1:', v));
subject.subscribe(v => console.log('Sub 2:', v));

subject.next('Hello');
// Sub 1: Hello
// Sub 2: Hello

subject.next('World');
// Sub 1: World
// Sub 2: World
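
Both subscribers above were attached before the first `next()` call. A short sketch of what happens when one arrives late (the names `messages$`, `early`, and `late` are illustrative only): a plain Subject does not replay past values, so the late subscriber only sees what is emitted afterwards.

```typescript
import { Subject } from 'rxjs';

const messages$ = new Subject<string>();
const early: string[] = [];
const late: string[] = [];

messages$.subscribe(v => early.push(v));
messages$.next('Hello');

// Subscribes after 'Hello' was emitted, so it only receives later values
messages$.subscribe(v => late.push(v));
messages$.next('World');
messages$.complete();

console.log(early); // ['Hello', 'World']
console.log(late);  // ['World']
```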

Observables in Angular

HTTP Requests

typescript
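import { HttpClient } from '@angular/common/http';
import { Injectable, inject } from '@angular/core';
import { Observable } from 'rxjs';

// User is the application's own model type (not defined in this section)
@Injectable({ providedIn: 'root' })
export class UserService {
  private http = inject(HttpClient);

  getUsers(): Observable<User[]> {
    // Cold Observable: the request fires only when something subscribes
    return this.http.get<User[]>('/api/users');
  }
}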

Component Usage

typescript
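import { Component, inject } from '@angular/core';

@Component({
  // Standalone components also need AsyncPipe (from '@angular/common') in `imports`
  template: `
    @if (users$ | async; as users) {
      @for (user of users; track user.id) {
        <p>{{ user.name }}</p>
      }
    } @else {
      <p>Loading...</p>
    }
  `,
})
export class UserListComponent {
  // The async pipe in the template subscribes and unsubscribes for us
  users$ = inject(UserService).getUsers();
}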

Form Value Changes

typescript
this.searchForm.get('query')?.valueChanges.subscribe(value => {
  console.log('Search term:', value);
});

Router Events

typescript
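import { inject, OnInit } from '@angular/core';
import { Router, NavigationEnd } from '@angular/router';

export class AppComponent implements OnInit {
  private router = inject(Router);

  ngOnInit() {
    // router.events emits many event types; keep only completed navigations
    this.router.events.subscribe(event => {
      if (event instanceof NavigationEnd) {
        console.log('Navigated to:', event.url);
      }
    });
  }
}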

Subscribing and Unsubscribing

Manual Subscribe

typescript
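import { OnDestroy, OnInit } from '@angular/core';
import { Subscription, interval } from 'rxjs';

export class MyComponent implements OnInit, OnDestroy {
  private subscription!: Subscription;

  ngOnInit() {
    // interval() never completes, so the subscription must be kept and closed
    this.subscription = interval(1000).subscribe(n => {
      console.log('Tick:', n);
    });
  }

  ngOnDestroy() {
    this.subscription.unsubscribe(); // Prevent memory leak!
  }
}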
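
When a component holds more than one subscription, a single parent `Subscription` can collect them so that one `unsubscribe()` call closes all of them. A minimal sketch outside Angular, using two Subjects with made-up names (`ticks$`, `labels$`) as stand-ins for real sources:

```typescript
import { Subject, Subscription } from 'rxjs';

const ticks$ = new Subject<number>();
const labels$ = new Subject<string>();
const output: string[] = [];

// One parent Subscription collects every child subscription
const subscriptions = new Subscription();
subscriptions.add(ticks$.subscribe(n => output.push(`tick ${n}`)));
subscriptions.add(labels$.subscribe(s => output.push(`label ${s}`)));

ticks$.next(1);
labels$.next('Ada');

// In a component this call would live in ngOnDestroy()
subscriptions.unsubscribe();

// Emissions after unsubscribe() reach nobody
ticks$.next(2);
labels$.next('Grace');

console.log(output); // ['tick 1', 'label Ada']
```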

Using AsyncPipe (Auto-Unsubscribe)

html
<!-- The async pipe subscribes AND unsubscribes automatically -->
<p>Count: {{ counter$ | async }}</p>

Using takeUntilDestroyed (Angular 16+)

typescript
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';

export class MyComponent {
  constructor() {
    interval(1000).pipe(
      // Without arguments this must run in an injection context
      // (constructor or field initializer); it unsubscribes on destroy
      takeUntilDestroyed(),
    ).subscribe(n => console.log(n));
  }
}

Using DestroyRef

typescript
import { DestroyRef, OnInit, inject } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';

export class MyComponent implements OnInit {
  private destroyRef = inject(DestroyRef);

  ngOnInit() {
    interval(1000).pipe(
      // Passing the DestroyRef explicitly allows use outside the constructor
      takeUntilDestroyed(this.destroyRef),
    ).subscribe(n => console.log(n));
  }
}

Hot vs Cold Observables

| Type | Behavior | Examples |
| --- | --- | --- |
| Cold | Creates a new execution per subscriber | HttpClient.get(), of(), from() |
| Hot | Shares execution across subscribers | Subject, BehaviorSubject, DOM events |

typescript
// COLD: Each subscriber gets its own HTTP request
const users$ = this.http.get('/api/users');
users$.subscribe(d => console.log('Sub 1:', d)); // Request #1
users$.subscribe(d => console.log('Sub 2:', d)); // Request #2

// HOT: All subscribers share the same stream of DOM events
// (fromEvent is imported from 'rxjs')
const clicks$ = fromEvent(document, 'click');
clicks$.subscribe(e => console.log('Sub 1:', e)); // Same clicks
clicks$.subscribe(e => console.log('Sub 2:', e)); // Same clicks
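
The same distinction can be observed without a server or a browser. In this sketch (the names `cold$`, `hot$`, and `executions` are illustrative), a counter records how often the producer function of a cold Observable runs, while a Subject stands in for a hot source that pushes one value to everyone currently subscribed.

```typescript
import { Observable, Subject } from 'rxjs';

// COLD: the producer function runs once for every subscriber
let executions = 0;
const cold$ = new Observable<number>(subscriber => {
  executions += 1;
  subscriber.next(executions);
  subscriber.complete();
});

const coldValues: number[] = [];
cold$.subscribe(v => coldValues.push(v));
cold$.subscribe(v => coldValues.push(v));

// HOT: one source pushes into a Subject that every subscriber shares
const hot$ = new Subject<number>();
const hotA: number[] = [];
const hotB: number[] = [];
hot$.subscribe(v => hotA.push(v));
hot$.subscribe(v => hotB.push(v));
hot$.next(42);

console.log(executions); // 2
console.log(coldValues); // [1, 2]
console.log(hotA, hotB); // [42] [42]
```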

Subject Variants

| Type | Behavior |
| --- | --- |
| Subject | No initial value; subscribers only receive values emitted after they subscribe |
| BehaviorSubject | Has an initial value; new subscribers get the latest value immediately |
| ReplaySubject | Replays a specified number of previous values to new subscribers |
| AsyncSubject | Emits only the last value, and only on completion |

typescript
import { BehaviorSubject } from 'rxjs';

// BehaviorSubject: the variant most often used for state in Angular services
const count$ = new BehaviorSubject<number>(0);
count$.subscribe(v => console.log(v)); // Immediately logs: 0
count$.next(1); // logs: 1
count$.next(2); // logs: 2

// Late subscriber still gets the latest value
count$.subscribe(v => console.log('Late:', v)); // Immediately: "Late: 2"
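
The other two variants from the table can be sketched the same way. The variable names here (`replay$`, `last$`, `beforeComplete`) are chosen only for this illustration.

```typescript
import { AsyncSubject, ReplaySubject } from 'rxjs';

// ReplaySubject(2): a late subscriber receives the last two values
const replay$ = new ReplaySubject<number>(2);
replay$.next(1);
replay$.next(2);
replay$.next(3);

const replayed: number[] = [];
replay$.subscribe(v => replayed.push(v));

// AsyncSubject: nothing is delivered until complete(), then only the last value
const last$ = new AsyncSubject<number>();
const finalValues: number[] = [];
last$.subscribe(v => finalValues.push(v));
last$.next(10);
last$.next(20);
const beforeComplete = finalValues.length;
last$.complete();

console.log(replayed);       // [2, 3]
console.log(beforeComplete); // 0
console.log(finalValues);    // [20]
```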

Best Practices

  1. Use async pipe whenever possible — it handles subscriptions automatically
  2. Always unsubscribe from long-lived Observables (intervals, WebSockets, form changes)
  3. Use takeUntilDestroyed() in components for clean unsubscription
  4. Prefer BehaviorSubject in services for state that needs an initial value
  5. Type your Observables: use Observable<User[]>, not Observable<any>
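
Practices 4 and 5 can be combined in one small state holder. This is a sketch under stated assumptions rather than a prescribed pattern: `UserStore`, `addUser`, and the `User` shape are hypothetical, and the class is written without Angular so it runs on its own. In an Angular app it would typically carry `@Injectable({ providedIn: 'root' })`.

```typescript
import { BehaviorSubject, Observable } from 'rxjs';

// Hypothetical model type for this example
interface User {
  id: number;
  name: string;
}

// Hypothetical store; in Angular this would be an injectable service
class UserStore {
  // Kept private so only the store itself can call next()
  private readonly usersSubject = new BehaviorSubject<User[]>([]);

  // Typed, read-only view for consumers (usable with the async pipe)
  readonly users$: Observable<User[]> = this.usersSubject.asObservable();

  addUser(user: User): void {
    // Emit a new array instead of mutating the current one
    this.usersSubject.next([...this.usersSubject.value, user]);
  }
}

const store = new UserStore();
const snapshots: number[] = [];

// The initial value ([]) arrives immediately, then one emission per update
store.users$.subscribe(users => snapshots.push(users.length));
store.addUser({ id: 1, name: 'Ada' });

console.log(snapshots); // [0, 1]
```

Exposing only `users$` keeps writes inside the store, so components can read state but cannot push values into it.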

Next Steps

Now that you understand Observables, let's explore RxJS operators — powerful functions that transform, filter, combine, and control Observable data streams.