中文字幕在亚洲第一在线_亚洲VA中文字幕无码一二三区_亚洲AV无码乱码在线观看裸奔_亚洲丰满熟女一区二区v

您的位置:首頁 >資訊 > 正文

環球速遞!大數據NiFi(十八):離線同步MySQL數據到HDFS

來源:騰訊云2023-02-21 20:16:57

?離線同步MySQL數據到HDFS

案例:使用NiFi將MySQL中數據導入到HDFS中。


【資料圖】

以上案例用到的處理器有“QueryDatabaseTable”、“ConvertAvroToJSON”、“SplitJson”、“PutHDFS”四個處理器。

一、配置“QueryDatabaseTable”處理器

該處理器主要使用提供的SQL語句或者生成SQL語句來查詢MySQL中的數據,查詢結果轉換成Avro格式。該處理器只能運行在主節點上。

關于“QueryDatabaseTable”處理器的“Properties”配置的說明如下:

配置項

默認值

允許值

描述

Database Connection Pooling Service(數據庫連接池服務)

用于獲得與數據庫的連接的Controller Service。

Database Type(數據庫類型)

Generic

選擇數據庫類型。Generic 通用類型OracleOracle 12+MS SQL 2012+MS SQL 2008MySQLPostgreSQL

Table Name(表名)

查詢數據庫的表名,當使用“Custom Query”時,此為查詢結果的別名,并作為FlowFile中的屬性。

Columns to Return(返回的列)

查詢返回的列,多個列使用逗號分隔。如果列中有特殊名稱需要加引號,則所有列都需要加引號處理。

Additional WHERE clause(where條件)

在構建SQL查詢時添加到WHERE條件中的自定義子句。

Custom Query(自定義SQL查詢)

自定義的SQL語句。該查詢被構建成子查詢,設置后不會從其他屬性構建SQL查詢。自定義SQL不支持Order by查詢。

Maximum-value Columns(最大值列)

指定增量查詢獲取最大值的列,多列使用逗號分開。指定后,這個處理器只能檢索到添加/更新的行。不能設置無法比較大小的列,例如:boolean/bit。如果不指定,則參照表中所有的列來查詢全量數據,這會對性能產生影響。

Max Wait Time(最大超時時間)

0 seconds

SQL查詢最大時長,默認為0沒有限制,設置小于0的時間默認為0。

Fetch Size(拉取數據量)

0

每次從查詢結果中拉取的數據量。

Max Rows Per Flow File(每個FlowFile行數)

0

在一個FlowFile文件中的數據行數。通過這個參數可以將很大的結果集分到多個FlowFile中。默認設置為0,所有結果存入一個FlowFile。

Output Batch Size(數據輸出批次量)

0

輸出的FlowFile批次數據大小,當設置為0代表所有數據輸出到下游關系。如果數據量很大,則有可能下游很久沒有收到數據,如果設置了,則每次達到該數據量就釋放數據,傳輸到下游。

Maximum Number of Fragments(最大片段數)

0

設置返回的最大數據片段數,設置0默認將所有數據片段返回,如果表非常大,設置后可以防止OOM錯誤。

Normalize Table/Column Names(標準表/列名)

false

truefalse

是否將列名中不兼容avro的字符修改為兼容avro的字符。例如,冒號和句點將被更改為下劃線,以構建有效的Avro記錄。

Transaction Isolation Level

設置事務隔離級別。

Use Avro Logical Types(使用Avro邏輯類型)

false

truefalse

是否對DECIMAL/NUMBER, DATE, TIME 和 TIMESTAMP 列使用Avro邏輯類型。

Default Decimal Precision(Decimal數據類型位數)

10

當 DECIMAL/NUMBER 數據類型轉換成Avro類型數據時,指定的數據位數。

Default Decimal Scale(Decimal 數據類型小數位數)

0

