Primeros pasos con apache-flink

Resumen y requisitos

Que es Flink

Al igual que Apache Hadoop y Apache Spark, Apache Flink es un marco de código abierto impulsado por la comunidad para el análisis distribuido de Big Data. . Escrito en Java, Flink tiene API para Scala, Java y Python, lo que permite análisis de transmisión por lotes y en tiempo real.

Requisitos

  • un entorno tipo UNIX, como Linux, Mac OS X o Cygwin;
  • Java 6.X o posterior;
  • [opcional] Maven 3.0.4 o posterior.

Pila

ingrese la descripción de la imagen aquí

Entornos de ejecución

Apache Flink es un sistema de procesamiento de datos y una alternativa al componente MapReduce de Hadoop. Viene con su propio tiempo de ejecución en lugar de construirse sobre MapReduce. Como tal, puede funcionar de forma completamente independiente del ecosistema Hadoop.

El ExecutionEnvironment es el contexto en el que se ejecuta un programa. Hay diferentes entornos que puede utilizar, dependiendo de sus necesidades.

  1. Entorno JVM: Flink puede ejecutarse en una sola máquina virtual Java, lo que permite a los usuarios probar y depurar programas Flink directamente desde su IDE. Al usar este entorno, todo lo que necesita son las dependencias maven correctas.

  2. Entorno local: para poder ejecutar un programa en una instancia de Flink en ejecución (no desde su IDE), debe instalar Flink en su máquina. Consulte configuración local.

  3. Entorno de clúster: ejecutar Flink de forma completamente distribuida requiere un clúster independiente o de hilo. Consulte la página de configuración del clúster o [este recurso compartido](http://www.slideshare.net/ sbaltagi/stepbystep-introduction-to-apache-flink) para obtener más información. mportant__: el 2.11 en el nombre del artefacto es la versión de scala, asegúrese de que coincida con la que tiene en su sistema.

API

Flink se puede utilizar para el procesamiento por secuencias o por lotes. Ofrecen tres API:

  • DataStream API: procesamiento de flujo, es decir, transformaciones (filtros, ventanas de tiempo, agregaciones) en flujos de datos ilimitados.
  • DataSet API: procesamiento por lotes, es decir, transformaciones en conjuntos de datos.
  • Table API: un lenguaje de expresión similar a SQL (como los marcos de datos en Spark) que se puede incrustar en aplicaciones tanto por lotes como de transmisión.

Bloques de construcción

En el nivel más básico, Flink se compone de fuentes, transformaciones y sumideros.

ingrese la descripción de la imagen aquí

En el nivel más básico, un programa Flink se compone de:

  • Fuente de datos: datos entrantes que procesa Flink
  • Transformaciones: el paso de procesamiento, cuando Flink modifica los datos entrantes
  • Disipador de datos: donde Flink envía datos después del procesamiento

Las fuentes y los receptores pueden ser archivos locales/HDFS, bases de datos, colas de mensajes, etc. Ya hay muchos conectores de terceros disponibles, o puede crear uno propio fácilmente.

Configuración del tiempo de ejecución local

  1. Asegúrese de tener Java 6 o superior y de que la variable de entorno JAVA_HOME esté configurada.

  2. descargue el último binario de flink [aquí] (https://flink.apache.org/downloads.html):

    wget flink-XXXX.tar.gz
    

Si no planea trabajar con Hadoop, elija la versión Hadoop 1. Además, tenga en cuenta la versión de Scala que descargue, para que pueda agregar las dependencias correctas de Maven en sus programas.

  1. empezar fuerte:

    tar xzvf flink-XXXX.tar.gz
    ./flink/bin/start-local.sh
    

Flink ya está configurado para ejecutarse localmente. Para asegurarse de que flink se está ejecutando, puede inspeccionar los registros en flink/log/ o abrir la interfaz de flink jobManager que se ejecuta en http://localhost:8081.

  1. detenerse con fuerza:

    ./flink/bin/stop-local.sh
    

Para ejecutar un programa flink desde su IDE (podemos usar Eclipse o Intellij IDEA (preferido)), necesita dos dependencias: flink-java / flink-scala y flink-clients (a partir de febrero de 2016) . Estos JARS se pueden agregar usando Maven y SBT (si está usando Scala).

  • Maven
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.1.4</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
  • SBT name := " "

      version := "1.0"
    
      scalaVersion := "2.11.8"
    
      libraryDependencies ++= Seq(
        "org.apache.flink" %% "flink-scala" % "1.2.0",
        "org.apache.flink" %% "flink-clients" % "1.2.0"
      )
    

importante: el 2.11 en el nombre del artefacto es la versión de scala, asegúrese de que coincida con la que tiene en su sistema.

Contador de palabras - API de tabla

Este ejemplo es igual que WordCount, pero usa Table API. Consulte WordCount para obtener detalles sobre la ejecución y los resultados.

Experto

Para usar Table API, agregue flink-table como una dependencia experta:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.1.4</version>
</dependency>

El código

public class WordCountTable{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment( env );

        // get input data
        DataSource<String> source = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );

        // split the sentences into words
        FlatMapOperator<String, String> dataset = source
                .flatMap( ( String value, Collector<String> out ) -> {
                    for( String token : value.toLowerCase().split( "\\W+" ) ){
                        if( token.length() > 0 ){
                            out.collect( token );
                        }
                    }
                } )
                // with lambdas, we need to tell flink what type to expect
                .returns( String.class );

        // create a table named "words" from the dataset
        tableEnv.registerDataSet( "words", dataset, "word" );

        // word count using an sql query
        Table results = tableEnv.sql( "select word, count(*) from words group by word" );
        tableEnv.toDataSet( results, Row.class ).print();
    }
}

