What is RxJS?
RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using Observables. Angular uses RxJS extensively for:
- HTTP requests (
HttpClientreturns Observables) - Form value changes
- Router events
- Component communication
- Real-time data streams (WebSockets, timers)
The Core Concept
Observable (Data Source)
│
├─▶ emits values over time: 1, 2, 3, "hello", { name: 'John' }
│
Observer (Consumer)
│
├─▶ next(value) — handle emitted values
├─▶ error(err) — handle errors
└─▶ complete() — handle completionCreating Observables
Using of — Emit Fixed Values
typescript
import { of } from 'rxjs';
const numbers$ = of(1, 2, 3, 4, 5);
numbers$.subscribe(value => console.log(value));
// 1, 2, 3, 4, 5, then completesUsing from — From Arrays/Promises
typescript
import { from } from 'rxjs';
// From array
const items$ = from(['Angular', 'React', 'Vue']);
items$.subscribe(item => console.log(item));
// From Promise
const promise$ = from(fetch('/api/users').then(r => r.json()));
promise$.subscribe(data => console.log(data));Using interval and timer
typescript
import { interval, timer } from 'rxjs';
// Emit every second: 0, 1, 2, 3, ...
const counter$ = interval(1000);
// Wait 2 seconds, then emit every second
const delayed$ = timer(2000, 1000);
// Emit once after 3 seconds
const oneShot$ = timer(3000);Using new Observable — Custom Observable
typescript
import { Observable } from 'rxjs';
const custom$ = new Observable<string>(subscriber => {
subscriber.next('First value');
subscriber.next('Second value');
setTimeout(() => {
subscriber.next('Async value');
subscriber.complete(); // Signal completion
}, 1000);
});
custom$.subscribe({
next: value => console.log(value),
error: err => console.error(err),
complete: () => console.log('Done!'),
});Using Subject — Multicast
typescript
import { Subject } from 'rxjs';
const subject = new Subject<string>();
// Multiple subscribers
subject.subscribe(v => console.log('Sub 1:', v));
subject.subscribe(v => console.log('Sub 2:', v));
subject.next('Hello');
// Sub 1: Hello
// Sub 2: Hello
subject.next('World');
// Sub 1: World
// Sub 2: WorldObservables in Angular
HTTP Requests
typescript
@Injectable({ providedIn: 'root' })
export class UserService {
private http = inject(HttpClient);
getUsers(): Observable<User[]> {
return this.http.get<User[]>('/api/users');
// Returns a cold Observable — request fires on subscribe
}
}Component Usage
typescript
@Component({
template: `
@if (users$ | async; as users) {
@for (user of users; track user.id) {
<p>{{ user.name }}</p>
}
} @else {
<p>Loading...</p>
}
`,
})
export class UserListComponent {
users$ = inject(UserService).getUsers();
}Form Value Changes
typescript
this.searchForm.get('query')?.valueChanges.subscribe(value => {
console.log('Search term:', value);
});Router Events
typescript
import { Router, NavigationEnd } from '@angular/router';
export class AppComponent {
private router = inject(Router);
ngOnInit() {
this.router.events.subscribe(event => {
if (event instanceof NavigationEnd) {
console.log('Navigated to:', event.url);
}
});
}
}Subscribing and Unsubscribing
Manual Subscribe
typescript
export class MyComponent implements OnInit, OnDestroy {
private subscription!: Subscription;
ngOnInit() {
this.subscription = interval(1000).subscribe(n => {
console.log('Tick:', n);
});
}
ngOnDestroy() {
this.subscription.unsubscribe(); // Prevent memory leak!
}
}Using AsyncPipe (Auto-Unsubscribe)
html
<!-- The async pipe subscribes AND unsubscribes automatically -->
<p>Count: {{ counter$ | async }}</p>Using takeUntilDestroyed (Angular 16+)
typescript
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
export class MyComponent {
constructor() {
interval(1000).pipe(
takeUntilDestroyed(), // Auto-unsubscribes when component is destroyed
).subscribe(n => console.log(n));
}
}Using DestroyRef
typescript
import { DestroyRef, inject } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
export class MyComponent implements OnInit {
private destroyRef = inject(DestroyRef);
ngOnInit() {
interval(1000).pipe(
takeUntilDestroyed(this.destroyRef),
).subscribe(n => console.log(n));
}
}Hot vs Cold Observables
| Type | Behavior | Examples |
|---|---|---|
| Cold | Creates a new execution per subscriber | HttpClient.get(), of(), from() |
| Hot | Shares execution across subscribers | Subject, BehaviorSubject, DOM events |
typescript
// COLD: Each subscriber gets its own HTTP request
const users$ = this.http.get('/api/users');
users$.subscribe(d => console.log('Sub 1:', d)); // Request #1
users$.subscribe(d => console.log('Sub 2:', d)); // Request #2
// HOT: All subscribers share the same stream
const clicks$ = fromEvent(document, 'click');
clicks$.subscribe(e => console.log('Sub 1:', e)); // Same clicks
clicks$.subscribe(e => console.log('Sub 2:', e)); // Same clicksSubject Variants
| Type | Behavior |
|---|---|
Subject | No initial value, only receives values emitted after subscribing |
BehaviorSubject | Has initial value, new subscribers get the latest value immediately |
ReplaySubject | Replays specified number of previous values to new subscribers |
AsyncSubject | Only emits the last value on completion |
typescript
// BehaviorSubject — most common in Angular services
const count$ = new BehaviorSubject<number>(0);
count$.subscribe(v => console.log(v)); // Immediately logs: 0
count$.next(1); // logs: 1
count$.next(2); // logs: 2
// Late subscriber still gets the latest value
count$.subscribe(v => console.log('Late:', v)); // Immediately: "Late: 2"Best Practices
- Use
asyncpipe whenever possible — it handles subscriptions automatically - Always unsubscribe from long-lived Observables (intervals, WebSockets, form changes)
- Use
takeUntilDestroyed()in components for clean unsubscription - Prefer
BehaviorSubjectin services for state that needs an initial value - Type your Observables —
Observable<User[]>notObservable<any>
Next Steps
Now that you understand Observables, let's explore RxJS operators — powerful functions that transform, filter, combine, and control Observable data streams.