bulldozer-c

0.0.72 • Public • Published

bulldozer-c


let BulldozerC = require('bulldozer-c');
let httpclient = require('http-clientp');
let bc = new BulldozerC();
//global.http_proxy = {'host': '127.0.0.1', 'port': 8888}; //local proxy set
let crawl = bc.newSingletonCrawl();

let handlerContext = bc.httpUtils.buildHttpcontext('GET', {
    'next': '1718',
    'url': 'http://www.forbeschina.com/lists/1718'
});
handlerContext.mainProgram = crawl;
bc.testDelayStartRequest(handlerContext);

crawl.on('1718', function (prehandlerContext) {
    let body = prehandlerContext.response.body;
    let $ = bc.$(body);
    let trs = $('#data-view > tbody > tr');
    let postdata = [];
    for (let i = 0; i < trs.length; i++) {
        let tr = $(trs[i]);
        let tds = tr.find('td');
        let Ranking = $(tds[0]).text();
        let PName = $(tds[1]).text();
        let CName = $(tds[2]).text();
        let HeadPro = $(tds[3]).text();
        let HeadCity = $(tds[4]).text();
        let Total = $(tds[5]).text();
        let result = {
            'Ranking': Ranking,
            'PName': PName,
            'CName': CName,
            'HeadPro': HeadPro,
            'HeadCity': HeadCity,
            'Total': Total,
        };
        postdata.push(result);
    }
    let collection = {
        'name': 'table_name',
        'data': postdata
    };
    console.log(collection)
    //bc.dbClient.save(collection);//save to mongodb
    //bc.dbClient.mysql_save(collection);//save to mysql
    /*
        httpclient.post('http://services/save', function (err, body, res, httpcontext) {
            if (err) {
                 console.error('save fail');
             } else {
                 console.error('save succ');
             }
         }, {
             'request': {
                 'postdata': collection,
                 'headers': {
                     'content-type': 'application/json'
                 }
             }
         })
    */
});


项目介绍

  1. 分布式爬虫系统,需要和bulldozer(服务端)组合使用
  2. 支持抓取、解析、存储、任务初始化、任务调度、任务监控告警等
  3. bulldozer(服务端)主要用到redis、mysql、mongodb,其中redis为必须;
    1. redis存储request模板、主要包含:请求地址及参数、解析此request的方法函数事件、附带的数据信息等
    2. mysql、mongodb主要存储解析后的数据,其中mysql通过将json对象转换成sql语句实现通用的insert、update、delete、select操作;
  4. 此系统涉及到的主要包或者服务: bulldozer(服务端)httpclientpjsontosql2
  5. 此系统可以配合pm2-agent进行指标的采集,实现监控告警
  6. 爬虫脚本可以通过 pm2 start crawl.js(脚本) --log-date-format='YYYY-MM-DD HH:mm:ss.SSS'(打印日志时间) -i 2(进程数) 执行,日志会存储在 .pm2/logs/ 下面

软件架构

avatar


细节介绍

  • 下面bc皆为 let bc = new BulldozerC, BulldozerC在同一进程中最好只new一次,new的对象会赋值给全局变量 global.bulldozerc_new。

  • bulldozer-c中通用格式爬取请求模板
 {
     "md5": "请求信息的md5,由path+method+postdata等组成,具体请看http_utils.buildHttpcontext",
     "request": {
         "postdata": "如果是post请求,请填写postbody参数,支持字符串和json格式;如果是get请求,可以为null",
         "options": {
             "method": "POST-请求方式",
             "path": "请求地址,如:https://www.baidu.com/"
         },
         "timeout": 超时时间ms,不填默认没有
     },
     "response": {},
     "data": {
         "next": "处理此次请求的方法名称,系统会通过事件去触发此方法执行",
         "url": "请求地址,如:https://www.baidu.com/",
         "属性1": "next方法需要用到的属性",
         "属性2": "next方法需要用到的属性",
         "属性n": "next方法需要用到的属性"
     },
     "charset":"返回内容编码,不填将默认取res头中的编码,如果是二级制,如:图片,pdf等,请填buffer"
 }
 此模板可以通过 bc.httpUtils.buildHttpcontext方法获构建,如:
 let httpcontext = bc.httpUtils.buildHttpcontext('post', {
     'next': 'textfunction',
     'url': 'https://www.baidu.com/',
     'prototype1': 'next方法需要用到的属性',
     'prototype2': 'next方法需要用到的属性',
     'prototypen': 'next方法需要用到的属性'
 }, 'a=1&b=2', 2000, null, 'gbk');
 console.log(httpcontext);
  • bulldozer-c中通用格式爬取请求返回结果模板
  let httpcontext = {
      "md5": "请求信息的md5,由path+method+postdata等组成,具体请看http_utils.buildHttpcontext",
      "request": {
          "postdata": "如果是post请求,请填写postbody参数,支持字符串和json格式;如果是get请求,可以为null",
          "options": {
              "method": "POST-请求方式",
              "path": "请求地址,如:https://www.baidu.com/"
          },
          "timeout": 超时时间ms,不填默认没有
      },
      "response": {
        'statusCode': 请求返回的状态码,
        'body':'请求返回的内容,如果charset=buffer,此body为buffer对象'
      },
      "res": 请求返回的完整res对象
      "data": {
          "next": "处理此次请求的方法名称,系统会通过事件去触发此方法执行",
          "url": "请求地址,如:https://www.baidu.com/",
          "属性1": "next方法需要用到的属性",
          "属性2": "next方法需要用到的属性",
          "属性n": "next方法需要用到的属性"
      },
      "charset":"返回内容编码,不填将默认取res头中的编码,如果是二级制,如:图片,pdf等,请填buffer"
  }
  此模板可以通过 bc.httpUtils.buildHttpcontext方法获构建
  • 和服务端bulldozer进行交互. mysql操作具体可以参考jsontosql2
