介绍

Mikolov在Distributed Representations of Words and Phrases and their Compositionality中,提到高频词的subsampling问题,以下我对相关节选进行的简单中文翻译:

在非常大的语料中,最高频的词汇,可能出现上百万次(比如:in, the, a这些词)。这样的词汇通常比其它罕见词提供了更少的信息量。例如,当skip-gram模型,通过观察”France”和”Paris”的共现率(co-occurrence),Skip-gram模型可以从中获益;而”France”和”the”的共现率则对模型贡献很少;而几乎每个词都常在句子中与”the”共同出现过。该思路也常用在相反的方向上,高频词的向量表示,在上百万样本训练完后不会出现大变化。

为了度量这种罕见词与高频词间存在不平衡现象,我们使用一个简单的subsampling方法:训练集中的每个词wi,以下面公式计算得到的概率进行抛弃:

f(wi)是wi的词频,t为选中的一个阀值,通常为10^-5周围(0.00001)。我们之所以选择该subsampling公式,是因为:它可以很大胆的对那些词频大于t的词进行subsampling,并保留词频的排序(ranking of the frequencies)。尽管subsampling公式的选择是拍脑袋出来的(启发式的heuristically),我们发现它在实践中很有效。它加速了学习,并极大改善了罕见词的学习向量的准确率(accuracy)。

具体实现

有道之前的中,提到的该subsampling描述也不准确。在当中的描述是:

而实际中,采用的是:

部分代码:

// 进行subsampling,随机丢弃常见词,保持相同的频率排序ranking.
// The subsampling randomly discards frequent words while keeping the ranking same
if (sample > 0) {

  // 计算相应的抛弃概率ran.
  real ran = (sqrt(vocab[word].cn / (sample * train_words)) + 1) * (sample * train_words) / vocab[word].cn;

  // 生成一个随机数next_random.
  next_random = next_random * (unsigned long long)25214903917 + 11;

  // 如果random/65535 - ran > 0, 则抛弃该词,继续
  if (ran < (next_random & 0xFFFF) / (real)65536) 
      continue;
}

为此,我简单地写了一个python程序,做了个测试。程序托管在github上,点击下载

下面提供了三种方法,最终生成的图可以看下。对于前两种方法,基本能做到与原先词频正相关,最后使用时,需要设置一个阀值,砍掉高频词。而最后一种方法,效果也不错(虽然偶有会存留高频词,或者低频词也同样被砍掉)。而word2vec采用的正是第3种方法(大于0的采样点会被抛弃掉)。

参考:

Distributed Representations of Words and Phrases and their Compositionality

介绍

如果你在大学期间学过信息论或数据压缩相关课程,一定会了解Huffman Tree。建议首先在wikipedia上重新温习下Huffman编码与Huffman Tree的基本概念: Huffman编码wiki

简单的说(对于文本中的字母或词),Huffman编码会将出现频率较高(也可认为是:权重较大)的字(或词)编码成短码,而将罕见的字(或词)编码成长码。对比长度一致的编码,能大幅提升压缩比例。

而Huffman树指的就是这种带权路径长度最短的二叉树。权指的就是权重W(比如上面的词频);路径指的是:从根节点到叶子节点的路径长度L;带权路径指的是:树中所有的叶结点的权值乘上其到根结点的路径长度。WPL=∑W*L,它是最小的。

word2vec的Huffman-Tree实现

为便于word2vec的Huffman-Tree实现,我已经将它单独剥离出来,具体代码托管到github上: huffman_tree代码下载。示例采用的是wikipedia上的字母:

即:F:2, O:3, R:4, G:4, E:5, T:7

这里有两个注意事项:

  • 1.对于单个节点分枝的编码,wikipedia上的1/0分配是定死的:即左为0,右为1(但其实分左右是相对的,左可以调到右,右也可以调到左)。而word2vec采用的方法是,两侧中哪一侧的值较大则为1,值较小的为0。当然你可以认为(1为右,0为左)。
  • 2.word2vec会对词汇表中的词汇预先从大到小排好序,然后再去创建Huffman树。在CreateBinaryTree()调用后,会生成最优的带权路径最优的Huffman-Tree。

最终生成的图如下:

此图中,我故意将右边的T和RG父结节调了下左右,这样可以跳出上面的误区(即左为0,右为1。其实是按大小来判断0/1)

相应的数据结构为:

/**
 * word与Huffman树编码
 */
struct vocab_word {
  long long cn;     // 词在训练集中的词频率
  int *point;       // 编码的节点路径
  char *word,       // 词
       *code,       // Huffman编码,0、1串
       codelen;     // Huffman编码长度
};

最后,上面链接代码得到的结果:

1
2
3
4
5
6
word=T	cn=7	codelen=2	code=10	point=4-3-
word=E	cn=5	codelen=2	code=01	point=4-2-
word=G	cn=4	codelen=3	code=111	point=4-3-1-
word=R	cn=4	codelen=3	code=110	point=4-3-1-
word=O	cn=3	codelen=3	code=001	point=4-2-0-
word=F	cn=2	codelen=3	code=000	point=4-2-0-

整个计算过程设计的比较精巧。使用了三个数组:count[]/binary[]/parent_node[],这三个数组构成相应的Huffman二叉树。有vocab_size个叶子节点。最坏情况下,每个节点下都有一个叶子节点,二叉树的总节点数为vocab_size * 2 - 1就够用了。代码使用的是 vocab_size * 2 + 1。

当然,如果你有兴趣关注下整棵树的构建过程的话,也可以留意下这部分输出:

count[]:	7 5 4 4 3 2 1000000000000000 1000000000000000 1000000000000000 1000000000000000 1000000000000000 1000000000000000
binary[]:	0 0 0 0 0 0 0 0 0 0 0 0
parent[]:	0 0 0 0 0 0 0 0 0 0 0 0
	
--step 1:
count[]:	7 5 4 4 3 2 5 1000000000000000 1000000000000000 1000000000000000 1000000000000000 1000000000000000
binary[]:	0 0 0 0 1 0 0 0 0 0 0 0
parent[]:	0 0 0 0 6 6 0 0 0 0 0 0
	
--step 2:
count[]:	7 5 4 4 3 2 5 8 1000000000000000 1000000000000000 1000000000000000 1000000000000000
binary[]:	0 0 1 0 1 0 0 0 0 0 0 0
parent[]:	0 0 7 7 6 6 0 0 0 0 0 0
	
--step 3:
count[]:	7 5 4 4 3 2 5 8 10 1000000000000000 1000000000000000 1000000000000000
binary[]:	0 1 1 0 1 0 0 0 0 0 0 0
parent[]:	0 8 7 7 6 6 8 0 0 0 0 0
	
--step 4:
count[]:	7 5 4 4 3 2 5 8 10 15 1000000000000000 1000000000000000
binary[]:	0 1 1 0 1 0 0 1 0 0 0 0
parent[]:	9 8 7 7 6 6 8 9 0 0 0 0
	
--step 5:
count[]:	7 5 4 4 3 2 5 8 10 15 25 1000000000000000
binary[]:	0 1 1 0 1 0 0 1 0 1 0 0
parent[]:	9 8 7 7 6 6 8 9 10 10 0 0

参考

1.Huffman编码

mllib中lda源码分析(一)中,我们描述了LDA的原理、以及变分推断batch算法和online算法的推导。这一节会再描述下spark MLlib中的ml实现。

