Saturday, September 01, 2012

Learning Mahout : Clustering

The next section in the MIA book is Clustering. As with Recommenders, Mahout provides both in-memory and map-reduce versions of various clustering algorithms. However, unlike Recommenders, there are quite a few toolkits (like Weka or Mallet for example) which are more comprehensive than Mahout for small or medium sized datasets, so I decided to concentrate on the M/R implementations.

The full list of clustering algorithms available in Mahout at the moment can be found on its Wiki Page under the Clustering section. The ones covered in the book are K-Means, Canopy, Fuzzy K-Means, LDA and Dirichlet. All these algorithms expect data in the form of vectors, so the first step is to convert the input data into this format, a process known as vectorization. Essentially, clustering is the process of finding nearby points in n-dimensional space, where each vector represents a point in this space, and each element of a vector represents a dimension in this space.

It is important to choose the right vector format for the clustering algorithm. For example, one should use the SequentialAccessSparseVector for KMeans, sinc there is lot of sequential access in the algorithm. Other possibilities are the DenseVector and the RandomAccessSparseVector formats. The input to a clustering algorithm is a SequenceFile containing key-value pairs of {IntWritable, VectorWritable} objects. Since the implementations are given, Mahout users would spend most of their time vectorizing the input (and thinking about what feature vectors to use, of course).

Once vectorized, one can invoke the appropriate algorithm either by calling the appropriate bin/mahout subcommand from the command line, or through a program by calling the appropriate Driver's run method. All the algorithms require the initial centroids to be provided, and the algorithm iteratively perturbes the centroids until they converge. One can either guess randomly or use the Canopy clusterer to generate the initial centroids.

Finally, the output of the clustering algorithm can be read using the Mahout cluster dumper subcommand. To check the quality, take a look at the top terms in each cluster to see how "believable" they are. Another way to measure the quality of clusters is to measure the intercluster and intracluster distances. A lower spread of intercluster and intracluster distances generally imply "good" clusters. Here is code to calculate inter-cluster distance based on code from the MIA book.

There are various ways of improving clustering quality. The first, of course, is knowing your data well enough so you can choose good features to cluster on. For clustering text or real world (dirty) data, vector generation can be improved by removing noise and using a good weighting technique. Mahout allows you to specify your custom Lucene analyzers to its clustering subcommands for this. Cluster quality is also dependent on the measure used to calculate similarity between two feature vectors. Once again, Mahout supplies a large number of Distance Measure implementations (Chebyshev, Cosine, Mahalanobis, Manhattan, Minkowski, SquaredEuclidean, Euclidean, Tanimoto, Weighted Euclidean and Weighted Manhattan) and also allows you to specify your own if these don't suit your purposes. Within each dimension, points can be normalized to remove the effect of outliers - the normalization p-norm should match the p-norm used by the distance measure. Finally, if the dimensions are not comparable, such as number of bedrooms and price in dollars for a house, then one should normalize across dimensions, a process known as weighting (this should be done during the vectorization process, which you control fully).

Case Study - Reuters

The dataset used for this is the Reuters-21578 collection. I basically followed along with the book, trying out commands and making them work against a pseudo-distributed Hadoop installation on my notebook.

Vectors can be created using the following Mahout subcommands.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # create sequence file from directory of input files
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout seqdirectory \
  -c UTF-8 \
  -i reuters \          # input directory
  -o reuters-seqfiles   # output directory
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # create vectors from sequence file (no normalization)
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout seq2sparse \
  -i reuters-seqfiles \ # input: sequence files directory
  -o reuters-vectors \  # output: vectors
  -ow
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # this creates 7 sub directories under output
hduser@cyclone:mahout$ # df-count, dictionary.file*, frequency.file-*,
hduser@cyclone:mahout$ # tf-vectors, and tfidf-vectors
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # create vectors from sequence file (with normalization)
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout seq2sparse \
  -i reuters-seqfiles \
  -o reuters-normalized-bigram -ow \
  -a org.apache.lucene.analysis.WhitespaceAnalyzer \ # analyzer
  -chunk 200 \  # chunk size (MB)
  -wt tfidf \   # weighting scheme
  -s 5 \        # minimum support
  -md 3 \       # minimum document frequency
  -x 90 \       # maximum document frequency percentage
  -ng 2 \       # ngram size
  -ml 50 \      # minimum log likelihood ratio
  -seq \        # create sequential access sparse vectors
  -n 2          # normalization - use 2-norm (aka Euclidean norm)
                # should be paired with similar distance measure
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # two additional directories created,
hduser@cyclone:mahout$ # tokenized-documents and wordcount.
hduser@cyclone:mahout$ #######################################################

To run the K-Means cluster, here is the sequence of mahout subcommands.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # invoke kmeans
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout kmeans \
  -i reuters-vectors/tfidf-vectors \ # input
  -c reuters-initial-clusters \      # generate initial clusters
  -o reuters-kmeans-clusters         # output
  -dm org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure \
  -cd 1.0 \ # convergence threshold
  -k 20 \   # number of clusters
  -x 20 \   # max iterations
  -cl       # run clustering after iterations
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # in the output directory, there is a clusteredPoints
hduser@cyclone:mahout$ # directory, and directories corresponding to each
hduser@cyclone:mahout$ # clustering iteration named clusters-*. The last
hduser@cyclone:mahout$ # cluster is named cluster-*-final.
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # dump clusters to local filesystem
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout clusterdump \
  -dt sequencefile \                            # format: {Integer => String}
  -d reuters-vectors/dictionary.file-* \        # dictionary: {id => word}
  -i reuters-kmeans-clusters/clusters-3-final \ # input
  -o clusters.txt \                             # output (local filesystem)
  -b 10 \                                       # format length
  -n 10                                         # number of top terms to print

The dm flag specifies the Distance measure. In general, it is better to use either Cosine or Tanimoto distance measures for clustering text rather than the SquaredEuclidean measure.