Nota: Para una versión que use Java < 8, reemplace la lambda por una clase anónima:

FlatMapOperator<String, String> dataset = source.flatMap( new FlatMapFunction<String, String>(){
        @Override
        public void flatMap( String value, Collector<String> out ) throws Exception{
            for( String token : value.toLowerCase().split( "\\W+" ) ){
                if( token.length() > 0 ){
                    out.collect( token );
                }
            }
        }
    } );

El recuento de palabras

Experto

Agregue las dependencias flink-java y flink-client (como se explica en el ejemplo de configuración del entorno JVM).

El código

public class WordCount{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // input data
        // you can also use env.readTextFile(...) to get words
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap( new LineSplitter() )
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy( 0 )
                        .aggregate( Aggregations.SUM, 1 );

        // emit result
        counts.print();
    }   
}

Divisor de línea.java:

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>>{

    public void flatMap( String value, Collector<Tuple2<String, Integer>> out ){
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split( "\\W+" );

        // emit the pairs
        for( String token : tokens ){
            if( token.length() > 0 ){
                out.collect( new Tuple2<String, Integer>( token, 1 ) );
            }
        }
    }
}

Si usa Java 8, puede reemplazar .flatmap(new LineSplitter()) por una expresión lambda:

DataSet<Tuple2<String, Integer>> counts = text
    // split up the lines in pairs (2-tuples) containing: (word,1)
    .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split( "\\W+" );

        // emit the pairs
        for( String token : tokens ){
            if( token.length() > 0 ){
                out.collect( new Tuple2<>( token, 1 ) );
            }
        }
    } )
    // group by the tuple field "0" and sum up tuple field "1"
    .groupBy( 0 )
    .aggregate( Aggregations.SUM, 1 );

Ejecución

Desde el IDE: simplemente presione ejecutar en su IDE. Flink creará un entorno dentro de la JVM.

Desde la línea de comandos de flink: para ejecutar el programa utilizando un entorno local independiente, haga lo siguiente:

  1. asegúrese de que flink se esté ejecutando (flink/bin/start-local.sh);

  2. crear un archivo jar (paquete maven);

  3. use la herramienta de línea de comandos flink (en la carpeta bin de su instalación de flink) para iniciar el programa:

      flink run -c your.package.WordCount target/your-jar.jar
    

    The -c option allows you to specify the class to run. It is not necessary if the jar is executable/defines a main class.

Resultado

(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)

WordCount - API de transmisión

Este ejemplo es igual que WordCount, pero usa Table API. Consulte WordCount para obtener detalles sobre la ejecución y los resultados.

Experto

Para usar la API de transmisión, agregue flink-streaming como una dependencia experta:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.1.4</version>
</dependency>

El código

public class WordCountStreaming{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // get input data
        DataStreamSource<String> source = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );
        
        source
                // split up the lines in pairs (2-tuples) containing: (word,1)
                .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
                    // emit the pairs
                    for( String token : value.toLowerCase().split( "\\W+" ) ){
                        if( token.length() > 0 ){
                            out.collect( new Tuple2<>( token, 1 ) );
                        }
                    }
                } )
                // due to type erasure, we need to specify the return type
                .returns( TupleTypeInfo.getBasicTupleTypeInfo( String.class, Integer.class ) )
                // group by the tuple field "0"
                .keyBy( 0 )
                // sum up tuple on field "1"
                .sum( 1 )
                // print the result
                .print();

        // start the job
        env.execute();
    }
}