Saturday, March 14, 2020

Music Recommendations using DeepWalk on Spark


The idea behind Distributional Semantics in Natural Language Processing (NLP) can be succintly summed up by the quote from the famous linguist John Firth -- You shall know a word by the company it keeps. In other words, the semantic meaning of a word can be derived by analyzing the meaning of words it is commonly found with in sentences. This intuition is the basis for neural NLP models such as Word2Vec, a group of models that exploit word co-occurrences in large, publicly available text corpora, to produce word embeddings, which are dense, (relatively) low-dimensional vector representations that encode the meanings of words in these corpora. The principle has been extended to domains other than NLP as well. In case of Word2Vec, the "company" words keep (or the context of the word) is determined by by looking at large number of word sub-sequences found in sentences in natural text, and training the model to trying to predict the neighbors given a word (Skip Gram), or predicting the word given its neighbors (CBOW). For graph structures, node sequences constructed by doing random walks on the graph can be thought of as being analogous to sentences, and may be used to train Word2Vec like models for the domain represented by the graph. This is the idea behind graph embeddings such as DeepWalk and node2vec. In this post, I will describe a Music Recommender built using DeepWalk embeddings using Apache Spark on Databricks.

The data used for this recommender comes from the Amazon product co-purchasing network (March 02 2003) and its associated product metadata. The data was released as part of the paper The Dynamics of Viral Marketing, (Leskovic, J, Adamic, L, and Adamic, B. 2007) and are available from the Stanford Network Analysis Project. The Amazon co-purchasing network contains approximately 260 thousand product nodes and 1.2 million co-purchasing edges. From these, I extracted just the nodes categorized as Music, and restricted edges only to those that connected a pair of Music nodes. This resulted in a much smaller graph of about 35 thousand nodes (103 thousand music products from catalog) and 46 thousand co-purchasing edges. I did the filtering because I felt that restricting to a single domain would result in more meaningful recommendations. The other major category in the dataset was Books, with nearly 400 thousand entries in the catalog, but I felt that book co-purchasing might not be as tightly linked to consumer taste as music. The format of the raw files were as follows, tab separated.

  • nodes (id: String, prod_id: String, description: String, category: String)
  • edges (src_id: String, dst_id: String)

The following Spark snippet converts the pair of files into what I call the node neighborhood format, with the immediate neighbor nodes for each node grouped together as a list. The first two blocks are just for reading the TSV file into named Spark DataFrames.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.apache.spark.sql.functions.collect_list

val nodeDF = spark.read.format("csv")
  .option("header", "false")
  .option("delimiter", "\t")
  .load(nodeFile)
  .withColumnRenamed("_c0", "id")
  .withColumnRenamed("_c1", "prod_id")
  .withColumnRenamed("_c2", "description")
  .withColumnRenamed("_c3", "category")

val edgeDF = spark.read.format("csv")
  .option("header", "false")
  .option("delimiter", "\t")
  .load(edgeFile)
  .withColumnRenamed("_c0", "src_id")
  .withColumnRenamed("_c1", "dst_id")

val nodeNeighborsDF = edgeDF.groupBy("src_id")
  .agg(collect_list("dst_id")
  .alias("neighbor_ids"))

nodeNeighborsDF.write.parquet(nodeNeighborsOutputFile)

The mean length of the neighbor_ids list is about 1.5, with minimum length 1 and maximum length 5. The output looks format looks like this:

  • node_neighbors (src_id: String, neighbor_ids: List[String])
The next step is to generate random walks using the node_neighbors format. Our co-purchasing network is undirected because a co-purchase edge between nodes A and B is semantically the same as one between nodes B and A. Also, since each co-purchase between a pair of music products is treated as a single node, the edges are unweighted. The DeepWalk algorithm generates multiple random walks of some specified maximum length starting from each node in the graph. At each node on its random path, the algorithm will randomly choose the next node to go to from the neighor_ids list. A sequential implementation would require O(m*N*d*k) computations, where N is the number of nodes in the graph, m is the number of walks to start from each node, d is the average degree of the network, and k is the path length. However, the process of selecting the next node to add to a random walk is dependent only on (the neighbors of) the current node, so we can speed this up if we parallelize this using a platform such as Spark. So the idea is to build up the random walk path Dataset iteratively. Before starting the iteration, the path Dataset is initialized with the src_id column from the node_neighbors Dataset, repeating m times to get the required number of paths per start node. At each iteration, an additional random node is added to all the random walks in the path Dataset. Instead of looking up the neighbors at each row, we leverage Spark's join capability to join the path Dataset with the node_neighbors Dataset using the src_id and the id of the last element in the path, and then randomly choosing the next node from the neighbor_ids list, so this is another time saving due to Spark. The iterations continue for the maximum specified path length. There may be nodes in the graph for which there are no neighbors, so not all generated random paths will have the same length. The code below contains the full code for generating random walks. The case classes specify the formats required for input and output. Output is written out as a Parquet file for the next step.
1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import scala.util.Random

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{broadcast, size}

