zod-stream


> Type-safe structured extraction from LLM streams with progressive validation



zod-stream adds structured output validation and streaming capabilities to LLM responses. Built on top of schema-stream, it enables type-safe extraction with progressive validation.

Key Features

  • 🔄 Stream structured LLM outputs with validation
  • 🎯 Multiple response modes (TOOLS, FUNCTIONS, JSON, etc.)
  • 📝 OpenAI client integration
  • 🌳 Progressive validation with partial results
  • ⚡ Built on schema-stream
  • 🔍 Full TypeScript support

Why zod-stream?

zod-stream solves key challenges in handling streaming LLM responses:

  • Dependency Management: Process data as soon as dependencies are met, rather than waiting for complete responses

    if (isPathComplete(['user', 'preferences'], chunk)) {
      // Start personalizing immediately, don't wait for content
      initializeUserExperience(chunk.user.preferences);
    }
  • Type-Safe LLM Integration: Full TypeScript support for structured outputs from OpenAI and other providers

    const params = withResponseModel({
      response_model: { schema, name: "Extract" },
      mode: "TOOLS"  // or "FUNCTIONS", "JSON", etc.
    });
  • Progressive Processing: Built on schema-stream for immediate access to partial results

    for await (const chunk of stream) {
      // Safely access partial data with full type inference
      chunk._meta._completedPaths.forEach(path => {
        processDependency(path, chunk);
      });
    }
  • Provider Flexibility: Consistent interface across different LLM response formats

    // Works with various response modes
    const toolStream = OAIStream({ res: completion });  // OpenAI tools/functions
    const jsonStream = JSONStream({ res: completion }); // Direct JSON

Think of it as a type-safe pipeline for handling streaming LLM data where you need to:

  • Start processing before the full response arrives
  • Ensure type safety throughout the stream
  • Handle complex data dependencies
  • Work with multiple LLM response formats

Installation

# npm
npm install zod-stream zod openai

# pnpm
pnpm add zod-stream zod openai

# bun
bun add zod-stream zod openai

Core Concepts

The ZodStream client provides real-time validation and metadata for streaming LLM responses:

import ZodStream from "zod-stream";
import { z } from "zod";

const client = new ZodStream({
  debug: true  // Enable debug logging
});

// Define your extraction schema
const schema = z.object({
  content: z.string(),
  metadata: z.object({
    confidence: z.number(),
    category: z.string()
  })
});

// Create streaming extraction
const stream = await client.create({
  completionPromise: async () => {
    const response = await fetch("/api/extract", {
      method: "POST",
      body: JSON.stringify({ prompt: "..." })
    });
    return response.body;
  },
  response_model: {
    schema,
    name: "ContentExtraction"
  }
});

// Process with validation metadata
for await (const chunk of stream) {
  console.log({
    data: chunk,              // Partial extraction result
    isValid: chunk._meta._isValid,    // Current validation state
    activePath: chunk._meta._activePath,    // Currently processing path
    completedPaths: chunk._meta._completedPaths  // Completed paths
  });
}

Progressive Processing

zod-stream enables processing dependent data as soon as relevant paths complete, without waiting for the full response:

// Define schema for a complex analysis
const schema = z.object({
  user: z.object({
    id: z.string(),
    preferences: z.object({
      theme: z.string(),
      language: z.string()
    })
  }),
  content: z.object({
    title: z.string(),
    body: z.string(),
    metadata: z.object({
      keywords: z.array(z.string()),
      category: z.string()
    })
  }),
  recommendations: z.array(z.object({
    id: z.string(),
    score: z.number(),
    reason: z.string()
  }))
});

// Process data as it becomes available
for await (const chunk of stream) {
  // Start personalizing UI as soon as user preferences are ready
  if (isPathComplete(['user', 'preferences'], chunk)) {
    applyUserTheme(chunk.user.preferences.theme);
    setLanguage(chunk.user.preferences.language);
  }

  // Begin content indexing once we have title and keywords
  if (isPathComplete(['content', 'metadata', 'keywords'], chunk) && 
      isPathComplete(['content', 'title'], chunk)) {
    indexContent({
      title: chunk.content.title,
      keywords: chunk.content.metadata.keywords
    });
  }

  // Start fetching recommended content in parallel
  chunk._meta._completedPaths.forEach(path => {
    if (path[0] === 'recommendations' && path.length === 2) {
      const index = path[1] as number;
      const recommendation = chunk.recommendations[index];
      
      if (recommendation?.id) {
        prefetchContent(recommendation.id);
      }
    }
  });
}

This approach enables:

  • Early UI updates based on user preferences
  • Parallel processing of independent data
  • Optimistic loading of related content
  • Better perceived performance
  • Resource optimization

Stream Metadata

Every streamed chunk includes metadata about validation state:

type CompletionMeta = {
  _isValid: boolean;           // Schema validation status
  _activePath: (string | number)[];     // Current parsing path
  _completedPaths: (string | number)[][]; // All completed paths
}

// Example chunk
{
  content: "partial content...",
  metadata: {
    confidence: 0.95
  },
  _meta: {
    _isValid: false,  // Not valid yet
    _activePath: ["metadata", "category"],
    _completedPaths: [
      ["content"],
      ["metadata", "confidence"]
    ]
  }
}
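
The _isValid flag can gate any logic that needs the finished object. One pattern (a minimal sketch, assuming the schema and stream from the Core Concepts example, with a hypothetical handleCompleteResult callback) is to re-run the Zod schema once the flag flips, which yields a fully typed result with the _meta key stripped:

for await (const chunk of stream) {
  if (chunk._meta._isValid) {
    // Non-strict object schemas drop unknown keys such as _meta
    const parsed = schema.safeParse(chunk);
    if (parsed.success) {
      handleCompleteResult(parsed.data); // hypothetical downstream handler
    }
  }
}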

Schema Stubs

Get typed stub objects for initialization:

const schema = z.object({
  users: z.array(z.object({
    name: z.string(),
    age: z.number()
  }))
});

const client = new ZodStream();
const stub = client.getSchemaStub({
  schema,
  defaultData: {
    users: [{ name: "loading...", age: 0 }]
  }
});
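
A minimal usage sketch (assuming a hypothetical renderUserList helper and a stream created as in Core Concepts): render the placeholder data immediately, then overwrite it as real chunks arrive.

// Show placeholder rows before any tokens arrive
renderUserList(stub.users); // [{ name: "loading...", age: 0 }]

for await (const chunk of stream) {
  if (chunk.users) {
    renderUserList(chunk.users); // progressively replaces the placeholders
  }
}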

Debug Logging

Enable detailed logging for debugging:

const client = new ZodStream({ debug: true });

// Logs will include:
// - Stream initialization
// - Validation results
// - Path completion
// - Errors with full context

Using Response Models

The withResponseModel helper configures OpenAI parameters based on your schema and chosen mode:

import { withResponseModel } from "zod-stream";
import OpenAI from "openai";
import { z } from "zod";

const oai = new OpenAI();

const schema = z.object({
  sentiment: z.string(),
  keywords: z.array(z.string()),
  confidence: z.number()
});

// Configure for OpenAI tools mode
const params = withResponseModel({
  response_model: {
    schema,
    name: "Analysis",
    description: "Extract sentiment and keywords"
  },
  mode: "TOOLS",
  params: {
    messages: [{ role: "user", content: "Analyze this text..." }],
    model: "gpt-4"
  }
});

const completion = await oai.chat.completions.create({
  ...params,
  stream: true
});
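
If the OpenAI call and the extraction run in the same process, the streaming completion can also be piped straight into a ZodStream client instead of round-tripping over HTTP. A sketch, reusing the schema and completion from above:

import ZodStream, { OAIStream } from "zod-stream";

const client = new ZodStream();

// OAIStream re-emits the tool-call arguments as a readable byte stream,
// which ZodStream parses progressively against the schema
const extraction = await client.create({
  completionPromise: async () => OAIStream({ res: completion }),
  response_model: { schema, name: "Analysis" }
});

for await (const chunk of extraction) {
  console.log(chunk._meta._completedPaths);
}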

Response Modes

zod-stream supports multiple modes for structured LLM responses:

import { MODE } from "zod-stream";

const modes = {
  FUNCTIONS: "FUNCTIONS",   // OpenAI function calling
  TOOLS: "TOOLS",          // OpenAI tools API
  JSON: "JSON",            // Direct JSON response
  MD_JSON: "MD_JSON",      // JSON in markdown blocks
  JSON_SCHEMA: "JSON_SCHEMA", // JSON with schema validation
  THINKING_MD_JSON: "THINKING_MD_JSON" // JSON with thinking in markdown blocks (DeepSeek R1)
} as const;
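
The exported MODE map can be used in place of the raw string literals when configuring withResponseModel (a sketch, assuming the Analysis schema defined earlier):

import { MODE, withResponseModel } from "zod-stream";

const params = withResponseModel({
  response_model: { schema, name: "Analysis" },
  mode: MODE.TOOLS, // identical to passing the string "TOOLS"
  params: {
    messages: [{ role: "user", content: "Analyze this text..." }],
    model: "gpt-4"
  }
});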

Mode-Specific Behaviors

TOOLS Mode

// Results in OpenAI tool configuration
{
  tool_choice: {
    type: "function",
    function: { name: "Analysis" }
  },
  tools: [{
    type: "function",
    function: {
      name: "Analysis",
      description: "Extract sentiment and keywords",
      parameters: {/* Generated from schema */}
    }
  }]
}

FUNCTIONS Mode (Legacy)

// Results in OpenAI function configuration
{
  function_call: { name: "Analysis" },
  functions: [{
    name: "Analysis",
    description: "Extract sentiment and keywords",
    parameters: {/* Generated from schema */}
  }]
}

JSON Mode

// Results in direct JSON response configuration
{
  response_format: { type: "json_object" },
  messages: [
    {
      role: "system",
      content: "Return JSON matching schema..."
    },
    // ... user messages
  ]
}

Response Parsing

Built-in parsers handle different response formats:

import { 
  OAIResponseParser,
  OAIResponseToolArgsParser,
  OAIResponseFnArgsParser,
  OAIResponseJSONParser,
  thinkingJsonParser
} from "zod-stream";

// Automatic format detection
const result = OAIResponseParser(response);

// Format-specific parsing
const toolArgs = OAIResponseToolArgsParser(response);
const fnArgs = OAIResponseFnArgsParser(response);
const jsonContent = OAIResponseJSONParser(response);
const thinkingJson = thinkingJsonParser(response);

Streaming Utilities

Handle streaming responses with built-in utilities:

import { OAIStream, readableStreamToAsyncGenerator } from "zod-stream";

// Create streaming response
app.post("/api/stream", async (req) => {
  const completion = await oai.chat.completions.create({
    ...params,
    stream: true
  });

  return new Response(
    OAIStream({ res: completion })
  );
});

// Convert stream to async generator
const generator = readableStreamToAsyncGenerator(stream);
for await (const chunk of generator) {
  console.log(chunk);
}

Path Tracking Utilities

Monitor completion status of specific paths:

import { isPathComplete } from "zod-stream";

const activePath = ["analysis", "sentiment"];
const isComplete = isPathComplete(activePath, {
  _meta: {
    _completedPaths: [["analysis", "sentiment"]],
    _activePath: ["analysis", "keywords"],
    _isValid: false
  }
});

Error Handling

zod-stream provides error handling at multiple levels:

const stream = await client.create({
  completionPromise: async () => response.body,
  response_model: { schema }
});

let finalResult;

// Path tracking for progressive updates
for await (const chunk of stream) {
  finalResult = chunk;
  // Check which paths are complete
  console.log("Completed paths:", chunk._meta._completedPaths);
  console.log("Current path:", chunk._meta._activePath);
}

// Final validation happens after stream completes
const isValid = finalResult._meta._isValid;
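
Stream-level failures (network errors, aborted requests, malformed chunks) surface as exceptions from the async iterator, so a defensive version of the loop above can wrap it in try/catch and then check the final validation state. A minimal sketch:

try {
  for await (const chunk of stream) {
    finalResult = chunk;
  }
} catch (error) {
  // The underlying fetch/stream failed mid-extraction
  console.error("Stream failed:", error);
}

if (!finalResult?._meta._isValid) {
  // The stream ended without ever satisfying the schema
  console.warn("Incomplete extraction, last active path:", finalResult?._meta._activePath);
}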

Real-World Use Cases

1. Progressive Data Analysis

const analysisSchema = z.object({
  marketData: z.object({
    trends: z.array(z.object({
      metric: z.string(),
      value: z.number()
    })),
    summary: z.string()
  }),
  competitors: z.array(z.object({
    name: z.string(),
    strengths: z.array(z.string()),
    weaknesses: z.array(z.string())
  })),
  recommendations: z.object({
    immediate: z.array(z.string()),
    longTerm: z.array(z.string()),
    budget: z.number()
  })
});

for await (const chunk of stream) {
  // Start visualizing market trends immediately
  if (isPathComplete(['marketData', 'trends'], chunk)) {
    initializeCharts(chunk.marketData.trends);
  }

  // Begin competitor analysis in parallel
  chunk._meta._completedPaths.forEach(path => {
    if (path[0] === 'competitors' && path.length === 2) {
      const competitor = chunk.competitors[path[1] as number];
      fetchCompetitorData(competitor.name);
    }
  });

  // Start budget planning once we have immediate recommendations
  if (isPathComplete(['recommendations', 'immediate'], chunk) && 
      isPathComplete(['recommendations', 'budget'], chunk)) {
    planBudgetAllocation({
      actions: chunk.recommendations.immediate,
      budget: chunk.recommendations.budget
    });
  }
}

2. Document Processing Pipeline

const documentSchema = z.object({
  metadata: z.object({
    title: z.string(),
    author: z.string(),
    topics: z.array(z.string())
  }),
  sections: z.array(z.object({
    heading: z.string(),
    content: z.string(),
    annotations: z.array(z.object({
      type: z.string(),
      text: z.string(),
      confidence: z.number()
    }))
  })),
  summary: z.object({
    abstract: z.string(),
    keyPoints: z.array(z.string()),
    readingTime: z.number()
  })
});

for await (const chunk of stream) {
  // Start document indexing as soon as metadata is available
  if (isPathComplete(['metadata'], chunk)) {
    indexDocument({
      title: chunk.metadata.title,
      topics: chunk.metadata.topics
    });
  }

  // Process sections as they complete
  chunk._meta._completedPaths.forEach(path => {
    if (path[0] === 'sections' && isPathComplete([...path, 'annotations'], chunk)) {
      const sectionIndex = path[1] as number;
      const section = chunk.sections[sectionIndex];
      
      // Process annotations for each completed section
      processAnnotations({
        heading: section.heading,
        annotations: section.annotations
      });
    }
  });

  // Generate preview once we have abstract and reading time
  if (isPathComplete(['summary', 'abstract'], chunk) && 
      isPathComplete(['summary', 'readingTime'], chunk)) {
    generatePreview({
      abstract: chunk.summary.abstract,
      readingTime: chunk.summary.readingTime
    });
  }
}

3. E-commerce Product Enrichment

const productSchema = z.object({
  basic: z.object({
    id: z.string(),
    name: z.string(),
    category: z.string()
  }),
  pricing: z.object({
    base: z.number(),
    discounts: z.array(z.object({
      type: z.string(),
      amount: z.number()
    })),
    final: z.number()
  }),
  inventory: z.object({
    status: z.string(),
    locations: z.array(z.object({
      id: z.string(),
      quantity: z.number()
    }))
  }),
  enrichment: z.object({
    seoDescription: z.string(),
    searchKeywords: z.array(z.string()),
    relatedProducts: z.array(z.string())
  })
});

for await (const chunk of stream) {
  // Start inventory checks as soon as basic info is available
  if (isPathComplete(['basic'], chunk)) {
    initializeProductCard(chunk.basic);
  }

  // Update pricing as soon as final price is calculated
  if (isPathComplete(['pricing', 'final'], chunk)) {
    updatePriceDisplay(chunk.pricing.final);
    
    // If we also have inventory, update buy button
    if (isPathComplete(['inventory', 'status'], chunk)) {
      updateBuyButton({
        price: chunk.pricing.final,
        status: chunk.inventory.status
      });
    }
  }

  // Start SEO optimization in parallel
  if (isPathComplete(['enrichment', 'seoDescription'], chunk) &&
      isPathComplete(['enrichment', 'searchKeywords'], chunk)) {
    optimizeProductSEO({
      description: chunk.enrichment.seoDescription,
      keywords: chunk.enrichment.searchKeywords
    });
  }

  // Prefetch related products as they're identified
  if (isPathComplete(['enrichment', 'relatedProducts'], chunk)) {
    prefetchRelatedProducts(chunk.enrichment.relatedProducts);
  }
}

With Next.js API Routes

// pages/api/extract.ts
import { withResponseModel, OAIStream } from "zod-stream";
import OpenAI from "openai";
import { z } from "zod";

// Run as an Edge API Route so the handler can return a streaming Response
export const config = { runtime: "edge" };

const oai = new OpenAI();

const schema = z.object({
  summary: z.string(),
  topics: z.array(z.string()),
  sentiment: z.object({
    score: z.number(),
    label: z.string()
  })
});

export default async function handler(req: Request) {
  const { content } = await req.json();

  const params = withResponseModel({
    response_model: { 
      schema,
      name: "ContentAnalysis"
    },
    mode: "TOOLS",
    params: {
      messages: [{ 
        role: "user", 
        content: `Analyze: ${content}` 
      }],
      model: "gpt-4"
    }
  });

  const stream = await oai.chat.completions.create({
    ...params,
    stream: true
  });

  return new Response(OAIStream({ res: stream }));
}

With React and stream-hooks

import { useState } from "react";
import { useJsonStream } from "stream-hooks";
import { z } from "zod";

const schema = z.object({
  summary: z.string(),
  topics: z.array(z.string())
});

function AnalysisComponent() {
  const [data, setData] = useState<z.infer<typeof schema>>();

  const { 
    loading, 
    error,
    startStream 
  } = useJsonStream({
    schema,
    onReceive: (data) => {
      setData(data)
    }
  });

  return (
    <div>
      <button 
        onClick={() => startStream({
          url: "/api/extract",
          method: "POST",
          body: { content: "..." }
        })}
        disabled={loading}
      >
        Start Analysis
      </button>

      {loading && <LoadingState paths={data?._meta._completedPaths} />}
      {error && <ErrorDisplay error={error} />}

      {data && (
        <ProgressiveDisplay
          data={data}
          isComplete={data._meta._completedPaths.length > 0}
        />
      )}
    </div>
  );
}

Integration with Island AI

zod-stream is part of the Island AI toolkit, alongside companion packages such as schema-stream and stream-hooks.

Contributing

We welcome contributions! Check out the project repository to get started.

License

MIT © hack.dance
