Data Engineering — Building an ETL Pipeline for NYPD Arrest Data

using Python, Snowflake, and Power BI (part 1)

Sarthak Girdhar
9 min read · Feb 10, 2024
Photo by Quinten de Graaf on Unsplash

Introduction

The New York Police Department (NYPD) releases a breakdown of every arrest made in New York City (NYC) each quarter. This article analyzes the arrest data for 2023. Each record represents an arrest and includes information such as the type of crime, the location, and suspect demographics.

This article is divided into two parts. You can read part 2 here.

In the first part, we will:

  1. Write code in Python to extract arrest data from NYC Open Data API.
  2. Transform the extracted data (a little bit…more on this later).
  3. Perform data validation.
  4. Load the data into Snowflake.

In the second part, we will:

  5. Do Exploratory Data Analysis (EDA) to answer some of our questions.
  6. Connect to Power BI and present a report with more detailed insights.

Extracting NYPD Arrest Data

The API returns the data in JSON format. First, we import the necessary libraries. We also need our own API key for making calls to NYC Open Data. Next, we create an empty pandas dataframe, nyc_arrests, and append the fetched results to it.

# import libraries
import requests
import pandas as pd

# API key, sent with every request as an app token header
nyc_open_data_api_key = 'XXXXXXX'
headers = {'X-App-Token': nyc_open_data_api_key}

nyc_arrests = pd.DataFrame()

# 'GET' requests, 50,000 rows per call
for offset in range(0, 230000, 50000):
    url = 'https://data.cityofnewyork.us/resource/uip8-fykc.json?$limit=50000&$offset=' + str(offset)
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        data = response.json()
        temp_df = pd.json_normalize(data)

        # DataFrame.append was removed in pandas 2.0, so we use pd.concat
        nyc_arrests = pd.concat([nyc_arrests, temp_df], ignore_index=True)

As the for loop shows, I fetch only 50,000 rows per API call, and the dataset has about 227,000 rows. I did not have to do that; I could have fetched all the rows in one call. However, some portals cap the number of records returned per call, and sometimes you need to extract millions of records, which takes a long time. In those cases, a pagination loop with an offset (as shown above) is what you need.
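When the total row count is not known in advance, the loop can instead stop as soon as a page comes back short. The following is a minimal sketch of that idea, not the code used in this project. The names fetch_all_pages and fake_fetch_page are hypothetical; in practice, fetch_page would wrap the requests.get call with the $limit and $offset parameters.

```python
from typing import Callable, Dict, List

Record = Dict[str, str]


def fetch_all_pages(fetch_page: Callable[[int, int], List[Record]],
                    page_size: int = 50000) -> List[Record]:
    """Collect records page by page until a page comes back short or empty."""
    records: List[Record] = []
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        records.extend(page)
        # A page shorter than page_size means there is nothing left to fetch.
        if len(page) < page_size:
            break
        offset += page_size
    return records


# Stand-in for the real API call (hypothetical): serves 7 fake rows.
fake_rows = [{"arrest_key": str(i)} for i in range(7)]


def fake_fetch_page(limit: int, offset: int) -> List[Record]:
    return fake_rows[offset:offset + limit]


all_rows = fetch_all_pages(fake_fetch_page, page_size=3)
print(len(all_rows))
```

Passing the page fetcher in as a function keeps the loop logic separate from the HTTP call, which also makes it easy to try out without touching the API.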

The code above fetches all the rows, but returns 25 columns rather than the 19 listed in the NYPD Arrest Data Dictionary.

nyc_arrests.shape

Let’s take a look at all the column names.

‘nyc_arrests’ columns list

It appears there are some redundant columns which should not be present as per the data dictionary.

Transforming the extracted data

The five columns named like “:@computed_region_xxxx_xxxx” do not contain any useful information. In addition, the columns “geocoded_column.type” and “geocoded_column.coordinates” contain redundant information and can therefore be dropped along with those five columns.

# Drop redundant columns from the dataframe
nyc_arrests.drop(nyc_arrests.loc[:, ':@computed_region_f5dn_yrer':'geocoded_column.coordinates'].columns, inplace=True, axis=1)
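The slice above depends on the order in which the API returns the columns. As an alternative, here is a small sketch that selects the same columns by name prefix instead. It runs on a toy dataframe with made-up values, and the two prefixes are taken from the column names described above.

```python
import pandas as pd

# Toy frame with a few of the column names discussed above (values are made up).
df = pd.DataFrame({
    'arrest_key': ['1'],
    'latitude': ['40.7'],
    ':@computed_region_f5dn_yrer': ['1'],
    'geocoded_column.type': ['Point'],
    'geocoded_column.coordinates': ['[-73.9, 40.7]'],
})

# Pick the redundant columns by prefix rather than by position.
redundant = [
    col for col in df.columns
    if col.startswith(':@computed_region_') or col.startswith('geocoded_column.')
]

df = df.drop(columns=redundant)
print(list(df.columns))
```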

At this stage, we should also check for data quality (as covered in the next section) and make any other necessary transformations. However, since we do not yet know how the data will be analyzed or used by the downstream users (data analysts, business users), we won’t be doing any more transformations. Having said that, it is good practice to communicate your findings to the downstream users in a separate document.

Data Validation

The cheat sheet (depicted below) can serve as a guide for checking the data quality. We will check for completeness, consistency, validity, etc. (as required) for the columns in the nyc_arrests dataframe.

Data quality dimensions cheat sheet by DataCamp

Before proceeding, it is also a good idea to once again take a look at the column definitions as per the provided dataset.

1. ARREST_KEY

We’ll check if the arrest_key column has [i] no empty and/or null values, [ii] all values are numeric, and [iii] the values are 9 digits long.

# return indices for empty entries/values
nyc_arrests[nyc_arrests['arrest_key'] == ''].index

# check for NULL values
import numpy as np
np.where(pd.isnull(nyc_arrests['arrest_key']))

# check if all the values are numeric
pd.to_numeric(nyc_arrests['arrest_key'], errors='coerce').notnull().all()

# check if all the values are 9 digits long
nyc_arrests[nyc_arrests['arrest_key'].apply(lambda x: len(str(x)) != 9)]

When you run the above code, you’ll find that there are no issues found with the arrest_key column :)
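If you prefer a single summary over four separate outputs, the three checks can be bundled into one function. This is only a sketch on made-up sample values; check_arrest_key is a hypothetical helper name, and it assumes the keys arrive as strings, as they do when parsed from JSON.

```python
import pandas as pd


def check_arrest_key(keys: pd.Series) -> dict:
    """Count the rows that fail each of the three arrest_key checks."""
    text = keys.fillna('').astype(str)
    filled = text != ''
    return {
        'empty_or_null': int((~filled).sum()),
        'non_numeric': int((filled & ~text.str.fullmatch(r'\d+')).sum()),
        'not_9_digits': int((filled & (text.str.len() != 9)).sum()),
    }


# Made-up sample: one valid key, one short key, one null, one non-numeric, one empty.
sample_keys = pd.Series(['261209118', '26120911', None, 'ABC123456', ''])
report = check_arrest_key(sample_keys)
print(report)
```

A column with no issues, like arrest_key in this dataset, would give a count of zero for every check.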

2. ARREST_DATE

Let’s see if the column arrest_date has [i] no null values, and [ii] all values are datetime.

# check for NULL values
import numpy as np
np.where(pd.isnull(nyc_arrests['arrest_date']))

# check if all values are datetime
nyc_arrests['arrest_date'].astype(str).apply(lambda x: pd.to_datetime(x, errors='coerce')).notna().all()

Once again, no issues detected!
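pd.to_datetime also accepts a whole column, so the same check can be written without apply. The sketch below uses made-up sample values and additionally looks for dates outside 2023, which is an extra check I am assuming is useful here since the article covers 2023 data.

```python
import pandas as pd

# Made-up sample values, including one bad entry and one missing entry.
dates = pd.Series(
    ['2023-01-03T00:00:00.000', '2023-12-31T00:00:00.000', 'not a date', None],
    dtype=object,
)

# Values that cannot be parsed become NaT instead of raising an error.
parsed = pd.to_datetime(dates, errors='coerce')

# Present in the source but not parseable as a date.
unparseable = dates[parsed.isna() & dates.notna()]

# Parseable, but outside the year we expect.
outside_2023 = parsed[parsed.notna() & (parsed.dt.year != 2023)]

print(unparseable.tolist())
print(len(outside_2023))
```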

3. PD_CD

The column pd_cd should have only 3 digit numeric values.

# check if all the values are numeric
pd.to_numeric(nyc_arrests['pd_cd'], errors='coerce').notnull().all()

You’ll notice that not all values in this column are numeric. Let’s find them.

# find unique values
nyc_arrests['pd_cd'].unique()

The above code returns an array. Looking through it, you will find “nan” entries, and that is why the previous check returned False.

notnull() evaluates to False for NaN values, so a single missing code is enough to make .all() return False. It appears the three-digit classification code is missing for some rows. Let’s find out how many such rows exist.
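To tell missing codes apart from codes that are present but not numeric, the two cases can be separated explicitly. A small sketch on made-up values:

```python
import numpy as np
import pandas as pd

# Made-up sample: two valid codes, one missing value, one non-numeric code.
codes = pd.Series(['101', '9', np.nan, '12A'], dtype=object)

# Anything that cannot be converted becomes NaN.
numeric = pd.to_numeric(codes, errors='coerce')

# One NaN is enough for this to be False.
all_numeric = bool(numeric.notnull().all())

# Missing in the source data.
missing = codes.isnull()

# Present in the source data, but not a number.
non_numeric = numeric.isnull() & ~missing

print(all_numeric)
print(int(missing.sum()))
print(codes[non_numeric].tolist())
```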

# find number of rows with 'nan' entries 
nyc_arrests['pd_cd'].isnull().sum()

So, only 2 such rows exist. We won’t be deleting these two rows, as the other columns still contain relevant information.

When looking at the unique values, you will also notice that there are codes that aren’t exactly 3 digits long. We can take a look at them as well.

# check if all the values are 3 digits long
temp = nyc_arrests[nyc_arrests['pd_cd'].apply(lambda x: len(str(x)) != 3)]

temp['pd_cd'].value_counts()

Once again, no transformations here either as we are (still) unaware of how the data would be analyzed by the downstream users.
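One caveat with the length-based check: str(NaN) is the three-character string 'nan', and any other three-character value passes as well. A regular expression is stricter. The sketch below compares the two approaches on made-up values; it assumes the codes are strings, as they are when parsed from JSON.

```python
import numpy as np
import pandas as pd

# Made-up sample values.
codes = pd.Series(['101', '9', '1234', '12A', np.nan], dtype=object)

# Length-based check: flags '9' and '1234', but lets '12A' and NaN through.
by_length = codes[codes.apply(lambda x: len(str(x)) != 3)]

# Regex-based check: a value is valid only if it is exactly three digits.
is_three_digits = codes.str.fullmatch(r'\d{3}', na=False)

# Report present-but-invalid codes; missing values are counted separately.
by_regex = codes[~is_three_digits & codes.notna()]

print(by_length.tolist())
print(by_regex.tolist())
```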

4. KY_CD

The column ky_cd needs the same checks as the pd_cd column: it should contain only 3-digit numeric values. Therefore, I am leaving the code for this column as an exercise for the reader.

Kindly note that the entire code is available on GitHub.

5. LAW_CAT_CD

This column should only have 3 values: Felony (F), Misdemeanor (M), and Violation (V).

# find unique values
nyc_arrests['law_cat_cd'].unique()

Once again, we find values (such as ‘9’ and ‘I’) that are inconsistent with the column definition. We still won’t be making any changes; however, we will communicate this to our downstream users.
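To report how many rows are affected, the column can be compared against the set of allowed values. This is a sketch on made-up sample values; the allowed set comes from the column definition above.

```python
import pandas as pd

# Allowed values as per the column definition.
allowed = {'F', 'M', 'V'}

# Made-up sample values, including two unexpected codes and one missing value.
law_cat = pd.Series(['F', 'M', 'V', '9', 'I', None, 'M'], dtype=object)

# Keep values that are present but not in the allowed set.
unexpected = law_cat[~law_cat.isin(allowed) & law_cat.notna()]

# Number of rows per unexpected value, ready to share downstream.
counts = unexpected.value_counts()
print(counts)
```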

6. ARREST_BORO

This column should have 5 unique values derived from the name of the boroughs: Bronx (B), Staten Island (S), Brooklyn (K), Manhattan (M), and Queens (Q).

# find unique values
nyc_arrests['arrest_boro'].unique()

Everything looks good here!

7. ARREST_PRECINCT, JURISDICTION_CODE, AGE_GROUP, PERP_SEX, PERP_RACE

These next five columns do not have any data quality issues. You can check it yourselves too :)

# check if all values in the 'arrest_precinct' column are numeric
pd.to_numeric(nyc_arrests['arrest_precinct'], errors='coerce').notnull().all()

# check if all values in the 'jurisdiction_code' column are numeric
pd.to_numeric(nyc_arrests['jurisdiction_code'], errors='coerce').notnull().all()

# find the list of values in the column 'age_group'
nyc_arrests['age_group'].unique()

# find unique values in the 'perp_sex' column
nyc_arrests['perp_sex'].unique()

# find unique values in the 'perp_race' column
nyc_arrests['perp_race'].unique()

8. LATITUDE AND LONGITUDE

Finally, let’s check if the coordinates/values in the latitude and longitude columns are within the range of the latitude and longitude values of New York state.

Taking the latitude and longitude range for New York state from its Wikipedia page and running it through a coordinate converter, we get the following range:

Latitude → 40.5 to 45.01667

Longitude → -71.85 to -79.766667

# calculate min & max on 'latitude'
print(nyc_arrests['latitude'].agg(['min', 'max']))

# calculate min & max on 'longitude'
print(nyc_arrests['longitude'].agg(['min', 'max']))

The output includes a value of 0, which suggests that missing latitude and longitude values have been imputed with ‘0’.

# locate rows with longitude value equal to 0
nyc_arrests.loc[nyc_arrests['longitude'] == '0.0']

There is only one such row. Let’s drop it and store the result in a temporary dataframe, then compute the minimum and maximum values of the latitude and longitude columns again.

# 69643 is the index of the row located above
temp = nyc_arrests.drop(69643)

print(temp['latitude'].agg(['min', 'max']))

print(temp['longitude'].agg(['min', 'max']))

The values now fall within the range mentioned above, so it’s all good!
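One thing to watch: the comparison with '0.0' above suggests the coordinates are stored as strings, and min/max on strings compare character by character rather than numerically. The sketch below, on made-up coordinates, converts the columns to numbers first and then flags every row outside the range in one step. The bounds are the New York state values quoted above.

```python
import pandas as pd

# Bounds for New York state, as quoted above (low, high).
LAT_RANGE = (40.5, 45.01667)
LON_RANGE = (-79.766667, -71.85)

# Made-up coordinates stored as strings, with one imputed '0.0' row.
coords = pd.DataFrame({
    'latitude': ['40.7128', '40.6501', '0.0'],
    'longitude': ['-74.0060', '-73.9496', '0.0'],
})

# Convert to numbers so that comparisons are numeric, not alphabetical.
lat = pd.to_numeric(coords['latitude'], errors='coerce')
lon = pd.to_numeric(coords['longitude'], errors='coerce')

# between() is inclusive on both ends by default.
in_range = lat.between(*LAT_RANGE) & lon.between(*LON_RANGE)
out_of_range = coords[~in_range]

print(coords['longitude'].min(), lon.min())
print(out_of_range.index.tolist())
```

The first print shows the difference: the string minimum is '-73.9496', while the numeric minimum is the more westerly -74.006.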

I do realize that validating data quality is a dull and tedious process. However, if you want the data analysts to be able to diagnose the data effectively and the data scientists to be able to create correct predictive models, you have to take one for the team. Always remember: Garbage In, Garbage Out.

Loading into Snowflake

Now that the data validation is complete, we are ready to upload the nyc_arrests dataframe into a Snowflake table. But first, we need to install the Snowflake connector for Python. The pandas extra pulls in the dependencies that write_pandas needs.

# install Snowflake connector with pandas support
pip install "snowflake-connector-python[pandas]"

And now, we can write the function to upload the data into a table in Snowflake.

import snowflake.connector
from snowflake.connector.pandas_tools import pd_writer
from snowflake.connector.pandas_tools import write_pandas


# create a function to upload the data into a table in Snowflake
def upload_to_snowflake(ACCOUNT, USER, PASSWORD, WAREHOUSE, DATABASE, SCHEMA):

    # connect to Snowflake
    conn = snowflake.connector.connect(
        user=USER,
        password=PASSWORD,
        account=ACCOUNT,
        warehouse=WAREHOUSE,
        database=DATABASE,
        schema=SCHEMA
    )

    # create a cursor
    cur = conn.cursor()

    # create the warehouse
    cur.execute(f'CREATE WAREHOUSE IF NOT EXISTS {WAREHOUSE} WAREHOUSE_SIZE = XSMALL AUTO_SUSPEND = 300')

    # use the warehouse
    cur.execute(f'USE WAREHOUSE {WAREHOUSE}')

    # create the database
    cur.execute(f'CREATE DATABASE IF NOT EXISTS {DATABASE}')

    # use the database
    cur.execute(f'USE DATABASE {DATABASE}')

    # create the schema
    cur.execute(f'CREATE SCHEMA IF NOT EXISTS {SCHEMA}')

    # use the schema
    cur.execute(f'USE SCHEMA {SCHEMA}')

    # create the table
    cur.execute("""
        CREATE OR REPLACE TABLE nypd_arrests (
            "arrest_key" STRING,
            "arrest_date" TIMESTAMP,
            "pd_cd" INTEGER,
            "pd_desc" STRING,
            "ky_cd" INTEGER,
            "ofns_desc" STRING,
            "law_code" STRING,
            "law_cat_cd" STRING,
            "arrest_boro" STRING,
            "arrest_precinct" INTEGER,
            "jurisdiction_code" INTEGER,
            "age_group" STRING,
            "perp_sex" STRING,
            "perp_race" STRING,
            "x_coord_cd" INTEGER,
            "y_coord_cd" INTEGER,
            "latitude" FLOAT,
            "longitude" FLOAT
        )
    """)

    # load the data from 'nyc_arrests' dataframe into 'nypd_arrests' Snowflake table
    cur.execute('TRUNCATE TABLE nypd_arrests')  # clear existing data if needed

    write_pandas(conn, nyc_arrests, 'NYPD_ARRESTS')

    # close the cursor and Snowflake connection
    cur.close()
    conn.close()


# call the function
upload_to_snowflake('E*****Z-J*****5', 'sarthakgirdhar', 'XXXXXXXX', 'COMPUTE_WH', 'NewYorkPoliceDepartment', 'ArrestsRawData')

In the above code, we first import the necessary Python libraries. Next, we connect to Snowflake by providing the account, username, password, warehouse, database, and schema. We do so because Snowflake needs to know where to store the table. Next, we create the warehouse, database, and schema (if they do not already exist) and then the table. Finally, the data is written using the write_pandas function.
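The function call above passes the credentials as literals. One common alternative is to read them from environment variables so that they stay out of the code. The sketch below shows the idea; load_snowflake_settings and the SNOWFLAKE_ variable names are hypothetical choices of mine, not something the Snowflake connector requires.

```python
import os

# Same names as the parameters of upload_to_snowflake.
REQUIRED_KEYS = ('ACCOUNT', 'USER', 'PASSWORD', 'WAREHOUSE', 'DATABASE', 'SCHEMA')


def load_snowflake_settings(env=None, prefix='SNOWFLAKE_'):
    """Read connection settings from environment variables (hypothetical names)."""
    env = os.environ if env is None else env
    missing = [prefix + key for key in REQUIRED_KEYS if not env.get(prefix + key)]
    if missing:
        raise KeyError('Missing environment variables: ' + ', '.join(missing))
    return {key: env[prefix + key] for key in REQUIRED_KEYS}


# Demo with a stand-in dictionary instead of the real environment.
fake_env = {
    'SNOWFLAKE_ACCOUNT': 'my_account',
    'SNOWFLAKE_USER': 'my_user',
    'SNOWFLAKE_PASSWORD': 'my_password',
    'SNOWFLAKE_WAREHOUSE': 'COMPUTE_WH',
    'SNOWFLAKE_DATABASE': 'NewYorkPoliceDepartment',
    'SNOWFLAKE_SCHEMA': 'ArrestsRawData',
}
settings = load_snowflake_settings(fake_env)
print(sorted(settings))

# With real environment variables set, the call would become:
# upload_to_snowflake(**load_snowflake_settings())
```

Because the dictionary keys match the function's parameter names, the settings can be unpacked straight into upload_to_snowflake.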

Conclusion

This brings us to the end of this article. In this part, we extracted the data from the NYC Open Data API, applied some transformations, and validated the data quality before loading the data into a Snowflake table.

In part 2, we will do some light EDA to get insights about the 2023 NYPD arrest data. We’ll also connect Snowflake to Power BI for a further deep-dive analysis.