spark MLlib的实现,是基于变分推断算法实现的,后续的版本会增加Gibbs sampling。本文主要关注online版本的lda方法。(代码主要以1.6.1为主)

一、Batch算法

二、Online算法

ml中的lda相当于一层wrapper,更好地适配ml中的架构(tranformer/pipeline等等),调用的实际上是mllib中的lda实现。

1.LDA.run(主函数)

  def run(documents: RDD[(Long, Vector)]): LDAModel = {
    // 初始化(em=>em, online=>online).
    val state = ldaOptimizer.initialize(documents, this)

    // 迭代开始. 
    var iter = 0
    val iterationTimes = Array.fill[Double](maxIterations)(0)
    while (iter < maxIterations) {
      val start = System.nanoTime()
      
      // optimzier对应的迭代函数.
      state.next()
      
      val elapsedSeconds = (System.nanoTime() - start) / 1e9
      iterationTimes(iter) = elapsedSeconds
      iter += 1
    }

    // 获取最终完整的的LDAModel. 
    state.getLDAModel(iterationTimes)
  }

OnlineOptimizer.next

  override private[clustering] def next(): OnlineLDAOptimizer = {
    // 先进行sample
    val batch = docs.sample(withReplacement = sampleWithReplacement, miniBatchFraction,
      randomGenerator.nextLong())
    
    // 如果为空,返回
    if (batch.isEmpty()) return this

    // 提交batch,进行lda迭代.
    submitMiniBatch(batch)
  }

submitMinibatch

/**
   * 提供语料的minibatch给online LDA模型.为出现在minibatch中的terms它会自适应更新topic distribution。
   * 
   * Submit a subset (like 1%, decide by the miniBatchFraction) of the corpus to the Online LDA
   * model, and it will update the topic distribution adaptively for the terms appearing in the
   * subset.
   */
  private[clustering] def submitMiniBatch(batch: RDD[(Long, Vector)]): OnlineLDAOptimizer = {
    iteration += 1
    val k = this.k
    val vocabSize = this.vocabSize
    val expElogbeta = exp(LDAUtils.dirichletExpectation(lambda)).t      // β ~ Dirichlet(λ)
    val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)       // 
    val alpha = this.alpha.toBreeze
    val gammaShape = this.gammaShape

    // step 1: 对每个partition做map,单独计算E-Step. stats(stat, gammaPart)
    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>

      // 1.过滤掉空的.
      val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

      // 2.stat 是一个DenseMatrix:  k x vocabSize
      val stat = BDM.zeros[Double](k, vocabSize)

      // 
      var gammaPart = List[BDV[Double]]()

      // 3.遍历partition上的所有document:执行EM算法.
      // E-step可以并行计算.
      // M-step需要reduce,作合并,然后再计算.
      nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
        // 3.1  取出document所对应的词的id。(支持dense/sparse).
        val ids: List[Int] = termCounts match {
          case v: DenseVector => (0 until v.size).toList
          case v: SparseVector => v.indices.toList
        }

        // 3.2 更新状态 E-step => gammad ().
        val (gammad, sstats) = OnlineLDAOptimizer.variationalTopicInference(
          termCounts, expElogbetaBc.value, alpha, gammaShape, k)

        // 3.3 根据所对应id取出对应列. 更新sstats(对应主题状态) 
        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats

        // 3.4 更新该partition上每个文档的gammad的gammaPart列表中.
        gammaPart = gammad :: gammaPart
      }

      // 4.mapPartition返回iterator,每个partition返回一个迭代器(stat, gammaPart)
      // stat: k x V matrix. 针对该partition上的文档,所更新出的每个词在各主题上的分布.
      // gammaPart: list[vector[K]] 该分区上每个文档的gammad列表.
      Iterator((stat, gammaPart))
    }

    // step 2: 对mini-batch的所有partition上的stats(stat,gammaPart)中的stat进行求总和.
    val statsSum: BDM[Double] = stats.map(_._1).reduce(_ += _)
    expElogbetaBc.unpersist()

    // step 3: 对mini-batch的所有partition上的stats(stat,gammaPart)中的gammaPart进行更新.
    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
      stats.map(_._2).reduce(_ ++ _).map(_.toDenseMatrix): _*)

    // step 4: 更新batchResult ( K x V), 每个元素上乘以 E[log(β)]
    val batchResult = statsSum :* expElogbeta.t

    // M-step:
    // 更新λ.
    // Note that this is an optimization to avoid batch.count
    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)

    // 如果需要优化DocConentration,是否更新alpha
    if (optimizeDocConcentration) updateAlpha(gammat)
    this
  }

variationalTopicInference

里面比较核心的一个方法是variationalTopicInference:

  private[clustering] def variationalTopicInference(
      termCounts: Vector,
      expElogbeta: BDM[Double],
      alpha: breeze.linalg.Vector[Double],
      gammaShape: Double,
      k: Int): (BDV[Double], BDM[Double]) = {

    // step 1: 词的id和cnt: (ids, cnts) 
    val (ids: List[Int], cts: Array[Double]) = termCounts match {
      case v: DenseVector => ((0 until v.size).toList, v.values)
      case v: SparseVector => (v.indices.toList, v.values)
    }

    // step 2: 初始化: gammad ~ Γ(100, 0.01), 100维
    // gammd: mini-batch的变分分布: q(θ | γ) 
    // expElogthetad: paper中的exp(E[logθ]), 其中θ ~ Dirichlet(γ)
    // expElogbetad:  paper中的exp(E[logβ]), 其中β : V * K
    // phiNorm:  ϕ ∝ exp{E[logθ] + E[logβ]},ϕ ~ θ*β. 非零.
    // Initialize the variational distribution q(theta|gamma) for the mini-batch
    val gammad: BDV[Double] =
      new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k)                   // K
    val expElogthetad: BDV[Double] = exp(LDAUtils.dirichletExpectation(gammad))  // K
    val expElogbetad = expElogbeta(ids, ::).toDenseMatrix                        // ids * K

    // 加上一个很小的数,让它非零.
    val phiNorm: BDV[Double] = expElogbetad * expElogthetad :+ 1e-100            // ids
    var meanGammaChange = 1D
    val ctsVector = new BDV[Double](cts)                                         // ids

    // 单个mini-batch里的loop.
    // 迭代,直到 β 和 ϕ 收敛. paper中是0.000001,此处用的是1e-3.
    // Iterate between gamma and phi until convergence
    while (meanGammaChange > 1e-3) {
      val lastgamma = gammad.copy

      // breeze操作:https://github.com/scalanlp/breeze/wiki/Universal-Functions
      // ":"为elementwise操作符;其中的:=,对象相同,内容赋值
      // 计算:γ_dk, 先对ctsVector进行归一化,再乘以转置E(log(β)]^T,再pointwise乘E(log(θ)).
      // 各矩阵或向量的维度: K为topic, ids:为词汇表V
      // gammad: Vector(K),单个文档上每个主题各有一γ值.
      // expElogthetad: Vector(K),单个文档上每个主题各有一θ值.
      // expElogbetad: Matrix(ids*K),每个词,每个主题都有一β值.
      // ctsVector: Vector(ids),每个词上有一个词频.
      // phiNorm: Vector(ids),每个词上有一个ϕ分布值。
      // 
      // 
      //        K                  K * ids               ids
      gammad := (expElogthetad :* (expElogbetad.t * (ctsVector :/ phiNorm))) :+ alpha
      
      // 更新 exp(E[logθ]), expElogbetad不需要更新
      expElogthetad := exp(LDAUtils.dirichletExpectation(gammad))
      
      // 更新 phiNorm, 
      // TODO: Keep more values in log space, and only exponentiate when needed.
      phiNorm := expElogbetad * expElogthetad :+ 1e-100

      // 平均变化量.
      meanGammaChange = sum(abs(gammad - lastgamma)) / k
    }

    // sstats: mini-batch下每个主题下的词分布.
    // n 
    // sstatsd: k x 1 * 1 x ids => k x ids
    val sstatsd = expElogthetad.asDenseMatrix.t * (ctsVector :/ phiNorm).asDenseMatrix
    (gammad, sstatsd)
  }

