Taking Models from Data Scientists to Users (Part 6): Joining the Pipeline - II
Building the full Data Pipeline - II
This is a series of write-ups on a modern data pipeline implementation, with a case study of Tyco Cam security solutions, using data tooling such as Pachyderm, Kubernetes, Docker, and TensorFlow, and storage infrastructure such as AWS.
Table of Contents
- Intro - Analytics and Deployment: Why should Data Scientists care about Production?
- Cloud Locations: Case for Object Storage
- Model and Framework selection in Production: A Case of Object Detection with TensorFlow
- Building the full Data Pipeline - I
- Containers - boxing code dependencies with Docker
- Building the full Data Pipeline - II
- Update, Maintain and Scale your Data Science Pipeline
In this series, we previously talked about the various components and how the different stages fit together to form a data pipeline. This sixth post first introduces the revolutionary taskmaster of our production environment - Kubernetes - and then demonstrates, step by step, how to implement a running Tyco Object Detection data pipeline on Google Cloud.
Kubernetes - Why?
The processing infrastructure on Google Cloud will be operated and managed by Kubernetes (also known as K8s; from the Greek word for pilot or helmsman). In layman's terms, Kubernetes is that member of your team who can deploy or undeploy machines in the cluster as traffic fluctuates. But of course, it is much more than that. As a platform, K8s is the container orchestration tool that provides:
- a visualizer: gives consolidated information about running instances and their network utilization
- a federator: load balancing and cloud-agnostic controls without manual intervention
- an operator: bootstraps, resizes, and backs up storage mounts, again without manual intervention
Additionally, it assigns an individual IP to each pod (pods are the smallest deployable container units in K8s), which gives applications the ability to discover backing services (like databases, etc.) and basically creates a network of containers.
So, in a nutshell, Kubernetes becomes the pilot of your container pods, making them both extensible and portable. It impressively abstracts away the messy details of cloud providers or an in-house data center to give the community a seamless experience.
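To make the pod network a little more tangible, here is a minimal kubectl sketch (run against whichever cluster your kubeconfig points at) that shows the per-pod IPs and the services other containers use for discovery:

```
# List every pod together with the individual IP assigned to it (-o wide adds IP/node columns)
kubectl get pods --all-namespaces -o wide

# List the services that give pods a stable, discoverable address (e.g. a database backing service)
kubectl get services --all-namespaces
```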
Architecture of the pipeline - connecting the dots
In the above architecture:
- Kubernetes spins up nodes (Node1/Node2, ...) based on the demand from the initial data and subsequent loads. There is a master node, essentially an API server, that talks to the other nodes. K8s maintains a persistent volume to store metadata in an etcd key-value store.
- Pachyderm and other container pods are deployed on individual nodes with separate IPs. K8s can place multiple Docker containers (pipeline workers) on one node to internally optimize for performance.
- Pachyderm connects to object storage for its data layer, thereby maintaining multiple versions of the data without any overhead.
- The Pachyderm dashboard allows us to visualize the pipeline in a browser over an HTTP connection.
Steps to create the pipeline
PREREQUISITES:
- Analysis Code
Object Detection: the example model used in this post is a COCO-trained MobileNet TensorFlow model that will attempt to detect objects in the production environment. Additionally, the repo has Python scripts for:
- Validation - checks whether the incoming information is standardized
- Threat Detector - matches the classified results against the rules defined in rule.json
- Plot - basic Python plotting of the number of cases observed
We have a ready-made GitHub repo with the code (forked from Daniel Whitman), except for the TensorFlow model, which we will get from the official repo.
- Docker Images
Every processing stage will be deployed as a Docker container, which is defined with a Dockerfile for each stage individually. The Dockerfile contains a layered definition of an image with the dependencies, binaries, and code.
For example:
FROM <model>
RUN <python installation>
ADD <python script>
The GitHub repo contains the Dockerfiles as well. More on Docker containers is discussed here.
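As a rough sketch of how such an image would be built and published (the image name and registry below are hypothetical, not the ones used in the repo):

```
# Hypothetical example: build the image for one stage from its Dockerfile
# and push it so the Pachyderm pipeline spec can reference it by tag.
docker build -t myuser/validate:1.0 .
docker push myuser/validate:1.0
```

The tag you push is what the transform section of the pipeline specification (next prerequisite) points to.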
- JSON Pipeline Specification
The standard way to represent a pipeline in Pachyderm is a JSON file with four main component tags:
- name - a unique name to identify the pipeline
- transform - a reference to the Docker image (and command) that jobs run in
- parallelism_spec - the parallelism configuration (a constant, or scaled with the K8s cluster)
- input - a reference to the input repo(s) that provide the data
More on the Pipeline Specification can be found here.
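To make the four tags concrete, here is a minimal sketch of what such a spec could look like (the image name, command, and glob pattern are illustrative assumptions rather than the repo's actual values; check the linked spec for the exact fields of your Pachyderm version). It is written as a shell heredoc so it can be pasted straight into the terminal:

```
# Sketch of a Pachyderm pipeline spec with hypothetical values, written to validate.json
cat > validate.json <<'EOF'
{
  "pipeline": { "name": "validate" },
  "transform": {
    "image": "myuser/validate:1.0",
    "cmd": ["python3", "/validate.py"]
  },
  "parallelism_spec": { "constant": 1 },
  "input": { "atom": { "repo": "images", "glob": "/*" } }
}
EOF
```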
- Storage Infrastructure
This example employs Google Cloud Storage for illustration purposes, though it can just as easily be deployed on other cloud providers like AWS or DigitalOcean, or on a local machine, thanks to the cloud-agnostic K8s platform.
- Miscellaneous
- Bash shell (we will be using the Windows Subsystem for Linux)
- SendGrid API key (optional)
STEPS
The following steps are the ones I used to implement the pipeline on a Windows machine with Google Cloud Storage. The steps are formulated to be as detailed as possible for full reproducibility, so if it feels too elementary for you - I suggest BREATHE!!
- Activate the Windows Subsystem for Linux on Windows 10 and install the Ubuntu app from the Windows Store. Open an Ubuntu shell. Install Python with
sudo apt-get install python
- Set up a Google Cloud account and download the SDK into a UNIX folder as described here. You may need to enter credit card details even for a free-tier account.
- Go to the SDK folder and run install.sh as
./install.sh
- Initialize the Google Cloud configuration with
gcloud init
and use the default configuration and project (which can be set up from the Google Cloud portal)
~/google-cloud-sdk$ gcloud init
Welcome! This command will take you through the configuration of gcloud.
Settings from your current configuration [default] are:
compute:
  zone: us-west1-a
container:
  cluster: pach-cluster
  use_client_certificate: 'True'
- Spin up a Kubernetes Cluster with:
$ gcloud container clusters create ${CLUSTER_NAME} --scopes storage-rw --machine-type ${MACHINE_TYPE} --zone $GCP_ZONE --num-nodes 2
Creating cluster pach-cluster...done.
Created [https://container.googleapis.com/v1/projects/amiable-shuttle-187721/zones/us-west1-a/clusters/pach-cluster].
kubeconfig entry generated for pach-cluster.
NAME          ZONE        MASTER_VERSION  MASTER_IP       MACHINE_TYPE   NODE_VERSION  NUM_NODES  STATUS
pach-cluster  us-west1-a  1.7.8-gke.0     104.198.10.149  n1-standard-4  1.7.8-gke.0   2          RUNNING
${} denotes UNIX environment variables, which can be set up by simple assignment, as sketched below. I configured 2 nodes of the 'n1-standard-4' machine type, which is a 4 virtual CPU machine, as described here.
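For instance, the variables used in this and the later steps could be assigned like this (the cluster, machine type, and zone values are the ones from this run; the bucket name is just a placeholder):

```
# Environment variables used throughout this walkthrough
export CLUSTER_NAME=pach-cluster
export MACHINE_TYPE=n1-standard-4
export GCP_ZONE=us-west1-a
# Used later when deploying Pachyderm; pick your own globally unique bucket name
export BUCKET_NAME=my-pach-bucket
export STORAGE_SIZE=10   # persistent disk size in GB
```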
- Install the different CLI utilities to interact with the different components
a. Install the kubectl binary with
$ gcloud components install kubectl
from the google-cloud-sdk/bin directory
b. Install gsutil - a tool that enables access to GCS from the CLI, as described here
c. Install pachctl - a tool to make and receive calls from the Pachyderm cluster
# For Linux (64 bit):
$ curl -o /tmp/pachctl.deb -L https://github.com/pachyderm/pachyderm/releases/download/v1.6.5/pachctl_1.6.5_amd64.deb && sudo dpkg -i /tmp/pachctl.deb
- Create a GCS bucket for Pachyderm with
$ gsutil mb gs://${BUCKET_NAME}
and deploy Pachyderm with
$ pachctl deploy google ${BUCKET_NAME} ${STORAGE_SIZE} --dynamic-etcd-nodes=2 --dashboard
serviceaccount "pachyderm" created
storageclass "etcd-storage-class" created
service "etcd-headless" created
statefulset "etcd" created
service "etcd" created
service "pachd" created
deployment "pachd" created
secret "pachyderm-storage-secret" created
Pachyderm is launching. Check its status with "kubectl get all"
$STORAGE_SIZE specifies the size of the persistent disk. For the demo it can be set to 10, i.e. 10 GB. Here the 2 etcd nodes store Pachyderm's key-value metadata.
- Clone the Git repository from the GitHub account
$ git clone https://github.com/anuragsoni9/mgmt690-pipeline.git
- Update threat-detect.json in the root mgmt690-pipeline folder to include the SendGrid API key
- Update rule.json in mgmt690-pipeline/threat-detect to include your email address
Putting in the SendGrid key is an optional step; we will validate the completion of this step by plotting the result.
- Now this step is all about setting up the Pachyderm pipeline components. These are set up inside Pachyderm and not on the Unix filesystem, so the repos created will not be viewable with a normal ls.
Create
#Repos
$ pachctl create-repo images
$ pachctl create-repo model
$ pachctl create-repo rules
#Pipelines (run from the root directory)
$ pachctl create-pipeline -f validate.json
$ pachctl create-pipeline -f object-detect.json
$ pachctl create-pipeline -f threat-detect.json
$ pachctl create-pipeline -f plot.json
Push
#####################################################################
#Get Model from TensorFlow
$ wget http://download.tensorflow.org/models/object_detection/ssd_mobilenet_v1_coco_11_06_2017.tar.gz
$ tar -xvf ssd_mobilenet_v1_coco_11_06_2017.tar.gz
$ cd ssd_mobilenet_v1_coco_11_06_2017
#Push object-detection model from TF ssd_mobilenet_v1_coco_11_06_2017 to the model repo
$ pachctl put-file model master -c -f frozen_inference_graph.pb
######################################################################
#Push Rule.json
$ pachctl put-file rules master -c -f rule.json
Check
At any point in time, the running status of processes/pods/clusters can be checked with
$ kubectl get pods
$ kubectl get all
$ pachctl list-job
$ pachctl list-pipeline
#$ pachctl inspect-pipeline <pipeline-name>
$ pachctl list-repo
#$ pachctl list-file <repo-name> <commit-id> <path/to/dir [flags]>
$ pachctl list-file rules master
$ pachctl inspect-pipeline threat-detect
- Now it is time for some action
Push the input file from the test images/ directory to Pachyderm
$ pachctl put-file images master -c -f dev1_1511891098.jpg
$ pachctl list-job
ID OUTPUT COMMIT STARTED DURATION RESTART PROGRESS DL UL STATE
8bdf8421-81ca-4bad-80c0-e915d7525e54 plot/73f4b873b97a4169919dca8426a5dd76 5 seconds ago 4 seconds 0 1 + 0 / 1 103B 13.67KiB success
f0ede3ad-bc91-49c2-92b9-5a921521c575 threat-detect/ea61ae8995cb4aea9397c2a17280e82c 11 seconds ago 5 seconds 0 1 + 0 / 1 228B 103B success
b75a7963-dfb9-4484-b853-2158228fd6bc object-detect/599fb55fe6e64b99839862dba4a25b42 25 seconds ago 14 seconds 0 1 + 0 / 1 27.89MiB 103B success
be827b4f-3d12-4007-a8a0-8a281930c3fc validate/7bad879539fa484ebf7a87264603ca88 28 seconds ago 3 seconds 0 1 + 0 / 1 62.98KiB 62.98KiB success
- If it ran successfully, you should have received an email whose body comes from threatdetect.py. Alternatively, if you didn't put in a SendGrid key for email, you should be able to see the progress on the cool Pachyderm dashboard mentioned in the next step.
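If you would rather pull the result straight out of the output repo from the CLI, a sketch along these lines should work (the exact output filename depends on what the plot stage writes, so list the repo first):

```
# See what the plot pipeline wrote to its output repo, then download one file
pachctl list-file plot master
# <filename> is whatever the previous command shows for the plotted output
pachctl get-file plot master <filename> > result.png
```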

- Deploy the Pachyderm dashboard on the GCS cluster with pachctl deploy local --dashboard-only. Ensure that the dashboard pod is running.
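A quick way to confirm this is to look for the dashboard pod in the pod list (the pod name typically contains "dash" for this Pachyderm release, but verify against your own output):

```
# Confirm the dashboard pod exists and is in the Running state
kubectl get pods | grep -i dash
```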
To ensure Google Cloud doesn't block viewing from your local machine, do the following (a gcloud equivalent is sketched after this list):
- Add a Firewall Rule under VPC network with IP range 0.0.0.0/0 and tcp:38080 under Specified protocols and ports
- Select Ingress for the Direction
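If you prefer the CLI over the console, a roughly equivalent gcloud command would look like the sketch below (the rule name is arbitrary; the port matches the one used in the console steps above, and 0.0.0.0/0 opens it to everyone, so this is for demo purposes only):

```
# Open the dashboard port to incoming traffic
gcloud compute firewall-rules create pach-dashboard \
  --direction=INGRESS \
  --allow=tcp:38080 \
  --source-ranges=0.0.0.0/0
```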
Next, run pachctl port-forward &
Now you should be able to see the dashboard in your browser at localhost:30080



Next Step
In the last post of the series, we will go over some strategies and steps needed to update, maintain, and scale the pipeline. We will also discuss possible enhancements that can be implemented here and compare this setup with a Hadoop infrastructure.