The value of k (number of clusters) is provided by the caller based on his knowledge of the data. You can eliminate the guesswork by using the Canopy clusterer with appropriate distance thresholds to indicate the size of clusters as shown below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # generate initial clusters with canopy
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout canopy \
  -i reuters-vectors/tfidf-vectors \
  -o reuters-canopy-centroids \
  -dm org.apache.mahout.common.distance.EuclideanDistanceMeasure \
  -t1 1500 \ # points between 0 and t1 from centroid are included
  -t2 2000   # points between t1 and t2 from centroid are removed
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # run kmeans with canopy cluster centroids.
hduser@cyclone:mahout$ # in this case, since -c points to a populated path,
hduser@cyclone:mahout$ # the algo doesn't have to guess initial centroids.
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout kmeans \
  -i reuters-vectors/tfidf-vectors \
  -o reuters-kmeans-clusters \
  -c reuters-canopy-centroids/clusters-0-final \
  -dm org.apache.mahout.common.distance.TanimotoDistanceMeasure \
  -cd 0.1 \     # convergence threshold
  -ow \         # overwrite
  -x 20 \       # max iterations
  -cl           # run clustering after iterations
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # view the clusters generated
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout clusterdump \
  -dt sequencefile \                     # format {Integer => String}
  -d reuters-vectors/dictionary.file-* \ # dictionary (id => word)
  -i reuters-kmeans-clusters/clusters-1-final \ # input directory
  -o clusters.txt \                      # output (local filesystem)
  -b 10 \                                # max size of word to print
  -n 10                                  # number of top terms

You can also write your own custom clusterer. I suppose this would be useful when you want to deploy an end-to-end solution but as you can see, there is not much difference from using a pipeline of commands as shown above. Here is an example of a custom clusterer that works against the Reuters dataset, using Canopy to compute the initial centroids, then using KMeans (or optionally Fuzzy KMeans) for clustering.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Source: :r src/main/scala/com/mycompany/mia/cluster/ReutersClusterer.scala
package com.mycompany.mia.cluster

import java.io.{StringReader, Reader}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.{SequenceFile, IntWritable}
import org.apache.lucene.analysis.standard.{StandardTokenizer, StandardFilter, StandardAnalyzer}
import org.apache.lucene.analysis.tokenattributes.TermAttribute
import org.apache.lucene.analysis.{WhitespaceTokenizer, TokenStream, StopFilter, LowerCaseFilter, Analyzer}
import org.apache.lucene.util.Version
import org.apache.mahout.clustering.canopy.CanopyDriver
import org.apache.mahout.clustering.classify.WeightedVectorWritable
import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver
import org.apache.mahout.clustering.kmeans.KMeansDriver
import org.apache.mahout.clustering.Cluster
import org.apache.mahout.common.distance.{TanimotoDistanceMeasure, EuclideanDistanceMeasure}
import org.apache.mahout.common.HadoopUtil
import org.apache.mahout.vectorizer.tfidf.TFIDFConverter
import org.apache.mahout.vectorizer.{DocumentProcessor, DictionaryVectorizer}

object ReutersClusterer extends App {
  
  // parameters
  val minSupport = 2
  val minDf = 5
  val maxDfPercent = 95
  val maxNGramSize = 2
  val minLLRValue = 50
  val reduceTasks = 1
  val chunkSize = 200
  val norm = 2
  val sequentialAccessOutput = true
  
  val inputDir = args(0) // directory of doc sequence file(s)
  val outputDir = args(1) // directory where clusters will be written
  val algo = args(2) // "kmeans" or "fkmeans"
  
  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  HadoopUtil.delete(conf, new Path(outputDir))
  
  // converts input docs in sequence file format in input_dir 
  // into token array in output_dir/tokenized-documents
  val inputPath = new Path(inputDir)
  val tokenizedDocPath = new Path(outputDir, 
    DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER) 
  DocumentProcessor.tokenizeDocuments(inputPath, 
    classOf[ReutersAnalyzer], tokenizedDocPath, conf)
    
  // reads token array in output_dir/tokenized-documents and
  // writes term frequency vectors in output_dir (under tf-vectors)
  DictionaryVectorizer.createTermFrequencyVectors(
    tokenizedDocPath,
    new Path(outputDir),
    DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER,
    conf, minSupport, maxNGramSize, minLLRValue, 2, true, reduceTasks,
    chunkSize, sequentialAccessOutput, false)

  // converts term frequency vectors in output_dir/tf-vectors
  // to TF-IDF vectors in output_dir (under tfidf-vectors)
  val tfVectorPath = new Path(outputDir, 
    DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER)
  val outputPath = new Path(outputDir)
  val docFreqs = TFIDFConverter.calculateDF(
    tfVectorPath, outputPath, conf, chunkSize)
  TFIDFConverter.processTfIdf(tfVectorPath, outputPath,
    conf, docFreqs, minDf, maxDfPercent, norm, true, 
    sequentialAccessOutput, false, reduceTasks)
    
  // reads tfidf-vectors from output_dir/tfidf-vectors
  // and writes out Canopy centroids at output_dir/canopy-centroids
  val tfidfVectorPath = new Path(outputDir, "tfidf-vectors")
  val canopyCentroidPath = new Path(outputDir, "canopy-centroids")
  CanopyDriver.run(conf, tfidfVectorPath, canopyCentroidPath,
    new EuclideanDistanceMeasure(), 250, 120, false, 0.01, false)
    
  // reads tfidf-vectors from output_dir/tfidf-vectors and 
  // refers to directory path for initial clusters, and 
  // writes out clusters to output_dir/clusters
  val clusterPath = new Path(outputDir, "clusters")
  algo match {
    case "kmeans" => KMeansDriver.run(conf, tfidfVectorPath, 
      new Path(canopyCentroidPath, "clusters-0-final"),
      clusterPath, new TanimotoDistanceMeasure(), 
      0.01, 20, true, 0.01, false)
    case "fkmeans" => FuzzyKMeansDriver.run(conf, tfidfVectorPath,
      new Path(canopyCentroidPath, "clusters-0-final"),
      clusterPath, new TanimotoDistanceMeasure(), 
      0.01, 20, 2.0f, true, true, 0.0, false)
    case _ => throw new IllegalArgumentException(
      "algo can be either kmeans or fkmeans")
  }
  
  // read clusters and output
  val reader = new SequenceFile.Reader(fs, 
    new Path(clusterPath, Cluster.CLUSTERED_POINTS_DIR + "/part-m-00000"), 
    conf)
  val key = new IntWritable()
  val value = new WeightedVectorWritable()
  while (reader.next(key, value)) {
    println(key.toString + " belongs to " + value.toString)
  }
  reader.close()
}

class ReutersAnalyzer extends Analyzer {
  
  val ALPHA_PATTERN = """[a-z]+""".r
  
