out) throws Exception {
String connection = DBUtils.getConnection();
System.out.println("connection = [" + connection + "]");
DBUtils.returnConnection(connection);
}
}).print();
}
first / groupBy / sortGroup
first returns the first n elements, groupBy groups a data set by key, and sortGroup sorts the elements within each group.
Scala
def firstFunction(env:ExecutionEnvironment): Unit = {
val info = ListBuffer[(Int, String)]()
info.append((1, "hadoop"))
info.append((1, "spark"))
info.append((1, "flink"))
info.append((2, "java"))
info.append((2, "springboot"))
info.append((3, "linux"))
info.append((4, "vue"))
val data = env.fromCollection(info)
data.first(3).print()
// Output: (1,hadoop)
//(1,spark)
//(1,flink)
data.groupBy(0).first(2).print() // group by the first field and take the first two elements of each group
//(3,linux)
//(1,hadoop)
//(1,spark)
//(2,java)
//(2,springboot)
//(4,vue)
data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print() // group by the first field, sort each group by the second field in ascending order, then take the first two elements
// Output: (3,linux)
//(1,flink)
//(1,hadoop)
//(2,java)
//(2,springboot)
//(4,vue)
}
Java
public static void firstFunction(ExecutionEnvironment env) throws Exception {
List<Tuple2<Integer, String>> info = new ArrayList<>();
info.add(new Tuple2<>(1, "hadoop"));
info.add(new Tuple2<>(1, "spark"));
info.add(new Tuple2<>(1, "flink"));
info.add(new Tuple2<>(2, "java"));
info.add(new Tuple2<>(2, "springboot"));
info.add(new Tuple2<>(3, "linux"));
info.add(new Tuple2<>(4, "vue"));
DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
data.first(3).print();
data.groupBy(0).first(2).print();
data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
}
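sortGroup also accepts Order.DESCENDING. A minimal sketch, reusing the Scala `data` set and the org.apache.flink.api.common.operators.Order import from the example above:
data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print() // the two largest second-field values per group
For key 1 this emits (1,spark) and (1,hadoop) instead of (1,flink) and (1,hadoop).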
FlatMap Function
Takes one element and produces zero, one, or more elements.
Scala
def flatMapFunction(env: ExecutionEnvironment): Unit = {
val info = ListBuffer[String]()
info.append("hadoop,spark")
info.append("hadoop,flink")
info.append("flink,flink")
val data = env.fromCollection(info)
data.flatMap(_.split(",")).print()
}
Output:
hadoop
spark
hadoop
flink
flink
flink
flatMap splits each element on commas, turning one element into several.
A classic example:
data.flatMap(_.split(",")).map((_, 1)).groupBy(0).sum(1).print()
Each element is split on commas, each token is mapped to a (token, 1) pair, the pairs are grouped by the first field, and the second field is summed per group.
Output:
(hadoop,2)
(flink,3)
(spark,1)
Java
The same classic word count, implemented in Java:
public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
List<String> info = new ArrayList<>();
info.add("hadoop,spark");
info.add("hadoop,flink");
info.add("flink,flink");
DataSource<String> data = env.fromCollection(info);
data.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String input, Collector<String> out) throws Exception {
String[] splits = input.split(",");
for (String split : splits) {
// emit each token downstream
out.collect(split);
}
}
}).map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<>(value, 1);
}
}).groupBy(0).sum(1).print();
}
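Because flatMap may emit zero elements, it can double as a filter. A minimal sketch, reusing the Scala `data` set from the flatMap example (the "flink" predicate is purely illustrative):
data.flatMap(_.split(",").filter(_.contains("flink"))).print()
This prints only the three "flink" tokens and drops "hadoop" and "spark" entirely.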
Distinct
Removes duplicate elements from a data set.
Scala
def distinctFunction(env: ExecutionEnvironment): Unit = {
val info = ListBuffer[String]()
info.append("hadoop,spark")
info.append("hadoop,flink")
info.append("flink,flink")
val data = env.fromCollection(info)
data.flatMap(_.split(",")).distinct().print()
}
Every element is de-duplicated. Output:
hadoop
flink
spark
Java
public static void distinctFunction(ExecutionEnvironment env) throws Exception {
List<String> info = new ArrayList<>();
info.add("hadoop,spark");
info.add("hadoop,flink");
info.add("flink,flink");
DataSource<String> data = env.fromCollection(info);
data.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String input, Collector<String> out) throws Exception {
String[] splits = input.split(",");
for (String split : splits) {
// emit each token downstream
out.collect(split);
}
}
}).distinct().print();
}
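distinct can also de-duplicate on selected tuple fields rather than on whole elements. A hedged sketch, using a small (Int, String) collection invented here for illustration:
val pairs = env.fromCollection(List((1, "hadoop"), (1, "spark"), (2, "java")))
pairs.distinct(0).print() // keeps one (arbitrary) element per distinct first field
Only one of (1,hadoop) and (1,spark) survives, alongside (2,java).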
Join
Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys.
result = input1.join(input2)
.where(0) // key of the first input (tuple field 0)
.equalTo(1); // key of the second input (tuple field 1)
This joins field 0 of the first input, input1, with field 1 of the second input, input2.
Scala
def joinFunction(env: ExecutionEnvironment): Unit = {
val info1 = ListBuffer[(Int, String)]() // id, name
info1.append((1, "hadoop"))
info1.append((2, "spark"))
info1.append((3, "flink"))
info1.append((4, "java"))
val info2 = ListBuffer[(Int, String)]() // id, city
info2.append((1, "北京"))
info2.append((2, "上海"))
info2.append((3, "深圳"))
info2.append((5, "广州"))
val data1 = env.fromCollection(info1)
val data2 = env.fromCollection(info2)
data1.join(data2).where(0).equalTo(0).apply((first, second) => {
(first._1, first._2, second._2)
}).print()
}
Output:
(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
Java
public static void joinFunction(ExecutionEnvironment env) throws Exception {
List<Tuple2<Integer, String>> info1 = new ArrayList<>(); // id, name
info1.add(new Tuple2<>(1, "hadoop"));
info1.add(new Tuple2<>(2, "spark"));
info1.add(new Tuple2<>(3, "flink"));
info1.add(new Tuple2<>(4, "java"));
List<Tuple2<Integer, String>> info2 = new ArrayList<>(); // id, city
info2.add(new Tuple2<>(1, "北京"));
info2.add(new Tuple2<>(2, "上海"));
info2.add(new Tuple2<>(3, "深圳"));
info2.add(new Tuple2<>(5, "广州"));
DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
data1.join(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
return new Tuple3<>(first.f0, first.f1, second.f1);
}
}).print();
}
In the JoinFunction signature, the two Tuple2 type parameters are the element types of the two inputs, and Tuple3 is the type of the output element.
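If no JoinFunction is supplied, join simply emits each matching pair as a two-element tuple. A minimal sketch with the Scala data1 and data2 from above:
data1.join(data2).where(0).equalTo(0).print()
This prints pairs such as ((1,hadoop),(1,北京)) instead of the flattened Tuple3 results.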
OuterJoin
The join above is an inner join; an OuterJoin is an outer join over two data sets and comes in three flavors: left outer, right outer, and full outer.
Scala
def outJoinFunction(env: ExecutionEnvironment): Unit = {
val info1 = ListBuffer[(Int, String)]() // id, name
info1.append((1, "hadoop"))
info1.append((2, "spark"))
info1.append((3, "flink"))
info1.append((4, "java"))
val info2 = ListBuffer[(Int, String)]() // id, city
info2.append((1, "北京"))
info2.append((2, "上海"))
info2.append((3, "深圳"))
info2.append((5, "广州"))
val data1 = env.fromCollection(info1)
val data2 = env.fromCollection(info2)
data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
if (second == null) {
(first._1, first._2, "-")
}else {
(first._1, first._2, second._2)
}
}).print() // left outer join: every element of the left data set appears in the result
}
In a left outer join, a left element with no match on the right arrives with second == null; this case must be handled, otherwise a NullPointerException is thrown. Output:
(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
(4,java,-)
Right outer join:
data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
if (first == null) {
(second._1, "-", second._2)
}else {
(first._1, first._2, second._2)
}
}).print()
Right outer join output:
(3,flink,深圳)
(1,hadoop,北京)
(5,-,广州)
(2,spark,上海)
Full outer join:
data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
if (first == null) {
(second._1, "-", second._2)
} else if (second == null) {
(first._1, first._2, "-")
} else {
(first._1, first._2, second._2)
}
}).print()
Output:
(3,flink,深圳)
(1,hadoop,北京)
(5,-,广州)
(2,spark,上海)
(4,java,-)
Java
Left outer join:
public static void outJoinFunction(ExecutionEnvironment env) throws Exception {
List<Tuple2<Integer, String>> info1 = new ArrayList<>(); // id, name
info1.add(new Tuple2<>(1, "hadoop"));
info1.add(new Tuple2<>(2, "spark"));
info1.add(new Tuple2<>(3, "flink"));
info1.add(new Tuple2<>(4, "java"));
List<Tuple2<Integer, String>> info2 = new ArrayList<>(); // id, city
info2.add(new Tuple2<>(1, "北京"));
info2.add(new Tuple2<>(2, "上海"));
info2.add(new Tuple2<>(3, "深圳"));
info2.add(new Tuple2<>(5, "广州"));
DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
data1.leftOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
if (second == null) {
return new Tuple3<>(first.f0, first.f1, "-");
}
return new Tuple3<>(first.f0, first.f1, second.f1);
}
}).print();
}
Right outer join:
data1.rightOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
if (first == null) {
return new Tuple3<>(second.f0, "-", second.f1);
}
return new Tuple3<>(first.f0, first.f1, second.f1);
}
}).print();
Full outer join:
data1.fullOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
if (first == null) {
return new Tuple3<>(second.f0, "-", second.f1);
} else if (second == null) {
return new Tuple3<>(first.f0, first.f1, "-");
}
return new Tuple3<>(first.f0, first.f1, second.f1);
}
}).print();
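In the Scala variants, the null checks can be made explicit by wrapping each side in Option. A hedged sketch of the full outer join above, under that assumption:
data1.fullOuterJoin(data2).where(0).equalTo(0).apply { (first, second) =>
  val left = Option(first)
  val right = Option(second)
  val id = left.map(_._1).orElse(right.map(_._1)).get // at least one side is non-null in a full outer join
  (id, left.map(_._2).getOrElse("-"), right.map(_._2).getOrElse("-"))
}.print()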
Cross
Computes the Cartesian product: every element of the left data set is paired with every element of the right data set.
Scala
def crossFunction(env: ExecutionEnvironment): Unit = {
val info1 = List("乔峰", "慕容复")
val info2 = List(3,1,0)
val data1 = env.fromCollection(info1)
val data2 = env.fromCollection(info2)
data1.cross(data2).print()
}
Output:
(乔峰,3)
(乔峰,1)
(乔峰,0)
(慕容复,3)
(慕容复,1)
(慕容复,0)
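cross can also apply a function to each pair instead of emitting raw tuples. A minimal sketch, reusing data1 and data2 from the Scala example (the string format is just for illustration):
data1.cross(data2).apply((name, num) => s"$name:$num").print()
This prints 乔峰:3, 乔峰:1, and so on, one line per pair.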
Java
public static void crossFunction(ExecutionEnvironment env) throws Exception {
List<String> info1 = new ArrayList<>();
info1.add("乔峰");
info1.add("慕容复");
List<String> info2 = new ArrayList<>();
info2.add("3");
info2.add("1");
info2.add("0");
DataSource<String> data1 = env.fromCollection(info1);
DataSource<String> data2 = env.fromCollection(info2);
data1.cross(data2).print();
}
This concludes the overview of Flink transformations; combining the theory above with hands-on practice is the best way to absorb it.