和服务端进行交互的数据格式为: let collection = {'name':'queueName|tableName', 'data':[{对象内容}], 'duplicate':['mysql表字段1','mysql表字段2']};,其中 duplicate为支持 mysql duplicate update 用法。如果是redis或者mongodb则不需要duplicate
1. 将请求模板存储在redis队列中
    (1).bc.dbClient.lpushs({'name':'queueName','data':[httpcontext]}); //支持数组和对象,将请求模板 httpcontext 存储在list队列queueName中
    (2).multilpush 同(1),支持事物
    (3)sadds,支持数组  将请求模板 httpcontext 存储在set对了queueName中
    (4) multisadd,支持数组 同(3),支持事物
    (5) saddDistinct,支持对象,此方法会过滤掉之前抓取过的请求连接,过滤原理:此方法会将请求加入到两个队列中,既:queueName和queueName:distinct。将请求模板 httpcontext 加入到 queueName,将请求模板 httpcontext 的md5加入到queueName:distinct中,加入之前会先加入queueName:distinct中,如果加入成功,在加入queueName中,否则说明已经抓取过了,不在加入
    (6)saddDistincts,同 (5) 支持数组。
    (7)rpop,同redis rpop命令
    (8)lpop,同redis lpop命令
    (9)spop,同redis spop命令
    (10)rpoplpush({'name0':'queueName0', 'name1':'queueName1'}), 同redis rpoplpush 命令,从queueName0取出val,返回并加入到queueName1队列中
    (11)spopsadd({'name0':'queueName0', 'name1':'queueName1'}),,set版本取出再插入,从queueName0取出val,返回并加入到queueName1队列中
    (12)setTaskState('queueName','0000') 优雅的暂停任务,设置为0000最多30s内会暂停任务,设置为不等于0000,最多30s内会恢复
    (13)getTaskState('queueName') 获取任务的状态,当值为0000的时候,任务会暂停;非0000,任务恢复
2. 将解析完成的数据存储到mysql中,假设表已经建好
  (1) 设置了UNIQUE KEY, 用到duplicate update,根据UNIQUE KEY去重
    let collection = {
            'name': 'tableName',                             //表名称
            'data': [{'a':1,'b':2,'c':3,'UNIQUE_KEY':'1'}],   //数据
            'duplicate': ['a', 'b', 'c']                 //唯一约束冲突后进行更新的字段
        };
    bc.mysql_save(collection);
  (2) 未设置UNIQUE KEY
    let collection = {
            'name': 'tableName',
            'data': [{'a':1,'b':2,'c':3}]
       };
    bc.mysql_save(collection);
  (3) update数据
    let collection = {
           'name': 'tableName',          //表名称
           'data': {'a':1,'b':2,'c':3}, //更新的值
           'where': {'UNIQUE_KEY': 1}   //更新条件
       };
    bc.mysql_update(collection);
  • 任务开始
1. bc.runTask(collection, mainProgram, taskName, intervalTime, operation)
   (1) collection : 队列名称 {'name':'queueName'} (operation=rpop|spop), {'name0':'queueName0', 'name1':'queueName1'} (operation=rpoplpush|spopsadd)
   (2) mainProgram : 爬虫具体脚本对象
   (3) taskName : 爬虫名称
   (4) intervalTime : 爬虫请求时间间隔,单位秒。如 1 表示每1s请求一次
   (5) operation : spop,spopsadd从set中取出爬取请求模板;rpop,rpoplpush从list中取出爬取请求模板
