介绍

如果你在大学期间学过信息论或数据压缩相关课程,一定会了解Huffman Tree。建议首先在wikipedia上重新温习下Huffman编码与Huffman Tree的基本概念: Huffman编码wiki

简单的说(对于文本中的字母或词),Huffman编码会将出现频率较高(也可认为是:权重较大)的字(或词)编码成短码,而将罕见的字(或词)编码成长码。对比长度一致的编码,能大幅提升压缩比例。

而Huffman树指的就是这种带权路径长度最短的二叉树。权指的就是权重W(比如上面的词频);路径指的是:从根节点到叶子节点的路径长度L;带权路径指的是:树中所有的叶结点的权值乘上其到根结点的路径长度。WPL=∑W*L,它是最小的。

word2vec的Huffman-Tree实现

为便于word2vec的Huffman-Tree实现,我已经将它单独剥离出来,具体代码托管到github上: huffman_tree代码下载。示例采用的是wikipedia上的字母:

即:F:2, O:3, R:4, G:4, E:5, T:7

这里有两个注意事项:

  • 1.对于单个节点分枝的编码,wikipedia上的1/0分配是定死的:即左为0,右为1(但其实分左右是相对的,左可以调到右,右也可以调到左)。而word2vec采用的方法是,两侧中哪一侧的值较大则为1,值较小的为0。当然你可以认为(1为右,0为左)。
  • 2.word2vec会对词汇表中的词汇预先从大到小排好序,然后再去创建Huffman树。在CreateBinaryTree()调用后,会生成最优的带权路径最优的Huffman-Tree。

最终生成的图如下:

此图中,我故意将右边的T和RG父结节调了下左右,这样可以跳出上面的误区(即左为0,右为1。其实是按大小来判断0/1)

相应的数据结构为:

/**
 * word与Huffman树编码
 */
struct vocab_word {
  long long cn;     // 词在训练集中的词频率
  int *point;       // 编码的节点路径
  char *word,       // 词
       *code,       // Huffman编码,0、1串
       codelen;     // Huffman编码长度
};

最后,上面链接代码得到的结果:

1
2
3
4
5
6
word=T	cn=7	codelen=2	code=10	point=4-3-
word=E	cn=5	codelen=2	code=01	point=4-2-
word=G	cn=4	codelen=3	code=111	point=4-3-1-
word=R	cn=4	codelen=3	code=110	point=4-3-1-
word=O	cn=3	codelen=3	code=001	point=4-2-0-
word=F	cn=2	codelen=3	code=000	point=4-2-0-

整个计算过程设计的比较精巧。使用了三个数组:count[]/binary[]/parent_node[],这三个数组构成相应的Huffman二叉树。有vocab_size个叶子节点。最坏情况下,每个节点下都有一个叶子节点,二叉树的总节点数为vocab_size * 2 - 1就够用了。代码使用的是 vocab_size * 2 + 1。

当然,如果你有兴趣关注下整棵树的构建过程的话,也可以留意下这部分输出:

count[]:	7 5 4 4 3 2 1000000000000000 1000000000000000 1000000000000000 1000000000000000 1000000000000000 1000000000000000
binary[]:	0 0 0 0 0 0 0 0 0 0 0 0
parent[]:	0 0 0 0 0 0 0 0 0 0 0 0
	
--step 1:
count[]:	7 5 4 4 3 2 5 1000000000000000 1000000000000000 1000000000000000 1000000000000000 1000000000000000
binary[]:	0 0 0 0 1 0 0 0 0 0 0 0
parent[]:	0 0 0 0 6 6 0 0 0 0 0 0
	
--step 2:
count[]:	7 5 4 4 3 2 5 8 1000000000000000 1000000000000000 1000000000000000 1000000000000000
binary[]:	0 0 1 0 1 0 0 0 0 0 0 0
parent[]:	0 0 7 7 6 6 0 0 0 0 0 0
	
--step 3:
count[]:	7 5 4 4 3 2 5 8 10 1000000000000000 1000000000000000 1000000000000000
binary[]:	0 1 1 0 1 0 0 0 0 0 0 0
parent[]:	0 8 7 7 6 6 8 0 0 0 0 0
	
--step 4:
count[]:	7 5 4 4 3 2 5 8 10 15 1000000000000000 1000000000000000
binary[]:	0 1 1 0 1 0 0 1 0 0 0 0
parent[]:	9 8 7 7 6 6 8 9 0 0 0 0
	
--step 5:
count[]:	7 5 4 4 3 2 5 8 10 15 25 1000000000000000
binary[]:	0 1 1 0 1 0 0 1 0 1 0 0
parent[]:	9 8 7 7 6 6 8 9 10 10 0 0

参考

1.Huffman编码

mllib中lda源码分析(一)中,我们描述了LDA的原理、以及变分推断batch算法和online算法的推导。这一节会再描述下spark MLlib中的ml实现。

spark MLlib的实现,是基于变分推断算法实现的,后续的版本会增加Gibbs sampling。本文主要关注online版本的lda方法。(代码主要以1.6.1为主)

一、Batch算法

二、Online算法

ml中的lda相当于一层wrapper,更好地适配ml中的架构(tranformer/pipeline等等),调用的实际上是mllib中的lda实现。

1.LDA.run(主函数)

  def run(documents: RDD[(Long, Vector)]): LDAModel = {
    // 初始化(em=>em, online=>online).
    val state = ldaOptimizer.initialize(documents, this)

    // 迭代开始. 
    var iter = 0
    val iterationTimes = Array.fill[Double](maxIterations)(0)
    while (iter < maxIterations) {
      val start = System.nanoTime()
      
      // optimzier对应的迭代函数.
      state.next()
      
      val elapsedSeconds = (System.nanoTime() - start) / 1e9
      iterationTimes(iter) = elapsedSeconds
      iter += 1
    }

    // 获取最终完整的的LDAModel. 
    state.getLDAModel(iterationTimes)
  }

OnlineOptimizer.next

  override private[clustering] def next(): OnlineLDAOptimizer = {
    // 先进行sample
    val batch = docs.sample(withReplacement = sampleWithReplacement, miniBatchFraction,
      randomGenerator.nextLong())
    
    // 如果为空,返回
    if (batch.isEmpty()) return this

    // 提交batch,进行lda迭代.
    submitMiniBatch(batch)
  }

submitMinibatch