ok。关于alpha的更新等,此处不再解释。详见源码。

在对MLlib中的lda进行源码分析之前,我们先熟悉下blei的LDA paper以及相关的概念。

介绍

首先,先做下约定。

  • 一个词(word)是离散数据的基本单位,被定义成词汇表中的一个item,由{1,…,V}进行索引。我们将词表示成one-hot格式。使用上标来表示components,在词汇表中第v个词被表示成一个V维的向量w,其中$ w^v=1, w^u=0(u\neq{v}) $。
  • 一个文档(document)是一个N个词组成的序列,表示成W=(w1,w2,…,wN),其中wn是序列上的第n个词。
  • 一个语料(corpus)是一个关于M个文档的集合,被表示成D={W1,W2,…,Wm}.

(blei论文中用黑体的w表示文档,为便于区分,此处用大写W)。

我们希望找到一个关于该语料的概率模型,它不仅可以将高概率分配给该语料的组成文档,还可以将高概率分配给其它“相似”文档。

LDA

LDA是一个关于语料的生成概率模型。基本思想是,文档可以被表示成在隐主题(latent topics)上的随机混合,其中每个主题(topic)都以一个在词(words)上的分布进行表示。

对于在语料D中的每个文档W,LDA假设如下的生成过程:

  • 1.选择 N ~ Poisson(ξ)
  • 2.选择 θ ~ Dir(α)
  • 3.对于每个文档(N个词汇wn):
    • (a) 选择一个主题$z_n$ ~ Multinomial(θ)
    • (b) 以主题$z_n$为条件,使用多项分布条件概率(multinomial probability):$ P(w_n \mid z_n, \beta) $选中一个词$w_n$

在该基础模型上,做了一些简化假设。

  • 首先,Dirichlet分布的维度k(主题变量z的维度),假设是已知并且确定的。
  • 第二,词概率由一个k x V的矩阵β进行参数化,其中$ \beta_{ij}=p(w^j=1 \mid z^i=1) $, 目前当成是一个待估计的确定量。
  • 最后,Poisson猜想是不严格的,可以使用更多的实际文档长度分布。注意N对于所有其它数据生成变量(θ和z)是独立的。它是个辅助变量,我们通常忽略它的随机性。

一个k维的Dirichlet随机变量θ,它的取值为(k-1)-simplex,(一个k维向量θ,它在泛化三角形(k-1)-simplex之内,其中:$ \theta_i \geq 0, \sum_{i=1}^k\theta_i=1 $) ,在该simplex上具有以下的概率密度:

……(1)

其中,参数α是一个k维的vector,相应的components上: αi > 0, 其中 Γ(x)为Gamma函数。Dirichlet是在simplex上的一个合适分布——它在指数族内,具有有限维的充分统计量(finite dimensional sufficient statistics),与multinomial分布共轭。在paper第5部分,这些属性将有助于LDA的inference和parameter estimation算法。

给定参数 α 和 β,我们可以给出关于一个主题混合θ,一个关于N个主题的z集合(每个词都有一个主题),一个N个词W的的联合分布

……(2)

其中$p(z_n \mid \theta) $可以简单认为:θi对于唯一的i,有$ {z_{n}}^{i}=1 $。在θ上积分(θ上连续),并在z上进行求和(z上离散),我们可以得到一个关于文档的边缘分布(marginal distribution)

……(3)

最后,将所有单个文档(document)的边缘分布进行连乘运算,我得到整个语料(corpus)的概率

LDA可以表示成图1所示的概率图模型。有三个级别的LDA表述。

  • 语料级参数:α 和 β是语料级参数(corpus-level parameters),假设在生成一个语料的过程中只抽样一次。
  • 文档级变量:θd是文档级变量(document-level variable),每个文档抽样一次。
  • 词级别变量:$z_{dn}$和$w_{dn}$是词级别变量(word-level variables),对于每个文档中的每个词抽样一次。

将LDA与一个简单的Dirichlet-multinomial clustering模型相区分很重要。一个经典的clustering模型将涉及到一个两层模型(two-level),对于一个语料只抽样一次Dirichlet;对于语料中的每个文档,只选一次multinomial clustering变量;对于在基于该cluster变量条件的文档上只选择一个词集合。有了多个clustering models后,这样的模型会限制一个文档只与单个主题相关联。而LDA涉及三层(three-level),尤其是主题节点(topic node)在单个文档中被重复抽样。在该模型下,文档可以与多个主题相关联。

图一:LDA的图形化模型表示。”plates”表示可重复。outer plate表示文档,inner plate表示在单个文档中关于主题和词汇的可重复的选择。

通常在贝叶斯统计建模(Bayesian statistical modeling)中学到与图1相似的结构。它们被称为层级模型(hierarchical models),或更精确地称为:条件独立层级模型(con-ditionally independent hierarchical models)(Kass and Steffey, 1989).这样的模型经常作为参数期望贝叶斯模型(parametric empirical Bayes models)被提到,它也被用于参数估计中。在第5部分,我们将采用经验贝叶斯方法(empirical Bayes approach),来估计在LDA的简单实现中的α 和 β, 我们也会考虑更完整的Bayesian方法。

2.1 LDA与可交换性

随机变量 {z1,…,zN}的一个有限集合,如果联合分布在置换(permutation)后不变,我们称之为是可交换的(exchangeable)。如果π是从整数1到N的一个置换(permutation):

一个无限的随机变量序列,如果每个有限的子序列是可交换的,那么就是无限可交换的(infinitely exchangeable)。

De Finetti’s representation理论声称:一个无限可交换的随机变量序列的联合概率,如果一个随机参数从这些分布中抽取,那么问题中的随机变量,在其于该参数条件下是独立同分布的(iid: independent and identically distributed)。

在LDA中,我们假设,词由主题(确定的条件分布)生成,这些主题在一个文档中是无限可交换的(infinitely exchangeable)。由de Finetti的理论,一个词和主题的序列的概率,必须具有以下形式:

其中,θ是在这些主题上的一个服从多项分布的随机变量。我们可以在等式(3)中获取LDA分布,通过对主题变量边缘化,并让θ服从一个Dirichlet分布。

2.2 其它模型

unigram模型

在unigram模型中,每个文档中的词都从一个单独的multinomial分布独立抽取得到:

Mixture of unigrams

如果unigram模型和一个离散随机主题变量z混合,我们就得到了一个mixture of unigrams model。在该混合模型下,每个文档的生成:首先选中一个主题z,然后以条件多项概率$p(w \mid z)$独立生成N个词。一个文档的概率为:

当从一个语料估计时,词分布可以看成是:假设每个文档只有一个主题,在该假设下的主题表示。该假设对于大语料集来说是一个很受限的模型。

