由于爬虫抓取等原因,会导致单一ID的日志条数过多。在spark中,同一ID的日志会被shuffle到单一的节点上进行处理,导致系统运行缓慢!
因为这些用户的访问本来就是无效的,所以可以直接过滤掉这部分用户。
话不多说,scala的DataFrame版输出和代码如下(参考链接见代码注释):
引用
spark version: 1.6.1
Original DataFrame (with fake users):
+---------+------+
| id| movie|
+---------+------+
| u1|WhoAmI|
| u2|Zoppia|
| u2| Lost|
|FakeUserA|Zoppia|
|FakeUserA| Lost|
|FakeUserA|Zoppia|
|FakeUserA| Lost|
|FakeUserA|Zoppia|
|FakeUserA| Lost|
|FakeUserB| Lost|
|FakeUserB| Lost|
|FakeUserB| Lost|
|FakeUserB| Lost|
+---------+------+
Fake Users with count (threshold=2):
+---------+-----+
| id|count|
+---------+-----+
|FakeUserA| 6|
|FakeUserB| 4|
+---------+-----+
Fake Users:
Set(FakeUserA, FakeUserB)
Valid users after filter:
+---+------+
| id| movie|
+---+------+
| u1|WhoAmI|
| u2|Zoppia|
| u2| Lost|
+---+------+
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
/**
* Created by colinliang on 2017/8/14.
*/
case class IDMovie(id: String, movie: String)
object BroadcastTest {
def main(args: Array[String]): Unit = {
Logger.getRootLogger().setLevel(Level.FATAL) //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
val conf = new SparkConf().setAppName("word count").setMaster("local[1]")
val sc = new SparkContext(conf)
println("spark version: " + sc.version)
sc.setLogLevel("WARN") //http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
val spark = new SQLContext(sc)
val idvids = List(
IDMovie("u1", "WhoAmI")
, IDMovie("u2", "Zoppia")
, IDMovie("u2", "Lost")
, IDMovie("FakeUserA", "Zoppia")
, IDMovie("FakeUserA", "Lost")
, IDMovie("FakeUserA", "Zoppia")
, IDMovie("FakeUserA", "Lost")
, IDMovie("FakeUserA", "Zoppia")
, IDMovie("FakeUserA", "Lost")
, IDMovie("FakeUserB", "Lost")
, IDMovie("FakeUserB", "Lost")
, IDMovie("FakeUserB", "Lost")
, IDMovie("FakeUserB", "Lost")
);
val df = spark
.createDataFrame(idvids)
.repartition(col("id"))
println("Original DataFrame (with fake users): ")
df.show()
// val df_fakeUsers_with_count=df.sample(false,0.1).groupBy(col("id")).count().filter(col("count")>2).limit(10000)//实际中可以根据需要仅采样一部分数据
val df_fakeUsers_with_count=df.groupBy(col("id")).count().filter(col("count")>2)
/**DataFrame 中的groupby 为aggregation形式的,不涉及shuffle,速度很快。参见:https://forums.databricks.com/questions/956/how-do-i-group-my-dataset-by-a-key-or-combination.html
更多聚合函数参见:https://spark.apache.org/docs/1.6.1/api/scala/index.html#org.apache.spark.sql.functions$
此外,还可以通过agg()函数对groupBy后的数据的多列进行聚合
*/
println("Fake Users with count (threshold=2):")
df_fakeUsers_with_count.show()
val set_fakeUsers=df_fakeUsers_with_count.select("id").collect().map(_(0)).toList.map(_.toString).toArray[String].toSet
println("Fake Users:")
println(set_fakeUsers)
val set_fakeUsers_broadcast=sc.broadcast(set_fakeUsers)
/** broadcast教程:https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-broadcast.html
* 官方文档: http://spark.apache.org/docs/latest/rdd-programming-guide.html#broadcast-variables
*/
val udf_isValidUser = udf((id: String) => !set_fakeUsers_broadcast.value.contains(id)) //直接用set_highCountUsers.contains(id) 也行,但效率低,因为反序列化的次数可能比较多,参见http://spark.apache.org/docs/latest/rdd-programming-guide.html#broadcast-variables
val df_filtered=df.filter(udf_isValidUser(col("id")) ) //过滤掉这部分用户
/** 如果是要保留部分用户,而不是过滤掉这部分用户,且用户量很小,无需定义UDF:
* https://stackoverflow.com/questions/39234360/filter-spark-scala-dataframe-if-column-is-present-in-set
* val validValues = Set("A", "B", "C")
* data.filter($"myColumn".isin(validValues.toSeq: _*))
*/
/** 如果是要保留部分用户,且用户量比较大,可以用broadcast 的DataFrame:
* https://stackoverflow.com/questions/33824933/spark-dataframe-filtering-retain-element-belonging-to-a-list
* import org.apache.spark.sql.functions.broadcast
* initialDataFrame.join(broadcast(usersToKeep), $"userID" === $"userID_")
*/
println("\nValid users after filter:")
df_filtered.show()
}
}
分享到:
相关推荐
简单一个示例,演示Spark中DataFrame的创建与操作
[Spark]将Spark DataFrame中的数值取出有时候经过Spark SQL计算得到的结果往往就一行,而且需要将该结果取出,作为字符串参与别的代码块的
Spark DataFrame使用详解,包括:DataFrame解析;创建;Action;条件查询和join详尽操作解释
今天小编就为大家分享一篇spark dataframe 将一列展开,把该列所有值都变成新列的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
例子中定义了多个List数据集合,包括用户信息,订单信息,用户订单信息,将List对象生成DataFrame,使用SparkSQL查询将多个DataFrame合成一个DataFrame,使用Scala语言编写。
spark数据处理和数据分析项目实战Dataframe风格里面包括数据和代码,启动idea就可以练习
本文档详细的描述了SPARK DATAFRAME架构的使用,列举了详细的例子,通俗易懂
详细介绍了基于RDD的DataFrame数据结构以及操作接口。
系列博客是学习厦门大学林子雨老师spark编程基础课程的笔记,方便回顾 系列博客: Spark学习笔记(一):Spark概述与运行原理 ... •DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的
Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet...
主要介绍了pandas和spark dataframe互相转换实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
spark连接rabbitmq java代码 消费者consumer。写入mysql
Databrciks工程师,Spark Committer,Spark SQL主要开发者之一的连城详细解读了“Spark SQL结构化数据分析”。他介绍了Spark1.3版本中的很多新特性。重点介绍了DataFrame。其从SchemaRDD演变而来,提供了更加高层...
然后扩展以将该功能传递给Spark Dataframes。 快速安装 pip install datacompy 熊猫细节 DataComPy将尝试在连接列列表或索引上连接两个数据框。 如果两个数据框具有基于联接值的重复项,则匹配过程将按其余字段排序...
目录(Scala中的Spark示例)Spark RDD示例火花蓄能器介绍将Spark RDD转换为DataFrame | 数据集 Spark SQL教程Spark创建带有示例的DataFrame Spark DataFrame withColumn 重命名Spark DataFrame上的列的方法Spark –...
包括spara rdd api,dataframe action操作、查询操作、join操作,dataframe rdd dataset 相互转换以及spark sql。
dataframe是pandas中的一种数据类型 list是python的基本数据结构,两者之间可以进行转化 代码示例: import numpy as np import pandas as pd df = pd.DataFrame( data={ "A":1.0, "B":pd.Timestamp("20220121"),...
火花光学使用光学组件修改spark-sql数据框中的复杂结构。入门是否需要在复杂的结构中设置内部元素? import org . apache . spark . sql . DataFrameimport org . apache . spark . sql . functions . litval df : ...