Code Development

Installation

The simplest way to pull in the Flink dependencies is via Maven or Gradle:

<!-- Use this dependency if you are using the DataStream API -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.10</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>
<!-- Use this dependency if you are using the DataSet API -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.10</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>

Note that builds compiled against Scala 2.11 are not binary-compatible with Scala 2.10 builds, which is why the Flink artifact IDs carry a suffix indicating the Scala version they were built against; choose the suffix that matches the Scala version you use.
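Since the text mentions Gradle as well, the equivalent declarations (using the same coordinates as the Maven snippet above; the repository setup is assumed to be configured elsewhere) would look roughly like:

```groovy
dependencies {
    // DataStream API (Scala 2.10 build)
    compile 'org.apache.flink:flink-streaming-java_2.10:1.1-SNAPSHOT'
    // DataSet API
    compile 'org.apache.flink:flink-java:1.1-SNAPSHOT'
    compile 'org.apache.flink:flink-clients_2.10:1.1-SNAPSHOT'
}
```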

WordCount

One convenient aspect of Flink is that programs can run locally without being submitted to a cluster: the test programs below can simply be run from the IDE (right-click and choose Run).

Batch

The following is a basic batch WordCount on the DataSet API: it splits lines into words and counts each word's occurrences.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input data
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );
        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word, 1)
                text.flatMap(new LineSplitter())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0)
                .sum(1);
        // execute and print result
        counts.print();
    }

    /**
     * Tokenizer: splits each line into (word, 1) pairs.
     */
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
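What the flatMap → groupBy(0) → sum(1) pipeline computes can be checked in plain Java without Flink. The sketch below (class and method names are mine, not part of any Flink API) applies the same tokenization and aggregation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    // Tokenize like LineSplitter (lowercase, split on non-word characters),
    // then aggregate like groupBy(0).sum(1): one running count per word.
    public static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(new String[] {
            "To be, or not to be,--that is the question:--"
        });
        // prints {to=2, be=2, or=1, not=1, that=1, is=1, the=1, question=1}
        System.out.println(counts);
    }
}
```

The difference in Flink is only that the same per-word aggregation is distributed across parallel instances rather than held in one map.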

Window Word Count

package wx;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())      // turn each sentence into a stream of (word, 1) tuples
                .keyBy(0)                     // partition the stream by the word (tuple field 0)
                .timeWindow(Time.seconds(5))  // collect 5-second tumbling windows
                .sum(1);                      // sum the counts (tuple field 1) per window
        dataStream.print();
        env.execute("Window WordCount");
    }

    /**
     * Tokenizer: maps each sentence to (word, 1) pairs.
     */
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                // emit (word, 1) for every word seen
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
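The keyBy/timeWindow/sum combination counts words separately within each 5-second tumbling window. The bucketing idea can be sketched in plain Java; the class below is purely illustrative (it ignores Flink specifics such as watermarks and incremental window aggregation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TumblingWindowSketch {
    // Assign each (timestampMillis, word) event to a 5-second tumbling
    // window and count words per (windowStart, word) key.
    public static Map<String, Integer> countInWindows(long[] timestamps, String[] words) {
        long windowSize = 5_000L;
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (int i = 0; i < words.length; i++) {
            // tumbling windows: window start is the timestamp rounded
            // down to a multiple of the window size
            long windowStart = (timestamps[i] / windowSize) * windowSize;
            String key = windowStart + "/" + words[i];
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 2_000L, 7_000L};
        String[] ws = {"flink", "flink", "flink"};
        // the first two events fall in window [0, 5000), the third in [5000, 10000)
        System.out.println(countInWindows(ts, ws)); // prints {0/flink=2, 5000/flink=1}
    }
}
```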

Submit Jobs

Command-Line Interface

I recommend adding the Flink commands to your global PATH:

export FLINK_HOME=/Users/apple/Desktop/Tools/SDK/Flink/flink-1.0.3
export PATH=$PATH:$FLINK_HOME/bin

A complete list of example invocations follows:

  • Run example program with no arguments.

    ./bin/flink run ./examples/batch/WordCount.jar
  • Run example program with arguments for input and result files

    ./bin/flink run ./examples/batch/WordCount.jar \
    file:///home/user/hamlet.txt file:///home/user/wordcount_out
  • Run example program with parallelism 16 and arguments for input and result files

    ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
    file:///home/user/hamlet.txt file:///home/user/wordcount_out
  • Run example program with flink log output disabled

    ./bin/flink run -q ./examples/batch/WordCount.jar
  • Run example program in detached mode

    ./bin/flink run -d ./examples/batch/WordCount.jar
  • Run example program on a specific JobManager:

    ./bin/flink run -m myJMHost:6123 \
    ./examples/batch/WordCount.jar \
    file:///home/user/hamlet.txt file:///home/user/wordcount_out
  • Run example program with a specific class as an entry point:

    ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
    ./examples/batch/WordCount.jar \
    file:///home/user/hamlet.txt file:///home/user/wordcount_out
  • Run example program using a per-job YARN cluster with 2 TaskManagers:

    ./bin/flink run -m yarn-cluster -yn 2 \
    ./examples/batch/WordCount.jar \
    hdfs:///user/hamlet.txt hdfs:///user/wordcount_out
  • Display the optimized execution plan for the WordCount example program as JSON:

    ./bin/flink info ./examples/batch/WordCount.jar \
    file:///home/user/hamlet.txt file:///home/user/wordcount_out
  • List scheduled and running jobs (including their JobIDs):

    ./bin/flink list
  • List scheduled jobs (including their JobIDs):

    ./bin/flink list -s
  • List running jobs (including their JobIDs):

    ./bin/flink list -r
  • Cancel a job:

    ./bin/flink cancel <jobID>
  • Stop a job (streaming jobs only):

    ./bin/flink stop <jobID>

Program: the approach below no longer appears to work, as the client API has changed.

import java.io.File;
import java.net.InetSocketAddress;

import org.apache.flink.client.RemoteExecutor;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;

// `file` is the job jar (a File) and `args` its program arguments
try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();
    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
    // set the parallelism to 10 here
    client.run(program, 10, true);
} catch (ProgramInvocationException e) {
    e.printStackTrace();
}