一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Java 數據流之Broadcast State

Java 數據流之Broadcast State

2021-12-31 00:41Vicky_Tang Java教程

這篇文章主要介紹了Java 數據流之Broadcast State,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下

一、BroadcastState 的介紹

廣播狀態(Broadcast State)是 Operator State 的一種特殊類型。如果我們需要將配置 、規則等低吞吐事件流廣播到下游所有 Task 時,就可以使用 BroadcastState。下游的 Task 接收這些配置、規則并保存為 BroadcastState,所有Task 中的狀態保持一致,作用于另一個數據流的計算中。
簡單理解:一個低吞吐量流包含一組規則,我們想對來自另一個流的所有元素基于此規則進行評估。
場景:動態更新計算規則。

廣播狀態與其他操作符狀態的區別在于:

  • 它有一個 map 格式,用于定義存儲結構
  • 它僅對具有廣播流和非廣播流輸入的特定操作符可用
  • 這樣的操作符可以具有不同名稱的多個廣播狀態

Java 數據流之Broadcast State

二、BroadcastState 操作流程

Java 數據流之Broadcast State

三、案例實現

  • 從端口讀取Json數據作為事件流
  • 從Mysql讀取數據作為廣播流
  • 關聯廣播流和事件流
  • 匹配對應的用戶信息
package cn.kgc.broadcast
 
import java.sql.{Connection, DriverManager, PreparedStatement}
 
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
 
// (001,"tom",18,"北京",15830010002)
// 定義樣例類 接受 MySQL的用戶數據
case class BaseUserInfo(id:Long,name:String,age:Int,city:String,phone:Long)
 
// user_id、user_name、user_addrss、behaviour、url
// 輸出數據類型
case class UserVisitInfo(id:Long,name:String,city:String,behaviour:String,url:String)
 
// 實現廣播ProcessFunction
class MyBroadcastFunc extends BroadcastProcessFunction[String,(Long, BaseUserInfo),UserVisitInfo]{
 
  lazy val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])
 
  // 處理的是日志流中的每條數據
  override def processElement(value: String, ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#ReadOnlyContext, out: Collector[UserVisitInfo]): Unit = {
    // {"user_id":"001","ts":"2021-07-10 11:10:05","behaviour":"browse","url":"https://www.tb1.com/1.html"}
    val user_id = JSON.parseObject(value).getLong("user_id")
    val behaviour = JSON.parseObject(value).getString("behaviour")
    val url = JSON.parseObject(value).getString("url")
 
    val mapState = ctx.getBroadcastState(mapStateDes)
    val userInfo = mapState.get(user_id)
 
    out.collect(UserVisitInfo(user_id,userInfo.name,userInfo.city,behaviour,url))
 
  }
 
  // 處理的是廣播流的每個值
  override def processBroadcastElement(value: (Long, BaseUserInfo), ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#Context, out: Collector[UserVisitInfo]): Unit = {
    val mapState: BroadcastState[Long, BaseUserInfo] = ctx.getBroadcastState(mapStateDes)
    mapState.put(value._1,value._2)
  }
}
 
 
class UserSourceFunc extends RichParallelSourceFunction[BaseUserInfo]{
 
  var conn:Connection = _
  var statement: PreparedStatement = _
  var flag:Boolean = true
 
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC","root","liu911223")
    statement = conn.prepareStatement("select * from base_user")
  }
 
  override def run(ctx: SourceFunction.SourceContext[BaseUserInfo]): Unit = {
    while (flag){
      Thread.sleep(5000)
      val resultSet = statement.executeQuery()
      while (resultSet.next()){
        val id = resultSet.getLong(1)
        val name = resultSet.getString(2)
        val age = resultSet.getInt(3)
        val city = resultSet.getString(4)
        val phone = resultSet.getLong(5)
        ctx.collect(BaseUserInfo(id,name,age,city,phone))
      }
    }
  }
 
  override def cancel(): Unit = {
    flag = false
  }
 
  override def close(): Unit = {
    if (statement != null) statement.close()
    if (conn != null) conn.close()
  }
}
object BroadcastDemo01 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
 
    // 定義為KV,一方面是為了廣播的時候定義為map,另一方面是為了做關聯操作
    val userBaseDS: DataStream[(Long, BaseUserInfo)] = env.addSource(new UserSourceFunc)
      .map(user => (user.id, user))
    val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])
    val broadCastStream: BroadcastStream[(Long, BaseUserInfo)] = userBaseDS.broadcast(mapStateDes)
 
    // 日志JSON數據
    val dataInfoDS: DataStream[String] = env.socketTextStream("master",1314)
 
    dataInfoDS.connect(broadCastStream)
      .process(new MyBroadcastFunc)
      .print()
 
    env.execute()
  }
}