相反的,LDA允许文档展现多个主题。这会额外带来另一个参数形销:有k-1参数与mixture of unigrams中的p(z)有关,而LDA模型中有k个参数与$ p(\theta \mid \alpha)$有关。

pLSI

pLSI是另一个广泛使用的文档模型(Hofmann 1999)。pLSI模型认为,一个文档d,词wn,对于一个未观察到的主题z来说是条件独立的:

pLSI模型尝试放宽maxture of unigrams模型中作出的简化猜想:每个文档只由一个主题生成。在某种意义上,它会捕获概率:一个文档可以包含多个主题,因为p(z | d)可看成是对于一个特定文档d的主题权重的混合。然后,注意,d是一个dummy index,指向在训练集中文档列表。这样,d是一个多项分布随机变量,值尽可能与训练文档一样多,模型学到的主题混合 p(z | d)只对应于被训练的文档。出于该原因,pLSI并不是一种定义良好的(well-defined)文档生成模型;天然不支持使用它来分配概率给一个之前未见过的文档。

图3: 不同模型的图表示

2.3 几何学解释

LDA与其它隐主题模型不同,可以通过隐空间(latent space)的几何表示来解释。来看下在每种模型下,一个文档在几何上是如何表示的。

上面所描述的所有4种模型(unigram, mixture of unigrams, pLSI, LDA),都是在词的空间分布上操作。每个这样的分布,可以看成是在(V-1)-simplex上的一个点,我们称之为word simplex。simplex是泛化三角形,关于simplex,参见simplex的wiki

unigram模型是在word simplex上找到单个点,假定语料中的所有词都来自该分布。隐变量模型(letent variable models)会考虑在word simplex上的k个点,并基于这些点形成一个子simplex(sub-simplex),我们称之为topic simplex。注意,在topic simplex上的任意点,在word simplex也是一个点。使用topic simplex的不同隐变量模型,以不同的方式生成一个文档。

  • 1-gram混合模型(mixture of unigram model):假设对于每个文档,在word simplex上的k个点(也就是说:topic simplex的其中一个角落)被随机选中,该文档的所有词汇都从这些点的分布上抽取得到。
  • pLSI模型:假定一个训练文档的每个词都来自一个随机选中的topic。这些主题(topics)本身从一个在这些主题上的指定文档分布上抽取到,例如,在topic simplex上的一个点。对于每个文档都有一个这样的分布;训练文档的集合定义了在topic simplex上的一个经验分布。
  • LDA: 假定,已观察到和未观察到的文档上的每个词,都由一个随机选中的topic生成,这个topic从一个随机选中的参数的分布上抽取。该参数从来自topic simplex的一个平滑分布上的每个文档中抽样得到。

图4: 嵌在包含三个词的word simplex上的三个主题的topic simplex。word simplex的角(corners)对应于三个分布,每个词各自都具有一个概率分布。topic simplex的三个顶点对应于在词上的三个不同分布。The mixture of unigrams模型会将每个文档放到topic simplex上其中一个角(corners)上。pLSI模型会引入根据x的topic simplex的一个经验分布。LDA则在由等高线表示的topic simplex上放置一个平滑分布。

3.推断与参与估计

3.1 inference

为了使用LDA,我们需要解决的核心推断问题是,对于一个给定文档,计算这些隐变量的后验分布(主题分布)

不幸的是,该分布很难计算。为了归一化该分布,我们对隐变量边缘化(marginalize),并将等式(3)表示成模型参数的形式:

该函数很难解,因为θ 和 β在隐主题的求和上相耦合(Dickey, 1983)。Dickey展示了该函数是一个在Dirichlet分布(可以表示成一个超几何函数)的特定扩展下的期望。它被用在一个Beyesian上下文中…

尽管后验分布很难精准推断(inference),有许多近似推断算法可以用于LDA,包括Laplace近似,变分近似(variational approximation),马尔可夫链蒙特卡罗法MCMC(jordan,1999)。本节我们描述了一种在LDA中简单的凸变分推断算法(convexity-based variational algorithm for inference)。其它方法在paper 第8部分讨论。

3.2 变分推断

凸变分推断算法的基本思想是,充分利用Jensen不等式来获取一个在log likelihood上的可调下界(jordan et al.,1999)。本质上,可以考虑一个关于下界的家族(a family of lower bounds),由一个变分参数(variational parameters)集合进行索引。变分参数由一个优化过程进行选择,它会尝试找到最紧可能的下界(tightest possible lower bound)。

获取一个可控下界家族的一个简单方法是,考虑将原始的图示模型进行简单修改,移去一些边(edge)和节点(node)。将图1所示的LDA模型进行修改, θ 和 β之间的耦合,由于边θ, z, 和 W之间存在边(edge)。通过抛弃这些边以及W节点,生成一个更简单的图示模型,它有自由的变分参数,我们可以在隐变量上获取一个分布族。该分布族由以下的变分分布组成:

……(4)

其中,Dirichlet分布参数γmultinomial分布参数(φ1 , . . . , φN),是自由变分参数。

图4: (左)LDA的图示模型 (右)采用变分分布来近似LDA后验的图示模型

在指定了一个简化版本的概率分布族后,下一步是建立一个优化问题,来确定变分参数γ 和 φ的值。正如在附录A中展示的,找到一个在log likelihood上的紧下限,可以直接翻译成下面的优化问题:

……(5)

变分参数的最优值,可以通过对变分分布$ q(\theta,z \mid \gamma,\phi) $和真实后验概率$ p(\theta,z \mid W,\alpha,\beta) $间的KL散度进行最小化得到。该最小化可以通过一个迭代型的定点方法(fixed-point method)完成。特别的,我们在附录A.3中展示了:通过计算KL散度的导数,将它们等于0, 可以得到以下更新等式的pair:

……(6)

……(7)

在附录A.1中,多项分布的期望更新可以以如下方式计算:

……(8)

其中,Ψ 是logΓ函数通过Taylor展开式的首个导数项。

等式(6)和(7)具有一个吸引人的直觉解释。Dirichlet分布更新(Dirichlet update)是一个在给定变分分布($ E[z_n \mid \phi_n] $>)下给定的期望观察的后验Dirichlet。多项分布更新(multinomial update)类似于使用贝叶斯理论,$ p(z_n \mid w_n) \propto p(w_n \mid z_n) $,其中p(zn)是在变分分布下的期望值的log的指数近似。

注意,变分分布实际上是一个条件分布,由一个关于W的函数区分。因为在等式(5)中的优化问题,由确定的W所管理,因而,最优解($ \gamma^{\ast},\phi^{\ast} $)是一个关于W的函数。我们可以将产生的变分分布写成:$ q(\theta,z \mid \gamma^{\ast}(W),\phi^{\ast}(W)) $,其中,我们已经在W上显式地作出了独立性。这样,变分分布可以看成是一个对后验分布$ p(\theta,z \mid W,\alpha,\beta)$的近似。

在文本语言中,最优参数($ \gamma^{\ast}(W), \phi^{\ast}(W) $)是由文档决定的(document-specific)。特别的,我们可以将Dirichlet参数$ \gamma^{\ast}(W) $看成是在topic simplex中提供一个文档表示。

图6: LDA的variational inference算法

图6展示了变分推断过程的总结,对于 γ 和 φn具有合适的起始点。从该伪代码我们可以知道:LDA的变分推断算法,每次迭代需要O((N+1)k)次操作。经验上,我们可以找到对于单个文档的迭代数,与文档中词的数目相近似。因而总的操作数与$ N^2k $相近。