  override def tokenStream(fieldName : String, reader : Reader) : 
   TokenStream = {
    // tokenize input string by standard tokenizer
    var result : TokenStream = 
      new StandardTokenizer(Version.LUCENE_CURRENT, reader)
    result = new StandardFilter(result)
    // lowercase all words
    result = new LowerCaseFilter(result)
    // remove stop words
    result = new StopFilter(true, result, 
      StandardAnalyzer.STOP_WORDS_SET)
    val termAttr = result.addAttribute(classOf[TermAttribute]).
      asInstanceOf[TermAttribute]
    val buf = new StringBuilder()
    while (result.incrementToken()) {
      // remove words < 3 chars long
      if (termAttr.termLength() >= 3) {
        val word = new String(
          termAttr.termBuffer(), 0, termAttr.termLength())
        // remove words with non-alpha chars in them
        if (ALPHA_PATTERN.pattern.matcher(word).matches) {
          buf.append(word).append(" ")
        }
      }
    }
    // return the remaining tokens
    new WhitespaceTokenizer(new StringReader(buf.toString))
  }
}

Just like K-Means, the other clustering algorithms can also be run either form a script or via code as shown above. Here are the commands for the other major clustering algorithms.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # run fuzzy kmeans with random initial centroids
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout fkmeans \
  -i reuters-vectors/tfidf-vectors \
  -c reuters-fkmeans-centroids \
  -o reuters-fkmeans-clusters \
  -cd 1.0 \ # convergence threshold
  -k 21 \
  -ow \
  -x 10 \ # maxiterations
  -m 2.0 \ # fuzzification factor
  -dm org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # view generated clusters.
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout clusterdump \
  -dt sequencefile \
  -d reuters-vectors/dictionary.file-* \
  -i reuters-fkmeans-clusters/clusters-2-final \
  -o clusters.txt \
  -b 10 \
  -n 10

Dirichlet clustering is based on a probabilistic model, that attempts to predict the distribution of points around a cluster. Model Distributions provided by Mahout are the DistanceMeasureClusterDistribution and GaussianClusterDistribution, and of course users can implement ModelDistribution to create their own custom one as well. The command line invocation of Dirichlet clusterer is shown below:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # invoke dirichlet clustering
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout dirichlet \
  -i reuters-vectors/tfidf-vectors \
  -o reuters-dirichlet-clusters \
  -k 60 \                          # number of clusters
  -x 10 \                          # number of iterations
  -a0 1.0 \                        # alpha0 value
  -md org.apache.mahout.clustering.dirichlet.models.GaussianClusterDistribution \
  -mp org.apache.mahout.math.SequentialAccessSparseVector #default vector type
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ # view generated clusters.
hduser@cyclone:mahout$ #######################################################
hduser@cyclone:mahout$ bin/mahout clusterdump \
  -dt sequencefile \
  -d reuters-vectors/dictionary.file-* \
  -i reuters-dirichlet-clusters/clusters-0 \
  -o clusters.txt \
  -b 10 \
  -n 10

The book covers another algorithm called LDA (Latent Dirichlet Allocation) which is useful for Topic Modeling, but the subcommand has been deprecated in version 0.7 and replaced with two other subcommands which I could not make working. I guess I will come back to this one later when I actually need it, and when I am a bit more familiar with Mahout.

Case Study - LastFM

The Last.fm dataset contains (music) group names tagged with words or phrases and the respective tag counts. The objective is to suggest tags for artists given their existing tags, ie to cluster tags using the artists as feature vector and tag counts as feature weights. Unlike the Reuters case, the approach here is to write just enough code to generate the dictionary (for clusterdump) and feature vectors (for the clusterer). For the actual clustering and viewing the clusters, we depend on the appropriate Mahout subcommands. Here is the code for the vectorizer.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// Source: src/main/scala/com/mycompany/mia/cluster/LastFMVectorizer.scala
package com.mycompany.mia.cluster

import scala.collection.JavaConversions.iterableAsScalaIterable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.{IntWritable, DefaultStringifier, Text, SequenceFile, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat}
import org.apache.hadoop.mapreduce.{Mapper, Reducer, Job}
import org.apache.hadoop.util.{GenericsUtil, GenericOptionsParser}
import org.apache.mahout.math.{VectorWritable, Vector, SequentialAccessSparseVector, NamedVector}

/**
 * We need to read a file of the following format:
 * UUID<sep>Artist Name<sep>Tag<sep>Count
 * into a sequence file of [Text,VectorWritable] as follows:
 * Tag => VectorWritable(Artist:Count)
 */
object LastFMVectorizer {

  def main(args : Array[String]) : Int = {
    val conf = new Configuration()

    val otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs
    if (otherArgs.length != 2) {
      println("Usage: LastFMVectorizer input_file output_dir")
      -1
    }

    // Dictionary Mapper/Reducer. Extract unique artists
    val job1 = new Job(conf, "Dictionary Mapper")
    job1.setJarByClass(classOf[DictionaryMapper])
    job1.setMapperClass(classOf[DictionaryMapper])
    job1.setReducerClass(classOf[DictionaryReducer])
    job1.setOutputKeyClass(classOf[Text])
    job1.setOutputValueClass(classOf[IntWritable])
    job1.setInputFormatClass(classOf[TextInputFormat])
    job1.setOutputFormatClass(classOf[SequenceFileOutputFormat[Text,IntWritable]])
    FileInputFormat.addInputPath(job1, new Path(args(0)))
    val dictOutput = new Path(args(1), "dictionary")
    FileOutputFormat.setOutputPath(job1, dictOutput)
    var succ = (job1.waitForCompletion(true))
    
    if (succ) {
      // get a mapping of unique ids to artist and tag for converting
      // to Mahout vectors
      val dictOutput = new Path(args(1), "dictionary")
      val fs = FileSystem.get(dictOutput.toUri(), conf)
      val dictfiles = fs.globStatus(new Path(dictOutput, "part-*"))
      var i = 0
      val dictGlob = new Path(args(1), "dict-glob")
      val writer = new SequenceFile.Writer(fs, conf, dictGlob, 
        classOf[Text], classOf[IntWritable])
      for (dictfile <- dictfiles) {
        val path = dictfile.getPath()
        val reader = new SequenceFile.Reader(fs, path, conf)
        val key = new Text()
        val value = new IntWritable()
        while (reader.next(key, value)) {
          writer.append(key, new IntWritable(i))
          i += 1
        }
        reader.close()
      }
      writer.close()
      conf.set("dictpath", dictGlob.toString())

      val job2 = new Job(conf, "Dictionary Vectorizer")
      job2.setJarByClass(classOf[VectorMapper])
      job2.setMapperClass(classOf[VectorMapper])
      job2.setReducerClass(classOf[VectorReducer])
      job2.setOutputKeyClass(classOf[Text])
      job2.setOutputValueClass(classOf[VectorWritable])
      job2.setInputFormatClass(classOf[TextInputFormat])
      job2.setOutputFormatClass(classOf[SequenceFileOutputFormat[Text,VectorWritable]])
      FileInputFormat.addInputPath(job2, new Path(args(0)))
      FileOutputFormat.setOutputPath(job2, new Path(args(1), "vectors"))
      succ = (job2.waitForCompletion(true))
    }
    if (succ) 0 else 1
  }
}

