DataStreamer
A data streamer for testing streaming analytics platforms. It takes batch CSV or JSON data and streams it to a target (Kafka or HTTP).
Installation
DataStreamer at NPM (version 1.2.7)
Install with npm: npm install datastreamer
Details of DataStreamer
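new DataStreamer(configName, lineListener, pauseListener, resumeListener, streamListener, extraFields);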
Parameters
- configName: Name or path of the JSON config file, without the extension (if your config file is 'config.json', configName is 'config').
- lineListener: Callback function triggered each time a line is read from the file. This function takes these parameters:
  - fileStream: File stream, used to resume, pause and close the stream.
  - fieldNames: Field names of the data. You can add extra fields by passing their names as an array to the extraFields parameter of the DataStreamer constructor.
  - fieldValues: Field values of the data. You can append the values of the extra fields to this array.
  - jsonGenerator: JSON generator for the data schema given in the config file's dataSchema attribute.
  { /* your implementation */ }
- pauseListener: Callback function triggered when the stream is paused.
  { /* your implementation */ }
- resumeListener: Callback function triggered when the stream is resumed.
  { /* your implementation */ }
- streamListener: Callback function triggered when data is streamed to Kafka. This function takes kafkaBuffer and fileStream as parameters:
  - kafkaBuffer: Buffer that the file stream writes data to. DataStreamer reads from this buffer and sends the data to Kafka.
  - fileStream: File stream, used to resume, pause and close the stream.
  { /* your implementation */ }
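As a rough illustration of the lineListener contract described above, the sketch below appends one value per extra field to fieldValues. Only the four-parameter signature comes from the parameter list. The `makeLineListener` helper, the "amount" column, and the plain arrays standing in for a real stream are all hypothetical, and it assumes fieldNames already contains the extra field names.

```javascript
// Hypothetical helper: builds a listener that keeps its own transaction counter.
// Only the (fileStream, fieldNames, fieldValues, jsonGenerator) signature
// is taken from the parameter list; everything else is an assumption.
function makeLineListener(startId) {
  var txId = startId;
  return function lineListener(fileStream, fieldNames, fieldValues, jsonGenerator) {
    // Append one value per extra field, in the same order as extraFields.
    fieldValues.push(Date.now()); // value for the extra "timestamp" field
    fieldValues.push(txId);       // value for the extra "tx_id" field
    txId += 1;
  };
}

// Simulate two calls with plain arrays instead of a real file stream.
var listener = makeLineListener(1);
var names = ["amount", "timestamp", "tx_id"]; // "amount" is a made-up column
var firstRow = ["9.5"];
var secondRow = ["12.0"];
listener(null, names, firstRow, null);
listener(null, names, secondRow, null);

console.log(firstRow.length, firstRow[2], secondRow[2]); // 3 1 2
```

Keeping the counter inside a closure avoids a shared global, which makes the listener easier to exercise without a running stream.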
Usage of DataStreamer
    var DataStreamer = require('datastreamer');

    var begin = Date.now();
    var vars = { "queue": [], "timestamp": begin, "tx_id": 1 };

    // config file name without file extension (.json mandatory)
    var dataStreamer = new DataStreamer("paysim-config", lineListener, null, null, null, ["timestamp", "tx_id"]);

    function lineListener(fileStream, fieldNames, fieldValues, jsonGenerator) {
        fieldValues /* ... */;
        fieldValues /* ... */;
        var now = Date.now();
        var timestamp = vars["timestamp"] + vars["tx_id"];
        if (timestamp < now)
            dataStreamer /* ... */;
        else
            vars["queue"] /* ... */;
        ++vars["tx_id"];
    }

    /* ... */ {
        var currentTime = Date.now();
        for (var i = 0; i >= 0 && i < vars["queue"].length; ++i)
            if (vars["queue"][i]["timestamp"] <= currentTime) {
                dataStreamer /* ... */;
                vars["queue"] /* ... */;
                --i;
            }
    }

    dataStreamer /* ... */;
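The usage example holds back records whose timestamp is still in the future and releases them on a later pass over the queue. A minimal sketch of that release step, independent of DataStreamer, might look like this. The `drainDue` function name and the sample records are hypothetical and are not part of the package.

```javascript
// Remove and return every queued entry whose timestamp is due.
// `queue` is an array of { timestamp, ... } objects and is modified in place.
function drainDue(queue, currentTime) {
  var due = [];
  for (var i = 0; i < queue.length; ++i) {
    if (queue[i].timestamp <= currentTime) {
      due.push(queue[i]);
      queue.splice(i, 1); // drop the entry from the queue
      --i;                // re-check the index that now holds the next entry
    }
  }
  return due;
}

var queue = [
  { timestamp: 100, tx_id: 1 },
  { timestamp: 300, tx_id: 2 },
  { timestamp: 200, tx_id: 3 }
];
var released = drainDue(queue, 250);

console.log(released.map(function (r) { return r.tx_id; })); // [ 1, 3 ]
console.log(queue.length); // 1
```

Decrementing the index after each removal matters: without it, the entry that slides into the freed slot would be skipped on that pass.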
Configuration of DataStreamer
Config for Kafka target
Config for HTTP target
Configuration attributes:
- filename: Name or path of the data file
- dataSchema: Data types of the data's columns
- target: Configuration of the stream target
- chunkSize: Number of records written per triggerInterval
- triggerInterval: Interval at which data is written
- loggerEnabled: Flag that enables logging to a file or to the console
- loggerType: Type of logger, either file or console
- logFilename: If the logger type is file, the logger creates a log file with this name.
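To make the attribute list concrete, here is a hedged sketch of a config object and a small sanity check. Only the top-level attribute names come from the list above. All values are made up, the inner structure of dataSchema and target is left empty because this section does not describe it, and the choice of which attributes count as required is an assumption, as is the `checkConfig` helper itself.

```javascript
// Hypothetical values; only the attribute names come from the list above.
var config = {
  filename: "data.csv",
  dataSchema: {},          // structure not described in this section
  target: {},              // structure not described in this section
  chunkSize: 100,
  triggerInterval: 1000,   // unit not stated in this section
  loggerEnabled: true,
  loggerType: "file",
  logFilename: "datastreamer.log"
};

// Assumption: these attributes are treated as required by this sketch.
var REQUIRED = ["filename", "dataSchema", "target", "chunkSize", "triggerInterval"];

// Return the names of missing or inconsistent attributes.
function checkConfig(cfg) {
  var problems = REQUIRED.filter(function (key) { return !(key in cfg); });
  if (cfg.loggerEnabled && ["file", "console"].indexOf(cfg.loggerType) === -1) {
    problems.push("loggerType");
  }
  if (cfg.loggerEnabled && cfg.loggerType === "file" && !cfg.logFilename) {
    problems.push("logFilename");
  }
  return problems;
}

console.log(checkConfig(config)); // []
console.log(checkConfig({ filename: "data.csv", loggerEnabled: true, loggerType: "file" }));
// [ 'dataSchema', 'target', 'chunkSize', 'triggerInterval', 'logFilename' ]
```

Writing the object out with `JSON.stringify(config, null, 2)` would give the JSON form that a config file is expected to hold.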