spark (v1.3) - présentation (français)

43
Alexis Seigneurin @aseigneurin @ippontech

Upload: alexis-seigneurin

Post on 08-Aug-2015

335 views

Category:

Software


4 download

TRANSCRIPT

Page 2: Spark (v1.3) - Présentation (Français)

Spark

● Traitement de larges volumes de données● Traitement distribué (commodity hardware)● Ecrit en Scala, bindings Java et Python

Page 3: Spark (v1.3) - Présentation (Français)

Histoire

● 2009 : AMPLab de l'Université de Berkeley● Juin 2013 : "Top-level project" de la

fondation Apache● Mai 2014 : version 1.0.0● Actuellement : version 1.3

Page 4: Spark (v1.3) - Présentation (Français)

Use cases

● Analyse de logs● Traitement de fichiers texte● Analytics● Traitement données objets connectés● Recherche distribuée (Google, avant)● Détection de fraude● Recommandation (articles, produits...)

Page 5: Spark (v1.3) - Présentation (Français)

Proximité avec Hadoop

● Mêmes use cases● Même modèle de

développement : MapReduce

● Intégration dans l'écosystème

Page 6: Spark (v1.3) - Présentation (Français)

Plus simple qu’Hadoop

● API plus simple à prendre en main● Modèle MapReduce "relâché"● Spark Shell : traitement interactif

Page 8: Spark (v1.3) - Présentation (Français)

Écosystème Spark

● Spark● Spark Shell● Spark Streaming● Spark SQL● Spark ML● GraphX

Page 9: Spark (v1.3) - Présentation (Français)

Intégration

● Yarn, Zookeeper, Mesos● HDFS● Cassandra, Elasticsearch, MongoDB● Zeppelin

Page 10: Spark (v1.3) - Présentation (Français)

Fonctionnement de Spark

Page 11: Spark (v1.3) - Présentation (Français)

● Resilient Distributed Dataset● Abstraction, collection traitée en parallèle● Tolérant à la panne● Manipulation de tuples :

○ Clé - Valeur○ Tuples indépendants les uns des autres

RDD

Page 12: Spark (v1.3) - Présentation (Français)

Sources

● Fichier sur HDFS● Fichier local● Collection en mémoire● Amazon S3● Base NoSQL● ...● Ou une implémentation custom de

InputFormat

Page 13: Spark (v1.3) - Présentation (Français)

Transformations

● Manipule un RDD, retourne un autre RDD● Lazy !● Exemples :

○ map() : une valeur → une valeur○ mapToPair() : une valeur → un tuple○ filter() : filtre les valeurs/tuples○ groupByKey() : regroupe les valeurs par clés○ reduceByKey() : aggrège les valeurs par clés○ join(), cogroup()... : jointure entre deux RDD

Page 14: Spark (v1.3) - Présentation (Français)

Actions finales

● Ne retournent pas un RDD● Exemples :

○ count() : compte les valeurs/tuples○ saveAsHadoopFile() : sauve les résultats au

format Hadoop○ foreach() : exécute une fonction sur chaque

valeur/tuple○ collect() : récupère les valeurs dans une liste

(List<T>)

Page 15: Spark (v1.3) - Présentation (Français)

Exemple

Page 16: Spark (v1.3) - Présentation (Français)

● Arbres de Paris : fichier CSV en Open Data● Comptage d’arbres par espèce

Spark - Exemple

geom_x_y;circonfere;adresse;hauteurenm;espece;varieteouc;dateplanta48.8648454814, 2.3094155344;140.0;COURS ALBERT 1ER;10.0;Aesculus hippocastanum;;48.8782668139, 2.29806967519;100.0;PLACE DES TERNES;15.0;Tilia platyphyllos;;48.889306184, 2.30400164126;38.0;BOULEVARD MALESHERBES;0.0;Platanus x hispanica;;48.8599934405, 2.29504883623;65.0;QUAI BRANLY;10.0;Paulownia tomentosa;;1996-02-29...

