Introduction
The Data Export pipeline exports client data from our data warehouse to a Google Cloud Storage (GCS) bucket, from which clients can retrieve it. This document describes the base tables provided, the features, and the caveats of this pipeline.
Tables Provided
This section describes, at a high level, the data the pipeline provides.
All tables described have a daily push frequency. The fields provided are configurable.
2.1 Purchases
Contains all data relevant to a particular purchase, such as the member who made the purchase, the payment method, the total amount, etc. Pushes data from the previous day in the local timezone.
2.2 Purchases, Itemized
Similar to the Purchases table, except broken down by the items purchased. Note that rows are split by distinct items, not by item quantity. For example, a purchase of 3 croissants and a coffee will appear as two rows in this table.
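As a purely illustrative sketch of that example, the two rows might look something like this (the column names and values are hypothetical; the actual fields are configurable and may differ):

purchase_id,item_name,quantity,unit_price,line_total
1001,croissant,3,2.50,7.50
1001,coffee,1,3.00,3.00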
2.3 User Actions
Provides data on actions that a member may take or that may happen to them.
Some salient examples are Joined Club (when they joined the loyalty program),
Punch, Received Asset, Redeem Asset, Point Transaction, Member Email, etc.
Pushes data from the previous day in the local timezone.
2.4 Memberships
A Type 1 slowly changing dimension (SCD) table. Provides all member information such as name, phone number, email, the date the member joined the loyalty program, etc. Changes are logged as user actions. Consists of members last updated from the previous day onwards, in UTC.
2.5 Assets
A Type 2 slowly changing dimension (SCD) table. Holds data on asset templates such as Deals, Discounts, and Gifts.
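In practical terms, Type 1 means a member's row is overwritten in place when their details change, while Type 2 keeps each historical version of an asset template as its own row. A minimal sketch of filtering a Type 2 export down to current rows, assuming a hypothetical is_current column (the real column names depend on the fields configured for your account):

import csv

def current_asset_templates(path):
    # Keep only rows flagged as the latest version of each template.
    # The 'is_current' column is an assumed, illustrative field name.
    with open(path, newline='') as f:
        return [row for row in csv.DictReader(f) if row.get('is_current') == 'true']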
Hosting & Retrieval
3.1 Hosting
All data is pushed in CSV format, and all files are hosted in a Google Cloud Storage (GCS) bucket.
3.2 Retrieval
Data is retrieved directly from GCS using Google's SDKs. We will provide a JSON key file for the service account, along with the project name and bucket name. A Python example script is provided at the end of this document.
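For instance, a minimal connection sketch (the key path, project name, and bucket name below are placeholders for the values we provide):

from google.cloud import storage

# Authenticate with the provided service-account JSON key (path and names are placeholders)
client = storage.Client.from_service_account_json('path/to/key.json', project='project_name')
for blob in client.list_blobs('bucket_name'):
    print(blob.name)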
Additional Considerations
4.1 Enterprise accounts can receive one file per table, concatenating data across businesses.
4.2 If a file exceeds one gigabyte, it will be sharded into several smaller files (a sketch for recombining shards follows this list).
4.3 The data will synchronize once a day and will be available for 30 days. After 30 days, the data will be deleted.
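If a recipient needs to recombine sharded files, something like the following sketch could work, assuming the shards share the usual table_yyyymmdd file-name prefix and that each shard carries its own header row (both are assumptions, not documented guarantees):

import glob

def merge_shards(prefix, out_path):
    # Concatenate every CSV shard matching the prefix, keeping only the first header row
    shards = [p for p in sorted(glob.glob(f'{prefix}*.csv')) if p != out_path]
    with open(out_path, 'w', newline='') as out:
        for i, shard in enumerate(shards):
            with open(shard, newline='') as f:
                lines = f.readlines()
                out.writelines(lines if i == 0 else lines[1:])

# e.g. merge_shards('purchases_20240101', 'purchases_20240101_merged.csv')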
Client Requirements
Before initiating the pipeline, the client needs to confirm that the fields provided, and the actions available in the User Actions table, are sufficient. Should they have multiple businesses, they need to inform us whether each business should receive separate files or whether each table should contain all businesses.
File Name Prefixes
The file names combine the table name with a yyyymmdd date, allowing the business to organize the received data by table and date:
assets_yyyymmdd.csv
memberships_yyyymmdd.csv
purchases_yyyymmdd.csv
purchaseitems_yyyymmdd.csv
useraction_yyyymmdd.csv
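For instance, downloaded files might be grouped by table and push date using this pattern (a sketch that assumes file names follow exactly the table_yyyymmdd.csv form shown above):

import re
from collections import defaultdict

FILENAME_RE = re.compile(r'^(?P<table>[a-z]+)_(?P<date>\d{8})\.csv$')

def group_by_table(filenames):
    # Map each table prefix to its files, keyed by the yyyymmdd date in the file name
    grouped = defaultdict(dict)
    for name in filenames:
        match = FILENAME_RE.match(name)
        if match:
            grouped[match.group('table')][match.group('date')] = name
    return grouped

# e.g. group_by_table(['purchases_20240101.csv', 'useraction_20240101.csv'])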
Additional Information
The following Python code snippet may be used to retrieve the files directly from GCS.
import os
from datetime import datetime
from google.cloud import storage
# os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'PATH_TO_CREDENTIALS'
storage_client = storage.Client('project_name')
blobs = list(storage_client.list_blobs('bucket_name'))  # list() so the blobs can be iterated more than once

# Download all files to the current directory
def download_blobs_to_current_directory():
    for blob in blobs:
        print(f'Downloading: {blob.name.split("/")[-1]}')
        blob.download_to_filename('./' + blob.name.split('/')[-1])

# Download a single file, recreating its GCS directory structure locally
def download_blob_to_dir(blob):
    path = '/'.join(blob.name.split('/')[:-1])
    if path and not os.path.exists(path):
        print(f'Making Dir: {path}')
        os.makedirs(path)
    print(f'Downloading: {blob.name}')
    blob.download_to_filename(blob.name)

# Download files using the same directory structure used in GCS
def download_blobs_to_dir():
    for blob in blobs:
        download_blob_to_dir(blob)

# Download a file only if you don't already have it
def download_blobs_if_file_not_exists():
    for blob in blobs:
        if os.path.exists(blob.name):
            print(f'{blob.name} already exists')
        else:
            download_blob_to_dir(blob)

# Download all blobs that were created today
def download_blobs_today():
    for blob in blobs:
        if datetime.utcnow().date() == blob.time_created.date():
            download_blob_to_dir(blob)
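For example, a typical invocation might be (a usage sketch only; pick whichever download helper fits your layout):

if __name__ == '__main__':
    # Download only the files pushed today, mirroring the GCS directory layout locally
    download_blobs_today()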