State Management & RxJS

Observables and Operators

Master RxJS operators — transformation, filtering, combination, error handling, and utility operators for powerful reactive data flows.

What Are Operators?

Operators are functions that transform Observable streams. They take an Observable as input and return a new Observable as output, allowing you to build complex data pipelines.

typescript
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

of(1, 2, 3, 4, 5).pipe(
  filter(n => n % 2 === 0),   // Keep only even numbers
  map(n => n * 10),            // Multiply by 10
).subscribe(console.log);
// 20, 40

The pipe() Method

Chain operators using .pipe():

typescript
source$.pipe(
  operator1(),
  operator2(),
  operator3(),
).subscribe(result => { /* ... */ });

Transformation Operators

map — Transform Each Value

typescript
import { map } from 'rxjs/operators';

// Transform API response
this.http.get<ApiResponse>('/api/users').pipe(
  map(response => response.data),           // Extract data
  map(users => users.filter(u => u.active)), // Filter active users
);

// Simple transforms
of(1, 2, 3).pipe(
  map(n => n * n), // Square each number
).subscribe(console.log); // 1, 4, 9

switchMap — Map and Switch to New Observable

Cancels the previous inner Observable when a new value arrives:

typescript
import { switchMap } from 'rxjs/operators';

// Type-ahead search: each keystroke cancels the previous request
this.searchControl.valueChanges.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query => this.searchService.search(query)),
).subscribe(results => this.results = results);

mergeMap — Map and Merge Observables

Runs inner Observables concurrently:

typescript
import { mergeMap } from 'rxjs/operators';

// Process all items in parallel
from(userIds).pipe(
  mergeMap(id => this.userService.getUser(id)),
).subscribe(user => console.log(user));

concatMap — Map and Concatenate (Sequential)

Waits for each inner Observable to complete before starting the next:

typescript
import { concatMap } from 'rxjs/operators';

// Execute uploads one at a time
from(files).pipe(
  concatMap(file => this.uploadService.upload(file)),
).subscribe(result => console.log('Uploaded:', result));

Comparison

OperatorConcurrencyCancels Previous?Use Case
switchMap1 (latest)✅ YesSearch, autocomplete
mergeMapUnlimited❌ NoParallel API calls
concatMap1 (sequential)❌ NoOrdered operations
exhaustMap1 (first)Ignores newSubmit button (prevent double)

Filtering Operators

filter — Keep Matching Values

typescript
import { filter } from 'rxjs/operators';

numbers$.pipe(
  filter(n => n > 10),
).subscribe(console.log); // Only values > 10

distinctUntilChanged — Skip Duplicates

typescript
import { distinctUntilChanged } from 'rxjs/operators';

of(1, 1, 2, 2, 3, 1).pipe(
  distinctUntilChanged(),
).subscribe(console.log); // 1, 2, 3, 1

// With custom comparison
users$.pipe(
  distinctUntilChanged((prev, curr) => prev.id === curr.id),
);

take / takeUntil / takeWhile

typescript
import { take, takeUntil, takeWhile } from 'rxjs/operators';

// Take first 3 values then complete
interval(1000).pipe(take(3)).subscribe(console.log);
// 0, 1, 2

// Take until another Observable emits
interval(1000).pipe(
  takeUntil(this.destroy$),
).subscribe(console.log);

// Take while condition is true
of(1, 2, 3, 4, 5, 1, 2).pipe(
  takeWhile(n => n < 4),
).subscribe(console.log); // 1, 2, 3

skip — Skip First N Values

typescript
import { skip } from 'rxjs/operators';

interval(1000).pipe(
  skip(3), // Skip first 3 emissions
).subscribe(console.log); // Starts from 3

debounceTime — Wait for Pause in Emissions

typescript
import { debounceTime } from 'rxjs/operators';

// Wait 300ms after the user stops typing
this.searchInput.valueChanges.pipe(
  debounceTime(300),
).subscribe(query => this.search(query));