/////////////////////////////////////////////////////////////////
// Assigns a unique ID to each artist. Needed by clusterdump
/////////////////////////////////////////////////////////////////

class DictionaryMapper 
    extends Mapper[LongWritable,Text,Text,IntWritable] {
  
  val pattern = """<sep>""".r
  val zero = new IntWritable(0)
  
  override def map(key : LongWritable, 
      value : Text, 
      context : Mapper[LongWritable,Text,Text,IntWritable]#Context) = { 
    val fields = pattern.split(value.toString)
    if (fields.length != 4) {
      context.getCounter("Map", "LinesWithErrors").increment(1)
    } else {
      context.write(new Text(fields(1)), zero)
    }
  }
}

class DictionaryReducer
    extends Reducer[Text,IntWritable,Text,IntWritable] {

  val zero = new IntWritable(0)

  override def reduce(key : Text, 
      values : java.lang.Iterable[IntWritable],
      context : Reducer[Text,IntWritable,Text,IntWritable]#Context) = {
    context.write(key, zero)
  }
}

/////////////////////////////////////////////////////////////////
// For each tag, creates feature vectors of artists
/////////////////////////////////////////////////////////////////

class VectorMapper
    extends Mapper[LongWritable,Text,Text,VectorWritable] {
  
  val pattern = """<sep>""".r
  var dict = new java.util.HashMap[String,Integer]()
  var vecwritable = new VectorWritable()
  
  override def setup(
      context : Mapper[LongWritable,Text,Text,VectorWritable]#Context) = {
    super.setup(context)
    val conf = context.getConfiguration()
    val dictpath = new Path(conf.get("dictpath"))
    val fs = FileSystem.get(dictpath.toUri(), conf)
    val reader = new SequenceFile.Reader(fs, dictpath, conf)
    val key = new Text()
    val value = new IntWritable()
    while (reader.next(key, value)) {
      dict.put(key.toString(), value.get())
    }
  }
  
  override def map(key : LongWritable, 
      value : Text,
      context : Mapper[LongWritable,Text,Text,VectorWritable]#Context) = {
    val fields = pattern.split(value.toString)
    if (fields.length != 4) {
      context.getCounter("Map", "LinesWithErrors").increment(1)
    } else {
      val artist = fields(1)
      val tag = fields(2)
      val weight = java.lang.Double.parseDouble(fields(3))
      val vector = new NamedVector(
        new SequentialAccessSparseVector(dict.size()), tag)
      vector.set(dict.get(artist), weight)
      vecwritable.set(vector)
      context.write(new Text(tag), vecwritable)
    }
  }
}

class VectorReducer
    extends Reducer[Text,VectorWritable,Text,VectorWritable] {
  
  var vecwritable = new VectorWritable()
  
  override def reduce(key : Text,
      values : java.lang.Iterable[VectorWritable],
      context : Reducer[Text,VectorWritable,Text,VectorWritable]#Context) = {
    var vector : Vector = null
    for (partialVector <- values) {
      if (vector == null) {
        vector = partialVector.get().like()
      } else {
        vector.plus(partialVector.get())
      }
    }
    val artistVector = new NamedVector(vector, key.toString())
    vecwritable.set(artistVector)
    context.write(key, vecwritable)
    for (value <- values) {
      context.write(key, value)
    }
  }
}

Once the vectors are generated, we just use Mahout subcommands to complete the clustering work.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
hduser@cyclone:mahout$ hadoop jar /tmp/my-mahout-fatjar.jar \
  com.mycompany.mia.cluster.LastFMVectorizer \
  input/ArtistTags.dat \
  output
hduser@cyclone:mahout$ bin/mahout kmeans \
  -i output/vectors \
  -o output/topics \
  -c output/centroids \
  -k 200 \
  -ow \
  -dm org.apache.mahout.common.distance.CosineDistanceMeasure \
  -cd 0.01 \
  -x 20 \
  -cl
hduser@cyclone:mahout$ bin/mahout clusterdump \
  -i output/topics/clusters-20-final \
  -d output/dict-glob \
  -dt sequencefile \
  -n 10 \
  -o clusters.txt

There are two other case studies in the MIA book which I didn't do, maybe I will come back and do them once I finish the other stuff in the book. I also skipped over LDA (because its now deprecated in favor of two other alogorithms and I couldn't make either of them work). However, I now have a decent understanding of how to do clustering with Mahout.

If you've been reading this post with the intent of learning about Mahout clustering, you may have found the pace a little too fast. Thats because there is a lot of material to cover in a relatively short post. For a gentler introduction please refer to the MIA book and the companion code repository on GitHub..

58 comments (moderated to prevent spam):

Anonymous said...

Thanks alot brother.Am working with SMS texts,do you have any tips?

Sujit Pal said...

You are welcome. Only thing I can think of is that SMS texts are similar to twitter messages, so perhaps some initial cleanup may be needed?

Anonymous said...

Hi , I am looking at LDA libraries or packages which have LDA. Does your JTMT or any other package support LDA or its variants?

Thanks a Ton in advance.

Sujit Pal said...

Hi, no JTMT doesn't have anything for LDA. However, Mahout does if you want to use Hadoop and so does Mallet if not.

Shrida said...

I want to use Web Log files for clustering..Any idea..how to work on it

Sujit Pal said...

You could just vectorize the log lines and feed the vectors into Mahout's clustering algorithm. If you have additional knowledge of the features of the log lines, then perhaps you could use that to vectorize instead of treating the log line as a bag of words.

Shrida said...

Thanks..N sorry for the delay.
I worked but I am finding some problems.Basically the picture isnt yet clear in front of me.
right now I ve just parsed the log files and grouped them into an array...Wud this be useful ?

Sujit Pal said...

