Pipeline

tess expects a JSON formatted "pipeline" file that declares the sources, sinks, and transforms to be run.

Some values in the pipeline file can be overridden by command line options.

Pipeline Declaration Format

Print Pipeline Template
tess --print-pipeline
{
  "source" : {
    "inputs" : [ ], (1)
    "schema" : {
      "declared" : [ ], (2)
      "format" : null, (3)
      "compression" : "none", (4)
      "embedsSchema" : false (5)
    },
    "partitions" : [ ] (6)
  },
  "transform" : [ ], (7)
  "sink" : {
    "output" : null, (8)
    "schema" : {
      "declared" : [ ], (9)
      "format" : null, (10)
      "compression" : "none", (11)
      "embedsSchema" : false (12)
    },
    "partitions" : [ ] (13)
  }
}
1 URLs to read from, required
2 Schema fields to declare, required if not embedded or type information should be declared
3 Format type
4 Compression type
5 Whether the schema is embedded in the files (has headers)
6 Partitions to parse into fields
7 Transforms to apply to the data
8 URL to write to, required
9 Schema fields to declare, by default all fields are written
10 Format type
11 Compression type
12 Whether the schema should be embedded in the files (add headers)
13 Partitions to write out

To view all pipeline options:

Print Complete Pipeline Template
tess --print-pipeline all

Pipeline File Overrides

Some command line options are merged at runtime with the pipeline JSON file. Command line options take precedence over the pipeline JSON file.

Overriding command line options include

  • --inputs

  • --input-manifest

  • --input-manifest-lot

  • --output

  • --output-manifest

  • --output-manifest-lot

Pipeline Template Expressions

In order to embed system properties, environment variables, or other provided intrinsic values, MVEL templates are supported in the pipeline JSON file.

Provided intrinsic values include:

env[…​]

Environment variables.

sys[…​]

System properties.

source.*

Pipeline source properties.

sink.*

Pipeline sink properties.

pid

ProcessHandle.current().pid().

rnd32

Math.abs(random.nextInt()) always returns the same value.

rnd64

Math.abs(random.nextLong()) always returns the same value.

rnd32Next

Math.abs(random.nextInt()) never returns the same value.

rnd64Next

Math.abs(random.nextLong) never returns the same value.

hostAddress

localHost.getHostAddress().

hostName

localHost.getCanonicalHostName().

currentTimeMillis

now.toEpochMilli().

currentTimeISO8601

now.toString() at millis precision.

currentTimeYear

utc.getYear().

currentTimeMonth

utc.getMonthValue() zero padded.

currentTimeDay

utc.getDayOfMonth() zero padded.

currentTimeHour

utc.getHour() zero padded.

currentTimeMinute

utc.getMinute() zero padded.

currentTimeSecond

utc.getSecond() zero padded.

Where:

  • Random random = new Random()

  • InetAddress localHost = InetAddress.getLocalHost()

  • Instant now = Instant.now()

  • ZonedDateTime utc = now.atZone(ZoneId.of("UTC"))

For example:

  • @{env['USER']} - resolve an environment variable

  • @{sys['user.name']} - resolve a system property

  • @{sink.manifestLot} - resolve a sink property from the pipeline JSON definition

Used in a transform to embed the current lot value into the output:

{
  "transform": [
    "@{source.manifestLot}=>lot|string"
  ]
}

Or create a filename that prevents collisions but simplifies duplicate removal:

{
  "filename": {
    "prefix": "access",
    "includeGuid": true,
    "providedGuid": "@{sink.manifestLot}-@{currentTimeMillis}",
    "includeFieldsHash": true
  }
}

Will result in a filename similar to access-1717f2ea-20230717PT5M250-1689896792672-00000-00000-m-00000.gz.