DDR爱好者之家 Design By 杰米
错误思想
举个列子,当我们想要比较 一个 类型为 RDD[(Long, (String, Int))] 的RDD,让它先按Long分组,然后按int的值进行倒序排序,最容易想到的思维就是先分组,然后把Iterable 转换为 list,然后sortby,但是这样却有一个致命的缺点,就是Iterable 在内存中是一个指针,不占内存,而list是一个容器,占用内存,如果Iterable 含有元素过多,那么极易引起OOM
val cidAndSidCountGrouped: RDD[(Long, Iterable[(String, Int)])] = cidAndSidCount.groupByKey()
// 4. 排序, 取top10
val result: RDD[(Long, List[(String, Int)])] = cidAndSidCountGrouped.map {
case (cid, sidCountIt) =>
// sidCountIt 排序, 取前10
// Iterable转成容器式集合的时候, 如果数据量过大, 极有可能导致oom
(cid, sidCountIt.toList.sortBy(-_._2).take(5))
}
首先,我们要知道,RDD 的排序需要 shuffle, 是采用了内存+磁盘来完成的排序.这样能有效避免OOM的风险,但是RDD是全部排序,所以需要针对性的过滤Key值来进行排序
方法一 利用RDD排序特点
//把long(即key值)提取出来
val cids: List[Long] = categoryCountList.map(_.cid.toLong)
val buffer: ListBuffer[(Long, List[(String, Int)])] = ListBuffer[(Long, List[(String, Int)])]()
//根据每个key来过滤RDD
for (cid <- cids) {
/*
List((15,(632972a4-f811-4000-b920-dc12ea803a41,10)), (15,(f34878b8-1784-4d81-a4d1-0c93ce53e942,8)), (15,(5e3545a0-1521-4ad6-91fe-e792c20c46da,8)), (15,(66a421b0-839d-49ae-a386-5fa3ed75226f,8)), (15,(9fa653ec-5a22-4938-83c5-21521d083cd0,8)))
目标:
(9,List((199f8e1d-db1a-4174-b0c2-ef095aaef3ee,9), (329b966c-d61b-46ad-949a-7e37142d384a,8), (5e3545a0-1521-4ad6-91fe-e792c20c46da,8), (e306c00b-a6c5-44c2-9c77-15e919340324,7), (bed60a57-3f81-4616-9e8b-067445695a77,7)))
*/
val arr: Array[(String, Int)] = cidAndSidCount.filter(cid == _._1)
.sortBy(-_._2._2)
.take(5)
.map(_._2)
buffer += ((cid, arr.toList))
}
buffer.foreach(println)
这样做也有缺点:即有多少个key,就有多少个Job,占用资源
方法二 利用TreeSet自动排序特性
def statCategoryTop10Session_3(sc: SparkContext,
categoryCountList: List[CategroyCount],
userVisitActionRDD: RDD[UserVisitAction]) = {
// 1. 过滤出来 top10品类的所有点击记录
// 1.1 先map出来top10的品类id
val cids = categoryCountList.map(_.cid.toLong)
val topCategoryActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id))
// 2. 计算每个品类 下的每个session 的点击量 rdd ((cid, sid) ,1)
val cidAndSidCount: RDD[(Long, (String, Int))] = topCategoryActionRDD
.map(action => ((action.click_category_id, action.session_id), 1))
// 使用自定义分区器 重点理解分区器的原理
.reduceByKey(new CategoryPartitioner(cids), _ + _)
.map {
case ((cid, sid), count) => (cid, (sid, count))
}
// 3. 排序取top10
//因为已经按key分好了区,所以用Mappartitions ,在每个分区中新建一个TreeSet即可
val result: RDD[(Long, List[SessionInfo])] = cidAndSidCount.mapPartitions((it: Iterator[(Long, (String, Int))]) => {
//new 一个TreeSet,并同时指定排序规则
var treeSet: mutable.TreeSet[CategorySession] = new mutable.TreeSet[CategorySession]()(new Ordering[CategorySession] {
override def compare(x: CategorySession, y: CategorySession): Int = {
if (x.clickCount >= y.clickCount) -1 else 1
}
})
var id = 0l
iter.foreach({
case (l, session) => {
id = l
treeSet.add(session)
if (treeSet.size > 10) treeSet = treeSet.take(10)
}
})
Iterator(id, treeSet)
})
result.collect.foreach(println)
Thread.sleep(1000000)
}
}
/*
根据传入的key值来决定分区号,让相同key进入相同的分区,能够避免多次shuffle
*/
class CategoryPartitioner(cids: List[Long]) extends Partitioner {
// 用cid索引, 作为将来他的分区索引.
private val cidWithIndex: Map[Long, Int] = cids.zipWithIndex.toMap
// 返回集合的长度
override def numPartitions: Int = cids.length
// 根据key返回分区的索引
override def getPartition(key: Any): Int = {
key match {
// 根据品类id返回分区的索引! 0-9
case (cid: Long, _) =>
cidWithIndex(cid)
}
}
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
DDR爱好者之家 Design By 杰米
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
DDR爱好者之家 Design By 杰米
暂无评论...
RTX 5090要首发 性能要翻倍!三星展示GDDR7显存
三星在GTC上展示了专为下一代游戏GPU设计的GDDR7内存。
首次推出的GDDR7内存模块密度为16GB,每个模块容量为2GB。其速度预设为32 Gbps(PAM3),但也可以降至28 Gbps,以提高产量和初始阶段的整体性能和成本效益。
据三星表示,GDDR7内存的能效将提高20%,同时工作电压仅为1.1V,低于标准的1.2V。通过采用更新的封装材料和优化的电路设计,使得在高速运行时的发热量降低,GDDR7的热阻比GDDR6降低了70%。
更新日志
2025年11月01日
2025年11月01日
- 小骆驼-《草原狼2(蓝光CD)》[原抓WAV+CUE]
- 群星《欢迎来到我身边 电影原声专辑》[320K/MP3][105.02MB]
- 群星《欢迎来到我身边 电影原声专辑》[FLAC/分轨][480.9MB]
- 雷婷《梦里蓝天HQⅡ》 2023头版限量编号低速原抓[WAV+CUE][463M]
- 群星《2024好听新歌42》AI调整音效【WAV分轨】
- 王思雨-《思念陪着鸿雁飞》WAV
- 王思雨《喜马拉雅HQ》头版限量编号[WAV+CUE]
- 李健《无时无刻》[WAV+CUE][590M]
- 陈奕迅《酝酿》[WAV分轨][502M]
- 卓依婷《化蝶》2CD[WAV+CUE][1.1G]
- 群星《吉他王(黑胶CD)》[WAV+CUE]
- 齐秦《穿乐(穿越)》[WAV+CUE]
- 发烧珍品《数位CD音响测试-动向效果(九)》【WAV+CUE】
- 邝美云《邝美云精装歌集》[DSF][1.6G]
- 吕方《爱一回伤一回》[WAV+CUE][454M]