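4. Broadcast Variables
I. Partitioning Rule (DataStream Broadcast) and Broadcast Variables (Flink Broadcast)
1.1 DataStream Broadcast (partitioning rule)
As a partitioning rule, broadcast sends every element to all downstream partitions, so each element is processed once per partition rather than once overall.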
DataStream.broadcast()
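To make the "processed once per partition" behaviour concrete, here is a minimal sketch. It assumes Flink 1.x with the flink-streaming-scala dependency on the classpath; the object name, the parallelism of 3 and the sample elements are all illustrative and not from the original text. Note that the Scala API exposes this partitioner as a parameterless method, so it is written without parentheses.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala._

// Hypothetical example object; names and values are assumptions for illustration.
object BroadcastPartitionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3) // assumption: three parallel subtasks downstream

    val words: DataStream[String] = env.fromElements("a", "b")

    words
      .broadcast // replicate every element to all downstream subtasks
      .map(new RichMapFunction[String, String] {
        // Tag each element with the subtask that received it.
        override def map(value: String): String =
          s"subtask ${getRuntimeContext.getIndexOfThisSubtask} got $value"
      })
      .print()

    env.execute("broadcast-partition-sketch")
  }
}
```

With three subtasks, each of the two elements should show up once per subtask, which is the duplicated processing described above.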
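1.2 Flink Broadcast (broadcast variables)
This is similar to a Spark broadcast variable: the data being broadcast is a DataSet, and the operator that receives it also runs on a DataSet. The broadcast set is attached with withBroadcastSet under a name and read back in open() through getBroadcastVariable using the same name.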
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object BroadCastTest1 {
  def main(args: Array[String]): Unit = {
    // Create the batch execution environment.
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Create the DataSet to broadcast.
    val broadcastData = List(("zs", 18), ("ls", 28), ("ww", 38))
    val tupleData = env.fromCollection(broadcastData)
    val toBroadcastData = tupleData.map(tup => Map(tup._1 -> tup._2))

    // Create the DataSet that receives the broadcast.
    val text: DataSet[String] = env.fromElements("zs", "ls", "ww")

    val result = text.map(new RichMapFunction[String, String] {
      var listData: java.util.List[Map[String, Int]] = null
      var allMap = Map[String, Int]()

      override def open(parameters: Configuration): Unit = {
        // Fetch the broadcast set registered under the name "bd".
        this.listData = getRuntimeContext.getBroadcastVariable[Map[String, Int]]("bd")
        // Merge the single-entry maps into one lookup table.
        val it = listData.iterator()
        while (it.hasNext) {
          val next = it.next()
          allMap = allMap ++ next
        }
      }

      override def map(value: String): String = {
        // Look up the age for this name; fall back to 1 if the name is unknown.
        val age = allMap.getOrElse(value, 1)
        value + ", " + age
      }
    }).withBroadcastSet(toBroadcastData, "bd")

    // In the DataSet API, print() triggers execution itself,
    // so no separate env.execute() call follows it.
    result.print()
  }
}
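The lookup that the RichMapFunction performs can be checked without a Flink cluster. The following is a plain-Scala sketch of that logic only: the object and helper names (BroadcastLookupSketch, mergeAll, lookup) are hypothetical and not part of Flink, and a Seq stands in for the java.util.List that getBroadcastVariable returns.

```scala
// Plain-Scala model of the merge-and-lookup logic; no Flink dependency.
// All names here are illustrative assumptions, not Flink API.
object BroadcastLookupSketch {
  // Merge the single-entry maps delivered by the broadcast set into one table.
  def mergeAll(parts: Seq[Map[String, Int]]): Map[String, Int] =
    parts.foldLeft(Map.empty[String, Int])(_ ++ _)

  // Same formatting as map(): the name, then the age (1 if the name is unknown).
  def lookup(table: Map[String, Int], name: String): String =
    name + ", " + table.getOrElse(name, 1)

  def main(args: Array[String]): Unit = {
    val parts = Seq(Map("zs" -> 18), Map("ls" -> 28), Map("ww" -> 38))
    val table = mergeAll(parts)
    // Prints "zs, 18", "ls, 28" and "ww, 38", one per line.
    Seq("zs", "ls", "ww").map(lookup(table, _)).foreach(println)
  }
}
```

The Flink job should produce the same three lines, although their order can vary with the parallelism of the job.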