Movie Recommendations (Part 1)

Posted by Christopher Mertin on January 15, 2017 in Project • 8 min read

The user ratings for this project were downloaded from the MovieLens Dataset; specifically, the older MovieLens 100K dataset was chosen, which consists of 100,000 ratings from 1,000 users on 1,700 movies.

This code can also be run on the larger 1 million rating dataset, though the computational cost would be much higher, and running it on Amazon’s Elastic MapReduce (EMR) would be more expensive and take longer. However, the results should be more accurate.

Building the Recommendation System

To use MapReduce, we can use the mrjob module, and we need to import the sqrt function for the distance calculation we’re using to compute similarities. Our program therefore requires the following imports:

from mrjob.job import MRJob
from mrjob.step import MRStep
from math import sqrt
from itertools import combinations

In order to calculate the similarities, we need a multi-step MapReduce job. itertools.combinations is used to generate every pair of movies that each user has rated; grouping those pairs then gives us, for each movie pair, all of the users who rated both Movie 1 and Movie 2.
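As a standalone illustration of what combinations produces (a toy example, not part of the job itself):

from itertools import combinations

# One user's rated movies as (movieID, rating) pairs
ratings = [("50", 5.0), ("172", 5.0), ("181", 4.0)]

for pair in combinations(ratings, 2):
    print(pair)
# (('50', 5.0), ('172', 5.0))
# (('50', 5.0), ('181', 4.0))
# (('172', 5.0), ('181', 4.0))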

We also need a class for our program, which starts as follows:

class MovieRecommendation(MRJob):

    def configure_options(self):
        super(MovieRecommendation, self).configure_options()
        self.add_file_option("--items", help="Path to u.item")

    def load_movie_names(self):
        # Build a movie ID -> movie title lookup table from u.item
        self.movieNames = {}

        with open("u.item", encoding="ascii", errors="ignore") as f:
            for line in f:
                fields = line.split('|')
                self.movieNames[int(fields[0])] = fields[1]

    def cosine_similarity(self, ratingPairs):
        # ratingPairs holds (ratingX, ratingY) for every user who rated
        # both movies; treat them as two vectors and compute the cosine
        # of the angle between them
        numPairs = 0
        sum_xx = sum_yy = sum_xy = 0
        for ratingX, ratingY in ratingPairs:
            sum_xx += ratingX * ratingX
            sum_yy += ratingY * ratingY
            sum_xy += ratingX * ratingY
            numPairs += 1

        numerator = sum_xy
        denominator = sqrt(sum_xx) * sqrt(sum_yy)

        score = 0
        if denominator:
            score = numerator / float(denominator)

        return (score, numPairs)

The configure_options function registers the --items option, which points to the file u.item; that file stores every movie as a movie ID and movie name pair. This will be useful later, as u.data holds only the user ID, the movie ID, and the rating the user gave that movie. u.item is pipe-delimited, while u.data is tab-delimited.

load_movie_names builds a dictionary on each Hadoop node that translates movie IDs into movie names. This is used to make the output more human readable at the end of the computation.
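For reference, a line of each file looks roughly like the following (illustrative rows in the MovieLens 100K format; u.item lines contain more fields than shown here):

# u.data is tab-delimited: userID, movieID, rating, timestamp
196	242	3	881250949

# u.item is pipe-delimited: movieID, title, release date, ...
1|Toy Story (1995)|01-Jan-1995|...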

Finally, the cosine similarity is defined as

$$\cos(\theta) = \frac{\mathbf{A}\cdot\mathbf{B}}{\left|\left|\mathbf{A}\right|\right|_{2}\left|\left|\mathbf{B}\right|\right|_{2}}$$

Cosine similarity was used because it is fairly resistant to noise. While there is no way here to add a bias term to offset users who are known for rating movies more positively or negatively than average, the cosine measure helps somewhat, since it compares the pattern of the ratings rather than their absolute level. We take care of this bias properly in the later implementation with a Recurrent Neural Network (RNN) in Part 2.

Because all ratings are positive, the cosine similarity falls in the range \([0,1]\), where “\(0\)” means the two movies’ ratings are not similar at all and “\(1\)” means the rating patterns are essentially identical.
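As a quick worked example (toy ratings, not from the dataset), suppose three users rated both movies:

from math import sqrt

# (rating for movie A, rating for movie B) from three users
pairs = [(5.0, 4.0), (3.0, 3.0), (4.0, 5.0)]

sum_xy = sum(x * y for x, y in pairs)      # 49.0
sum_xx = sum(x * x for x, _ in pairs)      # 50.0
sum_yy = sum(y * y for _, y in pairs)      # 50.0

print(sum_xy / (sqrt(sum_xx) * sqrt(sum_yy)))   # ~0.98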

In order to build our recommendation system, we need a multi-step MapReduce job. We can do it in three steps, defined as follows:

  1. Step 1
    • Map: Maps each input rating into a pair of the form UserID, (MovieID, rating).
    • Reduce: Groups the mapped ratings by user into a list of the form UserID, [(movie-1, rating-1), (movie-2, rating-2), ..., (movie-N, rating-N)] for all \(N\) movies rated by that user.
  2. Step 2
    • Map: Find every pair of movies each user has seen, emitting each pair together with its two ratings.
    • Reduce: Compute the cosine similarity for each pair, yielding moviePair, (score, numPairs).
  3. Step 3
    • Map: Re-key the results as (movie1, score), (movie2, n), using the actual movie names to allow for better sorting, where n is the number of users who rated both movie1 and movie2.
    • Reduce: Output the results in the form movie1, (movie2, score, n).
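To make the data flow concrete, here is a toy trace of the pipeline for a single user (the IDs, names, and numbers are illustrative):

# Input lines from u.data (user, movie, rating, timestamp):
#   196    50     5    881250949
#   196    172    5    881250950
#
# After Step 1:         "196", [("50", 5.0), ("172", 5.0)]
# After Step 2 mapper:  ("50", "172"), (5.0, 5.0)    plus the reversed pair
# After Step 2 reducer: ("50", "172"), (0.99, 345)   aggregated over all users
# After Step 3:         "Star Wars (1977)", ("Empire Strikes Back, The (1980)", 0.99, 345)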

The code for each of these steps can be found below with a more thorough description of each step.

# Step 1
def mapper_parse_input(self, key, line):
    # Strip the timestamp and key each rating by user
    (userID, movieID, rating, timestamp) = line.split('\t')
    yield userID, (movieID, float(rating))

def reducer_ratings_by_user(self, user_id, itemRatings):
    # Collect every (movieID, rating) pair for this user into one list
    ratings = []
    for movieID, rating in itemRatings:
        ratings.append((movieID, rating))

    yield user_id, ratings

In Step 1, the mapper simply removes the timestamp from the input and keys each rating by UserID. The reducer then gathers each user’s values into a single list. This list tells us every movie the user rated, which is exactly what Step 2 needs in order to form all the pairs.

# Step 2
def mapper_create_item_pairs(self, user_id, itemRatings):
    # Emit every pair of movies this user rated, in both orders,
    # so that either movie can be used as the lookup key later
    for itemRating1, itemRating2 in combinations(itemRatings, 2):
        movieID1 = itemRating1[0]
        rating1 = itemRating1[1]
        movieID2 = itemRating2[0]
        rating2 = itemRating2[1]

        yield (movieID1, movieID2), (rating1, rating2)
        yield (movieID2, movieID1), (rating2, rating1)

def reducer_compute_similarity(self, moviePair, ratingPairs):
    score, numPairs = self.cosine_similarity(ratingPairs)

    # Keep only strong similarities backed by enough co-ratings
    if numPairs > 50 and score > 0.95:
        yield moviePair, (score, numPairs)

Step 2 is where the actual similarity computation takes place. The mapper emits a key-value pair for every combination of movies each user rated. This is the largest computational bottleneck in the entire process: the communication volume is large, and the work is \(\mathcal{O}(n^2)\) in the number of movies a user has rated. A user who rated 200 movies alone generates \(\binom{200}{2} = 19{,}900\) pairs.

The reducer computes the similarity between the ratings of each movie pair, but bounds the output with the if statement: the similarity score has to be greater than \(0.95\), and more than 50 users must have rated both movies. This reduces the volume of output and enforces a minimum quality bar, so that only the most similar movie pairs are emitted. For instance, a pair scored \(0.98\) from only three common raters is dropped, while Top Gun and Apollo 13 in the results below pass with a score of \(0.962\) over 204 common raters.

# Step 3
def mapper_sort_similarities(self, moviePair, scores):
    # Re-key by (movie name, score) so the results group nicely
    score, n = scores
    movie1, movie2 = moviePair

    yield (self.movieNames[int(movie1)], score), \
          (self.movieNames[int(movie2)], n)

def reducer_output_similarities(self, movieScore, similarN):
    movie1, score = movieScore
    for movie2, n in similarN:
        yield movie1, (movie2, score, n)

Step 3 is not very intensive, as it is mostly the “cleanup step.” The mapper simply re-keys the data into the pair (movie1, score), (movie2, n), where n is the number of users that rated both movie1 and movie2.

This is done so that, in the reducer, we can output all of the combinations for movie1 sequentially, iterating over every value of movie2.

It’s important to note that the output is symmetric: because the Step 2 mapper emitted each pair in both orders, the results for (movie1, movie2) are the same as for (movie2, movie1). This makes lookup easier, since any movie can be found by just looking at the first value.

def steps(self):
    return [
        MRStep(mapper=self.mapper_parse_input,
               reducer=self.reducer_ratings_by_user),
        MRStep(mapper=self.mapper_create_item_pairs,
               reducer=self.reducer_compute_similarity),
        MRStep(mapper=self.mapper_sort_similarities,
               mapper_init=self.load_movie_names,
               reducer=self.reducer_output_similarities)
    ]

Finally, we need to tell mrjob the order in which to run the steps, which is done with the function above. Then, in our main, we run the class with MovieRecommendation.run().
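For completeness, the entry point is the usual mrjob boilerplate:

if __name__ == '__main__':
    MovieRecommendation.run()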

To run our program, we have a few configurations available. For example, to use it with Amazon’s Elastic MapReduce we first need to set up the mrjob configuration file (a minimal sketch follows the commands below). Here is how to run the program in each configuration:

  1. Run Locally:
    • python MovieRecommendation.py --items=ml-100k/u.item ml-100k/u.data > movie_rec.txt
  2. Run on single EMR Node:
    • python MovieRecommendation.py -r emr --items=ml-100k/u.item ml-100k/u.data
  3. Run on 4 EMR Nodes:
    • python MovieRecommendation.py -r emr --num-ec2-instances=4 --items=ml-100k/u.item ml-100k/u.data
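For reference, a minimal mrjob configuration for EMR might look like the sketch below. The credential values are placeholders, and the option names have changed across mrjob releases, so check the documentation for the version you are running.

# ~/.mrjob.conf (sketch; values are placeholders)
runners:
  emr:
    aws_access_key_id: YOUR_KEY_ID
    aws_secret_access_key: YOUR_SECRET_KEY
    aws_region: us-east-1
    ec2_instance_type: m1.medium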

The output on this dataset can be found here. We can look at some of the results to see whether the recommendations make sense. I pulled out three examples to check; they can be seen in the table below.

Movie 1             Movie 2                            Score    n
Star Wars (1977)    Empire Strikes Back, The (1980)    0.990    345
Star Wars (1977)    Aliens (1986)                      0.967    259
Top Gun (1986)      Apollo 13 (1995)                   0.962    204

The above pairings and their similarity scores make a lot of sense: the grouped movies really are similar to one another. From this, the recommendation system appears to give sensible results.

Finally, we can take the output movie_rec.txt and convert it into a sorted JSON file for ease of reading. The first-level keys are sorted alphabetically, while each movie’s matches are sorted by score.
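Each line of movie_rec.txt is mrjob’s tab-separated output: the key is the first movie’s name and the value is the (movie2, score, n) triple. For example (illustrative, using a row from the results table above; the actual float formatting will differ):

"Star Wars (1977)"	["Empire Strikes Back, The (1980)", 0.990, 345]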

import ast
import json
import collections

def ReadFile(filename):
    # Each output line holds a movie name (key) and a
    # (movie2, score, n) triple (value), separated by a tab
    data = {}
    with open(filename) as f:
        for line in f:
            key, value = line.rstrip('\n').split('\t')
            mov1 = ast.literal_eval(key)
            mov2 = ast.literal_eval(value)
            data.setdefault(mov1, {})[mov2[0]] = {"Score": mov2[1],
                                                  "N": mov2[2]}

    # Sort the movies alphabetically and, within each movie,
    # sort its matches by descending similarity score
    data = collections.OrderedDict(
            [(k, collections.OrderedDict(
                 sorted(v.items(), key=lambda pair: pair[1]["Score"], reverse=True)))
                 for k, v in sorted(data.items())])
    return data

file_in = "movie_rec.txt"

movie_scores = ReadFile(file_in)

with open("movies.json", 'w') as fp:
    json.dump(movie_scores, fp, indent=4)

This will save it as a sorted JSON file. With the help of a little bit of JavaScript, I wrote a small webapp where you can select one of the movies in the list and get back the movies most similar to it.
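The resulting structure looks roughly like this excerpt, built from the example rows above:

{
    "Star Wars (1977)": {
        "Empire Strikes Back, The (1980)": {
            "Score": 0.990,
            "N": 345
        },
        "Aliens (1986)": {
            "Score": 0.967,
            "N": 259
        }
    }
}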

Note: It’s important to notice that this was done with the small dataset with a very simple heuristic. Therefore, some of the movie similarities will be off, for example 101 Dalmatians being similar to The Hunt for Red October. However, there is a verifiable structure in this, for example if you look up for Star Wars, other films from the same franchise are recommended.

Even though there are some irregularities, and the results would improve with the larger dataset, one thing to realize is that the larger the value of \(n\), the more likely the two movies really are similar to one another. If this code were run on the larger dataset, it would be important to tweak the thresholds in the Step 2 reducer: the cutoff on the cosine similarity and, most importantly, the cutoff on \(n\). A sketch of how those thresholds could be exposed for tuning follows.
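As a sketch under the old optparse-style mrjob API used above (the --min-raters and --min-score option names are hypothetical, not part of the original job):

# Hypothetical: make the Step 2 cutoffs command-line options
def configure_options(self):
    super(MovieRecommendation, self).configure_options()
    self.add_file_option("--items", help="Path to u.item")
    self.add_passthrough_option("--min-raters", type="int", default=50)
    self.add_passthrough_option("--min-score", type="float", default=0.95)

def reducer_compute_similarity(self, moviePair, ratingPairs):
    score, numPairs = self.cosine_similarity(ratingPairs)

    if numPairs > self.options.min_raters and score > self.options.min_score:
        yield moviePair, (score, numPairs)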