If you mean an array of lines, then probably not too useful. You want a term-document matrix where each row of the matrix corresponds to a document and each column corresponds to a word in the /corpus/. For example, suppose your corpus consisted of two documents, each with one sentence: "The dog barks" and "The cat meows". Your vocabulary is the set of words ['barks', 'cat', 'dog', 'meows', 'the'], and your document vectors are: [1, 0, 1, 0, 1] and [0, 1, 0, 1, 1]. The clustering algorithm needs the documents converted to a format like the above, and (I am pretty sure but its been a while so I may be wrong) Mahout supplies a script with which you can convert your text files to a sequence file of document vectors.

Shrida said...

Thanks for the rply sir..I am attaching a line extracted from the log file whhich I need to cluster based on some parameter..Please guide me accordingly
192.168.1.16 - - [11/04/2012:05:17:55 -0400] "POST /aboutme.html HTTP/1.1" 200 100 "http://www.facebook.com/ShridaGirme2" "Mozilla/5.0 (Windows; U; Windows NT 5.2; en-US; rv:1.8.1.7) Gecko/20070914 Firefox/2.0.0.7"

How could I cluster the above line using kmeans clustering algorithm

Sujit Pal said...

You could either treat the corpus as a bag of words in which case the Mahout vectorizer can reduce it to a matrix for you, and it would be a good thing to do as a baseline anyway.

You could then extract one or more features, perhaps things like browser, operating system, HTTP code, HTTP method, IP address, date/time, etc, and use them as categorical features instead of the words. What features you choose would most likely be influenced by what you know about the log files, ie, which applications, what kind of users, etc. You may probably need to write your own vectorizer in this case. You can evaluate the clusters so formed using some similarity metric between the points in the cluster. Since K-Means is not deterministic, ie, the choice of coordinates of the starting clusters determines the final clusters, perhaps use Canopy clustering to select your individual points.

BTW, you will probably get better information if you ask on the Mahout mailing list, there are people there that are actively working on and with Mahout, and perhaps even some doing webserver log analysis, and they would be able to help you better than me based on their experience.

amrendra said...

how to create manually k-mean cluster after that i realize that how to work cluster and how to machine create a cluster

Sujit Pal said...

Hi Amrendra, wikipedia has a reasonably good description that may answer your question. Essentially, you choose initial cluster centroids randomly then cluster points in the input set by distance to the randomly generated centroids. At the end of the iteration, you recalculate the centroid from the points in the cluster, and keep doing iterations until the centroid clusters stop moving around too much. If your data has n features, then each row of data is represented by a point in n-dimensional space, each dimension representing a feature.


Vyankatesh said...

Hello Sujit,
I am new to Hadoop and Mahout.I was trying to cluster user data using Mahout clustering.
Can you please guide me to create vector from the user data, that may be probably in text files.
I was trying the reuters data but it was giving me exception like the FileNotFound Exception to reuters folder extracted.

Sujit Pal said...

Hi Vyankatesh, Mahout offers the seqdirectory subcommand which will take a directory of text files and convert it to a sequence file (one file per line), and a seq2sparse subcommand that will apply the weighting scheme of your choice tf/tfidf and create vectors out of your text files. If you want to do it in code take a look at the code for DictionaryVectorizer in Mahout sources.

Vyankatesh said...

Thanks Sujit for your reply
I have stored the text files in directory: /usr/lib/mahout/description(contains text files) and
for output I have created the another directory: /usr/lib/mahout/description-seqfiles
and I ran the subcommand seqdirectory but it is still giving me the exception like FileNotFoundException

here is the exception that is showing me

[hadoop@jgusrhel5 mahout]$ bin/mahout seqdirectory -c UTF-8 -i /usr/lib/mahout/description -o /usr/lib/mahout/description-seqfiles
MAHOUT_LOCAL is not set; adding HADOOP_CONF_DIR to classpath.
Running on hadoop, using /opt/hadoop/hadoop/bin/hadoop and HADOOP_CONF_DIR=/opt/hadoop/hadoop/conf
MAHOUT-JOB: /usr/lib/mahout/examples/target/mahout-examples-0.9-SNAPSHOT-job.jar
13/09/16 14:11:59 INFO common.AbstractJob: Command line arguments: {--charset=[UTF-8], --chunkSize=[64], --endPhase=[2147483647], --fileFilterClass=[org.apache.mahout.text.PrefixAdditionFilter], --input=[/usr/lib/mahout/description], --keyPrefix=[], --method=[mapreduce], --output=[/usr/lib/mahout/description-seqfiles], --startPhase=[0], --tempDir=[temp]}
Exception in thread "main" java.io.FileNotFoundException: File does not exist: /usr/lib/mahout/description
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:558)
at org.apache.mahout.text.SequenceFilesFromDirectory.runMapReduce(SequenceFilesFromDirectory.java:140)
at org.apache.mahout.text.SequenceFilesFromDirectory.run(SequenceFilesFromDirectory.java:89)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.mahout.text.SequenceFilesFromDirectory.main(SequenceFilesFromDirectory.java:63)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.ProgramDriver$ProgramDescription.invoke(ProgramDriver.java:68)
at org.apache.hadoop.util.ProgramDriver.driver(ProgramDriver.java:139)
at org.apache.mahout.driver.MahoutDriver.main(MahoutDriver.java:195)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:160)

is it the problem related to hadoop file system? or any other thing?

thanks in advance.

Sujit Pal said...

Hi Vyankatesh, looks like your files are on the /local/ filesystem while the job is expecting them to be on HDFS. To run with Hadoop local mode (no HDFS) set MAHOUT_LOCAL to some value (see the log message: MAHOUT_LOCAL is not set; adding HADOOP_CONF_DIR to classpath. Running on hadoop, using /opt/hadoop/hadoop/bin/hadoop and HADOOP_CONF_DIR=/opt/hadoop/hadoop/conf).

Vyankatesh said...

Hi sujit,
Thank you very much for your reply.
I had set the MAHOUT_LOCAL
then it is giving me error like

[hadoop@jgusrhel5 mahout]$ bin/mahout seqdirectory -c UTF-8 -i /usr/lib/mahout/description -o /usr/lib/mahout/description-seqfiles
MAHOUT_LOCAL is set, so we don't add HADOOP_CONF_DIR to classpath.
MAHOUT_LOCAL is set, running locally
Error occurred during initialization of VM
Could not reserve enough space for object heap
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

Then I increased as well as decreased the size of Heap in hadoop-env.sh file still the problem persist.
Thank you.

Sujit Pal said...