/**
   * 提供语料的minibatch给online LDA模型.为出现在minibatch中的terms它会自适应更新topic distribution。
   * 
   * Submit a subset (like 1%, decide by the miniBatchFraction) of the corpus to the Online LDA
   * model, and it will update the topic distribution adaptively for the terms appearing in the
   * subset.
   */
  private[clustering] def submitMiniBatch(batch: RDD[(Long, Vector)]): OnlineLDAOptimizer = {
    iteration += 1
    val k = this.k
    val vocabSize = this.vocabSize
    val expElogbeta = exp(LDAUtils.dirichletExpectation(lambda)).t      // β ~ Dirichlet(λ)
    val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)       // 
    val alpha = this.alpha.toBreeze
    val gammaShape = this.gammaShape

    // step 1: 对每个partition做map,单独计算E-Step. stats(stat, gammaPart)
    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>

      // 1.过滤掉空的.
      val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

      // 2.stat 是一个DenseMatrix:  k x vocabSize
      val stat = BDM.zeros[Double](k, vocabSize)

      // 
      var gammaPart = List[BDV[Double]]()

      // 3.遍历partition上的所有document:执行EM算法.
      // E-step可以并行计算.
      // M-step需要reduce,作合并,然后再计算.
      nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
        // 3.1  取出document所对应的词的id。(支持dense/sparse).
        val ids: List[Int] = termCounts match {
          case v: DenseVector => (0 until v.size).toList
          case v: SparseVector => v.indices.toList
        }

        // 3.2 更新状态 E-step => gammad ().
        val (gammad, sstats) = OnlineLDAOptimizer.variationalTopicInference(
          termCounts, expElogbetaBc.value, alpha, gammaShape, k)

        // 3.3 根据所对应id取出对应列. 更新sstats(对应主题状态) 
        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats

        // 3.4 更新该partition上每个文档的gammad的gammaPart列表中.
        gammaPart = gammad :: gammaPart
      }

      // 4.mapPartition返回iterator,每个partition返回一个迭代器(stat, gammaPart)
      // stat: k x V matrix. 针对该partition上的文档,所更新出的每个词在各主题上的分布.
      // gammaPart: list[vector[K]] 该分区上每个文档的gammad列表.
      Iterator((stat, gammaPart))
    }

    // step 2: 对mini-batch的所有partition上的stats(stat,gammaPart)中的stat进行求总和.
    val statsSum: BDM[Double] = stats.map(_._1).reduce(_ += _)
    expElogbetaBc.unpersist()

    // step 3: 对mini-batch的所有partition上的stats(stat,gammaPart)中的gammaPart进行更新.
    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
      stats.map(_._2).reduce(_ ++ _).map(_.toDenseMatrix): _*)

    // step 4: 更新batchResult ( K x V), 每个元素上乘以 E[log(β)]
    val batchResult = statsSum :* expElogbeta.t

    // M-step:
    // 更新λ.
    // Note that this is an optimization to avoid batch.count
    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)

    // 如果需要优化DocConentration,是否更新alpha
    if (optimizeDocConcentration) updateAlpha(gammat)
    this
  }

variationalTopicInference

里面比较核心的一个方法是variationalTopicInference:

  private[clustering] def variationalTopicInference(
      termCounts: Vector,
      expElogbeta: BDM[Double],
      alpha: breeze.linalg.Vector[Double],
      gammaShape: Double,
      k: Int): (BDV[Double], BDM[Double]) = {

    // step 1: 词的id和cnt: (ids, cnts) 
    val (ids: List[Int], cts: Array[Double]) = termCounts match {
      case v: DenseVector => ((0 until v.size).toList, v.values)
      case v: SparseVector => (v.indices.toList, v.values)
    }

    // step 2: 初始化: gammad ~ Γ(100, 0.01), 100维
    // gammd: mini-batch的变分分布: q(θ | γ) 
    // expElogthetad: paper中的exp(E[logθ]), 其中θ ~ Dirichlet(γ)
    // expElogbetad:  paper中的exp(E[logβ]), 其中β : V * K
    // phiNorm:  ϕ ∝ exp{E[logθ] + E[logβ]},ϕ ~ θ*β. 非零.
    // Initialize the variational distribution q(theta|gamma) for the mini-batch
    val gammad: BDV[Double] =
      new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k)                   // K
    val expElogthetad: BDV[Double] = exp(LDAUtils.dirichletExpectation(gammad))  // K
    val expElogbetad = expElogbeta(ids, ::).toDenseMatrix                        // ids * K

    // 加上一个很小的数,让它非零.
    val phiNorm: BDV[Double] = expElogbetad * expElogthetad :+ 1e-100            // ids
    var meanGammaChange = 1D
    val ctsVector = new BDV[Double](cts)                                         // ids

    // 单个mini-batch里的loop.
    // 迭代,直到 β 和 ϕ 收敛. paper中是0.000001,此处用的是1e-3.
    // Iterate between gamma and phi until convergence
    while (meanGammaChange > 1e-3) {
      val lastgamma = gammad.copy

      // breeze操作:https://github.com/scalanlp/breeze/wiki/Universal-Functions
      // ":"为elementwise操作符;其中的:=,对象相同,内容赋值
      // 计算:γ_dk, 先对ctsVector进行归一化,再乘以转置E(log(β)]^T,再pointwise乘E(log(θ)).
      // 各矩阵或向量的维度: K为topic, ids:为词汇表V
      // gammad: Vector(K),单个文档上每个主题各有一γ值.
      // expElogthetad: Vector(K),单个文档上每个主题各有一θ值.
      // expElogbetad: Matrix(ids*K),每个词,每个主题都有一β值.
      // ctsVector: Vector(ids),每个词上有一个词频.
      // phiNorm: Vector(ids),每个词上有一个ϕ分布值。
      // 
      // 
      //        K                  K * ids               ids
      gammad := (expElogthetad :* (expElogbetad.t * (ctsVector :/ phiNorm))) :+ alpha
      
      // 更新 exp(E[logθ]), expElogbetad不需要更新
      expElogthetad := exp(LDAUtils.dirichletExpectation(gammad))
      
      // 更新 phiNorm, 
      // TODO: Keep more values in log space, and only exponentiate when needed.
      phiNorm := expElogbetad * expElogthetad :+ 1e-100

      // 平均变化量.
      meanGammaChange = sum(abs(gammad - lastgamma)) / k
    }

    // sstats: mini-batch下每个主题下的词分布.
    // n 
    // sstatsd: k x 1 * 1 x ids => k x ids
    val sstatsd = expElogthetad.asDenseMatrix.t * (ctsVector :/ phiNorm).asDenseMatrix
    (gammad, sstatsd)
  }

ok。关于alpha的更新等,此处不再解释。详见源码。

在对MLlib中的lda进行源码分析之前,我们先熟悉下blei的LDA paper以及相关的概念。

介绍

首先,先做下约定。

  • 一个词(word)是离散数据的基本单位,被定义成词汇表中的一个item,由{1,…,V}进行索引。我们将词表示成one-hot格式。使用上标来表示components,在词汇表中第v个词被表示成一个V维的向量w,其中$ w^v=1, w^u=0(u\neq{v}) $。
  • 一个文档(document)是一个N个词组成的序列,表示成W=(w1,w2,…,wN),其中wn是序列上的第n个词。
  • 一个语料(corpus)是一个关于M个文档的集合,被表示成D={W1,W2,…,Wm}.

(blei论文中用黑体的w表示文档,为便于区分,此处用大写W)。

我们希望找到一个关于该语料的概率模型,它不仅可以将高概率分配给该语料的组成文档,还可以将高概率分配给其它“相似”文档。

LDA

LDA是一个关于语料的生成概率模型。基本思想是,文档可以被表示成在隐主题(latent topics)上的随机混合,其中每个主题(topic)都以一个在词(words)上的分布进行表示。

对于在语料D中的每个文档W,LDA假设如下的生成过程:

  • 1.选择 N ~ Poisson(ξ)
  • 2.选择 θ ~ Dir(α)
  • 3.对于每个文档(N个词汇wn):
    • (a) 选择一个主题$z_n$ ~ Multinomial(θ)
    • (b) 以主题$z_n$为条件,使用多项分布条件概率(multinomial probability):$ P(w_n \mid z_n, \beta) $选中一个词$w_n$

