(1)根据srcPart(userPart).getPartition(userId)得到srcBlockId,根据dstPart(itemPart).getPartition(itemId)得到dstBlockId。得到的关系如下:
(2)将同一个(srcBlockId, dstBlockId)的打分信息(userId,
itemId, rating)拼凑在一起放到RatingBlock的结构下,RatingBlock中装有(userIds,
itemIds, ratings)的数据,最终返回((srcBlockId, dstBlockId),
RatingBlock)的结构。
(2)将RatingBlock中每个dstId映射成index,得到dstLocalIndices,此时值的形式为(srcBlockId,
(dstBlockId, srcIds, dstLocalIndices, ratings))。
(3)将dstBlockId +
dstLocalIndices得到dstEncodedIndices,此时值的形式为(srcBlockId, (srcIds,
dstEncodedIndices, ratings))。
(4)将上一步的值转化为类CSC格式,将值中的srcId排序,其他的dstEncodedIndices和ratings中元素也做相应调整。接着将srcIds中元素去重,并记录每个srcId元素的作用范围,得到dstPtrs。此时值的形式为(srcBlock,
InBlock(uniqueSrcIds, dstPtrs, dstEncodedIndices, ratings))。
6、分别根据userInBlocks和itemInBlocks初始化userFactors和itemFactors,userFactor的格式为(srcBlockId,
factors),factors的格式又是Array(srcId, factor)。
7、交替计算userFactors和itemFactors,先讲从userFactors计算itemFactors.
(1)通过srcOutBlocks和srcFactorBlocks(上次生成的factor值)得到每个dstBlockId有链接的每个srcBlockId的每个srcBlockId的每个srcId对应的factor值,如下所示:
(2)根据InBlocks和merged进行join,此时InBlocks为(dstBlockId,
InBlock(uniqueDstIds, srcPtrs, srcEncodedIndices,
ratings)),merged为(dstBlockId, srcBlockId * localIndex *
factor三维数组)。
根据InBlocks中srcEncodedIndices可以解析出srcBlockId和srcLocalIndex,从而获取获取srcEncodedIndices中每个元素的factor。InBlocks中srcPtrs划定了每个uniqueDstId对应的srcId的个数,取对应的所有srcId的factor的值和InBlocks中对应的所有dstId的rating值,计算出每个dstId的factor值。
package
com.pr.fortest.als
import
org.apache.log4j.{Level, Logger}
import
org.apache.spark.storage.StorageLevel
import
org.apache.spark.{HashPartitioner, SparkConf,
SparkContext}
object
BlasTest {
  def main(args:
Array[String]): Unit = {
    //
屏蔽日志
   
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
   
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //
设置运行环境
    val conf =
new
SparkConf().setAppName("ALSTest").setMaster("local[2]")
    val sc =
new SparkContext(conf)
    val
ratingsList = List((1, 2, 1.0), (176, 3, 2.0), (177, 3, 2.0),(178,
3, 2.0),(179, 3, 2.0),(175, 3, 2.0),(200, 1, 1.0), (25, 3,
3.0))
    val rank =
10
    val
maxIter = 11
    val
ratings = sc.parallelize(ratingsList).map(x => Rating(x._1,
x._2, x._3))
    val
userPart = new HashPartitioner(2)
    val
itemPart = new HashPartitioner(2)
    val
userLocalIndexEncoder = new
LocalIndexEncoder2(userPart.numPartitions)
    val
itemLocalIndexEncoder = new
LocalIndexEncoder2(itemPart.numPartitions)
    val
partitionRatings = new PartitionRatings()
    val
blockRatings = partitionRatings.partitionRatings(ratings, userPart,
itemPart)
    val
(userInBlocks, userOutBlocks) = partitionRatings.makeBlocks("user",
blockRatings, userPart, itemPart,
StorageLevel.MEMORY_AND_DISK)
   
userOutBlocks.count()
    val
swappedBlockRatings = blockRatings.map{
   
  case ((userBlockId, itemBlockId),
RatingBlock(userIds, itemIds, localRatings)) =>
   
    ((itemBlockId, userBlockId),
RatingBlock(itemIds, userIds, localRatings))
   
}
    val
(itemInBlocks, itemOutBlocks) = partitionRatings.makeBlocks("item",
swappedBlockRatings, itemPart, userPart,
StorageLevel.MEMORY_AND_DISK)
   
itemOutBlocks.count()
    val
factorCalculator = new FactorCalculator()
    var
userFactors = factorCalculator.initialize(userInBlocks,
rank)
    var
itemFactors = factorCalculator.initialize(itemInBlocks,
rank)
    for(iter
<- 0 to maxIter){
   
  itemFactors =
factorCalculator.computeFactors(userFactors, userOutBlocks,
itemInBlocks, rank, 0.01, userLocalIndexEncoder)
   
  userFactors =
factorCalculator.computeFactors(itemFactors, itemOutBlocks,
userInBlocks, rank, 0.01, itemLocalIndexEncoder)
   
}
   
itemFactors.take(10).foreach{case(idx, arr_arr)
=>
   
  println(idx)
   
 
println(arr_arr(0).mkString(":"))
   
  println("-------------")
   
}
   
sc.stop()
  }
}
2、FactorCalculator.scala,主要是计算因子相关的操作
package
com.pr.fortest.als
import
org.apache.spark.rdd.RDD
import
scala.util.Random
import
com.github.fommil.netlib.BLAS.{getInstance =>
blas}
import
org.apache.spark.HashPartitioner
// 计算因子
class FactorCalculator
{
  //
初始化每个因子
  def
initialize(inBlocks: RDD[(Int, InBlock)], rank: Int):
RDD[(Int,  Array[Array[Float]])] =
{
 
  inBlocks.map{case(srcBlockId, inBlock)
=>
 
      val
factors = Array.fill(inBlock.srcIds.length){
 
     
  val factor =
Array.fill(rank)(Random.nextGaussian().toFloat)
 
     
  val nrm = blas.snrm2(rank, factor,
1)
 
     
  blas.sscal(rank, 1.0f/nrm, factor,
1)  // 归一化操作
 
     
  factor
 
     
}
 
    (srcBlockId,
factors)
 
  }
 
}
  def
computeFactors(  // 以用户因子为例
 
     
     
     
srcFactorBlocks: RDD[(Int, Array[Array[Float]])], 
// 用户分块id,以及这个分块下每个用户对应的阶长数组
 
     
     
     
srcOutBlocks: RDD[(Int, Array[Array[Int]])], //
用户分块id,和这个分块有链接的各产品分块对应的用户本地索引数组
 
     
     
     
dstInBlocks: RDD[(Int, InBlock)], //
产品分块,以及唯一产品id数组/唯一产品id数组对应用户的累计条数数组/用户编码数组/产品对应用户得分
 
     
     
      rank:
Int,
 
     
     
      regParam:
Double,
 
     
     
     
srcEncoder: LocalIndexEncoder2): RDD[(Int, Array[Array[Float]])] =
{
 
  val numSrcBlocks =
srcFactorBlocks.partitions.length
 
  val srcOut =
srcOutBlocks.join(srcFactorBlocks).flatMap{
 
    case(srcBlockId,
(srcOutBlock, srcFactors)) =>
 
     
srcOutBlock.zipWithIndex.map{case(activeIndices, dstBlockId) =>
// 搞出产品blockid
 
     
  (dstBlockId, (srcBlockId, activeIndices.map(idx
=> srcFactors(idx))))
 
     
}
 
  } //
返回key是产品分块id,value的key是用户分块id,value的value是这个产品对应用户分块里每个产品分块id对应的用户因子数组
 
  val merged = srcOut.groupByKey(new
HashPartitioner(dstInBlocks.partitions.length))
 
 
dstInBlocks.join(merged).mapValues{
 
    case(InBlock(dstIds, srcPtrs,
srcEncodedIndices, ratings), srcFactors) =>
 
      val
sortedSrcFactors = new
Array[Array[Array[Float]]](numSrcBlocks)
 
     
srcFactors.foreach{ case(srcBlockId, factors)
=>
 
     
  sortedSrcFactors(srcBlockId) =
factors
 
     
}
 
      val
dstFactors = new
Array[Array[Float]](dstIds.length)
 
      var j =
0
 
      val ls =
new NormalEquation2(rank)
 
      while(j
< dstIds.length){
 
     
  ls.reset()
 
     
  var i = srcPtrs(j)
 
     
  var numExplicits = 0
 
     
  while(i < srcPtrs(j + 1)){
 
     
    val encoded =
srcEncodedIndices(i)
 
     
    val blockId =
srcEncoder.blockId(encoded)
 
     
    val localIndex =
srcEncoder.localIndex(encoded)
 
     
    val srcFactor =
sortedSrcFactors(blockId)(localIndex)
 
     
    val rating =
ratings(i)
 
     
    ls.add(srcFactor,
rating)
 
     
    numExplicits +=
1
 
     
    i += 1
 
     
  }
 
     
  dstFactors(j) = NNLS2.solve(rank, ls.ata,
ls.atb, 0.01)
 
     
  j += 1
 
     
}
 
     
dstFactors
 
  }
 
}
}
 