3.3 参数估计

本节描述了一种在LDA模型中用于参数估计的经验贝叶斯方法(empirical Bayes method)。特别的,给定一个文档集的语料库:D={W1,W2,…WM}。我们希望找到α 和 β,来最大化整个语料数据的(marginal)log likelihood:

如上所述,p(W | α,β)的计算很困难。然而,变分推断提供给我们一个在log likelihood上的下界,我们可以分别根据α和β进行最大化。我们找到了近似经验贝叶斯估计,通过一种交替变分EM过程(alternating variational EM procedure),来分别对变分参数γ 和 φ,最大化下界。接着,变分参数确定下来后,再分别根据模型参数α 和 β,最大化下界。

我们在附录A.4中提供了一个用于LDA的变分EM算法的导数的详细推导。推导以下面的迭代形式描述:

  • 1.(E-step): 对于每个文档,找到变分参数{$ \gamma_d^{\ast},\phi_d^{\ast}: d \in D $}。这个在上一节已描述。
  • 2.(M-step): 分别根据模型参数α 和 β,最大化在log likelihood上产生的下界。这相当于,在E-step中计算出的合适的后验概率下,为每个文档使用期望的足够统计量,找到极大似然估计。

这两个step会一直循环,直到log likelihood的下界收敛为止。

在附录A.4,我们展示了对于条件多项分布参数β进行M-step更新(M-step update),可以写成:

……(9)

我们会进一步展示Dirichlet参数α的M-step更新,它可以使用一种有效在线性时间上增长的Newton-Raphson方法,其中的Hessian是可逆的。

3.4 Smoothing

对于许多文档的大语料,具有很大的词汇表size,通常会有严重的sparsity问题。对于一个新文档,它包含的词汇很可能没有出现在训练语料中。对于这些词来说,多项分布参数(multinomial parameters)的最大似然估计会分配概率为0,因而,这些新文档的概率为0。应付该问题的方法是,对多项分布参数进行”平滑(smooth)”,对于所有词汇表item都分配一个正的概率,不管它们是否出现在训练集当中。最常用的方法是Laplace smoothing;这本质上会产生在多项分布参数上的均匀Dirichlet先验分布下,对后验分布求平均。

不幸的是,在混合模型设置中,简单的Laplace smoothing不再适合最大化后验方法。事实上,在多项分布参数上放置一个Dirichlet先验,我们可以在混合模型设置上得到一个难解的后验,这与在基本的LDA模型上得到一个难解的后验的原因基本类似。我们提出解决该问题的方法是,简单地应用变分推断方法到扩展模型上,它包括了在多项分布参数上的Dirichlet smoothing

图7:带平滑的LDA图示模型

在LDA设置中,我们可以获得如图7所示的扩展模型。我们将β看成是一个k x V的随机矩阵(每行表示一个mixture component),其中我们假设每行都是从一个可交换的Dirichlet分布上独立抽取的。我们现在扩展我们的inference过程,将βi看成是随机变量,它天然具有一个基于训练语料条件的后验分布。此时我们来看下这个更完整的LDA Bayesian方法。

将一个变分过程看成是一个Bayesian inference,它在随机变量β, θ 和 z上放置一个独立分布(Attias,2000):

其中 $ q_d(\theta,z \mid \phi,\gamma) $是在等式(4)中定义的变分分布。很容易验证,产生的变分推断过程会产生等式(6)和(7)来分别作为对变分参数φ 和 γ更新等式,同时也会对一个新的变分参数λ做一个额外更新:

迭代这些等式直到收敛,会产生一个在β, θ 和 z上的一个合适后验分布。

现在我们将超参数η用于可交换的Dirichlet,如同前面提到的超参数α。设置这些超参数的方法是:我们使用变分EM来寻找基于边缘似然上的这些参数的最大似然估计。这些过程在附录A.4描述。

4.online算法

对于主题模型,后验概率是很难计算的,可以通过近似后验推断来计算。当前的近似后验推导算法分为两大类:抽样方法(sampling approaches)和优化解法(optimization approaches)。抽样方法有MCMC,优化解法有VB(Variational Bayes)。VB在经验上比MCMC更快,与MCMC准确率相当,对于大数据集来说,VB是更吸引人。

但是,使用VB在大数据上计算很困难。标准的“batch”VB算法,会迭代分析每个观察(observation),并迭代更新数据集范围的变分参数。batch算法的每次迭代开销,对于非常大的数据集来说不切实际。在主题建模(topic modeling)应用中,这一点特别重要。主题建模会对不能手工标注的大型文档集合进行归纳隐式结构。

为此,blei等提出了一种online VB算法,这种算法基于online随机优化(online stochastic optimization),在大数据集上,它比batch算法更快地产生好的参数估计。Online LDA可以方便地分析大量文档集合,不需要本地存储,或者收集文档,每个文档可以以流式(streaming)到达,看完后抛弃。

以下的等式重新标号。

假设有K个主题。每个主题定义了一个在词汇表上的多项分布,假设这些主题从一个Dirichlet上抽取得到: βk ∼ Dirichlet(η)。给定主题后,LDA为每个文档d假设了以下的生成过程。

  • 首先,从主题$ \theta_d \sim Dirichlet(\alpha) $上抽取一个分布;
  • 接着,对于在文档中的每个词i,从主题权重$ z_{di} \sim \theta_d $ 上抽取一个主题索引$ z_{di} \in {1, . . . , K }$,从选中的主题上抽取观察词$w_{di}$,$w_{di} \sim \beta_{z_{di}}$。出于简洁性,我们假设在θ 和 β上具有对称先验,但该假设很容易放宽。

注意,如果我们在主词分配z上进行求和,接着,我们得到:$p(w_{di} \mid \theta_d,\beta)=\sum_k\theta_{dk}\beta_{kw}$。这会导致LDA的”multinomial PCA”解释。我们可以将LDA看成是将一个词频n的矩阵(其中$n_{dw}$是词w出现在文档d中的次数),概率分解(probabilistic factorization)成一个关于主题权重θ的矩阵,以及一个主题字典β。我们的工作可以看成是在线矩阵分解技术的一个扩展,它会最优化squared error来得到更泛化的概率公式。

  • 主题(the topics): β
  • 主题比例(topic proportions): θ
  • 主题分配(topic assignments): z

4.1 Batch算法

在VB推断中,真实的后验由一个更简单的分布q(z,θ,β)来近似,它由一个自由参数集进行索引。这些参数用于优化:最大化置信下界(ELBO:Evidence Lower BOund)

…… (1)

最大化ELBO,相当于最小化q(z,θ,β)与后验$p(z,\theta,\beta \mid W,\alpha,\eta)$间的KL散度。我们选择一个完整的因子分布q(fully factorized distribution):

……(2)

每个词的主题分配z的后验,可以由φ参数化,在每个文档上的主题权重θ可以由γ进行参数化,在主题上的后验β可以由λ进行参数化。为便于记忆,我们将λ作为”主题(the topics)”。等式1分解为:

……(3)

注意,我们在对文档求和中引入了每个语料的项,可以通过文档D的数目进行划分。该步可以帮助我们对online VB算法求导。

接着,我们将上述期望展开成变分参数的函数。变分目标函数只依赖于$ n_dw $, 词w在文档中的出现次数。当使用VB时,文档可以由词数(word counts)来进行归纳:

……(4)