在该基础模型上,做了一些简化假设。

  • 首先,Dirichlet分布的维度k(主题变量z的维度),假设是已知并且确定的。
  • 第二,词概率由一个k x V的矩阵β进行参数化,其中$ \beta_{ij}=p(w^j=1 \mid z^i=1) $, 目前当成是一个待估计的确定量。
  • 最后,Poisson猜想是不严格的,可以使用更多的实际文档长度分布。注意N对于所有其它数据生成变量(θ和z)是独立的。它是个辅助变量,我们通常忽略它的随机性。

一个k维的Dirichlet随机变量θ,它的取值为(k-1)-simplex,(一个k维向量θ,它在泛化三角形(k-1)-simplex之内,其中:$ \theta_i \geq 0, \sum_{i=1}^k\theta_i=1 $) ,在该simplex上具有以下的概率密度:

……(1)

其中,参数α是一个k维的vector,相应的components上: αi > 0, 其中 Γ(x)为Gamma函数。Dirichlet是在simplex上的一个合适分布——它在指数族内,具有有限维的充分统计量(finite dimensional sufficient statistics),与multinomial分布共轭。在paper第5部分,这些属性将有助于LDA的inference和parameter estimation算法。

给定参数 α 和 β,我们可以给出关于一个主题混合θ,一个关于N个主题的z集合(每个词都有一个主题),一个N个词W的的联合分布

……(2)

其中$p(z_n \mid \theta) $可以简单认为:θi对于唯一的i,有$ {z_{n}}^{i}=1 $。在θ上积分(θ上连续),并在z上进行求和(z上离散),我们可以得到一个关于文档的边缘分布(marginal distribution)

……(3)

最后,将所有单个文档(document)的边缘分布进行连乘运算,我得到整个语料(corpus)的概率

LDA可以表示成图1所示的概率图模型。有三个级别的LDA表述。

  • 语料级参数:α 和 β是语料级参数(corpus-level parameters),假设在生成一个语料的过程中只抽样一次。
  • 文档级变量:θd是文档级变量(document-level variable),每个文档抽样一次。
  • 词级别变量:$z_{dn}$和$w_{dn}$是词级别变量(word-level variables),对于每个文档中的每个词抽样一次。

将LDA与一个简单的Dirichlet-multinomial clustering模型相区分很重要。一个经典的clustering模型将涉及到一个两层模型(two-level),对于一个语料只抽样一次Dirichlet;对于语料中的每个文档,只选一次multinomial clustering变量;对于在基于该cluster变量条件的文档上只选择一个词集合。有了多个clustering models后,这样的模型会限制一个文档只与单个主题相关联。而LDA涉及三层(three-level),尤其是主题节点(topic node)在单个文档中被重复抽样。在该模型下,文档可以与多个主题相关联。

图一:LDA的图形化模型表示。”plates”表示可重复。outer plate表示文档,inner plate表示在单个文档中关于主题和词汇的可重复的选择。

通常在贝叶斯统计建模(Bayesian statistical modeling)中学到与图1相似的结构。它们被称为层级模型(hierarchical models),或更精确地称为:条件独立层级模型(con-ditionally independent hierarchical models)(Kass and Steffey, 1989).这样的模型经常作为参数期望贝叶斯模型(parametric empirical Bayes models)被提到,它也被用于参数估计中。在第5部分,我们将采用经验贝叶斯方法(empirical Bayes approach),来估计在LDA的简单实现中的α 和 β, 我们也会考虑更完整的Bayesian方法。

2.1 LDA与可交换性

随机变量 {z1,…,zN}的一个有限集合,如果联合分布在置换(permutation)后不变,我们称之为是可交换的(exchangeable)。如果π是从整数1到N的一个置换(permutation):

一个无限的随机变量序列,如果每个有限的子序列是可交换的,那么就是无限可交换的(infinitely exchangeable)。

De Finetti’s representation理论声称:一个无限可交换的随机变量序列的联合概率,如果一个随机参数从这些分布中抽取,那么问题中的随机变量,在其于该参数条件下是独立同分布的(iid: independent and identically distributed)。

在LDA中,我们假设,词由主题(确定的条件分布)生成,这些主题在一个文档中是无限可交换的(infinitely exchangeable)。由de Finetti的理论,一个词和主题的序列的概率,必须具有以下形式:

其中,θ是在这些主题上的一个服从多项分布的随机变量。我们可以在等式(3)中获取LDA分布,通过对主题变量边缘化,并让θ服从一个Dirichlet分布。

2.2 其它模型

unigram模型

在unigram模型中,每个文档中的词都从一个单独的multinomial分布独立抽取得到:

Mixture of unigrams

如果unigram模型和一个离散随机主题变量z混合,我们就得到了一个mixture of unigrams model。在该混合模型下,每个文档的生成:首先选中一个主题z,然后以条件多项概率$p(w \mid z)$独立生成N个词。一个文档的概率为:

当从一个语料估计时,词分布可以看成是:假设每个文档只有一个主题,在该假设下的主题表示。该假设对于大语料集来说是一个很受限的模型。

相反的,LDA允许文档展现多个主题。这会额外带来另一个参数形销:有k-1参数与mixture of unigrams中的p(z)有关,而LDA模型中有k个参数与$ p(\theta \mid \alpha)$有关。

pLSI

pLSI是另一个广泛使用的文档模型(Hofmann 1999)。pLSI模型认为,一个文档d,词wn,对于一个未观察到的主题z来说是条件独立的:

pLSI模型尝试放宽maxture of unigrams模型中作出的简化猜想:每个文档只由一个主题生成。在某种意义上,它会捕获概率:一个文档可以包含多个主题,因为p(z | d)可看成是对于一个特定文档d的主题权重的混合。然后,注意,d是一个dummy index,指向在训练集中文档列表。这样,d是一个多项分布随机变量,值尽可能与训练文档一样多,模型学到的主题混合 p(z | d)只对应于被训练的文档。出于该原因,pLSI并不是一种定义良好的(well-defined)文档生成模型;天然不支持使用它来分配概率给一个之前未见过的文档。

图3: 不同模型的图表示

2.3 几何学解释

LDA与其它隐主题模型不同,可以通过隐空间(latent space)的几何表示来解释。来看下在每种模型下,一个文档在几何上是如何表示的。

上面所描述的所有4种模型(unigram, mixture of unigrams, pLSI, LDA),都是在词的空间分布上操作。每个这样的分布,可以看成是在(V-1)-simplex上的一个点,我们称之为word simplex。simplex是泛化三角形,关于simplex,参见simplex的wiki

unigram模型是在word simplex上找到单个点,假定语料中的所有词都来自该分布。隐变量模型(letent variable models)会考虑在word simplex上的k个点,并基于这些点形成一个子simplex(sub-simplex),我们称之为topic simplex。注意,在topic simplex上的任意点,在word simplex也是一个点。使用topic simplex的不同隐变量模型,以不同的方式生成一个文档。

  • 1-gram混合模型(mixture of unigram model):假设对于每个文档,在word simplex上的k个点(也就是说:topic simplex的其中一个角落)被随机选中,该文档的所有词汇都从这些点的分布上抽取得到。
  • pLSI模型:假定一个训练文档的每个词都来自一个随机选中的topic。这些主题(topics)本身从一个在这些主题上的指定文档分布上抽取到,例如,在topic simplex上的一个点。对于每个文档都有一个这样的分布;训练文档的集合定义了在topic simplex上的一个经验分布。
  • LDA: 假定,已观察到和未观察到的文档上的每个词,都由一个随机选中的topic生成,这个topic从一个随机选中的参数的分布上抽取。该参数从来自topic simplex的一个平滑分布上的每个文档中抽样得到。