2. bc.testDelayStartRequest(handlerContext, queueName, delay, operation); //单个请求测试入口   
  • 任务重试
1. bc.retry(handlerContext);
  (1) 默认重试3次,重试3次失败后会打印日志,日志包含 retry_fail 或 request_fail_no_retry
  (2) 重试次数可以配置,配置参数 global.request_retry_count
  • 指标信息,指标是用pmx实现的,用pm2启动程序,执行命令pm2 show id可以看到相关指标信息:
   Code metrics value
  ┌───────────────────────────────────────────────────────────────────────┬────────┐
  │ Loop delay                                                            │ 0.36ms │
  │ Active requests                                                       │ 0      │
  │ Active handles                                                        │ 4      │
  │ bulldozer_c{type="queueName",next="downloadLogo",event="total",}      │ 7204   │  //总数
  │ bulldozer_c{type="queueName",next="downloadLogo",event="succ",}       │ 7197   │  //成功次数
  │ bulldozer_c{type="queueName",next="downloadLogo",event="fail",}       │ 6      │
  │ bulldozer_c{type="queueName",next="downloadLogo",event="retry_fail",} │ 3      │ //重试3次失败总数
  │ bulldozer_c{type="queueName",next="downloadLogo",event="retry",}      │ 3      │ //重试次数
  │ bulldozer_c_http{type="queueName",statusCode="301",}                  │ 3      │ //http statusCode相关指标
  │ bulldozer_c_http{type="queueName",statusCode="405",}                  │ 2      │
  │ bulldozer_c_http{type="queueName",statusCode="404",}                  │ 1      │
  └───────────────────────────────────────────────────────────────────────┴────────┘
  以上是默认指标,也可以自定义指标
  bc.getCounter(keyName) 可以获得指标名称: 
     1. 如果keyName是字符串,将直接以字符串为指标名称
     2.如果指标为对象,将以key键值为指标名称,如:let retryFailkeyName = {'key': 'bulldozer_c', 'type': queueName, 'next': 'downloadLogo', 'event': 'total'}; 指标为: bulldozer_c{type="queueName",next="downloadLogo",event="total",}                     
    采集例子:
    # HELP bulldozer_c HELP.
    # TYPE bulldozer_c gauge
    bulldozer_c{type="queueName",next="downloadLogo",event="total",nodeName="download_logo",pid="1218094",} 7281
    bulldozer_c{type="queueName",next="downloadLogo",event="succ",nodeName="download_logo",pid="1218094",} 7200
    bulldozer_c{type="queueName",next="downloadLogo",event="fail",nodeName="download_logo",pid="1218094",} 81
  • 0.0.43
 1. 增加taskInit函数,用于任务初始化
 2. 增加setTaskInitInterval函数,用于设置任务初始化时间,及任务执行周期
 3. 增加判断是否暂停爬虫任务: 
                        (1)如果连续5min没有爬虫请求(既redis队列为空),将暂停爬虫任务;
                        (2)如果爬虫任务处于暂停中,每30min后会尝试恢复爬虫任务;如果恢复后,最大30s内还没有任务将继续暂停;这是一个循环判断过程;
                        (3)可以通过 bc.setTaskTimeOut(timeOut, restoreTime);设置暂停爬虫时间和恢复爬虫时间
  • 0.0.44
 1. 增加 $ 函数,用于获取html选择器:let $ = bc.$(html);
 2. 增加 parseJson 函数,用于格式化json字符串:let jsonObj = bc.parseJson(jsonstr);
 3. 增加 runTaskQPS 函数,用于根据qps进行设置爬虫速度: bc.runTaskQPS(collection, mainProgram, taskName, QPS, operation)
  • 0.0.45
 1. 启动任务增加打印qps
  • 0.0.46
 1. 增加setProxy(host, port)设置代理函数
 2. 增加默认 newCrawl() 函数,可以直接通过bc.newCrawl()得到默认爬虫函数。新写法例子: https://github.com/jiang19210/bulldozer-c-example/blob/master/dianping_task_on.js
  • 0.0.47
 1. 增加newSingletonCrawl()
  • 0.0.66
 1. 增加setProxy(host, port, username, password), 支持设置代理用户名及密码
  • 0.0.69
 1. parseFailHandler(handlerContext)  解析失败后执行的方法,默认重试
 2. suspend(queue) restart(queue)     暂停和重启手动方法
 3. log(message, handlerContext)      增加日志打印

Package Sidebar

Install

npm i bulldozer-c

Weekly Downloads

14

Version

0.0.72

License

MIT

Unpacked Size

66.8 kB

Total Files

11

Last publish

Collaborators

  • xiaoming19210