A colleague pointed out that there exists an inverse use-case to finding outliers in medical claims. This one is to group procedure codes into clusters, or what Health Insurance companies call Episode Treatment Groups (ETG). Essentially an ETG is a way to cluster a group of services (procedures) into a medically relevant unit.

The CMS.gov dataset provides slightly under 16 million anonymized outpatient claims for Medicare/Medicaid patients. Each outpatient record can have upto 6 ICD-9 procedure codes, upto 10 ICD-9 diagnosis codes and upto 45 HCPCS codes. So just like the outlier case, we can derive a measure of similarity between a pair of codes as the average co-occurrence within claims across the dataset.

I decided to use a variant of the DBSCAN clustering algorithm. This post provides some tips on how to implement DBSCAN in a distributed manner - I used the ideas in this post to develop my implementation. The intuition behind my clustering algorithm goes something like this.

We calculate the similarity s

_{AB}between a pair of codes A and B as the number of times they co-occur in the corpus. Clustering algorithms need a distance measure, so we treat the distance d

_{AB}as the reciprocal of their similarity, ie 1/s

_{AB}. The DBSCAN clustering algorithm works by selecting other points around each point that are within a specified distance ε from each other. Candidate cluster centroids are those that have at least MinPoints codes within this distance ε. My algorithm deviates from DBSCAN at this point - instead of finding density-reachable codes I just find the Top-N densest clusters. Density is calculated as the number of codes within a circular area of the mean radius, i.e. N

^{2}/ πΣ

_{i=0..N}d

^{2}. We then calculate the top N densest code clusters - these are our derived ETGs.

The Scalding code below does just this. We simplify a bit by not calculating using some constants such as π but otherwise the code is quite faithful to the algorithm described above.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 | ```
// Source: src/main/scala/com/mycompany/cmspp/cluster/CodeCluster.scala
package com.mycompany.cmspp.clusters
import com.twitter.scalding.Job
import com.twitter.scalding.Args
import com.twitter.scalding.TextLine
import com.twitter.scalding.Tsv
import scala.io.Source
class CodeCluster(args: Args) extends Job(args) {
def extractPairs(line: String): List[(String,String)] = {
val cols = line.split(",").toList
val codes = (cols.slice(22, 27) // ICD9 procedure code cols
.map(x => if (x.isEmpty) x else "ICD9:" + x)
::: cols.slice(31, 75) // HCPCS (CPT4) procedure cols
.map(x => if (x.isEmpty) x else "HCPCS:" + x))
.filter(x => (! x.isEmpty))
val cjoin = for {codeA <- codes; codeB <- codes} yield (codeA, codeB)
cjoin.filter(x => x._1 < x._2)
}
val Epsilon = args("epsilon").toDouble
val MinPoints = args("minpoints").toInt
val NumClusters = args("nclusters").toInt
val output = Tsv(args("output"))
val dists = TextLine(args("input"))
.read
// compute pair-wise distances between procedure codes
.flatMapTo('line -> ('codeA, 'codeB)) { line: String => extractPairs(line) }
.groupBy('codeA, 'codeB) { group => group.size('sim) }
.map('sim -> 'radius) { x: Int => (1.0D / x) }
.discard('sim)
// group by codeA and retain only records which are within epsilon distance
.groupBy('codeA) { group => group.sortBy('radius).reverse }
.filter('radius) { x: Double => x < Epsilon }
val codeCounts = dists
.groupBy('codeA) { group =>
group.sizeAveStdev('radius -> ('count, 'mean, 'std))
}
// only retain codes that have at least MinPoints points within Epsilon
.filter('count) { x: Int => x > MinPoints }
.discard('std)
val densities = dists.joinWithSmaller(('codeA -> 'codeA), codeCounts)
.map(('mean, 'count) -> 'density) { x: (Double,Int) =>
1.0D * Math.pow(x._2, 2) / Math.pow(x._1, 2)
}
.discard('radius, 'count)
// sort the result by density descending and find the top N clusters
val densestCodes = densities.groupAll { group =>
group.sortBy('density).reverse }
.unique('codeA)
.limit(NumClusters)
// join code densities with densest codes to find final clusters
densities.joinWithTiny(('codeA -> 'codeA), densestCodes)
.groupBy('codeA) { group => group.mkString('codeB, ",")}
.write(output)
}
object CodeCluster {
def main(args: Array[String]): Unit = {
// populate redis cache
new CodeCluster(Args(List(
"--local", "",
"--epsilon", "0.3",
"--minpoints", "10",
"--nclusters", "10",
"--input", "data/outpatient_claims.csv",
"--output", "data/clusters.csv"
))).run
Source.fromFile("data/clusters.csv")
.getLines()
.foreach(Console.println(_))
}
}
``` |

I ran this locally with 1 million claims (out of the 16 million claims in my dataset) and got results like this:

1 2 3 4 5 6 7 8 9 10 | ```
HCPCS:00142 HCPCS:85025, HCPCS:36415, HCPCS:93005, HCPCS:80053, ...
HCPCS:00300 HCPCS:85025, HCPCS:80053, HCPCS:36415, HCPCS:99284, ...
HCPCS:00400 HCPCS:85025, HCPCS:93005, HCPCS:80053, HCPCS:36415, ...
HCPCS:00532 HCPCS:85025, HCPCS:36415, HCPCS:80048, HCPCS:G0378, ...
HCPCS:0073T HCPCS:77417, HCPCS:77336, HCPCS:77427, HCPCS:77280, ...
HCPCS:00740 HCPCS:85025, HCPCS:36415, HCPCS:93005, HCPCS:85610, ...
HCPCS:00750 HCPCS:36415, HCPCS:85025, HCPCS:85610, HCPCS:J3010, ...
HCPCS:00790 HCPCS:36415, HCPCS:85025, HCPCS:80048, HCPCS:80053, ...
HCPCS:00810 HCPCS:36415, HCPCS:85025, HCPCS:93005, HCPCS:80053, ...
HCPCS:00830 HCPCS:36415, HCPCS:85025, HCPCS:80048, HCPCS:93005, ...
``` |

*[*

**Edit: 07/22/2014:**This approach does not produce clusters. Notice in the data the same HCPCS:85025 is part of the first 3 clusters, which is obviously not wanted. I will implement the last part of the DBSCAN algorithm and update this page when I am done.]And thats all I have for today. I'd like to point out a new book on Scalding, Programming MapReduce with Scalding by Antonios Chalkiopoulos. I was quite impressed by this book, you can read my review on Amazon if you are interested.