3、LocalIndexEncoder2.scala,用来将分区+本地索引信息放入一个Int中表示
package
com.pr.fortest.als
class
LocalIndexEncoder2(numBlocks: Int) extends Serializable
{
  val
numLocalIndexBits = math.min(, 31)  //
取非0前面0的个数,也就是localIndex占用区间的位数
  val
localIndexMask = (1 << numLocalIndexBits) - 1 //
掩码
  def
getIndexBit(): Int = numLocalIndexBits
  def
getLocalIndexMask(): String =
localIndexMask.toBinaryString
  //
将(blockId, localIndex)存放到一个整数里面
  def
encode(blockId: Int, localIndex: Int): Int = {
 
  require(blockId < numBlocks)
 
  require((localIndex & ~localIndexMask) ==
0)  // blockId存放区间和localIndex存放区间不能有交集,
也就是localIndex不能太大跑到非localIndex的区间去了
 
  (blockId << numLocalIndexBits) |
localIndex
 
}
  def
blockId(encoded: Int): Int = {
 
  encoded >>>
numLocalIndexBits  // 注意是>>>
而非>>,使用不带符号的位移方式
 
}
  def
localIndex(encoded: Int): Int = {
 
  encoded & localIndexMask
 
}
}
 
4、NNLS2.scala,解决非负最小平方问题
package
com.pr.fortest.als
import
java.util
import
com.github.fommil.netlib.BLAS.{getInstance =>
blas}
import
scala.util.Random
//
用修改过的梯度映射法解决非负最小平方问题
object NNLS2
{
  private
var ata: Array[Double] = _
  private
var rank: Int = -1
  private
var workspace: Workspace = _
  //
跟计算相关的中间变量
  class
Workspace(val n: Int){
 
  val scratch = new
Array[Double](n)
 
  val grad = new Array[Double](n)
 
  val x = new Array[Double](n)
 
  val dir = new Array[Double](n)
 
  val lastDir = new
Array[Double](n)
 
  val res = new Array[Double](n)
 
  def wipe(): Unit = {
 
    util.Arrays.fill(scratch,
0.0)
 
    util.Arrays.fill(grad,
0.0)
 
    util.Arrays.fill(x,
0.0)
 
    util.Arrays.fill(dir,
0.0)
 
    util.Arrays.fill(lastDir,
0.0)
 
    util.Arrays.fill(res,
0.0)
 
  }
 
}
  def
createWorkspace(n: Int): Workspace = {
 
  new Workspace(n)
 
}
  //
初始化中间变量
  def
initialize(rank: Int): Unit = {
 
  this.rank = rank
 
  workspace =
createWorkspace(rank)
 
  ata = new Array[Double](rank *
rank)
 
}
  //
填充ata矩阵,之前只是上三角矩阵的值
  def
fillAtA(triAtA: Array[Double], lambda: Double) {
 
  var i = 0
 
  var pos = 0
 
  var a = 0.0
 
  while (i < rank) {
 
    var j = 0
 
    while (j <= i)
{
 
      a =
triAtA(pos)
 
      ata(i *
rank + j) = a
 
      ata(j *
rank + i) = a
 
      pos +=
1
 
      j +=
1
 
    }
 
    ata(i * rank + i) +=
lambda
 
    i += 1
 
  }
 
}
  //
solve主入口函数
  def
solve(k: Int, ata1: Array[Double], atb1: Array[Double], lambda:
Double): Array[Float] = {
 
  val rank = k
 
  initialize(rank)
 
  fillAtA(ata1, lambda)
 
  val x = solve(ata, atb1,
workspace)
 
  x.map(x => x.toFloat)
 
}
  //
使用梯度下降的方法计算因子,中间比较复杂的是step的更新
  def
solve(ata: Array[Double], atb: Array[Double], ws: Workspace):
Array[Double] = {
 
  ws.wipe()
 
  val n = atb.length
 
  val scratch = ws.scratch
 
  // 计算步长的公式,dir:梯度方向, res: 残差
 
  def steplen(dir: Array[Double], res:
Array[Double]): Double = {
 
    val top = blas.ddot(n, dir,
1, res, 1)
 
    blas.dgemv("N", n, n, 1.0,
ata, n, dir, 1, 0.0, scratch, 1)
 
    top / (blas().ddot(n,
scratch, 1, dir, 1) + 1e-20)
 
  }
 
  def stop(step: Double, ndir: Double, nx:
Double): Boolean = {
 
   
((step.isNaN)
 
      || (step
< 1e-7)
 
      || (step
> 1e40)
 
      || (ndir
< 1e-12 * nx)
 
      || (ndir
< 1e-32)
 
     
)
 
  }
 
  val grad = ws.grad
 
  val x = ws.x
 
  val dir = ws.dir
 
  val lastDir = ws.lastDir
 
  val res = ws.res
 
  val iterMax = math.max(400, 20 *
n)
 
  var lastNorm = 0.0
 
  var iterno = 0
 
  var lastWall = 0
 
  var i = 0
 
  while(iterno < iterMax){
 
    blas.dgemv("N", n, n, 1.0,
ata, n, x, 1, 0.0, res, 1)
 
    blas.daxpy(n, -1.0, atb, 1,
res, 1)
 
    blas.dcopy(n, res, 1, grad,
1)
 
    i = 0
 
    while(i <
n){
 
      if(grad(i)
> 0.0 && x(i) == 0.0){
 
     
  grad(i) = 0.0
 
     
}
 
      i = i +
1
 
    }
 
    val ngrad = blas.ddot(n,
grad, 1, grad, 1)
 
    blas.dcopy(n, grad, 1, dir,
1)
 
    var step = steplen(grad,
res)
 
    var ndir =
0.0
 
    val nx = blas.ddot(n, x, 1,
x, 1)
 
    if(iterno > lastWall +
1){
 
      val alpha
= ngrad / lastNorm
 
     
blas.daxpy(n, alpha, lastDir, 1, dir, 1)
 
      val dstep
= steplen(dir, res)
 
      ndir =
blas.ddot(n, dir, 1, dir, 1)
 
      if
(stop(dstep, ndir, nx)) {
 
     
  // reject the CG step if it could lead to
premature termination
 
     
  blas.dcopy(n, grad, 1, dir, 1)
 
     
  ndir = blas.ddot(n, dir, 1, dir,
1)
 
      } else
{
 
     
  step = dstep
 
     
}
 
    }else{
 
      ndir =
blas.ddot(n, dir, 1, dir, 1)
 
    }
 
    if(stop(step, ndir,
nx)){
 
      return
x.clone
 
    }
 
    i = 0
 
    while(i <
n){
 
      if(step *
dir(i) > x(i)){
 
     
  step = x(i) / dir(i)
 
     
}
 
      i = i +
1
 
    }
 
    i = 0
 
    // 迈步操作
 
    while(i <
n){
 
      if(step *
dir(i) > x(i) * (1 - 1e-14)){
 
     
  x(i) = 0
 
     
  lastWall = iterno
 
     
}else{
 
     
  x(i) -= step * dir(i)
 
     
}
 
      i = i +
1
 
    }
 
    iterno = iterno +
1
 
    blas.dcopy(n, dir, 1,
lastDir, 1)
 
    lastNorm =
ngrad
 
  }
 
  x.clone
 
}
  //
测试
  def
main(args: Array[String]) = {
 
  val n = 10
 
  val lambda = 0.02
 
  val ata =
Array.fill(n*(n+1)/2)(Random.nextGaussian())
 
  val atb =
Array.fill(n)(Random.nextGaussian())
 
  println("[ata]" +
ata.mkString(":"))
 
  println("[atb]" +
atb.mkString(":"))
 
  val result = solve(n, ata, atb,
lambda).mkString(":")
 
  println(result)
 
}
}
 
