Transforms
Script transforms operate on results and cursors to transform them in-flight. Transforms can be applied to Views, Policies and inline for Cursors and BulkOperations.
Though aggregation and projection is often much faster and less expensive that running a script transform, sometimes complex computation or data-interleaving is required that would be impractical to accomplish within a single script instance.
Transforms work by applying a script to an output cursor. Depending on the complexity and number of documents, a cursor transformation may be executed and re-queued multiple times before the cursor is finally exhausted.
When a running transform hits a timeout or ops threshold of 80%, it will stop iterating over the cursor, run the
after
finalizing method, store the memo
object and re-queue for another run.To create a transform, simply define a class annotated with
@transform
. All methods are optional.const { transform } = require('decorators-transform')
@transform
class Transformer {
/**
* Called if there is an error in the source operation. If the method
* does not throw an error, Cortex throws the original error.
*
* @param err {Fault} thrown fault.
*/
error(err) {
throw err
}
/**
* Called if the result is not a cursor (but a single result value).
* The method must return a result if it's to be transformed. If the
* method does not return a value, Cortex outputs the original result value.
*
* @param result {*}
* @returns {*}
*/
result(result) {
return result
}
/**
* Cursor handler called only once and before all other processing.
*
* @param memo { Object } An empty object that can be modified and is saved
* for the duration of the transformation.
* @param cursor {Cursor} The output cursor.
*/
beforeAll(memo, { cursor }) {
}
/**
* Cursor handler called before each script run, and shares
* the current script context with each and after.
*
* @param memo { Object }
* @param cursor {Cursor}
*/
before(memo, { cursor }) {
}
/**
* Cursor handler called with each cursor object. Returning undefined
* filters the object from the output stream.
*
* @param object { Object } the current output document
* @param memo { Object }
* @param cursor {Cursor}\
* @returns {*} a value to be pushed to the output cursor.
*/
each(object, memo, { cursor }) {
return object
}
/**
* Cursor handler called after before() and each() was called.
*
* @param memo { Object }
* @param cursor {Cursor}
* @param count { Number } The number of documents pushed by the script
* runtime during the current execution.
*/
after(memo, { cursor, count }) {
}
/**
* Cursor handler called only once and after the cursor is exhausted.
*
* @param memo { Object }
* @param cursor {Cursor}
*/
afterAll(memo, { cursor }) {
}
}
A policy transform can handle a result value as well as a cursor. The define one, set the policy
action
to "Transform"
and the script
value to a transform export.script.arguments
:- policy { Object }
- _id { ObjectID } The policy id
- name { String } The policy name
- policyOptions { Object }
- triggered {String[]} The list of triggered conditions (methods, paths, etc.)
const { transform } = require('decorators-transform')
@transform
class PolicyTransform {
error(err) {
throw Fault.create('app.error.policyError', { faults: [err] })
}
result(result) {
result.policyTriggers = script.arguments.policyOptions.triggered
return result
}
}
module.exports = PolicyTransform
A view transform acts on the result form a view cursor. The define one, set the
script
value to a transform export.script.arguments
:- view { Object }
- _id { ObjectID } The view id
- name { String } The view name
- viewOptions { Object } The original view cursor options (object, skipAcl, paths, pipeline, etc.)
const { transform } = require('decorators-transform')
@transform
class ViewTransform {
beforeAll(memo, { cursor }) {
const { arguments: { view, viewOptions } } = script,
{ paths, object } = viewOptions
cursor.push({
object: 'transform',
view,
viewOptions
})
}
}
module.exports = ViewTransform
A transform may be applied to any QueryCursor, AggregationCursor, or top-level BulkOperation.
script.arguments
:- cursorOptions { Object } The original cursor options (object, skipAcl, paths, pipeline, etc.)
There are a few ways to define a transform for a cursor.
A series of implicitly usable transform methods:
const { c_datum: Datum } = org.objects
return Datum.find().transform(`
beforeAll(memo, { cursor }) {
Object.assign(memo, {
object: 'resultTotals',
sum: 0,
deviceTypes: [],
culled: 0
})
}
each(object, memo) {
if (object.c_inactive) {
memo.culled += 1
return // returning undefined filters the object from the output stream
}
memo.sum += object.c_amount || 0
if (!memo.deviceTypes.includes(object.c_deviceType)) {
memo.deviceTypes.push(object.c_deviceType)
}
return object // return the object to include it in the output stream.
}
afterAll(memo, { cursor }) {
cursor.push(memo)
}
`)
An implicit transform class:
const { c_datum: Datum } = org.objects
return Datum.find().transform(`
class {
constructor() {
this.runtTimestamp = Date.now()
}
each(object) {
object.runtTimestamp = this.runtTimestamp
return object
}
}
`)
A reference to a Library Script that exports an annotated transform class:
const { c_datum: Datum } = org.objects
return Datum.find().transform('c_datum_transformer')
An inline export definition:
const { c_datum: Datum } = org.objects
return Datum.find().transform({
script: `
const { transform } = require('decorators-transform'),
{ Accounts } = org.objects
@transform
class Transformer {
each(object, memo, { cursor, index }) {
return object._id
}
}
module.exports = Transformer
`
})
Transformations can be applied to top-level bulk operations only.
const { accounts: Datum, accounts: Other, accounts: Device } = org.objects,
_id = Device
.readOne({owner: script.principal._id})
.execute()._id // 1 device per account.
return org.objects.bulk()
.add(
Datum.find({ c_device: _id }).limit(10)
)
.add(
Other.find({ c_device: _id }).limit(10).transform('c_some_transform')
)
.transform(`
each(object) {
return object.data // unwrap
}
`)
const { transform } = require('decorators-transform'),
schemas = require('schemas'),
res = require('response')
let Undefined
// @transform
class CSVTransform {
constructor() {
this.quote = '"'
this.doubleQuote = '""'
}
beforeAll(memo) {
const { arguments: { view, viewOptions } } = script,
{ paths, object } = viewOptions
memo.fields = paths || schemas.read(object, 'properties').filter(v => !v.optional).map(v => v.name)
res.setHeader('Content-Type', 'text/csv')
res.write('#view: ' + JSON.stringify(view) + '\n')
res.write('#viewOptions: ' + JSON.stringify(viewOptions) + '\n')
res.write(memo.fields.join(','))
}
each(object, memo) {
res.write(
'\n' +
memo.fields.reduce((row, field) => {
row.push(this.toCSV(object[field]))
return row
}, []).join(',')
)
}
toCSV(value) {
if (value === null || value === Undefined) {
return Undefined
}
const valueType = typeof value
if (valueType !== 'boolean' && valueType !== 'number' && valueType !== 'string') {
value = JSON.stringify(value)
if (value === Undefined) {
return Undefined
}
if (value[0] === this.quote) {
value = value.replace(/^"(.+)"$/, '$1')
}
}
if (typeof value === 'string') {
if (value.includes(this.quote)) {
value = value.replace(new RegExp(this.quote, 'g'), this.doubleQuote)
}
value = this.quote + value + this.quote
}
return value
}
}
module.exports = CSVTransform
const { c_datum: Datum } = org.objects
return Datum.find().transform(`
quote = '"'
doubleQuote = '""'
res = require('response)
beforeAll(memo) {
const schemas = require('schemas'),
{ arguments: { cursorOptions: { Object } } } = script
memo.fields = schemas.read(object, 'properties').filter(v => !v.optional).map(v => v.name)
this.res.setHeader('Content-Type', 'text/csv')
this.res.write(memo.fields.join(','))
}
each(object, memo) {
this.res.write(
'\\n' +
memo.fields.reduce((row, field) => {
row.push(this.toCSV(object[field]))
return row
}, []).join(',')
)
}
toCSV(value) {
if (value === null || value === undefined) {
return undefined
}
const valueType = typeof value
if (valueType !== 'boolean' && valueType !== 'number' && valueType !== 'string') {
value = JSON.stringify(value)
if (value === undefined) {
return undefined
}
if (value[0] === this.quote) {
value = value.replace(/^"(.+)"$/, '$1')
}
}
if (typeof value === 'string') {
if (value.includes(this.quote)) {
value = value.replace(new RegExp(this.quote, 'g'), this.doubleQuote)
}
value = this.quote + value + this.quote
}
return value
}
`)
This example triggers the policy on any individual account read
// Triggers on GET /account(s)?/([a-fa-f0-9]{24})
const { transform } = require('decorators-transform'),
{ equalIds } = require('util.id')
let Undefined
@transform
class PolicyTransform {
// throw generic opaque error
error(err) {
throw Fault.create('app.error.accountAccess')
}
// throw access error even if the caller has access to the account
// this might be the result of privileged JWT.
result(result) {
if (!equalIds(script.principal._id, result._id)) {
throw Fault.create('app.accessDenied.account')
}
// add the policy triggers to the result for fun and profit
// in this case: ['methods', 'paths']
result.policyTriggers = script.arguments.policyOptions.triggered
return result
}
}
module.exports = PolicyTransform
Last modified 1yr ago