From tasks handle all data imports into your data lake. You can configure the items below. Click on a name for more info.
The `extract` part of the configuration covers the extract part of a task. The vocabulary of the datasource is leading in this part: where Workflows normally talks about fields and records, it will use columns and rows here if the datasource uses those terms.
All from tasks come with pre-formatted schemas where possible; examples are from_facebook or from_xandr. For tasks where a pre-formatted schema is not possible, we give you the opportunity to create one yourself. Examples of these tasks are from_imap (which imports mail attachments) or from_aws_s3 (which imports files from Amazon S3).
New task types are added to Workflows on a monthly basis. Currently Workflows supports the following from tasks:
The `file` part of the configuration contains the definition of the data file being downloaded. Use it for from tasks where the format is not pre-defined, e.g. with from_aws_s3 to download a file.
```yaml
file:
  type: txt
  transforms:
    - type: uncompress
  delimiter: ;
  encoding: UTF-8
  text_enclosure: '"'
  new_line: '\n'
  escape_char: '\'
  has_header_row: yes
  skip_rows: 9
  skip_bottom_rows: 8
```
property | type | required | description |
---|---|---|---|
type | enumerator (txt, json, xml, parquet, avro) | yes | File type. Use txt for delimited files, json for newline delimited json, xml for XML, parquet for Parquet files and avro for AVRO files. |
node | string | yes | Only when type=xml. Node to take as the root node. |
delimiter | char | no | Only when type=txt. Default is comma (,). Contains the char that delimits the fields. |
encoding | enumerator (valid encoding type) | no | Only when type=txt. Default is `UTF-8`. Use when you need a different encoding type. |
has_header_row | yesno (boolean) | no | Use when type=txt. Default is `no`. Whether the file contains a header row. |
text_enclosure | char | no | Use when type=txt. Default is `"`. Quoting character to enclose fields. |
new_line | char | no | Use when type=txt. Default is `\n`. Newline character. Use `\n` for Unix-based systems, or `\r\n` for Windows-based systems. |
escape_char | char | no | Use when type=txt. Default is `\`. Escape character being used in text files. |
quoting | enumerator (minimal, all, nonnumeric, none) | no | Use when type=txt. Default is `minimal`. How verbose is the quoting in the file? all = All fields, nonnumeric = Only non-numeric fields, minimal = Only when needed (default), none = Never use quotes. |
skip_rows | int (>= 0) | no | Default is 0. Number of rows to skip at the top of the file. Rows are skipped before has_header_row is applied. |
skip_bottom_rows | int (>= 0) | no | Default is 0. Number of rows to skip at the bottom of the file. |
The `transforms` part of the configuration enables you to manipulate data before you load it into your data lake. You have to add all transforms as an array.
```yaml
transforms:
  # Field transforms
  - type: fields_to_snake_case
  - type: fields_to_lower_case
  - type: fields_regexp_and_replace
    search: '_'
    replace: ''
```
Below are the types of transforms, with an explanation per transform on how to use it exactly. You have to add them as an array.
Adds the current date (trigger_date) as a field.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to add_current_date_field |
field | string | yes | Field name of the new column, e.g. trigger_date |
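For example, a minimal `transforms` entry for this transform, using the field name trigger_date from the description above:
```yaml
transforms:
  # Add the current trigger date as a new field named trigger_date
  - type: add_current_date_field
    field: trigger_date
```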
Converts the type of a field. Only useful with BigQuery when no schema is set.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to convert_type |
fields | array of strings | yes | Array of field names |
to | enumerator (supported field type of BigQuery) | yes | Type the field should be transformed to |
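A minimal sketch of convert_type, assuming hypothetical fields clicks and impressions that should be loaded as BigQuery integers:
```yaml
transforms:
  # Convert the hypothetical fields clicks and impressions to the BigQuery INTEGER type
  - type: convert_type
    fields:
      - clicks
      - impressions
    to: INTEGER
```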
Rename a field
property | type | required | description |
---|---|---|---|
type | string | yes | Set to rename_field |
from | string | yes | Current name of the field |
to | string | yes | Name the field should be renamed to |
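A minimal sketch of rename_field, assuming a hypothetical source field named Campaign Name:
```yaml
transforms:
  # Rename the hypothetical field "Campaign Name" to campaign_name
  - type: rename_field
    from: Campaign Name
    to: campaign_name
```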
Duplicate a field.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to copy_field |
from | string | yes | Current name of the field |
to | string | yes | Name the field should be duplicated to |
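A minimal sketch of copy_field, assuming a hypothetical field order_id that you want to keep a copy of under another name:
```yaml
transforms:
  # Duplicate the hypothetical field order_id into order_id_raw
  - type: copy_field
    from: order_id
    to: order_id_raw
```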
Drops fields.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to drop_fields |
fields | array of strings | yes | Array of field names that should be dropped. |
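A minimal sketch of drop_fields, assuming hypothetical fields internal_notes and raw_payload that should not end up in the data lake:
```yaml
transforms:
  # Drop the hypothetical fields internal_notes and raw_payload
  - type: drop_fields
    fields:
      - internal_notes
      - raw_payload
```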
Encrypt field content. Support for several hash algorithms.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to encrypt |
encryption | enumerator (md5, sha1, sha224, sha256, sha384, sha512) | yes | Hash algorithm that should be used to hash the content of a field. |
fields | array of strings | yes | Array of field names whose content should be hashed. |
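A minimal sketch of encrypt, assuming hypothetical fields email and phone_number that should be hashed with SHA-256:
```yaml
transforms:
  # Hash the hypothetical fields email and phone_number with sha256
  - type: encrypt
    encryption: sha256
    fields:
      - email
      - phone_number
```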
Search and replace a string in field names.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to fields_search_and_replace |
search | string | yes | String to search. |
replace | string | yes | String to replace search string with. |
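A minimal sketch of fields_search_and_replace that replaces every space in field names with an underscore:
```yaml
transforms:
  # Replace every space in field names with an underscore
  - type: fields_search_and_replace
    search: ' '
    replace: '_'
```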
Transform field names to snake_case. E.g. OneSecondBefore to one_second_before. Also replaces the field names in a schema.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to fields_to_snake_case |
Transform field names to lower case. E.g. ONESECONDBEFORE to onesecondbefore. Also replaces the field names in a schema.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to fields_to_lower_case |
Regular expression replacement of field names.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to fields_regexp_and_replace |
search | string | yes | Regular expression to search. |
replace | string | yes | Regular expression to replace found string with. |
Filter records from incoming dataset.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to filter_records |
fields | array of strings | yes | Array of field names on which the filter must be applied. |
value | string | yes | If a field contains this value, the whole record will be filtered. |
filter | enumerator (include, exclude) | yes | Use include if you want all records to be included when the field(s) contain the value. Use exclude if you want all records to be excluded when the field(s) contain the value. |
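A minimal sketch of filter_records, assuming a hypothetical field status whose test records should be excluded:
```yaml
transforms:
  # Exclude every record where the hypothetical field status contains the value test
  - type: filter_records
    fields:
      - status
    value: test
    filter: exclude
```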
Find all occurrences of a regular expression and return an array of matches. Based on Python's re.findall
property | type | required | description |
---|---|---|---|
type | string | yes | Set to find_all |
fields | array of strings | yes | Array of field names on which the transform must be applied. |
search | string | yes | Regular expression to search for in the field(s). |
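A minimal sketch of find_all, assuming a hypothetical field page_path from which all numeric substrings should be extracted:
```yaml
transforms:
  # Return all numeric substrings in the hypothetical field page_path as an array of matches
  - type: find_all
    fields:
      - page_path
    search: '[0-9]+'
```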
Find the first occurrence of a regular expression and return the match. Based on Python's re.findall.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to find_first |
fields | array of strings | yes | Array of field names on which the transform must be applied. |
search | string | yes | Regular expression to search for in the field(s). |
Regular expression replacement of field data.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to regexp_and_replace |
fields | array of strings | yes | Array of field names on which the transform must be applied. |
search | string | yes | Regular expression to search. |
replace | string | yes | Regular expression to replace found string with. |
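A minimal sketch of regexp_and_replace, assuming a hypothetical field page_url from which the query string should be stripped:
```yaml
transforms:
  # Remove everything from the question mark onwards in the hypothetical field page_url
  - type: regexp_and_replace
    fields:
      - page_url
    search: '\?.*$'
    replace: ''
```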
Conditionally replace field data with null value.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to replace_with_null |
fields | array of strings | yes | Array of field names on which the transform must be applied. |
values | array of strings | yes | If the field value matches any of the strings in this array, the field will be replaced with a null value. |
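A minimal sketch of replace_with_null, assuming a hypothetical field country in which placeholder values should be treated as missing data:
```yaml
transforms:
  # Replace placeholder values in the hypothetical field country with null
  - type: replace_with_null
    fields:
      - country
    values:
      - 'N/A'
      - 'unknown'
```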
Search and replace a substring in field data.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to search_and_replace |
search | string | yes | String to search for in the field data. |
fields | array of strings | yes | Array of field names on which the transform must be applied. |
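A minimal sketch of search_and_replace, assuming a hypothetical field revenue and assuming the transform also takes a replace property, analogous to regexp_and_replace:
```yaml
transforms:
  # Replace the decimal comma with a decimal point in the hypothetical field revenue;
  # the replace property is assumed here by analogy with regexp_and_replace
  - type: search_and_replace
    fields:
      - revenue
    search: ','
    replace: '.'
```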
Parse a field value as a time string. Uses Python's datetime.strptime.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to strptime |
format | string | yes | Format of the string that should be parsed as a date & time. Use these format codes |
fields | array of strings | yes | Array of field names on which the transform must be applied. |
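A minimal sketch of strptime, assuming a hypothetical field booking_date containing strings like 27-09-2021 14:30:
```yaml
transforms:
  # Parse the hypothetical field booking_date using Python strptime format codes
  - type: strptime
    format: '%d-%m-%Y %H:%M'
    fields:
      - booking_date
```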
The `schema` part of the configuration is used to create the destination table. It is required for Snowflake, but optional for BigQuery. Don't use a schema for APIs that use pre-formatted schemas; the extract section of the task type you chose contains this information.
```yaml
schema:
  description: You can put a table description here
  time_partitioning:
    field: date_field
    type: day
    require_partition_filter: yes
  fields:
    - name: date_field
      type: DATE
      mode: REQUIRED
      description: Description for date_field will be visible as field comment. Field will be used for partitioning in BigQuery and clustering in Snowflake.
    - name: some_string_field
      type: STRING
      mode: NULLABLE
      description: Description for some_string_field will be visible as field comment.
    - name: some_int_field
      type: INTEGER
      mode: NULLABLE
      description: Description for some_int_field will be visible as field comment.
    - name: some_repeated_field
      type: OBJECT
      mode: REPEATED
      description: Description for some_repeated_field will be visible as field comment.
      fields:
        - name: date_field
          type: DATE
          mode: REQUIRED
          description: Description for date_field will be visible as field comment. Field will be used for partitioning in BigQuery and clustering in Snowflake.
        - name: some_string_field
          type: STRING
          mode: NULLABLE
          description: Description for some_string_field will be visible as field comment.
```
Required. Array of table fields.
property | type | required | Snowflake support | BigQuery support | description |
---|---|---|---|---|---|
name | string | yes | yes | yes | Field name. Use either valid names for BigQuery or Snowflake. |
type | enumerator (valid data type) | yes | yes | yes | SQL data type of field. Use only valid types of BigQuery or Snowflake |
mode | enumerator (nullable, required, repeated) | no | no | yes | BigQuery only. Define the mode of this field. Nullable = may be NULL, required = may not be NULL, repeated = array. |
description | string | no | yes | yes | Description of the table field. Is added in Snowflake as a column comment during table creation. Is updated with every load in BigQuery. |
fields | array of fields | no | no | yes | Contains nested fields. Use mode=repeated if this should be an array of fields. Use same format for fields as described here. |
Optional. Time partitioning configures either clustering (for Snowflake) or time partitioning (for BigQuery).
property | type | required | Snowflake support | BigQuery support | description |
---|---|---|---|---|---|
field | string | yes | yes | yes | Field name. Must be present in the field definitions |
type | enumerator (hour, day, month, year) | yes | yes | yes | Timeframe of the partition. Default is day. Per hour, day, month or year. |
require_partition_filter | yesno (boolean) | yes | no | yes | Default is no. BigQuery only. If the partition filter should be required in your SQL. |
Extra properties that can be added during table creation.
property | type | Snowflake support | BigQuery support | description |
---|---|---|---|---|
description | string | yes | yes | Table description. Is added during table creation and update. In Snowflake the description is added to the COMMENT and suffixed with the task TAGS that last touched the table. |
Optional. Deduplicate the destination table to make sure you don't import the same data twice. Configure once, deduplicates automatically. If no deduplication is added, all imported data will be added to the destination table.
You can deduplicate a storage or table destination. Click on a deduplicate type for an explanation.
Below are the deduplicate types for a storage destination, with an explanation per type.
```yaml
deduplicate:
  type: delete_all
```
Default setting. No deduplication. Adds all files to the destination storage location. Overwrites if the source files have the same name as the destination files.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to none |
Deletes all files in the destination storage location.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to delete_all |
Below are the deduplicate types for a table destination, with an explanation per type.
```yaml
deduplicate:
  type: date_range
  field: booking_date
```
Default setting. No deduplication. Appends all data to the destination table.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to append |
Preferred over type=truncate. Drops the destination table and recreates it (with the latest fields and descriptions) before importing new data.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to drop |
with_loop | enumerator (all, once) | yes | Only needed when task.loop_by is set. Use all to drop and recreate the table with every loop. Use once to only drop and recreate the table with the first loop. |
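A minimal sketch of the drop deduplicate type combined with with_loop, assuming the task has loop_by set and the table should only be dropped and recreated on the first loop:
```yaml
deduplicate:
  # Drop and recreate the destination table only with the first loop
  type: drop
  with_loop: once
```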
Prefer type=drop over truncate. Truncates the destination table (or creates it if it does not exist) before importing new data.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to truncate |
with_loop | enumerator (all, once) | yes | Only needed when task.loop_by is set. Use all to truncate the table with every loop. Use once to only truncate the table with the first loop. |
Deletes the date range from deduplicate.start_date (or task.start_date) till deduplicate.end_date (or task.end_date), inclusive, before importing new data. Currently only supports deletion by day. Contact support if you want a different timeframe, e.g. hour.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to date_range |
field | string | yes | Name of the date field. |
Basically a merge of the imported dataset into the destination table. First deletes all records in the destination table whose key value(s) are present in the imported dataset, then appends the imported dataset.
property | type | required | description |
---|---|---|---|
type | string | yes | Set to date_range |
field | string | yes | Name of the date field. |
Optional. Configures where in the data lake the imported data should be stored.
Below are the types of load. Click on a load type for an explanation.
Load imported data into a BigQuery table.
Example of loading the data files to table `my-data-lake.osb_raw.weather-data`
```yaml
load:
  destination: database
  conn_id: google_cloud_default
  project_id: my-data-lake
  dataset_id: osb_raw
  table_id: weather-data
```
property | type | required | description |
---|---|---|---|
destination | enumerator (database, storage) | no | Default is database |
conn_id | string | no | Name of the connection. If not declared, client_cloud.db_conn_id is used. |
project_id | string | no | Project ID of the destination table. If not declared, client_cloud.project_id is used. |
dataset_id | string | no | Dataset ID of the destination table. If not declared, client_cloud.dataset_id is used. |
table_id | string | no | Table ID of the destination table. If not declared, task.id is used. |
table_expiration | (relative) date or datetime | no | Sets expiration date of the table. If not declared, client_cloud.table_expiration is used. Set it to None if you don't want a default table_expiration, e.g. for your production tables. Recommended to add this to the sandbox files to make sure your sandbox tables will be deleted automatically. Besides absolute dates (like 2021-09-27), you can use relative dates as well. |
max_bad_records | integer (>= 0) | no | Maximum number of bad records before the load operation fails. Must be an integer of 0 or greater. |
autodetect_schema | yesno (boolean) | no | BigQuery can detect the schema of a file being loaded. Is automatically set to yes if no schema is available. |
Load imported data into a Snowflake table.
Example of loading the data files to table PRODUCTION.OSB_RAW.WEATHER_DATA
```yaml
load:
  destination: database
  conn_id: snowflake
  database: PRODUCTION
  schema: OSB_RAW
  table: WEATHER_DATA
```
property | type | required | description |
---|---|---|---|
destination | enumerator (database, storage) | no | Default is database |
conn_id | string | no | Name of the connection. If not declared, client_cloud.db_conn_id is used. |
database | string | no | Database of the destination table. If not declared, client_cloud.database is used. |
schema | string | no | Schema of the destination table. If not declared, client_cloud.schema is used. |
table | string | no | Table name of the destination table. If not declared, task.id.upper() is used. |
max_bad_records | integer (>= 0) | no | Maximum number of bad records before the load operation fails. Must be an integer of 0 or greater. |
Load imported data to Amazon S3, Google Cloud Storage or Azure Blob Storage (in beta).
Example of loading the data files to s3://my-test-bucket/some/folder
```yaml
load:
  destination: storage
  conn_id: s3
  bucket: my-test-bucket
  folder: some/folder
```
property | type | required | description |
---|---|---|---|
destination | enumerator (database, storage) | yes | Set to storage |
conn_id | string | no | Name of the connection. If not declared, client_cloud.storage_conn_id is used. |
bucket | string | no | Name of the bucket. If not declared, client_cloud.bucket is used. |
location | string | no | Name of the location. Please add extension as well. If not declared, client_cloud.folder is used. |
compression | enumerator(gzip, deflate) | no | Compression type |
export_format | enumerator(json, csv, avro) | no | File format |
project_id | string | no | BigQuery only. Project ID of the bucket. If not declared, client_cloud.project_id is used. |
The `validate` part of the configuration enables you to check the data you just imported. If the imported data does not pass the validation, an error will be generated.
Below are the validation types, with an explanation per validation on how to use it exactly. You have to add the validations as an array.
Set this property to the query you want to execute. Make sure the resultset is flattened (only rows and columns, not nested or repeated columns).
The `query` property is required for the first validation, but optional for all subsequent validations. If you choose not to add `query` to a subsequent validation, Workflows will reuse the same resultset for that validation until it hits another `query` property. If you do add a `query` property to a subsequent validation, Workflows will execute that query and use its resultset for that validation and the subsequent validations, until it hits another `query` property again.
You are allowed to use SQL Templating.
Validates a number field.
The example validates whether the value of the column `lines` is greater than (>) the value (1000) for all rows in the resultset.
The example uses SQL Templating, hence the `{{` and `}}`.
```yaml
validate:
  - query: |
      SELECT my_segment, COUNT(1) AS lines
      FROM `{{ load.project_id }}.{{ load.dataset_id }}.{{ load.table_id }}`
      GROUP BY 1
    type: number
    operator: '>'
    value: 1000
    field: lines
```
property | type | required | description |
---|---|---|---|
query | string | required for first validation | Query to execute. See special property: query for more info. |
type | string | yes | Set to number |
field | string | yes | Field name that will be validated |
operator | enumerator (<=, <, ==, >, >=) | yes | Operator to use in the validation |
value | integer or float | yes | Value to match the field against. |
Validates the number of rows of a resultset.
The example validates whether the number of rows equals (==) eight.
The example uses SQL Templating, hence the `{{` and `}}`.
```yaml
validate:
  - query: |
      SELECT my_segment, COUNT(1) AS transactions
      FROM `{{ load.project_id }}.{{ load.dataset_id }}.{{ load.table_id }}`
      GROUP BY 1
    type: total_rows
    operator: '=='
    value: 8
```
property | type | required | description |
---|---|---|---|
query | string | required for first validation | Query to execute. See special property: query for more info. |
type | string | yes | Set to total_rows |
operator | enumerator (<=, <, ==, >, >=) | yes | Operator to use in the validation |
value | integer or float | yes | Value to match the total number of rows against. |
Validates whether the value of a field is yes (`True`) in all rows.
The example validates whether the value of the field `has_enough_transactions` is yes (`True`).
The example uses SQL Templating, hence the `{{` and `}}`.
```yaml
validate:
  - query: |
      SELECT COUNT(1) > 1000 AS has_enough_transactions
      FROM `{{ load.project_id }}.{{ load.dataset_id }}.{{ load.table_id }}`
    type: yesno
    field: has_enough_transactions
```
property | type | required | description |
---|---|---|---|
query | string | required for first validation | Query to execute. See special property: query for more info. |
type | string | yes | Set to yesno |
field | string | yes | Field name of which the value must be yes (`True`) |