Page 17: Spark (v1.3) - Présentation (Français)

Spark - ExempleJavaSparkContext sc = new JavaSparkContext("local", "arbres");

sc.textFile("data/arbresalignementparis2010.csv") .filter(line -> !line.startsWith("geom")) .map(line -> line.split(";")) .mapToPair(fields -> new Tuple2<String, Integer>(fields[4], 1)) .reduceByKey((x, y) -> x + y) .sortByKey() .foreach(t -> System.out.println(t._1 + " : " + t._2));

[... ; … ; …]

[... ; … ; …]

[... ; … ; …]

[... ; … ; …]

[... ; … ; …]

[... ; … ; …]

u

m

k

m

a

a

textFile mapToPairmap

reduceByKey

foreach

1

1

1

1

1

u

m

k

1

2

1

2a

...

...

...

...

filter

...

...

sortByKey

a

m

2

1

2

1u

...

...

...

...

...

...

geom;...

1 k

Page 18: Spark (v1.3) - Présentation (Français)

Spark - ExempleAcacia dealbata : 2

Acer acerifolius : 39

Acer buergerianum : 14

Acer campestre : 452

...

Page 19: Spark (v1.3) - Présentation (Français)

DataFrames et Spark SQL

Page 20: Spark (v1.3) - Présentation (Français)

● Spark 1.3● Processing de données structurées● DataFrames ~= RDD + colonnes nommées● DSL :

○ select()○ where()○ groupBy()○ ...

DataFrames

Page 21: Spark (v1.3) - Présentation (Français)

DataFrames

Préalable :

● Disposer de données tabulaires● Décrire le schéma → DataFrame

Description de schéma :

● Description programmatique des données● Inférence de schéma par réflexion (POJO)

Page 22: Spark (v1.3) - Présentation (Français)

JavaRDD<Row> rdd = trees.map(fields -> Row.create( Float.parseFloat(fields[3]), fields[4]));

● Création de données tabulaires○ Type Row

○ Type personnalisé

DataFrames - Exemple

---------------------------------------

| 10.0 | Aesculus hippocastanum |

| 15.0 | Tilia platyphyllos |

| 0.0 | Platanus x hispanica |

| 10.0 | Paulownia tomentosa |

| ... | ... |

JavaRDD<Row> rdd = trees.map(fields -> new Tree( Float.parseFloat(fields[3]), fields[4]));

Page 23: Spark (v1.3) - Présentation (Français)

DataFrames - Inférence du schémaDataFrame df = sqlContext.createDataFrame(trees, Tree.class);

adresse circonfere dateplanta espece geom_x_y hauteurenm varieteouc COURS ALBERT 1ER 140.0 Aesculus hippocas... 48.8648454814, 2.... 10.0 PLACE DES TERNES 100.0 Tilia platyphyllos 48.8782668139, 2.... 15.0 BOULEVARD MALESHE... 38.0 Platanus x hispanica 48.889306184, 2.3... 0.0 QUAI BRANLY 65.0 1996-02-29 Paulownia tomentosa 48.8599934405, 2.... 10.0

df.printSchema();

df.show();

root |-- adresse: string (nullable = true) |-- circonfere: string (nullable = true) |-- dateplanta: string (nullable = true) |-- espece: string (nullable = true) |-- geom_x_y: string (nullable = true) |-- hauteurenm: float (nullable = false) |-- varieteouc: string (nullable = true)

Page 24: Spark (v1.3) - Présentation (Français)

● Comptage d’arbres par espèce

DataFrames - DSL

espece COUNT(1)

Acacia dealbata 2

Acer acerifolius 39

Acer buergerianum 14

Acer campestre 452

Acer cappadocicum 111

...

DataFrame df = sqlContext.createDataFrame(trees, Tree.class);df .select(df.col("espece")) .where(df.col("espece").notEqual("")) .groupBy(df.col("espece")) .agg(Collections.singletonMap("*", "count")) .sort("espece") .show();

Page 25: Spark (v1.3) - Présentation (Français)

