
Iceberg in Action 1


Install Hadoop

  1. Download

    Hadoop: https://archive.apache.org/dist/hadoop/core/hadoop-2.7.7/
  2. Extract

    tar -xvf hadoop-2.7.7.tar.gz
  3. Configure environment variables

    export HADOOP_HOME=/path/to/hadoop-2.7.7
    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
    export HADOOP_HDFS_HOME=$HADOOP_HOME
    export PATH=$PATH:$HADOOP_HOME/bin
  4. Source the profile

    source ~/.bash_profile
  5. Verify

    hadoop version
  6. Configure Hadoop

    Go to the $HADOOP_HOME/etc/hadoop/ directory.

    In hadoop-env.sh, set:

    export JAVA_HOME=/path/to/java/

    Configure core-site.xml:

    <configuration>
      <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
      </property>
    </configuration>

    Configure hdfs-site.xml:

    <configuration>
      <property>
        <name>dfs.replication</name>
        <value>1</value>
      </property>
      <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/path/to/hadoop/hdfs/namenode</value>
      </property>
      <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/path/to/hadoop/hdfs/datanode</value>
      </property>
    </configuration>
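
    It is worth creating the storage directories configured above up front, so the NameNode and DataNode do not fail on missing or unwritable paths. A minimal sketch, assuming the placeholder paths from hdfs-site.xml:

    # Create the local storage directories referenced in hdfs-site.xml
    # (replace /path/to/hadoop with your actual base directory)
    mkdir -p /path/to/hadoop/hdfs/namenode /path/to/hadoop/hdfs/datanode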

    Format HDFS:

    hdfs namenode -format

    Start Hadoop:

    cd $HADOOP_HOME/sbin
    ./start-all.sh
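
    Note that start-all.sh is deprecated in Hadoop 2.x; HDFS and YARN can equally be started separately with the scripts in the same directory:

    # start the HDFS daemons (NameNode, DataNode, SecondaryNameNode)
    ./start-dfs.sh
    # start the YARN daemons (ResourceManager, NodeManager)
    ./start-yarn.sh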

    Check that the daemons came up:

    jps
    1986 Jps
    41715 SecondaryNameNode
    41460 DataNode
    41287 NameNode
    39660 ResourceManager

    If NameNode and DataNode are listed, HDFS has started correctly.

    Note: passwordless SSH must be configured on every machine, including the local one, or startup will fail; a minimal setup is sketched below.
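
    A minimal sketch for setting up passwordless SSH to localhost, assuming no key pair exists yet at ~/.ssh/id_rsa:

    # generate a key pair with an empty passphrase
    ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
    # authorize the key for local logins
    cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
    chmod 600 ~/.ssh/authorized_keys
    # this should now connect without prompting for a password
    ssh localhost exit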

For the Flink setup you can follow the official documentation; I will just briefly list the steps here:

  1. Download Flink

    FLINK_VERSION=1.11.1
    SCALA_VERSION=2.12
    APACHE_FLINK_URL=https://archive.apache.org/dist/flink
    wget ${APACHE_FLINK_URL}/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz
    tar xzvf flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz
  2. Start a standalone Flink cluster in the Hadoop environment

    # HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
    export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

    # Start the flink standalone cluster
    ./bin/start-cluster.sh
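
    To confirm the cluster came up, check the Flink processes with jps (the names below are what a standalone session cluster typically shows), or open the web UI, which listens on port 8081 by default:

    jps
    # expect StandaloneSessionClusterEntrypoint (the JobManager) and TaskManagerRunner
    # web UI: http://localhost:8081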
  3. Start the Flink SQL Client

    # download Iceberg dependency
    ICEBERG_VERSION=0.11.1
    MAVEN_URL=https://repo1.maven.org/maven2
    ICEBERG_MAVEN_URL=${MAVEN_URL}/org/apache/iceberg
    ICEBERG_PACKAGE=iceberg-flink-runtime
    wget ${ICEBERG_MAVEN_URL}/${ICEBERG_PACKAGE}/${ICEBERG_VERSION}/${ICEBERG_PACKAGE}-${ICEBERG_VERSION}.jar

    # download the flink-sql-connector-hive-${HIVE_VERSION}_${SCALA_VERSION}-${FLINK_VERSION}.jar
    HIVE_VERSION=2.3.6
    SCALA_VERSION=2.12 # must match the Scala version of the Flink distribution downloaded above
    FLINK_VERSION=1.11.0
    FLINK_CONNECTOR_URL=${MAVEN_URL}/org/apache/flink
    FLINK_CONNECTOR_PACKAGE=flink-sql-connector-hive
    wget ${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}/${FLINK_VERSION}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}-${FLINK_VERSION}.jar

    # open the SQL client.
    /path/to/bin/sql-client.sh embedded \
    -j ${ICEBERG_PACKAGE}-${ICEBERG_VERSION}.jar \
    -j ${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}-${FLINK_VERSION}.jar \
    shell
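
    If the SQL client is launched from a fresh shell, export the Hadoop classpath there as well; the client needs the Hadoop classes on its classpath to reach HDFS:

    # same export as in step 2, repeated for the new shell
    export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`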
  4. Basic usage

    -- 1. Create a hadoop_catalog
    CREATE CATALOG hadoop_catalog WITH (
    'type'='iceberg',
    'catalog-type'='hadoop',
    'warehouse'='hdfs://localhost:9000/user/hive/warehouse',
    'property-version'='1'
    );

    -- 2. Create a database
    CREATE DATABASE iceberg_db;

    USE iceberg_db;

    -- 3. Create an unpartitioned table and a partitioned table
    CREATE TABLE `hadoop_catalog`.`iceberg_db`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
    );

    CREATE TABLE `hadoop_catalog`.`iceberg_db`.`sample_partition` (
    id BIGINT COMMENT 'unique id',
    data STRING
    ) PARTITIONED BY (data);

    -- 4. Insert data
    INSERT INTO `hadoop_catalog`.`iceberg_db`.`sample` VALUES (1, 'test1');
    INSERT INTO `hadoop_catalog`.`iceberg_db`.`sample` VALUES (2, 'test2');

    INSERT INTO `hadoop_catalog`.`iceberg_db`.`sample_partition` PARTITION (data='city') SELECT 86;

    -- 5. Query data

    SELECT * FROM `hadoop_catalog`.`iceberg_db`.`sample`;
    SELECT * FROM `hadoop_catalog`.`iceberg_db`.`sample` WHERE id = 1;
    SELECT * FROM `hadoop_catalog`.`iceberg_db`.`sample` WHERE data = 'test1';
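
    To see what Iceberg actually wrote, you can inspect the warehouse on HDFS: with a hadoop catalog, each table lives under {warehouse}/{database}/{table} with data/ and metadata/ subdirectories. A sketch, assuming the warehouse path configured above:

    # table root: data files plus table metadata
    hdfs dfs -ls /user/hive/warehouse/iceberg_db/sample
    # metadata JSON files, manifest lists, and manifests accumulate per commit
    hdfs dfs -ls /user/hive/warehouse/iceberg_db/sample/metadata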

For reference, here are the two Iceberg catalog types the Flink connector supports.

hive_catalog

A hive_catalog stores table metadata in a Hive Metastore:

CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://localhost:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://localhost:9000/user/hive/warehouse'
);
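
Note that hive_catalog assumes a Hive Metastore is already listening at thrift://localhost:9083. With Hive installed and configured, one way to start it is:

# launch the metastore service in the background
hive --service metastore &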

hadoop_catalog

A hadoop_catalog keeps table metadata directly on HDFS under the warehouse path, so it needs no additional service:

CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://localhost:9000/user/hive/warehouse',
'property-version'='1'
);

Problems encountered

  1. When starting Hadoop, passwordless SSH is required, including for the local machine.
  2. Iceberg cannot be used for full-text search, but it can be used for tag-based lookups.

References

Building an Enterprise-Grade Real-Time Data Lake with Flink + Iceberg

Flink Learning

A Plain-Language Introduction to Data Warehouses

Flink + Iceberg: Building a Full-Scenario Real-Time Data Warehouse (Tencent)

Flink + Iceberg + Object Storage: Building a Data Lake Solution (Alibaba)