5、NormalEquation2.scala,正规方程类。
package
com.pr.fortest.als
import
com.github.fommil.netlib.BLAS.{getInstance =>
blas}
class NormalEquation2(val k:
Int) extends Serializable {
  val triK
= k * (k + 1) / 2
  val ata =
new Array[Double](triK)
  val atb =
new Array[Double](k)
  val da =
new Array[Double](k)
  val upper
= "U"
  def
copyToDouble(a: Array[Float]) = {
 
  var i = 0
 
  while(i < k){
 
    da(i) = a(i)
 
    i += 1
 
  }
 
}
  def
add(a: Array[Float], b: Double, c: Double=1.0): this.type =
{
 
  require(c >= 0.0)
 
  require(a.length == k)
 
  copyToDouble(a)
 
  blas.dspr(upper, k, c, da, 1,
ata)
 
  if(b != 0.0){
 
    blas.daxpy(k, b, da, 1, atb,
1)
 
  }
 
  this
 
}
  def
merge(other: NormalEquation2): this.type = {
 
  require(other.k == k)
 
  blas.daxpy(ata.length, 1.0, other.ata, 1, ata,
1)
 
  blas.daxpy(atb.length, 1.0, other.atb, 1, atb,
1)
 
  this
 
}
  def
reset(): Unit = {
 
 
 
 
 
}
}
 
