handhole

0.0.7 • Public • Published

Handhole

Build Status

This is develop/maintenance tool for StreamAPI

Target

Make it eacy to use StreamAPI : StreamAPIをもっと簡単に

データがどこを流れている、どう流れているかわかりにくい点、 そしてデータを流し込んだり、パイプをつなぐ操作が煩雑なので、負荷を軽減をするために作りました。

Install npm install handhole

Use

var Handhole = require('handhole');
 
var hh = Handhole(stream);
var hh = HandHole([s1,s2]);

Methods

Pipe Controlle

method exsample descript
insert hh.insert([target], stream or array) 指定したstreamの前に追加する
remove hh.remove(target) 指定したstreamを取り除く
pipe hh.pipe([target], stream or array) 指定したstreamの後ろに追加
unpipe hh.unpipe(target) 指定したstreamの後ろを切り離す
split hh.sprit(to,[from]) 指定したstreamから切り離す

Stream Module

method exsample descript
hopper hh.hopper([target]) データを流し込み口を作る。指定があればtargetの前に追加する
flowMater hh.flowMater([target]) データ量(count,size)を計測する。終了時に他totalを返す
valve hh.valve([DPS],[target]) DPS(data/sec)を調整する(0=stop, 0 > no limit)

Module for Dev

開発時に陥りがちな不具合を避ける。 開発時に終端までpipeを通してないことは多いので、とりあえず終端にモニターをつける

method exsample descript
garbage hh.garbage(target, [done]) trash chunk data
garbageAll hh.garbageAll(target, [done]) set garbage to all EndTerm

Information Methods

Get stream information

list

Return registered stream list

var hh = HandHole([s1,s2]);
var list = hh.list();
console.log(list);
// [
//  {
//  id:0,              // uniqueid
//  name:"s1",         // name(from transform function name)
//  obj:StreamObject,  // stream object
//  next:[1]           // list pipes id
//  },
//  ...
// ]

viewlist

Check stream list by looking

var hh = HandHole([s1,s2]);
var list = hh.viewlist();
console.log(list);
// [
//  {
//  id:0,       // unique id
//  name:"s1",  // name(from transform function name)
//  next:[1]    // list pipes id
//  },
//  {
//  id:1,
//  mame:"s2",
//  next:[]     // have not next pipe
//  }
// ]

term

Get stream by termination type.

var hh = HandHole([s1,s2]);
 
// alone
hh.Add(s3);
 
// loop
hh.Add([s4,s5]);
hh.pipe("s5", "s4")
 
var term = hh.term();
console.log(term);
// {
//  start:[...], // start term.     ex.s1
//  end:[...], // end term.       ex.s2
//  alone:[...], // alone streams.  ex.s3
//  loop:[...], // loop streams.   ex.[s4,s5]
//  other:[...], // others
// }

Controll Methods

Insert

__targetの前に__streamを追加する

hh.insert([target], stream or array)

var t = [A,B,C]; // A-B-C 
var hh = HandHole(t);
 
// 指定なしなら先頭に追加
hh.insert(D); // D-A-B-C
 
// 指定をすれば前のStreamとの間に追加
hh.insert("B",E) // D-A-E-B-C
 
// readableの前には入れられない
var r = fs.createReadStream(filepath);
var hh = HandHole(r);
hh.insert(0, F); // Error!!!

Pipe

__targetの後ろに__streamを追加する

hh.pipe([target], stream or array)

var t = [A,B,C]; // A-B-C
var hh = HandHole(t);
 
// 指定なしなら最後尾に追加
hh.pipe(D); // A-B-C-D
 
// 指定をすれば並列に接続
hh.pipe("B",E)    // A-B-C-D
                  //   |-E
 
// ループ構造は基本的には作れない
hh.pipe("C","A")  // Error! stream loop is very slow
 
// if want to use
// hh._loopflag = true // _loopflag = true
// hh.pipe("C","A") // OK
 
// writableの後には入れられない
var r = fs.createWriteStream(filepath);
var hh = HandHole(r);
hh.insert("D", F); // Error!!!
 

Remove

__target__を取り除く

hh.pipe([target])

var t = stream; // A-B-C-D-Eという順番でつながっていると仮定
var hh = HandHole(t);
 
// 指定したstreamを取り除いて前後とつなげる
hh.remove("D"); // A-B-C-E
 

Unpipe

targetの下流を切り離す。

hh.unpipe(target)

var t = [A,B,C,D,E];     // A-B-C-D-E
var hh = HandHole(t);
 
// 指定したstreamを取り除いて前後とつなげる
hh.unpipe("C");       // A-B-C, D-Eができる
 