其中V是词汇表的size,D是是文档数目。$ \ell(n_d,\phi_d,\gamma_d,\lambda) $ 表示文档d对ELBO的贡献度。

L可以使用在变分参数φ, γ, λ 上的坐标上升法进行优化:

……(5)

在q分布下logθ和logβ的期望为:

……(6)

其中Ψ表示digamma函数。

等式5的更新可以保证ELBO收敛到一个稳定点。通过EM算法(Expectation-Maximization)来完成,我们可以将这些更新划分到”E-step”:它会保持λ固定,迭代更新γ 和 φ 直到收敛,接着会进行”M-step”:由给定φ更新λ。实例上,如果在每个E-step上重新初始化γ 和 φ,该算法会收敛到一个更好的解。算法1就是batch VB for LDA:

Online算法

算法1具有常数量的内存开销,经验上,会比batch collpased Gibbs sampling收敛更快。然而,它仍需要在每次迭代时,将整个语料库传进去。因而,如果对于非常大的数据集会很慢,天然不适合不断到来的新数据。我们提出了一种在线变分推断算法来拟合λ,该参数是在主题分布β上的变分后验。我们的算法几科与batch VB算法一样简单,但对于大数据集收敛更快。

要想让主题参数λ的设置更好,就要让在算法1中的E-step中拟合得到每个文档的变分参数γ和φ之后,衡量ELBO的L要尽可能地高。将γ(nd, λ)和φ(nd, λ)设置为由E-step产生的γd 和 φd。我们的学习目标是设置λ,使下面目标最大化:

……(7)

$ \ell(n_d,\gamma_d,\phi_d,\lambda) $是第d个文档对等式(4)的变分上界的贡献度。这类似于最小平方矩阵分解的目标函数,尽管LDA的ELBO比简单的squared loss function更不方便。

Online VB for LDA在算法2中描述。

第t个词频向量$n_t$被观察看,保持λ固定,我们执行一个E-step来找到局部最优解$\gamma_{t}$和$\phi_{t}$。接着,我们计算$\hat{\lambda}$,假如整个语料由单个文档$n_t$重复D次组成,那么λ的设置就是最优的(给定φt)。D是提供给算法的唯一文档数目,比如:一个语料的size。(在真实online的例子中:D->∞,对应于β的经验Bayes估计)。我们接着更新λ,使用一个λ的先前值和$\hat{\lambda}$进行加权平均得到。$\hat{\lambda}$的权重为:$\rho_t \triangleq (\tau_0 + t)^{-\kappa}$,其中$\kappa \in (0.5,1]$控制着$\hat{\lambda}$的旧值被遗忘的比率,τ0 ≥ 0会减缓算法的早期迭代。条件κ ∈ (0.5, 1]需要保证收敛。我们会在下一节展示,online LDA对应于在变分目标函数L上的一个随机自然梯度算法(stochastic natural gradient algorithm)。

该算法很像paper[16]中提出在online VB,在模型上有隐数据——最重要的区别是,我们使用一个近似的E-step来优化γt 和 φt, 因为我们不能精确计算条件分布$ p(z_t,\theta_t \mid \beta,n_t,\alpha) $。

Mini-batches: 在随机学习中的常用技术是,在每次更新时考虑多个观察(observations)以降噪。在online LDA中,这意味着计算$\hat{\lambda}$使用S>1的观察:

……(8)

其中$n_{ts}$是在mini-batch t中的第s个文档。该文档的变分参数$\phi_{ts}$和$\gamma_{ts}$是使用普通E-step进行拟合。注意,当S=D 和 κ = 0时,online VB就成了batch VB。

超参数估计:在batch 变分LDA中,超参数α 和 η的点估计,可以拟合给出的γ 和 λ,它使用线性时间的Newton-Raphson法。我们同样将α 和 η的更新并入到online LDA:

……(9)

其中,$\hat{\alpha}(\gamma_t)$是Hessian逆矩阵乘以梯度$\triangledown_{\alpha}l(n_t,\gamma_t,\phi_t,\lambda$,$ \hat{\eta}(\lambda))$是Hessian的逆矩阵乘以梯度$\triangledown_{\eta}\mathcal{L}$,$\rho_{t} \triangleq (\tau_0 + t)^{-\kappa}$。

5.3 收敛分析

此处不详说,见paper。

介绍

MLlib中有两个lr实现。分别在mllib和ml中。此处分析的是ml,这也是后续mllib继续重点发展的一个库, mllib则会渐渐淡出历史舞台。ml的logistic回归,代码在org.apache.spark.ml.classification.LogisticRegression中实现。在详见代码之前,我们先简单地看下LR可调节的参数. [暂时修正至以1.6.1代码为参考.]

一、LR模型的参数

参数:

  • threshold: 如果label 1的概率>threshold,则预测为1,否则为0.
  • regParam:正则项参数(相当于 $ \lambda $)
  • elasticNetParam:ElasticNet混合参数 $ \alpha $ 。如果$ \alpha = 0 $, 惩罚项是一个L2 penalty。如果$ \alpha = 1 $,它是一个L1 penalty。如果$ 0 < \alpha < 1 $,则是L1和L2的结果。缺省为0.0,为L2罚项。
  • maxIter: 缺省100次。
  • tol:迭代收敛的容忍度。值越小,会得到更高精度,同时迭代次数开销也大。缺省为1E-6.
  • fitIntercept: 是否要拟合截距项(intercept term),缺省为true.
  • standardization:在对模型进行fit前,是否对训练特征进行标准化(归一化)。模型的系数将总是返回到原比例(origin scale)上,它对用户是透明的。注意:当不使用正则化时,使用/不使用standardization,模型都应收敛到相同的解(solution)上。在R的GLMNET包里,缺省行为也为true。缺省为true。
  • weightCol:如果没设置或空,所有实例为有为1的权重。如果设置,则相当于对unbalanced data设置不同的权重。缺省不设置。
  • treeAggregate:如果特征维度或分区数太大,该参数需要调得更大。缺省为2.

二、原理

logistic回归的原理,这里不详述。简单地写:y = logit(∑wx + b)。相应的loss和梯度如下所示(箭头表示向量):

$l_{model}$为cross-entropy;$l_{reg}$为正则项。

对于第一部分$l_{model}$的计算,依赖于训练数据;对于第二部分正则项,它不依赖于训练数据。因而这两部分在实现时是独立计算的。

每个训练样本的loss和gradient是独立的。

样本i的梯度:

样本的loss:

对于spark来说,ml的logistic回归实现通过treeAggregate来完成。在executors/workers上计算各RDD上的loss和gradient;在driver/controller上将它们进行reduce进行所有训练样本的求和,得到lossSum和gradientSum。

在单机的driver上完成Optimization; L1正则由OWLQN optimizer处理。L2由L-BFGS处理。这两个优化器都由Breeze库提供(Breeze库提供了Vector/Matrix的实现以及相应计算的接口Linalg)。

整个过程如图所示:

三、loss与gradient

主要在LogisticCostFun类中,具体见代码,这里仅把核心代码及注释帖出来。可以对应于上面的公式。