图4: 嵌在包含三个词的word simplex上的三个主题的topic simplex。word simplex的角(corners)对应于三个分布,每个词各自都具有一个概率分布。topic simplex的三个顶点对应于在词上的三个不同分布。The mixture of unigrams模型会将每个文档放到topic simplex上其中一个角(corners)上。pLSI模型会引入根据x的topic simplex的一个经验分布。LDA则在由等高线表示的topic simplex上放置一个平滑分布。

3.推断与参与估计

3.1 inference

为了使用LDA,我们需要解决的核心推断问题是,对于一个给定文档,计算这些隐变量的后验分布(主题分布)

不幸的是,该分布很难计算。为了归一化该分布,我们对隐变量边缘化(marginalize),并将等式(3)表示成模型参数的形式:

该函数很难解,因为θ 和 β在隐主题的求和上相耦合(Dickey, 1983)。Dickey展示了该函数是一个在Dirichlet分布(可以表示成一个超几何函数)的特定扩展下的期望。它被用在一个Beyesian上下文中…

尽管后验分布很难精准推断(inference),有许多近似推断算法可以用于LDA,包括Laplace近似,变分近似(variational approximation),马尔可夫链蒙特卡罗法MCMC(jordan,1999)。本节我们描述了一种在LDA中简单的凸变分推断算法(convexity-based variational algorithm for inference)。其它方法在paper 第8部分讨论。

3.2 变分推断

凸变分推断算法的基本思想是,充分利用Jensen不等式来获取一个在log likelihood上的可调下界(jordan et al.,1999)。本质上,可以考虑一个关于下界的家族(a family of lower bounds),由一个变分参数(variational parameters)集合进行索引。变分参数由一个优化过程进行选择,它会尝试找到最紧可能的下界(tightest possible lower bound)。

获取一个可控下界家族的一个简单方法是,考虑将原始的图示模型进行简单修改,移去一些边(edge)和节点(node)。将图1所示的LDA模型进行修改, θ 和 β之间的耦合,由于边θ, z, 和 W之间存在边(edge)。通过抛弃这些边以及W节点,生成一个更简单的图示模型,它有自由的变分参数,我们可以在隐变量上获取一个分布族。该分布族由以下的变分分布组成:

……(4)

其中,Dirichlet分布参数γmultinomial分布参数(φ1 , . . . , φN),是自由变分参数。

图4: (左)LDA的图示模型 (右)采用变分分布来近似LDA后验的图示模型

在指定了一个简化版本的概率分布族后,下一步是建立一个优化问题,来确定变分参数γ 和 φ的值。正如在附录A中展示的,找到一个在log likelihood上的紧下限,可以直接翻译成下面的优化问题:

……(5)

变分参数的最优值,可以通过对变分分布$ q(\theta,z \mid \gamma,\phi) $和真实后验概率$ p(\theta,z \mid W,\alpha,\beta) $间的KL散度进行最小化得到。该最小化可以通过一个迭代型的定点方法(fixed-point method)完成。特别的,我们在附录A.3中展示了:通过计算KL散度的导数,将它们等于0, 可以得到以下更新等式的pair:

……(6)

……(7)

在附录A.1中,多项分布的期望更新可以以如下方式计算:

……(8)

其中,Ψ 是logΓ函数通过Taylor展开式的首个导数项。

等式(6)和(7)具有一个吸引人的直觉解释。Dirichlet分布更新(Dirichlet update)是一个在给定变分分布($ E[z_n \mid \phi_n] $>)下给定的期望观察的后验Dirichlet。多项分布更新(multinomial update)类似于使用贝叶斯理论,$ p(z_n \mid w_n) \propto p(w_n \mid z_n) $,其中p(zn)是在变分分布下的期望值的log的指数近似。

注意,变分分布实际上是一个条件分布,由一个关于W的函数区分。因为在等式(5)中的优化问题,由确定的W所管理,因而,最优解($ \gamma^{\ast},\phi^{\ast} $)是一个关于W的函数。我们可以将产生的变分分布写成:$ q(\theta,z \mid \gamma^{\ast}(W),\phi^{\ast}(W)) $,其中,我们已经在W上显式地作出了独立性。这样,变分分布可以看成是一个对后验分布$ p(\theta,z \mid W,\alpha,\beta)$的近似。

在文本语言中,最优参数($ \gamma^{\ast}(W), \phi^{\ast}(W) $)是由文档决定的(document-specific)。特别的,我们可以将Dirichlet参数$ \gamma^{\ast}(W) $看成是在topic simplex中提供一个文档表示。

图6: LDA的variational inference算法

图6展示了变分推断过程的总结,对于 γ 和 φn具有合适的起始点。从该伪代码我们可以知道:LDA的变分推断算法,每次迭代需要O((N+1)k)次操作。经验上,我们可以找到对于单个文档的迭代数,与文档中词的数目相近似。因而总的操作数与$ N^2k $相近。

3.3 参数估计

本节描述了一种在LDA模型中用于参数估计的经验贝叶斯方法(empirical Bayes method)。特别的,给定一个文档集的语料库:D={W1,W2,…WM}。我们希望找到α 和 β,来最大化整个语料数据的(marginal)log likelihood:

如上所述,p(W | α,β)的计算很困难。然而,变分推断提供给我们一个在log likelihood上的下界,我们可以分别根据α和β进行最大化。我们找到了近似经验贝叶斯估计,通过一种交替变分EM过程(alternating variational EM procedure),来分别对变分参数γ 和 φ,最大化下界。接着,变分参数确定下来后,再分别根据模型参数α 和 β,最大化下界。

我们在附录A.4中提供了一个用于LDA的变分EM算法的导数的详细推导。推导以下面的迭代形式描述:

  • 1.(E-step): 对于每个文档,找到变分参数{$ \gamma_d^{\ast},\phi_d^{\ast}: d \in D $}。这个在上一节已描述。
  • 2.(M-step): 分别根据模型参数α 和 β,最大化在log likelihood上产生的下界。这相当于,在E-step中计算出的合适的后验概率下,为每个文档使用期望的足够统计量,找到极大似然估计。

这两个step会一直循环,直到log likelihood的下界收敛为止。

在附录A.4,我们展示了对于条件多项分布参数β进行M-step更新(M-step update),可以写成:

……(9)

我们会进一步展示Dirichlet参数α的M-step更新,它可以使用一种有效在线性时间上增长的Newton-Raphson方法,其中的Hessian是可逆的。

3.4 Smoothing

对于许多文档的大语料,具有很大的词汇表size,通常会有严重的sparsity问题。对于一个新文档,它包含的词汇很可能没有出现在训练语料中。对于这些词来说,多项分布参数(multinomial parameters)的最大似然估计会分配概率为0,因而,这些新文档的概率为0。应付该问题的方法是,对多项分布参数进行”平滑(smooth)”,对于所有词汇表item都分配一个正的概率,不管它们是否出现在训练集当中。最常用的方法是Laplace smoothing;这本质上会产生在多项分布参数上的均匀Dirichlet先验分布下,对后验分布求平均。

