Problem description
I'm using Spark to calculate the PageRank of user reviews, but I keep getting a java.lang.StackOverflowError when I run my code on a big dataset (40k entries). When I run the code on a small number of entries, it works fine.
Entry example:
product/productId: B00004CK40 review/userId: A39IIHQF18YGZA review/profileName: C. A. M. Salas review/helpfulness: 0/0 review/score: 4.0 review/time: 1175817600 review/summary: Reliable comedy review/text: Nice script, well acted comedy, and a young Nicolette Sheridan. Cusak is in top form.
The Code:
public void calculatePageRank() {
    sc.clearCallSite();
    sc.clearJobGroup();
    JavaRDD<String> rddFileData = sc.textFile(inputFileName).cache();
    sc.setCheckpointDir("pagerankCheckpoint/");
    JavaRDD<String> rddMovieData = rddFileData.map(new Function<String, String>() {
        @Override
        public String call(String arg0) throws Exception {
            String[] data = arg0.split(" ");
            String movieId = data[0].split(":")[1].trim();
            String userId = data[1].split(":")[1].trim();
            return movieId + " " + userId;
        }
    });
    JavaPairRDD<String, Iterable<String>> rddPairReviewData = rddMovieData.mapToPair(new PairFunction<String, String, String>() {
        @Override
        public Tuple2<String, String> call(String arg0) throws Exception {
            String[] data = arg0.split(" ");
            return new Tuple2<String, String>(data[0], data[1]);
        }
    }).groupByKey().cache();
    JavaRDD<Iterable<String>> cartUsers = rddPairReviewData.map(f -> f._2());
    List<Iterable<String>> cartUsersList = cartUsers.collect();
    JavaPairRDD<String, String> finalCartesian = null;
    int iterCounter = 0;
    for (Iterable<String> out : cartUsersList) {
        JavaRDD<String> currentUsersRDD = sc.parallelize(Lists.newArrayList(out));
        if (finalCartesian == null) {
            finalCartesian = currentUsersRDD.cartesian(currentUsersRDD);
        }
        else {
            finalCartesian = currentUsersRDD.cartesian(currentUsersRDD).union(finalCartesian);
            if (iterCounter % 20 == 0) {
                finalCartesian.checkpoint();
            }
        }
    }
    JavaRDD<Tuple2<String, String>> finalCartesianToTuple = finalCartesian.map(m -> new Tuple2<String, String>(m._1(), m._2()));
    finalCartesianToTuple = finalCartesianToTuple.filter(x -> x._1().compareTo(x._2()) != 0);
    JavaPairRDD<String, String> userIdPairs = finalCartesianToTuple.mapToPair(m -> new Tuple2<String, String>(m._1(), m._2()));
    JavaRDD<String> userIdPairsString = userIdPairs.map(new Function<Tuple2<String, String>, String>() {
        //Tuple2<Tuple2<MovieId, userId>, Tuple2<movieId, userId>>
        @Override
        public String call(Tuple2<String, String> t) throws Exception {
            return t._1 + " " + t._2;
        }
    });
    try {
        //calculate pagerank using this https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
        JavaPageRank.calculatePageRank(userIdPairsString, 100);
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    sc.close();
}
I have several suggestions that should greatly improve the performance of the code in your question.
- Caching: Caching should be used on data sets that you need to refer to again and again for the same or different operations (iterative algorithms, for example).
An example is RDD.count: to tell you the number of lines in the file, the file needs to be read. So if you write RDD.count, at this point the file will be read, the lines will be counted, and the count will be returned. What if you call RDD.count again? The same thing: the file will be read and counted again. So what does RDD.cache do? Now, if you run RDD.count the first time, the file will be loaded, cached, and counted. If you call RDD.count a second time, the operation will use the cache. It will just take the data from the cache and count the lines, with no recomputation.
Read more about caching here.
In your code sample you are not reusing anything that you've cached, so you may remove the .cache calls from there.
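To make the caching behaviour described above concrete, here is a toy model in plain Java. It is not Spark's API: `LazyDataset`, its `loads` counter, and the in-memory "file" are hypothetical names invented for this sketch. It only mimics the idea that each action re-reads the source unless the dataset was marked for caching first.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

/** Toy stand-in for a lazily evaluated dataset; this is NOT Spark's API. */
class LazyDataset {
    private final Supplier<List<String>> source; // simulates reading the file
    private List<String> cached;                 // filled by the first action after cache()
    private boolean cacheEnabled = false;
    int loads = 0;                               // how many times the source was read

    LazyDataset(Supplier<List<String>> source) {
        this.source = source;
    }

    /** Marks the dataset for caching; nothing is loaded yet. */
    LazyDataset cache() {
        cacheEnabled = true;
        return this;
    }

    /** An "action": forces the data to be loaded or taken from the cache. */
    long count() {
        return load().size();
    }

    private List<String> load() {
        if (cacheEnabled && cached != null) {
            return cached; // reuse the earlier result, no re-read
        }
        loads++;
        List<String> data = source.get();
        if (cacheEnabled) {
            cached = data;
        }
        return data;
    }

    public static void main(String[] args) {
        Supplier<List<String>> file = () -> Arrays.asList("line 1", "line 2", "line 3");

        LazyDataset uncached = new LazyDataset(file);
        uncached.count();
        uncached.count();
        System.out.println("uncached loads: " + uncached.loads); // prints "uncached loads: 2"

        LazyDataset cachedDs = new LazyDataset(file).cache();
        cachedDs.count();
        cachedDs.count();
        System.out.println("cached loads: " + cachedDs.loads); // prints "cached loads: 1"
    }
}
```

The point of the model is that caching only pays off when a second action follows. A dataset that is consumed once, as in the question, gains nothing from it.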
- Parallelization: In the code sample, you've parallelized every individual element in your RDD, which is already a distributed collection. I suggest you merge the rddFileData, rddMovieData and rddPairReviewData steps so that it all happens in one go.
Get rid of .collect, since that brings the results back to the driver and may be the actual reason for your error.
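One way to drop the .collect and the per-group parallelize/cartesian/union loop is to build the user pairs inside each group directly. The sketch below is plain Java and only shows that per-group step. `ReviewerPairs` and `pairsWithinGroup` are hypothetical names, not part of the question's code or of Spark. It assumes the intent is the same as the question's cartesian followed by the "ids differ" filter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ReviewerPairs {
    /**
     * Returns every ordered pair "a b" of distinct user ids within one group,
     * i.e. the cartesian product of the group with itself minus equal pairs.
     */
    static List<String> pairsWithinGroup(Iterable<String> users) {
        List<String> ids = new ArrayList<>();
        for (String u : users) {
            ids.add(u);
        }
        List<String> pairs = new ArrayList<>();
        for (String a : ids) {
            for (String b : ids) {
                if (!a.equals(b)) {
                    pairs.add(a + " " + b);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // prints [u1 u2, u1 u3, u2 u1, u2 u3, u3 u1, u3 u2]
        System.out.println(pairsWithinGroup(Arrays.asList("u1", "u2", "u3")));
    }
}
```

Assuming Spark 2.x or later, where a flatMap function returns an Iterator, this could be applied as rddPairReviewData.values().flatMap(users -> ReviewerPairs.pairsWithinGroup(users).iterator()). That keeps the work on the executors and avoids the long chain of union calls, whose growing lineage is a commonly reported source of StackOverflowError.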