throttleTime — Rate Limit Emissions

typescript
import { throttleTime } from 'rxjs/operators';

// Only emit once every 1000ms
fromEvent(window, 'scroll').pipe(
  throttleTime(1000),
).subscribe(() => this.checkScrollPosition());

Combination Operators

combineLatest — Combine Latest Values

typescript
import { combineLatest } from 'rxjs';

const filters$ = combineLatest([
  this.categoryFilter$,
  this.priceFilter$,
  this.sortOrder$,
]).pipe(
  map(([category, price, sort]) => ({ category, price, sort })),
);

filters$.subscribe(filters => this.loadProducts(filters));

forkJoin — Wait for All to Complete

typescript
import { forkJoin } from 'rxjs';

// Load multiple API calls in parallel, wait for all
forkJoin({
  users: this.http.get<User[]>('/api/users'),
  products: this.http.get<Product[]>('/api/products'),
  settings: this.http.get<Settings>('/api/settings'),
}).subscribe(({ users, products, settings }) => {
  this.users = users;
  this.products = products;
  this.settings = settings;
});

merge — Merge Multiple Streams

typescript
import { merge } from 'rxjs';

// Combine multiple event sources
const allClicks$ = merge(
  fromEvent(button1, 'click'),
  fromEvent(button2, 'click'),
  fromEvent(button3, 'click'),
);

allClicks$.subscribe(() => console.log('A button was clicked'));

Error Handling Operators

catchError — Handle Errors

typescript
import { catchError, of } from 'rxjs';

this.http.get<User[]>('/api/users').pipe(
  catchError(error => {
    console.error('Failed to load users:', error);
    return of([]); // Return empty array as fallback
  }),
).subscribe(users => this.users = users);

retry — Retry on Error

typescript
import { retry, catchError } from 'rxjs';

this.http.get('/api/data').pipe(
  retry(3),                    // Retry up to 3 times
  catchError(err => of(null)), // Fallback after retries exhausted
);

retryWhen with Delay

typescript
import { retryWhen, delay, take } from 'rxjs/operators';

this.http.get('/api/data').pipe(
  retryWhen(errors => errors.pipe(
    delay(2000), // Wait 2 seconds between retries
    take(3),     // Maximum 3 retries
  )),
);

Utility Operators

tap — Side Effects (Debugging)

typescript
import { tap } from 'rxjs/operators';

this.http.get<User[]>('/api/users').pipe(
  tap(users => console.log('Raw response:', users)),
  map(users => users.filter(u => u.active)),
  tap(active => console.log('Active users:', active.length)),
);

finalize — Run on Complete/Error

typescript
import { finalize } from 'rxjs/operators';

this.loading = true;
this.http.get('/api/data').pipe(
  finalize(() => this.loading = false), // Always runs
).subscribe(data => this.data = data);

shareReplay — Cache and Share

typescript
import { shareReplay } from 'rxjs/operators';

// Cache the result and share with multiple subscribers
@Injectable({ providedIn: 'root' })
export class ConfigService {
  config$ = this.http.get<Config>('/api/config').pipe(
    shareReplay(1), // Cache last emission, share across subscribers
  );
}

Real-World Example: Search with Autocomplete

typescript
export class SearchComponent {
  private searchService = inject(SearchService);
  searchControl = new FormControl('');

  results$ = this.searchControl.valueChanges.pipe(
    debounceTime(300),              // Wait for typing pause
    distinctUntilChanged(),          // Skip if same value
    filter(query => query.length >= 2), // Minimum 2 chars
    tap(() => this.loading = true),
    switchMap(query =>
      this.searchService.search(query).pipe(
        catchError(() => of([])),    // Return empty on error
      )
    ),
    tap(() => this.loading = false),
  );

  loading = false;
}

Next Steps

With RxJS mastered, you're ready to tackle state management — organizing and sharing application state across components using services, signals, and dedicated state management patterns.