不幸的是,在混合模型设置中,简单的Laplace smoothing不再适合最大化后验方法。事实上,在多项分布参数上放置一个Dirichlet先验,我们可以在混合模型设置上得到一个难解的后验,这与在基本的LDA模型上得到一个难解的后验的原因基本类似。我们提出解决该问题的方法是,简单地应用变分推断方法到扩展模型上,它包括了在多项分布参数上的Dirichlet smoothing

图7:带平滑的LDA图示模型

在LDA设置中,我们可以获得如图7所示的扩展模型。我们将β看成是一个k x V的随机矩阵(每行表示一个mixture component),其中我们假设每行都是从一个可交换的Dirichlet分布上独立抽取的。我们现在扩展我们的inference过程,将βi看成是随机变量,它天然具有一个基于训练语料条件的后验分布。此时我们来看下这个更完整的LDA Bayesian方法。

将一个变分过程看成是一个Bayesian inference,它在随机变量β, θ 和 z上放置一个独立分布(Attias,2000):

其中 $ q_d(\theta,z \mid \phi,\gamma) $是在等式(4)中定义的变分分布。很容易验证,产生的变分推断过程会产生等式(6)和(7)来分别作为对变分参数φ 和 γ更新等式,同时也会对一个新的变分参数λ做一个额外更新:

迭代这些等式直到收敛,会产生一个在β, θ 和 z上的一个合适后验分布。

现在我们将超参数η用于可交换的Dirichlet,如同前面提到的超参数α。设置这些超参数的方法是:我们使用变分EM来寻找基于边缘似然上的这些参数的最大似然估计。这些过程在附录A.4描述。

4.online算法

对于主题模型,后验概率是很难计算的,可以通过近似后验推断来计算。当前的近似后验推导算法分为两大类:抽样方法(sampling approaches)和优化解法(optimization approaches)。抽样方法有MCMC,优化解法有VB(Variational Bayes)。VB在经验上比MCMC更快,与MCMC准确率相当,对于大数据集来说,VB是更吸引人。

但是,使用VB在大数据上计算很困难。标准的“batch”VB算法,会迭代分析每个观察(observation),并迭代更新数据集范围的变分参数。batch算法的每次迭代开销,对于非常大的数据集来说不切实际。在主题建模(topic modeling)应用中,这一点特别重要。主题建模会对不能手工标注的大型文档集合进行归纳隐式结构。

为此,blei等提出了一种online VB算法,这种算法基于online随机优化(online stochastic optimization),在大数据集上,它比batch算法更快地产生好的参数估计。Online LDA可以方便地分析大量文档集合,不需要本地存储,或者收集文档,每个文档可以以流式(streaming)到达,看完后抛弃。

以下的等式重新标号。

假设有K个主题。每个主题定义了一个在词汇表上的多项分布,假设这些主题从一个Dirichlet上抽取得到: βk ∼ Dirichlet(η)。给定主题后,LDA为每个文档d假设了以下的生成过程。

  • 首先,从主题$ \theta_d \sim Dirichlet(\alpha) $上抽取一个分布;
  • 接着,对于在文档中的每个词i,从主题权重$ z_{di} \sim \theta_d $ 上抽取一个主题索引$ z_{di} \in {1, . . . , K }$,从选中的主题上抽取观察词$w_{di}$,$w_{di} \sim \beta_{z_{di}}$。出于简洁性,我们假设在θ 和 β上具有对称先验,但该假设很容易放宽。

注意,如果我们在主词分配z上进行求和,接着,我们得到:$p(w_{di} \mid \theta_d,\beta)=\sum_k\theta_{dk}\beta_{kw}$。这会导致LDA的”multinomial PCA”解释。我们可以将LDA看成是将一个词频n的矩阵(其中$n_{dw}$是词w出现在文档d中的次数),概率分解(probabilistic factorization)成一个关于主题权重θ的矩阵,以及一个主题字典β。我们的工作可以看成是在线矩阵分解技术的一个扩展,它会最优化squared error来得到更泛化的概率公式。

  • 主题(the topics): β
  • 主题比例(topic proportions): θ
  • 主题分配(topic assignments): z

4.1 Batch算法

在VB推断中,真实的后验由一个更简单的分布q(z,θ,β)来近似,它由一个自由参数集进行索引。这些参数用于优化:最大化置信下界(ELBO:Evidence Lower BOund)

…… (1)

最大化ELBO,相当于最小化q(z,θ,β)与后验$p(z,\theta,\beta \mid W,\alpha,\eta)$间的KL散度。我们选择一个完整的因子分布q(fully factorized distribution):

……(2)

每个词的主题分配z的后验,可以由φ参数化,在每个文档上的主题权重θ可以由γ进行参数化,在主题上的后验β可以由λ进行参数化。为便于记忆,我们将λ作为”主题(the topics)”。等式1分解为:

……(3)

注意,我们在对文档求和中引入了每个语料的项,可以通过文档D的数目进行划分。该步可以帮助我们对online VB算法求导。

接着,我们将上述期望展开成变分参数的函数。变分目标函数只依赖于$ n_dw $, 词w在文档中的出现次数。当使用VB时,文档可以由词数(word counts)来进行归纳:

……(4)

其中V是词汇表的size,D是是文档数目。$ \ell(n_d,\phi_d,\gamma_d,\lambda) $ 表示文档d对ELBO的贡献度。

L可以使用在变分参数φ, γ, λ 上的坐标上升法进行优化:

……(5)

在q分布下logθ和logβ的期望为:

……(6)

其中Ψ表示digamma函数。

等式5的更新可以保证ELBO收敛到一个稳定点。通过EM算法(Expectation-Maximization)来完成,我们可以将这些更新划分到”E-step”:它会保持λ固定,迭代更新γ 和 φ 直到收敛,接着会进行”M-step”:由给定φ更新λ。实例上,如果在每个E-step上重新初始化γ 和 φ,该算法会收敛到一个更好的解。算法1就是batch VB for LDA:

Online算法

算法1具有常数量的内存开销,经验上,会比batch collpased Gibbs sampling收敛更快。然而,它仍需要在每次迭代时,将整个语料库传进去。因而,如果对于非常大的数据集会很慢,天然不适合不断到来的新数据。我们提出了一种在线变分推断算法来拟合λ,该参数是在主题分布β上的变分后验。我们的算法几科与batch VB算法一样简单,但对于大数据集收敛更快。

要想让主题参数λ的设置更好,就要让在算法1中的E-step中拟合得到每个文档的变分参数γ和φ之后,衡量ELBO的L要尽可能地高。将γ(nd, λ)和φ(nd, λ)设置为由E-step产生的γd 和 φd。我们的学习目标是设置λ,使下面目标最大化:

……(7)

是第d个文档对等式(4)的变分上界的贡献度。这类似于最小平方矩阵分解的目标函数,尽管LDA的ELBO比简单的squared loss function更不方便。

Online VB for LDA在算法2中描述。

