Premiers pas avec apache-flink

Présentation et exigences

Qu’est-ce que Flink

Comme Apache Hadoop et Apache Spark, Apache Flink est un framework open source piloté par la communauté pour l’analyse Big Data distribuée. . Écrit en Java, Flink possède des API pour Scala, Java et Python, permettant des analyses de flux par lots et en temps réel.

Conditions

  • un environnement de type UNIX, tel que Linux, Mac OS X ou Cygwin ;
  • Java 6.X ou ultérieur ;
  • [facultatif] Maven 3.0.4 ou version ultérieure.

Empiler

[![entrez la description de l’image ici][1]][1]

Environnements d’exécution

Apache Flink est un système de traitement de données et une alternative au composant MapReduce de Hadoop. Il est livré avec son propre runtime plutôt que de s’appuyer sur MapReduce. En tant que tel, il peut fonctionner complètement indépendamment de l’écosystème Hadoop.

Le ExecutionEnvironment est le contexte dans lequel un programme est exécuté. Il existe différents environnements que vous pouvez utiliser, en fonction de vos besoins.

  1. Environnement JVM : Flink peut s’exécuter sur une seule machine virtuelle Java, permettant aux utilisateurs de tester et de déboguer les programmes Flink directement depuis leur IDE. Lorsque vous utilisez cet environnement, tout ce dont vous avez besoin est les bonnes dépendances Maven.

  2. Environnement local : pour pouvoir exécuter un programme sur une instance Flink en cours d’exécution (et non depuis votre IDE), vous devez installer Flink sur votre machine. Voir configuration locale.

  3. Environnement de cluster : l’exécution de Flink de manière entièrement distribuée nécessite un cluster autonome ou filaire. Voir la page de configuration du cluster ou [ce slideshare](http://www.slideshare.net/ sbaltagi/stepbystep-introduction-to-apache-flink) pour plus d’informations. mportant__ : le 2.11 dans le nom de l’artefact est la version scala, assurez-vous de correspondre à celle que vous avez sur votre système.

# API

Flink peut être utilisé pour le traitement par flux ou par lots. Ils proposent trois API :

  • DataStream API : traitement de flux, c’est-à-dire transformations (filtres, fenêtres temporelles, agrégations) sur des flux de données illimités.
  • DataSet API : traitement par lots, c’est-à-dire transformations sur des ensembles de données.
  • Table API : un langage d’expression de type SQL (comme les dataframes dans Spark) qui peut être intégré à la fois dans des applications de traitement par lots et de diffusion en continu.

Blocs de construction

Au niveau le plus élémentaire, Flink est constitué de source(s), transformation(s) et puits(s).

[![entrez la description de l’image ici][2]][2]

Au niveau le plus élémentaire, un programme Flink est composé de :

  • Source de données : données entrantes traitées par Flink
  • Transformations : l’étape de traitement, lorsque Flink modifie les données entrantes
  • Récepteur de données : où Flink envoie les données après le traitement

Les sources et les récepteurs peuvent être des fichiers locaux/HDFS, des bases de données, des files d’attente de messages, etc. De nombreux connecteurs tiers sont déjà disponibles, ou vous pouvez facilement créer le vôtre.

[1] : https://i.stack.imgur.com/ziCa7.png [2] : https://i.stack.imgur.com/Zn1EI.png

Configuration de l’environnement d’exécution local

  1. Assurez-vous que vous avez Java 6 ou supérieur et que la variable d’environnement JAVA_HOME est définie.

  2. téléchargez le dernier binaire flink ici:

    wget flink-XXXX.tar.gz
    

Si vous ne prévoyez pas de travailler avec Hadoop, choisissez la version hadoop 1. Notez également la version scala que vous téléchargez, afin que vous puissiez ajouter les bonnes dépendances maven dans vos programmes.

  1. commencer fort :

    tar xzvf flink-XXXX.tar.gz
    ./flink/bin/start-local.sh
    

Flink est déjà configuré pour s’exécuter localement. Pour vous assurer que flink est en cours d’exécution, vous pouvez inspecter les journaux dans flink/log/ ou ouvrir l’interface de flink jobManager s’exécutant sur http://localhost:8081.

  1. arrêtez fort :

    ./flink/bin/stop-local.sh
    

Pour exécuter un programme flink à partir de votre IDE (nous pouvons utiliser Eclipse ou Intellij IDEA (préféré)), vous avez besoin de deux dépendances : flink-java / flink-scala et flink-clients (à partir de février 2016) . Ces JARS peuvent être ajoutés à l’aide de Maven et SBT (si vous utilisez scala).

  • Maven
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.1.4</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
  • SBT name := " "

      version := "1.0"
    
      scalaVersion := "2.11.8"
    
      libraryDependencies ++= Seq(
        "org.apache.flink" %% "flink-scala" % "1.2.0",
        "org.apache.flink" %% "flink-clients" % "1.2.0"
      )
    

important : le 2.11 dans le nom de l’artefact est la version scala, assurez-vous de correspondre à celle que vous avez sur votre système.

WordCount - API de tableau

Cet exemple est identique à WordCount, mais utilise l’API Table. Voir WordCount pour plus de détails sur l’exécution et les résultats.

Maven

Pour utiliser l’API Table, ajoutez flink-table en tant que dépendance Maven :

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.1.4</version>
</dependency>

Le code

public class WordCountTable{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment( env );

        // get input data
        DataSource<String> source = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );

        // split the sentences into words
        FlatMapOperator<String, String> dataset = source
                .flatMap( ( String value, Collector<String> out ) -> {
                    for( String token : value.toLowerCase().split( "\\W+" ) ){
                        if( token.length() > 0 ){
                            out.collect( token );
                        }
                    }
                } )
                // with lambdas, we need to tell flink what type to expect
                .returns( String.class );

        // create a table named "words" from the dataset
        tableEnv.registerDataSet( "words", dataset, "word" );

        // word count using an sql query
        Table results = tableEnv.sql( "select word, count(*) from words group by word" );
        tableEnv.toDataSet( results, Row.class ).print();
    }
}

