stream data to mixpanel... quickly
`mixpanel-import` implements Mixpanel's `/import`, `/engage`, `/groups`, and `/lookup` APIs with best practices, providing a clean, configurable interface to stream JSON, NDJSON, or CSV files compliant with Mixpanel's data model through Mixpanel's ingestion pipeline.

by implementing interfaces as streams in node.js, high-throughput backfills are possible with no intermediate storage and a low memory footprint.
note: if you're trying to add real-time mixpanel tracking to a node.js web application, this module is NOT what you want; you want `mixpanel-node`, the official node.js SDK.
this module can be used in two ways:
- as a CLI, standalone script via:

```bash
npx mixpanel-import file --options
```

- as a module in code via:

```javascript
//for esm:
import mpStream from 'mixpanel-import';
//for cjs:
const mpStream = require('mixpanel-import');

const myImportedData = await mpStream(creds, data, options);
```
```bash
npx --yes mixpanel-import@latest ./pathToData
```

when running as a CLI, `pathToData` can be a `.json`, `.jsonl`, `.ndjson`, `.csv`, or `.txt` file OR a directory which contains said files.
when using the CLI, you will supply params to specify options of the form `--option value`, for example your project credentials:

```bash
npx --yes mixpanel-import ./data.ndjson --secret abc123
```

many other options are available; to see a full list of CLI params, use the `--help` option:

```bash
npx --yes mixpanel-import --help
```
alternatively, you may use an `.env` configuration file to provide your project credentials (and some other values).

the CLI will write response logs to a `./logs` directory by default. you can specify a `--where dir` option as well if you prefer to put logs elsewhere.
install `mixpanel-import` as a dependency in your project:

```bash
npm i mixpanel-import --save
```

then use it in code:

```javascript
const mpStream = require("mixpanel-import");
const importedData = await mpStream(credentials, data, options);

console.log(importedData);
/*
{
	success: 5003,
	failed: 0,
	total: 5003,
	batches: 3,
	rps: 3,
	eps: 5000,
	recordType: "event",
	duration: 1.299,
	retries: 0,
	responses: [ ... ],
	errors: [ ... ]
}
*/
```
read more about `credentials`, `data`, and `options` below.

when using `mixpanel-import` in code, you will pass in 3 arguments: `credentials`, `data`, and `options`.
Mixpanel's ingestion APIs authenticate with service accounts OR API secrets; service accounts are the preferred authentication method.
```javascript
//service account auth
const creds = {
	acct: `my-service-acct`, //service acct username
	pass: `my-service-secret`, //service acct secret
	project: `my-project-id`, //project id
};
const importedData = await mpStream(creds, data, options);
```

```javascript
//api secret auth
const creds = {
	secret: `my-api-secret`, //api secret (deprecated auth)
};
const importedData = await mpStream(creds, data, options);
```
if you are importing `user` profiles, `group` profiles, or `lookup tables`, you should also provide your project `token` and some other values in your `creds` configuration:
```javascript
const creds = {
	token: `my-project-token`, //for user/group profiles
	groupKey: `my-group-key`, //for group profiles
	lookupTableId: `my-lookup-table-id`, //for lookup tables
};
```
it is possible to delegate the authentication details to environment variables, using a `.env` file under the `MP_` prefix of the form:
```env
# if using service account auth; these 3 values are required:
MP_PROJECT={{your-mp-project}}
MP_ACCT={{your-service-acct}}
MP_PASS={{your-service-pass}}

# if using secret based auth; only this value is required
MP_SECRET={{your-api-secret}}

# type of records to import; valid options are event, user, group or table
MP_TYPE=event

# required for user profiles + group profiles
MP_TOKEN={{your-mp-token}}

# required for group profiles
MP_GROUP_KEY={{your-group-key}}

# required for lookup tables
MP_TABLE_ID={{your-lookup-id}}
```
note: pass `null` (or `{}`) as the `creds` to the module to use `.env` variables for authentication:

```javascript
const importedData = await mpStream(null, data, options);
```
the `data` param represents the data you wish to import; this might be events, user profiles, group profiles, or lookup tables.

the value of `data` can be:
- a path to a file, which contains records as `.json`, `.jsonl`, `.ndjson`, or `.txt`

```javascript
const data = `./myEventsToImport.json`;
const importedData = await mpStream(creds, data, options);
```
- a path to a directory, which contains files that have records as `.json`, `.jsonl`, `.ndjson`, or `.txt`

```javascript
const data = `./myEventsToImport/`; //has json files
const importedData = await mpStream(creds, data, options);
```
- a list of paths, which contains files that have records as `.json`, `.jsonl`, `.ndjson`, or `.txt`

```javascript
const data = [`./file1.jsonl`, `./file2.jsonl`]; //has json files
const importedData = await mpStream(creds, data, options);
```
- an array of objects (records), in memory

```javascript
const data = [{ event: "foo" }, { event: "bar" }, { event: "baz" }];
const importedData = await mpStream(creds, data, options);
```
- a stringified array of objects, in memory

```javascript
const records = [{ event: "foo" }, { event: "bar" }, { event: "baz" }];
const data = JSON.stringify(records);
const importedData = await mpStream(creds, data, options);
```
- a JSON (or JSONL) readable file stream

```javascript
const myStream = fs.createReadStream("./myData/lines.json");
const imported = await mpStream(creds, myStream, { streamFormat: `json` });
```

note: please specify `streamFormat` as `json` or `jsonl` in the `options`
- an "object mode" readable stream:
const { createMpStream } = require('mixpanel-import');
const mixpanelStream = createMpStream(creds, options, (results) => { ... })
const myStream = new Readable.from(data, { objectMode: true });
const myOtherStream = new PassThrough()
myOtherStream.on('data', (response) => { ... });
myStream.pipe(mixpanelStream).pipe(myOtherStream)
note: object mode streams use a different named import: createMpStream()
... the callback
receives a summary of the import and downstream consumers of the stream will receives API responses from Mixpanel.
you will use the `options` (below) to specify what type of records you are importing; `event` is the default type.
`options` is an object that allows you to configure the behavior of this module. there are LOTS of options for different types of import use cases. you can specify options as the third argument in module mode or as flags in CLI mode.

all options are... optional... for a full list of what these do, see the type definition:
```typescript
export type Options = {
	recordType?: RecordType;
	vendor?: "amplitude" | "heap" | "mixpanel" | "ga4" | "adobe" | "pendo" | "mparticle";
	region?: Regions;
	streamFormat?: SupportedFormats;
	compress?: boolean;
	compressionLevel?: 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9;
	strict?: boolean;
	logs?: boolean;
	verbose?: boolean;
	fixData?: boolean;
	removeNulls?: boolean;
	abridged?: boolean;
	forceStream?: boolean;
	streamSize?: number;
	timeOffset?: number;
	recordsPerBatch?: number;
	bytesPerBatch?: number;
	maxRetries?: number;
	workers?: number;
	where?: string;
	transformFunc?: transFunc;
	parseErrorHandler?: transFunc;
	tags?: genericObj;
	aliases?: genericObj;
	epochStart?: number;
	epochEnd?: number;
	dedupe?: boolean;
	eventWhitelist?: string[];
	eventBlacklist?: string[];
	propKeyWhitelist?: string[];
	propKeyBlacklist?: string[];
	propValWhitelist?: string[];
	propValBlacklist?: string[];
	start?: string;
	end?: string;
};
```
use `npx mixpanel-import --help` to see the full list.
| option, alias | description | default |
|---|---|---|
| `--type`, `--recordType` | event/user/group/table | `"event"` |
| `--compress`, `--gzip` | gzip on egress | `false` |
| `--strict` | `/import` strict mode | `true` |
| `--logs` | log import results to file | `true` |
| `--verbose` | show progress bar | `true` |
| `--streamFormat`, `--format` | either `json` or `jsonl` | `"jsonl"` |
| `--region` | either `US` or `EU` | `"US"` |
| `--fixData` | fix common mistakes | `false` |
| `--streamSize` | 2^n value of highWaterMark | `27` |
| `--recordsPerBatch` | # records in each request | `2000` |
| `--bytesPerBatch` | max size of each request | `2MB` |
| `--where` | directory to put logs | `"./logs"` |
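taken together, here is a sketch of an `options` object for a typical event backfill in module mode; the option names come from the type definition above, and the values are illustrative, not recommendations:

```javascript
// a sketch of options for an event backfill; values are illustrative
const options = {
	recordType: "event",    // events are the default record type
	streamFormat: "jsonl",  // one record per line
	region: "US",           // or "EU"
	compress: true,         // gzip on egress
	strict: true,           // use /import strict mode
	recordsPerBatch: 2000,  // records in each request
	verbose: true,          // show a progress bar
	logs: true              // write import results to file
};
```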
note: the `recordType` param is very important; by default this module assumes you wish to import `event` records. change this value to `user`, `group`, or `table` if you are importing other entities.

added in 2.5.20: you can now specify certain `vendor`s in the options, like `amplitude` or `ga4`, and `mixpanel-import` will provide the correct transform on the source data to bring it into mixpanel.
the `transformFunc` is useful because it can pre-process records in the pipeline using arbitrary javascript.

here are some examples:
- putting a `token` on every `user` record:

```javascript
function addToken(user) {
	user.token = `{{my token}}`;
	return user;
}

const imported = await mpStream(creds, data, {
	transformFunc: addToken,
	recordType: "user",
});
```
- constructing an `$insert_id` for each event:

```javascript
const md5 = require('md5');

function addInsert(event) {
	const hash = md5(JSON.stringify(event)); //stringify so each record hashes uniquely
	event.properties.$insert_id = hash;
	return event;
}

const imported = await mpStream(creds, data, { transformFunc: addInsert });
```
- reshape/rename profile data with a proper `$set` key and `$distinct_id` value:

```javascript
function fixProfiles(user) {
	const mpUser = { $set: { ...user } };
	mpUser.$set.$distinct_id = user.uuid;
	return mpUser;
}

const imported = await mpStream(creds, data, { transformFunc: fixProfiles, recordType: "user" });
```
- only bringing in certain events; by returning `{}` from the `transformFunc`, results will be omitted:

```javascript
function onlyProps(event) {
	if (!event.properties) return {}; //don't send events without props
	return event;
}

const data = [{ event: "foo" }, { event: "bar" }, { event: "baz", properties: {} }];
const imported = await mpStream(creds, data, { transformFunc: onlyProps }); //imports only one event
```
- "exploding" single events into many; by returning an
[]
from thetransformFunc
, each item will be treated as a new record
const data = [{ event: false }, {event: "foo"}]
// turns "false" event into 100 events
function exploder(o) => {
if (!o.event) {
const results = [];
const template = { event: "explode!" };
for (var i = 0; i < 100; i++) {
results.push(template);
}
return results;
}
return o;
};
const imported = await mpStream(creds, data, { transformFunc: exploder }) //imports 101 events
- importing a CSV file of events using `aliases` to identify the correct mixpanel fields:

```javascript
const eventsCSV = './myEvents.csv';
/*
myEvents.csv looks like this:

row_id,uuid,timestamp,action,colorTheme,luckyNumber
a50b0a01b9df43e74707afb679132452aee00a1f,7e1dd089-8773-5fc9-a3bc-37ba5f186ffe,2023-05-15 09:57:44,button_click,yellow,43
09735b6f19fe5ee7be5cd5df59836e7165021374,7e1dd089-8773-5fc9-a3bc-37ba5f186ffe,2023-06-13 12:11:12,button_click,orange,7
*/

const imported = await mpStream(creds, eventsCSV, {
	streamFormat: "csv",
	aliases: {
		row_id: "$insert_id",
		uuid: "distinct_id",
		action: "event",
		timestamp: "time"
	}
});
```
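to make the `aliases` behavior concrete, here is a sketch of the key renaming it implies for the first CSV row above; this is an illustration only, not the module's actual implementation (the real pipeline also parses the CSV and assembles a proper mixpanel event):

```javascript
// first row of the CSV above, as parsed key/value pairs
const row = {
	row_id: "a50b0a01b9df43e74707afb679132452aee00a1f",
	uuid: "7e1dd089-8773-5fc9-a3bc-37ba5f186ffe",
	timestamp: "2023-05-15 09:57:44",
	action: "button_click",
	colorTheme: "yellow",
	luckyNumber: "43"
};

const aliases = {
	row_id: "$insert_id",
	uuid: "distinct_id",
	action: "event",
	timestamp: "time"
};

// rename any aliased keys; keep the rest (colorTheme, luckyNumber) as-is
const renamed = Object.fromEntries(
	Object.entries(row).map(([key, value]) => [aliases[key] ?? key, value])
);
```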
sometimes it's helpful to generate test data, so this module includes a separate utility to do that:

```bash
npm run generate
```

`someTestData.json` will be written to `./testData` ... so you can then:

```bash
node index.js ./testData/someTestData.json
```
because... i needed this and it didn't exist... so i made it.

then i made it public because i thought it would be useful to others. then it was, so i made some improvements.

found a bug? have an idea?