case class NeighborRec(src_id: String, neighbor_ids: Array[String])
case class PathRec(tail_src_id: String, path: List[String])

def getRandomElement(xs: Array[String]): String = {
  val random = new Random()
  xs(random.nextInt(xs.length))
}


def generateRandomWalks(nodeNeighborsDS: Dataset[NeighborRec], 
                        numWalksPerStartNode: Int, 
                        pathLen: Int): Dataset[PathRec] = {
  
  var pathDS = nodeNeighborsDS.flatMap(rec => {
    (0 until numWalksPerStartNode).toList.map(j => {
      PathRec(rec.src_id, List(rec.src_id))
    })
  })
  for (i <- 1 until pathLen) {
    val newPathDS = pathDS.joinWith(broadcast(nodeNeighborsDS), 
        pathDS("tail_src_id") === nodeNeighborsDS("src_id"),
        "left_outer")
      .map(rec => {
        val path = rec._1.path
        if (rec._2 != null) {
          val nextNode = getRandomElement(rec._2.neighbor_ids)
          val newPath = path ++ List(nextNode)
          PathRec(nextNode, newPath)
        } else {
          PathRec(rec._1.tail_src_id, rec._1.path)
        }
      })
    pathDS = newPathDS
  }
  pathDS
}


val randomWalksDS = generateRandomWalks(nodeNeighborsDS, 20, 10)
randomWalksDS.write.parquet(randomWalksFile)
The output of this step has the following format. We generated around 630,000 paths with average length 7.7, minimum 2 and maximum 10.
  • random_walks (tail_src_id: String, path: List[String])
Once the random walks are generated, we can treat the node sequence in the path column as sentences to be input into the Word2Vec model. The Spark ML library contains a Word2Vec Estimator that can be trained using these sentences. The only change we make to the default implementation is to consider window sizes of 6 (3 nodes to the left, and 3 nodes to the right of the current node) instead of the default 5 (5 words to the left, 5 words to the right) that seems more suitable to natural language. Here is the code to train the model.
1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import org.apache.spark.ml.feature.{Word2Vec, Word2VecModel}

val word2vec = new org.apache.spark.ml.feature.Word2Vec()
     .setInputCol("path")
     .setOutputCol("features")
     .setVectorSize(100)
     .setMinCount(0)
     .setMaxIter(100)
     .setWindowSize(3)

val model = word2vec.fit(randomWalksDF)

model.write.overwrite().save(modelFile)
Finally, we can use the trained Word2Vec model to recommend music similar to a given music product, by computing the synonyms of the original music. Embeddings are created as a side effect of the Word2Vec training. As the model trains, it gets better and better at predicting either a word given its context, or its context given the word, based on the type of model being trained. However, what really changes under the hood are the weights of the network for each word in its vocabulary. These weights can be thought of as vectors in a space where semantically similar words clump together and semantically dissimilar words get pushed furthr apart. Using the same analogy to the embeddings from our trained model, we can now find music similar to some given music product by looking in the neighborhood of the given product in the space created by the embeddings. The findSynonyms() call provided by the Spark ML Word2Vec model returns a DataFrame of neighboring words (music product in our case), and the similarity between the source word and the neighbor. The function below wraps the findSynonyms() call, and pulls out the neighbor metadata from the nodes Dataset we saw earlier. As before, the case classes enforce the input and output formats the function will need.
1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import org.apache.spark.ml.feature.{Word2Vec, Word2VecModel}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._

case class SynonymRec(word: String, similarity: Double)
case class ProductRec(id: String, prod_id: String, description: String, category: String)
case class NeighborRec(id: String, prod_id: String, description: String, similarity: Double)