當 DECIMAL/NUMBER 數據類型轉換成Avro類型數據時,指定的小數點后的位數。

Generic 通用類型OracleOracle 12+MS SQL 2012+MS SQL 2008MySQLPostgreSQL

Table Name(表名)查詢數據庫的表名,當使用“Custom Query”時,此為查詢結果的別名,并作為FlowFile中的屬性。 Columns to Return (返回的列) 查詢返回的列,多個列使用逗號分隔。如果列中有特殊名稱需要加引號,則所有列都需要加引號處理。 Additional WHERE clause (where條件) 在構建SQL查詢時添加到WHERE條件中的自定義子句。 Custom Query (自定義SQL查詢) 自定義的SQL語句。該查詢被構建成子查詢,設置后不會從其他屬性構建SQL查詢。自定義SQL不支持Order by查詢。 Maximum-value Columns (最大值列) 指定增量查詢獲取最大值的列,多列使用逗號分開。指定后,這個處理器只能檢索到添加/更新的行。不能設置無法比較大小的列,例如:boolean/bit。如果不指定,則參照表中所有的列來查詢全量數據,這會對性能產生影響。 Max Wait Time(最大超時時間)0 seconds SQL查詢最大時長,默認為0沒有限制,設置小于0的時間默認為0。 Fetch Size(拉取數據量)0 每次從查詢結果中拉取的數據量。 Max Rows Per Flow File(每個FlowFile行數)0 在一個FlowFile文件中的數據行數。通過這個參數可以將很大的結果集分到多個FlowFile中。默認設置為0,所有結果存入一個FlowFile。 Output Batch Size(數據輸出批次量)0 輸出的FlowFile批次數據大小,當設置為0代表所有數據輸出到下游關系。如果數據量很大,則有可能下游很久沒有收到數據,如果設置了,則每次達到該數據量就釋放數據,傳輸到下游。 Maximum Number of Fragments(最大片段數)0 設置返回的最大數據片段數,設置0默認將所有數據片段返回,如果表非常大,設置后可以防止OOM錯誤。 Normalize Table/Column Names(標準表/列名)false true false 是否將列名中不兼容avro的字符修改為兼容avro的字符。例如,冒號和句點將被更改為下劃線,以構建有效的Avro記錄。 Transaction Isolation Level 設置事務隔離級別。 Use Avro Logical Types(使用Avro邏輯類型)false true false 是否對DECIMAL/NUMBER, DATE, TIME 和 TIMESTAMP 列使用Avro邏輯類型。 Default Decimal Precision(Decimal數據類型位數)10 當 DECIMAL/NUMBER 數據類型轉換成Avro類型數據時,指定的數據位數。 Default Decimal Scale(Decimal 數據類型小數位數)0 當 DECIMAL/NUMBER 數據類型轉換成Avro類型數據時,指定的小數點后的位數。

配置步驟如下:

1、新建“QueryDatabaseTable”處理器

2、配置“SCHEDULING”調度時間

這里調度時間配置為99999s,讀取數據庫,這里讀取一次即可,默認0會不間斷讀取數據庫會對服務器造成非常大壓力。執行僅支持“Primary”主節點運行。

3、配置“PROPERTIES”

配置“Database Connection Pooling Service”選擇創建,在彈出頁面中可以按照默認選擇直接點擊“Create”。

點擊“->”繼續配置MySQL連接:

在彈出的頁面中填入:

連接MysqlURL:

jdbc:mysql://192.168.179.5:3306/mynifi?characterEncoding=UTF-8&useSSL=false

MySQL驅動類:com.mysql.jdbc.DriverMySQL jar包路徑:需要提前在NiFI集群各個節點上創建對應目錄并上傳jar包。連接mysql的用戶名和密碼。

通過以上配置好連接mysql如下:

配置其他屬性如下:

二、???????配置“ConvertAvroToJSON”處理器

