Primeros pasos con apache-flink
Resumen y requisitos
Que es Flink
Al igual que Apache Hadoop y Apache Spark, Apache Flink es un marco de código abierto impulsado por la comunidad para el análisis distribuido de Big Data. . Escrito en Java, Flink tiene API para Scala, Java y Python, lo que permite análisis de transmisión por lotes y en tiempo real.
Requisitos
- un entorno tipo UNIX, como Linux, Mac OS X o Cygwin;
- Java 6.X o posterior;
- [opcional] Maven 3.0.4 o posterior.
Pila
Entornos de ejecución
Apache Flink es un sistema de procesamiento de datos y una alternativa al componente MapReduce de Hadoop. Viene con su propio tiempo de ejecución en lugar de construirse sobre MapReduce. Como tal, puede funcionar de forma completamente independiente del ecosistema Hadoop.
El ExecutionEnvironment
es el contexto en el que se ejecuta un programa. Hay diferentes entornos que puede utilizar, dependiendo de sus necesidades.
-
Entorno JVM: Flink puede ejecutarse en una sola máquina virtual Java, lo que permite a los usuarios probar y depurar programas Flink directamente desde su IDE. Al usar este entorno, todo lo que necesita son las dependencias maven correctas.
-
Entorno local: para poder ejecutar un programa en una instancia de Flink en ejecución (no desde su IDE), debe instalar Flink en su máquina. Consulte configuración local.
-
Entorno de clúster: ejecutar Flink de forma completamente distribuida requiere un clúster independiente o de hilo. Consulte la página de configuración del clúster o [este recurso compartido](http://www.slideshare.net/ sbaltagi/stepbystep-introduction-to-apache-flink) para obtener más información. mportant__: el
2.11
en el nombre del artefacto es la versión de scala, asegúrese de que coincida con la que tiene en su sistema.
API
Flink se puede utilizar para el procesamiento por secuencias o por lotes. Ofrecen tres API:
- DataStream API: procesamiento de flujo, es decir, transformaciones (filtros, ventanas de tiempo, agregaciones) en flujos de datos ilimitados.
- DataSet API: procesamiento por lotes, es decir, transformaciones en conjuntos de datos.
- Table API: un lenguaje de expresión similar a SQL (como los marcos de datos en Spark) que se puede incrustar en aplicaciones tanto por lotes como de transmisión.
Bloques de construcción
En el nivel más básico, Flink se compone de fuentes, transformaciones y sumideros.
En el nivel más básico, un programa Flink se compone de:
- Fuente de datos: datos entrantes que procesa Flink
- Transformaciones: el paso de procesamiento, cuando Flink modifica los datos entrantes
- Disipador de datos: donde Flink envía datos después del procesamiento
Las fuentes y los receptores pueden ser archivos locales/HDFS, bases de datos, colas de mensajes, etc. Ya hay muchos conectores de terceros disponibles, o puede crear uno propio fácilmente.
Configuración del tiempo de ejecución local
-
Asegúrese de tener Java 6 o superior y de que la variable de entorno
JAVA_HOME
esté configurada. -
descargue el último binario de flink [aquí] (https://flink.apache.org/downloads.html):
wget flink-XXXX.tar.gz
Si no planea trabajar con Hadoop, elija la versión Hadoop 1. Además, tenga en cuenta la versión de Scala que descargue, para que pueda agregar las dependencias correctas de Maven en sus programas.
-
empezar fuerte:
tar xzvf flink-XXXX.tar.gz ./flink/bin/start-local.sh
Flink ya está configurado para ejecutarse localmente.
Para asegurarse de que flink se está ejecutando, puede inspeccionar los registros en flink/log/
o abrir la interfaz de flink jobManager que se ejecuta en http://localhost:8081
.
-
detenerse con fuerza:
./flink/bin/stop-local.sh
Configuración del entorno Flink
Para ejecutar un programa flink desde su IDE (podemos usar Eclipse o Intellij IDEA (preferido)), necesita dos dependencias: flink-java
/ flink-scala
y flink-clients
(a partir de febrero de 2016) . Estos JARS se pueden agregar usando Maven y SBT (si está usando Scala).
- Maven
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.1.4</version>
</dependency>
-
SBT name := " "
version := "1.0" scalaVersion := "2.11.8" libraryDependencies ++= Seq( "org.apache.flink" %% "flink-scala" % "1.2.0", "org.apache.flink" %% "flink-clients" % "1.2.0" )
importante: el 2.11
en el nombre del artefacto es la versión de scala, asegúrese de que coincida con la que tiene en su sistema.
Contador de palabras - API de tabla
Este ejemplo es igual que WordCount, pero usa Table API. Consulte WordCount para obtener detalles sobre la ejecución y los resultados.
Experto
Para usar Table API, agregue flink-table
como una dependencia experta:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.1.4</version>
</dependency>
El código
public class WordCountTable{
public static void main( String[] args ) throws Exception{
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment( env );
// get input data
DataSource<String> source = env.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles"
);
// split the sentences into words
FlatMapOperator<String, String> dataset = source
.flatMap( ( String value, Collector<String> out ) -> {
for( String token : value.toLowerCase().split( "\\W+" ) ){
if( token.length() > 0 ){
out.collect( token );
}
}
} )
// with lambdas, we need to tell flink what type to expect
.returns( String.class );
// create a table named "words" from the dataset
tableEnv.registerDataSet( "words", dataset, "word" );
// word count using an sql query
Table results = tableEnv.sql( "select word, count(*) from words group by word" );
tableEnv.toDataSet( results, Row.class ).print();
}
}
Nota: Para una versión que use Java < 8, reemplace la lambda por una clase anónima:
FlatMapOperator<String, String> dataset = source.flatMap( new FlatMapFunction<String, String>(){
@Override
public void flatMap( String value, Collector<String> out ) throws Exception{
for( String token : value.toLowerCase().split( "\\W+" ) ){
if( token.length() > 0 ){
out.collect( token );
}
}
}
} );
El recuento de palabras
Experto
Agregue las dependencias flink-java
y flink-client
(como se explica en el ejemplo de configuración del entorno JVM).
El código
public class WordCount{
public static void main( String[] args ) throws Exception{
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// input data
// you can also use env.readTextFile(...) to get words
DataSet<String> text = env.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,"
);
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap( new LineSplitter() )
// group by the tuple field "0" and sum up tuple field "1"
.groupBy( 0 )
.aggregate( Aggregations.SUM, 1 );
// emit result
counts.print();
}
}
Divisor de línea.java
:
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>>{
public void flatMap( String value, Collector<Tuple2<String, Integer>> out ){
// normalize and split the line into words
String[] tokens = value.toLowerCase().split( "\\W+" );
// emit the pairs
for( String token : tokens ){
if( token.length() > 0 ){
out.collect( new Tuple2<String, Integer>( token, 1 ) );
}
}
}
}
Si usa Java 8, puede reemplazar .flatmap(new LineSplitter())
por una expresión lambda:
DataSet<Tuple2<String, Integer>> counts = text
// split up the lines in pairs (2-tuples) containing: (word,1)
.flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
// normalize and split the line into words
String[] tokens = value.toLowerCase().split( "\\W+" );
// emit the pairs
for( String token : tokens ){
if( token.length() > 0 ){
out.collect( new Tuple2<>( token, 1 ) );
}
}
} )
// group by the tuple field "0" and sum up tuple field "1"
.groupBy( 0 )
.aggregate( Aggregations.SUM, 1 );
Ejecución
Desde el IDE: simplemente presione ejecutar en su IDE. Flink creará un entorno dentro de la JVM.
Desde la línea de comandos de flink: para ejecutar el programa utilizando un entorno local independiente, haga lo siguiente:
-
asegúrese de que flink se esté ejecutando (
flink/bin/start-local.sh
); -
crear un archivo jar (
paquete maven
); -
use la herramienta de línea de comandos
flink
(en la carpetabin
de su instalación de flink) para iniciar el programa:flink run -c your.package.WordCount target/your-jar.jar
The
-c
option allows you to specify the class to run. It is not necessary if the jar is executable/defines a main class.
Resultado
(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)
WordCount - API de transmisión
Este ejemplo es igual que WordCount, pero usa Table API. Consulte WordCount para obtener detalles sobre la ejecución y los resultados.
Experto
Para usar la API de transmisión, agregue flink-streaming
como una dependencia experta:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.1.4</version>
</dependency>
El código
public class WordCountStreaming{
public static void main( String[] args ) throws Exception{
// set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data
DataStreamSource<String> source = env.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles"
);
source
// split up the lines in pairs (2-tuples) containing: (word,1)
.flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
// emit the pairs
for( String token : value.toLowerCase().split( "\\W+" ) ){
if( token.length() > 0 ){
out.collect( new Tuple2<>( token, 1 ) );
}
}
} )
// due to type erasure, we need to specify the return type
.returns( TupleTypeInfo.getBasicTupleTypeInfo( String.class, Integer.class ) )
// group by the tuple field "0"
.keyBy( 0 )
// sum up tuple on field "1"
.sum( 1 )
// print the result
.print();
// start the job
env.execute();
}
}