def similarMusic(model: Word2VecModel,
                 nodeDS: Dataset[ProductRec],
                 srcId: String, 
                 numSimilar: Int): Dataset[NeighborRec] = {
  
  val synonymsDS = model.findSynonyms(srcId, numSimilar).as[SynonymRec]
  val similarMusicDS = synonymsDS.joinWith(nodeDS, synonymsDS("word") === nodeDS("id"), "inner")
    .map(rec => NeighborRec(rec._2.id, rec._2.prod_id, rec._2.description, rec._1.similarity))
    .orderBy(desc("similarity"))
  similarMusicDS
}
It is now simple to generate recommendations for some given music. Here are some examples. As you can see, the recommendations are in the same or similar genres, which the model learned from walking the co-purchase graph.
scala> similarMusic(model, nodeDS, "25551", 10)
     |   .show(10, false) // The Very Best of Motorhead
+------+----------+-------------------------------------+------------------+
|id    |prod_id   |description                          |similarity        |
+------+----------+-------------------------------------+------------------+
|34447 |B000002C1I|All That Matters                     |0.8850399255752563|
|37049 |B00004S95N|Elevation, Vol. 3                    |0.8403890132904053|
|45169 |B000056CDA|Collection                           |0.6613308787345886|
|17489 |B00002SWRF|Penetration                          |0.6495149731636047|
|222717|B00000GAOV|Rita Coolidge                        |0.6456888914108276|
|132023|B00000JN9G|F#¢k Me...I Thought He Was Dead!!!   |0.628462553024292 |
|88642 |B000003A2X|What Goes Around                     |0.6210222244262695|
|132024|B00000JN9E|American Jet Set                     |0.6044375896453857|
|143078|B0000025D7|Don't Let Go                         |0.6024927496910095|
|208504|B0000023U0|South Texas Swing                    |0.6008718013763428|
+------+----------+-------------------------------------+------------------+

scala> similarMusic(model, nodeDS, "25598", 10)
     |   .show(10, false) // Mieczyslaw Horszowski Plays Mozart, Chopin, Debussy, Beethoven 
+------+----------+-------------------------------------+------------------+
|id    |prod_id   |description                          |similarity        |
+------+----------+-------------------------------------+------------------+
|23844 |B000008QVX|Sacred Spirit Drums                  |0.9538416266441345|
|50937 |B000006RBJ|Enemigos Intimos                     |0.8765220046043396|
|258208|B000068FUQ|Anthology                            |0.8210484981536865|
|258207|B000068FUU|Sound of Lies                        |0.8157663941383362|
|134531|B00004WFKM|Atmospheres: Celtic Voices           |0.6351345181465149|
|151097|B00004WJEB|Christmas Time Again                 |0.632773756980896 |
|31231 |B000000919|Golden Classics                      |0.603758692741394 |
|138347|B0000032P5|Faithful                             |0.5865736603736877|
|45704 |B0000057OR|Second Sight                         |0.5757307410240173|
|122203|B00008BX5C|Alma                                 |0.5749264359474182|
+------+----------+-------------------------------------+------------------+

scala> similarMusic(model, nodeDS, "1501", 10)
     |   .show(10, false) // Mississippi Hill Country Blues 
+------+----------+-------------------------------------+------------------+
|id    |prod_id   |description                          |similarity        |
+------+----------+-------------------------------------+------------------+
|1502  |B00005IAF6|Time Is the Distance                 |0.8823902606964111|
|174640|B00005NC3Q|Second Chants                        |0.8467361330986023|
|155669|B000068QZR|Gonna Take a Miracle [Expanded]      |0.640330970287323 |
|177533|B0000549WA|A La Hora Que Me Llamen Voy          |0.6273027658462524|
|49286 |B000003AFR|In tha Beginning...There Was Rap     |0.6219795346260071|
|32838 |B00000JC6L|Real Life                            |0.6073424816131592|
|147053|B00004Y9J7|Silent Joy                           |0.6009130477905273|
|50583 |B000003ZTL|Greatest Freestyle Hits: Vol. 4      |0.6003987193107605|
|20414 |B000001SQ1|Horn Quartet of Berlin Philharmonic  |0.5992087125778198|
|75424 |B000063WD9|Greetings from Asbury Park, N.J.     |0.5959932804107666|
+------+----------+-------------------------------------+------------------+
As you can see, the results don't look too bad, and it was not a whole lot of work to get here. Neither Word2Vec nor DeepWalk are novel concepts, but generating random walks for any reasonable sized graph is usually quite a computation intensive process, so I decided to see if I could use Spark to do this more efficiently. So this was the bulk of the work involved in building the recommender. Hopefully you found it interesting, and hope it helps you build similar recommenders with your own datasets.

1 comments (moderated to prevent spam):

Anonymous said...

Great Work! Wonderfully explained.