private class LogisticCostFun(
    instances: RDD[Instance],
    numClasses: Int,
    fitIntercept: Boolean,
    standardization: Boolean,
    featuresStd: Array[Double],
    featuresMean: Array[Double],
    regParamL2: Double) extends DiffFunction[BDV[Double]] {

  override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
    val numFeatures = featuresStd.length
    val coeffs = Vectors.fromBreeze(coefficients)

    // step 1: cost部分.
    val logisticAggregator = {
      val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
      val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)

      // 先在rdd的每个分区上应用seqOp函数,做add操作(计算每个样本的loss/gradient)
      // 再在driver上应用comOp函数,做merge操作(求得总的loss/gradient)
      instances.treeAggregate(
        new LogisticAggregator(coeffs, numClasses, fitIntercept, featuresStd, featuresMean)
      )(seqOp, combOp)
    }

    // 求得总的gradientArray.
    val totalGradientArray = logisticAggregator.gradient.toArray

    // step 2: L2正则项部分 => regVal. 
    //  reg = lambda * ∑ regParamL2/2 wi^2 + (1-regParamL2) |wi| 
    // regVal is the sum of coefficients squares excluding intercept for L2 regularization.
    val regVal = if (regParamL2 == 0.0) {
      0.0
    } else {
      var sum = 0.0
      
      coeffs.foreachActive { (index, value) =>
        // If `fitIntercept` is true, the last term which is intercept doesn't
        // contribute to the regularization.
        if (index != numFeatures) {
          // The following code will compute the loss of the regularization; also
          // the gradient of the regularization, and add back to totalGradientArray.
          // gradientArray:  costFun项的梯度 + 正则项的梯度
          // sum: L2正则
          sum += {
             
            if (standardization) {
              
              totalGradientArray(index) += regParamL2 * value
              value * value
            } else {
              // 
              if (featuresStd(index) != 0.0) {
                // If `standardization` is false, we still standardize the data
                // to improve the rate of convergence; as a result, we have to
                // perform this reverse standardization by penalizing each component
                // differently to get effectively the same objective function when
                // the training dataset is not standardized.
                val temp = value / (featuresStd(index) * featuresStd(index))
                totalGradientArray(index) += regParamL2 * temp
                value * temp
              } else {
                0.0
              }
            }
          }
        }
      }

      0.5 * regParamL2 * sum
    }

    // 返回带L2正则的loss,以及带L2正则的梯度.
    (logisticAggregator.loss + regVal, new BDV(totalGradientArray))
  }
}

里面会涉及到另一个类:LogisticAggregator。它的作用,相当于做map/reduce运算,计算loss/gradient。

private class LogisticAggregator(
    coefficients: Vector,
    numClasses: Int,
    fitIntercept: Boolean,
    featuresStd: Array[Double],
    featuresMean: Array[Double]) extends Serializable {

  private var weightSum = 0.0
  private var lossSum = 0.0

  // coefficients => coefficientsArray 数组.
  private val coefficientsArray = coefficients match {
    case dv: DenseVector => dv.values
    case _ =>
      throw new IllegalArgumentException(
        s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
  }

  // 总的维度.
  private val dim = if (fitIntercept) coefficientsArray.length - 1 else coefficientsArray.length

  //gradientSumArray.
  private val gradientSumArray = Array.ofDim[Double](coefficientsArray.length)

  /**
   * 将一个训练样本添加到LogisticAggregator, 并更新目标函数的loss和gradient.
   * 
   * Add a new training instance to this LogisticAggregator, and update the loss and gradient
   * of the objective function.
   *
   * @param instance The instance of data point to be added.
   * @return This LogisticAggregator object.
   */
  def add(instance: Instance): this.type = {
    instance match { case Instance(label, weight, features) =>
      require(dim == features.size, s"Dimensions mismatch when adding new instance." +
        s" Expecting $dim but got ${features.size}.")
      require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")

      if (weight == 0.0) return this

      val localCoefficientsArray = coefficientsArray
      val localGradientSumArray = gradientSumArray

      numClasses match {
        case 2 =>
          // For Binary Logistic Regression.
          // step 1: 二分类,求得 z = ∑ w*x + b,取负号.
          val margin = - {
            var sum = 0.0

            // a.每个特征号上,单独做求和运算.  ∑ w*x + b
            // 此处是做 ∑ w*x 运算. (是否做了归一化)
            features.foreachActive { (index, value) =>
              if (featuresStd(index) != 0.0 && value != 0.0) {
                sum += localCoefficientsArray(index) * (value / featuresStd(index))
              }
            }

            // 此处是 + b的运算.
            sum + {
              if (fitIntercept) localCoefficientsArray(dim) else 0.0
            }
          }

          // 这里的label采用的是(1,0).
          // step 2: 乘以该样本所带的unbalanced weight(样本所占比重, 缺省weight=1).
          val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)

          // step 3: 更新localGradient.
          // gradient = 1/n ∑ multiplier * x
          features.foreachActive { (index, value) =>
            if (featuresStd(index) != 0.0 && value != 0.0) {
              localGradientSumArray(index) += multiplier * (value / featuresStd(index))
            }
          }

          if (fitIntercept) {
            localGradientSumArray(dim) += multiplier
          }

          // step 4: 如果label为正例, loss为cross-entropy: 1/n * ∑ ln(1+exp(-y*wx))
          // lossSum.
          if (label > 0) {
            // The following is equivalent to log(1 + exp(margin)) but more numerically stable.
            lossSum += weight * MLUtils.log1pExp(margin)
          } else {
            lossSum += weight * (MLUtils.log1pExp(margin) - margin)
          }
        case _ =>
          new NotImplementedError("LogisticRegression with ElasticNet in ML package " +
            "only supports binary classification for now.")
      }

      // 
      weightSum += weight
      this
    }
  }

  /**
   * 进行merge: 方便分布式计算.
   *
   * Merge another LogisticAggregator, and update the loss and gradient
   * of the objective function.
   * (Note that it's in place merging; as a result, `this` object will be modified.)
   *
   * @param other The other LogisticAggregator to be merged.
   * @return This LogisticAggregator object.
   */
  def merge(other: LogisticAggregator): this.type = {
    require(dim == other.dim, s"Dimensions mismatch when merging with another " +
      s"LeastSquaresAggregator. Expecting $dim but got ${other.dim}.")

    if (other.weightSum != 0.0) {
      weightSum += other.weightSum
      lossSum += other.lossSum

      var i = 0
      val localThisGradientSumArray = this.gradientSumArray
      val localOtherGradientSumArray = other.gradientSumArray
      val len = localThisGradientSumArray.length

      // 以local为准. 每列coff所对应梯度进行叠加.
      while (i < len) {
        localThisGradientSumArray(i) += localOtherGradientSumArray(i)
        i += 1
      }
    }
    this
  }

  // 求得最终loss.
  def loss: Double = {
    require(weightSum > 0.0, s"The effective number of instances should be " +
      s"greater than 0.0, but $weightSum.")
    lossSum / weightSum
  }

  // 求得最终gradient.
  def gradient: Vector = {
    require(weightSum > 0.0, s"The effective number of instances should be " +
      s"greater than 0.0, but $weightSum.")
    val result = Vectors.dense(gradientSumArray.clone())
    scal(1.0 / weightSum, result)
    result
  }
}

