What Are Operators?
Operators are functions that transform Observable streams. They take an Observable as input and return a new Observable as output, allowing you to build complex data pipelines.
typescript
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5).pipe(
filter(n => n % 2 === 0), // Keep only even numbers
map(n => n * 10), // Multiply by 10
).subscribe(console.log);
// 20, 40
The pipe() Method
Chain operators using .pipe():
typescript
source$.pipe(
operator1(),
operator2(),
operator3(),
).subscribe(result => { /* ... */ });
Transformation Operators
map — Transform Each Value
typescript
import { map } from 'rxjs/operators';
// Transform API response
this.http.get<ApiResponse>('/api/users').pipe(
map(response => response.data), // Extract data
map(users => users.filter(u => u.active)), // Filter active users
);
// Simple transforms
of(1, 2, 3).pipe(
map(n => n * n), // Square each number
).subscribe(console.log); // 1, 4, 9
switchMap — Map and Switch to New Observable
Cancels the previous inner Observable when a new value arrives:
typescript
import { switchMap } from 'rxjs/operators';
// Type-ahead search: each keystroke cancels the previous request
this.searchControl.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(query => this.searchService.search(query)),
).subscribe(results => this.results = results);
mergeMap — Map and Merge Observables
Runs inner Observables concurrently:
typescript
import { mergeMap } from 'rxjs/operators';
// Process all items in parallel
from(userIds).pipe(
mergeMap(id => this.userService.getUser(id)),
).subscribe(user => console.log(user));
concatMap — Map and Concatenate (Sequential)
Waits for each inner Observable to complete before starting the next:
typescript
import { concatMap } from 'rxjs/operators';
// Execute uploads one at a time
from(files).pipe(
concatMap(file => this.uploadService.upload(file)),
).subscribe(result => console.log('Uploaded:', result));
Comparison
| Operator | Concurrency | Cancels Previous? | Use Case |
|---|---|---|---|
| switchMap | 1 (latest) | ✅ Yes | Search, autocomplete |
| mergeMap | Unlimited | ❌ No | Parallel API calls |
| concatMap | 1 (sequential) | ❌ No | Ordered operations |
| exhaustMap | 1 (first) | ❌ No (ignores new values) | Submit button (prevent double submit) |
Filtering Operators
filter — Keep Matching Values
typescript
import { filter } from 'rxjs/operators';
numbers$.pipe(
filter(n => n > 10),
).subscribe(console.log); // Only values > 10
distinctUntilChanged — Skip Duplicates
typescript
import { distinctUntilChanged } from 'rxjs/operators';
of(1, 1, 2, 2, 3, 1).pipe(
distinctUntilChanged(),
).subscribe(console.log); // 1, 2, 3, 1
// With custom comparison
users$.pipe(
distinctUntilChanged((prev, curr) => prev.id === curr.id),
);
take / takeUntil / takeWhile
typescript
import { take, takeUntil, takeWhile } from 'rxjs/operators';
// Take first 3 values then complete
interval(1000).pipe(take(3)).subscribe(console.log);
// 0, 1, 2
// Take until another Observable emits
interval(1000).pipe(
takeUntil(this.destroy$),
).subscribe(console.log);
// Take while condition is true
of(1, 2, 3, 4, 5, 1, 2).pipe(
takeWhile(n => n < 4),
).subscribe(console.log); // 1, 2, 3
skip — Skip First N Values
typescript
import { skip } from 'rxjs/operators';
interval(1000).pipe(
skip(3), // Skip first 3 emissions
).subscribe(console.log); // Starts from 3
debounceTime — Wait for Pause in Emissions
typescript
import { debounceTime } from 'rxjs/operators';
// Wait 300ms after the user stops typing
this.searchInput.valueChanges.pipe(
debounceTime(300),
).subscribe(query => this.search(query));
throttleTime — Rate Limit Emissions
typescript
import { throttleTime } from 'rxjs/operators';
// Only emit once every 1000ms
fromEvent(window, 'scroll').pipe(
throttleTime(1000),
).subscribe(() => this.checkScrollPosition());
Combination Operators
combineLatest — Combine Latest Values
typescript
import { combineLatest } from 'rxjs';
const filters$ = combineLatest([
this.categoryFilter$,
this.priceFilter$,
this.sortOrder$,
]).pipe(
map(([category, price, sort]) => ({ category, price, sort })),
);
filters$.subscribe(filters => this.loadProducts(filters));
forkJoin — Wait for All to Complete
typescript
import { forkJoin } from 'rxjs';
// Load multiple API calls in parallel, wait for all
forkJoin({
users: this.http.get<User[]>('/api/users'),
products: this.http.get<Product[]>('/api/products'),
settings: this.http.get<Settings>('/api/settings'),
}).subscribe(({ users, products, settings }) => {
this.users = users;
this.products = products;
this.settings = settings;
});
merge — Merge Multiple Streams
typescript
import { merge } from 'rxjs';
// Combine multiple event sources
const allClicks$ = merge(
fromEvent(button1, 'click'),
fromEvent(button2, 'click'),
fromEvent(button3, 'click'),
);
allClicks$.subscribe(() => console.log('A button was clicked'));
Error Handling Operators
catchError — Handle Errors
typescript
import { catchError, of } from 'rxjs';
this.http.get<User[]>('/api/users').pipe(
catchError(error => {
console.error('Failed to load users:', error);
return of([]); // Return empty array as fallback
}),
).subscribe(users => this.users = users);
retry — Retry on Error
typescript
import { retry, catchError } from 'rxjs';
this.http.get('/api/data').pipe(
retry(3), // Retry up to 3 times
catchError(err => of(null)), // Fallback after retries exhausted
);
retryWhen with Delay
typescript
import { retryWhen, delay, take } from 'rxjs/operators';
this.http.get('/api/data').pipe(
retryWhen(errors => errors.pipe(
delay(2000), // Wait 2 seconds between retries
take(3), // Maximum 3 retries
)),
);
Utility Operators
tap — Side Effects (Debugging)
typescript
import { tap } from 'rxjs/operators';
this.http.get<User[]>('/api/users').pipe(
tap(users => console.log('Raw response:', users)),
map(users => users.filter(u => u.active)),
tap(active => console.log('Active users:', active.length)),
);
finalize — Run on Complete/Error
typescript
import { finalize } from 'rxjs/operators';
this.loading = true;
this.http.get('/api/data').pipe(
finalize(() => this.loading = false), // Always runs
).subscribe(data => this.data = data);
shareReplay — Cache and Share
typescript
import { shareReplay } from 'rxjs/operators';
// Cache the result and share with multiple subscribers
/**
 * Root-provided service exposing the `/api/config` response as a shared stream.
 *
 * `shareReplay(1)` replays the most recent emission to later subscribers, so
 * consumers share one cached result instead of each working from their own
 * subscription to the source.
 */