第t个词频向量$n_t$被观察看,保持λ固定,我们执行一个E-step来找到局部最优解$\gamma_{t}$和$\phi_{t}$。接着,我们计算$\hat{\lambda}$,假如整个语料由单个文档$n_t$重复D次组成,那么λ的设置就是最优的(给定φt)。D是提供给算法的唯一文档数目,比如:一个语料的size。(在真实online的例子中:D->∞,对应于β的经验Bayes估计)。我们接着更新λ,使用一个λ的先前值和$\hat{\lambda}$进行加权平均得到。$\hat{\lambda}$的权重为:$\rho_t \triangleq (\tau_0 + t)^{-\kappa}$,其中$\kappa \in (0.5,1]$控制着$\hat{\lambda}$的旧值被遗忘的比率,τ0 ≥ 0会减缓算法的早期迭代。条件κ ∈ (0.5, 1]需要保证收敛。我们会在下一节展示,online LDA对应于在变分目标函数L上的一个随机自然梯度算法(stochastic natural gradient algorithm)。

该算法很像paper[16]中提出在online VB,在模型上有隐数据——最重要的区别是,我们使用一个近似的E-step来优化γt 和 φt, 因为我们不能精确计算条件分布$ p(z_t,\theta_t \mid \beta,n_t,\alpha) $。

Mini-batches: 在随机学习中的常用技术是,在每次更新时考虑多个观察(observations)以降噪。在online LDA中,这意味着计算$\hat{\lambda}$使用S>1的观察:

……(8)

其中$n_{ts}$是在mini-batch t中的第s个文档。该文档的变分参数$\phi_{ts}$和$\gamma_{ts}$是使用普通E-step进行拟合。注意,当S=D 和 κ = 0时,online VB就成了batch VB。

超参数估计:在batch 变分LDA中,超参数α 和 η的点估计,可以拟合给出的γ 和 λ,它使用线性时间的Newton-Raphson法。我们同样将α 和 η的更新并入到online LDA:

……(9)

其中,$\hat{\alpha}(\gamma_t)$是Hessian逆矩阵乘以梯度$\triangledown_{\alpha}l(n_t,\gamma_t,\phi_t,\lambda$,$ \hat{\eta}(\lambda))$是Hessian的逆矩阵乘以梯度$\triangledown_{\eta}\mathcal{L}$,$\rho_{t} \triangleq (\tau_0 + t)^{-\kappa}$。

5.3 收敛分析

此处不详说,见paper。

介绍

MLlib中有两个lr实现。分别在mllib和ml中。此处分析的是ml,这也是后续mllib继续重点发展的一个库, mllib则会渐渐淡出历史舞台。ml的logistic回归,代码在org.apache.spark.ml.classification.LogisticRegression中实现。在详见代码之前,我们先简单地看下LR可调节的参数. [暂时修正至以1.6.1代码为参考.]

一、LR模型的参数

参数:

  • threshold: 如果label 1的概率>threshold,则预测为1,否则为0.
  • regParam:正则项参数(相当于lambda)
  • elasticNetParam:ElasticNet混合参数。如果alpha = 0, 惩罚项是一个L2 penalty。如果alpha = 1,它是一个L1 penalty。如果0 < alpha < 1,则是L1和L2的结果。缺省为0.0,为L2罚项。
  • maxIter: 缺省100次。
  • tol:迭代收敛的容忍度。值越小,会得到更高精度,同时迭代次数开销也大。缺省为1E-6.
  • fitIntercept: 是否要拟合截距项(intercept term),缺省为true.
  • standardization:在对模型进行fit前,是否对训练特征进行标准化(归一化)。模型的系数将总是返回到原始的scale上,它对用户是透明的。注意:当不使用正则化时,使用/不使用standardization,模型都应收敛到相同的解决方式上。在R的GLMNET包里,缺省行为也为true。缺省为true。
  • weightCol:如果没设置或空,所有实例为有为1的权重。如果设置,则相当于对unbalanced data设置不同的权重。缺省不设置。
  • treeAggregate:如果特征维度或分区数太大,该参数需要调得更大。缺省为2.

二、原理

logistic回归的原理,这里不详述。简单地写:y = logit(∑wx + b)。相应的loss和梯度如下所示(箭头表示向量):

$l_{model}$为cross-entropy;$l_{reg}$为正则项。

对于第一部分$l_{model}$的计算,依赖于训练数据;对于第二部分正则项,它不依赖于训练数据。因而这两部分在实现时是独立计算的。

每个训练样本的loss和gradient是独立的。

样本i的梯度:

样本的loss:

对于spark来说,ml的logistic回归实现通过treeAggregate来完成。在executors/workers上计算各RDD上的loss和gradient;在driver/controller上将它们进行reduce进行所有训练样本的求和,得到lossSum和gradientSum。

在单机的driver上完成Optimization; L1正则由OWLQN optimizer处理。L2由L-BFGS处理。这两个优化器都由Breeze库提供(Breeze库提供了Vector/Matrix的实现以及相应计算的接口Linalg)。

整个过程如图所示:

三、loss与gradient

主要在LogisticCostFun类中,具体见代码,这里仅把核心代码及注释帖出来。可以对应于上面的公式。

private class LogisticCostFun(
    instances: RDD[Instance],
    numClasses: Int,
    fitIntercept: Boolean,
    standardization: Boolean,
    featuresStd: Array[Double],
    featuresMean: Array[Double],
    regParamL2: Double) extends DiffFunction[BDV[Double]] {

  override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
    val numFeatures = featuresStd.length
    val coeffs = Vectors.fromBreeze(coefficients)

    // step 1: cost部分.
    val logisticAggregator = {
      val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
      val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)

      // 先在rdd的每个分区上应用seqOp函数,做add操作(计算每个样本的loss/gradient)
      // 再在driver上应用comOp函数,做merge操作(求得总的loss/gradient)
      instances.treeAggregate(
        new LogisticAggregator(coeffs, numClasses, fitIntercept, featuresStd, featuresMean)
      )(seqOp, combOp)
    }

    // 求得总的gradientArray.
    val totalGradientArray = logisticAggregator.gradient.toArray

    // step 2: L2正则项部分 => regVal. 
    //  reg = lambda * ∑ regParamL2/2 wi^2 + (1-regParamL2) |wi| 
    // regVal is the sum of coefficients squares excluding intercept for L2 regularization.
    val regVal = if (regParamL2 == 0.0) {
      0.0
    } else {
      var sum = 0.0
      
      coeffs.foreachActive { (index, value) =>
        // If `fitIntercept` is true, the last term which is intercept doesn't
        // contribute to the regularization.
        if (index != numFeatures) {
          // The following code will compute the loss of the regularization; also
          // the gradient of the regularization, and add back to totalGradientArray.
          // gradientArray:  costFun项的梯度 + 正则项的梯度
          // sum: L2正则
          sum += {
             
            if (standardization) {
              
              totalGradientArray(index) += regParamL2 * value
              value * value
            } else {
              // 
              if (featuresStd(index) != 0.0) {
                // If `standardization` is false, we still standardize the data
                // to improve the rate of convergence; as a result, we have to
                // perform this reverse standardization by penalizing each component
                // differently to get effectively the same objective function when
                // the training dataset is not standardized.
                val temp = value / (featuresStd(index) * featuresStd(index))
                totalGradientArray(index) += regParamL2 * temp
                value * temp
              } else {
                0.0
              }
            }
          }
        }
      }

      0.5 * regParamL2 * sum
    }

    // 返回带L2正则的loss,以及带L2正则的梯度.
    (logisticAggregator.loss + regVal, new BDV(totalGradientArray))
  }
}

里面会涉及到另一个类:LogisticAggregator。它的作用,相当于做map/reduce运算,计算loss/gradient。

