KORM is a sophisticated TypeScript Object-Relational Mapping (ORM) library designed specifically for KSQL stream processing. It enables developers to build and execute KSQL queries using type-safe TypeScript objects instead of raw SQL strings, providing robust compile-time safety, excellent IDE support, and comprehensive validation.
- 🛡️ Complete TypeScript type checking for query construction
- ✅ Comprehensive Zod schema validation
- 🔍 Runtime validation of complex query relationships
- 🚫 Prevention of SQL injection vulnerabilities
- 🔄 Full support for KSQL streams and tables
- 🛠️ Rich set of transformation functions (string, numeric, date, aggregate)
- 📦 Composable query components
- 🔌 Seamless integration with existing KSQL deployments
- 💡 Full IDE IntelliSense support
- 🔧 Refactoring-friendly design
- 📝 Clear version control diffs
- 📚 Extensive inline documentation
npm install @eleven-am/korm
import { KsqlDBClient } from '@eleven-am/korm';
const client = new KsqlDBClient({
host: 'localhost',
port: 8088,
protocol: 'http',
auth: {
username: 'user',
password: 'pass'
}
});
// Building a query to count orders
import { KSQLStatement } from './statements';
const countOrdersQuery: KSQLStatement = {
type: SelectType.COLUMN,
columns: [
{
type: SelectType.COLUMN,
expression: {
type: ExpressionType.TRANSFORMATION,
value: {
type: TransformType.AGGREGATE,
function: AggregateFunction.COUNT,
parameters: []
}
},
alias: 'total_orders'
}
],
from: {
sourceType: DataSourceType.STREAM,
source: {
type: SourceType.DIRECT,
name: 'orders',
sourceType: DataSourceType.STREAM
}
},
groupBy: {
columns: []
},
emit: EmitType.CHANGES
};
// Equivalent SQL:
// SELECT COUNT(*) AS total_orders
// FROM orders
// GROUP BY
// EMIT CHANGES;
// Execute the query
const result = await client.executeStatement(countOrdersQuery);
// Example of string and numeric transformations
const transformationExample = {
type: TransformType.STRING,
function: StringFunction.CONCAT,
parameters: [/* parameters */]
};
// Example of a tumbling window
const windowedQuery = {
window: {
type: WindowType.TUMBLING,
size: {
value: 5,
unit: WindowTimeUnit.MINUTES
}
}
};
// Example of stream-table join
const joinExample = {
type: JoinType.INNER,
source: {
name: 'order_details',
sourceType: DataSourceType.TABLE
},
conditions: [
{
leftField: 'order_id',
rightField: 'id'
}
]
};
KORM provides detailed error information through two main error types:
try {
await client.executeStatement(query);
} catch (error) {
if (error instanceof KsqlDBError) {
// Handle server-side errors
console.error('KSQL Error:', error.message);
} else if (error instanceof ValidationError) {
// Handle validation failures
console.error('Validation Error:', error.details);
}
}
interface KsqlDBConfig {
host: string;
port: number;
protocol?: 'http' | 'https';
auth?: {
username: string;
password: string;
};
defaultStreamProperties?: {
'ksql.streams.auto.offset.reset'?: 'earliest' | 'latest';
'ksql.streams.cache.max.bytes.buffering'?: number;
[key: string]: string | number | boolean | undefined;
};
}
We welcome contributions! Please see our Contributing Guide for details on:
- Code of Conduct
- Development setup
- Testing guidelines
- Pull request process
MIT