此處理器是將二進制Avro記錄轉換為JSON對象,提供了一個從Avro字段到JSON字段的直接映射,這樣得到的JSON將具有與Avro文檔相同的層次結構。輸出的JSON編碼為UTF-8編碼,如果傳入的FlowFile包含多個Avro記錄,則轉換后的FlowFile是一個含有所有Avro記錄的JSON數組或一個JSON對象序列(每個Json對象單獨成行)。如果傳入的FlowFile不包含任何記錄,則輸出一個空JSON對象。

關于“ConvertAvroToJSON”處理器的“Properties”配置的說明如下:

配置項

默認值

允許值

描述

JSON container options(Json選擇)

array

nonearray

如何解析Json對象,none:解析Json將每個Json對象寫入新行。array:解析到的json存入JsonArray一個對象

Wrap Single Record(數據庫類型)

false

truefalse

指定解析到的空記錄或者單條記錄是否按照“JSON container options”配置包裝對象。

Avro schema(表名)

如果Avro數據沒有Schema信息,需要配置。

配置步驟如下:

1、創建“ConvertAvroToJSON”處理器

2、配置“PROPERTIES”

3、連接“QueryDatabaseTable”處理器和“CovertAvroToJSON”處理器

連接好兩個處理器后,可以配置“Connection”為負載均衡方式傳遞數據:

三、???????配置“SplitJson”處理器

該處理器使用JsonPath表達式指定需要的Json數組元素,將Json數組中的多個Json對象切分出來,形成多個FlowFile。每個生成的FlowFile都由指定數組中的一個元素組成,并傳輸到關系"split",原始文件傳輸到關系"original"。如果沒有找到指定的JsonPath,或者沒有對數組元素求值,則將原始文件路由到"failure",不會生成任何文件。

關于“SplitJson”處理器的“Properties”配置的說明如下:

配置項

默認值

允許值

描述

JsonPath Expression(Json表達式)

一個JsonPath表達式,它指定用以分割的數組元素。

Null Value Representation(Null值表示)

empty string

empty stringthe string "null"

指定結果為空值時的表示形式。

配置步驟如下:

1、創建“SplitJson”處理器

2、配置“PROPERTIES”

3、連接“ConvertAvroToJSON”處理器和“SplitJson”處理器

連接后,連接關系選擇“success”:

同時配置“ConverAvroToJSON”處理失敗的數據自動終止:

四、配置“PutHDFS”處理器

該處理器是將FlowFile數據寫入到HDFS分布式文件系統中。關于“PutHDFS”處理器的“Properties”主要配置的說明如下:

配置項

默認值

允許值

描述

Hadoop Configuration Resources(Hadoop配置)

nonearray

HDFS配置文件,一個文件或者由逗號分隔的多個文件。不配置將在ClassPath中尋找‘core-site.xml’或者‘hdfs-site.xml’文件。

Directory(目錄)

需要寫入文件的HDFS父目錄。如果目錄不存在,將創建該目錄。

Conflict Resolution Strategy(沖突解決)

fail

replaceignorefailappend

指示當輸出目錄中已經存在同名文件時如何處理。

配置步驟如下:

1、創建“PutHDFS”處理器

2、配置“PROPERTIES”

注意:以上需要在各個NiFi集群節點上創建“/root/test”目錄,并且在該目錄下上傳hdfs-site.xml和core-site.xml文件。

3、連接“SplitJson”處理器和“PutHDFS”處理器

同時設置“SplitJson”處理器中“failure”和“original”數據關系自動終止。

設置“PutHDFS”處理器“success”和“failure”數據關系自動終止:

配置好的連接關系如下:

五、??????????????運行測試

1、在MySQL創建庫“mynifi”,并且創建表“test1”,向表中插入10條數據

mysql> create database mynifi;Query OK, 1 row affected (0.02 sec)mysql> use mynifi;Database changedmysql> create table test1(id int,name varchar(255),age int );Query OK, 0 rows affected (0.07 sec)mysql> insert into test1 values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tt",22)

