您的当前位置:首页正文

大数据培训:构建Flink SQL流式计算平台

来源:华佗健康网

一、背景

Flink 由于阿里在国内的助推,火爆程度可以想象,且目前Flink 有非常明显的趋势是往SQL 方向进行的。很多大厂已经实现了Flink SQL化,那我们怎么去实现一个流式计算平台呢?

二、Flink SQL 初探以及代码实现

连接kafka 对数据进行处理写入mysql

package org.example;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;




public class SqlDemo {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();


TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);


//把kafka 中的topic映射成一个输入临时表
        tableEnv.executeSql(
"create table sensor_source (id string,name string) with (" +
" 'connector' = 'kafka'," +
"  'topic' = 'test_info_test'," +
"  'properties.bootstrap.servers' = 'localhost:9092'," +
"  'properties.group.id' = 'testGroup'," +
"  'scan.startup.mode' = 'earliest-offset'," +
"  'format' = 'json')"
        );
//把mysql 中的表映射成一个输出临时表


String sql = "CREATE TABLE print_table (\n" +
" id STRING,\n" +
" name STRING\n" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")";


String mysql_sql = "CREATE TABLE mysql_sink (\n" +
"                   id string,\n" +
"                   name string\n" +
" ) WITH (\n" +
"   'connector' = 'jdbc',\n" +
"   'url' = 'jdbc:mysql://ip:8081/kafka?serverTimezone=UTC',\n" +
"   'table-name' = 'test_info',\n" +
"   'username' = 'kafka',\n" +
"   'password' = 'Bonc@123'\n" +
" )";


String kafka_sink_sql=
"create table kafka_sink (id string,name string) with (" +
" 'connector' = 'kafka'," +
"  'topic' = 'test_info_2'," +
"  'properties.bootstrap.servers' = 'localhost:9092'," +
"  'format' = 'json')";


        tableEnv.executeSql(mysql_sql);
//tableEnv.executeSql(kafka_sink_sql);
//tableEnv.executeSql(sql);
//插入数据的sql语句
//tableEnv.executeSql("insert into print_table select * from sensor_source");


        tableEnv.executeSql("insert into mysql_sink select * from sensor_source");




//tableEnv.executeSql("insert into kafka_sink select * from sensor_source");


    }
}

运行之后mysql 里面数据就有了

三、Flink 实时计算平台

依据上面的代码,我们可以抽象出一层Flink 实时计算平台。

 文章来源于诸葛子房

因篇幅问题不能全部显示,请点此查看更多更全内容