Its complaining about the size of the heap. Since you are running locally (via MAHOUT_LOCAL) changing the configuration in hadoop-env.sh does not help. Look inside the bin/mahout script, theres a reference to MAHOUT_HEAPSIZE environment variable, which sets the -Xmx to the underlying java call. It sounds though that perhaps you already have a single-node hadoop set up. If so, it may just be easier to run in hadoop mode instead - just unset MAHOUT_LOCAL and copy the input files to HDFS and rerun.

Vyankatesh said...

Hello Sujit,
Thanks for your help.
As you suggested I had saved the text files in HDFS an ran the kmeans clustering it's generates clusters, I have dumped that output in text files using clusterdump command.
It is showing the 7 clusters are formed, but m confused about the output cause I observed that the cluster contains the values of c vector are exactly equal to the tf/idf vectors values and the value of r is empty, also I was confused about the value of n.
the output is as shown below
***********************************
VL-0{n=2 c=[4:1.560, 18:3.902, 19:1.000, 34:3.186, 44:1.560, 64:3.186, 65:1.336, 67:1.847, 68:2.612, 72:2.206, 76:3.186, 77:3.186, 79:1.560, 80:3.186, 83:1.847, 91:1.847, 92:1.632, 93:1.847, 95:1.847, 97:3.186, 98:3.186, 105:1.560, 106:2.206, 107:1.847, 111:1.847, 116:1.847, 117:1.847, 124:1.847, 126:1.336, 129:2.612, 137:2.206, 144:1.560, 145:3.820, 147:3.186, 148:1.560, 152:1.847, 159:1.847, 165:1.847, 171:3.186, 185:2.206, 188:1.560, 190:1.560, 191:1.154, 199:3.186, 200:1.560, 205:1.999, 216:1.847, 223:1.632, 229:2.612, 230:1.560, 231:1.336, 235:1.847, 239:2.308] r=[]}
Weight : [props - optional]: Point:
1.0: [4:1.560, 18:3.902, 19:1.000, 34:3.186, 44:1.560, 64:3.186, 65:1.336, 67:1.847, 68:2.612, 72:2.206, 76:3.186, 77:3.186, 79:1.560, 80:3.186, 83:1.847, 91:1.847, 92:1.632, 93:1.847, 95:1.847, 97:3.186, 98:3.186, 105:1.560, 106:2.206, 107:1.847, 111:1.847, 116:1.847, 117:1.847, 124:1.847, 126:1.336, 129:2.612, 137:2.206, 144:1.560, 145:3.820, 147:3.186, 148:1.560, 152:1.847, 159:1.847, 165:1.847, 171:3.186, 185:2.206, 188:1.560, 190:1.560, 191:1.154, 199:3.186, 200:1.560, 205:1.999, 216:1.847, 223:1.632, 229:2.612, 230:1.560, 231:1.336, 235:1.847, 239:2.308]
VL-1{n=2 c=[12:1.560, 19:1.000, 22:3.902, 33:3.186, 39:1.847, 46:3.186, 49:1.847, 50:1.847, 58:1.847, 61:2.206, 63:4.506, 67:2.612, 68:1.847, 71:1.847, 74:3.186, 103:5.518, 104:3.186, 106:1.560, 109:3.186, 117:1.847, 121:3.186, 128:1.847, 129:1.847, 131:3.200, 138:1.890, 141:3.902, 145:3.820, 146:2.206, 152:1.847, 161:3.186, 172:1.847, 178:3.902, 182:1.847, 203:3.186, 205:1.154, 219:3.186, 222:1.560, 223:1.154, 226:3.186, 230:1.560, 231:1.336, 232:3.186, 237:3.902] r=[]}
Weight : [props - optional]: Point:
1.0: [12:1.560, 19:1.000, 22:3.902, 33:3.186, 39:1.847, 46:3.186, 49:1.847, 50:1.847, 58:1.847, 61:2.206, 63:4.506, 67:2.612, 68:1.847, 71:1.847, 74:3.186, 103:5.518, 104:3.186, 106:1.560, 109:3.186, 117:1.847, 121:3.186, 128:1.847, 129:1.847, 131:3.200, 138:1.890, 141:3.902, 145:3.820, 146:2.206, 152:1.847, 161:3.186, 172:1.847, 178:3.902, 182:1.847, 203:3.186, 205:1.154, 219:3.186, 222:1.560, 223:1.154, 226:3.186, 230:1.560, 231:1.336, 232:3.186, 237:3.902]

***********************************
Some times in clusterdump it is writing 0 clusters and sometimes it writes 7 clusters
To understand this I was trying to run the Kmeans on Reuters(reuters21578) as mentioned in your blog, but the files are in .sgm format so m unable to convert to text using mvn

Sujit Pal said...

Hi Vyankatesh, you are welcome, happy to help. Not completely sure, but the output seems to indicate that you have 2 centroids (VL-0 and VL-1) formed for 2 input points (n=2) with radius of 0 (ie the centroid is the point). This page describes the format of the clusterdumper output. Regarding number of clusters, you should be able to specify the number of clusters using the k parameter, although since the centroids are chosen randomly, each run may differ in what the cluster actually contains. You may get more help on the Mahout mailing list, since these are people doing this stuff regularly and they may be able to spot things which I am not seeing.

Vyankatesh said...

Hello Sujit,
Thanks for your guidance!
I am able to now get the what exactly the output of clustering means,but I am confused about the how it generates the cluster because in first run its creating clusters with some n documents, in again next run with no changes in input it creates cluster with m documents.
one thing I observed is in first run I got cluster final output at clusters-20-final, in second run I got cluster at clusters-12-final with named vectors option in seq2sparse, while in 3rd run with no changes in i/p it gives cluster at cluster-15-final same named vectors as in 2nd run.
I am confused about it, is it generates random cluster with the subsequent runs?
Thanks in advance.

Sujit Pal said...

This is because KMeans randomly assigns k centroids at the beginning of the run. Because of this, results are not deterministic, and what you are seeing is very possible. Perhaps look at using Canopy clustering to determine initial centroids and pass it in to Kmeans?

Unknown said...

Hello Sujit, I am trying to cluster approx. 2MM customers based on their transaction and demographic attributes. I have around 20 dimensions across which I need to cluster the dataset.

All the examples I have seen so far have highlighted only text based consumption for Mahout. Can Mahout cluster the customers based on non-text data too?

Regards
Piyush

Sujit Pal said...

Hi Piyush,

You are right, the pipeline in the MIA (and in my post too, since I followed MIA) uses text, but it should be possible to do what you ask.

