Copy Files Between S3 Locations

The simplest pipeline that can be deployed is one that copies data, as it arrives, from one location to another.

In this example, we will deploy a boundary that listens for data arrivals in the specified bucket and publishes events about the data’s availability, and an arc that copies the arrived files when it sees the boundary’s availability event.

This example does not create or manage a new bucket; either create one manually, or deploy the project from the Manage an AWS S3 Bucket how-to guide.

AWS CLI

Some tasks in this example require the AWS CLI (command line interface) to be installed. It can be found here: https://aws.amazon.com/cli/

To create a bucket:

# pick a region and use it consistently in this guide
$ aws s3 mb s3://mybucket --region us-east-2

Prerequisites

It is assumed that the cls CLI tool is installed and that the current cloud placement has been bootstrapped.

To initialize a new project, call the following in a new directory:

$ cls show model --model deployable > project-copy.json
project-copy.json
{
  "project" : {
    "name" : null,
    "version" : null
  },
  "placement" : {
    "provider" : null,
    "stage" : null,
    "account" : null,
    "region" : null
  },
  "resources" : [ ],
  "activities" : [ ],
  "boundaries" : [ ],
  "barriers" : [ ],
  "arcs" : [ ]
}

Review the Manage an AWS S3 Bucket how-to guide to learn about the project and placement properties.
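
For reference, a filled-in project and placement might look like the following sketch. The project name and version, stage, and account values are illustrative placeholders; the provider is aws since we are deploying AWS components, and the region should match the one used when creating the bucket above.

{
  "project" : {
    "name" : "copy-files",
    "version" : "20230101"
  },
  "placement" : {
    "provider" : "aws",
    "stage" : "dev",
    "account" : "123456789012",
    "region" : "us-east-2"
  }
}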

Next, we need to declare the Boundary. Boundaries listen for new data arrivals.

To see all available Boundary components, call:

$ cls show boundary --list

We want the JSON model for aws:core:s3PutListenerBoundary. This Boundary will listen for new data arrivals in the specified bucket under the specified prefix.

$ cls show boundary --model aws:core:s3PutListenerBoundary --required
{
  "type" : "aws:core:s3PutListenerBoundary",
  "name" : null,
  "dataset" : {
    "name" : null,
    "version" : null,
    "pathURI" : null
  },
  "lotUnit" : null,
  "eventArrival" : "infrequent"
}

Paste the results into the boundaries JSON property of the project.

project-copy.json
{
  "boundaries": [
    {
      "type" : "aws:core:s3PutListenerBoundary",
      "name" : null,
      "dataset" : {
        "name" : null,
        "version" : null,
        "pathURI" : null
      },
      "lotUnit" : null,
      "eventArrival" : "infrequent"
    }
  ]
}

Set the name to something short and meaningful. This will be used as a name component of any cloud resources created. If a project declares more than one boundary, each boundary must have a unique name within the project.

Boundaries catalog arriving data into a dataset. A dataset has a name, version, and pathURI (the location of the data). When data arrives, it is cataloged into a manifest file, and the availability of this manifest is published as an event to downstream subscribers. Subscribers subscribe to a dataset by supplying the dataset name and version they want to receive events for.

For the pathURI, use the bucket created above with a prefix named /source/: s3://mybucket/source/.

Every manifest contains data that arrived in the current interval unit of time. We call this interval a lot. If a lot is 5 minutes in duration, a manifest will be published every 5 minutes cataloging all the data that arrived within that 5 minute interval.

The currently supported intervals are:

  • Twelfths - 5 minute interval

  • Sixths - 10 minute interval

  • Fourths - 15 minute interval

Set the lotUnit to the desired interval name.

The interval specified here represents the tick of the workflow clock. All downstream arcs listening for events from this dataset will be triggered on this interval.

Next, set eventArrival to frequent. This allows multiple objects to arrive within the interval, and an event will be sent even if no objects arrive within the interval. Use infrequent if at most a single file is expected to arrive in an interval and you only want an event sent when one actually does.

The infrequent option is cheaper, since the boundary won’t be triggered as often for infrequently arriving data, but it will leave gaps in the data catalog. If you want to ensure all intervals are cataloged, use frequent; a manifest for an interval with no arrivals will be marked as empty, so it’s obvious nothing arrived during that interval.
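
Putting these choices together, a completed boundary might look like the following sketch. The boundary name, dataset name, and dataset version are illustrative placeholders; the pathURI, lotUnit, and eventArrival reflect the values discussed above.

{
  "boundaries": [
    {
      "type" : "aws:core:s3PutListenerBoundary",
      "name" : "copy-listener",
      "dataset" : {
        "name" : "copy-source",
        "version" : "20230101",
        "pathURI" : "s3://mybucket/source/"
      },
      "lotUnit" : "Twelfths",
      "eventArrival" : "frequent"
    }
  ]
}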

Finally, we add the workload, or Arc, to the project. The Arc will be triggered by the boundary event and will copy the data from the source to the destination.

If datasets are the nodes, workloads are the edges, or arcs, between them. The result is a DAG (directed acyclic graph).

To see all available Arc components, call:

$ cls show arc --list

We want the JSON model for aws:core:s3CopyArc.

$ cls show arc --model aws:core:s3CopyArc --required
{
  "type" : "aws:core:s3CopyArc",
  "name" : null,
  "sources" : { },
  "sinks" : { }
}

Paste the results into the arcs JSON property of the project.

project-copy.json
{
  "arcs": [
    {
      "type" : "aws:core:s3CopyArc",
      "name" : null,
      "sources" : { },
      "sinks" : { }
    }
  ]
}

Again, specify a name for the arc. This will be used as a name component of any cloud resources created.

Arcs consume data and write data. They consume data when new data they subscribe to arrives, and they publish new availability events after writing data. Every invocation of an arc is against the current lot interval specified in the event message.

Arcs are not constrained to a single source or sink dataset. But in this example, we will listen to a single dataset as the source, and copy and publish to a single dataset as the sink.

For the source, copy the dataset JSON from the boundary and paste it into the sources property of the arc.

You can also look up the model JSON via:

$ cls show model --model SourceDataSet --required
"sources" :
 {
    "main" :
    {
      "name" : null,
      "version" : null
    }
 }

Since we can have more than one source dataset, we name them. Above we created a single source dataset named main. This name is only used to reference the dataset within the arc.
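
For example, using the illustrative dataset name and version from the boundary sketch above, the filled-in sources block would be:

"sources" :
 {
    "main" :
    {
      "name" : "copy-source",
      "version" : "20230101"
    }
 }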

For the sink, we declare a new dataset.

$ cls show model --model SinkDataSet --required

Paste the results into the sinks property of the arc under the main key.

"sinks" :
 {
    "main" :
    {
      "name" : null,
      "version" : null,
      "pathURI" : null
    }
 }

The dataset properties follow the same rules as the dataset declared in the above boundary. Any arc listening for the availability of data created by this arc will use these values to subscribe.

For the pathURI, use the bucket created above with a prefix named /sink/: s3://mybucket/sink/.
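
With the same illustrative naming, the completed arc might look like the following sketch. The arc name and the sink dataset name and version are placeholders of your choosing; the source name and version must match the boundary’s dataset, and the sink pathURI uses the /sink/ prefix from above.

{
  "arcs": [
    {
      "type" : "aws:core:s3CopyArc",
      "name" : "copy",
      "sources" : {
        "main" : {
          "name" : "copy-source",
          "version" : "20230101"
        }
      },
      "sinks" : {
        "main" : {
          "name" : "copy-sink",
          "version" : "20230101",
          "pathURI" : "s3://mybucket/sink/"
        }
      }
    }
  ]
}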

That’s it! The project is complete.

To verify the project is valid, call:

$ cls verify -p project-copy.json

Then, to deploy the project:

$ cls deploy -p project-copy.json

The following commands will list all currently deployed placements and projects:

$ cls placements
$ cls projects

Now we need to upload some data to the source bucket.

Remember to use the same bucket name as the one specified in the project.

$ echo "Hello World" > hello.txt
$ aws s3 cp hello.txt s3://mybucket/source/

In a few minutes, the file will be copied to the sink location.

$ aws s3 ls s3://mybucket/sink/

The following commands will summarize the status of all currently deployed arcs and datasets.

$ cls arcs status
$ cls datasets status

It is recommended at this point to log into your AWS Console and browse the new S3 buckets created to store clusterless metadata, and the deployed AWS Step Functions state machines.

Finally, destroy the project.

$ cls destroy -p project-copy.json

If the bucket was created via the Manage an AWS S3 Bucket how-to guide, destroying that project will remove the bucket if removeOnDestroy was set to true.