private class LogisticAggregator(
    coefficients: Vector,
    numClasses: Int,
    fitIntercept: Boolean,
    featuresStd: Array[Double],
    featuresMean: Array[Double]) extends Serializable {

  private var weightSum = 0.0
  private var lossSum = 0.0

  // coefficients => coefficientsArray 数组.
  private val coefficientsArray = coefficients match {
    case dv: DenseVector => dv.values
    case _ =>
      throw new IllegalArgumentException(
        s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
  }

  // 总的维度.
  private val dim = if (fitIntercept) coefficientsArray.length - 1 else coefficientsArray.length

  //gradientSumArray.
  private val gradientSumArray = Array.ofDim[Double](coefficientsArray.length)

  /**
   * 将一个训练样本添加到LogisticAggregator, 并更新目标函数的loss和gradient.
   * 
   * Add a new training instance to this LogisticAggregator, and update the loss and gradient
   * of the objective function.
   *
   * @param instance The instance of data point to be added.
   * @return This LogisticAggregator object.
   */
  def add(instance: Instance): this.type = {
    instance match { case Instance(label, weight, features) =>
      require(dim == features.size, s"Dimensions mismatch when adding new instance." +
        s" Expecting $dim but got ${features.size}.")
      require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")

      if (weight == 0.0) return this

      val localCoefficientsArray = coefficientsArray
      val localGradientSumArray = gradientSumArray

      numClasses match {
        case 2 =>
          // For Binary Logistic Regression.
          // step 1: 二分类,求得 z = ∑ w*x + b,取负号.
          val margin = - {
            var sum = 0.0

            // a.每个特征号上,单独做求和运算.  ∑ w*x + b
            // 此处是做 ∑ w*x 运算. (是否做了归一化)
            features.foreachActive { (index, value) =>
              if (featuresStd(index) != 0.0 && value != 0.0) {
                sum += localCoefficientsArray(index) * (value / featuresStd(index))
              }
            }

            // 此处是 + b的运算.
            sum + {
              if (fitIntercept) localCoefficientsArray(dim) else 0.0
            }
          }

          // 这里的label采用的是(1,0).
          // step 2: 乘以该样本所带的unbalanced weight(样本所占比重, 缺省weight=1).
          val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)

          // step 3: 更新localGradient.
          // gradient = 1/n ∑ multiplier * x
          features.foreachActive { (index, value) =>
            if (featuresStd(index) != 0.0 && value != 0.0) {
              localGradientSumArray(index) += multiplier * (value / featuresStd(index))
            }
          }

          if (fitIntercept) {
            localGradientSumArray(dim) += multiplier
          }

          // step 4: 如果label为正例, loss为cross-entropy: 1/n * ∑ ln(1+exp(-y*wx))
          // lossSum.
          if (label > 0) {
            // The following is equivalent to log(1 + exp(margin)) but more numerically stable.
            lossSum += weight * MLUtils.log1pExp(margin)
          } else {
            lossSum += weight * (MLUtils.log1pExp(margin) - margin)
          }
        case _ =>
          new NotImplementedError("LogisticRegression with ElasticNet in ML package " +
            "only supports binary classification for now.")
      }

      // 
      weightSum += weight
      this
    }
  }

  /**
   * 进行merge: 方便分布式计算.
   *
   * Merge another LogisticAggregator, and update the loss and gradient
   * of the objective function.
   * (Note that it's in place merging; as a result, `this` object will be modified.)
   *
   * @param other The other LogisticAggregator to be merged.
   * @return This LogisticAggregator object.
   */
  def merge(other: LogisticAggregator): this.type = {
    require(dim == other.dim, s"Dimensions mismatch when merging with another " +
      s"LeastSquaresAggregator. Expecting $dim but got ${other.dim}.")

    if (other.weightSum != 0.0) {
      weightSum += other.weightSum
      lossSum += other.lossSum

      var i = 0
      val localThisGradientSumArray = this.gradientSumArray
      val localOtherGradientSumArray = other.gradientSumArray
      val len = localThisGradientSumArray.length

      // 以local为准. 每列coff所对应梯度进行叠加.
      while (i < len) {
        localThisGradientSumArray(i) += localOtherGradientSumArray(i)
        i += 1
      }
    }
    this
  }

  // 求得最终loss.
  def loss: Double = {
    require(weightSum > 0.0, s"The effective number of instances should be " +
      s"greater than 0.0, but $weightSum.")
    lossSum / weightSum
  }

  // 求得最终gradient.
  def gradient: Vector = {
    require(weightSum > 0.0, s"The effective number of instances should be " +
      s"greater than 0.0, but $weightSum.")
    val result = Vectors.dense(gradientSumArray.clone())
    scal(1.0 / weightSum, result)
    result
  }
}

