hivefans
9/26/2018 - 10:42 AM

spark.groupByKey,combineByKey

Avoid groupByKey on a pair RDD: the groupBy family of functions shuffles every record across the network, which hurts performance, so combineByKey (which can merge values map-side before the shuffle) is generally used on pair RDDs instead:
Example:
RDD type before: JavaPairRDD<String, HotsCompare>
	pairRdd2 = pairRdd.combineByKey(e -> {
			// createCombiner: start a new list for the first value seen for a key
			ArrayList<HotsCompare> list = new ArrayList<>();
			list.add(e);
			return list;
		}, (list, e) -> {
			// mergeValue: fold another value into the partition-local list
			list.add(e);
			return list;
		}, (lista, listb) -> {
			// mergeCombiners: merge the per-partition lists across partitions
			lista.addAll(listb);
			return lista;
		});
RDD type after: JavaPairRDD<String, List<HotsCompare>>
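
For contrast, a self-contained sketch of the same aggregation, runnable against a local master; HotsCompare is stood in by a minimal bean, and the sample keys and ranks are invented for illustration:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class CombineByKeyDemo {
	// minimal stand-in for the original HotsCompare class (assumed shape)
	public static class HotsCompare implements Serializable {
		final int rank;
		HotsCompare(int rank) { this.rank = rank; }
	}

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("combineByKey-demo").setMaster("local[*]");
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			JavaPairRDD<String, HotsCompare> pairRdd = sc.parallelizePairs(Arrays.asList(
					new Tuple2<>("B00001", new HotsCompare(1)),
					new Tuple2<>("B00001", new HotsCompare(7)),
					new Tuple2<>("B00002", new HotsCompare(3))));

			// same three functions as above: create a combiner, merge into it, merge across partitions
			JavaPairRDD<String, List<HotsCompare>> pairRdd2 = pairRdd.combineByKey(e -> {
					List<HotsCompare> list = new ArrayList<>();
					list.add(e);
					return list;
				}, (list, e) -> {
					list.add(e);
					return list;
				}, (lista, listb) -> {
					lista.addAll(listb);
					return lista;
				});

			pairRdd2.collect().forEach(t -> System.out.println(t._1() + " -> " + t._2().size() + " values"));
		}
	}
}

Each key's values end up in one list, the same shape groupByKey would produce, but combineByKey makes the merge steps explicit.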

A Dataset's groupByKey() + mapGroups() can be used in place of the pair RDD's combineByKey():
// original schema of df:
StructType flatSchema = DataTypes.createStructType(new StructField[] { DataTypes.createStructField("asin", StringType, false),
				DataTypes.createStructField("pathId", StringType, true), DataTypes.createStructField("rank", IntegerType, true), });
// schema of df after the transformation:
StructType returnSchema = DataTypes.createStructType(new StructField[] { DataTypes.createStructField("asin", StringType, false),
				DataTypes.createStructField("bsr_ext", DataTypes.createMapType(StringType, IntegerType, true), true) });
// combine the rows for each asin into a single map
df = df.groupByKey(row -> row.<String> getAs("asin"), Encoders.STRING()).mapGroups((key, values) -> {
			// collect the (pathId, rank) pairs of one asin group into a java map
			Map<String, Integer> map = Maps.newHashMap();
			while (values.hasNext()) {
				Row row = values.next();
				String pathId = row.getAs("pathId");
				Integer rank = row.getAs("rank");
				map.put(pathId, rank);
			}
			// asScalaMap (scala.collection.JavaConversions) yields the scala Map that the row encoder expects for a MapType column
			return new GenericRowWithSchema(new Object[] { key, asScalaMap(map) }, returnSchema);
		}, RowEncoder.apply(returnSchema));
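
Assembled into a runnable whole, as a sketch assuming Spark 2.x: the class name, sample rows, and local master are invented for illustration; the MapFunction/MapGroupsFunction casts disambiguate the overloaded Java/Scala signatures, and asScalaMap comes from scala.collection.JavaConversions, which newer Scala versions deprecate in favor of JavaConverters:

import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
import static scala.collection.JavaConversions.asScalaMap;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class MapGroupsDemo {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("mapGroups-demo").master("local[*]").getOrCreate();

		StructType flatSchema = DataTypes.createStructType(new StructField[] {
				DataTypes.createStructField("asin", StringType, false),
				DataTypes.createStructField("pathId", StringType, true),
				DataTypes.createStructField("rank", IntegerType, true) });
		StructType returnSchema = DataTypes.createStructType(new StructField[] {
				DataTypes.createStructField("asin", StringType, false),
				DataTypes.createStructField("bsr_ext", DataTypes.createMapType(StringType, IntegerType, true), true) });

		// invented sample rows, for illustration only
		Dataset<Row> df = spark.createDataFrame(Arrays.asList(
				RowFactory.create("B00001", "electronics", 12),
				RowFactory.create("B00001", "cameras", 3),
				RowFactory.create("B00002", "books", 40)), flatSchema);

		Dataset<Row> grouped = df
				.groupByKey((MapFunction<Row, String>) row -> row.<String> getAs("asin"), Encoders.STRING())
				.mapGroups((MapGroupsFunction<String, Row, Row>) (key, values) -> {
					Map<String, Integer> map = new HashMap<>();
					while (values.hasNext()) {
						Row row = values.next();
						map.put(row.getAs("pathId"), row.getAs("rank"));
					}
					return new GenericRowWithSchema(new Object[] { key, asScalaMap(map) }, returnSchema);
				}, RowEncoder.apply(returnSchema));

		grouped.show(false); // one row per asin, with a pathId -> rank map column
		spark.stop();
	}
}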