Pipeline
tess
expects a JSON formatted "pipeline" file that declares the sources, sinks, and
transforms to be run.
Some values in the pipeline file can be overridden by command line options.
Pipeline Declaration Format
tess --print-pipeline
{
"source" : {
"inputs" : [ ], (1)
"schema" : {
"declared" : [ ], (2)
"format" : null, (3)
"compression" : "none", (4)
"embedsSchema" : false (5)
},
"partitions" : [ ] (6)
},
"transform" : [ ], (7)
"sink" : {
"output" : null, (8)
"schema" : {
"declared" : [ ], (9)
"format" : null, (10)
"compression" : "none", (11)
"embedsSchema" : false (12)
},
"partitions" : [ ] (13)
}
}
1 | URLs to read from, required |
2 | Schema fields to declare, required if not embedded or type information should be declared |
3 | Format type |
4 | Compression type |
5 | Whether the schema is embedded in the files (has headers) |
6 | Partitions to parse into fields |
7 | Transforms to apply to the data |
8 | URL to write to, required |
9 | Schema fields to declare, by default all fields are written |
10 | Format type |
11 | Compression type |
12 | Whether the schema should be embedded in the files (add headers) |
13 | Partitions to write out |
To view all pipeline options:
tess --print-pipeline all
Pipeline File Overrides
Some command line options are merged at runtime with the pipeline JSON file. Command line options take precedence over the pipeline JSON file.
Overriding command line options include
-
--inputs
-
--input-manifest
-
--input-manifest-lot
-
--output
-
--output-manifest
-
--output-manifest-lot
Pipeline Template Expressions
In order to embed system properties, environment variables, or other provided intrinsic values, MVEL templates are supported in the pipeline JSON file.
Provided intrinsic values include:
env[…]
-
Environment variables.
sys[…]
-
System properties.
source.*
-
Pipeline source properties.
sink.*
-
Pipeline sink properties.
pid
-
ProcessHandle.current().pid()
. rnd32
-
Math.abs(random.nextInt())
always returns the same value. rnd64
-
Math.abs(random.nextLong())
always returns the same value. rnd32Next
-
Math.abs(random.nextInt())
never returns the same value. rnd64Next
-
Math.abs(random.nextLong)
never returns the same value. hostAddress
-
localHost.getHostAddress()
. hostName
-
localHost.getCanonicalHostName()
. currentTimeMillis
-
now.toEpochMilli()
. currentTimeISO8601
-
now.toString()
at millis precision. currentTimeYear
-
utc.getYear()
. currentTimeMonth
-
utc.getMonthValue()
zero padded. currentTimeDay
-
utc.getDayOfMonth()
zero padded. currentTimeHour
-
utc.getHour()
zero padded. currentTimeMinute
-
utc.getMinute()
zero padded. currentTimeSecond
-
utc.getSecond()
zero padded.
Where:
-
Random random = new Random()
-
InetAddress localHost = InetAddress.getLocalHost()
-
Instant now = Instant.now()
-
ZonedDateTime utc = now.atZone(ZoneId.of("UTC"))
For example:
-
@{env['USER']}
- resolve an environment variable -
@{sys['user.name']}
- resolve a system property -
@{sink.manifestLot}
- resolve a sink property from the pipeline JSON definition
Used in a transform to embed the current lot
value into the output:
{
"transform": [
"@{source.manifestLot}=>lot|string"
]
}
Or create a filename that prevents collisions but simplifies duplicate removal:
{
"filename": {
"prefix": "access",
"includeGuid": true,
"providedGuid": "@{sink.manifestLot}-@{currentTimeMillis}",
"includeFieldsHash": true
}
}
Will result in a filename similar to access-1717f2ea-20230717PT5M250-1689896792672-00000-00000-m-00000.gz
.