Deep Dive into Python Part 2: Indexing Location Data for Risk Assessment
In my last post, I explained how we intended to normalize data for time comparison in GermTheory, a project my team and I have been working on lately. The goal of the functions outlined there was to take intermittent location points and normalize them across a series of times, so that data points for 4:02 PM and 4:10 PM could become an accurate location at 4:05 PM. Today, I'm going to walk through a more challenging section of code: the actual indexing of disease risk for users.
The Problem
Essentially, we are given a group of users for whom we have massive amounts of location/time data, stored in a table titled "locations." Given that certain users are infected, we need to determine the risk of infection for all non-infected users based on whether they have interacted with an infected user. This level of risk should be returned as an index from 0 to 1 for each user, which we can later map to low, medium, or high risk levels.
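To give a sense of that final mapping, here's a minimal sketch of how an index might be bucketed into those levels. The cutoffs below are hypothetical, purely for illustration; they aren't the values we actually use:

def risk_level(index):
    # map a 0-1 risk index to a coarse label (hypothetical cutoffs)
    if index < 0.33:
        return 'low'
    elif index < 0.66:
        return 'medium'
    return 'high'

print(risk_level(0.89))  # 'high'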
While there are some huge optimizations and interesting ways we pull this data from the database (using the psycopg2 module in Python), I'll refrain from covering them here, as they would widen the scope of this post too much.
The General Approach
For now, let's assume that we receive normalized data as two separate dictionaries. The first dictionary contains all location data for infected users over the time period that we are analyzing, and the second contains all location data for non-infected users across the same time period. It looks something like this:
# users who are infected. Key is the userId
infectedLocs = {14: [{'time': datetime.datetime(2014, 11, 6, 16, 20, 2, 387000, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-480)), 'coords': (37.7890480964603, -122.424191045913)}]}
# users who are not infected. Key is the userId
nonInfectedUsers = {24: [{'time': datetime.datetime(2014, 11, 6, 16, 20, 2, 387000, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=-480)), 'coords': (37.7890480964614, -122.424191045924)}]}
Of course, in reality there are many, many more items in each user's location list (thousands, actually), and there are many more keys in each dictionary representing other users.
Before diving deep into the code of how we actually calculate these indices, I should outline the general approach. First, let's simplify the problem and imagine we only have two users: one infected and one not. Given their locations over a period of time, we might be able to plot their distance from each other like so:
Our time units are not integers in reality, but leaving them that way keeps this example simple. To build our index, we first need to find whether these two users came close to each other, so the algorithm tests whether their distance falls below a certain threshold, in this case 100 meters. However, simply knowing that they were near each other does not necessarily translate to a numerical index of risk. To produce meaningful results, we need to take into consideration both how close the user came to the infected user and how long they maintained that proximity.
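As a rough sketch of that proximity test (this assumes geopy's great_circle, which the real code also uses; the coordinates here are made up):

from geopy.distance import great_circle

threshold = 100  # meters, the example threshold from above

# time-aligned coordinates for the two users (illustrative values)
user_coords = [(37.789048, -122.424191), (37.789500, -122.430000)]
infected_coords = [(37.789050, -122.424195), (37.792000, -122.435000)]

for user_point, infected_point in zip(user_coords, infected_coords):
    distance = great_circle(user_point, infected_point).meters
    print(distance, distance < threshold)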
To accomplish this, we find the area between our distance curve and the threshold, the shaded region in this graph:
By adding up all of these areas for a given user, we get a sum (likely in the thousands) that indicates their general likelihood of being infected. However, we still need to convert this area value into an index for the information to be of value. After all, 3401.28271 has no meaning to a user, but an index of .89 might.
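To make the area step concrete, here's a toy example assuming a threshold of 100 meters and three consecutive time steps spent below it (cumtrapz returns the running cumulative integral, so we take its last element):

from scipy.integrate import cumtrapz

threshold = 100
belowdata = [80, 60, 70]  # distances in meters while below the threshold

# area of the band between the threshold line and the distance curve
area = threshold * (len(belowdata) - 1) - cumtrapz(belowdata, dx=1)[-1]
print(area)  # 200 - 135.0 = 65.0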
To convert this value, we map our area values onto the shape of another function. Essentially, our area provides the x-value for the function f(x) = 1 - e^(-bx).
On a graph, it maps to something like this:
As you can see, the graph is asymptotic at 1, so if we map our data to the function, it will map nicely to an index from 0 to 1. It's just a matter of constructing the function so that the curve scales appropriately with the disease's danger. To do so, we modify the "b" value in the equation above.
Ultimately, we calculate b from a combination of a contagiousness factor from 0-10 (the numerator) and the threshold (part of the denominator). More on this later.
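To illustrate the shape, here's what the mapping looks like for a handful of area values; the b value below is arbitrary, not a real disease parameter:

import math

b = 0.001  # arbitrary illustrative value
for area in [0, 500, 3400, 20000]:
    # indices start at 0 and approach 1 as the accumulated area grows
    print(area, 1 - math.exp((-1) * b * area))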
By now, we can see how we'll calculate the index for each user, but how do we take our two dictionaries and return an index? More on that next.
The Code
Now, what we've all been waiting for. Here's the code for the algorithm described above.
from geopy.distance import great_circle
from scipy.integrate import cumtrapz

# create a results dictionary to store our values
results = {}
# for every user...
for userId in nonInfectedUsers:
    print('Calculating index for user %s of %s' % (userId, len(nonInfectedUsers)))
    user = nonInfectedUsers[userId]
    # count used to store the sum of all areas
    count = 0
    # compare user to each infected user
    for infectedId in infectedLocs:
        infected = infectedLocs[infectedId]
        distances = []  # to be our y values
        # check user values to add distances
        for idx, val in enumerate(user):
            if val is None or infected[idx] is None:
                distances.append(None)
            else:
                # great_circle is a function from the geopy library used to find earth distances
                distances.append(great_circle(val['coords'], infected[idx]['coords']).meters)
        belowthreshold = False
        belowdata = []
        for idx, distance in enumerate(distances):
            if distance is None or (not belowthreshold and distance > threshold):
                continue
            # check if an arc below the threshold just ended (or we're at the end of the data)
            elif belowthreshold and (distance > threshold or idx == len(distances) - 1):
                if distance <= threshold:
                    # include the final point if the data ends while still below the threshold
                    belowdata.append(distance)
                # Add area between threshold and curve to count if we have a complete arc below threshold
                # cumtrapz is part of scipy; it uses the trapezoidal method to find the cumulative
                # area between the curve and the x-axis, so we take its last value for the total
                if len(belowdata) > 1:
                    count += (threshold * (len(belowdata) - 1)) - cumtrapz(belowdata, dx=1)[-1]
                elif len(belowdata) == 1:
                    count += threshold - belowdata[0]
                belowdata = []
                belowthreshold = False
            elif not belowthreshold and distance < threshold:
                belowthreshold = True
                belowdata.append(distance)
            elif belowthreshold and distance < threshold:
                belowdata.append(distance)
            else:
                continue
    # see below for mapcount definition
    results[userId] = mapcount(count, contagiousness, threshold)
    print('Found an %s index of %s' % (diseaseName, results[userId]))
Two interesting functions used in this code are great_circle() and cumtrapz(). great_circle is part of the geopy library, and it measures the great-circle distance between two latitude/longitude points on the earth's surface. cumtrapz is part of the scipy library, and it uses the trapezoidal rule to approximate the area between a curve and the x-axis given an array of y-values; since it returns the running cumulative area, we take its last element for the total.
However, this would not be complete without a definition for our mapcount function:
import math

def mapcount(count, contagiousness, threshold):
    # because math is fun. See here for what this looks like: http://bit.ly/1u7DQuj
    # timeInterval and dayDifference are module-level values in the actual script
    return 1 - math.exp((-1) * (contagiousness / (threshold * 10 / timeInterval) * dayDifference) * count)
This mapcount function maps our count data to the function described above and returns the index for the user!
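For completeness, here's what calling it might look like. Note that mapcount relies on module-level timeInterval and dayDifference values from the real script; the numbers below are assumptions for the sake of demonstration:

timeInterval = 300   # assumed: seconds between normalized data points
dayDifference = 7    # assumed: days of data being analyzed

contagiousness = 8   # contagiousness factor from 0-10
threshold = 100      # meters

index = mapcount(3401.28271, contagiousness, threshold)
print(index)  # approaches 1 for large accumulated areas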
In this implementation, we don't do anything with the results, but in the actual script, they are written to the database once we've calculated the index for each user. There are a few more intricacies in the implementation outlined above, but most of the rest can be understood by reading the code.
In any case, this was an extremely fun algorithm to implement, and there are definitely still 1000 ways to optimize it, but that's all for now!