9年互聯網/大數據領域研發、架構經驗,2015年加入京東物流,主要負責大數據相關架構與開發工作。
摘要: 數據蜂巢平臺是京東物流自主研發的分布式、高性能、高可用、支持異構,離線和實時的大數據同步與管理平臺。關鍵技術:HA;離線與實時同步整合;binlog采集,存儲與訂閱;客戶端并發消費;一致性校驗與修復;任務隔離。
目前已經在京東物流系統中大規模應用,比如單源和多源復制,從全國各地倉儲園區集群(上百個)實時復制到IDC,從mysql到ES,從mysql到cassandra等等。倉儲園區硬件、網絡環境復雜,數據蜂巢平臺需要考慮硬件設施和網絡故障的容錯性。議題主要分享平臺誕生的背景,使用的關鍵技術,架構的演進過程,演進過程中所踩過的坑。
正文:
京東物流一線數據大部分都存在MySQL數據庫上,分布比較廣,包括國內外園區庫房和IDC,這樣數據使用起來極不方便,各業務系統為了使用數據,開發出多種版本的同步工具,結果導致管理非常困難以及資源的浪費。這時就需要一個統一的平臺把這些數據管理起來。
架構設計
關于數據同步,主要分為:批量同步、實時訂閱和實時同步。
批量同步: 采用sqoop的模式,把數據分片,然后進行多機并發復制,用以提升效率。
實時訂閱:使用消息隊列,即將binlog事件解析生成對應的消息存儲在隊列中;
實時同步:通過客戶端去消費隊列將數據寫入目標存儲,從而實現數據的實時同步。
如何中將以上三個功能整合到一個平臺?
首先,適配批量同步,參照sqoop, sqoop是借助于hadoop集群提交一個mr作業, 類似的,蜂巢系統也提供一個集群,將一個批量同步作業分為多task去執行。實時訂閱也可使用這種思路,不同的是每一個task即對應一臺mysql實例的binlog采集;同理實時同步也可分成多個task,每一個task即為一個消息隊列的消費客戶端。
架構圖如下所示:
采用了經典的master、slave結構,每一個slave上可以跑三種對應的任務。BatchWorker負責把MySQL數據批量同步到storage里面,這里的storage是一個抽象,不一定就是存儲之類的系統,也有可能是一些業務的處理。StreamWorker負責binlog的采集和消息隊列的維護,如果有訂閱需求,使用系統提供的客戶端Consumer就可以直接消費。
Pieworker則是Consumer的分裝,只不過這個客戶端由集群去管理維護。
BatchWorker結構比較簡單,主要由Fetcher、Sinker和Storage(與整體架構圖中的storage不同,此處為一個buffer)組成。Fetcher接口負責抽取數據,Storage負責緩存數據,Sinker接口負責數據的寫入。
StreamWorker模仿MySQL的主從復制機制, RelayLogTask負責binlog的抽取;HHLTask負責解析binlog,生成消息體,最后存在hhl中,hhl即為上文提到消息隊列。
客戶端: StreamJob除了采集binlog,維護消息隊列外,還提供了一個ClientServer模塊,用于接收客戶端的消費請求。當Consumer需要消費的時候,請求會發送到ClientServer,ClientServer通過索引快速的定位到hhl文件的某一個位置,然后把數據讀取出來,然后經過客戶端的指定過濾規則進行過濾,最終將消息體傳送給客戶端。
消息的位點由三部分組成的:Serverld對應的就是MySQL的Serverld;binlogPosition是一個長整型數字,高32位為binlog文件下標,低32位為文件內部的位置;time為對應的binlog事件產生的時間。
為解決性能問題,客戶端支持多種并行模式。
第一種是以事務為單位串行處理;
第二種是在第一種基礎上進行了一個簡單的優化,比如:一個事務內只對單表操作,那么這個事務是完全可以并發的,不同表間的事務順序并不會影響最終結果的一致性;但事務內對多表操作就需要繼續串行。所以第二種并發模式就是不斷串行并行轉變的過程。
以上兩種消費模式是以事務為單位,以下兩種以行為單位
第三種是表級并發,將事務拆分為多個行級操作,同表操作由同一線程完成,保障同一表操作的有序性;
第四種是行級并發,主要是把MySQL的數據同步到NOSQL上時使用,關系型數據庫同步時局限性較大,因為行級并發只保障同一主鍵的操作有序,而關系型數據庫會存在多個唯一約束,這樣即使保障了主鍵的操作有序也可能引起數據不一致。
集群的特性:作為一個集群都需要要保證高可用、數據本地性和負載均衡。高可用這一塊主要分為三部分:
MySQL: 由DBA保證數據庫的高可用,但當Mysql主從切換時,binlog的位點是不一致的,此時系統通過Serverld的檢測發現該變更,然后通過時間在新的mysql實列上定位正確的binlog位點。
master(Queen):基于Zookeeper完成Active角色的選舉
Bee(Slave):Bee宕機后由Master將其運行的任務遷移到其他的Slave上。
數據本地性:每一個Bee在啟動時都配置了機房,分組等信息,作業提交時可以指定自己期望的運行位置,與hadoop,spark類似。
負載均衡: 每一個Bee會將自己的負載信息通過心跳發給Queen,queen進行作業調度時,會在滿足數據本地性的前提下選擇壓力最小的機器去運行新任務。
演進
HHL文件丟失:上文提到過,如果Streamworker的運行主機宕機,Master會把它遷移到另外一臺機器上,但是Streamworker采集解析的binlog存在本地,遷移后會引起數據丟失,解決這個問題通用的方案是多副本,但大數據量下的多復本會造成磁盤空間的浪費,尤其是在庫房環境下。
并且這些數據有一個特點,就是發生遷移時,雖然解析過的數據丟失,但是原始binlog都會在機器上保存(dba會保留n天的binlog數據),最終可以通過數據補全來保證數據不丟失。
上圖為第一版streamworker的擴展,最右面的可以認為是這一組的主線程,虛線位置發生任務遷移,切換到了新的主機上,此時虛線左邊的數據全部丟失。如果有客戶端需要消費丟失的數據,服務端則啟動一組新線程,然后進行catchup,catchup會把丟失的部分補齊并提供給客戶端消費。
元數據: binlog是不記錄字段名等元數據的,而客戶端消費時需要。最簡單的方式是收集到binlog之后,去源庫上查詢,但在binlog采集延遲期間如果有ddl操作,會導致元數據不準確。為解決該問題系統實現了一個快照模塊。
在StreamJob初次啟動的時候,把對應的MySQL里面所有表都做一份快照,在此后的運行期間監控DDL操作,當解析到DDL操作時會將原快照取出生成一個復本,并在這個復本上應用這個ddl,生成新的快照 。這樣系統可以保證任何時刻binlog對應的元數據都是正確的,方便用戶使用。
客戶端:服務端并不記錄客戶端的消費位點,消費的位置由客戶端自行存儲。由于客戶端采用的是并發消費模式,消息又是嚴格有序的,此時位點記錄就必須保障每一個記錄下的位點之前的所有消息都被正確處理了,此處引入了一個環形提交隊列(具體實現與disruptor類似)。
當連續的多個消息被正確處理,并達到記錄位點的間隔,此時提交隊列會將一個位點寫入對應的存儲介質。比如1,2,3,4,5,8,10,14被處理完成,位點提交間隔為5,則5位置對應的位點被記錄,當6,7,9被處理完成后10再被記錄。
以下主要為易用性的改進:
SQL: 用戶通過SQL描述需要同步哪些字段,同步條件等信息,服務端通過解析sql執行對應的同步邏輯。
Union: 用于處理多表合一的場景,通過加入來源標識字段來解決唯一約束。
Join: 在同步的過程中完成寬表的加工,內部通過緩存,布隆過濾器等優化方案來提升性能。
一致性較驗:
系統提供了兩種模型:
第一種是使用pt_table_checksum的思路,比如要較驗一張表的數據,先把這張表的數據分成多個片段,然后對這些片段進行crc計算,并將結果和計算的范圍存儲到同一個數據庫的表里去,此時會觸發一個binlog事件,當消費者消費到這個事件后重現該操作,通過比對計算結果值來確定數據是否一致。
這種方式一是侵入性比較強,需要在原庫上建對應的比對結果表,二是需要加鎖,三是對延遲的要求很高,當延遲較大時,消費端拿不到對應的比對事件,將無法確定數據一致性。
第二種數據校驗的模型是基于BatchJob實現。
Fetcher抽取源和目標雙方的數據,排序后通過storage把數據傳遞給sinker,Sinker根據用戶自定義的比較接口對數據進行比較,最后將差異通過Collector進行收集。這種比對方式的缺點是不能保證時間序列,在比對的時候數據是變更的,比對出來的結果可能并不是真正的差異。
此時我們需要修復比對結果,首先在比對開始時把消費端的位點記錄下來,比對完成后,如果有差異則從比對開始前記錄的位點進行binlog重放,通過分析binlog中操作,對差異進行修復,輸出最終結果。
該比對模型最大的問題是需要抽取比對雙方的數據,對帶寬占用較大,為減少網絡傳輸,內部會對數據進行初步的篩選,將要比對的數據分成多個片段,在存儲端對每個片段進行md5計算,fetcher只收集md5值 ,只有雙方md5值不同時才將真正的數據抽取到計算端進行比較。
修復:
系統提供兩種修復方式:
一是基于binlog事件傳遞,當比對出差異數據后,只需要在源庫上對差異數據進行一個偽操作(比如更改update_time字段),觸發binlog事件的產生,消費端收到該事件即可修復錯誤數據(事件對應的消息體內包含了所有對應字段的值,并不只限于update_time字段),該方式缺點是受延遲的影響,同時還需要源庫的寫權限。
二是直接修復,即直接在目標上將差異數據修正。在修復數據時需要加鎖或暫停同步,避免并發問題。
資源的隔離: 系統默認只提供了常用的同步功能,當默認實現無法滿足客戶需求時,用戶需要自行編寫代碼來實現對應的接口來完成他們的邏輯。Bee開始使用的線程模型,即每一個Bee可以看做一個線程池,多個任務都在同一線程池內運行,而用戶自定義代碼又無法控制,導致不同任務相互影響,更有甚者任務結束后資源不釋放。
為保證資源隔離,將線程改為進程,將不同作業的任務交由不同的子進程去執行,子進程啟動時會指定額定運行資源;所有子進程由Bee統一管理,Bee不再運行具體任務,作業結束后,Bee將未退出的子進程全部強制殺死。
新時代鞋服物流與供應鏈面臨的變革和挑戰03月07日 20:38
點贊:這個雙11,物流大佬一起做了這件事11月22日 21:43
物流管理機構及政策分布概覽12月04日 14:10
盤點:2017中國零售業十大事件12月12日 13:57
2017年中國零售電商十大熱點事件點評12月28日 09:58