# CSV Import — api.csv
api.csv lets a workflow fetch a CSV from a public URL, split it into fixed-size chunks stored in R2, and process each chunk in a separate step. This allows files of any size to be processed without hitting worker memory or CPU limits.
## Methods

```js
// Phase 1 — fetch and split (call once in the start step)
const meta = await api.csv.importCSV(url, options);

// Phase 2 — read one chunk (call in each fan-out step)
const rows = await api.csv.readCSVChunk(key, chunkIndex);

// Phase 3 — cleanup (call once after all chunks are processed)
await api.csv.deleteCSV(key);
```

### api.csv.importCSV(url, options?)
Fetches the CSV from `url`, parses it, and writes the rows to R2 in chunks. Returns metadata used to schedule the fan-out steps.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `url` | string | required | Public URL of the CSV file — must be accessible without auth |
| `options.chunkSize` | number | `500` | Rows per chunk. Lower values mean shorter per-step CPU time; higher values mean fewer scheduled steps |
| `options.delimiter` | string | `','` | Field separator character. Use `'\t'` for TSV files |
Returns `{ key, totalRows, chunkCount, headers }`:

| Field | Description |
|---|---|
| `key` | Unique import ID — pass this to every `readCSVChunk` and `deleteCSV` call |
| `totalRows` | Total row count, excluding the header row |
| `chunkCount` | Number of chunks written — schedule this many fan-out steps |
| `headers` | Column names from the first row |
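The docs do not state the exact rule relating `chunkCount` to the other fields, but with fixed-size chunks it should simply be the row total divided by the chunk size, rounded up. A sketch of that assumption, useful when estimating how many fan-out steps an import will schedule:

```js
// Assumed relationship (not confirmed by the docs): with fixed-size
// chunks, every chunk holds chunkSize rows except possibly the last.
function expectedChunkCount(totalRows, chunkSize = 500) {
  return Math.ceil(totalRows / chunkSize);
}

console.log(expectedChunkCount(1200, 500)); // 3 chunks: 500 + 500 + 200
console.log(expectedChunkCount(500, 500));  // exactly 1 chunk
```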
### api.csv.readCSVChunk(key, chunkIndex)

Reads one chunk of a previously imported CSV. Returns an array of row objects (one object per row, keyed by column name), or `null` if the chunk does not exist. Always null-guard the result.
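For illustration, a chunk from a two-column CSV with headers `sku` and `price` would have this shape (the data below is made up), with the null-guard the docs recommend:

```js
// Illustrative shape of a readCSVChunk result: an array of row objects
// whose keys are the column names from the header row.
const rows = [
  { sku: 'SHIRT-S', price: '19.99' },
  { sku: 'SHIRT-M', price: '19.99' },
];

// Always null-guard before iterating — a missing chunk yields null.
if (rows) {
  for (const row of rows) {
    console.log(`${row.sku}: ${row.price}`);
  }
}
```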
### api.csv.deleteCSV(key)

Deletes all R2 objects for a CSV import. Call this in the final step, after all chunks have been processed, to release storage immediately rather than waiting for the automatic 7-day expiry.
## Standard fan-out pattern

```js
class Workflow {
  async start(data, headers, api) {
    const meta = await api.csv.importCSV('https://example.com/products.csv', { chunkSize: 500 });
    api.log(`Imported ${meta.totalRows} rows across ${meta.chunkCount} chunks`);

    for (let i = 0; i < meta.chunkCount; i++) {
      await api.scheduleNextStep({
        delay: 10 + i * 5, // stagger chunks to avoid burst API calls
        action: 'processChunk',
        payload: { key: meta.key, chunkIndex: i, totalChunks: meta.chunkCount },
      });
    }
  }

  async processChunk({ key, chunkIndex, totalChunks }, headers, api) {
    const rows = await api.csv.readCSVChunk(key, chunkIndex);
    if (!rows) {
      api.log(`Chunk ${chunkIndex} not found — skipping`);
      return;
    }

    for (const row of rows) {
      api.log(`Row: ${JSON.stringify(row)}`);
      // process each row — e.g. create/update a Shopify resource
    }

    // Clean up on the last chunk
    if (chunkIndex === totalChunks - 1) {
      await api.csv.deleteCSV(key);
    }
  }
}
```

## Using a public Google Sheet as source
Google Sheets can serve as a CSV source without OAuth credentials if the sheet is shared as "Anyone with the link can view". Convert the share URL to a CSV export URL:

```js
const csvUrl = SHEET_URL.replace(/\/edit.*$/, '/export?format=csv');
// For a specific tab, append: + '&gid=123456789' (copy the gid from the sheet URL bar)
const meta = await api.csv.importCSV(csvUrl, { chunkSize: 500 });
```
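The URL conversion itself is plain string manipulation and can be checked standalone; the sheet ID below is a made-up placeholder:

```js
// Standalone check of the share-URL to export-URL conversion.
// 'abc123XYZ' is a placeholder sheet ID, not a real sheet.
const SHEET_URL = 'https://docs.google.com/spreadsheets/d/abc123XYZ/edit#gid=0';
const csvUrl = SHEET_URL.replace(/\/edit.*$/, '/export?format=csv');
console.log(csvUrl);
// → https://docs.google.com/spreadsheets/d/abc123XYZ/export?format=csv
```

The regex anchors on `/edit` so everything after it (including any `#gid=` fragment) is dropped along with the edit path.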
## Limitations

| Limitation | Detail |
|---|---|
| Public URL required | `importCSV` uses a plain `fetch(url)` with no auth. URLs that require login or redirect to an auth page will fail |
| UTF-8 encoding only | Other encodings produce garbled text. Google Sheets and Shopify CSV exports are UTF-8 |
| Header row required | The first row is always treated as column headers. Header-less CSVs are not supported |
| Delimiter is not auto-detected | Defaults to `,`. Pass `{ delimiter: '\t' }` explicitly for TSV files |
| 7-day chunk expiry | Chunk files are automatically deleted from R2 after 7 days. Do not schedule `processChunk` steps more than 7 days after `importCSV` |
| Import blocks before processing | The entire file is fetched and split before `importCSV` returns. There is no way to begin processing chunks while the import is still running |
| Not available in lifecycle hooks | `api.csv` is not available inside `onWorkflowComplete` or `onWorkflowError` |