Custom Transformations
Build powerful data transformations with async generators—the readable, maintainable way.
Why Async Generators?
When you need custom transformations beyond map() and filter(), you have two choices: async generators or TransformStream. For most custom logic, async generators are the better choice.
Compare these approaches for parsing JSON lines:
Async Generator:
async function* parseJsonLines(lines: AsyncIterable<string>) {
  for await (const line of lines) {
    try {
      const obj = JSON.parse(line.trim());
      if (obj.id && obj.timestamp) {
        yield obj;
      }
    } catch {
      // Skip invalid JSON
    }
  }
}
TransformStream:
const parseJsonLines = new TransformStream({
  transform(chunk, controller) {
    try {
      const obj = JSON.parse(chunk.trim());
      if (obj.id && obj.timestamp) {
        controller.enqueue(obj);
      }
    } catch {
      // Error handling is more complex
    }
  }
});
The generator reads like the logic you're implementing. The stream forces you into callbacks.
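Whichever form you pick, it plugs into a pipeline the same way, because transform() accepts both async-generator functions and TransformStream instances. A minimal usage sketch, assuming the generator version of parseJsonLines above is in scope:

import { enumerate } from "jsr:@j50n/proc";

const parsed = await enumerate([
  '{"id":"1","timestamp":"2024-01-01"}',
  "not json", // skipped by the catch block
  '{"id":"2","timestamp":"2024-01-02"}',
])
  .transform(parseJsonLines) // a TransformStream instance would be passed the same way
  .collect();

console.log(parsed.length); // 2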
Batching Data
Group items into fixed-size chunks:
import { enumerate } from "jsr:@j50n/proc";
async function* batch<T>(items: AsyncIterable<T>, size: number) {
  let batch: T[] = [];
  for await (const item of items) {
    batch.push(item);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch;
}
const batches = await enumerate([1, 2, 3, 4, 5, 6, 7])
  .transform(items => batch(items, 3))
  .collect();

console.log(batches); // [[1, 2, 3], [4, 5, 6], [7]]
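Batching usually exists to feed some bulk operation. A sketch of that shape, where the per-batch work is a stand-in for something like a bulk insert:

const saved = await enumerate([1, 2, 3, 4, 5, 6, 7])
  .transform(items => batch(items, 3))
  .map(async (group) => {
    // In real code this might be: await db.bulkInsert(group)
    return group.length;
  })
  .collect();

console.log(saved); // [3, 3, 1]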
Stateful Processing
Keep running calculations as data flows:
async function* runningAverage(numbers: AsyncIterable<number>) {
  let sum = 0;
  let count = 0;
  for await (const num of numbers) {
    sum += num;
    count++;
    yield sum / count;
  }
}
const averages = await enumerate([10, 20, 30, 40])
  .transform(runningAverage)
  .collect();

console.log(averages); // [10, 15, 20, 25]
State variables live naturally in the function scope—no external state management needed.
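The same pattern covers any per-stream state, not just arithmetic. As an illustration (this helper is not part of the library), here is a sketch of a generator that remembers only the previous item in order to drop consecutive duplicates:

async function* dropConsecutiveDuplicates<T>(items: AsyncIterable<T>) {
  let hasPrevious = false;
  let previous: T | undefined;
  for await (const item of items) {
    if (!hasPrevious || item !== previous) {
      yield item;
    }
    previous = item;
    hasPrevious = true;
  }
}

const deduped = await enumerate([1, 1, 2, 2, 2, 3, 1])
  .transform(dropConsecutiveDuplicates)
  .collect();

console.log(deduped); // [1, 2, 3, 1]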
Parsing with Error Recovery
Handle complex parsing gracefully:
interface LogEntry {
  id: string;
  timestamp: string;
  level: string;
  message: string;
}
async function* parseJsonLines(lines: AsyncIterable<string>) {
  for await (const line of lines) {
    const trimmed = line.trim();
    if (!trimmed) continue;
    try {
      const obj = JSON.parse(trimmed);
      if (obj.id && obj.timestamp && obj.level && obj.message) {
        yield obj as LogEntry;
      }
    } catch {
      // Skip invalid JSON, could log errors here
    }
  }
}
const logs = await enumerate([
  '{"id":"1","timestamp":"2024-01-01","level":"info","message":"Started"}',
  'invalid json line',
  '{"id":"2","timestamp":"2024-01-01","level":"error","message":"Failed"}',
  ''
]).transform(parseJsonLines).collect();
console.log(logs.length); // 2 (invalid lines skipped)
Rate Limiting
Control timing between items:
import { enumerate, sleep } from "jsr:@j50n/proc";
async function* throttle<T>(items: AsyncIterable<T>, delayMs: number) {
  let first = true;
  for await (const item of items) {
    if (!first) {
      await sleep(delayMs);
    }
    first = false;
    yield item;
  }
}
// Rate-limit API calls
const results = await enumerate(["url1", "url2", "url3"])
  .transform(urls => throttle(urls, 1000))
  .map(async (url) => {
    const response = await fetch(url);
    return response.status;
  })
  .collect();
Multi-Stage Processing
Combine filtering, enrichment, and transformation:
async function* processLogEntries(lines: AsyncIterable<string>) {
  for await (const line of lines) {
    try {
      const entry = JSON.parse(line);
      if (entry.level !== 'error') continue;

      const enriched = {
        ...entry,
        processedAt: new Date().toISOString(),
        severity: entry.message.toLowerCase().includes('critical') ? 'high' : 'medium'
      };

      yield {
        timestamp: enriched.timestamp,
        severity: enriched.severity,
        summary: enriched.message.substring(0, 100)
      };
    } catch {
      // Skip invalid entries
    }
  }
}
const processed = await enumerate([
  '{"level":"info","message":"System started","timestamp":"2024-01-01T10:00:00Z"}',
  '{"level":"error","message":"Critical database failure","timestamp":"2024-01-01T10:01:00Z"}',
  '{"level":"error","message":"Minor timeout","timestamp":"2024-01-01T10:02:00Z"}'
]).transform(processLogEntries).collect();
console.log(processed.length); // 2 (only error entries)
Generator vs TransformStream
The same batching logic, both ways:
Generator (11 lines):
async function* batch<T>(items: AsyncIterable<T>, size: number) {
  let batch: T[] = [];
  for await (const item of items) {
    batch.push(item);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch;
}
TransformStream (17 lines):
function createBatchTransform<T>(size: number) {
  let batch: T[] = [];
  return new TransformStream<T, T[]>({
    transform(chunk, controller) {
      batch.push(chunk);
      if (batch.length === size) {
        controller.enqueue([...batch]);
        batch = [];
      }
    },
    flush(controller) {
      if (batch.length > 0) {
        controller.enqueue(batch);
      }
    }
  });
}
Generators are shorter, more readable, easier to debug, and handle errors naturally with try-catch.
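When a generator does hit a problem, the error surfaces at the awaiting call site like any other rejected promise, so ordinary try-catch works on both sides. A sketch (parseStrict is illustrative, not a library function):

async function* parseStrict(lines: AsyncIterable<string>) {
  let lineNumber = 0;
  for await (const line of lines) {
    lineNumber++;
    try {
      yield JSON.parse(line);
    } catch (error) {
      // Re-throw with context; the pipeline stops and the error
      // propagates to whoever awaits .collect().
      throw new Error(`invalid JSON on line ${lineNumber}`, { cause: error });
    }
  }
}

try {
  await enumerate(['{"ok":true}', "oops"]).transform(parseStrict).collect();
} catch (error) {
  console.error((error as Error).message); // invalid JSON on line 2
}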
When to Use Each
Use Async Generators for:
- Complex state management (faster + easier)
- Error handling and recovery
- Multi-stage processing
- Readable, maintainable code
- Most custom transformations
Use TransformStream for:
- Simple 1:1 transformations (much faster)
- Built-in streams (CompressionStream, DecompressionStream)
- Interfacing with existing stream APIs
In practice:
// Built-in streams - use directly
.transform(new CompressionStream("gzip"))
// Custom logic - use generators
.transform(items => batch(items, 100))
.transform(parseJsonLines)
Best Practices
// Good: Clear, focused, well-typed
async function* parseAndValidateUsers(
  lines: AsyncIterable<string>
): AsyncGenerator<User> {
  for await (const line of lines) {
    try {
      const user = JSON.parse(line) as User;
      if (isValidUser(user)) {
        yield user;
      }
    } catch (error) {
      console.warn(`Skipping invalid user data: ${(error as Error).message}`);
    }
  }
}
- Keep generators focused - One responsibility per function
- Handle errors gracefully - Use try-catch for parsing/validation
- Yield frequently - Don't buffer unnecessarily (see the sketch after this list)
- Use meaningful names - parseJsonLines, not transform1
- Add type annotations - Help TypeScript help you
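For example, "yield frequently" means emitting each result as soon as it is ready instead of accumulating everything first. A sketch of the contrast, with transformLine standing in for whatever per-line work the pipeline does:

// Stand-in for the real per-line work.
const transformLine = (line: string) => line.toUpperCase();

// Buffers everything: downstream sees nothing until the input is exhausted,
// and the whole result set sits in memory.
async function* transformAllAtOnce(lines: AsyncIterable<string>) {
  const results: string[] = [];
  for await (const line of lines) {
    results.push(transformLine(line));
  }
  yield* results;
}

// Streams: each result is available downstream immediately,
// and memory use stays flat.
async function* transformAsYouGo(lines: AsyncIterable<string>) {
  for await (const line of lines) {
    yield transformLine(line);
  }
}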
Performance
We ran comprehensive benchmarks comparing async generators vs TransformStream across different scenarios:
TransformStream excels at simple operations:
- Small datasets: Similar performance
- Large datasets: Up to 810x faster for simple transformations
- JSON parsing: Up to 150x faster
- Best for: Simple 1:1 transformations, especially with large data
Async generators excel at complex operations:
- Stateful processing: 4-6x faster (batching, running totals)
- Error handling: 3-4x faster with try-catch
- Multi-stage logic: 4x faster for complex processing
- Best for: State management, error recovery, complex logic
Recommendation:
- Use TransformStream for simple operations on large datasets
- Use async generators for complex logic, state management, or when readability matters
- For most real-world transformations (parsing, validation, multi-step processing), generators are both faster and easier to write
Start with these patterns and build more sophisticated processing pipelines as needed.