Flink在實時在實時計算平臺和實時數倉中的企業級應用小結
大資料領域自 2010 年開始,以 Hadoop、Hive 為代表的離線計算開始進入各大公司的視野。大資料領域開始瞭如火如荼的發展。我個人在學校期間就開始關注大資料領域的技術迭代和更新,並且有幸在畢業後成為大資料領域的開發者。
在過去的這幾年時間裡,以 Storm、Spark、Flink 為代表的實時計算技術接踵而至。2019 年阿里巴巴內部 Flink 正式開源。整個實時計算領域風起雲湧,一些普通的開發者因為業務需要或者個人興趣開始接觸Flink。
Apache Flink(以下簡稱 Flink)一改過去實時計算領域為人詬病的缺陷,以其強大的計算能力和先進的設計理念,迅速成為實時計算領域先進生產力的代表。各大小公司紛紛開始在 Flink 的應用上進行探索,其中最引人矚目的兩個方向便是:實時計算平臺和實時資料倉庫。
Flink 實時計算
如果你是一位大資料領域的開發人員或者你是一名後端的開發者,那麼你對下面這些需求場景應該不會陌生:
我是抖音主播,我想看帶貨銷售情況的排行?
我是運營,我想看到我們公司銷售商品的 TOP10?
我是開發,我想看到我們公司所有生產環境中伺服器的執行情況?
……
在 Hadoop 時代,我們通常的做法是將資料批次儲存到 HDFS 中,在用 Hive 產出離線的報表。或者我們使用類似 ClickHouse 或者 PostgreSQL 這樣的資料庫儲存生產資料,用 SQL 直接進行彙總檢視。
那麼這樣的方式有什麼問題呢?
第一種,基於 Hive 的離線報表形式。大部分公司隨著業務場景的不斷豐富,同時在業界經過多年的實踐檢驗,基於 Hadoop 的離線儲存體系已經足夠成熟。但是離線計算天然時效性不強,一般都是隔天級別的滯後,業務資料隨著實踐的推移,本身的價值就會逐漸減少。越來越多的場景需要使用實時計算,在這種背景下實時計算平臺的需求應運而生。
第二種,基於 ClickHouse 或者 PostgreSQL 直接進行彙總查詢。這種情況在一些小規模的公司使用非常常見,原因只有一個就是資料量不夠大。在我們常用的具有 OLAP 特性的資料庫的使用過程中,如果在一定的資料量下直接用複雜的 SQL 查詢,一條複雜的 SQL 足以引起資料庫的劇烈抖動,甚至直接宕機,對生產環境產生毀滅性的影響。這種查詢在大公司是堅決不能進行的操作。
因此基於 Flink 強大實時計算能力消費實時資料的需求便應運而生。在實時資料平臺中,Flink 會承擔實時資料的採集、計算和傳送到下游。
Flink 實時資料倉庫
資料倉庫最初是指的我們儲存的 Hive 中的表的集合。按照業務需求一般會分為原始層、明細層、彙總層、業務層。各個公司根據實際業務需要會有更為細緻的劃分。
傳統的離線資料倉庫的做法一般是將資料按天離線集中儲存後,按照固定的計算邏輯進行資料的清洗、轉換和載入。最終在根據業務需求進行報表產出或者提供給其他的應用使用。我們很明顯的可以看到,資料在這中間有了至少 T+1 天的延遲,資料的時效性大打折扣。
這時,實時資料倉庫應運而生。一個典型的實時資料倉庫架構圖如下:
技術選型
這一部分作者結合自身在阿里巴巴這樣的公司生產環境中的技術選擇和實際應用的中一些經驗,來講解實時計算平臺和實時資料倉庫的各個部分是如何進行技術選型的。
實時計算引擎
我們在上面提到,實時計算解決的最重要的問題就是實時性和穩定性。
實時計算對資料有非常高的穩定性和精確性要求,特別是面向公眾第三方的資料大屏,同時要求高吞吐、低延遲、極高的穩定性和絕對零誤差。隨時電商大促的成交記錄一次次被重新整理,背後是下單、支付、發貨高達幾萬甚至十幾萬的峰值 QPS。
你可以想象這樣的場景嗎?天貓雙十一,萬眾矚目下的實時成交金額大屏突然卡住沒有反應。我估計所有開發人員都要被開除了…
我們以一個最常見和經典的實時計算大螢幕來舉例。
在面向實際運營的資料大屏中,需要提供高達幾十種維度的資料,每秒的資料量高達千萬甚至億級別,這對於我們的實時計算架構提出了相當高的要求。那麼我們的大屏背後的實時處理在這種資料量規模如何才能達到高吞吐、低延遲、極高的穩定性和絕對零誤差的呢?
在上圖的架構圖中,涉及幾個關鍵的技術選型,我們下面一一進行講解。
業務庫 Binlog 同步利器 - Canal
我們的實時計算架構一般是基於業務資料進行的,但無論是實時計算大屏還是常規的資料分析報表,都不能影響業務的正常進行,所以這裡需要引入訊息中介軟體或增量同步框架 Canal。
我們生產環境中的業務資料絕大多數都是基於 MySQL 的,所以需要一個能夠實時監控 MySQL 業務資料變化的工具。Canal 是阿里巴巴開源的資料庫 Binlog 日誌解析框架,主要用途是基於 MySQL 資料庫增量日誌解析,提供增量資料訂閱和消費。
Canal 的原理也非常簡單,它會偽裝成一個數據庫的從庫,來讀取 Binlog 並進行解析。Canal 在阿里巴巴內部有大規模的應用,因為阿里有眾多的業務是跨機房部署,大量業務需要進行業務同步,Canal 功能強大,效能也很穩定。
解耦和海量資料支援 - Kafka
在實時大屏的技術架構下,我們的資料來源絕大多數情況下都是訊息。我們需要一個強大的訊息中介軟體來支撐高達幾十萬 QPS,同時支援海量資料儲存。
首先,我們為什麼需要引入訊息中介軟體?主要是下面三個目的:
同步變非同步
應用解耦
流量削峰
在我們的架構中,為了和業務資料互相隔離,需要使用訊息中介軟體進行解耦從而互不影響。另外在雙十一等大促場景下,交易峰值通常出現在某一個時間段,這個時間段系統壓力陡增,資料量暴漲,訊息中介軟體還起到了削峰的作用。
為什麼選擇 Kafka?
Kafka 是最初由 Linkedin 公司開發,是一個分散式、高吞吐、多分割槽的訊息中介軟體。Kafka 經過長時間的迭代和實踐檢驗,因為其獨特的優點已經成為目前主流的分散式訊息引擎,經常被用作企業的訊息匯流排、實時資料儲存等。
Kafka 從眾多的訊息中介軟體中脫穎而出,主要是因為高吞吐、低延遲的特點;另外基於 Kafka 的生態越來越完善,各個實時處理框架包括 Flink 在訊息處理上都會優先進行支援。並且 Flink 和 Kafka 結合可以實現端到端精確一次語義的原理。
Kafka 作為大資料生態系統中已經必不可少的一員,主要的特性如下所示。
高吞吐:
可以滿足每秒百萬級別訊息的生產和消費,並且可以透過橫向擴充套件,保證資料處理能力可以得到線性擴充套件。
低延遲:
以時間複雜度為 O(1) 的方式提供訊息持久化能力,即使對 TB 級以上資料也能保證常數時間複雜度的訪問效能。
高容錯:
Kafka 允許叢集的節點出現失敗。
可靠性:
訊息可以根據策略進行磁碟的持久化,並且讀寫效率都很高。
生態豐富:
Kafka 周邊生態極其豐富,與各個實時處理框架結合緊密。
實時計算服務 - Flink
Flink 在當前的架構中主要承擔了訊息消費、維表關聯、訊息傳送等。在實時計算領域,Flink 的優勢主要包括:
強大的狀態管理
。
Flink 使用 State 儲存中間狀態和結果,並且有強大的容錯能力;
非常豐富的 API
。
Flink 提供了包含 DataSet API、DataStream API、Flink SQL 等等強大的API;
生態支援完善
。
Flink 支援多種資料來源(Kafka、MySQL等)和儲存(HDFS、ES 等),並且和其他的大資料領域的框架結合完善;
批流一體
。
Flink 已經在將流計算和批計算的 API 進行統一,並且支援直接寫入 Hive。
對於 Flink 的一些特點我們不做過多展開了。這裡需要注意的是,Flink 在消費完成後一般會把計算結果資料發往三個方向:
高度彙總
,高度彙總指標一般儲存在 Redis、HBase 中供前端直接查詢使用。
明細資料
,在一些場景下,我們的運營和業務人員需要查詢明細資料,有一些明細資料極其重要,比如雙十一派送的包裹中會有一些丟失和破損。
實時訊息
,Flink 在計算完成後,有一個下游是發往訊息系統,這裡的作用主要是提供給其他業務複用;
另外,在一些情況下,我們計算好明細資料也需要再次經過訊息系統才能落庫,將原來直接落庫拆成兩步,方便我們進行問題定位和排查。
百花齊放 - OLAP 資料庫選擇
OLAP 的選擇是當前實時架構中最有爭議和最困難的。目前市面上主流的開源 OLAP 引擎包含但不限於:Hive、Hawq、Presto、Kylin、Impala、SparkSQL、Druid、Clickhouse、Greeplum 等,可以說目前沒有一個引擎能在資料量,靈活程度和效能上做到完美,使用者需要根據自己的需求進行選型。
我曾經在之前的一篇文章 《實時數倉 | 你需要的是一款強大的 OLAP 引擎》用了兩萬字分析了目前市面上主流的 OLAP 資料庫的選型問題,這裡直接給出結論:
Hive、Hawq、Impala:
基於 SQL on Hadoop
Presto 和 Spark SQL 類似:
基於記憶體解析 SQL 生成執行計劃
Kylin:
用空間換時間、預計算
Druid:
資料實時攝入加實時計算
ClickHouse:
OLAP 領域的 HBase,單表查詢效能優勢巨大
Greenpulm:
OLAP 領域的 PostgreSQL
如果你的場景是基於 HDFS 的離線計算任務,那麼 Hive、Hawq 和 Imapla 就是你的調研目標。
如果你的場景解決分散式查詢問題,有一定的實時性要求,那麼 Presto 和 SparkSQL 可能更符合你的期望。
如果你的彙總維度比較固定,實時性要求較高,可以透過使用者配置的維度 + 指標進行預計算,那麼不妨嘗試 Kylin 和 Druid。
ClickHouse 則在單表查詢效能上獨領風騷,遠超過其他的 OLAP 資料庫。
Greenpulm 作為關係型資料庫產品,效能可以隨著叢集的擴充套件線性增長,更加適合進行資料分析。
Flink 實時資料倉庫
實時資料倉庫的發展經歷了從離線到實時的發展,一個典型的實時資料倉庫架構如下如圖所示:
一般實時資料倉庫的設計也借鑑了離線數倉的理念,不但要提高我們模型的複用率,也要考慮實時數倉的穩定性和易用性。
在實時資料倉庫的技術選型中,用到的核心技術包括:Kafka、Flink、Hbase 等。
其中 Kafka 和 Flink 的優勢我們在上述實時資料平臺的技術選型中已經做過詳細的介紹。這其中還有兩個關鍵的指標儲存系統:Hbase 和 Redis。
其中 Hbase 是典型的列式分散式儲存系統,Redis 是快取系統中首選,他們的主要優勢包括:
強一致性
自動故障轉移和容錯性
極高的讀寫 QPS,非常適合儲存 K-V 形式的指標
批流一體是未來
隨著 Flink 1。12 版本的釋出,Flink 與 Hive 的整合達到了一個全新的高度,Flink 可以很方便的對 Hive 直接進行讀寫。
這代表了什麼?
只要我們還在使用實時資料倉庫,那麼我們可以直接對 Hive 進行讀寫,Flink 成為了 Hive 上的一個處理引擎,既可以透過批的方式也可以透過流的方式。
從 Flink 1。12 開始會有大批的離線實時一體的資料倉庫出現。我們資料倉庫架構就變成了:
其中 Flink SQL 統一了實時和離線的邏輯,避免出現離線和實時需要兩套架構和程式碼支撐,也基本解決了離線和實時資料對不齊的尷尬局面。
大廠的實時計算平臺和實時數倉技術方案
這部分小編結合自身在實際生產環境中的經驗,參考了市面上幾個大公司在實時計算平臺和實時數倉設計中,選出了其中最穩妥也是最常用的技術方案,奉獻給大家。
作者的經驗
在我們的實時計算架構中採用的是典型的 Kappa 架構,我們的業務難點和重點主要集中在:
資料來源過多
我們的實時訊息來源多達幾十個,分佈在各大生產系統中,這些系統中的訊息資料格式不一。
資料來源之間時間 GAP 巨大
我們業務資料之間需要互相等待,舉個最簡單的例子。使用者下單後,可能 7 天以後後還會進行操作,這就導致一個問題,我們在建設實時數倉時中間狀態 State 巨大,直接使用 Flink 原生的狀態會導致任務資源消耗巨大,非常不穩定。
離線資料和實時資料要求強一致性
我們的資料最終會以考核的形式下發,直接指導一線員工的工資和獎金髮放。要求資料強一致性保障,否則會引起投訴甚至輿情。
基於以上的考慮,我們的實時資料倉庫架構如下:
幾個關鍵的技術點如下:
第一,我們使用了 Hbase 作為中間狀態的儲存。我們在上面提到,因為在 Flink SQL 中進行計算需要儲存中間狀態,而我們的資料來源過多,且時間差距過大,那麼實時計算的狀態儲存變得異常巨大,在大資料量的衝擊下,任務變得非常不穩定。另外如果任務發生 Fail-over,狀態會丟失,結果嚴重失真。所以我們所有的資料都會儲存在 Hbase。
第二,實時資料觸發模式計算。在 Flink SQL 的邏輯裡,Hbase 的變更訊息發出,我們只需要接受其中的 rowkey 資訊,然後所有的資料都是反查 Hbase。我們在上面的文章中講到過,Hbase 因為極高的讀寫 QPS 被各大公司普遍應用在實時儲存和高頻查詢中。
第三,雙寫 ADB 和 Hologres。ADB 和 Hologres 是阿里雲提供的強大的 OLAP 引擎。我們在 Flink SQL 計算完畢後將結果雙寫,前端查詢可以進行分流和負載均衡。
第四,離線資料同步。這裡我們採用的是直接將訊息透過中介軟體進行同步,在離線數倉中有一套一樣的邏輯將資料寫入 Hive 中。在 Flink 1。12 後,離線和實時的計算邏輯統一為一套,完全避免了離線和實時訊息的不一致難題。
但是,客觀的說這套資料架構有沒有什麼問題呢?
這套資料架構引入了 Hbase 作為中間儲存,資料鏈路變長。導致運維成本大量增加,整個架構的實時效能受制於 Hbase 的變更資訊能不能及時傳送。
指標沒有分層,會導致 ADB 和 Hologres 成為查詢瓶頸。在這套資料架構中,我們完全拋棄了中間指標層,完全依賴 SQL 直接彙總查詢。一方面得益於省略中間層後指標的準確性,另一方面因為 SQL 直接查詢會對 ADB 有巨大的查詢壓力,使得 ADB 消耗了巨大的資源和成本。
在未來的規劃中,我們希望對業務 SQL 進行分級。高優先順序、實時性極高的指標和資料直接查詢資料庫。非高優先順序和極高實時性的指標可以通過歷史資料加實時資料結合的方式組裝結果,減少對資料庫的查詢壓力。
騰訊看點的實時資料系統設計
騰訊看點資料中心承接了騰訊 QQ 看點、小程式、瀏覽器、快報等等業務的開發取數、看數的需求。騰訊看點一天上報的資料量可以達到萬億級規模,對低延遲、亞秒級的實時計算和多維查詢帶來了巨大的技術挑戰。
首先,我們來看一下騰訊看點的實時資料系統的架構設計:
上圖是騰訊看點的整體的實時架構設計圖。我們可以看到整體的架構分為三層:
資料採集層
在這層中,騰訊看點完全使用訊息佇列 Kafka 進行了解耦操作,避免直接讀取業務系統資料。
實時資料倉庫層
在這一層中騰訊看點使用 Flink 分別做分鐘級別的聚合和中度聚合,大大減輕了實時 SQL 查詢的壓力。
實時資料儲存層
騰訊看點使用 ClickHouse 和 MySQL 作為實時資料儲存,我們在下面會分析 ClickHouse 作為實時資料儲存的優勢和特點。
關於資料選型,實時數倉的整體架構騰訊看點選擇了 Lambda 架構,主要是因為高靈活性、高容錯性、高成熟度、極低的遷移成本。
在實時計算上,騰訊看點選擇了 Flink 作為計算引擎,Flink 受到青睞的原因包括 Exactly-once 語義支援,輕量級的快照機制以及極高的吞吐性。另一一個很重要的原因就是 Flink 高效的維表關聯,支援了實時資料流 (百萬級/s) 關聯 HBase 維度表。
在資料儲存上,騰訊看點重度使用 ClickHouse。ClickHouse 的優勢包括:
多核 CPU 平行計算
SIMD 平行計算加速
分散式水平擴充套件叢集
稀疏索引、列式儲存、資料壓縮
聚合分析最佳化
最終騰訊看點的實時資料系統支撐了亞秒級響應多維條件查詢請求:
過去 30 分鐘內容的查詢,99% 的請求耗時在1秒內
過去 24 小時內容的查詢,90% 的請求耗時在5秒內,99% 的請求耗時在 10 秒內
阿里巴巴批流一體資料倉庫建設
我們在上面介紹了 Flink 的優勢,尤其是在 Flink 1。12 版本後,Flink 與 Hive 的整合達到了一個全新的高度,Flink 可以很方便的對 Hive 直接進行讀寫。
阿里巴巴率先在業務實現了批流一體的實時資料倉庫,根據公開的資料顯示,阿里巴巴在批流一體上的探索主要包含三個方面:
統一元資料管理
Flink 從 1。11 版本開始簡化了連線 Hive 的方式,Flink 透過一套簡單的 Hive Catelog API 打通了與 Hive 的通訊。使得訪問 Hive 變得輕而易舉。
統一計算引擎
在我們傳統的實時數倉的建設中,基於離線和實時引擎的不同,需要編寫兩套 SQL 進行計算和資料入庫操作。Flink 高效解決了這個問題,基於 ANSI-SQL 標準提供了批與流統一的語法,並且使用 Flink 引擎執行可以同時讀寫 Hive 與其他的 OLAP 資料庫。
統一資料儲存
在這個架構下,離線資料成為了實時資料的歷史備份,離線資料也可以作為資料來源被實時攝入,批次計算的場景變成了實時排程,不在依賴定時排程任務。
基於以上的工作,基於 Flink 和 Hive 的批流一體實時數倉應運而生,整體的架構如下:
我們可以看到,原來的離線和實時雙寫鏈路演變成了單一通道,一套程式碼即可完成離線和實時的計算操作。並且基於 Flink 對 SQL 的支撐,程式碼開發變得異常簡潔,阿里巴巴的批流一體資料倉庫在 2020 年落地並且投入使用,效果顯著,支撐了雙十一的資料需求。
實戰案例
這部分我們我們將以一個實時統計專案為背景,介紹實時計算中的架構設計和技術選型以及最終的實現。其中涉及了日誌資料埋點、日誌資料採集、清洗、最終的指標計算等等。
架構設計
我們以統計網站的 PV 和 UV 為例,涉及到幾個關鍵的處理步驟:
日誌資料上報
日誌資料清洗
實時計算程式
結果儲存
基於以上的業務處理流程,我們常用的實時處理技術選型和架構如下圖所示:
整體的程式碼開發包括:
Flume 和 Kafka 整合和部署
Kafka 模擬資料生成和傳送
Flink 和 Kafka 整合時間視窗設計
Flink 計算 PV、UV 程式碼實現
Flink 和 Redis 整合以及 Redis Sink 實現
Flume 和 Kafka 整合和部署
我們可以在 Flume 的官網下載安裝包,在這裡下載一個 1。8。0 的穩定版本,然後進行解壓:
tar zxf apache-flume-1。8。0-bin。tar。gz
可以看到有幾個關鍵的目錄,其中 conf/ 目錄則是我們存放配置檔案的目錄。
接下來我們整合 Flume 和 Kafka。整體整合思路為,我們的兩個 Flume Agent 分別部署在兩臺 Web 伺服器上,用來採集兩臺伺服器的業務日誌,並且 Sink 到另一臺 Flume Agent 上,然後將資料 Sink 到 Kafka 叢集。在這裡需要配置三個 Flume Agent。
首先在 Flume Agent 1 和 Flume Agent 2 上建立配置檔案,修改 source、channel 和 sink 的配置,vim log_kafka。conf 程式碼如下:
# 定義這個 agent 中各元件的名字
a1。sources = r1
a1。sinks = k1
a1。channels = c1
# source的配置,監聽日誌檔案中的新增資料
a1。sources。r1。type = exec
a1。sources。r1。command = tail -F /home/logs/access。log
#sink配置,使用avro日誌做資料的消費
a1。sinks。k1。type = avro
a1。sinks。k1。hostname = flumeagent03
a1。sinks。k1。port = 9000
#channel配置,使用檔案做資料的臨時快取
a1。channels。c1。type = file
a1。channels。c1。checkpointDir = /home/temp/flume/checkpoint
a1。channels。c1。dataDirs = /home/temp/flume/data
#描述和配置 source channel sink 之間的連線關係
a1。sources。r1。channels = c1
a1。sinks。k1。channel = c
上述配置會監聽 /home/logs/access。log 檔案中的資料變化,並且將資料 Sink 到 flumeagent03 的 9000 埠。
然後我們分別啟動 Flume Agent 1 和 Flume Agent 2,命令如下:
$ flume-ng agent
-c conf
-n a1
-f conf/log_kafka。conf >/dev/null 2>&1 &
第三個 Flume Agent 用來接收上述兩個 Agent 的資料,並且傳送到 Kafka。我們需要啟動本地 Kafka,並且建立一個名為 log_kafka 的 Topic。
然後,我們建立 Flume 配置檔案,修改 source、channel 和 sink 的配置,vim flume_kafka。conf 程式碼如下:
# 定義這個 agent 中各元件的名字
a1。sources = r1
a1。sinks = k1
a1。channels = c1
#source配置
a1。sources。r1。type = avro
a1。sources。r1。bind = 0。0。0。0
a1。sources。r1。port = 9000
#sink配置
a1。sinks。k1。type = org。apache。flume。sink。kafka。KafkaSink
a1。sinks。k1。topic = log_kafka
a1。sinks。k1。brokerList = 127。0。0。1:9092
a1。sinks。k1。requiredAcks = 1
a1。sinks。k1。batchSize = 20
#channel配置
a1。channels。c1。type = memory
a1。channels。c1。capacity = 1000
a1。channels。c1。transactionCapacity = 100
#描述和配置 source channel sink 之間的連線關係
a1。sources。r1。channels = c1
a1。sinks。k1。channel = c1
配置完成後,我們啟動該 Flume Agent:
$ flume-ng agent
-c conf
-n a1
-f conf/flume_kafka。conf >/dev/null 2>&1 &
當 Flume Agent 1 和 2 中監聽到新的日誌資料後,資料就會被 Sink 到 Kafka 指定的 Topic,我們就可以消費 Kafka 中的資料了。
我們現在需要消費 Kafka Topic 資訊,並且把序列化的訊息轉化為使用者的行為物件:
public class UserClick {
private String userId;
private Long timestamp;
private String action;
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this。userId = userId;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this。timestamp = timestamp;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this。action = action;
}
public UserClick(String userId, Long timestamp, String action) {
this。userId = userId;
this。timestamp = timestamp;
this。action = action;
}
}
enum UserAction{
//點選
CLICK(“CLICK”),
//購買
PURCHASE(“PURCHASE”),
//其他
OTHER(“OTHER”);
private String action;
UserAction(String action) {
this。action = action;
}
}
在計算 PV 和 UV 的業務場景中,我們選擇使用訊息中自帶的事件時間作為時間特徵,程式碼如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment。getExecutionEnvironment();
env。setStreamTimeCharacteristic(TimeCharacteristic。EventTime);
// 檢查點配置,如果要用到狀態後端,那麼必須配置
env。setStateBackend(new MemoryStateBackend(true));
Properties properties = new Properties();
properties。setProperty(“bootstrap。servers”, “127。0。0。1:9092”);
properties。setProperty(FlinkKafkaConsumerBase。KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, “10”);
FlinkKafkaConsumer
//設定從最早的offset消費
consumer。setStartFromEarliest();
DataStream
。addSource(consumer)
。name(“log_user_action”)
。map(message -> {
JSONObject record = JSON。parseObject(message);
return new UserClick(
record。getString(“user_id”),
record。getLong(“timestamp”),
record。getString(“action”)
);
})
。returns(TypeInformation。of(UserClick。class));
由於我們的使用者訪問日誌可能存在亂序,所以使用 BoundedOutOfOrdernessTimestampExtractor 來處理亂序訊息和延遲時間,我們指定訊息的亂序時間 30 秒,具體程式碼如下:
SingleOutputStreamOperator
@Override
public long extractTimestamp(UserClick element) {
return element。getTimestamp();
}
});
到目前為止,我們已經透過讀取 Kafka 中的資料,序列化為使用者點選事件的 DataStream,並且完成了水印和時間戳的設計和開發。
接下來,按照業務需要,我們需要開窗並且進行一天內使用者點選事件的 PV、UV 計算。這裡我們使用 Flink 提供的滾動視窗,並且使用 ContinuousProcessingTimeTrigger 來週期性的觸發視窗階段性計算。
dataStream
。windowAll(TumblingProcessingTimeWindows。of(Time。days(1), Time。hours(-8)))
。trigger(ContinuousProcessingTimeTrigger。of(Time。seconds(20)))
為了減少視窗內快取的資料量,我們可以根據使用者的訪問時間戳所在天進行分組,然後將資料分散在各個視窗內進行計算,接著在 State 中進行彙總。
首先,我們把 DataStream 按照使用者的訪問時間所在天進行分組:
userClickSingleOutputStreamOperator
。keyBy(new KeySelector
@Override
public String getKey(UserClick value) throws Exception {
return DateUtil。timeStampToDate(value。getTimestamp());
}
})
。window(TumblingProcessingTimeWindows。of(Time。days(1), Time。hours(-8)))
。trigger(ContinuousProcessingTimeTrigger。of(Time。seconds(20)))
。evictor(TimeEvictor。of(Time。seconds(0), true))
。。。
然後根據使用者的訪問時間所在天進行分組並且呼叫了 evictor 來剔除已經計算過的資料。其中的 DateUtil 是獲取時間戳的年月日:
public class DateUtil {
public static String timeStampToDate(Long timestamp){
ThreadLocal
= ThreadLocal。withInitial(() -> new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”));
String format = threadLocal。get()。format(new Date(timestamp));
return format。substring(0,10);
}
}
接下來我們實現自己的 ProcessFunction:
public class MyProcessWindowFunction extends ProcessWindowFunction
private transient MapState
private transient ValueState
@Override
public void open(Configuration parameters) throws Exception {
super。open(parameters);
uvState = this。getRuntimeContext()。getMapState(new MapStateDescriptor<>(“uv”, String。class, String。class));
pvState = this。getRuntimeContext()。getState(new ValueStateDescriptor
}
@Override
public void process(String s, Context context, Iterable
Integer pv = 0;
Iterator
while (iterator。hasNext()){
pv = pv + 1;
String userId = iterator。next()。getUserId();
uvState。put(userId,null);
}
pvState。update(pvState。value() + pv);
Integer uv = 0;
Iterator
while (uvIterator。hasNext()){
String next = uvIterator。next();
uv = uv + 1;
}
Integer value = pvState。value();
if(null == value){
pvState。update(pv);
}else {
pvState。update(value + pv);
}
out。collect(Tuple3。of(s,“uv”,uv));
out。collect(Tuple3。of(s,“pv”,pvState。value()));
}
}
我們在主程式中可以直接使用自定義的 ProcessFunction :
userClickSingleOutputStreamOperator
。keyBy(new KeySelector
@Override
public String getKey(UserClick value) throws Exception {
return value。getUserId();
}
})
。window(TumblingProcessingTimeWindows。of(Time。days(1), Time。hours(-8)))
。trigger(ContinuousProcessingTimeTrigger。of(Time。seconds(20)))
。evictor(TimeEvictor。of(Time。seconds(0), true))
。process(new MyProcessWindowFunction());
到此為止,我們已經計算出來了 PV 和 UV,下面我們分別講解 Flink 和 Redis 是如何整合實現 Flink Sink 的。
在這裡我們直接使用開源的 Redis 實現,首先新增 Maven 依賴如下:
可以透過實現 RedisMapper 來自定義 Redis Sink,在這裡我們使用 Redis 中的 HASH 作為儲存結構,Redis 中的 HASH 相當於 Java 語言裡面的 HashMap:
public class MyRedisSink implements RedisMapper
/**
* 設定redis資料型別
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand。HSET,“flink_pv_uv”);
}
//指定key
@Override
public String getKeyFromData(Tuple3
return data。f1;
}
//指定value
@Override
public String getValueFromData(Tuple3
return data。f2。toString();
}
}
上面實現了 RedisMapper 並覆寫了其中的 getCommandDescription、getKeyFromData、getValueFromData 3 種方法,其中 getCommandDescription 定義了儲存到 Redis 中的資料格式。這裡我們定義的 RedisCommand 為 HSET,使用 Redis 中的 HASH 作為資料結構;getKeyFromData 定義了 HASH 的 Key;getValueFromData 定義了 HASH 的值。
然後我們直接呼叫 addSink 函式即可:
。。。
userClickSingleOutputStreamOperator
。keyBy(new KeySelector
@Override
public String getKey(UserClick value) throws Exception {
return value。getUserId();
}
})
。window(TumblingProcessingTimeWindows。of(Time。days(1), Time。hours(-8)))
。trigger(ContinuousProcessingTimeTrigger。of(Time。seconds(20)))
。evictor(TimeEvictor。of(Time。seconds(0), true))
。process(new MyProcessWindowFunction())
。addSink(new RedisSink<>(conf,new MyRedisSink()));
。。。
到此為止,我們就會將結果存進了 Redis 中,我們在實際業務中可以選擇使用不同的目標庫例如:Hbase 或者 MySQL 等等。
總結
以 Flink 為代表的實時計算技術還是飛速發展中,眾多的新特性例如 Flink Hive Connector、CDC 增量同步等持續湧現,我們有理由相信基於 Flink 的實時計算平臺和實時資料倉庫的發展未來會大放異彩,解決掉業界在實時計算和實時數倉領域的痛點,成為大資料領域先進生產力的代表。
八千里路雲和月 | 從零到大資料專家學習路徑指南
我們在學習Flink的時候,到底在學習什麼?
193篇文章暴揍Flink,這個合集你需要關注一下
Flink生產環境TOP難題與最佳化,阿里巴巴藏經閣YYDS
Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點
我們在學習Spark的時候,到底在學習什麼?
在所有Spark模組中,我願稱SparkSQL為最強!
硬剛Hive | 4萬字基礎調優面試小總結
資料治理方法論和實踐小百科全書
標籤體系下的使用者畫像建設小指南
4萬字長文 | ClickHouse基礎&實踐&調優全視角解析
【面試&個人成長】2021年過半,社招和校招的經驗之談
大資料方向另一個十年開啟 |《硬剛系列》第一版完結
我寫過的關於成長/面試/職場進階的文章
當我們在學習Hive的時候在學習什麼?「硬剛Hive續集」
你好,我是王知無,一個大資料領域的硬核原創作者。
做過後端架構、資料中介軟體、資料平臺&架構、演算法工程化。
專注大資料領域實時動態&技術提升&個人成長&職場進階,歡迎關注。