Testing Flink Writes to Hive

2021-02-20 大數據雜燴

Preface: a while back I tested reading data from Hive; today I test writing to Hive.

1. Component versions: Flink 1.11.1, Hive 2.3.6, Hadoop 2.7.6

2. The pom file

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>qjd_flink11</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.8</scala.version>
    <flink.version>1.11.1</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>


    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>


    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>${flink.version}</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>



    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.68</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.11.0.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.9.8</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.9.8</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>2.3.6</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.7.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>2.7.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>2.7.6</version>
    </dependency>

  </dependencies>
  <build>
    <sourceDirectory>src/main</sourceDirectory>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
        <includes>
          <include>**/*.properties</include>
          <include>*.properties</include>
        </includes>
        <filtering>true</filtering>
      </resource>
    </resources>

    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>8</source>
          <target>8</target>
        </configuration>
      </plugin>

    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
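
With the assembly plugin bound to the package phase as configured above, running mvn clean package should produce target/qjd_flink11-1.0-SNAPSHOT-jar-with-dependencies.jar; that fat jar is what gets submitted to the cluster in step 5.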

3. The code

package com.qjdchina.qjdFlink;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.sql.Timestamp;

/**
 * @author QJDTEST
 * @date 2020/12/5 on 21:55.
 *    After the job has written data, the Hive metastore must be refreshed before
 *    the data becomes queryable: run "msck repair table fs_table" in Hive.
 */
public class StreamMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        bsEnv.enableCheckpointing(10000);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
        DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource())
                .assignTimestampsAndWatermarks(
                        new AssignerWithPunctuatedWatermarks<UserInfo>() {
                            long water = 0L;

                            @Override
                            public Watermark checkAndGetNextWatermark(
                                    UserInfo lastElement,
                                    long extractedTimestamp) {
                                return new Watermark(water);
                            }

                            @Override
                            public long extractTimestamp(
                                    UserInfo element,
                                    long recordTimestamp) {
                                water = element.getTs().getTime();
                                return water;
                            }
                        });


        // Build the Hive catalog
        String name = "myhive";      // Flink accesses Hive through a catalog; this names it
        String defaultDatabase = "myhive";  // the Hive database to use by default
        String hiveConfDir = "/opt/soft/hive-2.3.6/conf";  // directory containing hive-site.xml
        String version = "2.3.6";       // Hive version


        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.useDatabase("myhive");

        tEnv.createTemporaryView("users", dataStream);

        // Create the Hive table
        String hiveSql = "CREATE external TABLE fs_table (\n" +
                "  user_id STRING,\n" +
                "  order_amount DOUBLE" +
                ") partitioned by (dt string,h string,m string) " +
                "stored as ORC " +
                "TBLPROPERTIES (\n" +
                "  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
                "  'sink.partition-commit.delay'='0s',\n" +
                "  'sink.partition-commit.trigger'='partition-time',\n" +
                "  'sink.partition-commit.policy.kind'='metastore'" +
                ")";
        tEnv.executeSql(hiveSql);

        String insertSql = "INSERT INTO fs_table SELECT userId, amount, " +
                " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
        tEnv.executeSql(insertSql);
    }


    public static class MySource implements SourceFunction<UserInfo> {

        // flag flipped by cancel() so the emit loop can exit cleanly
        private volatile boolean running = true;

        String[] userids = {
                "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
                "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
                "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
                "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
                "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
        };

        @Override
        public void run(SourceFunction.SourceContext<UserInfo> sourceContext) throws Exception {

            while (running) {
                // Math.random() is strictly below 1.0, so the index stays in
                // bounds and every entry, including the last, can be drawn
                String userid = userids[(int) (Math.random() * userids.length)];
                UserInfo userInfo = new UserInfo();
                userInfo.setUserId(userid);
                userInfo.setAmount(Math.random() * 100);
                userInfo.setTs(new Timestamp(System.currentTimeMillis()));
                sourceContext.collect(userInfo);
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    public static class UserInfo implements java.io.Serializable {
        private String userId;
        private Double amount;
        private Timestamp ts;

        public String getUserId() {
            return userId;
        }

        public void setUserId(String userId) {
            this.userId = userId;
        }

        public Double getAmount() {
            return amount;
        }

        public void setAmount(Double amount) {
            this.amount = amount;
        }

        public Timestamp getTs() {
            return ts;
        }

        public void setTs(Timestamp ts) {
            this.ts = ts;
        }
    }
}
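
One caveat: the INSERT above is executed while the session is still in the Hive dialect, since the dialect is never switched back. The streaming-write examples in the Flink 1.11 documentation switch back to the default dialect before the INSERT, because DATE_FORMAT there is the Flink built-in function. If the INSERT fails to parse in your environment, a minimal adjustment (my assumption, not something verified in this test) is:

        // The CREATE TABLE above still runs under SqlDialect.HIVE;
        // switch back to Flink's default dialect for the streaming INSERT.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(insertSql);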

4. Adding jars to the Flink lib directory
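
Going by the Flink 1.11 Hive integration documentation (an assumption on my part, since the exact list isn't recorded here), for Hive 2.3.6 the two jars below, matching the versions used above, are copied into flink/lib before the cluster is started:

  flink-connector-hive_2.11-1.11.1.jar
  hive-exec-2.3.6.jar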

5. Start the Hadoop cluster and submit the job in Flink-on-YARN mode, uploading the code through the web UI
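
For reference, with this setup that usually means starting a YARN session with bin/yarn-session.sh from the Flink distribution and then uploading the jar-with-dependencies jar on the Submit New Job page of the Flink web UI; treat this as a sketch of the common procedure rather than the exact steps used here.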

6. Checking the Hive table

Because the data is written into the Hive table by partition, run msck repair table fs_table before querying to refresh the Hive metastore.
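
A minimal check from the Hive CLI, assuming the job has run long enough to commit at least one partition (output will vary):

hive> msck repair table fs_table;
hive> show partitions fs_table;
hive> select * from fs_table limit 5;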
