Flink Table API 读写 MySQL
import org.apache.flink.connector.jdbc.table.JdbcConnectorOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;import static org.apache.flink.table.api.Expressions.$;public class TableApiMysql {public static void main(String[] args) {EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();TableEnvironment tableEnv = TableEnvironment.create(settings);Schema schema = Schema.newBuilder().column("user_id", DataTypes.BIGINT()).column("user_name", DataTypes.STRING()).build();TableDescriptor tableDescriptor = TableDescriptor.forConnector("jdbc").option(JdbcConnectorOptions.URL, "jdbc:mysql://localhost:3306/tmp").option(JdbcConnectorOptions.USERNAME, "root").option(JdbcConnectorOptions.PASSWORD, "123456").option(JdbcConnectorOptions.TABLE_NAME, "test").schema(schema).build();tableEnv.createTable("source", tableDescriptor);System.out.println("select format 1: ");tableEnv.from("source").select($("user_id"), $("user_name")).execute().print();tableEnv.executeSql("insert into source(user_name) select 'hello'");System.out.println("select format 2: ");Table table = tableEnv.sqlQuery("select * from source");TableResult execute = table.execute();execute.print();}
}