CSV Import — api.csv

api.csv lets a workflow fetch a CSV from a public URL, split it into fixed-size chunks stored in R2, and process each chunk in a separate step. This allows files of any size to be processed without hitting worker memory or CPU limits.

// Phase 1 — fetch and split (call once in the start step)
const meta = await api.csv.importCSV(url, options);
// Phase 2 — read one chunk (call in each fan-out step)
const rows = await api.csv.readCSVChunk(key, chunkIndex);
// Phase 3 — cleanup (call once after all chunks are processed)
await api.csv.deleteCSV(key);

importCSV(url, options)

Fetches the CSV from url, parses it, and writes the rows to R2 in fixed-size chunks. Returns the metadata used to schedule the fan-out steps.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| url | string | required | Public URL of the CSV file; must be accessible without auth |
| options.chunkSize | number | 500 | Rows per chunk. Lower values mean shorter per-step CPU time; higher values mean fewer scheduled steps |
| options.delimiter | string | ',' | Field separator character. Use '\t' for TSV files |
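As a quick illustration of how chunkSize maps to the number of chunks (a sketch; chunkCountFor is a hypothetical helper, not part of api.csv):

```javascript
// chunkCount is the ceiling of totalRows / chunkSize; the last
// chunk simply holds the remainder.
function chunkCountFor(totalRows, chunkSize = 500) {
  return Math.ceil(totalRows / chunkSize);
}

chunkCountFor(1200, 500); // → 3 chunks: 500 + 500 + 200 rows
```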

Returns { key, totalRows, chunkCount, headers }:

| Field | Description |
| --- | --- |
| key | Unique import ID; pass this to every readCSVChunk and deleteCSV call |
| totalRows | Total row count, excluding the header row |
| chunkCount | Number of chunks written; schedule this many fan-out steps |
| headers | Column names from the first row |

readCSVChunk(key, chunkIndex)

Reads one chunk of a previously imported CSV. Returns an array of row objects (one object per row, keys are column names), or null if the chunk does not exist. Always null-guard the result.
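The row objects are keyed by the headers captured at import time. A minimal sketch of that shape, assuming values are plain strings (toRowObject is a hypothetical illustration, not part of the API):

```javascript
// Each row is an object whose keys are the column names from the
// header row; missing trailing fields default to an empty string.
function toRowObject(headers, values) {
  return Object.fromEntries(headers.map((h, i) => [h, values[i] ?? '']));
}

toRowObject(['sku', 'price'], ['ABC-1', '19.99']);
// → { sku: 'ABC-1', price: '19.99' }
```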

deleteCSV(key)

Deletes all R2 objects for a CSV import. Call this in the final step, after all chunks have been processed, to release storage immediately rather than waiting for the automatic 7-day expiry.

class Workflow {
  async start(data, headers, api) {
    const meta = await api.csv.importCSV('https://example.com/products.csv', { chunkSize: 500 });
    api.log(`Imported ${meta.totalRows} rows across ${meta.chunkCount} chunks`);
    for (let i = 0; i < meta.chunkCount; i++) {
      await api.scheduleNextStep({
        delay: 10 + i * 5, // stagger chunks to avoid burst API calls
        action: 'processChunk',
        payload: { key: meta.key, chunkIndex: i, totalChunks: meta.chunkCount },
      });
    }
  }

  async processChunk({ key, chunkIndex, totalChunks }, headers, api) {
    const rows = await api.csv.readCSVChunk(key, chunkIndex);
    if (!rows) {
      api.log(`Chunk ${chunkIndex} not found, skipping`);
      return;
    }
    for (const row of rows) {
      api.log(`Row: ${JSON.stringify(row)}`);
      // process each row, e.g. create/update a Shopify resource
    }
    // Clean up on the last chunk
    if (chunkIndex === totalChunks - 1) {
      await api.csv.deleteCSV(key);
    }
  }
}

Google Sheets can serve as a CSV source without OAuth credentials if the sheet is shared as "Anyone with the link can view". Convert the share URL to a CSV export URL:

const csvUrl = SHEET_URL.replace(/\/edit.*$/, '/export?format=csv');
// For a specific tab, append: + '&gid=123456789' (copy gid from the sheet URL bar)
const meta = await api.csv.importCSV(csvUrl, { chunkSize: 500 });
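The conversion above can be wrapped in a small helper so the gid handling is explicit (sheetCsvUrl is a hypothetical name, not part of api.csv):

```javascript
// Convert a Google Sheets share URL into a CSV export URL.
// Pass gid (copied from the sheet's URL bar) to export a specific tab;
// omit it to export the first tab.
function sheetCsvUrl(shareUrl, gid) {
  const base = shareUrl.replace(/\/edit.*$/, '/export?format=csv');
  return gid == null ? base : `${base}&gid=${gid}`;
}

sheetCsvUrl('https://docs.google.com/spreadsheets/d/abc123/edit#gid=0');
// → 'https://docs.google.com/spreadsheets/d/abc123/export?format=csv'
```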
| Limitation | Detail |
| --- | --- |
| Public URL required | importCSV uses a plain fetch(url) with no auth. URLs that require login or redirect to an auth page will fail |
| UTF-8 encoding only | Other encodings produce garbled text. Google Sheets and Shopify CSV exports are UTF-8 |
| Header row required | The first row is always treated as column headers. Header-less CSVs are not supported |
| Delimiter is not auto-detected | Defaults to ','. Pass { delimiter: '\t' } explicitly for TSV files |
| 7-day chunk expiry | Chunk files are automatically deleted from R2 after 7 days. Do not schedule processChunk steps more than 7 days after importCSV |
| Import blocks before processing | The entire file is fetched and split before importCSV returns. There is no way to begin processing chunks while the import is still running |
| Not available in lifecycle hooks | api.csv is not available inside onWorkflowComplete or onWorkflowError |
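The 7-day expiry can be guarded at scheduling time. A sketch, assuming scheduleNextStep delays are expressed in seconds as in the staggered example above (the unit is an assumption; check your platform's convention):

```javascript
// Reject fan-out delays that would land after the chunks expire.
const CHUNK_TTL_SECONDS = 7 * 24 * 60 * 60; // 604,800 s = 7 days

function delayWithinTtl(delaySeconds) {
  return delaySeconds < CHUNK_TTL_SECONDS;
}

delayWithinTtl(10 + 999 * 5); // → true (5,005 s, well inside the TTL)
```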