6、PartitionRatings.scala,主要是计算InBlock和OutBlock
package
com.pr.fortest.als
import
org.apache.spark.{HashPartitioner, Partitioner}
import
org.apache.spark.rdd.RDD
import
org.apache.spark.storage.StorageLevel
import
org.apache.spark.util.collection.OpenHashSet
import
scala.collection.mutable
import
scala.util.Sorting
// 原始的用户打分类
case class Rating(user: Int,
item: Int, rating: Double)
// 一个rating block存放多个src
IDs, dst IDs以及ratings
case class
RatingBlock(srcIds: Array[Int], dstIds: Array[Int], ratings:
Array[Double]){
  def size:
Int = srcIds.length
 
require(dstIds.length == srcIds.length)
 
require(ratings.length == srcIds.length)
}
//
构建RatingBlock类
class RatingBlockBuilder
extends Serializable{
  private
val  srcIds =
mutable.ArrayBuilder.make[Int]
  private
val dstIds = mutable.ArrayBuilder.make[Int]
  private
val ratings = mutable.ArrayBuilder.make[Double]
  var size
= 0
  //
增加一个rating信息
  def
add(r: Rating): this.type = {
 
  size += 1
 
  srcIds += r.user
 
  dstIds += r.item
 
  ratings += r.rating
 
  this
 
}
  //
合并另一个RatingBlockBuilder
  def
merge(other: RatingBlock): this.type = {
 
  size += other.srcIds.length
 
  srcIds ++= other.srcIds
 
  dstIds ++= other.dstIds
 
  ratings ++= other.ratings
 
  this
 
}
  //
构建一个RatingBlock
  def
build(): RatingBlock = {
 
  RatingBlock(srcIds.result(), dstIds.result(),
ratings.result())
 
}
}
//
将原始打分放入blocks中
// 返回格式:((srcBlockId,
dstBlockId), ratingBlock)
class PartitionRatings
{
  //
将原始打分数据放入blocks中
  //
返回:((srcBlockId, dstBlockId), ratingBlock)
  def
partitionRatings(ratings: RDD[Rating], srcPart: Partitioner,
dstPart: Partitioner): RDD[((Int, Int), RatingBlock)] =
{
 
  val numPartitions = srcPart.numPartitions *
dstPart.numPartitions
 
  ratings.mapPartitions{iter
=>
 
    val builders =
Array.fill(numPartitions)(new RatingBlockBuilder)
 
    iter.flatMap{ r
=>
 
      val
srcBlockId = srcPart.getPartition(r.user) //
根据value生成一个partitionId
 
      val
dstBlockId = dstPart.getPartition(r.item)
 
      val idx =
srcBlockId + dstBlockId * srcPart.numPartitions
 
      val
builder = builders(idx)
 
     
builder.add(r)
 
     
if(builder.size >= 2048){  //
builder数据太大则返回
 
     
  builders(idx) = new
RatingBlockBuilder
 
     
  Iterator.single(((srcBlockId, dstBlockId),
builder.build()))
 
      }else
{
 
     
  Iterator.empty
 
     
}
 
    } ++ { //
没返回部分的builder统一返回
 
     
builders.zipWithIndex.filter(_._1.size > 0).map{case(block, idx)
=>
 
     
  val srcBlockId = idx %
srcPart.numPartitions
 
     
  val dstBlockId = idx /
srcPart.numPartitions
 
     
  ((srcBlockId, dstBlockId),
block.build())
 
     
}
 
    }
 
  }.groupByKey().mapValues{blocks
=>
 
    val builder = new
RatingBlockBuilder
 
   
blocks.foreach(builder.merge)
 
   
builder.build()
 
  }.setName("ratingBlocks")
 
}
  //
根据RatingBlock创建in-blocks和out-blocks
  def
makeBlocks(
 
     
     
  prefix: String,
 
     
     
  ratingBlocks: RDD[((Int, Int),
RatingBlock)],
 
     
     
  srcPart: Partitioner,
 
     
     
  dstPart: Partitioner,
 
     
     
  storageLevel: StorageLevel): (RDD[(Int,
InBlock)], RDD[(Int, Array[Array[Int]])]) = {
 
  val inBlocks =
ratingBlocks.map{
 
    case((srcBlockId,
dstBlockId), RatingBlock(srcIds, dstIds, ratings))
=>
 
      val
dstIdSet = new OpenHashSet[Int](1 << 20)
 
     
dstIds.foreach(dstIdSet.add)
 
      val
sortedDstIds = new Array[Int](dstIdSet.size)
 
      var i =
0
 
      var pos =
dstIdSet.nextPos(0)
 
      while(pos
!= -1){
 
     
  sortedDstIds(i) =
dstIdSet.getValue(pos)
 
     
  pos = dstIdSet.nextPos(pos + 1)
 
     
  i += 1
 
     
}
 
     
Sorting.quickSort(sortedDstIds)  //
dstIds去重后排序
 
      val
dstIdToLocalIndex = new mutable.OpenHashMap[Int,
Int](sortedDstIds.length)  //
dstId值对应的索引关系
 
      i =
0
 
      while(i
< sortedDstIds.length){
 
     
  dstIdToLocalIndex.update(sortedDstIds(i),
i)
 
     
  i += 1
 
     
}
 
      val
dstLocalIndices = dstIds.map(dstIdToLocalIndex.apply) //
将dstIds值转化为索引值
 
     
(srcBlockId, (dstBlockId, srcIds, dstLocalIndices,
ratings))
 
  }.groupByKey(new
HashPartitioner(srcPart.numPartitions))
 
    .mapValues{iter
=>
 
      val
builder = new UncompressedInBlockBuilder(new
LocalIndexEncoder2(dstPart.numPartitions))
 
     
iter.foreach{case(dstBlockId, srcIds, dstLocalIndices, ratings)
=>
 
     
    builder.add(dstBlockId,
srcIds, dstLocalIndices, ratings)
 
     
}
 
     
builder.build().compress()
 
    }.setName(prefix +
"InBlocks")
 
   
.persist(storageLevel)
 
  val outBlocks = inBlocks.mapValues{ case
InBlock(srcIds, dstPtrs, dstEncodedIndices, _)
=>
 
      val
encoder = new
LocalIndexEncoder2(dstPart.numPartitions)
 
      val
activeIds =
Array.fill(dstPart.numPartitions)(mutable.ArrayBuilder.make[Int])
 
      var i =
0
 
      val seen =
new Array[Boolean](dstPart.numPartitions)
 
      while(i
< srcIds.length){
 
     
  var j = dstPtrs(i)
 
     
  java.util.Arrays.fill(seen,
false)
 
     
  while(j < dstPtrs(i + 1)){
 
     
    val dstBlockId =
encoder.blockId(dstEncodedIndices(j))
 
     
   
if(!seen(dstBlockId)){  //
seen的作用是标记某个用户是否与某个产品block相连,true就是相连,false不相连
 
     
     
activeIds(dstBlockId) += i
 
     
     
seen(dstBlockId) = true
 
     
    }
 
     
    j += 1
 
     
  }
 
     
  i += 1
 
     
}
 
     
activeIds.map{x =>
 
     
  val result = x.result()
 
     
  println("------")
 
     
  result.foreach(println)
 
     
  println("------")
 
     
  result
 
     
}  //
最终返回二维矩阵,行是dstBlockId,列是和这个dstBlockId有关联的用户索引数组
 
  }.setName(prefix + "OutBlocks")
 
   
.persist(storageLevel)
 
  (inBlocks, outBlocks)
 
}
}
//
构建无压缩的in-block的构建类,格式是(srcId, dstEncodedIndex,
rating)
class
UncompressedInBlockBuilder(encoder:
LocalIndexEncoder2){
  private
val srcIds = mutable.ArrayBuilder.make[Int]
  private
val dstEncodedIndices =
mutable.ArrayBuilder.make[Int]  // (blockid,
dstLocalIndices)组成的整数
  private
