mysql_minionpool

A mysql minionpool, suitable to work on rows

About

Extends minionpool to have worker pools that can process rows from a mysql table. It uses node-mysql as its MySQL driver.

The npm package is called mysql_minionpool.

It's used as a standard minionpool, you only need to provide some of the callbacks and mysql_minionpool will do the rest (see below).

Here's a simple program that will use mysql_minionpool to process a whole table paginating the rows, suitable to process a large number of rows.

var pool = new mysqlMinionPoolMod.MysqlMinionPool({
  mysqlConfig: {
    host: '127.0.0.1',
    user: 'root',
    password: 'pass',
    database: 'db',
    port: 3306
  },
  concurrency: 5,    // How many pages to get concurrently... 
  rowConcurrency: 1, // ... and how many concurrent rows processed PER query 
 
  // Since we're paginating, let's create a state where we can store the 
  // current page and the total rows per page. 
  // First argument is the error, if something failed. 
  taskSourceStartfunction(callback) {
    callback(undefined, {page: 0, pageSize: 10});
  },
 
  // Called to retrieve rows to process (a page, in our case). In the 'state' 
  // variable, there will be a property state.mysqlPool that grants mysql 
  // access. 
  taskSourceNextfunction(statecallback) {
    var db = 'db';
    var table = 'table';
    var query = "SELECT * FROM `" + db + "`.`" + table + "` LIMIT ?,?";
    state.mysqlPool.getConnection(function(errmysqlConnection) {
      if(err) {
        callback(err, undefined);
      } else {
        mysqlConnection.query(
          query, [state.page * state.pageSize, state.pageSize], function(errrows) {
            mysqlConnection.release();
            // First argument for the callback is the error, if something failed. 
            if(err) {
              callback(err, undefined);
            } else if(rows.length === 0) {
              callback(undefined, undefined);
            } else {
              callback(undefined, rows);
            }
          }
        );
      }
    });
    state.page++;
    return state;
  },
 
  // The handle also gets state.mysqlPool. 
  minionTaskHandlerfunction(taskstatecallback) {
    console.log('item: ' + util.inspect(task));
    // First argument is the error, if something failed. 
    callback(undefined, state);
  },
 
  poolEndfunction() {
    console.log('done');
  },
});
pool.start();