Welcome to proc
Running child processes and working with streams should be simple. proc makes it simple.
✨ The Big Idea: Treat processes and streams like arrays. Use map, filter, and reduce on anything. Errors flow naturally through pipelines. No callbacks, no edge cases, no headaches.
What is proc?
proc is a Deno library that gives you two superpowers:
- Run child processes with a clean, composable API
- Work with async iterables using the Array methods you already know
But here's the real magic: errors just work. They flow through your pipelines naturally, like data. No edge cases, no separate error channels, no callbacks. One try-catch at the end handles everything.
💡 Tip: If you've ever struggled with JavaScript streams, you're going to love this.
A Taste of proc
Count lines in a compressed file—streaming, no temp files:
import { read } from "jsr:@j50n/proc@0.23.3";
const lines = await read("war-and-peace.txt.gz")
.transform(new DecompressionStream("gzip"))
.lines
.count();
console.log(`${lines} lines`); // 23,166 lines
Chain processes like shell pipes:
import { run } from "jsr:@j50n/proc@0.23.3";
const result = await run("cat", "data.txt")
.run("grep", "error")
.run("wc", "-l")
.lines.first;
Handle errors gracefully:
import { run } from "jsr:@j50n/proc@0.23.3";
try {
await run("npm", "test")
.lines
.map((line) => line.toUpperCase())
.filter((line) => line.includes("FAIL"))
.forEach((line) => console.log(line));
} catch (error) {
// All errors caught here—from the process, from map, from filter
console.error(`Tests failed: ${error.code}`);
}
Why proc?
JavaScript streaming is fast, but error handling shouldn't break your brain. proc gives you:
- Errors that propagate naturally through pipelines
- Array methods on async iterables (map, filter, reduce, and more)
- Process management that feels like shell scripting
- Streaming everything for memory efficiency
- Type safety with full TypeScript support
Who is this for?
- DevOps engineers automating deployments, processing logs, and managing infrastructure
- Data engineers processing large CSV files, log files, or streaming data
- Backend developers building CLI tools, batch processors, or data pipelines
- System administrators replacing Bash scripts with type-safe, testable Deno code
- Anyone who needs to run child processes or work with large datasets efficiently
Ready to dive in?
Start with Installation or jump straight to the Quick Start.
Current Version: 0.23.3
Status: Stable, actively maintained, ready for production
Found a bug? Have a question? File an issue or check the FAQ.
Installation
Getting started with proc is simple—it's just a Deno import away.
Import from JSR
Add proc to your Deno project:
import * as proc from "jsr:@j50n/proc@0.23.3";
Or import just what you need:
import { run, enumerate, read } from "jsr:@j50n/proc@0.23.3";
That's it! No installation step, no package.json, no node_modules.
Permissions
proc needs permissions to run child processes and access files. When you run your script, Deno will prompt you, or you can grant them upfront:
deno run --allow-run --allow-read your-script.ts
Common permissions:
- --allow-run - Required to run child processes
- --allow-read - Needed to read files
- --allow-write - Needed to write files
- --allow-env - If your processes need environment variables
You can be more specific:
# Only allow running specific commands
deno run --allow-run=ls,grep,wc your-script.ts
# Only allow reading specific directories
deno run --allow-read=/var/log your-script.ts
Version Pinning
For production, pin to a specific version:
import { run } from "jsr:@j50n/proc@0.23.3";
For development, use the latest:
import { run } from "jsr:@j50n/proc";
Next Steps
Ready to write your first proc script? Head to the Quick Start guide.
Quick Start
Let's get you running code in 5 minutes.
Your First Process
Create a file called hello.ts:
import { run } from "jsr:@j50n/proc@0.23.3";
// Run a command and capture output
const lines = await run("echo", "Hello, proc!").lines.collect();
console.log(lines); // ["Hello, proc!"]
Run it:
deno run --allow-run hello.ts
What just happened?
- run() started the echo command
- .lines converted the output to text lines
- .collect() gathered all lines into an array
Chaining Processes
Let's chain commands together, like shell pipes:
import { run } from "jsr:@j50n/proc@0.23.3";
const result = await run("echo", "HELLO WORLD")
.run("tr", "A-Z", "a-z") // Convert to lowercase
.lines.first;
console.log(result); // "hello world"
Each .run() pipes the previous output to the next command's input.
Working with Files
Process a file line by line:
import { read } from "jsr:@j50n/proc@0.23.3";
const errorCount = await read("app.log")
.lines
.filter(line => line.includes("ERROR"))
.count();
console.log(`Found ${errorCount} errors`);
Handling Errors
Errors propagate naturally—catch them once at the end:
import { run } from "jsr:@j50n/proc@0.23.3";
try {
await run("false") // This command exits with code 1
.lines
.collect();
} catch (error) {
console.error(`Command failed: ${error.code}`);
}
No need to check errors at each step. They flow through the pipeline and you catch them once. For details, see Error Handling.
Using Array Methods
Work with async data using familiar Array methods:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const data = ["apple", "banana", "cherry"];
const numbered = await enumerate(data)
.enum() // Add indices
.map(([fruit, i]) => `${i + 1}. ${fruit}`)
.collect();
console.log(numbered);
// ["1. apple", "2. banana", "3. cherry"]
A Real Example
Let's find the 5 most recent commits that mention "fix":
import { run } from "jsr:@j50n/proc@0.23.3";
const commits = await run("git", "log", "--oneline")
.lines
.filter(line => line.includes("fix"))
.take(5)
.collect();
commits.forEach(commit => console.log(commit));
This chains multiple operations, all streaming, using minimal memory. For more complex examples, see Recipes.
What's Next?
Now that you've got the basics, learn about:
- Key Concepts - Properties vs methods, resource management
- Error Handling - The killer feature explained
- Running Processes - All the ways to run commands
Or jump straight to Recipes for copy-paste solutions.
Key Concepts
Before you dive deep, let's cover a few concepts that will make everything click.
Properties vs Methods
This trips up everyone at first. Some APIs are properties (no parentheses), some are methods (with parentheses).
Properties:
.lines // Not .lines()
.status // Not .status()
.first // Not .first()
.last // Not .last()
Methods:
.collect()
.map()
.filter()
.count()
Why? Properties are getters that return new objects or promises. Methods are functions you call. Your IDE will help, but when in doubt, check the docs.
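Here's a quick sketch of the two side by side, reusing APIs from the examples above (the ls command is just a stand-in):
import { run } from "jsr:@j50n/proc@0.23.3";
// .lines and .first are properties (getters); .filter() is a method you call.
const firstNonEmpty = await run("ls", "-la")
  .lines                             // property: an Enumerable<string>
  .filter((line) => line.length > 0) // method: returns a new Enumerable
  .first;                            // property: resolves to string | undefined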
Error Propagation
Errors flow through pipelines like data—no need to check at every step:
try {
await run("command1")
.run("command2")
.run("command3")
.lines
.forEach(process);
} catch (error) {
// All errors caught here
handle(error);
}
Errors from processes, transformations, or your own code all propagate to the same place. For details, see Error Handling.
Resource Management
Golden rule: Always consume process output.
Good:
await run("ls").lines.collect(); // ✅ Output consumed
await run("ls").lines.forEach(console.log); // ✅ Output consumed
Bad:
const p = run("ls"); // ❌ Output never consumed = resource leak
Why? Unconsumed output keeps the process handle open. Always use .collect(), .forEach(), or iterate through the output.
Enumeration Pattern
enumerate() wraps an iterable to give it Array-like methods. To add indices, call .enum() on the result:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
// enumerate() wraps the iterable, .enum() adds indices
const numbered = await enumerate(["a", "b", "c"])
.enum() // Returns [item, index] tuples
.map(([item, i]) => `${i}: ${item}`)
.collect();
// ["0: a", "1: b", "2: c"]
Why two steps? enumerate() gives you the methods (map, filter, etc.), while .enum() is just one of many operations you can perform. You might not always need indices:
// Use enumerate() without .enum() for other operations
const doubled = await enumerate([1, 2, 3])
.map(n => n * 2) // No indices needed
.collect();
Streaming Everything
proc is lazy and streaming by default. Nothing happens until you consume the output.
// This doesn't run anything yet
const pipeline = run("cat", "huge-file.txt")
.run("grep", "error")
.lines
.map(line => line.toUpperCase());
// Now it runs, one line at a time
for await (const line of pipeline) {
console.log(line); // Processes one line at a time
}
This means you can process files larger than memory. The data flows through, never all loaded at once.
Type Safety
proc is fully typed. Your IDE will guide you:
const lines: string[] = await run("ls").lines.collect();
// ^-- TypeScript knows this is string[]
const count: number = await run("ls").lines.count();
// ^-- TypeScript knows this is number
If you see a type error, you're probably using the API wrong. Check the docs!
Next Steps
Now that you understand the concepts, dive into:
- Error Handling - Deep dive into the killer feature
- Running Processes - All the ways to run commands
- Array-Like Methods - map, filter, reduce, and more
Common Patterns
This guide shows typical usage patterns to help you understand how proc works in practice.
Pattern: Run and Collect
The most basic pattern—run a command and get all output:
import { run } from "jsr:@j50n/proc@0.23.3";
const lines = await run("ls", "-la").lines.collect();
// lines is string[]
Key points:
- .lines is a property (no parentheses) that returns an Enumerable
- .collect() is a method that consumes the stream and returns an array
- Always consume output to avoid resource leaks
Pattern: Process Pipeline
Chain commands like shell pipes:
import { run } from "jsr:@j50n/proc@0.23.3";
const count = await run("cat", "data.txt")
.run("grep", "error")
.run("wc", "-l")
.lines.first;
Key points:
- Each .run() pipes the previous command's stdout to the next command's stdin
- .first is a property that returns a Promise<string | undefined>
- Errors from any command in the chain propagate to the catch block
Pattern: Transform and Filter
Process output with Array methods:
import { run } from "jsr:@j50n/proc@0.23.3";
const errors = await run("cat", "app.log")
.lines
.filter((line) => line.includes("ERROR"))
.map((line) => line.trim())
.take(10)
.collect();
Key points:
- .lines converts the byte stream to a line stream
- Methods like .filter(), .map(), and .take() work on the stream
- Nothing executes until you call a terminal operation like .collect()
Pattern: Error Handling
Catch errors at the end of the pipeline:
import { run } from "jsr:@j50n/proc@0.23.3";
try {
await run("npm", "test")
.lines
.forEach((line) => console.log(line));
} catch (error) {
if (error.code) {
console.error(`Process exited with code ${error.code}`);
} else {
console.error(`Error: ${error.message}`);
}
}
Key points:
- Processes that exit with non-zero codes throw ExitCodeError
- The error has a .code property with the exit code
- All errors (process, transform, your code) propagate to the same catch block
Pattern: Check Status Without Throwing
Get exit status without throwing an error:
import { run } from "jsr:@j50n/proc@0.23.3";
const p = run("some-command");
await p.lines.collect(); // Consume output first
const status = await p.status; // .status is a property
if (status.code !== 0) {
console.error(`Command failed with code ${status.code}`);
}
Key points:
- .status is a property that returns Promise<CommandStatus>
- You must consume output before checking status
- This doesn't throw on non-zero exit codes
Pattern: Enumerate with Indices
Add indices to any iterable:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const numbered = await enumerate(["a", "b", "c"])
.enum() // Adds [item, index] tuples
.map(([item, i]) => `${i + 1}. ${item}`)
.collect();
// ["1. a", "2. b", "3. c"]
Key points:
- enumerate() wraps an iterable to add Array-like methods
- .enum() is a method that transforms items to [item, index] tuples
- This is a two-step process: wrap, then enumerate
Pattern: Enumerate Without Indices
Use Array methods without adding indices:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const filtered = await enumerate(["a", "b", "c"])
.filter((item) => item !== "b")
.map((item) => item.toUpperCase())
.collect();
// ["A", "C"]
Key points:
- You don't need to call .enum() if you don't need indices
- enumerate() just adds Array-like methods to any iterable
Pattern: Stream Large Files
Process files without loading into memory:
import { read } from "jsr:@j50n/proc@0.23.3";
const errorCount = await read("huge-log.txt")
.lines
.filter((line) => line.includes("ERROR"))
.count();
Key points:
- read() returns an Enumerable of bytes
- .lines converts it to a line stream
- Everything streams, so there is no memory spike for large files
Pattern: Decompress and Process
Handle compressed files:
import { read } from "jsr:@j50n/proc@0.23.3";
const lines = await read("data.txt.gz")
.transform(new DecompressionStream("gzip"))
.lines
.collect();
Key points:
- .transform() applies a TransformStream
- DecompressionStream is built into Deno
- Everything streams, so no temp files are needed
Pattern: Concurrent Processing
Process items in parallel with concurrency control:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const urls = ["url1", "url2", "url3"];
await enumerate(urls)
.concurrentMap(async (url) => {
const response = await fetch(url);
return { url, status: response.status };
}, { concurrency: 5 })
.forEach((result) => console.log(result));
Key points:
- .concurrentMap() processes items in parallel
- The concurrency option limits how many run at once
- Results maintain input order (use .concurrentUnorderedMap() for faster unordered results)
Pattern: Build Objects with Reduce
Aggregate data into a single value:
import { run } from "jsr:@j50n/proc@0.23.3";
const wordCount = await run("cat", "data.txt")
.lines
.reduce((acc, line) => {
const words = line.split(/\s+/);
for (const word of words) {
acc[word] = (acc[word] || 0) + 1;
}
return acc;
}, {} as Record<string, number>);
Key points:
- .reduce() works like Array.reduce()
- Provide an initial value (second argument)
- The accumulator can be any type
Pattern: Split Stream with Tee
Process the same stream multiple ways:
import { run } from "jsr:@j50n/proc@0.23.3";
const [errors, warnings] = run("cat", "app.log")
.lines
.tee(2);
const errorCount = errors.filter((line) => line.includes("ERROR")).count();
const warningCount = warnings.filter((line) => line.includes("WARN")).count();
console.log(`Errors: ${await errorCount}, Warnings: ${await warningCount}`);
Key points:
- .tee(n) splits a stream into n identical streams
- Each stream can be processed independently
- All streams must be consumed
Anti-Pattern: Not Consuming Output
Don't do this:
const p = run("ls"); // ❌ Output never consumed
Why it's bad: Unconsumed output keeps the process handle open, causing resource leaks.
Do this instead:
await run("ls").lines.collect(); // ✅ Output consumed
Anti-Pattern: Mixing Sync and Async
Don't do this:
const lines = run("ls").lines; // ❌ Not awaited
lines.forEach((line) => console.log(line)); // ❌ Won't work
Why it's bad: .lines returns an Enumerable, but you need to await the
terminal operation.
Do this instead:
await run("ls").lines.forEach((line) => console.log(line)); // ✅ Awaited
Error Handling
Error handling is proc's primary design goal. Errors flow through pipelines naturally, just like data.
The Problem
Traditional stream error handling requires managing errors at multiple points:
// With Deno.Command - manual error handling at each step
const cmd1 = new Deno.Command("cat", { args: ["file.txt"] });
const proc1 = cmd1.spawn();
const output1 = await proc1.output();
if (!output1.success) {
throw new Error(`cat failed: ${output1.code}`);
}
const cmd2 = new Deno.Command("grep", {
args: ["pattern"],
stdin: "piped"
});
const proc2 = cmd2.spawn();
// ... manually pipe output1 to proc2 stdin ...
const output2 = await proc2.output();
if (!output2.success) {
throw new Error(`grep failed: ${output2.code}`);
}
With Node.js streams, you need error handlers on each stream:
stream1.on('error', handleError);
stream2.on('error', handleError);
stream3.on('error', handleError);
The proc Solution
Errors flow through pipelines like data. Handle them once, at the end:
try {
await run("cat", "file.txt")
.run("grep", "pattern")
.run("wc", "-l")
.lines
.map(transform)
.filter(predicate)
.forEach(process);
} catch (error) {
// All errors caught here:
// - Process exit codes
// - Transform errors
// - Filter errors
// - Your own errors
console.error(`Pipeline failed: ${error.message}`);
}
One try-catch. No edge cases. No separate error channels.
How It Works
When something goes wrong anywhere in the pipeline:
- The error is captured
- Downstream operations are skipped
- The error propagates to your catch block
It's functional programming—errors are just another type of data flowing through.
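Here's a tiny sketch of that flow (the file name is made up). Because cat fails, no data reaches the map, and the error surfaces in the single catch:
import { run } from "jsr:@j50n/proc@0.23.3";
try {
  await run("cat", "no-such-file.txt") // fails: the error is captured
    .lines
    .map((line) => line.toUpperCase()) // skipped: nothing flows downstream
    .forEach((line) => console.log(line)); // skipped as well
} catch (error) {
  console.error(`Caught once, at the end: ${error}`); // the error propagates here
}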
Error Types
proc throws specific error types so you can handle them differently:
ExitCodeError
Thrown when a process exits with a non-zero code:
import { ExitCodeError } from "jsr:@j50n/proc@0.23.3";
try {
await run("false").lines.collect();
} catch (error) {
if (error instanceof ExitCodeError) {
console.error(`Process failed with code ${error.code}`);
console.error(`Command: ${error.command.join(" ")}`);
}
}
SignalError
Thrown when a process is killed by a signal:
import { SignalError } from "jsr:@j50n/proc@0.23.3";
try {
await run("sleep", "1000").lines.collect();
// Kill it with Ctrl+C
} catch (error) {
if (error instanceof SignalError) {
console.error(`Process killed by signal: ${error.signal}`);
}
}
UpstreamError
Thrown when an error comes from upstream in a pipeline:
import { UpstreamError } from "jsr:@j50n/proc@0.23.3";
try {
await run("cat", "missing.txt") // This fails
.run("grep", "pattern") // This gets UpstreamError
.lines.collect();
} catch (error) {
if (error instanceof UpstreamError) {
console.error(`Upstream failure: ${error.cause}`);
}
}
Checking Exit Status Without Throwing
Sometimes you want to check the exit code without throwing:
const p = run("some-command");
await p.lines.collect(); // Consume output
const status = await p.status; // Check status
if (status.code !== 0) {
console.error(`Command failed with code ${status.code}`);
}
Important: Consume the output first, then check status. Otherwise you'll leak resources.
Handling Specific Exit Codes
try {
await run("grep", "pattern", "file.txt").lines.collect();
} catch (error) {
if (error instanceof ExitCodeError) {
if (error.code === 1) {
// grep returns 1 when no matches found
console.log("No matches found");
} else {
// Other errors
throw error;
}
}
}
Errors in Transformations
Errors in your own code propagate the same way:
try {
await run("cat", "numbers.txt")
.lines
.map(line => {
const num = parseInt(line);
if (isNaN(num)) {
throw new Error(`Invalid number: ${line}`);
}
return num;
})
.forEach(console.log);
} catch (error) {
// Catches both process errors AND your parsing errors
console.error(`Pipeline failed: ${error.message}`);
}
Custom Error Handling
You can customize how errors are handled per process using the fnError option:
await run(
{
fnError: (error, stderrData) => {
// Custom error handling
if (error?.code === 1) {
// Suppress or transform specific errors
console.warn("Command returned 1, continuing anyway");
return;
}
// Re-throw other errors
throw error;
}
},
"command"
).lines.collect();
Suppress All Errors
Sometimes you want to ignore failures:
// Ignore all errors from this command
await run(
{ fnError: () => {} },
"command"
).lines.collect();
Transform Errors
Add context or change error types:
await run(
{
fnError: (error) => {
throw new Error(`Database backup failed: ${error.message}`);
}
},
"pg_dump", "mydb"
).lines.collect();
Working with Stderr
By default, stderr is passed through to Deno.stderr. You can capture and process it:
await run(
{
fnStderr: async (stderr) => {
for await (const line of stderr.lines) {
console.error(`[STDERR] ${line}`);
}
}
},
"command"
).lines.collect();
Collect Stderr
Capture stderr for analysis:
const stderrLines: string[] = [];
await run(
{
fnStderr: async (stderr) => {
for await (const line of stderr.lines) {
stderrLines.push(line);
}
}
},
"command"
).lines.collect();
console.log("Stderr output:", stderrLines);
Combine Stdout and Stderr
Process both streams together:
const allOutput: string[] = [];
await run(
{
fnStderr: async (stderr) => {
for await (const line of stderr.lines) {
allOutput.push(`[ERR] ${line}`);
}
}
},
"command"
).lines.forEach(line => {
allOutput.push(`[OUT] ${line}`);
});
Best Practices
1. Catch at the End
Don't catch errors in the middle of a pipeline unless you're handling them specifically:
// ❌ Don't do this
try {
const lines = await run("command").lines.collect();
} catch (e) {
// Handle here
}
try {
const filtered = lines.filter(predicate);
} catch (e) {
// And here
}
// ✅ Do this
try {
await run("command")
.lines
.filter(predicate)
.forEach(process);
} catch (error) {
// Handle once
}
2. Always Consume Output
Even if you don't care about the output, consume it:
// ❌ Resource leak
const p = run("command");
// Never consumed!
// ✅ Consume it
await run("command").lines.collect();
// Or
await run("command").lines.forEach(() => {});
3. Use Specific Error Types
Handle different errors differently:
try {
await pipeline();
} catch (error) {
if (error instanceof ExitCodeError) {
// Process failed
} else if (error instanceof SignalError) {
// Process killed
} else {
// Something else
}
}
4. Use Custom Handlers Sparingly
Only customize error handling when you have a specific need. The default behavior works well for most cases.
Why This Matters
Error handling is the primary reason proc exists. If you've ever:
- Fought with stream error events
- Debugged edge cases in error propagation
- Written the same error handling code over and over
- Lost errors in complex pipelines
...then you understand why this is revolutionary.
Errors just work. Like they should have all along.
Next Steps
- Running Processes - Learn all the ways to run commands
- Process Pipelines - Chain commands together
- Custom Error Handling - Advanced error customization
Running Processes
Running a child process with proc is as simple as it gets.
Basic Usage
import { run } from "jsr:@j50n/proc@0.23.3";
// Run a command
await run("ls", "-la").lines.collect();
That's it. No boilerplate, no configuration, just run it.
Command and Arguments
The first parameter is the command, the rest are arguments:
run("command", "arg1", "arg2", "arg3")
Important: Arguments are separate parameters, not a single string:
// ✅ Correct
run("ls", "-la", "/home")
// ❌ Wrong - this won't work
run("ls -la /home")
Capturing Output
As an Array
const lines = await run("ls", "-la").lines.collect();
// lines is string[]
Line by Line
for await (const line of run("ls", "-la").lines) {
console.log(line);
}
First or Last Line
const first = await run("ls").lines.first;
const last = await run("ls").lines.last;
As Raw Bytes
const bytes = await run("cat", "file.bin").collect();
// bytes is Uint8Array[]
Printing to Console
Send output directly to stdout:
await run("ls", "-la").toStdout();
This is perfect for commands where you just want to see the output.
Building Commands Dynamically
Sometimes you need to build a command from variables:
import { type Cmd, run } from "jsr:@j50n/proc@0.23.3";
const cmd: Cmd = ["ls"];
if (showAll) {
cmd.push("-la");
}
if (directory) {
cmd.push(directory);
}
await run(...cmd).toStdout();
The Cmd type is an array where the first element is the command (string or URL) and the rest are string arguments. Using the Cmd type ensures type safety when building commands dynamically.
Process Options
Customize process behavior with options:
await run(
{
cwd: "/tmp", // Working directory
env: { FOO: "bar" }, // Environment variables
},
"command",
"arg1"
).lines.collect();
Working Directory
await run(
{ cwd: "/var/log" },
"ls"
).toStdout();
Environment Variables
await run(
{ env: { PATH: "/custom/path" } },
"command"
).lines.collect();
Checking Exit Status
Get the exit status without throwing:
const p = run("command");
await p.lines.collect(); // Consume output first
const status = await p.status;
console.log(`Exit code: ${status.code}`);
console.log(`Success: ${status.success}`);
Remember: Always consume output before checking status, or you'll leak resources.
Process ID
Get the process ID:
const p = run("sleep", "10");
console.log(`PID: ${p.pid}`);
await p.lines.collect();
Running with URLs
You can use URLs for the command:
const scriptUrl = new URL("./script.sh", import.meta.url);
await run(scriptUrl).toStdout();
Common Patterns
Silent Execution
Run a command and ignore output:
await run("command").lines.forEach(() => {});
Capture and Print
Capture output while also printing it:
const lines: string[] = [];
await run("command").lines.forEach(line => {
console.log(line);
lines.push(line);
});
Conditional Execution
if (needsProcessing) {
await run("process-data").toStdout();
}
Error Handling
By default, non-zero exit codes throw ExitCodeError:
try {
await run("false").lines.collect();
} catch (error) {
console.error(`Command failed: ${error.code}`);
}
See Error Handling for complete details.
Performance Tips
Stream Instead of Collect
Process data as it arrives rather than loading everything into memory:
// ❌ Loads everything into memory
const lines = await run("cat", "huge-file.txt").lines.collect();
for (const line of lines) {
process(line);
}
// ✅ Processes one line at a time
for await (const line of run("cat", "huge-file.txt").lines) {
process(line);
}
Pipe Instead of Collect Intermediate Results
Chain processes instead of collecting intermediate results:
// ❌ Collects intermediate results
const lines1 = await run("cat", "file.txt").lines.collect();
const input = lines1.join("\n");
const lines2 = await run("grep", "pattern").lines.collect();
// ✅ Streams through
await run("cat", "file.txt")
.run("grep", "pattern")
.toStdout();
Use take() to Stop Early
Stop processing once you have what you need:
// Stops after finding 10 matches
const first10 = await run("grep", "ERROR", "huge.log")
.lines
.take(10)
.collect();
Filter Before Expensive Operations
Reduce the amount of data flowing through expensive operations:
// ✅ Filter first (fast), then transform (expensive)
const result = await run("cat", "data.txt")
.lines
.filter(line => line.length > 0) // Fast filter
.map(expensiveTransform) // Only runs on filtered data
.collect();
For more performance optimization strategies, see Concurrent Processing and Streaming Large Files.
Next Steps
- Process Pipelines - Chain commands together
- Working with Output - Transform and process output
- Error Handling - Handle failures gracefully
Process Pipelines
Chain processes together like shell pipes. It's beautiful.
The Basics
In a shell, you'd write:
cat file.txt | grep error | wc -l
In proc, you write:
const count = await run("cat", "file.txt")
.run("grep", "error")
.run("wc", "-l")
.lines.first;
Each .run() pipes the previous output to the next command's stdin.
How It Works
run("command1") // Produces output
.run("command2") // Receives command1's output as stdin
.run("command3") // Receives command2's output as stdin
The data flows through, one buffer at a time. Nothing is collected in memory unless you ask for it.
Real Examples
Count Lines
const lines = await run("cat", "file.txt")
.run("wc", "-l")
.lines.first;
console.log(`${lines} lines`);
Find and Count
const errorCount = await run("cat", "app.log")
.run("grep", "ERROR")
.run("wc", "-l")
.lines.first;
Sort and Unique
const unique = await run("cat", "words.txt")
.run("sort")
.run("uniq")
.lines.collect();
Case Conversion
const lowercase = await run("echo", "HELLO WORLD")
.run("tr", "A-Z", "a-z")
.lines.first;
// "hello world"
Mixing Processes and Transformations
You can mix process pipes with JavaScript transformations:
const result = await run("cat", "data.txt")
.run("grep", "pattern")
.lines
.map(line => line.trim())
.filter(line => line.length > 0)
.collect();
The .lines converts bytes to text, then JavaScript takes over.
Complex Pipelines
Build sophisticated data processing pipelines:
const stats = await run("cat", "access.log")
.run("grep", "ERROR")
.run("cut", "-d", " ", "-f", "1") // Extract IP addresses
.run("sort")
.run("uniq", "-c") // Count occurrences
.run("sort", "-rn") // Sort by count
.run("head", "-10") // Top 10
.lines
.collect();
console.log("Top 10 error sources:");
stats.forEach(line => console.log(line));
Branching Pipelines
Sometimes you need to process the same data in multiple ways. Use .tee() to split a pipeline into multiple branches:
const [branch1, branch2] = run("cat", "data.txt")
.lines
.tee();
// Process both branches concurrently
const [result1, result2] = await Promise.all([
branch1.filter(line => line.includes("A")).collect(),
branch2.filter(line => line.includes("B")).collect(),
]);
How it works: .tee() creates two independent iterables from one source. Each branch can be processed differently, and both can run concurrently.
Use cases:
- Collect different subsets of data in one pass
- Calculate multiple statistics simultaneously
- Process data while also logging it
Important: Both branches must be consumed, or you'll leak resources.
Error Handling in Pipelines
Errors propagate through the entire pipeline:
try {
await run("cat", "missing.txt") // This fails
.run("grep", "pattern") // Never runs
.run("wc", "-l") // Never runs
.lines.collect();
} catch (error) {
// Catches the error from cat
console.error(`Pipeline failed: ${error.message}`);
}
See Error Handling for details.
Performance Characteristics
Pipelines are:
- Streaming - Data flows through, not collected in memory
- Lazy - Nothing runs until you consume the output
- Concurrent - All processes run at the same time
- Efficient - Minimal memory usage, even for huge files
// This processes a 10GB file using ~constant memory
await run("cat", "huge-file.txt")
.run("grep", "pattern")
.run("wc", "-l")
.lines.first;
Debugging Pipelines
Print intermediate results:
await run("cat", "file.txt")
.run("grep", "pattern")
.lines
.map(line => {
console.log(`Processing: ${line}`);
return line;
})
.forEach(process);
Or split it up:
const step1 = run("cat", "file.txt");
const step2 = step1.run("grep", "pattern");
const step3 = step2.lines;
// Now you can inspect each step
for await (const line of step3) {
console.log(line);
}
Common Patterns
Extract and Count
const count = await run("cat", "file.txt")
.run("grep", "-o", "pattern")
.lines.count();
Filter and Transform
const results = await run("cat", "data.csv")
.run("grep", "-v", "^#") // Remove comments
.run("cut", "-d", ",", "-f", "1,3") // Extract columns
.lines
.map(line => line.split(","))
.collect();
Aggregate Data
const sum = await run("cat", "numbers.txt")
.lines
.map(line => parseInt(line))
.reduce((acc, n) => acc + n, 0);
When to Use Pipelines
Use pipelines when:
- You're processing large files
- You want to chain Unix tools
- You need streaming performance
- You're replacing shell scripts
Use JavaScript when:
- You need complex logic
- You're working with structured data (JSON, etc.)
- You need type safety
- The operation is CPU-bound
Mix both for the best of both worlds!
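Here's a rough sketch of that mix (the requests.jsonl file and its fields are hypothetical): let grep do the coarse streaming filter, then switch to typed JavaScript for the structured part.
import { run } from "jsr:@j50n/proc@0.23.3";
const slowRequests = await run("cat", "requests.jsonl")
  .run("grep", '"status":500') // cheap, streaming pre-filter
  .lines
  .map((line) => JSON.parse(line) as { path: string; ms: number })
  .filter((req) => req.ms > 1000) // structured logic stays in JavaScript
  .collect();
console.log(`${slowRequests.length} slow failing requests`);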
Next Steps
- Working with Output - Transform and process output
- Concurrent Processing - Parallel pipelines
- Streaming Large Files - Handle huge files efficiently
Working with Output
Capture, transform, and process command output.
Choosing Your Approach
Use .lines.collect() when you need all output as an array (small outputs only):
const lines = await run("ls").lines.collect(); // All lines in memory
Use .lines with for-await when processing large outputs line-by-line:
for await (const line of run("cat", "huge.log").lines) {
process(line); // Constant memory usage
}
Use .toStdout() when you just want to see the output:
await run("ls", "-la").toStdout(); // Prints directly to console
Use .first or .last when you only need one line:
const result = await run("git", "rev-parse", "HEAD").lines.first;
Getting Output
As Lines
import { run } from "jsr:@j50n/proc@0.23.3";
const lines = await run("ls", "-la").lines.collect();
// string[]
As Bytes
const bytes = await run("cat", "file.bin").collect();
// Uint8Array[]
First Line
const result = await run("git", "rev-parse", "HEAD").lines.first;
// Single string
Print to Console
await run("ls", "-la").toStdout();
Transforming Output
Map Lines
const uppercase = await run("cat", "file.txt")
.lines
.map(line => line.toUpperCase())
.collect();
Filter Lines
const errors = await run("cat", "app.log")
.lines
.filter(line => line.includes("ERROR"))
.collect();
Parse Output
const commits = await run("git", "log", "--oneline")
.lines
.map(line => {
const [hash, ...message] = line.split(" ");
return { hash, message: message.join(" ") };
})
.collect();
Streaming Output
Process output as it arrives:
for await (const line of run("tail", "-f", "app.log").lines) {
if (line.includes("ERROR")) {
console.error(line);
}
}
Counting Output
const lineCount = await run("ls", "-la").lines.count();
Finding in Output
const match = await run("ps", "aux")
.lines
.find(line => line.includes("node"));
Real-World Examples
Parse JSON Output
const data = await run("curl", "https://api.example.com/data")
.lines
.map(line => JSON.parse(line))
.collect();
Extract Fields
const pids = await run("ps", "aux")
.lines
.drop(1) // Skip header
.map(line => line.split(/\s+/)[1])
.collect();
Aggregate Data
const total = await run("du", "-sh", "*")
.lines
.map(line => {
const size = line.split("\t")[0];
return parseInt(size);
})
.reduce((sum, size) => sum + size, 0);
Next Steps
- Process Pipelines - Chain commands
- Running Processes - More ways to run
- Array-Like Methods - Transform output
Working with Input
Send data to process stdin.
Choosing Your Approach
Use .run() for process-to-process pipes (most common):
await run("cat", "file.txt").run("grep", "pattern").toStdout();
Use enumerate() for in-memory data:
await enumerate(["line1", "line2"]).run("grep", "1").toStdout();
Use read() for file input:
await read("input.txt").run("grep", "pattern").toStdout();
Use range() for generated sequences:
await range({ to: 100 }).map(n => n.toString()).run("shuf").toStdout();
Piping Between Processes
The most common way to provide input:
import { run } from "jsr:@j50n/proc@0.23.3";
await run("echo", "hello")
.run("tr", "a-z", "A-Z") // Receives "hello" as stdin
.toStdout();
// HELLO
From Enumerable
Pipe any enumerable to a process:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const data = ["line 1", "line 2", "line 3"];
await enumerate(data)
.run("grep", "2")
.toStdout();
// line 2
From File
import { read } from "jsr:@j50n/proc@0.23.3";
await read("input.txt")
.run("grep", "pattern")
.toStdout();
Real-World Examples
Filter Data
await read("data.txt")
.run("grep", "ERROR")
.run("sort")
.run("uniq")
.toStdout();
Transform and Process
await read("input.txt")
.lines
.map(line => line.toUpperCase())
.run("sort")
.toStdout();
Generate and Process
import { range } from "jsr:@j50n/proc@0.23.3";
await range({ to: 100 })
.map(n => n.toString())
.run("shuf") // Shuffle
.run("head", "-10")
.toStdout();
Next Steps
- Process Pipelines - Chain commands
- Working with Output - Capture results
Resource Management
Avoid leaks and manage resources properly.
The Golden Rule
Always consume process output.
import { run } from "jsr:@j50n/proc@0.23.3";
// ❌ Resource leak
const p = run("ls");
// Output never consumed!
// ✅ Output consumed
await run("ls").lines.collect();
Why This Matters
Unconsumed output keeps the process handle open, preventing cleanup.
Ways to Consume Output
collect()
const lines = await run("ls").lines.collect();
forEach()
await run("ls").lines.forEach(line => {
console.log(line);
});
for-await
for await (const line of run("ls").lines) {
console.log(line);
}
toStdout()
await run("ls").toStdout();
Aggregations
const count = await run("ls").lines.count();
const first = await run("ls").lines.first;
Checking Status
Consume output before checking status:
const p = run("command");
await p.lines.collect(); // Consume first
const status = await p.status; // Then check
Error Handling
Errors automatically clean up resources:
try {
await run("false").lines.collect();
} catch (error) {
// Resources cleaned up automatically
}
Long-Running Processes
For processes that run indefinitely:
// This is fine - consuming output as it arrives
for await (const line of run("tail", "-f", "log").lines) {
process(line);
}
Best Practices
- Always consume output - Use collect(), forEach(), or iterate
- Check status after consuming - Don't check status first
- Let errors propagate - They clean up automatically
- Use try-finally for cleanup - If you need custom cleanup (see the sketch below)
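A minimal sketch of that last point, assuming a command that writes to a temp file you want removed no matter what (the command and file path are hypothetical):
import { run } from "jsr:@j50n/proc@0.23.3";
const tmp = await Deno.makeTempFile();
try {
  // Consume output as usual; errors still propagate out of the try block.
  await run("some-command", tmp).lines.collect();
} finally {
  // Custom cleanup runs whether the command succeeded or failed.
  await Deno.remove(tmp);
}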
Next Steps
- Error Handling - Handle failures
- Running Processes - Process basics
Understanding Enumerable
Enumerable is the heart of proc's async iterable magic. It wraps any iterable and gives you Array-like superpowers.
What is Enumerable?
Think of Enumerable as an Array, but for async data. It gives you map, filter, reduce, and more—but for data that arrives over time.
import { enumerate } from "jsr:@j50n/proc@0.23.3";
// Wrap any iterable
const nums = enumerate([1, 2, 3, 4, 5]);
// Use Array methods
const doubled = await nums
.map(n => n * 2)
.filter(n => n > 5)
.collect();
console.log(doubled); // [6, 8, 10]
Why Enumerable?
JavaScript has Arrays for sync data and Streams for async data. But Streams are awkward:
// Streams are verbose
const stream = readableStream
.pipeThrough(new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk * 2);
}
}))
.pipeThrough(new TransformStream({
transform(chunk, controller) {
if (chunk > 5) controller.enqueue(chunk);
}
}));
Enumerable makes it simple:
// Enumerable is clean
const result = await enumerate(asyncIterable)
.map(n => n * 2)
.filter(n => n > 5)
.collect();
Creating Enumerables
From Arrays
const nums = enumerate([1, 2, 3]);
From Async Generators
async function* generate() {
yield 1;
yield 2;
yield 3;
}
const nums = enumerate(generate());
From Process Output
import { run } from "jsr:@j50n/proc@0.23.3";
const lines = run("ls", "-la").lines;
// lines is already an Enumerable<string>
From Files
import { read } from "jsr:@j50n/proc@0.23.3";
const bytes = read("file.txt");
// bytes is Enumerable<Uint8Array>
const lines = read("file.txt").lines;
// lines is Enumerable<string>
Consuming Enumerables
Collect to Array
const array = await enumerate([1, 2, 3]).collect();
// [1, 2, 3]
Iterate with for-await
for await (const item of enumerate([1, 2, 3])) {
console.log(item);
}
Process Each Item
await enumerate([1, 2, 3]).forEach(item => {
console.log(item);
});
Get First or Last
const first = await enumerate([1, 2, 3]).first;
const last = await enumerate([1, 2, 3]).last;
Lazy Evaluation
Enumerables are lazy—nothing happens until you consume them:
// This doesn't run anything yet
const pipeline = enumerate([1, 2, 3])
.map(n => {
console.log(`Processing ${n}`);
return n * 2;
});
// Now it runs
const result = await pipeline.collect();
// Logs: Processing 1, Processing 2, Processing 3
This is powerful for large datasets:
// Processes one line at a time, never loads entire file
for await (const line of read("huge-file.txt").lines) {
process(line);
}
Chaining Operations
Chain as many operations as you want:
const result = await enumerate([1, 2, 3, 4, 5])
.map(n => n * 2) // [2, 4, 6, 8, 10]
.filter(n => n > 5) // [6, 8, 10]
.map(n => n.toString()) // ["6", "8", "10"]
.collect();
Each operation returns a new Enumerable, so you can keep chaining.
Type Safety
Enumerable is fully typed:
const nums: Enumerable<number> = enumerate([1, 2, 3]);
const strings: Enumerable<string> = nums.map(n => n.toString());
// ^-- TypeScript knows this is Enumerable<string>
const result: string[] = await strings.collect();
// ^-- TypeScript knows this is string[]
Your IDE will guide you with autocomplete and type errors.
Common Patterns
Transform and Collect
const result = await enumerate(data)
.map(transform)
.collect();
Filter and Count
const count = await enumerate(data)
.filter(predicate)
.count();
Find First Match
const match = await enumerate(data)
.find(predicate);
Check if Any/All
const hasMatch = await enumerate(data).some(predicate);
const allMatch = await enumerate(data).every(predicate);
Performance
Enumerable is:
- Streaming - Processes one item at a time
- Lazy - Only runs when consumed
- Memory efficient - Doesn't load everything at once
- Fast - Minimal overhead
// This processes a 10GB file using constant memory
await read("huge-file.txt")
.lines
.filter(line => line.includes("ERROR"))
.forEach(console.log);
Enumerable vs Array
| Feature | Array | Enumerable |
|---|---|---|
| Data | Sync | Async |
| Memory | All in memory | Streaming |
| Size | Limited by RAM | Unlimited |
| Methods | map, filter, etc. | map, filter, etc. |
| Lazy | No | Yes |
Use Arrays for small, sync data. Use Enumerable for large, async data.
Caching Iterables
Sometimes you need to reuse an iterable's results. Use cache() to store results for replay:
import { cache, enumerate } from "jsr:@j50n/proc@0.23.3";
const expensive = enumerate(data)
.map(expensiveOperation);
const cached = cache(expensive);
// First time - runs the operations
const result1 = await cached.collect();
// Second time - uses cached results, doesn't re-run
const result2 = await cached.collect();
Use cases:
- Reuse expensive computations
- Replay iterables multiple times
- Share results across operations
Warning: Caching stores all results in memory. Only cache when:
- The dataset is small enough to fit in memory
- You need to iterate multiple times
- The computation is expensive enough to justify memory usage
Writable Iterables
Create async iterables you can write to programmatically:
import { WritableIterable } from "jsr:@j50n/proc@0.23.3";
const writable = new WritableIterable<string>();
// Write to it
await writable.write("item1");
await writable.write("item2");
await writable.write("item3");
await writable.close();
// Read from it
const items = await writable.collect();
// ["item1", "item2", "item3"]
Use cases:
- Generate data programmatically
- Bridge between push and pull models
- Create custom data sources
- Implement producer-consumer patterns
Example: Event-driven data:
const events = new WritableIterable<Event>();
// Producer: write events as they occur
eventEmitter.on("data", async (event) => {
await events.write(event);
});
eventEmitter.on("end", async () => {
await events.close();
});
// Consumer: process events as they arrive
for await (const event of events) {
processEvent(event);
}
Next Steps
- Array-Like Methods - All the methods available
- Transformations - map, flatMap, transform
- Aggregations - reduce, count, sum
Array-Like Methods
Enumerable gives you the Array methods you know and love, but for async data.
Transformations
map()
Transform each item:
const doubled = await enumerate([1, 2, 3])
.map(n => n * 2)
.collect();
// [2, 4, 6]
Works with async functions:
const results = await enumerate(urls)
.map(async (url) => {
const response = await fetch(url);
return response.json();
})
.collect();
filter()
Keep only items that match:
const evens = await enumerate([1, 2, 3, 4])
.filter(n => n % 2 === 0)
.collect();
// [2, 4]
flatMap()
Map and flatten in one step:
const words = await enumerate(["hello world", "foo bar"])
.flatMap(line => line.split(" "))
.collect();
// ["hello", "world", "foo", "bar"]
Aggregations
reduce()
Combine items into a single value:
const sum = await enumerate([1, 2, 3, 4])
.reduce((acc, n) => acc + n, 0);
// 10
Build complex objects:
const grouped = await enumerate(items)
.reduce((acc, item) => {
acc[item.category] = acc[item.category] || [];
acc[item.category].push(item);
return acc;
}, {});
count()
Count items:
const total = await enumerate([1, 2, 3]).count();
// 3
some()
Check if any item matches:
const hasError = await enumerate(lines)
.some(line => line.includes("ERROR"));
every()
Check if all items match:
const allPositive = await enumerate([1, 2, 3])
.every(n => n > 0);
Finding Items
find()
Find first match:
const match = await enumerate([1, 2, 3, 4])
.find(n => n > 2);
// 3
first
Get first item:
const first = await enumerate([1, 2, 3]).first;
// 1
last
Get last item:
const last = await enumerate([1, 2, 3]).last;
// 3
nth()
Get item at index:
const third = await enumerate([1, 2, 3, 4]).nth(2);
// 3 (zero-indexed)
Slicing
take()
Take first N items:
const first3 = await enumerate([1, 2, 3, 4, 5])
.take(3)
.collect();
// [1, 2, 3]
drop()
Skip first N items:
const rest = await enumerate([1, 2, 3, 4, 5])
.drop(2)
.collect();
// [3, 4, 5]
slice()
Get a range:
const middle = await enumerate([1, 2, 3, 4, 5])
.slice(1, 4)
.collect();
// [2, 3, 4]
Iteration
forEach()
Process each item:
await enumerate([1, 2, 3]).forEach(n => {
console.log(n);
});
for-await
Use standard JavaScript iteration:
for await (const item of enumerate([1, 2, 3])) {
console.log(item);
}
Collecting
collect()
Gather all items into an array:
const array = await enumerate([1, 2, 3]).collect();
// [1, 2, 3]
toArray()
Alias for collect():
const array = await enumerate([1, 2, 3]).toArray();
Utilities
enum()
Add indices to items:
const indexed = await enumerate(["a", "b", "c"])
.enum()
.collect();
// [["a", 0], ["b", 1], ["c", 2]]
Use with map:
const numbered = await enumerate(["a", "b", "c"])
.enum()
.map(([item, i]) => `${i + 1}. ${item}`)
.collect();
// ["1. a", "2. b", "3. c"]
tee()
Split into multiple streams:
const [stream1, stream2] = enumerate([1, 2, 3]).tee();
const [sum, product] = await Promise.all([
stream1.reduce((a, b) => a + b, 0),
stream2.reduce((a, b) => a * b, 1),
]);
flatten()
Flatten nested iterables:
const flat = await enumerate([[1, 2], [3, 4]])
.flatten()
.collect();
// [1, 2, 3, 4]
Concurrent Operations
concurrentMap()
Map with controlled concurrency:
const results = await enumerate(urls)
.concurrentMap(async (url) => {
return await fetch(url);
}, { concurrency: 5 })
.collect();
Results are returned in order.
concurrentUnorderedMap()
Map with maximum concurrency:
const results = await enumerate(urls)
.concurrentUnorderedMap(async (url) => {
return await fetch(url);
}, { concurrency: 5 })
.collect();
Results are returned as they complete (faster).
Chaining Examples
Complex Pipeline
const result = await enumerate(data)
.filter(item => item.active)
.map(item => item.value)
.filter(value => value > 0)
.map(value => value * 2)
.take(10)
.collect();
Real-World Example
const topErrors = await read("app.log")
.lines
.filter(line => line.includes("ERROR"))
.map(line => {
const match = line.match(/ERROR: (.+)/);
return match ? match[1] : line;
})
.reduce((acc, error) => {
acc[error] = (acc[error] || 0) + 1;
return acc;
}, {});
Performance Tips
Use Streaming
Don't collect if you don't need to:
// ❌ Loads everything
const items = await enumerate(huge).collect();
for (const item of items) process(item);
// ✅ Streams
for await (const item of enumerate(huge)) {
process(item);
}
Use take() for Limits
// Get first 10 matches
const matches = await enumerate(data)
.filter(predicate)
.take(10)
.collect();
Use concurrentMap() for I/O
// Process 5 URLs at a time
const results = await enumerate(urls)
.concurrentMap(fetch, { concurrency: 5 })
.collect();
Next Steps
- Transformations - Deep dive into map, flatMap, transform
- Aggregations - Deep dive into reduce, count, sum
- Slicing and Sampling - Deep dive into take, drop, slice
Transformations
Change data as it flows through your pipeline.
map()
Transform each item:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const doubled = await enumerate([1, 2, 3])
.map(n => n * 2)
.collect();
// [2, 4, 6]
With Async Functions
const results = await enumerate(urls)
.map(async (url) => {
const response = await fetch(url);
return response.json();
})
.collect();
Type Transformations
const strings = await enumerate([1, 2, 3])
.map(n => n.toString())
.collect();
// ["1", "2", "3"]
Complex Transformations
const processed = await enumerate(rawData)
.map(item => ({
id: item.id,
name: item.name.toUpperCase(),
value: parseFloat(item.value),
timestamp: new Date(item.timestamp)
}))
.collect();
flatMap()
Map and flatten in one step:
const words = await enumerate(["hello world", "foo bar"])
.flatMap(line => line.split(" "))
.collect();
// ["hello", "world", "foo", "bar"]
Expanding Items
const expanded = await enumerate([1, 2, 3])
.flatMap(n => [n, n * 10])
.collect();
// [1, 10, 2, 20, 3, 30]
Filtering While Mapping
const valid = await enumerate(data)
.flatMap(item => {
if (item.valid) {
return [item.value];
}
return []; // Skip invalid items
})
.collect();
filter()
Keep only matching items:
const evens = await enumerate([1, 2, 3, 4, 5])
.filter(n => n % 2 === 0)
.collect();
// [2, 4]
Complex Predicates
const active = await enumerate(users)
.filter(user =>
user.active &&
user.lastLogin > cutoffDate &&
user.role !== "guest"
)
.collect();
With Type Guards
const numbers = await enumerate(mixed)
.filter((item): item is number => typeof item === "number")
.collect();
transform()
Apply a TransformStream:
import { read } from "jsr:@j50n/proc@0.23.3";
const decompressed = await read("file.gz")
.transform(new DecompressionStream("gzip"))
.lines
.collect();
Custom Transform
const transformed = await enumerate(data)
.transform(new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
}))
.collect();
Chaining Transformations
Combine multiple transformations:
const result = await enumerate(data)
.map(item => item.trim())
.filter(item => item.length > 0)
.map(item => item.toUpperCase())
.filter(item => item.startsWith("A"))
.collect();
Real-World Examples
Parse CSV
const data = await read("data.csv")
.lines
.drop(1) // Skip header
.map(line => line.split(","))
.map(([name, age, city]) => ({
name,
age: parseInt(age),
city
}))
.filter(row => row.age >= 18)
.collect();
Extract URLs
const urls = await read("page.html")
.lines
.flatMap(line => {
const matches = line.match(/https?:\/\/[^\s"']+/g);
return matches || [];
})
.collect();
Clean Data
const cleaned = await enumerate(rawData)
.map(item => item.trim())
.filter(item => item.length > 0)
.map(item => item.toLowerCase())
.filter(item => !item.startsWith("#"))
.collect();
Transform JSON Lines
const objects = await read("data.jsonl")
.lines
.map(line => JSON.parse(line))
.filter(obj => obj.status === "active")
.map(obj => ({
id: obj.id,
name: obj.name,
value: obj.value * 1.1 // Apply 10% increase
}))
.collect();
Performance Tips
Lazy Evaluation
Transformations don't run until you consume:
// Nothing happens yet
const pipeline = enumerate(data)
.map(expensive)
.filter(predicate);
// Now it runs
const result = await pipeline.collect();
Early Filtering
Filter before expensive operations:
// ✅ Filter first
const result = await enumerate(data)
.filter(cheap) // Fast filter
.map(expensive) // Expensive operation
.collect();
// ❌ Map first
const result = await enumerate(data)
.map(expensive) // Runs on everything
.filter(cheap) // Then filters
.collect();
Use take() to Limit
// Stop after 10 matches
const first10 = await enumerate(huge)
.filter(predicate)
.take(10)
.collect();
Common Patterns
Normalize Data
const normalized = await enumerate(data)
.map(item => ({
...item,
name: item.name.trim().toLowerCase(),
email: item.email.toLowerCase(),
phone: item.phone.replace(/\D/g, "")
}))
.collect();
Extract Fields
const names = await enumerate(users)
.map(user => user.name)
.collect();
Conditional Transform
const processed = await enumerate(items)
.map(item => {
if (item.type === "A") {
return processTypeA(item);
} else {
return processTypeB(item);
}
})
.collect();
Batch Transform
const batched = await enumerate(items)
.map((item, i) => ({
...item,
batch: Math.floor(i / 100)
}))
.collect();
Error Handling
Errors in transformations propagate:
try {
await enumerate(data)
.map(item => {
if (!item.valid) {
throw new Error(`Invalid item: ${item.id}`);
}
return item.value;
})
.collect();
} catch (error) {
console.error(`Transform failed: ${error.message}`);
}
Next Steps
- Aggregations - Combine items into single values
- Array-Like Methods - All available methods
- Concurrent Processing - Transform in parallel
Aggregations
Combine many items into one result.
reduce()
The Swiss Army knife of aggregations:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const sum = await enumerate([1, 2, 3, 4])
.reduce((acc, n) => acc + n, 0);
// 10
How It Works
// Start with initial value: 0
// Step 1: 0 + 1 = 1
// Step 2: 1 + 2 = 3
// Step 3: 3 + 3 = 6
// Step 4: 6 + 4 = 10
Building Objects
const grouped = await enumerate(items)
.reduce((acc, item) => {
const key = item.category;
acc[key] = acc[key] || [];
acc[key].push(item);
return acc;
}, {});
Calculating Statistics
const stats = await enumerate(numbers)
.reduce((acc, n) => ({
sum: acc.sum + n,
count: acc.count + 1,
min: Math.min(acc.min, n),
max: Math.max(acc.max, n)
}), { sum: 0, count: 0, min: Infinity, max: -Infinity });
const average = stats.sum / stats.count;
count()
Count items:
const total = await enumerate([1, 2, 3, 4, 5]).count();
// 5
Count Matches
const errorCount = await read("app.log")
.lines
.filter(line => line.includes("ERROR"))
.count();
some()
Check if any item matches:
const hasError = await enumerate(lines)
.some(line => line.includes("ERROR"));
// true or false
Early Exit
Stops as soon as it finds a match:
// Stops reading after first match
const hasLargeFile = await enumerate(files)
.some(file => file.size > 1_000_000_000);
every()
Check if all items match:
const allPositive = await enumerate([1, 2, 3, 4])
.every(n => n > 0);
// true
Validation
const allValid = await enumerate(records)
.every(record =>
record.name &&
record.email &&
record.age >= 0
);
find()
Find first matching item:
const firstError = await enumerate(lines)
.find(line => line.includes("ERROR"));
// First line with ERROR, or undefined
With Complex Predicate
const admin = await enumerate(users)
.find(user =>
user.role === "admin" &&
user.active
);
Real-World Examples
Sum Values
const total = await enumerate(orders)
.map(order => order.amount)
.reduce((sum, amount) => sum + amount, 0);
Count by Category
const counts = await enumerate(items)
.reduce((acc, item) => {
acc[item.category] = (acc[item.category] || 0) + 1;
return acc;
}, {});
Find Maximum
const max = await enumerate(numbers)
.reduce((max, n) => Math.max(max, n), -Infinity);
Build Index
const index = await enumerate(items)
.reduce((acc, item) => {
acc[item.id] = item;
return acc;
}, {});
Concatenate Strings
const combined = await enumerate(words)
.reduce((acc, word) => acc + " " + word, "");
Collect Unique Values
const unique = await enumerate(items)
.reduce((acc, item) => {
acc.add(item);
return acc;
}, new Set());
Advanced Patterns
Running Average
const runningAvg = await enumerate(numbers)
.reduce((acc, n) => {
acc.sum += n;
acc.count += 1;
acc.average = acc.sum / acc.count;
return acc;
}, { sum: 0, count: 0, average: 0 });
Nested Grouping
const grouped = await enumerate(items)
.reduce((acc, item) => {
const cat = item.category;
const type = item.type;
acc[cat] = acc[cat] || {};
acc[cat][type] = acc[cat][type] || [];
acc[cat][type].push(item);
return acc;
}, {});
Frequency Map
const frequency = await enumerate(words)
.reduce((acc, word) => {
acc[word] = (acc[word] || 0) + 1;
return acc;
}, {});
// Find most common
const mostCommon = Object.entries(frequency)
.sort((a, b) => b[1] - a[1])[0];
Accumulate with Transform
const processed = await enumerate(data)
.reduce((acc, item) => {
const transformed = transform(item);
if (transformed.valid) {
acc.push(transformed);
}
return acc;
}, []);
Performance Tips
Use Specific Methods
// ❌ Slower
const count = await enumerate(items)
.reduce((acc) => acc + 1, 0);
// ✅ Faster
const count = await enumerate(items).count();
Early Exit with some/every
// Stops at first match
const hasMatch = await enumerate(huge)
.some(predicate);
// Better than
const matches = await enumerate(huge)
.filter(predicate)
.count();
Combine Operations
// ✅ One pass
const stats = await enumerate(numbers)
.reduce((acc, n) => ({
sum: acc.sum + n,
count: acc.count + 1
}), { sum: 0, count: 0 });
// ❌ Two passes
const sum = await enumerate(numbers).reduce((a, b) => a + b, 0);
const count = await enumerate(numbers).count();
Common Mistakes
Forgetting Initial Value
// ❌ Error with empty array
const sum = await enumerate([]).reduce((a, b) => a + b);
// ✅ Works with empty array
const sum = await enumerate([]).reduce((a, b) => a + b, 0);
Not Returning Accumulator
// ❌ Returns undefined
const result = await enumerate(items)
.reduce((acc, item) => {
acc.push(item);
// Missing return!
}, []);
// ✅ Returns accumulator
const result = await enumerate(items)
.reduce((acc, item) => {
acc.push(item);
return acc;
}, []);
Next Steps
- Transformations - Transform items
- Array-Like Methods - All available methods
- Streaming Large Files - Aggregate huge datasets
Slicing and Sampling
Take portions of your data stream.
take()
Take first N items:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const first3 = await enumerate([1, 2, 3, 4, 5])
.take(3)
.collect();
// [1, 2, 3]
Early Exit
Stops reading after N items:
// Only reads first 10 lines
const preview = await read("huge-file.txt")
.lines
.take(10)
.collect();
With Filter
// First 5 errors
const errors = await read("app.log")
.lines
.filter(line => line.includes("ERROR"))
.take(5)
.collect();
drop()
Skip first N items:
const rest = await enumerate([1, 2, 3, 4, 5])
.drop(2)
.collect();
// [3, 4, 5]
Skip Header
const data = await read("data.csv")
.lines
.drop(1) // Skip header row
.collect();
Combining drop() and take()
Get a range of items by combining drop and take:
const middle = await enumerate([1, 2, 3, 4, 5])
.drop(1)
.take(3)
.collect();
// [2, 3, 4]
Pagination
const page = 2;
const pageSize = 10;
const items = await enumerate(allItems)
.drop(page * pageSize)
.take(pageSize)
.collect();
first
Get first item:
const first = await enumerate([1, 2, 3]).first;
// 1
With Pipeline
const result = await run("ls", "-la")
.lines
.first;
last
Get last item:
const last = await enumerate([1, 2, 3]).last;
// 3
Note: Reads entire stream to find last item.
nth()
Get item at index:
const third = await enumerate([1, 2, 3, 4, 5]).nth(2);
// 3 (zero-indexed)
Real-World Examples
Preview File
console.log("First 10 lines:");
await read("file.txt")
.lines
.take(10)
.forEach(line => console.log(line));
Skip and Take
// Lines 11-20
const batch = await read("file.txt")
.lines
.drop(10)
.take(10)
.collect();
Sample Data
// Every 10th item
const sample = await enumerate(data)
.filter((_, i) => i % 10 === 0)
.collect();
Find Nth Match
// 5th error
const fifthError = await read("app.log")
.lines
.filter(line => line.includes("ERROR"))
.nth(4); // Zero-indexed
Performance Tips
Use take() for Limits
// ✅ Stops early
const first100 = await enumerate(huge)
.take(100)
.collect();
// ❌ Reads everything
const all = await enumerate(huge).collect();
const first100 = all.slice(0, 100); // Array slice, not Enumerable
Combine with Filter
// Efficient: stops after 10 matches
const matches = await enumerate(data)
.filter(predicate)
.take(10)
.collect();
Next Steps
- Array-Like Methods - All available methods
- Transformations - Transform items
- Streaming Large Files - Work with huge files
Concurrent Processing
Process multiple items in parallel with controlled concurrency. It's easier than you think.
The Problem
You have a list of URLs to fetch. Sequential is slow:
// Takes 10 seconds for 10 URLs (1 second each)
for (const url of urls) {
await fetch(url);
}
Promise.all is fast but dangerous:
// Starts 1000 requests at once - might crash
await Promise.all(urls.map(url => fetch(url)));
The Solution
proc gives you controlled concurrency:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
// Defaults to CPU count (usually 4-8)
const results = await enumerate(urls)
.concurrentMap(async (url) => {
return await fetch(url);
})
.collect();
// Or specify a limit
const results = await enumerate(urls)
.concurrentMap(async (url) => {
return await fetch(url);
}, { concurrency: 5 })
.collect();
Fast, but won't overwhelm your system.
Understanding JavaScript Concurrency
Important: JavaScript concurrency is not parallelism. You're running on a single thread.
What This Means
When you use concurrentMap or concurrentUnorderedMap, you're not creating threads or workers. You're managing multiple async operations on one thread. The JavaScript event loop switches between them when they're waiting.
This works great for:
- Network requests - While waiting for a response, other operations run
- File I/O - While waiting for disk reads/writes, other operations run
- Process execution - While a child process runs, other operations continue
- Database queries - While waiting for results, other operations run
This does NOT work for:
- CPU-intensive calculations - Pure JavaScript math, parsing, etc. blocks everything
- Synchronous operations - Anything that doesn't await blocks the thread
- Heavy computation - You still only have one CPU core's worth of processing power
Example: What Works
// ✅ Good: I/O-bound operations run concurrently
const results = await enumerate(urls)
.concurrentUnorderedMap(async (url) => {
// While waiting for fetch, other URLs are being fetched
const response = await fetch(url);
return response.json();
})
.collect();
// This is genuinely faster - 10 URLs in ~1 second instead of ~10 seconds
Example: What Doesn't Work
// ❌ Bad: CPU-bound operations don't benefit
const results = await enumerate(numbers)
.concurrentUnorderedMap(async (n) => {
// This blocks the thread - no other operations can run
let result = 0;
for (let i = 0; i < 1000000; i++) {
result += Math.sqrt(n * i);
}
return result;
})
.collect();
// This is NOT faster - still uses one CPU core sequentially
Why It Still Matters
Even though it's not true parallelism, concurrency is incredibly useful:
- I/O operations dominate - Most real-world tasks are waiting for network/disk
- Child processes run in parallel - When you run() a command, it uses a separate process
- Better resource utilization - Keep the CPU busy while waiting for I/O
- Simpler than threads - No race conditions, no locks, no shared memory issues
When You Need True Parallelism
If you need to parallelize CPU-intensive JavaScript code, use:
- Web Workers (in browsers)
- Worker Threads (in Node.js/Deno)
- Child processes with run() - each process gets its own CPU
But for most tasks (fetching URLs, processing files, running commands), JavaScript's concurrency model is perfect.
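If you go the child-process route, here's a minimal sketch of what that can look like. It assumes a hypothetical crunch.ts script that does the CPU-heavy work for one input file and prints its result; each run() invocation is a separate OS process, so the work really does spread across cores while concurrentUnorderedMap keeps a bounded number of them running:
import { enumerate, run } from "jsr:@j50n/proc@0.23.3";

const files = ["a.bin", "b.bin", "c.bin", "d.bin"];

const results = await enumerate(files)
  .concurrentUnorderedMap(async (file) => {
    // Each call spawns its own process (and gets its own CPU core).
    // crunch.ts is a placeholder for your CPU-heavy worker script.
    const output = await run("deno", "run", "--allow-read", "crunch.ts", file)
      .lines
      .first;
    return { file, output };
  })
  .collect();

console.log(results);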
When to Use Concurrent Processing
Use concurrentUnorderedMap() (recommended default) when:
- Order doesn't matter - you want maximum speed
- Processing independent tasks where results can arrive in any order
- You'll sort or aggregate results anyway
- This is usually what you want - it keeps all workers busy and delivers results as they complete
- Example: Downloading files, processing logs, fetching data you'll aggregate
Use concurrentMap() when:
- You must have results in the same order as input
- Be aware: can bottleneck on the slowest item in each batch
- If work isn't balanced, faster items wait for slower ones
- Example: Fetching user profiles where display order must match input order
Use sequential processing when:
- Tasks depend on each other
- You must respect strict rate limits
- Order is critical and tasks are fast
- Example: Database transactions that must happen in sequence
Choose concurrency level based on:
- I/O-bound tasks (network, disk): Start with 5-10, increase if you have bandwidth (see "Understanding JavaScript Concurrency" above)
- CPU-bound tasks: Won't benefit from concurrency - use Worker Threads or child processes instead
- Rate-limited APIs: Match the rate limit (e.g., 10 requests/second = concurrency 1 with 100ms delays)
- Memory constraints: Lower concurrency if processing large data per task
concurrentUnorderedMap() - Recommended
Process items concurrently, return results as they complete (fastest):
// Defaults to CPU count
const results = await enumerate([1, 2, 3, 4, 5])
.concurrentUnorderedMap(async (n) => {
await sleep(Math.random() * 1000);
return n * 2;
})
.collect();
// [6, 2, 10, 4, 8] - order varies, but all workers stay busy
Why it's faster: Results are delivered as soon as they're ready. If item 3 finishes before item 1, you get item 3 immediately. No waiting for slower items.
Use when: You don't care about order (most cases). Better performance under real-world conditions where work isn't perfectly balanced.
Concurrency: Defaults to navigator.hardwareConcurrency (CPU count). Override with { concurrency: n } if needed.
concurrentMap() - Order Preserved
Process items concurrently, return results in input order:
const results = await enumerate([1, 2, 3, 4, 5])
.concurrentMap(async (n) => {
await sleep(Math.random() * 1000);
return n * 2;
}, { concurrency: 3 })
.collect();
// [2, 4, 6, 8, 10] - always in order
Performance caveat: If item 1 takes 5 seconds and item 2 takes 1 second, item 2 waits for item 1 before being returned. This can create bottlenecks where fast items wait for slow ones.
Use when: You specifically need results in the same order as input. Only use if order truly matters for your use case.
Concurrency: Defaults to CPU count. Override with { concurrency: n } if needed.
Real-World Examples
Fetch Multiple URLs
const urls = [
"https://api.example.com/1",
"https://api.example.com/2",
"https://api.example.com/3",
// ... 100 more
];
// Uses CPU count by default
const data = await enumerate(urls)
.concurrentUnorderedMap(async (url) => {
const response = await fetch(url);
return response.json();
})
.collect();
// Or limit for rate-limited APIs
const data = await enumerate(urls)
.concurrentUnorderedMap(async (url) => {
const response = await fetch(url);
return response.json();
}, { concurrency: 10 })
.collect();
Process Files in Parallel
import { read } from "jsr:@j50n/proc@0.23.3";
const files = ["log1.txt", "log2.txt", "log3.txt"];
const results = await enumerate(files)
.concurrentMap(async (file) => {
const errors = await read(file)
.lines
.filter(line => line.includes("ERROR"))
.count();
return { file, errors };
})
.collect();
Download and Process
const downloads = await enumerate(imageUrls)
.concurrentUnorderedMap(async (url) => {
const response = await fetch(url);
const blob = await response.blob();
return processImage(blob);
})
.collect();
Choosing Concurrency
Default behavior: Both methods default to navigator.hardwareConcurrency (CPU count, typically 4-8). This is usually a good starting point.
When to override:
For I/O-bound tasks (network, disk):
- Default is often fine
- Increase to 10-20 if you have bandwidth and no rate limits
- Decrease to 1-5 for rate-limited APIs
For CPU-bound tasks:
- Default (CPU count) is optimal
- Don't increase - you'll just add overhead
For rate-limited APIs:
- Set to match the rate limit
- Add delays if needed
// Respect rate limits with low concurrency
const results = await enumerate(apiCalls)
.concurrentUnorderedMap(async (call) => {
const result = await makeApiCall(call);
await sleep(100); // 10 requests per second
return result;
}, { concurrency: 1 })
.collect();
Error Handling
Errors propagate naturally:
try {
const results = await enumerate(urls)
.concurrentMap(async (url) => {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`Failed: ${url}`);
}
return response.json();
}, { concurrency: 5 })
.collect();
} catch (error) {
// First error stops everything
console.error(`Failed: ${error.message}`);
}
Progress Tracking
Track progress as items complete:
let completed = 0;
const total = urls.length;
const results = await enumerate(urls)
.concurrentMap(async (url) => {
const result = await fetch(url);
completed++;
console.log(`Progress: ${completed}/${total}`);
return result;
}, { concurrency: 5 })
.collect();
Combining with Other Operations
Chain concurrent operations with other methods:
const results = await enumerate(urls)
.concurrentMap(fetch, { concurrency: 5 })
.filter(response => response.ok)
.concurrentMap(response => response.json(), { concurrency: 5 })
.filter(data => data.active)
.collect();
Performance Comparison
// Sequential: 10 seconds (one at a time)
for (const url of urls) {
await fetch(url);
}
// concurrentMap (5): 2-4 seconds
// Can bottleneck if one item is slow - others wait
await enumerate(urls)
.concurrentMap(fetch, { concurrency: 5 })
.collect();
// concurrentUnorderedMap (5): 2 seconds
// Faster - no waiting, results delivered as ready
await enumerate(urls)
.concurrentUnorderedMap(fetch, { concurrency: 5 })
.collect();
Why unordered is faster: Imagine 5 tasks with times [1s, 1s, 1s, 1s, 5s]. With concurrentMap, the 5-second task blocks its batch. With concurrentUnorderedMap, the four 1-second tasks complete and return immediately while the 5-second task finishes in the background.
Advanced Patterns
Batch Processing
Process in batches:
const batchSize = 10;
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const results = await enumerate(batch)
.concurrentMap(process, { concurrency: 5 })
.collect();
await saveBatch(results);
}
Retry Failed Items
const results = await enumerate(urls)
.concurrentMap(async (url) => {
let attempts = 0;
while (attempts < 3) {
try {
return await fetch(url);
} catch (error) {
attempts++;
if (attempts === 3) throw error;
await sleep(1000 * attempts);
}
}
}, { concurrency: 5 })
.collect();
Dynamic Concurrency
Adjust concurrency based on results:
let concurrency = 5;
for (const batch of batches) {
const start = Date.now();
const results = await enumerate(batch)
.concurrentMap(process, { concurrency })
.collect();
const duration = Date.now() - start;
// Adjust based on performance
if (duration < 1000) concurrency = Math.min(concurrency + 1, 20);
if (duration > 5000) concurrency = Math.max(concurrency - 1, 1);
}
Best Practices
- Prefer unordered - Use concurrentUnorderedMap unless you specifically need order
- Start conservative - Begin with low concurrency, increase if needed
- Monitor resources - Watch memory and network usage
- Respect rate limits - Don't overwhelm external services
- Handle errors - One error stops everything, handle gracefully
- Understand the bottleneck - concurrentMap can wait on slow items; unordered doesn't
Common Mistakes
Too Much Concurrency
// ❌ Might crash with 10,000 concurrent requests
await enumerate(hugeList)
.concurrentMap(fetch, { concurrency: 10000 })
.collect();
// ✅ Reasonable concurrency
await enumerate(hugeList)
.concurrentMap(fetch, { concurrency: 10 })
.collect();
Forgetting to Await
// ❌ Not awaited - you have an Enumerable, not the results
const notAwaited = enumerate(urls)
.concurrentMap(fetch, { concurrency: 5 });
// ✅ Await the results
const results = await enumerate(urls)
.concurrentMap(fetch, { concurrency: 5 })
.collect();
Next Steps
- Streaming Large Files - Handle huge files efficiently
- Performance Optimization - Make it faster
- Parallel Downloads - Real-world example
Streaming Large Files
Process files bigger than your RAM. It's easier than you think.
When to Stream vs Collect
Always stream when:
- File is larger than available RAM (or even close to it)
- You don't need all data at once
- Processing can be done incrementally (line-by-line, record-by-record)
- You want to start processing immediately without waiting for full download/read
- Memory efficiency is important
Consider collecting when:
- File is small (< 100MB) and fits comfortably in memory
- You need random access to data
- You need to process data multiple times
- You need to sort or aggregate all data before processing
- Memory is not a concern
Memory/Speed Tradeoffs:
- Streaming: Constant memory (~64KB buffer), processes as data arrives, can't random access
- Collecting: Memory = file size, all data available immediately, can random access, must wait for full load
Example decision:
// 10GB log file - MUST stream
for await (const line of read("huge.log").lines) {
if (line.includes("ERROR")) console.log(line);
}
// 1MB config file - can collect
const config = await read("config.json").lines.collect();
const parsed = JSON.parse(config.join("\n"));
// 500MB data file - stream if processing once
const sum = await read("numbers.txt")
.lines
.map(line => parseFloat(line))
.reduce((a, b) => a + b, 0);
The Problem
You have a 10GB log file. Loading it into memory crashes your program:
// ❌ Crashes with large files
const content = await Deno.readTextFile("huge.log");
const lines = content.split("\n");
The Solution
Stream it, one line at a time:
import { read } from "jsr:@j50n/proc@0.23.3";
// ✅ Constant memory, any file size
for await (const line of read("huge.log").lines) {
if (line.includes("ERROR")) {
console.log(line);
}
}
How Streaming Works
Instead of loading everything:
- Read a chunk (buffer)
- Process it
- Discard it
- Repeat
Memory usage stays constant, no matter how big the file.
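To see the chunking directly, you can iterate the raw bytes that read() produces before converting to lines. This is just a sketch: each chunk is a Uint8Array buffer, and only one buffer needs to be in memory at a time.
import { read } from "jsr:@j50n/proc@0.23.3";

let bytes = 0;
for await (const chunk of read("huge.log")) {
  // chunk is a Uint8Array; process it, then let it be garbage-collected.
  bytes += chunk.length;
}
console.log(`${bytes} bytes read, one buffer at a time`);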
Real Examples
Count Lines in Huge File
const count = await read("10gb-file.txt").lines.count();
console.log(`${count} lines`);
Uses ~constant memory, even for 10GB.
Find Pattern in Large File
const matches = await read("huge.log")
.lines
.filter(line => line.includes("ERROR"))
.take(10) // Stop after 10 matches
.collect();
Stops reading once it finds 10 matches. Efficient!
Process CSV File
const data = await read("huge-data.csv")
.lines
.drop(1) // Skip header
.map(line => {
const [id, name, value] = line.split(",");
return { id, name, value: parseFloat(value) };
})
.filter(row => row.value > 100)
.collect();
Aggregate Large Dataset
const sum = await read("numbers.txt")
.lines
.map(line => parseFloat(line))
.reduce((acc, n) => acc + n, 0);
Compressed Files
Stream compressed files without extracting:
const lineCount = await read("huge.log.gz")
.transform(new DecompressionStream("gzip"))
.lines
.count();
Decompresses on-the-fly, never stores uncompressed data.
Multiple Files
Process multiple large files:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const files = ["log1.txt", "log2.txt", "log3.txt"];
for (const file of files) {
const errors = await read(file)
.lines
.filter(line => line.includes("ERROR"))
.count();
console.log(`${file}: ${errors} errors`);
}
Streaming Transformations
Chain transformations, all streaming:
const result = await read("data.txt")
.lines
.map(line => line.trim())
.filter(line => line.length > 0)
.map(line => line.toUpperCase())
.filter(line => line.startsWith("ERROR"))
.collect();
Each line flows through all transformations before the next line is read.
Writing Large Files
Transform the input as a stream, then write the collected result to a file:
import { concat } from "jsr:@j50n/proc@0.23.3";
const processed = await read("input.txt")
.lines
.map(line => line.toUpperCase())
.map(line => new TextEncoder().encode(line + "\n"))
.collect();
await Deno.writeFile("output.txt", concat(processed));
Performance Tips
Use take() for Early Exit
// Stops reading after 100 matches
const first100 = await read("huge.txt")
.lines
.filter(predicate)
.take(100)
.collect();
Don't Collect Unless Needed
// ❌ Loads everything into memory
const lines = await read("huge.txt").lines.collect();
for (const line of lines) process(line);
// ✅ Streams
for await (const line of read("huge.txt").lines) {
process(line);
}
Use Concurrent Processing
Process multiple files in parallel:
const results = await enumerate(files)
.concurrentMap(async (file) => {
return await read(file).lines.count();
}, { concurrency: 3 })
.collect();
Memory Usage
Streaming uses constant memory:
// File size: 10GB
// Memory used: ~64KB (buffer size)
await read("10gb-file.txt")
.lines
.forEach(line => process(line));
Real-World Example
Analyze a year of logs:
const errorsByDay = await read("year-of-logs.txt")
.lines
.filter(line => line.includes("ERROR"))
.map(line => {
const date = line.match(/\d{4}-\d{2}-\d{2}/)?.[0];
return date;
})
.filter(date => date !== undefined) // ?.[0] yields undefined when there's no match
.reduce((acc, date) => {
acc[date] = (acc[date] || 0) + 1;
return acc;
}, {});
// Show top 10 error days
Object.entries(errorsByDay)
.sort((a, b) => b[1] - a[1])
.slice(0, 10)
.forEach(([date, count]) => {
console.log(`${date}: ${count} errors`);
});
Processes gigabytes of logs with minimal memory.
When to Stream
Always stream when:
- File is larger than available RAM
- You don't need all data at once
- Processing can be done incrementally
- You want to start processing immediately
Consider collecting when:
- File is small (< 100MB)
- You need random access
- You need to process data multiple times
- Memory is not a concern
Common Patterns
Filter and Count
const count = await read("file.txt")
.lines
.filter(predicate)
.count();
Transform and Save
const output = await read("input.txt")
.lines
.map(transform)
.map(line => new TextEncoder().encode(line + "\n"))
.collect();
await Deno.writeFile("output.txt", concat(output));
Aggregate Data
const stats = await read("data.txt")
.lines
.reduce((acc, line) => {
// Update statistics
return acc;
}, initialStats);
Next Steps
- Concurrent Processing - Process multiple files in parallel
- Performance Optimization - Make it faster
- Decompressing Files - Work with compressed files
File I/O
Read and write files with streaming efficiency.
Reading Files
Read as Bytes
import { read } from "jsr:@j50n/proc@0.23.3";
const bytes = await read("file.bin").collect();
// Uint8Array[]
Read as Lines
const lines = await read("file.txt").lines.collect();
// string[]
Stream Large Files
Process files larger than memory:
for await (const line of read("huge-file.txt").lines) {
process(line); // One line at a time
}
File Paths
Relative Paths
read("data.txt") // Relative to current directory
Absolute Paths
read("/var/log/app.log")
URLs
const path = new URL("./data.txt", import.meta.url);
read(path)
Common Patterns
Count Lines
const lineCount = await read("file.txt").lines.count();
Find Pattern
const matches = await read("file.txt")
.lines
.filter(line => line.includes("ERROR"))
.collect();
Transform Lines
const processed = await read("input.txt")
.lines
.map(line => line.toUpperCase())
.collect();
Parse CSV
const data = await read("data.csv")
.lines
.drop(1) // Skip header
.map(line => {
const [name, age, city] = line.split(",");
return { name, age: parseInt(age), city };
})
.collect();
Writing Files
Write Array to File
const lines = ["line 1", "line 2", "line 3"];
const content = lines.join("\n");
await Deno.writeTextFile("output.txt", content);
Stream to File
import { concat } from "jsr:@j50n/proc@0.23.3";
const bytes = await read("input.txt")
.lines
.map(line => new TextEncoder().encode(line + "\n"))
.collect();
await Deno.writeFile("output.txt", concat(bytes));
Working with Binary Files
Read Binary
const bytes = await read("image.png").collect();
const data = concat(bytes);
Process Binary
const processed = await read("data.bin")
.map(chunk => {
// Process each chunk
return transform(chunk);
})
.collect();
Compressed Files
Read Compressed
const lines = await read("file.gz")
.transform(new DecompressionStream("gzip"))
.lines
.collect();
See Decompressing Files for more.
Multiple Files
Process Multiple Files
const files = ["log1.txt", "log2.txt", "log3.txt"];
for (const file of files) {
const errors = await read(file)
.lines
.filter(line => line.includes("ERROR"))
.count();
console.log(`${file}: ${errors} errors`);
}
Concurrent Processing
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const results = await enumerate(files)
.concurrentMap(async (file) => {
const lines = await read(file).lines.count();
return { file, lines };
}, { concurrency: 3 })
.collect();
Error Handling
File Not Found
try {
const lines = await read("missing.txt").lines.collect();
} catch (error) {
console.error(`Failed to read file: ${error.message}`);
}
Permission Denied
try {
const lines = await read("/root/secret.txt").lines.collect();
} catch (error) {
if (error instanceof Deno.errors.PermissionDenied) {
console.error("Permission denied");
}
}
Performance Tips
Stream Don't Collect
// ❌ Loads entire file into memory
const lines = await read("huge.txt").lines.collect();
// ✅ Processes one line at a time
for await (const line of read("huge.txt").lines) {
process(line);
}
Use Chunked Lines for Performance
For files with many small lines:
const chunks = await read("file.txt").chunkedLines.collect();
// Array of string arrays
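Here's a sketch of consuming chunked lines while still streaming: the outer loop awaits once per chunk instead of once per line, and a plain synchronous loop handles the lines inside each chunk.
import { read } from "jsr:@j50n/proc@0.23.3";

let errors = 0;
for await (const chunk of read("file.txt").chunkedLines) {
  // One await per chunk of lines; the inner loop is synchronous.
  for (const line of chunk) {
    if (line.includes("ERROR")) errors++;
  }
}
console.log(`${errors} errors`);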
Real-World Examples
Log Analysis
const errorsByType = await read("app.log")
.lines
.filter(line => line.includes("ERROR"))
.reduce((acc, line) => {
const type = line.match(/ERROR: (\w+)/)?.[1] || "unknown";
acc[type] = (acc[type] || 0) + 1;
return acc;
}, {});
Data Extraction
const emails = await read("contacts.txt")
.lines
.map(line => line.match(/[\w.-]+@[\w.-]+\.\w+/))
.filter(match => match !== null)
.map(match => match[0])
.collect();
File Conversion
const jsonLines = await read("data.csv")
.lines
.drop(1) // Skip header
.map(line => {
const [name, age] = line.split(",");
return JSON.stringify({ name, age: parseInt(age) });
})
.collect();
await Deno.writeTextFile("data.jsonl", jsonLines.join("\n"));
Next Steps
- Decompressing Files - Work with compressed files
- Streaming Large Files - Handle huge files
- Log Processing - Analyze logs
Range and Iteration
Generate sequences of numbers lazily.
Basic Range
import { range } from "jsr:@j50n/proc@0.23.3";
const numbers = await range({ to: 5 }).collect();
// [0, 1, 2, 3, 4]
Exclusive vs Inclusive
to (exclusive)
const nums = await range({ to: 3 }).collect();
// [0, 1, 2]
until (inclusive)
const nums = await range({ until: 3 }).collect();
// [0, 1, 2, 3]
Custom Start
const nums = await range({ from: 5, to: 10 }).collect();
// [5, 6, 7, 8, 9]
Custom Step
const evens = await range({ from: 0, to: 10, step: 2 }).collect();
// [0, 2, 4, 6, 8]
Counting Down
const countdown = await range({ from: 5, to: 0, step: -1 }).collect();
// [5, 4, 3, 2, 1]
Real-World Examples
Repeat N Times
await range({ to: 10 }).forEach(i => {
console.log(`Iteration ${i}`);
});
Generate Test Data
const users = await range({ to: 100 })
.map(i => ({
id: i,
name: `User ${i}`,
email: `user${i}@example.com`
}))
.collect();
Batch Processing
const batchSize = 10;
const total = 100;
for await (const batch of range({ from: 0, to: total, step: batchSize })) {
const items = data.slice(batch, batch + batchSize);
await processBatch(items);
}
Pagination
const pages = Math.ceil(total / pageSize);
for await (const page of range({ to: pages })) {
const items = await fetchPage(page);
await processItems(items);
}
Retry Logic
for await (const attempt of range({ to: 3 })) {
try {
await operation();
break;
} catch (error) {
if (attempt === 2) throw error;
await sleep(1000 * (attempt + 1));
}
}
Infinite Ranges
Warning: Don't collect infinite ranges!
// ❌ Never completes
const infinite = await range({ from: 0, to: Infinity }).collect();
// ✅ Use with take()
const first100 = await range({ from: 0, to: Infinity })
.take(100)
.collect();
Performance
Ranges are lazy—numbers generated on demand:
// Doesn't generate all numbers upfront
const huge = range({ to: 1_000_000_000 });
// Only generates what you use
const first10 = await huge.take(10).collect();
Next Steps
- Zip and Enumerate - Combine iterables
- Array-Like Methods - Transform ranges
Zip and Enumerate
Combine and index iterables.
enumerate()
Wrap any iterable for Array-like methods:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const result = await enumerate([1, 2, 3])
.map(n => n * 2)
.collect();
// [2, 4, 6]
.enum()
Add indices to items:
const indexed = await enumerate(["a", "b", "c"])
.enum()
.collect();
// [["a", 0], ["b", 1], ["c", 2]]
Format with Indices
const numbered = await enumerate(["apple", "banana", "cherry"])
.enum()
.map(([fruit, i]) => `${i + 1}. ${fruit}`)
.collect();
// ["1. apple", "2. banana", "3. cherry"]
zip()
Combine two iterables:
import { zip } from "jsr:@j50n/proc@0.23.3";
const names = ["Alice", "Bob", "Charlie"];
const ages = [25, 30, 35];
const people = await zip(names, ages)
.map(([name, age]) => ({ name, age }))
.collect();
// [{ name: "Alice", age: 25 }, ...]
Multiple Iterables
const combined = await zip(iter1, iter2)
.map(([a, b]) => a + b)
.collect();
Real-World Examples
Number Lines
await read("file.txt")
.lines
.enum()
.map(([line, i]) => `${i + 1}: ${line}`)
.forEach(console.log);
Combine Data Sources
const merged = await zip(
read("names.txt").lines,
read("emails.txt").lines
)
.map(([name, email]) => ({ name, email }))
.collect();
Track Progress
const items = [...]; // Large array
await enumerate(items)
.enum()
.forEach(([item, i]) => {
console.log(`Processing ${i + 1}/${items.length}`);
process(item);
});
Next Steps
- Range and Iteration - Generate sequences
- Array-Like Methods - Transform data
WritableIterable
WritableIterable is a fascinating utility that inverts the normal data flow: instead of pulling data from an iterable, you push data into it. It bridges the gap between push-based (callbacks, events) and pull-based (async iteration) programming models.
The Problem It Solves
Imagine you have a callback-based API (like event emitters, WebSocket messages, or sensor data) and you want to process it with proc's pipeline operations. You can't easily convert callbacks to an AsyncIterable... until now.
How It Works
WritableIterable is both:
- Writable: You can .write() items to it
- AsyncIterable: You can iterate over it with for await
It uses an internal queue to buffer items between the writer and reader, allowing them to operate at different speeds.
Basic Usage
import { WritableIterable, sleep } from "jsr:@j50n/proc@0.23.3";
const writable = new WritableIterable<number>();
// Write in background (simulating slow producer)
(async () => {
await writable.write(1);
await sleep(100);
await writable.write(2);
await sleep(100);
await writable.write(3);
await writable.close();
})();
// Read (items arrive as they're written)
const results: number[] = [];
for await (const item of writable) {
console.log("Received:", item);
results.push(item);
}
console.log(results); // [1, 2, 3]
This demonstrates the streaming nature: the reader receives items as they're written, not all at once.
⚠️ Important: You MUST call .close() when done writing, or iteration will hang forever waiting for more data.
Key Concepts
Push vs Pull
Traditional AsyncIterable (pull-based):
// Consumer pulls data
for await (const item of iterable) {
// Process item
}
WritableIterable (push-based):
// Producer pushes data
await writable.write(item);
Backpressure
WritableIterable implements automatic backpressure. If the writer is faster than the reader, .write() will pause until the reader catches up. This prevents unbounded memory growth.
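Here's a small sketch that makes the backpressure visible: the producer writes as fast as it can, but because the consumer sleeps between reads, each .write() only resolves once the reader has taken the previous item, so the log interleaves one write per read.
import { WritableIterable, sleep } from "jsr:@j50n/proc@0.23.3";

const stream = new WritableIterable<number>();

// Producer: no delays of its own - the reader sets the pace.
(async () => {
  for (let i = 0; i < 5; i++) {
    await stream.write(i); // Pauses here until the reader catches up.
    console.log(`wrote ${i}`);
  }
  await stream.close();
})();

// Consumer: ~200ms per item.
for await (const n of stream) {
  await sleep(200);
  console.log(`read ${n}`);
}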
Real-World Examples
Example 1: Event Stream to Pipeline
Convert DOM events into a processable stream:
import { WritableIterable, enumerate } from "jsr:@j50n/proc@0.23.3";
const clicks = new WritableIterable<MouseEvent>();
// Producer: capture clicks
document.addEventListener("click", async (event) => {
await clicks.write(event);
});
// Consumer: process clicks
enumerate(clicks)
.map(event => ({ x: event.clientX, y: event.clientY }))
.filter(pos => pos.x > 100)
.forEach(pos => console.log("Click at:", pos));
// Close when done (e.g., on page unload)
window.addEventListener("unload", () => clicks.close());
Example 2: WebSocket to Process
Feed WebSocket messages to a process:
import { WritableIterable } from "jsr:@j50n/proc@0.23.3";
const messages = new WritableIterable<string>();
// Producer: WebSocket messages
const ws = new WebSocket("wss://example.com");
ws.onmessage = async (event) => {
await messages.write(event.data);
};
ws.onclose = () => messages.close();
// Consumer: pipe to process
await enumerate(messages)
.run("jq", ".") // Pretty-print JSON
.toStdout();
Example 3: Sensor Data Stream
Process sensor readings as they arrive:
import { WritableIterable, enumerate } from "jsr:@j50n/proc@0.23.3";
interface SensorReading {
temperature: number;
timestamp: number;
}
const readings = new WritableIterable<SensorReading>();
// Producer: sensor callback
sensor.onReading(async (reading) => {
await readings.write(reading);
});
// Consumer: calculate moving average
const averages = await enumerate(readings)
.map(r => r.temperature)
.take(100) // First 100 readings
.reduce((acc, temp) => acc + temp, 0)
.then(sum => sum / 100);
console.log(`Average: ${averages}°C`);
await readings.close();
Example 4: Manual Process stdin
Feed data to a process programmatically:
import { WritableIterable, enumerate } from "jsr:@j50n/proc@0.23.3";
const input = new WritableIterable<string>();
// Producer: generate data
(async () => {
for (let i = 0; i < 10; i++) {
await input.write(`line ${i}`);
}
await input.close();
})();
// Consumer: pipe to process
await enumerate(input)
.run("grep", "5")
.toStdout();
// Output: line 5
Error Handling
Errors propagate through the iteration:
import { WritableIterable } from "jsr:@j50n/proc@0.23.3";
const writable = new WritableIterable<number>();
// Write and close with error
(async () => {
await writable.write(1);
await writable.write(2);
await writable.close(new Error("something failed"));
})();
try {
for await (const item of writable) {
console.log(item);
}
} catch (error) {
console.error("Error:", error.message);
}
// Output:
// 1
// 2
// Error: something failed
Cleanup with onclose
You can provide a cleanup callback:
import { WritableIterable } from "jsr:@j50n/proc@0.23.3";
const writable = new WritableIterable<string>({
onclose: async () => {
console.log("Cleaning up resources...");
// Close connections, files, etc.
}
});
await writable.write("data");
await writable.close();
// Output: Cleaning up resources...
API Reference
Constructor
new WritableIterable<T>(options?: { onclose?: () => void | Promise<void> })
options.onclose: Optional callback invoked when .close() is called
Methods
.write(item: T): Promise<void>
- Write an item to the stream
- Throws if already closed
- Implements backpressure (pauses if reader is slow)
.close(error?: Error): Promise<void>
- Close the stream
- Must be called to end iteration
- Safe to call multiple times
- Optional error propagates to reader
Properties
.isClosed: boolean
- Returns true if .close() has been called
Common Patterns
Pattern: Timed Data Generation
const timed = new WritableIterable<number>();
(async () => {
for (let i = 0; i < 5; i++) {
await timed.write(i);
await new Promise(resolve => setTimeout(resolve, 1000));
}
await timed.close();
})();
for await (const item of timed) {
console.log(item); // Prints 0, 1, 2, 3, 4 (one per second)
}
Pattern: Conditional Close
const conditional = new WritableIterable<number>();
(async () => {
for (let i = 0; i < 100; i++) {
await conditional.write(i);
if (i === 10) {
await conditional.close(); // Stop early
break;
}
}
})();
const items = await enumerate(conditional).collect();
console.log(items.length); // 11 (0 through 10)
When to Use WritableIterable
Use it when:
- Converting callback-based APIs to AsyncIterable
- Feeding data to process stdin programmatically
- Bridging event-driven and stream-based code
- You need backpressure between producer and consumer
Don't use it when:
- You already have an AsyncIterable (use enumerate() instead)
- You're working with synchronous data (use arrays)
- You need multi-consumer support (WritableIterable is single-consumer)
Performance Notes
- Internal queue grows if writer is faster than reader
- Backpressure prevents unbounded growth
- Each .write() creates a Promise (small overhead)
- Best for moderate data rates (not millions of items/second)
Comparison with Other Approaches
vs. Array
// Array: all data in memory
const data = [1, 2, 3];
for (const item of data) { }
// WritableIterable: streaming, backpressure
const writable = new WritableIterable<number>();
for await (const item of writable) { }
vs. TransformStream
// TransformStream: byte-oriented, Web Streams API
const { readable, writable } = new TransformStream();
// WritableIterable: value-oriented, AsyncIterable
const writable = new WritableIterable<T>();
vs. Channel (from other languages)
If you're familiar with Go channels or Rust channels, WritableIterable is similar but:
- Single-consumer (not multi-consumer)
- Unbuffered by default (backpressure on every write)
- Integrates with AsyncIterable ecosystem
The "Interesting Little Beast"
What makes WritableIterable interesting:
- Inverted Control: Most iterables pull data; this one receives pushes
- Backpressure: Automatically slows down fast producers
- Bridge Pattern: Connects imperative (callbacks) to declarative (iteration)
- Error Propagation: Errors flow naturally through the iteration
- Simple API: Just
.write(),.close(), and iterate
It's a small utility that solves a specific problem elegantly: turning push-based data sources into pull-based async iterables that work seamlessly with proc's pipeline operations.
Sleep
The sleep function pauses execution for a specified duration. While it might seem like an outlier in a process management library, it's surprisingly useful when working with async pipelines, rate limiting, and testing.
Basic Usage
import { sleep } from "jsr:@j50n/proc@0.23.3";
console.log("Starting...");
await sleep(2000); // Pause for 2 seconds
console.log("Done!");
Why It's Included
When working with processes and async iterables, you often need to:
- Rate limit operations
- Add delays between retries
- Simulate slow data sources for testing
- Throttle concurrent operations
- Add breathing room for external services
Having sleep built-in means you don't need to import it from another library or write the setTimeout wrapper yourself.
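For comparison, the hand-rolled wrapper you'd otherwise write looks roughly like this (a sketch, not proc's actual source):
// Roughly what sleep() does: resolve a Promise after a setTimeout.
function sleep(delayms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, delayms));
}

await sleep(1000); // Pause for one second.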
Common Use Cases
Rate Limiting API Calls
import { enumerate, sleep } from "jsr:@j50n/proc@0.23.3";
const urls = ["url1", "url2", "url3"];
await enumerate(urls)
.concurrentMap(async (url) => {
const response = await fetch(url);
await sleep(1000); // Wait 1 second between requests
return response.json();
}, { concurrency: 1 })
.forEach(data => console.log(data));
Retry with Backoff
import { run, sleep } from "jsr:@j50n/proc@0.23.3";
async function runWithRetry(maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
return await run("flaky-command").lines.collect();
} catch (error) {
if (i === maxRetries - 1) throw error;
const delay = Math.pow(2, i) * 1000; // Exponential backoff
console.log(`Retry ${i + 1} after ${delay}ms...`);
await sleep(delay);
}
}
}
Simulating Slow Data Sources
import { WritableIterable, sleep } from "jsr:@j50n/proc@0.23.3";
const slowData = new WritableIterable<number>();
// Simulate data arriving slowly
(async () => {
for (let i = 0; i < 10; i++) {
await slowData.write(i);
await sleep(500); // 500ms between items
}
await slowData.close();
})();
for await (const item of slowData) {
console.log("Received:", item);
}
Throttling Process Output
import { run, sleep } from "jsr:@j50n/proc@0.23.3";
// Process lines slowly to avoid overwhelming downstream
await run("cat", "large-file.txt")
.lines
.map(async (line) => {
await sleep(10); // 10ms delay per line
return line;
})
.toStdout();
Testing Concurrent Operations
import { enumerate, sleep } from "jsr:@j50n/proc@0.23.3";
// Verify concurrency limit works correctly
const startTimes: number[] = [];
await enumerate([1, 2, 3, 4, 5])
.concurrentMap(async (n) => {
startTimes.push(Date.now());
await sleep(100); // Simulate work
return n;
}, { concurrency: 2 })
.collect();
// Analyze timing to verify only 2 ran concurrently
Time Constants
The library also provides time constants for readability:
import { sleep, SECONDS, MINUTES } from "jsr:@j50n/proc@0.23.3";
await sleep(5 * SECONDS); // 5 seconds
await sleep(2 * MINUTES); // 2 minutes
Available constants:
- SECONDS = 1000 milliseconds
- MINUTES = 60 seconds
- HOURS = 60 minutes
- DAYS = 24 hours
- WEEKS = 7 days
API
function sleep(delayms: number): Promise<void>
Parameters:
delayms: Delay in milliseconds
Returns:
- Promise that resolves after the specified delay
Notes
- Uses setTimeout internally
- Non-blocking (other async operations can run)
- Minimum delay depends on JavaScript runtime (typically ~4ms)
- For precise timing, consider using performance.now() to measure actual elapsed time
Counting Words
A classic example that shows the power of process pipelines.
Simple Word Count
Count total words in a file:
import { run } from "jsr:@j50n/proc@0.23.3";
const wordCount = await run("wc", "-w", "book.txt").lines.first;
console.log(`Total words: ${wordCount}`);
Unique Words
Count unique words:
const uniqueWords = await run("cat", "book.txt")
.run("tr", "-cs", "A-Za-z", "\n") // Extract words
.run("tr", "A-Z", "a-z") // Lowercase
.run("sort") // Sort
.run("uniq") // Unique
.lines
.count();
console.log(`Unique words: ${uniqueWords}`);
Word Frequency
Find most common words:
const topWords = await run("cat", "book.txt")
.run("tr", "-cs", "A-Za-z", "\n")
.run("tr", "A-Z", "a-z")
.run("sort")
.run("uniq", "-c")
.run("sort", "-rn")
.run("head", "-10")
.lines
.collect();
console.log("Top 10 words:");
topWords.forEach(line => console.log(line));
Pure JavaScript Version
Do it all in JavaScript:
import { read } from "jsr:@j50n/proc@0.23.3";
const wordCounts = await read("book.txt")
.lines
.flatMap(line => line.toLowerCase().match(/\w+/g) || [])
.reduce((acc, word) => {
acc[word] = (acc[word] || 0) + 1;
return acc;
}, {});
const topWords = Object.entries(wordCounts)
.sort((a, b) => b[1] - a[1])
.slice(0, 10);
console.log("Top 10 words:");
topWords.forEach(([word, count]) => {
console.log(`${count} ${word}`);
});
Compressed Files
Count words in a compressed file:
const wordCount = await read("book.txt.gz")
.transform(new DecompressionStream("gzip"))
.lines
.flatMap(line => line.match(/\w+/g) || [])
.count();
console.log(`Total words: ${wordCount}`);
Multiple Files
Count words across multiple files:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const files = ["book1.txt", "book2.txt", "book3.txt"];
const results = await enumerate(files)
.concurrentMap(async (file) => {
const words = await read(file)
.lines
.flatMap(line => line.match(/\w+/g) || [])
.count();
return { file, words };
}, { concurrency: 3 })
.collect();
results.forEach(({ file, words }) => {
console.log(`${file}: ${words} words`);
});
Filter Stop Words
Exclude common words:
const stopWords = new Set([
"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for"
]);
const meaningfulWords = await read("book.txt")
.lines
.flatMap(line => line.toLowerCase().match(/\w+/g) || [])
.filter(word => !stopWords.has(word))
.reduce((acc, word) => {
acc[word] = (acc[word] || 0) + 1;
return acc;
}, {});
Word Length Distribution
Analyze word lengths:
const lengthDist = await read("book.txt")
.lines
.flatMap(line => line.match(/\w+/g) || [])
.reduce((acc, word) => {
const len = word.length;
acc[len] = (acc[len] || 0) + 1;
return acc;
}, {});
console.log("Word length distribution:");
Object.entries(lengthDist)
.sort((a, b) => parseInt(a[0]) - parseInt(b[0]))
.forEach(([len, count]) => {
console.log(`${len} letters: ${count} words`);
});
Real-World Example: War and Peace
Analyze Tolstoy's War and Peace:
const [totalWords, uniqueWords] = await Promise.all([
// Total words
read("warandpeace.txt.gz")
.transform(new DecompressionStream("gzip"))
.lines
.flatMap(line => line.match(/\w+/g) || [])
.count(),
// Unique words
read("warandpeace.txt.gz")
.transform(new DecompressionStream("gzip"))
.lines
.flatMap(line => line.toLowerCase().match(/\w+/g) || [])
.reduce((acc, word) => {
acc.add(word);
return acc;
}, new Set())
.then(set => set.size)
]);
console.log(`Total words: ${totalWords.toLocaleString()}`);
console.log(`Unique words: ${uniqueWords.toLocaleString()}`);
console.log(`Vocabulary richness: ${(uniqueWords / totalWords * 100).toFixed(1)}%`);
Performance Comparison
Shell Pipeline (fast)
// Uses native Unix tools
const count = await run("cat", "book.txt")
.run("wc", "-w")
.lines.first;
JavaScript (flexible)
// More control, type-safe
const count = await read("book.txt")
.lines
.flatMap(line => line.match(/\w+/g) || [])
.count();
Hybrid (best of both)
// Use Unix tools for heavy lifting, JavaScript for logic
const words = await run("cat", "book.txt")
.run("tr", "-cs", "A-Za-z", "\n")
.lines
.filter(word => word.length > 5) // JavaScript filter
.count();
Next Steps
- Process Pipelines - Chain commands together
- Concurrent Processing - Process multiple files
- Streaming Large Files - Handle huge files
Processing Log Files
Analyze logs efficiently, even huge ones.
Count Errors
import { read } from "jsr:@j50n/proc@0.23.3";
const errorCount = await read("app.log")
.lines
.filter(line => line.includes("ERROR"))
.count();
console.log(`${errorCount} errors found`);
Group by Error Type
const errorTypes = await read("app.log")
.lines
.filter(line => line.includes("ERROR"))
.reduce((acc, line) => {
const match = line.match(/ERROR: (\w+)/);
const type = match ? match[1] : "unknown";
acc[type] = (acc[type] || 0) + 1;
return acc;
}, {});
console.log("Errors by type:");
Object.entries(errorTypes)
.sort((a, b) => b[1] - a[1])
.forEach(([type, count]) => {
console.log(` ${type}: ${count}`);
});
Extract Timestamps
const errors = await read("app.log")
.lines
.filter(line => line.includes("ERROR"))
.map(line => {
const timestamp = line.match(/\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/)?.[0];
const message = line.split("ERROR:")[1]?.trim();
return { timestamp, message };
})
.collect();
Find Patterns
const suspiciousIPs = await read("access.log")
.lines
.map(line => {
const ip = line.match(/\d+\.\d+\.\d+\.\d+/)?.[0];
const status = line.match(/HTTP\/\d\.\d" (\d+)/)?.[1];
return { ip, status };
})
.filter(entry => entry.status === "404")
.reduce((acc, entry) => {
if (entry.ip) {
acc[entry.ip] = (acc[entry.ip] || 0) + 1;
}
return acc;
}, {});
// Show IPs with > 100 404s
Object.entries(suspiciousIPs)
.filter(([_, count]) => count > 100)
.forEach(([ip, count]) => {
console.log(`${ip}: ${count} 404s`);
});
Time-Based Analysis
const errorsByHour = await read("app.log")
.lines
.filter(line => line.includes("ERROR"))
.reduce((acc, line) => {
const hour = line.match(/T(\d{2}):/)?.[1];
if (hour) {
acc[hour] = (acc[hour] || 0) + 1;
}
return acc;
}, {});
console.log("Errors by hour:");
Object.entries(errorsByHour)
.sort((a, b) => a[0].localeCompare(b[0]))
.forEach(([hour, count]) => {
console.log(`${hour}:00 - ${count} errors`);
});
Multiple Log Files
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const files = ["app1.log", "app2.log", "app3.log"];
const results = await enumerate(files)
.concurrentMap(async (file) => {
const errors = await read(file)
.lines
.filter(line => line.includes("ERROR"))
.count();
return { file, errors };
}, { concurrency: 3 })
.collect();
results.forEach(({ file, errors }) => {
console.log(`${file}: ${errors} errors`);
});
Compressed Logs
const errors = await read("app.log.gz")
.transform(new DecompressionStream("gzip"))
.lines
.filter(line => line.includes("ERROR"))
.take(10)
.collect();
Real-Time Monitoring
// Stream the log and classify each line as it is read
for await (const line of read("app.log").lines) {
if (line.includes("ERROR")) {
console.error(`🔴 ${line}`);
} else if (line.includes("WARN")) {
console.warn(`⚠️ ${line}`);
}
}
Next Steps
- Streaming Large Files - Handle huge logs
- Concurrent Processing - Process multiple files
- Decompressing Files - Work with compressed logs
Decompressing Files
Process compressed files without creating temporary files. Stream everything.
Decompress and Count Lines
import { read } from "jsr:@j50n/proc@0.23.3";
const lineCount = await read("war-and-peace.txt.gz")
.transform(new DecompressionStream("gzip"))
.lines
.count();
console.log(`${lineCount} lines`);
What's happening:
- read() opens the file as a stream of bytes
- .transform() pipes the bytes through the decompression stream
- .lines converts bytes to text lines
- .count() counts them
All streaming. No temp files. Constant memory usage.
Search in Compressed File
import { read } from "jsr:@j50n/proc@0.23.3";
const matches = await read("logs.gz")
.transform(new DecompressionStream("gzip"))
.lines
.filter(line => line.includes("ERROR"))
.collect();
console.log(`Found ${matches.length} errors`);
Process Multiple Compressed Files
import { read, enumerate } from "jsr:@j50n/proc@0.23.3";
const files = ["log1.gz", "log2.gz", "log3.gz"];
for (const file of files) {
const errors = await read(file)
.transform(new DecompressionStream("gzip"))
.lines
.filter(line => line.includes("ERROR"))
.count();
console.log(`${file}: ${errors} errors`);
}
Decompress and Transform
import { read } from "jsr:@j50n/proc@0.23.3";
const data = await read("data.json.gz")
.transform(new DecompressionStream("gzip"))
.lines
.map(line => JSON.parse(line))
.filter(obj => obj.status === "active")
.collect();
Supported Formats
The Web Streams API supports:
- gzip - .gz files
- deflate - .zip files (deflate compression)
- deflate-raw - Raw deflate
// Gzip
.transform(new DecompressionStream("gzip"))
// Deflate
.transform(new DecompressionStream("deflate"))
// Deflate-raw
.transform(new DecompressionStream("deflate-raw"))
Compress Output
You can also compress:
import { concat, read } from "jsr:@j50n/proc@0.23.3";
const compressed = await read("large-file.txt")
.transform(new CompressionStream("gzip"))
.collect();
await Deno.writeFile("large-file.txt.gz", concat(compressed));
Real-World Example: Log Analysis
Analyze compressed logs without extracting them:
import { read } from "jsr:@j50n/proc@0.23.3";
interface LogEntry {
timestamp: string;
level: string;
message: string;
}
const errors = await read("app.log.gz")
.transform(new DecompressionStream("gzip"))
.lines
.map(line => {
const [timestamp, level, ...message] = line.split(" ");
return { timestamp, level, message: message.join(" ") };
})
.filter(entry => entry.level === "ERROR")
.collect();
console.log(`Found ${errors.length} errors`);
errors.slice(0, 10).forEach(e => {
console.log(`${e.timestamp}: ${e.message}`);
});
Performance Tips
Stream, Don't Collect
// ❌ Loads entire file into memory
const lines = await read("huge.gz")
.transform(new DecompressionStream("gzip"))
.lines
.collect();
// ✅ Processes one line at a time
for await (const line of read("huge.gz")
.transform(new DecompressionStream("gzip"))
.lines) {
process(line);
}
Use Concurrent Processing
Process multiple files in parallel:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const files = ["log1.gz", "log2.gz", "log3.gz"];
const results = await enumerate(files)
.concurrentMap(async (file) => {
const errors = await read(file)
.transform(new DecompressionStream("gzip"))
.lines
.filter(line => line.includes("ERROR"))
.count();
return { file, errors };
}, { concurrency: 3 })
.collect();
Why This Is Better
Traditional approach:
# Extract first
gunzip file.gz
# Then process
grep ERROR file
# Clean up
rm file
proc approach:
// One step, no temp files
await read("file.gz")
.transform(new DecompressionStream("gzip"))
.lines
.filter(line => line.includes("ERROR"))
.forEach(console.log);
Faster, cleaner, more memory-efficient.
Next Steps
- Streaming Large Files - Handle huge files
- Concurrent Processing - Process multiple files in parallel
- File I/O - More file operations
Parallel Downloads
Download multiple files concurrently with controlled concurrency.
Basic Example
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const urls = [
"https://example.com/file1.json",
"https://example.com/file2.json",
"https://example.com/file3.json",
// ... more URLs
];
const results = await enumerate(urls)
.concurrentMap(async (url) => {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`Failed to fetch ${url}: ${response.status}`);
}
return {
url,
data: await response.json(),
size: response.headers.get("content-length")
};
}, { concurrency: 5 })
.collect();
console.log(`Downloaded ${results.length} files`);
Download and Save Files
Download files and save them to disk:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const downloads = [
{ url: "https://example.com/image1.jpg", path: "./downloads/image1.jpg" },
{ url: "https://example.com/image2.jpg", path: "./downloads/image2.jpg" },
{ url: "https://example.com/image3.jpg", path: "./downloads/image3.jpg" },
];
await enumerate(downloads)
.concurrentMap(async ({ url, path }) => {
const response = await fetch(url);
const blob = await response.blob();
const buffer = await blob.arrayBuffer();
await Deno.writeFile(path, new Uint8Array(buffer));
console.log(`Downloaded: ${path}`);
return path;
}, { concurrency: 3 })
.collect();
console.log("All downloads complete");
With Progress Tracking
Track download progress:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
let completed = 0;
const total = urls.length;
const results = await enumerate(urls)
.concurrentMap(async (url) => {
const response = await fetch(url);
const data = await response.json();
completed++;
console.log(`Progress: ${completed}/${total} (${Math.round(completed/total*100)}%)`);
return { url, data };
}, { concurrency: 5 })
.collect();
With Retry Logic
Retry failed downloads:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
async function fetchWithRetry(url: string, maxRetries = 3): Promise<Response> {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const response = await fetch(url);
if (response.ok) return response;
if (attempt === maxRetries) {
throw new Error(`Failed after ${maxRetries} attempts: ${response.status}`);
}
} catch (error) {
if (attempt === maxRetries) throw error;
// Exponential backoff
const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000);
console.log(`Retry ${attempt}/${maxRetries} for ${url} after ${delay}ms`);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
throw new Error("Unreachable");
}
const results = await enumerate(urls)
.concurrentMap(async (url) => {
const response = await fetchWithRetry(url);
return await response.json();
}, { concurrency: 5 })
.collect();
Download Large Files
Stream large files to disk without loading into memory:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const largeFiles = [
{ url: "https://example.com/large1.zip", path: "./large1.zip" },
{ url: "https://example.com/large2.zip", path: "./large2.zip" },
];
await enumerate(largeFiles)
.concurrentMap(async ({ url, path }) => {
const response = await fetch(url);
if (!response.body) throw new Error("No response body");
const file = await Deno.open(path, { write: true, create: true });
await response.body.pipeTo(file.writable);
console.log(`Downloaded: ${path}`);
return path;
}, { concurrency: 2 })
.collect();
API Rate Limiting
Respect API rate limits:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const apiEndpoints = [
"/api/users/1",
"/api/users/2",
// ... 100 more
];
// Add delay between requests
await enumerate(apiEndpoints)
.concurrentMap(async (endpoint) => {
const response = await fetch(`https://api.example.com${endpoint}`);
const data = await response.json();
// Wait 100ms between requests (10 requests/second)
await new Promise(resolve => setTimeout(resolve, 100));
return data;
}, { concurrency: 1 }) // Sequential to respect rate limit
.collect();
Filter Failed Downloads
Continue even if some downloads fail:
import { enumerate } from "jsr:@j50n/proc@0.23.3";
const results = await enumerate(urls)
.concurrentMap(async (url) => {
try {
const response = await fetch(url);
if (!response.ok) return null;
return { url, data: await response.json() };
} catch (error) {
console.error(`Failed to download ${url}:`, error.message);
return null;
}
}, { concurrency: 5 })
.filter(result => result !== null)
.collect();
console.log(`Successfully downloaded ${results.length}/${urls.length} files`);
When to Use
Use parallel downloads when:
- You have multiple independent files to fetch
- Network latency is the bottleneck
- The server can handle concurrent requests
- You want to minimize total download time
Choose concurrency based on:
- Server rate limits (respect them!)
- Your network bandwidth
- Server capacity
- Start with 3-5, adjust based on results
Next Steps
- Concurrent Processing - Deep dive into concurrency
- Error Handling - Handle download failures
- Streaming Large Files - Work with large downloads
Shell Script Replacement
Replace Bash scripts with type-safe Deno.
Why Replace Shell Scripts?
Shell scripts are:
- Hard to debug
- No type safety
- Limited error handling
- Platform-specific
proc gives you:
- Full TypeScript
- IDE support
- Proper error handling
- Cross-platform
Common Patterns
File Operations
Bash:
#!/bin/bash
for file in *.txt; do
wc -l "$file"
done
proc:
#!/usr/bin/env -S deno run --allow-read --allow-run
import { run } from "jsr:@j50n/proc@0.23.3";
for await (const entry of Deno.readDir(".")) {
if (entry.name.endsWith(".txt")) {
const count = await run("wc", "-l", entry.name).lines.first;
console.log(count);
}
}
Process Logs
Bash:
#!/bin/bash
grep ERROR app.log | wc -l
proc:
#!/usr/bin/env -S deno run --allow-read --allow-run
import { read } from "jsr:@j50n/proc@0.23.3";
const errors = await read("app.log")
.lines
.filter(line => line.includes("ERROR"))
.count();
console.log(`${errors} errors`);
Backup Script
Bash:
#!/bin/bash
tar -czf backup-$(date +%Y%m%d).tar.gz /data
proc:
#!/usr/bin/env -S deno run --allow-read --allow-run
import { run } from "jsr:@j50n/proc@0.23.3";
const date = new Date().toISOString().split("T")[0].replace(/-/g, "");
await run("tar", "-czf", `backup-${date}.tar.gz`, "/data").toStdout();
System Monitoring
Bash:
#!/bin/bash
while true; do
df -h | grep /dev/sda1
sleep 60
done
proc:
#!/usr/bin/env -S deno run --allow-run
import { run, sleep } from "jsr:@j50n/proc@0.23.3";
while (true) {
const usage = await run("df", "-h")
.lines
.find(line => line.includes("/dev/sda1"));
console.log(usage);
await sleep(60000); // sleep() is exported from proc
}
Real Script Example
Complete deployment script:
#!/usr/bin/env -S deno run --allow-all
import { run } from "jsr:@j50n/proc@0.23.3";
console.log("🚀 Deploying application...");
try {
// Pull latest code
console.log("📥 Pulling latest code...");
await run("git", "pull").toStdout();
// Install dependencies
console.log("📦 Installing dependencies...");
await run("npm", "install").toStdout();
// Run tests
console.log("🧪 Running tests...");
await run("npm", "test").toStdout();
// Build
console.log("🔨 Building...");
await run("npm", "run", "build").toStdout();
// Restart service
console.log("🔄 Restarting service...");
await run("systemctl", "restart", "myapp").toStdout();
console.log("✅ Deployment complete!");
} catch (error) {
console.error("❌ Deployment failed:", error.message);
Deno.exit(1);
}
Benefits
- Type Safety - Catch errors before running
- IDE Support - Autocomplete and refactoring
- Error Handling - Proper try-catch
- Debugging - Use debugger, breakpoints
- Testing - Write unit tests (see the example below)
- Portability - Works on any platform with Deno
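For example, factoring the log-counting logic into a small exported function makes it straightforward to unit test with Deno's built-in test runner. A minimal sketch (count_errors.ts and its test file are hypothetical names; the assertion comes from the standard @std/assert package on JSR):
// count_errors.ts - a small, testable helper built on proc.
import { read } from "jsr:@j50n/proc@0.23.3";
export async function countErrors(path: string): Promise<number> {
  return await read(path)
    .lines
    .filter((line) => line.includes("ERROR"))
    .count();
}
// count_errors_test.ts - run with: deno test --allow-read --allow-write
import { countErrors } from "./count_errors.ts";
import { assertEquals } from "jsr:@std/assert";
Deno.test("countErrors finds ERROR lines", async () => {
  const path = await Deno.makeTempFile({ suffix: ".log" });
  await Deno.writeTextFile(path, "ok\nERROR one\nok\nERROR two\n");
  assertEquals(await countErrors(path), 2);
});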
Making Scripts Executable
chmod +x script.ts
./script.ts
Next Steps
- Running Processes - Process basics
- Error Handling - Handle failures
- Process Pipelines - Chain commands
API Reference
Complete API documentation is auto-generated from the source code using Deno's documentation tool.
📚 View Full API Documentation
The API documentation includes:
- All exported functions - Complete signatures and descriptions
- All classes and interfaces - Full type information
- All methods and properties - Detailed documentation
- Type definitions - Complete TypeScript types
- Examples - Code examples from JSDoc
Quick Links
Core Functions
- run() - Run a child process
- enumerate() - Wrap an iterable
- read() - Read a file
Classes
- Enumerable - Array-like methods for async iterables
- ProcessEnumerable - Process-specific enumerable
- Process - Process management
Error Types
- ExitCodeError - Non-zero exit code
- SignalError - Process killed by signal
- UpstreamError - Error from upstream process
Utilities
- range() - Generate number ranges
- zip() - Combine iterables
- concat() - Concatenate byte arrays
- cache() - Cache iterable results
Using the API Docs
The generated documentation includes:
Search
Use the search box to find any function, class, or type.
Type Information
Click on any type to see its definition and usage.
Examples
Most functions include working code examples.
Source Links
Click "Source" to view the implementation.
Integration with This Guide
This user guide provides:
- Conceptual explanations - Why and when to use features
- Tutorials - Step-by-step learning
- Recipes - Real-world solutions
- Best practices - How to use effectively
The API reference provides:
- Complete signatures - Exact function parameters
- Type definitions - TypeScript types
- Technical details - Implementation specifics
- All exports - Everything available
Use both together for complete understanding!
Keeping Docs Updated
The API documentation is regenerated every time the site is built, so it's always in sync with the code.
To regenerate manually:
deno doc --html --name="proc" --output=./site/src/api-docs ./mod.ts
Next Steps
- Browse the full API documentation
- Getting Started - If you're new
- Core Features - Learn the essentials
- Recipes - See real examples
Migration Guide
Migrating from other tools to proc.
From Deno.Command
Before:
const command = new Deno.Command("ls", { args: ["-la"] });
const output = await command.output();
const text = new TextDecoder().decode(output.stdout);
After:
import { run } from "jsr:@j50n/proc@0.23.3";
const lines = await run("ls", "-la").lines.collect();
From Shell Scripts
See Shell Script Replacement for detailed examples.
Key Differences
- Properties vs. methods: use .lines, not .lines()
- Always consume process output to avoid resource leaks
- Errors propagate through pipelines
- Use enumerate() and then .enum() to add indices (see the sketch below)
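A quick sketch tying these differences together (ls is just an example command):
import { enumerate, run } from "jsr:@j50n/proc@0.23.3";
try {
  // .lines is a property; collecting it consumes the output and releases the process.
  const files = await run("ls", "-1").lines.collect();
  // .enum() adds indices to any Enumerable.
  await enumerate(files)
    .enum()
    .forEach(([name, i]) => console.log(`${i}: ${name}`));
} catch (error) {
  // Errors from the process and from the pipeline all land here.
  console.error("Failed:", error);
}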
See Also
- Getting Started - Installation
- Key Concepts - Important concepts
- FAQ - Common questions
Frequently Asked Questions
General
What is proc?
proc is a Deno library for running child processes and working with async iterables. It gives you Array-like methods (map, filter, reduce) for streaming data, with error handling that actually makes sense.
Why should I use proc instead of Deno.Command?
Deno.Command is low-level and requires manual stream handling. proc gives you:
- Automatic resource management
- Natural error propagation
- Array-like methods for data processing
- Process pipelines that feel like shell pipes
- Streaming by default
Is proc production-ready?
Yes! proc is stable, actively maintained, and used in production. The API is mature and unlikely to have breaking changes.
Does proc work with Node.js?
No, proc is Deno-only. It uses Deno-specific APIs like Deno.Command and
requires Deno's permission system.
Usage
Why do I get "resource leak" errors?
You must consume process output. Unconsumed output keeps the process handle open:
// ❌ Resource leak
const p = run("ls");
// ✅ Consume output
await run("ls").lines.collect();
Is .lines a method or property?
Property. Use .lines not .lines():
// ✅ Correct
run("ls").lines;
// ❌ Wrong
run("ls").lines();
Same for .status, .first, .last.
How do I check exit code without throwing?
Consume output first, then check .status:
const p = run("command");
await p.lines.collect(); // Consume first
const status = await p.status; // Then check
if (status.code !== 0) {
console.error("Failed");
}
Why doesn't enumerate() add indices?
enumerate() wraps an iterable. Use .enum() to add indices:
const result = await enumerate(["a", "b", "c"])
.enum() // This adds indices
.map(([item, i]) => `${i}: ${item}`)
.collect();
How do I pipe processes together?
Use .run() method:
await run("cat", "file.txt")
.run("grep", "pattern")
.run("wc", "-l")
.lines.first;
Can I use shell syntax like ls -la?
No, arguments must be separate:
// ✅ Correct
run("ls", "-la");
// ❌ Wrong
run("ls -la");
Error Handling
Do I need try-catch at every step?
No! That's the whole point. Errors propagate through the pipeline:
try {
await run("cmd1")
.run("cmd2")
.run("cmd3")
.lines.forEach(process);
} catch (error) {
// All errors caught here
}
What happens when a process fails?
By default, non-zero exit codes throw ExitCodeError. You can catch it:
try {
await run("false").lines.collect();
} catch (error) {
if (error instanceof ExitCodeError) {
console.error(`Exit code: ${error.code}`);
}
}
Can I customize error handling?
Yes, use the fnError option. See Custom Error Handling.
Performance
Is proc fast?
Yes! proc is streaming by default, which means:
- Constant memory usage, even for huge files
- Concurrent process execution
- Lazy evaluation (only runs when consumed; see the sketch below)
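To illustrate the last point, here is a minimal sketch of lazy evaluation: defining a pipeline does no work until it is consumed.
import { enumerate } from "jsr:@j50n/proc@0.23.3";
// Nothing runs yet - this only defines the pipeline.
const pipeline = enumerate([1, 2, 3, 4]).map((n) => {
  console.log(`mapping ${n}`);
  return n * 2;
});
// The work happens only now, when the pipeline is consumed.
const doubled = await pipeline.collect();
console.log(doubled); // [2, 4, 6, 8]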
How do I process large files?
Stream them:
// Processes 10GB file with constant memory
for await (const line of read("huge.txt").lines) {
process(line);
}
Can I process files in parallel?
Yes, use concurrentMap:
await enumerate(files)
.concurrentMap(async (file) => {
return await processFile(file);
}, { concurrency: 5 })
.collect();
Troubleshooting
My process hangs
You probably didn't consume the output:
// ❌ Hangs
const p = run("command");
await p.status; // Waiting for output to be consumed
// ✅ Works
const p = run("command");
await p.lines.collect(); // Consume first
await p.status;
I get type errors
Check if you're using properties as methods:
// ❌ Type error
run("ls").lines();
// ✅ Correct
run("ls").lines;
DecompressionStream type error
Add a type cast:
.transform(new DecompressionStream("gzip") as TransformStream<Uint8Array, Uint8Array>)
Or use the --no-check flag.
Permission denied errors
Grant the necessary permissions:
deno run --allow-run --allow-read your-script.ts
Comparison
proc vs Deno.Command
| Feature | Deno.Command | proc |
|---|---|---|
| Boilerplate | High | Low |
| Error handling | Manual | Automatic |
| Streaming | Manual | Built-in |
| Pipelines | Manual | .run() |
| Array methods | No | Yes |
proc vs shell scripts
| Feature | Shell | proc |
|---|---|---|
| Type safety | No | Yes |
| Error handling | Manual | Automatic |
| IDE support | Limited | Full |
| Debugging | Hard | Easy |
| Portability | Limited | Cross-platform |
Getting Help
Where can I find examples?
Start with the Recipes in this guide; the API documentation also includes code examples from JSDoc.
How do I report bugs?
File an issue on GitHub.
Is there a Discord/Slack?
Not currently. Use GitHub issues for questions and discussions.
Contributing
Can I contribute?
Yes! Contributions are welcome. See the repository for guidelines.
How can I help?
- Report bugs
- Improve documentation
- Add examples
- Fix issues
Miscellaneous
Why "proc"?
Short for "process". Easy to type, easy to remember.
Who maintains proc?
proc is maintained by @j50n and contributors.
What's the license?
MIT License. Use it freely.
Can I use proc in commercial projects?
Yes! MIT license allows commercial use.