Split

split stream pipe by from-to

hh.unpipe(to, [from])

var t = [A,B,C,D,E];     // A-B-C-D-E
var hh = HandHole(t);
 
// split in front of target
hh.split("C");       // A-B, C-D-Eができる
 
// get between "to" and "from"
hh.split("B","D"); // A-E, B-C-D
 

Support Streams

Hopper

データを個別に導入するためのStream

var Handhole = require('handohole');
 
// 生成方法は二つ srreamobjを受け取るか
var hp = Handhole.hopper();
 
// 直接追加する
var hh = handhole(stream);
var hp = hh.hopper(1); // insertと同じ動作を行う
 
 
// データはpushもしくはdataで追加できる。
hp.push("A");
hp.push("B");
hp.push("C");
hp.push("D");
 
hp.data(["A","B","C","D"]); // data Methodなら配列で渡すこともできる
 
// close
hp.push(null);
hp.data(null);
 

Garbage / garbageAll()

閉じていないStreamを動作させるための終端Stream。
終了時にはcallbackを呼ぶ。
基本的にはgarbageAllで勝手にDuplexの終端を閉じるのでそちらを使う

var Handhole = require('handohole');
 
// 生成方法は二つ srreamobjを受け取るか
var hp = Handhole.hopper();
 
// 直接追加する
var hh = handhole(stream); // A-B-C-D
 
// Callbackを指定 A-B-C-D-[garbage]
hh.garbageAll(function(){
    // call by streamD.onfinish
    done();
});
 
hh.data(data);
hh.data(null);
 

FlowMater

Monitoring data flow call count and chunk size.

event

data

emiting on timer(default:500ms) return info latest dataflow.

total

emitting on finish. return info total dataflow.

handhole.flowMater([option])

 
// get Object from class
var fm = Handhole.flowMeter(option)
 
// or insert to instance
var hh = handhole(stream); 
var fm = hh.flowMater("B"); // insert
 
// option
option  = {
    timer:1000 // interval of emit flow event
}
 
// result
fm.on("flow", function (flow){
    console.log(flow.count);  // count of call in interval
    console.log(flow.size);  // size of total datasize in interval
})
 

Valve

valve is regurate data count per second.

Handhole.valve([option])

 
// get Object from class
var vl = Handhole.valve(option)
 
// or insert to instance
var hh = handhole(stream); 
var vl = hh.valve("B"); // insert
 
// option
option  = {
    valve:1000 // iDPS target
}
 
// change valve
vl.valve(50);
 
// result
vl.on("flow", function (flow){
    console.log(flow.count);  // count of call in interval
    console.log(flow.size);  // size of total datasize in interval
    console.log(flow.timer);  // start datetime of controll span
    console.log(flow.wait);  // buffering chunk count
    console.log(flow.valve);  // now valve setting
})
 

Capture

Data output.

  • default : console.log(chunk)
  • file : output to file(string/buffer/JSON)
 
// get Object from class
var cp = Handhole.capture(option)
 
// or insert to instance
var hh = handhole(stream); 
var cp = hh.capture("B"); // insert
 

util func

Please read test/index.js

var getDatatype = HandHole.util.getDatatype;

Stacker

データブロックサイズ変更。 流れてきたデータを配列にためて出力する。

  • option.splitChar
    特定の文字列があったらstackした内容を書き出す。
    デフォルトは","
 
// get Object from class
var hp = handole.hopper();
var st = handhole.stacker({splitChar:","})
var hh = handhole([hp, st]);
 
for(var i=0; i< 50; i++){
    hp.push("D"+i);
}
// ["D0","D1"...]
 
hp.push(",") // split
 
for(var i=0; i< 50; i++){
    hp.push("C"+i);
}
// ["D0","D1"...] , ["C0","C1"...] 
 

Conful

複数のパイプから一つの処理パイプに流すための合流pipe
すべてのpipeがcloseしたらconfulも閉じる。

var hp1 = HandHole.hopper();
var hp2 = HandHole.hopper();
var hp3 = HandHole.hopper();
var cnf = HandHole.conful([hp1, hp2]); // initial set pipe
cnf.conful(hp3); // add pipe

turnstile

改札機 並列実行+timeout制御をする

 
var opt = {
    t: ff, // function _transform
    max: 4, // process max
    timeout: 800 // timeout[ms]
}
var tsl = HandHole.turnstile(opt);
 
tsl.on("timeout", function(chunk){
    // timeout chunk
})
 

Readme

Keywords

Package Sidebar

Install

npm i handhole

Weekly Downloads

2

Version

0.0.7

License

MIT

Last publish

Collaborators

  • uzuna