2、首先啟動“QueryDatabaseTable”處理器觀察隊列數據

3、單獨啟動“ConvertAvroToJson”處理器觀察隊列數據

4、單獨啟動“SplitJson”處理器觀察隊列數據

5、單獨啟動“PutHDFS”處理器觀察HDFS對應目錄數據

查看數據:

注意:

如果在“QueryDatabaseTable”處理器中設置增屬性“Maximum-value Columns”為id,那么每次查詢都是大于id的增量數據。如果想要存入HDFS文件為多行而不是一行,可以將“CovertAvroToJson”處理器屬性“JSON container options”設置為none,直接解析Avro文件得到一個個json數據,然后直接連接“PutHDFS”處理器即可。

最近更新

中文字幕在亚洲第一在线_亚洲VA中文字幕无码一二三区_亚洲AV无码乱码在线观看裸奔_亚洲丰满熟女一区二区v

    91亚洲精品久久久蜜桃| 亚洲一区在线观看网站| 午夜精品久久久久久| 精品少妇一区二区三区日产乱码| 亚洲激情网站免费观看| 激情图区综合网| 中文字幕高清不卡| 欧美片网站yy| 亚洲免费视频成人| 国产成人在线看| 自拍av一区二区三区| 在线不卡的av| 亚洲激情六月丁香| 大陆成人av片| 亚洲精品网站在线观看| 欧美一区二区三区在线| 亚洲裸体xxx| 成人在线综合网站| 亚洲18女电影在线观看| 久久久久久久免费视频了| 青青草国产精品亚洲专区无| 国产日韩av一区| 在线不卡的av| 午夜日韩在线电影| 久久久久久久久99精品| 欧美日韩性生活| 一区二区久久久| 91丨porny丨最新| 91福利视频久久久久| 成人欧美一区二区三区黑人麻豆| 国产精品一区在线| 亚洲图片欧美色图| 国产精品视频yy9299一区| 精品一区二区在线免费观看| 亚洲少妇中出一区| 精品国产91亚洲一区二区三区婷婷| 日韩国产精品久久久久久亚洲| 国产亚洲精品中文字幕| 制服.丝袜.亚洲.中文.综合| 午夜精品久久久久久久蜜桃app| 国产视频一区在线播放| 欧美群妇大交群的观看方式| 亚洲小说春色综合另类电影| 久久亚洲精精品中文字幕早川悠里| 欧美日韩一级片在线观看| 亚洲午夜免费电影| 国产欧美综合色| 欧美xxxxx牲另类人与| 捆绑变态av一区二区三区| 最新中文字幕一区二区三区| 久久欧美中文字幕| 国产精华液一区二区三区| 婷婷亚洲久悠悠色悠在线播放| 亚洲欧美乱综合| 久久丝袜美腿综合| 日韩一区二区免费在线电影| 免费精品视频在线| 一区二区久久久久久| 国产精品对白交换视频| 99久久国产综合精品麻豆| 欧美精品三级日韩久久| 奇米一区二区三区| 亚洲一区二区三区四区在线| 中文字幕五月欧美| 久久久精品人体av艺术| 国产日韩av一区| 精品少妇一区二区三区在线播放| 国产一区中文字幕| 在线亚洲欧美专区二区| 午夜久久电影网| 亚洲精品国产视频| 国产精品成人网| 国产欧美一区二区三区在线老狼| www激情久久| 白白色亚洲国产精品| 69p69国产精品| 国产一区不卡在线| 欧美日韩黄色一区二区| 蜜臀久久99精品久久久久宅男| 亚洲国产婷婷综合在线精品| 亚洲国产精品一区二区久久 | 亚洲综合偷拍欧美一区色| 日韩理论在线观看| 中文字幕免费一区| 欧美激情一区不卡| 久久中文字幕电影| 久久精品视频在线免费观看| 9色porny自拍视频一区二区| 精品日本一线二线三线不卡| 成人小视频在线| 精品三级在线看| 99re热这里只有精品免费视频| 精品99999| www久久精品| 久久蜜桃一区二区| 91免费国产在线| 久久久青草青青国产亚洲免观| 99精品1区2区| 国产午夜三级一区二区三| 久久蜜桃av一区精品变态类天堂| 日本一区二区综合亚洲| 欧美国产视频在线| 国产欧美1区2区3区| 国产日韩精品一区| 亚洲日本电影在线| 亚洲人成小说网站色在线 | 国产欧美综合在线观看第十页| 久久综合九色欧美综合狠狠| 欧美激情中文不卡| 国产精品久久久久永久免费观看| 亚洲女人的天堂| 一区二区三区视频在线观看| 天天综合色天天综合色h| 色狠狠色狠狠综合| 国产在线一区二区| 日韩三级在线免费观看| 91亚洲国产成人精品一区二三| 日本一区二区视频在线观看| 国产精品每日更新| 亚洲一卡二卡三卡四卡 | 老司机午夜精品| 日韩一区二区高清| 久久久亚洲欧洲日产国码αv| 中文字幕亚洲在| 亚洲国产精品自拍| 久久精品在线免费观看| 国产欧美一区二区精品秋霞影院 | 成人午夜私人影院| 国产欧美一区二区精品性色| 国产精品久久久久久久久动漫| 亚洲国产日韩在线一区模特| 在线一区二区三区做爰视频网站| 国产麻豆成人传媒免费观看| 久久久久一区二区三区四区| 日韩一区在线看| 日日夜夜免费精品视频| 678五月天丁香亚洲综合网| ww久久中文字幕| 亚洲蜜臀av乱码久久精品蜜桃| 色婷婷激情综合| 成人小视频免费观看| 中文字幕一区二区三区四区不卡| 亚洲在线视频免费观看| 极品瑜伽女神91| 久久精品一区八戒影视| 亚洲精品va在线观看| 精品一区二区三区香蕉蜜桃| 久久午夜色播影院免费高清| 亚洲视频你懂的| 韩国精品主播一区二区在线观看| 久久久99精品久久| 一级中文字幕一区二区| 国产精品一区二区视频| 国产精品欧美一区喷水| 一本一道综合狠狠老| youjizz久久| 亚洲在线视频网站| 在线综合视频播放| 国产精品久久影院| 久久99国内精品| 国产欧美一区二区精品久导航| 亚洲成人激情av| 成人三级在线视频| 一区二区三区美女| 欧美一级片在线看| 综合久久久久久久| 国产剧情av麻豆香蕉精品| 中文字幕在线观看不卡| 欧美日韩在线播放三区四区| 国产网红主播福利一区二区| 免费人成在线不卡| 国产人成亚洲第一网站在线播放| 色综合欧美在线| xf在线a精品一区二区视频网站| 婷婷丁香久久五月婷婷| 2022国产精品视频| 色综合天天天天做夜夜夜夜做| 91麻豆免费视频| 奇米888四色在线精品| 国产欧美一区二区精品性| 在线精品国精品国产尤物884a| 久久婷婷成人综合色| 另类中文字幕网| 中文字幕在线免费不卡| 欧美精品乱人伦久久久久久| 亚洲视频一区二区免费在线观看| 国产aⅴ精品一区二区三区色成熟| 一区二区三区在线播| 日韩欧美黄色影院| 亚洲一区电影777| 2022国产精品视频| 久色婷婷小香蕉久久| 最新国产成人在线观看| 日韩一区二区免费电影| 亚洲国产精品欧美一二99| 久久蜜桃一区二区| 精品一区二区三区在线观看 | 亚洲午夜免费电影| 2021中文字幕一区亚洲| 久久精品99国产精品日本|