● Exploitation d’un DataFrame en SQL● Moteur d’exécution SQL : convertit les

requêtes en instructions de base

Spark SQL

Page 26: Spark (v1.3) - Présentation (Français)

● Comptage d’arbres par espèce

Spark SQL - Requêtage

sqlContext.sql("SELECT espece, COUNT(*) FROM tree WHERE espece <> '' GROUP BY espece ORDER BY espece") .show();

espece c1

Acacia dealbata 2

Acer acerifolius 39

Acer buergerianum 14

Acer campestre 452

Acer cappadocicum 111

...

Page 27: Spark (v1.3) - Présentation (Français)

Spark en cluster

Page 28: Spark (v1.3) - Présentation (Français)

Topologie & Terminologie

● Un master / des workers○ (+ un master en standby)

● On soumet une application● Exécution pilotée par un driver

Page 29: Spark (v1.3) - Présentation (Français)

Spark en cluster

Plusieurs options

● YARN● Mesos● Standalone

○ Workers démarrés individuellement○ Workers démarrés par le master

Page 30: Spark (v1.3) - Présentation (Français)

MapReduce● Spark (API)● Traitement distribué● Tolérant à la panne

Stockage● HDFS, base NoSQL...● Stockage distribué● Tolérant à la panne

Stockage & traitements

Page 31: Spark (v1.3) - Présentation (Français)

Colocation données & traitement

● “Data locality”● Traiter la donnée là où elle se trouve● Eviter les network I/Os

Page 32: Spark (v1.3) - Présentation (Français)

Colocation données & traitement

Spark Worker

HDFS Datanode

Spark Worker

HDFS Datanode

Spark Worker

HDFS Datanode

Spark Master

HDFS Namenode

HDFS Namenode (Standby)

SparkMaster

(Standby)

Page 33: Spark (v1.3) - Présentation (Français)

DémoSpark en cluster

Page 34: Spark (v1.3) - Présentation (Français)

Démo$ $SPARK_HOME/sbin/start-master.sh

$ $SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://MBP-de-Alexis:7077 --cores 2 --memory 2G

$ mvn clean package$ $SPARK_HOME/bin/spark-submit --master spark://MBP-de-Alexis:7077 --class com.seigneurin.spark.WikipediaMapReduceByKey --deploy-mode cluster target/pres-spark-0.0.1-SNAPSHOT.jar

Page 35: Spark (v1.3) - Présentation (Français)

Spark Streaming

Page 36: Spark (v1.3) - Présentation (Français)

Micro-batches

● Découpe un flux continu en batches● API identique● ≠ Apache Storm

Page 37: Spark (v1.3) - Présentation (Français)

DStream

● Discretized Streams● Séquence de RDDs● Initialisé avec une Duration

Page 38: Spark (v1.3) - Présentation (Français)

Window operations

● Fenêtre glissante● Réutilise des données d'autres fenêtres● Initialisé avec window length et slide interval

Page 39: Spark (v1.3) - Présentation (Français)

Sources

● Socket● Kafka● Flume● HDFS● MQ (ZeroMQ...)● Twitter● ...● Ou une implémentation custom de Receiver

Page 40: Spark (v1.3) - Présentation (Français)

DémoSpark Streaming

Page 41: Spark (v1.3) - Présentation (Français)

Démo de Spark Streaming

● Consommation de Tweets #Android○ Twitter4J

● Détection de la langue du Tweet○ Language Detection

● Indexation dans Elasticsearch● Analyse dans Kibana 4

Page 42: Spark (v1.3) - Présentation (Français)

$ curl -X DELETE localhost:9200$ curl -X PUT localhost:9200/spark/_mapping/tweets '{ "tweets": { "properties": { "user": {"type": "string","index": "not_analyzed"}, "text": {"type": "string"}, "createdAt": {"type": "date","format": "date_time"}, "language": {"type": "string","index": "not_analyzed"} } }}'

● Lancer ElasticSearch

Démo

● Lancer Kibana -> http://localhost:5601● Lancer le traitement