The idea behind Distributional Semantics in Natural Language Processing (NLP) can be succinctly summed up by a quote from the famous linguist John Firth: "You shall know a word by the company it keeps." In other words, the meaning of a word can be derived by analyzing the words it is commonly found with in sentences. This intuition is the basis for neural NLP models such as Word2Vec, a family of models that exploit word co-occurrences in large, publicly available text corpora to produce word embeddings: dense, (relatively) low-dimensional vector representations that encode the meanings of the words in these corpora. The principle has been extended to domains other than NLP as well. In the case of Word2Vec, the "company" a word keeps (its context) is determined by looking at a large number of word sub-sequences in natural text, and training the model either to predict the neighbors given a word (Skip-gram) or to predict the word given its neighbors (CBOW). For graph structures, node sequences produced by random walks on the graph can be thought of as analogous to sentences, and can be used to train Word2Vec-like models for the domain the graph represents. This is the idea behind graph embeddings such as DeepWalk and node2vec. In this post, I will describe a Music Recommender built with DeepWalk embeddings using Apache Spark on Databricks.
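To make the two training objectives concrete, here is a minimal Scala sketch (plain collections, no Spark) of how training examples can be enumerated from one token sequence with a symmetric context window. The helper names `skipGramPairs` and `cbowExamples` are hypothetical and only illustrate how each objective frames its prediction task; real Word2Vec implementations add further machinery (for example hierarchical softmax or negative sampling) that is not shown here.

```scala
// Hypothetical helper: Skip-gram frames training as "given the word at position i,
// predict each word within `window` positions of it", one (target, context) pair at a time.
def skipGramPairs(tokens: IndexedSeq[String], window: Int): IndexedSeq[(String, String)] =
  for {
    i <- tokens.indices
    j <- math.max(0, i - window) to math.min(tokens.length - 1, i + window)
    if j != i
  } yield (tokens(i), tokens(j))

// Hypothetical helper: CBOW inverts the task and predicts the word at position i
// from all the words surrounding it within the same window.
def cbowExamples(tokens: IndexedSeq[String], window: Int): IndexedSeq[(Seq[String], String)] =
  tokens.indices.map { i =>
    val context = (math.max(0, i - window) to math.min(tokens.length - 1, i + window))
      .filter(j => j != i)
      .map(j => tokens(j))
    (context, tokens(i))
  }
```

For example, `skipGramPairs(Vector("you", "shall", "know"), 1)` yields the pairs `(you,shall)`, `(shall,you)`, `(shall,know)` and `(know,shall)`. DeepWalk reuses exactly this framing, with node ids from a random walk playing the role of words.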
The data used for this recommender comes from the Amazon product co-purchasing network (March 02 2003) and its associated product metadata. The data was released as part of the paper "The Dynamics of Viral Marketing" (Leskovec, J., Adamic, L., and Huberman, B., 2007) and is available from the Stanford Network Analysis Project (SNAP). The Amazon co-purchasing network contains approximately 260 thousand product nodes and 1.2 million co-purchasing edges. From these, I extracted just the nodes categorized as Music, and kept only the edges that connect a pair of Music nodes. This resulted in a much smaller graph of about 35 thousand nodes (out of roughly 103 thousand music products in the catalog) and 46 thousand co-purchasing edges. I did the filtering because I felt that restricting the graph to a single domain would result in more meaningful recommendations. The other major category in the dataset was Books, with nearly 400 thousand entries in the catalog, but I felt that book co-purchasing might not be as tightly linked to consumer taste as music. The raw files were tab-separated, with the following formats:
- nodes (id: String, prod_id: String, description: String, category: String)
- edges (src_id: String, dst_id: String)
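The Music-only filtering described above is not shown as code in this post. The following is a minimal in-memory Scala sketch of the same logic, under two stated assumptions: the record types simply mirror the two raw formats, and music products carry the category string `Music`. The names `RawNode`, `RawEdge`, `musicEdges` and `graphNodeIds` are hypothetical.

```scala
// Records mirroring the two tab-separated raw files (field names as listed above).
case class RawNode(id: String, prod_id: String, description: String, category: String)
case class RawEdge(src_id: String, dst_id: String)

// Keep only edges whose BOTH endpoints are Music products.
// Assumption: music products carry the category string "Music".
def musicEdges(nodes: Seq[RawNode], edges: Seq[RawEdge]): Seq[RawEdge] = {
  val musicIds: Set[String] = nodes.filter(_.category == "Music").map(_.id).toSet
  edges.filter(e => musicIds.contains(e.src_id) && musicIds.contains(e.dst_id))
}

// The graph's nodes are the distinct endpoints of the surviving edges, so music
// products with no music co-purchases drop out of the graph entirely.
def graphNodeIds(edges: Seq[RawEdge]): Set[String] =
  edges.flatMap(e => Seq(e.src_id, e.dst_id)).toSet
```

Counting graph nodes as edge endpoints is one way a graph can end up much smaller than the catalog it was drawn from. In Spark, the same edge filter could be written as two `left_semi` joins of the edge DataFrame against the Music node ids, one on `src_id` and one on `dst_id`.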
The following Spark snippet converts the pair of files into what I call the node neighborhood format, in which the immediate neighbors of each node are grouped together into a list. The first two blocks just read the TSV files into named Spark DataFrames.
```scala
import org.apache.spark.sql.functions.collect_list

// `spark` is the notebook's SparkSession; nodeFile, edgeFile and
// nodeNeighborsOutputFile are paths defined elsewhere in the notebook.
val nodeDF = spark.read.format("csv")
  .option("header", "false")
  .option("delimiter", "\t")
  .load(nodeFile)
  .withColumnRenamed("_c0", "id")
  .withColumnRenamed("_c1", "prod_id")
  .withColumnRenamed("_c2", "description")
  .withColumnRenamed("_c3", "category")

val edgeDF = spark.read.format("csv")
  .option("header", "false")
  .option("delimiter", "\t")
  .load(edgeFile)
  .withColumnRenamed("_c0", "src_id")
  .withColumnRenamed("_c1", "dst_id")

// Collect each node's outgoing co-purchase edges into one list of neighbor ids.
val nodeNeighborsDF = edgeDF.groupBy("src_id")
  .agg(collect_list("dst_id").alias("neighbor_ids"))

nodeNeighborsDF.write.parquet(nodeNeighborsOutputFile)
```
The mean length of the neighbor_ids list is about 1.5, with a minimum of 1 and a maximum of 5. The output format looks like this:
- node_neighbors (src_id: String, neighbor_ids: List[String])
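The grouping step and the list-length statistics quoted above can be reproduced on a toy edge list with plain Scala collections. This is a hypothetical, in-memory sketch (the `Neighborhood` type and both function names are mine, not the post's); on the real DataFrame, the same statistics could be computed with Spark's `size`, `avg`, `min` and `max` functions over `neighbor_ids`.

```scala
// One row of the node neighborhood format: a node and its out-neighbors.
case class Neighborhood(src_id: String, neighbor_ids: Seq[String])

// In-memory analogue of groupBy("src_id").agg(collect_list("dst_id")).
// Only nodes with at least one outgoing edge get a row.
def nodeNeighborhoods(edges: Seq[(String, String)]): Seq[Neighborhood] =
  edges.groupBy(_._1).toSeq.sortBy(_._1).map { case (src, out) =>
    Neighborhood(src, out.map(_._2))
  }

// (mean, min, max) length of the neighbor lists.
def neighborListStats(rows: Seq[Neighborhood]): (Double, Int, Int) = {
  val sizes = rows.map(_.neighbor_ids.size)
  (sizes.sum.toDouble / sizes.size, sizes.min, sizes.max)
}
```

With the edges `1→2`, `1→3` and `2→3`, node `3` has no outgoing edge and therefore gets no row. In the random walk step, such nodes are never used as start nodes, and a walk that reaches one stops growing.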
```scala
import scala.util.Random
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.broadcast
import spark.implicits._ // encoders for the case classes (pre-imported in Databricks notebooks)

case class NeighborRec(src_id: String, neighbor_ids: Array[String])
case class PathRec(tail_src_id: String, path: List[String])

// Pick one element of a non-empty array uniformly at random.
def getRandomElement(xs: Array[String]): String =
  xs(Random.nextInt(xs.length))

def generateRandomWalks(nodeNeighborsDS: Dataset[NeighborRec],
                        numWalksPerStartNode: Int,
                        pathLen: Int): Dataset[PathRec] = {
  // Seed numWalksPerStartNode one-node walks at every node that has out-neighbors.
  var pathDS = nodeNeighborsDS.flatMap(rec =>
    (0 until numWalksPerStartNode).map(_ => PathRec(rec.src_id, List(rec.src_id))))
  // Each pass extends every walk by one hop, so walks end up with at most pathLen nodes.
  for (_ <- 1 until pathLen) {
    val newPathDS = pathDS.joinWith(broadcast(nodeNeighborsDS),
        pathDS("tail_src_id") === nodeNeighborsDS("src_id"),
        "left_outer")
      .map(rec => {
        val path = rec._1.path
        if (rec._2 != null) {
          // Step to a random out-neighbor of the current tail node.
          val nextNode = getRandomElement(rec._2.neighbor_ids)
          PathRec(nextNode, path :+ nextNode)
        } else {
          // Dead end: the tail node has no out-neighbors, so keep the walk unchanged.
          rec._1
        }
      })
    pathDS = newPathDS
  }
  pathDS
}

// Reload the node neighborhoods written by the previous snippet.
val nodeNeighborsDS = spark.read.parquet(nodeNeighborsOutputFile).as[NeighborRec]
val randomWalksDS = generateRandomWalks(nodeNeighborsDS, 20, 10)
randomWalksDS.write.parquet(randomWalksFile)
```
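The Spark loop grows every walk by one hop per iteration, using a broadcast join to look up the tail node's neighbors. The same uniform random walk can be sketched on a single machine over an adjacency map, which makes its properties easy to check: each walk has at most `pathLen` nodes, every consecutive pair is an edge, and a walk that reaches a node without out-neighbors stops there. The function name `randomWalks` and the explicitly seeded `Random` (for reproducibility) are my own choices, not part of the original code.

```scala
import scala.util.Random

// Uniform random walks over an adjacency map (node id -> out-neighbor ids).
// Starts numWalksPerStartNode walks at every node that has a neighbor list and
// extends each one a hop at a time, up to pathLen nodes in total.
def randomWalks(adj: Map[String, IndexedSeq[String]],
                numWalksPerStartNode: Int,
                pathLen: Int,
                rng: Random): Seq[List[String]] =
  for {
    start <- adj.keys.toSeq.sorted
    _ <- 0 until numWalksPerStartNode
  } yield {
    var walk = List(start) // built in reverse: head is the current tail node
    var length = 1
    var stuck = false
    while (!stuck && length < pathLen) {
      adj.get(walk.head) match {
        case Some(ns) if ns.nonEmpty =>
          walk = ns(rng.nextInt(ns.length)) :: walk // uniform choice of the next node
          length += 1
        case _ =>
          stuck = true // dead end: stop extending this walk
      }
    }
    walk.reverse
  }
```

Calling it as `randomWalks(adj, 20, 10, new Random(42))` would correspond to the `numWalksPerStartNode = 20` and `pathLen = 10` used in the Spark call. Prepending to a `List` and reversing once at the end avoids the quadratic cost of appending on every step.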
- random_walks (tail_src_id: String, path: List[String])
```scala
import org.apache.spark.ml.feature.Word2Vec

// Each random walk is a "sentence" whose "words" are node ids.
val randomWalksDF = spark.read.parquet(randomWalksFile)

val word2vec = new Word2Vec()
  .setInputCol("path")
  .setOutputCol("features")
  .setVectorSize(100)
  .setMinCount(0) // keep every node, however rarely it appears in the walks
  .setMaxIter(100)
  .setWindowSize(3)

val model = word2vec.fit(randomWalksDF)
model.write.overwrite().save(modelFile)
```
```scala
import org.apache.spark.ml.feature.Word2VecModel
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.desc
import spark.implicits._

case class SynonymRec(word: String, similarity: Double)
case class ProductRec(id: String, prod_id: String, description: String, category: String)
// Named SimilarMusicRec so it does not clash with the NeighborRec used for the random walks.
case class SimilarMusicRec(id: String, prod_id: String, description: String, similarity: Double)

// Return the numSimilar products whose embeddings are closest to that of srcId,
// joined with their product metadata and sorted by decreasing similarity.
def similarMusic(model: Word2VecModel,
                 nodeDS: Dataset[ProductRec],
                 srcId: String,
                 numSimilar: Int): Dataset[SimilarMusicRec] = {
  val synonymsDS = model.findSynonyms(srcId, numSimilar).as[SynonymRec]
  synonymsDS.joinWith(nodeDS, synonymsDS("word") === nodeDS("id"), "inner")
    .map(rec => SimilarMusicRec(rec._2.id, rec._2.prod_id, rec._2.description, rec._1.similarity))
    .orderBy(desc("similarity"))
}

val model = Word2VecModel.load(modelFile) // reload the model saved above
val nodeDS = nodeDF.as[ProductRec]
```
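Spark's `findSynonyms` ranks words by the cosine similarity between their embedding vectors, and that score is what the `similarity` column reports. The minimal sketch below reimplements the ranking over an in-memory map of vectors, to show what the score means; it is an illustration rather than Spark's implementation, and the names `cosine` and `topK` are hypothetical.

```scala
// Cosine similarity between two dense vectors of equal length.
def cosine(a: Array[Double], b: Array[Double]): Double = {
  require(a.length == b.length, "vectors must have the same dimension")
  val dot = a.indices.map(i => a(i) * b(i)).sum
  val norms = math.sqrt(a.map(x => x * x).sum) * math.sqrt(b.map(x => x * x).sum)
  if (norms == 0.0) 0.0 else dot / norms
}

// The k entries most similar to `query`, excluding the query itself,
// in decreasing order of cosine similarity (word, similarity).
def topK(vectors: Map[String, Array[Double]], query: String, k: Int): Seq[(String, Double)] = {
  val q = vectors(query)
  vectors.toSeq
    .collect { case (w, v) if w != query => (w, cosine(q, v)) }
    .sortBy(pair => -pair._2)(Ordering.Double.TotalOrdering)
    .take(k)
}
```

A similarity of 1 means two nodes point in exactly the same direction in embedding space, 0 means they are orthogonal, and -1 means they point in opposite directions; the query node is left out of its own results, as `findSynonyms` also does.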
```
scala> similarMusic(model, nodeDS, "25551", 10)
     | .show(10, false) // The Very Best of Motorhead
+------+----------+-------------------------------------+------------------+
|id    |prod_id   |description                          |similarity        |
+------+----------+-------------------------------------+------------------+
|34447 |B000002C1I|All That Matters                     |0.8850399255752563|
|37049 |B00004S95N|Elevation, Vol. 3                    |0.8403890132904053|
|45169 |B000056CDA|Collection                           |0.6613308787345886|
|17489 |B00002SWRF|Penetration                          |0.6495149731636047|
|222717|B00000GAOV|Rita Coolidge                        |0.6456888914108276|
|132023|B00000JN9G|F#¢k Me...I Thought He Was Dead!!!   |0.628462553024292 |
|88642 |B000003A2X|What Goes Around                     |0.6210222244262695|
|132024|B00000JN9E|American Jet Set                     |0.6044375896453857|
|143078|B0000025D7|Don't Let Go                         |0.6024927496910095|
|208504|B0000023U0|South Texas Swing                    |0.6008718013763428|
+------+----------+-------------------------------------+------------------+

scala> similarMusic(model, nodeDS, "25598", 10)
     | .show(10, false) // Mieczyslaw Horszowski Plays Mozart, Chopin, Debussy, Beethoven
+------+----------+-------------------------------------+------------------+
|id    |prod_id   |description                          |similarity        |
+------+----------+-------------------------------------+------------------+
|23844 |B000008QVX|Sacred Spirit Drums                  |0.9538416266441345|
|50937 |B000006RBJ|Enemigos Intimos                     |0.8765220046043396|
|258208|B000068FUQ|Anthology                            |0.8210484981536865|
|258207|B000068FUU|Sound of Lies                        |0.8157663941383362|
|134531|B00004WFKM|Atmospheres: Celtic Voices           |0.6351345181465149|
|151097|B00004WJEB|Christmas Time Again                 |0.632773756980896 |
|31231 |B000000919|Golden Classics                      |0.603758692741394 |
|138347|B0000032P5|Faithful                             |0.5865736603736877|
|45704 |B0000057OR|Second Sight                         |0.5757307410240173|
|122203|B00008BX5C|Alma                                 |0.5749264359474182|
+------+----------+-------------------------------------+------------------+

scala> similarMusic(model, nodeDS, "1501", 10)
     | .show(10, false) // Mississippi Hill Country Blues
+------+----------+-------------------------------------+------------------+
|id    |prod_id   |description                          |similarity        |
+------+----------+-------------------------------------+------------------+
|1502  |B00005IAF6|Time Is the Distance                 |0.8823902606964111|
|174640|B00005NC3Q|Second Chants                        |0.8467361330986023|
|155669|B000068QZR|Gonna Take a Miracle [Expanded]      |0.640330970287323 |
|177533|B0000549WA|A La Hora Que Me Llamen Voy          |0.6273027658462524|
|49286 |B000003AFR|In tha Beginning...There Was Rap     |0.6219795346260071|
|32838 |B00000JC6L|Real Life                            |0.6073424816131592|
|147053|B00004Y9J7|Silent Joy                           |0.6009130477905273|
|50583 |B000003ZTL|Greatest Freestyle Hits: Vol. 4      |0.6003987193107605|
|20414 |B000001SQ1|Horn Quartet of Berlin Philharmonic  |0.5992087125778198|
|75424 |B000063WD9|Greetings from Asbury Park, N.J.     |0.5959932804107666|
+------+----------+-------------------------------------+------------------+
```