四、训练过程

  override protected def train(dataset: DataFrame): LogisticRegressionModel = {
    // 从数据集抽取列. 如果数据集是persisted,不需要persist oldDataset.  
    // Extract columns from data.  If dataset is persisted, do not persist oldDataset.
    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))

    // 选取labelCol, weightCol, featuresCol作为row
    val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
      case Row(label: Double, weight: Double, features: Vector) =>
        Instance(label, weight, features)
    }

    // 是否持久化. 如果持久化,将训练样本进行cache. (MEMORY_AND_DISK,如果内存不够,会存成disk)
    // 这里有可能影响性能.
    val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

    // 统计.
    val (summarizer, labelSummarizer) = {
      val seqOp = (c: (MultivariateOnlineSummarizer, MultiClassSummarizer),
        instance: Instance) =>
          (c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight))

      val combOp = (c1: (MultivariateOnlineSummarizer, MultiClassSummarizer),
        c2: (MultivariateOnlineSummarizer, MultiClassSummarizer)) =>
          (c1._1.merge(c2._1), c1._2.merge(c2._2))

      instances.treeAggregate(
        new MultivariateOnlineSummarizer, new MultiClassSummarizer)(seqOp, combOp)
    }

    val histogram = labelSummarizer.histogram
    val numInvalid = labelSummarizer.countInvalid
    val numClasses = histogram.length
    val numFeatures = summarizer.mean.size

    if (numInvalid != 0) {
      val msg = s"Classification labels should be in {0 to ${numClasses - 1} " +
        s"Found $numInvalid invalid labels."
      logError(msg)
      throw new SparkException(msg)
    }

    if (numClasses > 2) {
      val msg = s"Currently, LogisticRegression with ElasticNet in ML package only supports " +
        s"binary classification. Found $numClasses in the input dataset."
      logError(msg)
      throw new SparkException(msg)
    }

    // 特征的mean和std.
    val featuresMean = summarizer.mean.toArray
    val featuresStd = summarizer.variance.toArray.map(math.sqrt)

    // L1, L2
    val regParamL1 = $(elasticNetParam) * $(regParam)
    val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)

    // costFun.
    val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept), $(standardization),
      featuresStd, featuresMean, regParamL2)

    // optimizer: 优化器.
    // 如果 elasticNetParam = 0, 或者 regParam=0. 即使用L2正则,或者没有正则项. => 使用LBFGS.
    // 否则,L1+L2正则,以及L1正则 => 使用OWLQN
    val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
      new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
    } else {
      def regParamL1Fun = (index: Int) => {
        // Remove the L1 penalization on the intercept
        if (index == numFeatures) {
          0.0
        } else {
          if ($(standardization)) {
            regParamL1
          } else {
            // If `standardization` is false, we still standardize the data
            // to improve the rate of convergence; as a result, we have to
            // perform this reverse standardization by penalizing each component
            // differently to get effectively the same objective function when
            // the training dataset is not standardized.
            if (featuresStd(index) != 0.0) regParamL1 / featuresStd(index) else 0.0
          }
        }
      }
      new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
    }

    // 初始化cofficients为0.
    val initialCoefficientsWithIntercept =
      Vectors.zeros(if ($(fitIntercept)) numFeatures + 1 else numFeatures)

    // 对于二分类,当将coefficients初始化为0时,如果根据label的分布对intercept进行初始化,收敛会更快.
    // b = log(P(1)/P(0))
    if ($(fitIntercept)) {
      /*
         For binary logistic regression, when we initialize the coefficients as zeros,
         it will converge faster if we initialize the intercept such that
         it follows the distribution of the labels.

         
         P(0) = 1 / (1 + \exp(b)), and
         P(1) = \exp(b) / (1 + \exp(b))
         , hence
         
         b = \log{P(1) / P(0)} = \log{count_1 / count_0}
         
       */
      initialCoefficientsWithIntercept.toArray(numFeatures)
        = math.log(histogram(1) / histogram(0))
    }

    // 开始迭代.
    val states = optimizer.iterations(new CachedDiffFunction(costFun),
      initialCoefficientsWithIntercept.toBreeze.toDenseVector)

    // 获取迭代的结果.
    val (coefficients, intercept, objectiveHistory) = {
      /*
         Note that in Logistic Regression, the objective history (loss + regularization)
         is log-likelihood which is invariance under feature standardization. As a result,
         the objective history from optimizer is the same as the one in the original space.
       */
      val arrayBuilder = mutable.ArrayBuilder.make[Double]
      var state: optimizer.State = null
      while (states.hasNext) {
        state = states.next()
        arrayBuilder += state.adjustedValue
      }

      if (state == null) {
        val msg = s"${optimizer.getClass.getName} failed."
        logError(msg)
        throw new SparkException(msg)
      }

      // 当权重coefficients在归一化空间中训练时,我们需要将它们转回到原始空间.
      // 注意,归一化空间的intercept,和原始空间是一样的,不需要归一化.
      /*
         The coefficients are trained in the scaled space; we're converting them back to
         the original space.
         Note that the intercept in scaled space and original space is the same;
         as a result, no scaling is needed.
       */
      val rawCoefficients = state.x.toArray.clone()
      var i = 0
      while (i < numFeatures) {
        rawCoefficients(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 }
        i += 1
      }

      // 
      if ($(fitIntercept)) {
        (Vectors.dense(rawCoefficients.dropRight(1)).compressed, rawCoefficients.last,
          arrayBuilder.result())
      } else {
        (Vectors.dense(rawCoefficients).compressed, 0.0, arrayBuilder.result())
      }
    }

    if (handlePersistence) instances.unpersist()

    // 统计状态.
    val model = copyValues(new LogisticRegressionModel(uid, coefficients, intercept))
    val logRegSummary = new BinaryLogisticRegressionTrainingSummary(
      model.transform(dataset),
      $(probabilityCol),
      $(labelCol),
      $(featuresCol),
      objectiveHistory)
    model.setSummary(logRegSummary)
  }

四、正则

  • L2 regularization -> ridge
  • L1 regularization -> lasso
  • mix L1 and L2 -> elastic Net

相应的公式:

对应到后面的代码里:

regPram = regParamL1+regParamL2
val regParamL1 = $(elasticNetParam) * $(regParam)
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)

两种正则化方法L1和L2。L2正则化假设模型参数服从高斯分布,L2正则化函数比L1更光滑,所以更容易计算;L1假设模型参数服从拉普拉斯分布,L1正则化具备产生稀疏解的功能,从而具备Feature Selection的能力。

LBFGS和OWLQN

ok,我们知道,模型本身基本上核心代码就落在了这两个方法上:LBFGS和OWLQN。两者都是牛顿法的变种,核心思想是:

关于Hessian矩阵的计算,此处不做过多解释,如果你有兴趣想深究下数学实现。也可以再看一下breeze库里的这两个方法实现:

L-BFGS: Limited-memory BFGS。其中BFGS代表4个人的名字:broyden–fletcher–goldfarb–shanno OWL-QN: (Orthant-Wise Limited-Memory Quasi-Newton)算法。

关于breezey库,这里再简单提一下:

breeze库用于数值处理。它的目标是通用、简洁、强大,不需要牺牲太多性能。当前版本0.12。我们所熟悉的spark中的MLlib(经常见到的线性算法库:breeze.linalg、最优化算法库:breeze.optimize)就是在它的基础上构建的。另外它还提供了图绘制的功能(breeze.plot)。

总结

整个过程基本都ok了。L1还是L2的选择,看你的具体调参。另外再提一些注意事项。

  • 缺省会对特征做归一化,对于一些场景(比如推荐),离散化的特征,归一化没啥意义。反倒可能会影响结果好坏。
  • 对于截距b,会使用正负样本比例,进行log(P(1)/P(0))初始化。收敛会更快。
  • cache机制(StorageLevel.MEMORY_AND_DISK)。当内存不够用,可能会影响性能,这一点不好。

参考:

1.LogisticRegression源码 2.breeze 3.breeze文档