val ratings = mutable.ArrayBuilder.make[Double]
  def
add(dstBlockId: Int, srcIds: Array[Int], dstLocalIndices:
Array[Int], ratings: Array[Double]): this.type = {
 
  val sz = srcIds.length
 
  require(dstLocalIndices.length ==
sz)
 
  require(ratings.length == sz)
 
  this.srcIds ++= srcIds
 
  this.ratings ++= ratings
 
  var j = 0
 
  while(j < sz){
 
    this.dstEncodedIndices +=
encoder.encode(dstBlockId, dstLocalIndices(j))
 
    j += 1
 
  }
 
  this
 
}
  def
build(): UncompressedInBlock = {
 
  new UncompressedInBlock(srcIds.result(),
dstEncodedIndices.result(), ratings.result())
 
}
}
// 一个blockId,包含(srcIds,
dstEncodedIndices, ratings)
class
UncompressedInBlock(val srcIds: Array[Int], val dstEncodedIndices:
Array[Int], val ratings: Array[Double]){
  def
length: Int = srcIds.length
  def
compress() = {
 
  sort()
 
  val sz = length
 
  val uniqueSrcIdsBuilder =
mutable.ArrayBuilder.make[Int]
 
  val dstCountsBuilder =
mutable.ArrayBuilder.make[Int]
 
  var preSrcId = srcIds(0)
 
  uniqueSrcIdsBuilder += preSrcId
 
  var curCount = 1
 
  var i = 1
 
  while(i < sz){
 
    val srcId =
srcIds(i)
 
    if(srcId !=
preSrcId){
 
     
uniqueSrcIdsBuilder += srcId
 
     
dstCountsBuilder += curCount
 
      preSrcId =
srcId
 
      curCount =
0
 
    }
 
    curCount +=
1
 
    i += 1
 
  }
 
  dstCountsBuilder += curCount
 
  val uniqueSrcIds =
uniqueSrcIdsBuilder.result()
 
  val numUniqueSrcIds =
uniqueSrcIds.length
 
  val dstCounts =
dstCountsBuilder.result()
 
  val dstPtrs = new Array[Int](numUniqueSrcIds +
1)
 
  var sum = 0
 
  i = 0
 
  while(i < numUniqueSrcIds){
 
    sum +=
dstCounts(i)
 
    i += 1
 
    dstPtrs(i) =
sum
 
  }
 
  InBlock(uniqueSrcIds, dstPtrs,
dstEncodedIndices, ratings)
 
}
  //
srcIds中元素升序排列,同时其他数组也根据srcId顺序做相应调整
  def
sort() = {
 
  val srcIdsCopy = srcIds.clone()
 
  val id2IndexMap =
srcIdsCopy.zipWithIndex.sortBy(_._1).zipWithIndex.map(x =>
(x._1._2, x._2)).toMap
 
  val dstEncodedIndicesCopy =
dstEncodedIndices.clone()
 
  val ratingsCopy =
ratings.clone()
 
  var i = 0
 
  while(i < length){
 
    val newIndex =
id2IndexMap.get(i).get
 
    srcIds(newIndex) =
srcIdsCopy(i)
 
    dstEncodedIndices(newIndex) =
dstEncodedIndicesCopy(i)
 
    ratings(newIndex) =
ratingsCopy(i)
 
    i += 1
 
  }
 
}
  def
printElem() = {
 
  println("srcIds new:")
 
  srcIds.foreach(println)
 
  println("dstEncodedIndices
new:")
 
 
dstEncodedIndices.foreach(println)
 
  println("ratings new:")
 
  ratings.foreach(println)
 
}
}
// in-block
block,用于计算(user/item)因子,使用CSC类似的格式存放in-link信息。因此我们只使用一个正则方程类计算一个个src
factors
// srcIds: src
ids
// dstPtrs: (dstPtrs(i),
dstPtrs(i+1))格式的dst indices
// dstEncodedIndices:
encoded dst indices
case class
InBlock(
 
     
     
    srcIds:
Array[Int],
 
     
     
    dstPtrs:
Array[Int],
 
     
     
    dstEncodedIndices:
Array[Int],
 
     
     
    ratings:
Array[Double]){
  def size:
Int = ratings.length
 
require(dstEncodedIndices.length == size)
 
require(dstPtrs.length == srcIds.length + 1)
}