到此這篇關于Java 數據流之Broadcast State的文章就介紹到這了,更多相關Java Broadcast State內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!

原文鏈接:https://blog.csdn.net/sweet19920711/article/details/120027690

延伸 · 閱讀

精彩推薦
  • Java教程Java8中Stream使用的一個注意事項

    Java8中Stream使用的一個注意事項

    最近在工作中發現了對于集合操作轉換的神器,java8新特性 stream,但在使用中遇到了一個非常重要的注意點,所以這篇文章主要給大家介紹了關于Java8中S...

    阿杜7482021-02-04
  • Java教程Java實現搶紅包功能

    Java實現搶紅包功能

    這篇文章主要為大家詳細介紹了Java實現搶紅包功能,采用多線程模擬多人同時搶紅包,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙...

    littleschemer13532021-05-16
  • Java教程20個非常實用的Java程序代碼片段

    20個非常實用的Java程序代碼片段

    這篇文章主要為大家分享了20個非常實用的Java程序片段,對java開發項目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關于小米推送Java代碼,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧...

    富貴穩中求8032021-07-12
  • Java教程升級IDEA后Lombok不能使用的解決方法

    升級IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級,尋思已經有好久沒有升過級了。升級完畢重啟之后,突然發現好多錯誤,本文就來介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程Java BufferWriter寫文件寫不進去或缺失數據的解決

    Java BufferWriter寫文件寫不進去或缺失數據的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進去或缺失數據的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望...

    spcoder14552021-10-18
  • Java教程xml與Java對象的轉換詳解

    xml與Java對象的轉換詳解

    這篇文章主要介紹了xml與Java對象的轉換詳解的相關資料,需要的朋友可以參考下...

    Java教程網2942020-09-17
主站蜘蛛池模板: 小SAO货叫大声点妓女 | 毛片手机在线视频免费观看 | 国产自拍偷拍自拍 | 国色天香社区在线视频免费观看 | 久久人妻熟女中文字幕AV蜜芽 | 色老板在线播放 | 毛片在线播放a | 欧洲网色偷偷亚洲男人的天堂 | 久久精品国产免费播放 | 纲手被强喷水羞羞漫画 | 亚洲卡一卡2卡三卡4麻豆 | 久久精品国产免费播高清无卡 | 亚洲无线一二三区2021 | 女同69式互添在线观看免费 | 2019自拍偷拍视频 | narutotsunade全彩雏田 | 麻豆视频免费在线播放 | 亚洲AV无码偷拍在线观看 | sex5·性屋娱乐 | 精品视频免费在线观看 | 2019aw网站 | 日韩特级片 | 国产一级网站 | 午夜亚洲视频 | 母乳在线 | 亚洲第一福利视频 | 天美麻豆 | 爱爱亚洲| 摸逼网| 亚洲高清中文字幕精品不卡 | 欧美精品亚洲精品日韩专区va | 四虎免费看片 | 亚洲欧美视频在线播放 | 国产欧美va欧美va香蕉在线观看 | 午夜欧美精品久久久久久久久 | 色老女人 | 亚洲六月丁香六月婷婷色伊人 | 久久精品国产欧美日韩99热 | 国产精品日韩欧美一区二区 | 日本精品www色 | 91久久精品青青草原伊人 |