In the text example, the features are the words in the documents. The pipeline tokenizes the text into words with WhitespaceAnalyzer and then computing TF, IDF and TF*IDF vectors for each word.

So you need to preprocess your data a bit. Assume 3 features F1..3 and 3 records R1..3 in your training set:

....F1....F2....F3
R1:..2....NA....NA
R2:..1.....3.....2
R3:.NA.....1.....1

You could pre-process your data to be like this:

R1: F1 F1
R2: F1 F2 F2 F2 F3 F3
F3: F2 F3

and the text pipeline will convert it back to the original sparse matrix.

Anonymous said...

Haii, I want to analyze telecom data - like no: of calls, sms, gprs-usage and then identify customer behaviour. So how can I use this k-means algorithm for the same?

Sujit Pal said...

Its your data, you should know :-). But things that come to mind with K-Means would be to perhaps build clusters with varying k and measure your clusters using some evaluation metric. Once you have a good k, you could run k-means multiple times (the clusters are sensitive to the choice of initial centroid) and choose a cluster set (based on commonality of members in the set across runs). These clusters can yield patterns about how your customers cluster together - obviously its up to you now to identify these patterns using your domain knowledge or descriptive statistics on each cluster data. Another thing you could do (not related to clustering) could be to find correlations between different features in your dataset - perhaps people who do more SMS show more GPRS usage. Finally, if your dataset contains an output variable, perhaps "heavy" vs "light" users, you could train predictive models to predict heavy/light user based on unseen data.

sreejith said...

Hi Sujit,

How to generate --cluster values in kmeans. Because in the kmeans command you speficied -c reuters-initial-cluster but i am not getting any idea where it come from.

When i tried canopy and created cluster-centroids and put it as -c option for kmeans. But then it results in an error "No input clusters found in fraud-canopy-centroid/clusters-0-final. Check your -c argument."

Thank you

Sujit Pal said...

Hi Sreejith, not sure about the latest version of Mahout, but on Mahout 0.8 on my machine, "bin/mahout kmeans --help" tells me this about the "--clusters (-c)" parameter: the input centroids, as vectors. Must be a SequenceFile of Writable Cluster/Canopy. If k is also specified, then a random set of vectors will be selected and written out to this path first. So -c just points to a directory where the initial centroids (chosen randomly) will be written out by the job. I think I used Mahout 0.6 when I wrote this post.

sreejith said...

Thanks for the reply Sujit,

When i direct the -c parameter to an empty folder , it results in a Array Index out of bounds exception in kmeans.

Do u have any hints about this behaviour ?

Thanks,

Sujit Pal said...

Hi Sreejith, maybe try pointing to a non-existent folder?

sreejith said...

Hi Sujit,

I tried out it on reuters and it worked fine. So something wrong in my input data.

Input data is in a csv file and have to inspect for any possible vulnerability.

Any pointers on this ?

Thank you,

Sujit Pal said...

When you say "worked fine", do you mean the issue with the -c parameter? Did you try pointing -c to a non-existent folder on HDFS with your own data? My understanding is that the random centroid assignments are completely independent from the data, so -c should not behave differently for bad data. Unless the error message is lying...

Regarding your second question, Reuters data is text, so it is converted to a SequenceFile with seqdirectory, then vectorized with seq2sparse. If your data is CSV, you will need to build a custom preprocessor to convert it to a sparse vector.

sreejith said...

Thank you Sujit for the pointers.

Yes, the problem of -c solved in reuter data.

Yes, i am just using the CSV in seqdirectory with out any pre processing. Thought mahout would read through the csv headers,data and write the sequential file.

And my data is numeric.

Thanks,

Kirti Pai said...

Hello Sujit!
I have been trying to run a k-means clustering job on the reuters dataset. I encountered the following error:

14/03/27 16:44:32 WARN mapred.LocalJobRunner: job_local1234704244_0001
java.lang.Exception: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:403)
Caused by: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.initNextRecordReader(CombineFileRecordReader.java:164)
at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.(CombineFileRecordReader.java:126)
at org.apache.mahout.text.MultipleTextFileInputFormat.createRecordReader(MultipleTextFileInputFormat.java:43)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.(MapTask.java:491)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:734)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.initNextRecordReader(CombineFileRecordReader.java:155)
... 12 more
Caused by: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
at org.apache.mahout.text.WholeFileRecordReader.(WholeFileRecordReader.java:59)
... 17 more

Can you please help me resolve this error?!
Is anything wrong with the dataset, cause I will be running the same k-means onto a medical dataset, so it would be helpful to know.
Thank you! :)

Sujit Pal said...

Based on the snippet from your stack trace below, looks like you may be running Mahout on a newer version of Hadoop (2.x). My understanding is that Mahout as released works against Hadoop 1.x.

Caused by: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
at org.apache.mahout.text.WholeFileRecordReader.(WholeFileRecordReader.java:59)

Investigating the error message a bit, looks like Hadoop changed TaskAttemptContext from a class in 1.x to an interface in 2.x, among other things.

So I guess you can either downgrade your Hadoop to 1.x, or refer to this thread on StackOverflow which points to instructions for getting Mahout source suitable for use with Hadoop 2.x.

lokesh said...

hello sujit,
i am writing my own customize distance measure...but i dont know how can i use Mydistnce measure with the algorithms like kmeans...Like thru commnd line we simply wrote org.apche.....cosine distancemeasure...


How can i do this for my Customize distance measure

Sujit Pal said...

Hi Lokesh, take a look at the Mahout KMeans page, it allows you to pass in your custom distance measure using the -dm parameter. To make it accessible to Mahout, you should build a custom JAR and drop it into the $MAHOUT_HOME/lib directory.

lokesh said...

thanks sujith i saw ur replied that Hi Lokesh, take a look at the Mahout KMeans page, it allows you to pass in your custom distance measure using the -dm parameter. To make it accessible to Mahout, you should build a custom JAR and drop it into the $MAHOUT_HOME/lib directory.

but tell me how to create a jar for mahout liberary???.
if i run this jar using -dm parametr ,thn will it be accesble for Mahout...

Sujit Pal said...

Hi Lokesh, assuming you have already written code for the custom distance measure, you must have a project where you compiled it, right? If so, all you have to do is build the JAR using your project's packager. With mvn it is as simple as "mvn package". With ant you have to build a target (you can find the Jar task in the Ant manual) yourself. Other packagers (ivy, gradle, sbt) allow building JARs with varying degrees of ease, check their docs. Assuming you are not using JARs to compile your code against that are outside of the list in Mahout's lib directory, then all you have to do is drop your application's JAR file into Mahout's lib. If you have JARs outside the list, you should drop these JARs in as well.