四、训练过程

  override protected def train(dataset: DataFrame): LogisticRegressionModel = {
    // 从数据集抽取列. 如果数据集是persisted,不需要persist oldDataset.  
    // Extract columns from data.  If dataset is persisted, do not persist oldDataset.
    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))

    // 选取labelCol, weightCol, featuresCol作为row
    val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
      case Row(label: Double, weight: Double, features: Vector) =>
        Instance(label, weight, features)
    }

    // 是否持久化. 如果持久化,将训练样本进行cache. (MEMORY_AND_DISK,如果内存不够,会存成disk)
    // 这里有可能影响性能.
    val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

    // 统计.
    val (summarizer, labelSummarizer) = {
      val seqOp = (c: (MultivariateOnlineSummarizer, MultiClassSummarizer),
        instance: Instance) =>
          (c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight))

      val combOp = (c1: (MultivariateOnlineSummarizer, MultiClassSummarizer),
        c2: (MultivariateOnlineSummarizer, MultiClassSummarizer)) =>
          (c1._1.merge(c2._1), c1._2.merge(c2._2))

      instances.treeAggregate(
        new MultivariateOnlineSummarizer, new MultiClassSummarizer)(seqOp, combOp)
    }

    val histogram = labelSummarizer.histogram
    val numInvalid = labelSummarizer.countInvalid
    val numClasses = histogram.length
    val numFeatures = summarizer.mean.size

    if (numInvalid != 0) {
      val msg = s"Classification labels should be in {0 to ${numClasses - 1} " +
        s"Found $numInvalid invalid labels."
      logError(msg)
      throw new SparkException(msg)
    }

    if (numClasses > 2) {
      val msg = s"Currently, LogisticRegression with ElasticNet in ML package only supports " +
        s"binary classification. Found $numClasses in the input dataset."
      logError(msg)
      throw new SparkException(msg)
    }

    // 特征的mean和std.
    val featuresMean = summarizer.mean.toArray
    val featuresStd = summarizer.variance.toArray.map(math.sqrt)

    // L1, L2
    val regParamL1 = $(elasticNetParam) * $(regParam)
    val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)

    // costFun.
    val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept), $(standardization),
      featuresStd, featuresMean, regParamL2)

    // optimizer: 优化器.
    // 如果 elasticNetParam = 0, 或者 regParam=0. 即使用L2正则,或者没有正则项. => 使用LBFGS.
    // 否则,L1+L2正则,以及L1正则 => 使用OWLQN
    val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
      new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
    } else {
      def regParamL1Fun = (index: Int) => {
        // Remove the L1 penalization on the intercept
        if (index == numFeatures) {
          0.0
        } else {
          if ($(standardization)) {
            regParamL1
          } else {
            // If `standardization` is false, we still standardize the data
            // to improve the rate of convergence; as a result, we have to
            // perform this reverse standardization by penalizing each component
            // differently to get effectively the same objective function when
            // the training dataset is not standardized.
            if (featuresStd(index) != 0.0) regParamL1 / featuresStd(index) else 0.0
          }
        }
      }
      new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
    }

    // 初始化cofficients为0.
    val initialCoefficientsWithIntercept =
      Vectors.zeros(if ($(fitIntercept)) numFeatures + 1 else numFeatures)

    // 对于二分类,当将coefficients初始化为0时,如果根据label的分布对intercept进行初始化,收敛会更快.
    // b = log(P(1)/P(0))
    if ($(fitIntercept)) {
      /*
         For binary logistic regression, when we initialize the coefficients as zeros,
         it will converge faster if we initialize the intercept such that
         it follows the distribution of the labels.

         
         P(0) = 1 / (1 + \exp(b)), and
         P(1) = \exp(b) / (1 + \exp(b))
         , hence
         
         b = \log{P(1) / P(0)} = \log{count_1 / count_0}
         
       */
      initialCoefficientsWithIntercept.toArray(numFeatures)
        = math.log(histogram(1) / histogram(0))
    }

    // 开始迭代.
    val states = optimizer.iterations(new CachedDiffFunction(costFun),
      initialCoefficientsWithIntercept.toBreeze.toDenseVector)

    // 获取迭代的结果.
    val (coefficients, intercept, objectiveHistory) = {
      /*
         Note that in Logistic Regression, the objective history (loss + regularization)
         is log-likelihood which is invariance under feature standardization. As a result,
         the objective history from optimizer is the same as the one in the original space.
       */
      val arrayBuilder = mutable.ArrayBuilder.make[Double]
      var state: optimizer.State = null
      while (states.hasNext) {
        state = states.next()
        arrayBuilder += state.adjustedValue
      }

      if (state == null) {
        val msg = s"${optimizer.getClass.getName} failed."
        logError(msg)
        throw new SparkException(msg)
      }

      // 当权重coefficients在归一化空间中训练时,我们需要将它们转回到原始空间.
      // 注意,归一化空间的intercept,和原始空间是一样的,不需要归一化.
      /*
         The coefficients are trained in the scaled space; we're converting them back to
         the original space.
         Note that the intercept in scaled space and original space is the same;
         as a result, no scaling is needed.
       */
      val rawCoefficients = state.x.toArray.clone()
      var i = 0
      while (i < numFeatures) {
        rawCoefficients(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 }
        i += 1
      }

      // 
      if ($(fitIntercept)) {
        (Vectors.dense(rawCoefficients.dropRight(1)).compressed, rawCoefficients.last,
          arrayBuilder.result())
      } else {
        (Vectors.dense(rawCoefficients).compressed, 0.0, arrayBuilder.result())
      }
    }

    if (handlePersistence) instances.unpersist()

    // 统计状态.
    val model = copyValues(new LogisticRegressionModel(uid, coefficients, intercept))
    val logRegSummary = new BinaryLogisticRegressionTrainingSummary(
      model.transform(dataset),
      $(probabilityCol),
      $(labelCol),
      $(featuresCol),
      objectiveHistory)
    model.setSummary(logRegSummary)
  }

四、正则

  • L2 regularization -> ridge
  • L1 regularization -> lasso
  • mix L1 and L2 -> elastic Net

相应的公式:

对应到后面的代码里:

regPram = regParamL1+regParamL2
val regParamL1 = $(elasticNetParam) * $(regParam)
val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)

两种正则化方法L1和L2。L2正则化假设模型参数服从高斯分布,L2正则化函数比L1更光滑,所以更容易计算;L1假设模型参数服从拉普拉斯分布,L1正则化具备产生稀疏解的功能,从而具备Feature Selection的能力。

LBFGS和OWLQN

ok,我们知道,模型本身基本上核心代码就落在了这两个方法上:LBFGS和OWLQN。两者都是牛顿法的变种,核心思想是:

关于Hessian矩阵的计算,此处不做过多解释,如果你有兴趣想深究下数学实现。也可以再看一下breeze库里的这两个方法实现:

L-BFGS: Limited-memory BFGS。其中BFGS代表4个人的名字:broyden–fletcher–goldfarb–shanno OWL-QN: (Orthant-Wise Limited-Memory Quasi-Newton)算法。

关于breezey库,这里再简单提一下:

breeze库用于数值处理。它的目标是通用、简洁、强大,不需要牺牲太多性能。当前版本0.12。我们所熟悉的spark中的MLlib(经常见到的线性算法库:breeze.linalg、最优化算法库:breeze.optimize)就是在它的基础上构建的。另外它还提供了图绘制的功能(breeze.plot)。

总结

整个过程基本都ok了。L1还是L2的选择,看你的具体调参。另外再提一些注意事项。

  • 缺省会对特征做归一化,对于一些场景(比如推荐),离散化的特征,归一化没啥意义。反倒可能会影响结果好坏。
  • 对于截距b,会使用正负样本比例,进行log(P(1)/P(0))初始化。收敛会更快。
  • cache机制(StorageLevel.MEMORY_AND_DISK)。当内存不够用,可能会影响性能,这一点不好。

参考:

1.LogisticRegression源码 2.breeze 3.breeze文档

机器学习中的许多模型中,对类别型变量,常作的处理是,将它们编码成one-hot。但是对于树模型来说,将类别型变量编码成one-hot,这样作是否有意义呢?像一些机器学习工具包(比如:spark gbm实现),你可以指定为类别型变量,内部自己去做one-hot实现。而像xgboost,则将输入全认为是数值型特征去处理。

一、要不要one-hot?

这在机器学习界也有争论。理论上,树模型如果够深,也能将关键的类别型特型切出来。

关于这个,xgboost的作者tqchen在某个issues有提到过:

I do not know what you mean by vector. xgboost treat every input feature as numerical, with support for missing values and sparsity. The decision is at the user

So if you want ordered variables, you can transform the variables into numerical levels(say age). Or if you prefer treat it as categorical variable, do one hot encoding.

在另一个issues上也提到过(tqchen commented on 8 May 2015):

One-hot encoding could be helpful when the number of categories are small( in level of 10 to 100). In such case one-hot encoding can discover interesting interactions like (gender=male) AND (job = teacher).

While ordering them makes it harder to be discovered(need two split on job). However, indeed there is not a unified way handling categorical features in trees, and usually what tree was really good at was ordered continuous features anyway..

总结起来的结论,大至两条:

  • 1.对于类别有序的类别型变量,比如age等,当成数值型变量处理可以的。对于非类别有序的类别型变量,推荐one-hot。但是one-hot会增加内存开销以及训练时间开销。
  • 2.类别型变量在范围较小时(tqchen给出的是[10,100]范围内)推荐使用

二、one-hot的一致性问题

当你进行one-hot编码时,有些机器学习工具包内置的one-hot编码工具会帮你做这些事,但是真实的情况是,我们有数据集有如下分类:训练集、测试集、预测集(真实数据)等。

这些数据集并不会统一,比如:

训练集上A特征有: a,b,c,d,测试集上A特征有:a,b,d,预测集上可能有b,c,e,f

因此,你需要做的是,将它们统一使用one-hot编码,而非分开做不同的one-hot编码.

参考