Note : Pour une version utilisant Java < 8, remplacez le lambda par une classe anonyme :

FlatMapOperator<String, String> dataset = source.flatMap( new FlatMapFunction<String, String>(){
        @Override
        public void flatMap( String value, Collector<String> out ) throws Exception{
            for( String token : value.toLowerCase().split( "\\W+" ) ){
                if( token.length() > 0 ){
                    out.collect( token );
                }
            }
        }
    } );

Nombre de mots

Maven

Ajoutez les dépendances flink-java et flink-client (comme expliqué dans l’exemple de configuration de l’environnement JVM).

Le code

public class WordCount{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // input data
        // you can also use env.readTextFile(...) to get words
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap( new LineSplitter() )
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy( 0 )
                        .aggregate( Aggregations.SUM, 1 );

        // emit result
        counts.print();
    }   
}

LineSplitter.java :

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>>{

    public void flatMap( String value, Collector<Tuple2<String, Integer>> out ){
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split( "\\W+" );

        // emit the pairs
        for( String token : tokens ){
            if( token.length() > 0 ){
                out.collect( new Tuple2<String, Integer>( token, 1 ) );
            }
        }
    }
}

Si vous utilisez Java 8, vous pouvez remplacer .flatmap(new LineSplitter()) par une expression lambda :

DataSet<Tuple2<String, Integer>> counts = text
    // split up the lines in pairs (2-tuples) containing: (word,1)
    .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split( "\\W+" );

        // emit the pairs
        for( String token : tokens ){
            if( token.length() > 0 ){
                out.collect( new Tuple2<>( token, 1 ) );
            }
        }
    } )
    // group by the tuple field "0" and sum up tuple field "1"
    .groupBy( 0 )
    .aggregate( Aggregations.SUM, 1 );

Exécution

Depuis l’IDE : appuyez simplement sur run dans votre IDE. Flink créera un environnement à l’intérieur de la JVM.

Depuis la ligne de commande flink : pour exécuter le programme dans un environnement local autonome, procédez comme suit :

  1. assurez-vous que flink est en cours d’exécution (flink/bin/start-local.sh);

  2. créez un fichier jar (paquet maven);

  3. utilisez l’outil de ligne de commande flink (dans le dossier bin de votre installation flink) pour lancer le programme :

      flink run -c your.package.WordCount target/your-jar.jar
    

    The -c option allows you to specify the class to run. It is not necessary if the jar is executable/defines a main class.

Résultat

(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)

WordCount - API de diffusion en continu

Cet exemple est identique à WordCount, mais utilise l’API Table. Voir WordCount pour plus de détails sur l’exécution et les résultats.

Maven

Pour utiliser l’API Streaming, ajoutez flink-streaming en tant que dépendance Maven :

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.1.4</version>
</dependency>

Le code

public class WordCountStreaming{

    public static void main( String[] args ) throws Exception{

        // set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // get input data
        DataStreamSource<String> source = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles"
        );
        
        source
                // split up the lines in pairs (2-tuples) containing: (word,1)
                .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
                    // emit the pairs
                    for( String token : value.toLowerCase().split( "\\W+" ) ){
                        if( token.length() > 0 ){
                            out.collect( new Tuple2<>( token, 1 ) );
                        }
                    }
                } )
                // due to type erasure, we need to specify the return type
                .returns( TupleTypeInfo.getBasicTupleTypeInfo( String.class, Integer.class ) )
                // group by the tuple field "0"
                .keyBy( 0 )
                // sum up tuple on field "1"
                .sum( 1 )
                // print the result
                .print();

        // start the job
        env.execute();
    }
}