lokesh said...

sir i am new in mahout so sir please tell me,...
step wise u tell me sir please
i have only one .java file ....and where i keep this file and and what command i pass to make jar of my distance measure...

if i will compile using mvn so will it be make jar ...and u told put ur jar in lib folder but all the class files are in mahout core...

Sujit Pal said...

Hi Lokesh, you may need a bit more background, but hopefully this will be sufficient. Running "mvn package" would create a JAR which you drop into $MAHOUT_HOME/lib directory, then run the mahout script with the -dm parameter.

Manthira Moorthy said...

hi this moorthy, whether we can define our own centroid points for a specific feauture in kmeans. if so how to create a custom centroid ?

Sujit Pal said...

Hi Moorthy, I suppose you could, to control the quality of the clustering with some manual input - interesting idea, never thought of this before. In any case, the centroid would be a n-tuple of numbers between 0 and 1 where n is the number of words in the dictionary-file. These centroids can then be specified as the initial-cluster centers to KMeans.

Venkata siva kamesh Bhallamudi said...

Hi Sujit,
Please suggest me how to do vectorization for the data which as both text and numeric fields.

Each record contains both text fields and numeric fields and there are 100 such fields in a record. I have not find any method to convert this as a vector also I want to use these fields while calculating distance in the distance measure.

Do I need to write my own vector implementation?.

Sujit Pal said...

Hi Venkata, I believe you will have to write your own vectorizer implementation. For numeric fields, they can go into your output vector either normalized or unchanged depending on the consuming algorithm and for text fields they need to be vectorized the standard way. One option could be to leverage Mahouts seq2sparse to generate the text vectors, then join them with your numeric data into your own vectorizer (or adapt the code to include numeric fields).

me said...

Hi , I'm new to this Mahout.Its there anyway that i can analyze and cluster some bag of words inside text file but i define myself the classification . The result might be similar like this

Bigdata
----------
Hadoop
Mapreduce
Cloudera


Database
---------
Query
Speed
SQL

I don't know what kind algo should i use inside mahout. So far i success using the K-Means. The result only show top terms like below

Top Terms:
dan => 0.306998285073021
hadoop => 0.25066306368584196
experience => 0.17834882707417365
generation => 0.17834882707417365
health => 0.17834882707417365
model => 0.17834882707417365
phone => 0.17834882707417365
your => 0.17834882707417365
3d => 0.12611166719912437
api => 0.12611166719912437

I really need your advice. Thanks.

Sujit Pal said...

I don't think an algorithm exists in Mahout to do what you want, but this is quite simple to do yourself. Assuming that your classification fits inside main memory, you can implement it as a map of {word: category} and for each document, tokenize into words and run each word through the map, and get a list of category counts. The document would be in the category with the highest count. This does not even need to be a Map-Reduce job unless your document corpus is very large and you want to parallelize across multiple Map jobs.

me said...

Thank you Sujit Pal for the explanation . I appreciate that. By the way is there any way i can get the categories especially for the technology terms ?

Sujit Pal said...

You are welcome, glad it helped. I did a quick google search for "big data skills taxonomy" but didn't get anything significant but maybe I didn't look hard enough. One option could be to scan the skills section for resumes of people with big data skills, aggregating their self classified skill keywords should provide a classification you could use. Of course you need to have access to such resumes, not sure where one would get that...

Shweta Agrawal said...

Hi,
I am new to mahout, i want implement tweets topic deection through mahout. so, i can implement it?? and do i need to use hadoop also??

Sujit Pal said...

Hi Shweta, if you want to do topic modeling using Mahout, then Mahout only offers a Hadoop based algorithm AFAIK, maybe best to ask on the Mahout ML to be sure. I have blogged about my experience with Mahout topic modeling perhaps you may find it helpful to answer the "how" part of your question. However, if you prefer to not use Hadoop and your data size is relatively small (tweets probably qualify), you may want to check out Mallet (Java) or Gensim (Python).

Nikitha JV said...

Hello boss, Thanks for the tutorial. I have a doubt,
1) is it possible to convert any text file into sequence file?
2) Is this format correct file.txt ==> Sequence file ==> vector format.
3) what is the procedure for doing clustering on simple text file?

Kindly help me out. Thank you so much. You can mail me at nikipraha.18@gmail.com

Sujit Pal said...

Hi Nikitha, you are welcome. Its been a while since I used Mahout, and it looks most clustering algorithms have been deprecated. In the old Mahout, you would convert the text to sequence file to vector. On a simple text file, you might just use something like sklearn - here is an example from their website.

Nikitha JV said...

Thank you so much :)

Soumav Prakash said...

hi sir i am unable to get the clusteredPoints directory after performing kmeans so i am not able to get the output from clusterdump

Sujit Pal said...

Hi Soumav, its been a while so any advice I give here might be completely wrong. If you can verify that you are using the same or compatible version of Mahout that I used (I believe 0.6), and the commands that you used to get to the point you mention, I can try to debug the problem. Also make sure you got clean runs for the commands preceding that point. If not, please specify the error message and stack trace if available.

Girthana said...

Hai Sujit

I have been trying to run K-Means clustering on Newsgroup dataset. I'm getting the following error when I try to create a sequence file. Could you please help me with this.

Exception in thread "main" java.io.FileNotFoundException: File does not exist: /20news-all
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
at org.apache.mahout.text.SequenceFilesFromDirectory.runMapReduce(SequenceFilesFromDirectory.java:162)
at org.apache.mahout.text.SequenceFilesFromDirectory.run(SequenceFilesFromDirectory.java:91)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at org.apache.mahout.text.SequenceFilesFromDirectory.main(SequenceFilesFromDirectory.java:65)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.ProgramDriver$ProgramDescription.invoke(ProgramDriver.java:71)
at org.apache.hadoop.util.ProgramDriver.run(ProgramDriver.java:144)
at org.apache.hadoop.util.ProgramDriver.driver(ProgramDriver.java:152)
at org.apache.mahout.driver.MahoutDriver.main(MahoutDriver.java:195)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

Sujit Pal said...

Hi Girthana, from the stack trace it looks like you have specified the path to the input directory as /20news-all, ie its a directory under the root directory of the system. The location seems unusual - perhaps the actual location is under your home directory? Easy way to verify is to do an ls on the path and see if you get the same error.