@Injectable({ providedIn: 'root' })
export class ConfigService {
  // Declared before `config$` because field initialisers run top to bottom
  // and `config$` reads `this.http` while it is being initialised.
  private readonly http = inject(HttpClient);

  config$ = this.http.get<Config>('/api/config').pipe(
    shareReplay(1), // Cache last emission, share across subscribers
  );
}
Real-World Example: Search with Autocomplete
typescript
/**
 * Type-ahead search: turns the search box's value changes into a stream of
 * results, tracking an in-flight flag in `loading`.
 */
export class SearchComponent {
  private searchService = inject(SearchService);
  searchControl = new FormControl('');
  // Set to true when a query passes the filters below and back to false when
  // the corresponding result set (or the error fallback) is emitted.
  loading = false;
  results$ = this.searchControl.valueChanges.pipe(
    debounceTime(300), // Wait for typing pause
    distinctUntilChanged(), // Skip if same value
    // A FormControl created with '' is typed string | null and emits null
    // after reset(); the type guard drops non-strings before reading .length
    // and narrows `query` to string for the search call below.
    filter((query): query is string => typeof query === 'string' && query.length >= 2), // Minimum 2 chars
    tap(() => this.loading = true),
    switchMap(query =>
      // catchError sits on the inner Observable so that one failed request
      // does not terminate the outer valueChanges pipeline.
      this.searchService.search(query).pipe(
        catchError(() => of([])), // Return empty on error
      )
    ),
    tap(() => this.loading = false),
  );
}
Next Steps
With RxJS mastered, you're ready to tackle state management — organizing and sharing application state across components using services, signals, and dedicated state management patterns.