mercredi 28 janvier 2015

HDFS : un système de fichiers réparti

HDFS (Hadoop Distributed File System) est avant tout un système de fichier écrit en java. Pour pouvoir stocker les données structurées sur un périphérique, on utilise un format qui les représente sous la forme d'une succession de blocs de données : c'est le job principal d'un un système de fichiers. Contrairement aux autres systèmes de fichiers, HDFS ne fait pas de mise à jour de fichier.
Les systèmes de fichiers les plus courants sont ReFS, NTFS et FAT32 (pour les stockages Windows) ou bien ext3, ext4 (pour les stockages Linux), iso 9660 (cd) et udf (dvd).

HDFS en plus d’être un file system, il est distribué c’est-à-dire qu’il divise les données d’un fichier par paquet (64 Mo par défaut) et les stocke sur plusieurs machines. Il est toutefois possible de monter à 128 Mo, 256 Mo, 512 Mo voire 2 Go ce qui permet de réduire le seek time (temps d’accès à un bloc). À titre de comparaison, le système de gestion de fichiers Linux ext3 a une taille de blocs de 4 ou 8 Ko.

La distribution des données est une abstraction pour l’utilisateur. En effet, l’utilisateur ne perçoit pas la distribution des données car il accède aux fichiers HDFS de manière classique au travers d’une arborescence classique sous la forme répertoire/sous-répertoire/fichier.extension. Cependant, les données des fichiers sont bien réparties sur plusieurs nœuds pour plus de performance avec un mécanisme de réplication pour une tolérance aux pannes. Cette performance est d’autant plus meilleure que lorsqu’on travaille sur des fichiers de taille importante (100 Mo ou plus) (des millions de fichiers de grande taille plutôt que des milliards de fichiers de petite taille).


Il existe 3 composants principaux : Name NodeSecondary Name Node et Data Node


Name Node orchestre le processus de stockage des données. Il sauvegarde l'emplacement de tous les fichiers dans le système de fichiers local. Le client communique avec lui à chaque fois qu'il demande un fichier à travers des opérations de lecture et d'écriture. Il dispose entre autre d’un fichier edits contenant toutes les opérations effectuées dans le système. A l’aide du fuchier edits, le Name Node enregistre le moindre changement du système de fichiers. Si par exemple, un fichier est supprimé de HDFS, le Name Node le consigne immédiatement dans ce journal. Le fichier edits sert un peu comme le backup qui doit permettre à reconstruire l'état de fichiers si le système plante.
Le Name Node est également responsable de gérer la réplication des blocs des fichiers. Tout changement du facteur de réplication de l’un des blocs sera consigné par le Name Node dans le fichier de log edits. Il reçoit régulièrement un ping et un rapport sur les blocs (Bloclreport) de tous les Data Nodes (voir section Data Node) dans le cluster afin de s’assurer que les data nodes fonctionnent correctement. Un Bloclreport contient une liste de tous les blocs d’un Data Node. En cas de défaillance du data node, le Name Node choisit de nouveaux data nodes pour de nouvelles réplications de blocs de données.
Pour accélér l'accès aux informations des blocs d'un fichier (nom de fichier, répertoire, association block/fichier, position des blocks, nombre de réplicas, permissions, localisation d’un bloc…), le Name Node les stocke dans la mémoire RAM. Ces informations consomment environ 200 octets de RAM. Cela explique bien pourquoi Hadoop est plus efficace pour traiter un nombre limité de fichiers de grande taille plutôt qu’un grand nombre de fichiers de petite taille. Un fichier d’1 GO avec une taille de bloc à 128 Mo consomme 5 000 octets de RAM. Tandis que 1000 fichiers d’1 Mo consomment environ 600 000 octets de RAM.

Data Node peut être vu comme un nœud client (ou slave, aussi appelé worker). Il effectue les opérations demandées par name node. Il stocke chaque bloc de données HDFS dans des fichiers séparés dans son système de fichiers local et retrouve les blocks demandés par name node. Il apporte également au name node, un rapport contenant une liste des blocks stockés. Ce rapport est envoyé périodiquement pour signaler l’état de santé global de HDFS.


Secondary namenode joue le rôle de backup régulier de l'image de Name Node. Un check de l'état de name node est périodiquement effectué au cours duquel les fichiers de méta-données sont copiés : edits.... Le check intervient à un intervalle régulier appelé checkpoint (point de contrôle; par exemple une fois par heure). Les informations collectées ne sont donc pas forcément à jour. Secondary namenode n'est pas une réplication du name node principal et il peut contenir des données périmées suite à des opérations effectuées après la création de son point de contrôle. L'intervalle de création du checkpoint peut être précisé dans la propriété de configuration fs.checkpoint.period.

jeudi 22 janvier 2015

Introduction HBase Shell & HBase Java API

HBase : Architecture


Comme évoqué dans l’article HBase : Un Système de Base de Données Orientée Colonne, les tables HBase sont constituées d’un ensemble de familles de colonnes (regroupements logiques de colonnes) spécifiées lors de leur création. Chaque Column Family est composée d’un un nombre arbitraire de colonnes pouvant être ajoutées après la création de la table, à n’importe quel moment. Chaque ligne est identifiée par une clé. Une clé est un tableau d’octets ce qui fait que théoriquement toute donnée peut servir de clé. Les tables trient les entrées en fonction de leurs clés.
Pour répartir les données (dans le but d’équilibrer les charges) sur plusieurs nœuds d’un cluster, les lignes (identifiées par Row ID et ses Column Family) des tables sont partitionnées en plusieurs parties appelées des regions. Lors de la création d’une table, une seule region est créée, elle est ensuite automatiquement divisée en sous-parties lorsque sa taille atteint un seuil limite configurable. Chaque region est placée sur un nœud appelé region server



Master Server :

La tâche du serveur maître consiste à assigner les régions aux serveurs de régions enregistrés. Il maintient l’état du cluster et assure l’équilibre de charge entre les serveurs régions. Il réassigne automatiquement les régions d’un serveur de région défaillant et s’occupe de tous les changements du schéma de données. Pour accéder aux données, le client ne passe pas directement par le serveur maître mais plutôt par le serveur de région (Region Server).

Region Server

Les serveurs de région sont responsables des requêtes d’écriture et de lecture du client. Une communication est régulièrement établie entre le serveur maître et les serveurs de région. Cela permet aux serveurs de région de connaitre la liste des régions qu’ils gèrent et permet au serveur maître de savoir si un serveur de région est défaillant ou non. Lorsqu’une région d’un serveur de région nécessite d’être partitionnée (si la taille dépasse une limite fixée), ce dernier communique avec le maître pour lui signifier qu’une nouvelle région doit être créée. Si un serveur de région n’est plus joignable, il est automatiquement remplacé. Cette tâche est confiée à un autre service : ZooKeeper

ZooKeeper

Le service ZooKeeper est chargé de maintenir les informations du cluster en cas de défaillance. Le serveur maître et les serveurs de région sont tous enregistrés auprès du Zookeeper et si l’un d’eux subit une défaillance, Zookeeper se chargera de le faire réparer ou de le remplacer.

Modes d'exécution

HBase est framework avec une architecture distribuée. Cependant, son utilisation en mode non distribué est également possible. Il exite trois modes d'exécution : autonome (standalone), pseudo-distribué (pseudo-distributed) et distribué (fully distributed).

Le fonctionnement par défaut est configuré en mode autonome. Il suffit de télécharger le zip, le décompresser et mettre la variable d'environnement JAVA_HOME en place. Le fonctionnement en mode distribué nécessite l'installation d'Hadopp (pour l'utilisation du système de fichier HDFS) et la modification des fichiers de configuration (répertoire conf).

En mode « Standalone », HBase ne fait pas usage du système de fichiers HDFS. Il utilise plutôt le système de fichiers de la machine locale et tous les démons liés à Hbase sont installés en local (ZooKeeper, …) et s'exécutent sur la même JVM. Bien que ce mode soit très facile à installer, il est très peu performant du fait qu’il ne tire aucunement profit de la force du nombre que fournit HDFS. Ce mode est donc conseillé pour les développeurs qui en sont à leurs premiers pas avec HBase et qui désirent comprendre et jouer avec le système.

En mode pseudo-distribué, Hbase utilise le système de fichier Hadoop. Il est donc indispensable d'installer Hadoop. Comme en mode autonome, tous les démons sont installés sur la même machine et au sein de la même JVM. Le mode distribué nécessite au moins 2 machines afin de répartir les démons. Les deux modes pseudo-distribué et distribué nécessitent des configurations supplémentaires (hbase-site.xml).

Exemples Simples : HBase Shell & JAVA API

CREATE : création de table  :




static final String USERS_TABLE_NAME = "users";
    static final String USERS_COLUMN_FAMILY_INFO = "info";
    static final String USERS_COLUMN_FAMILY_PROFESSION = "profession";

    public static void main(String[] args) throws Exception {

        //configuration de la machine locale
        //Possibilité de renseigner pour une config distante :
        // conf.set("hbase.master", "192.168.15.20:60000");
        // conf.set("hbase.zookeeper.quorum", "192.168.15.20");
        //conf.set("hbase.zookeeper.property.clientPort","2181");
        // or  conf.addResource(new Path("/home/user/hbase/core-site.xml"));  conf.addResource(new Path("/home/user/hbase/hbase-site.xml"));
        Configuration conf = HBaseConfiguration.create();

        HBaseAdmin admin = new HBaseAdmin(conf);

        HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(USERS_TABLE_NAME));

        tableDescriptor.addFamily(new HColumnDescriptor(USERS_COLUMN_FAMILY_INFO));
        tableDescriptor.addFamily(new HColumnDescriptor(USERS_COLUMN_FAMILY_PROFESSION));

        admin.createTable(tableDescriptor);
        System.out.println("tableAvailable = " + admin.isTableAvailable(USERS_TABLE_NAME));

    }

ALTER : Pour modifier la structure des tables ....

Modifions le nombre de version associé à la colunm family profession




static final String USERS_TABLE_NAME = "users";
    static final String USERS_COLUMN_FAMILY_PROFESSION = "profession";

    public static void main(String[] args) throws Exception {

        //configuration de la machine locale
        //Possibilité de renseigner pour une config distante :
        // conf.set("hbase.master", "192.168.15.20:60000");
        // conf.set("hbase.zookeeper.quorum", "192.168.15.20");
        //conf.set("hbase.zookeeper.property.clientPort","2181");
        // or  conf.addResource(new Path("/home/user/hbase/core-site.xml"));  conf.addResource(new Path("/home/user/hbase/hbase-site.xml"));
        Configuration conf = HBaseConfiguration.create();

        HBaseAdmin admin = new HBaseAdmin(conf);

        HTableDescriptor hTableDescriptor = admin.getTableDescriptor(USERS_TABLE_NAME.getBytes());
        HColumnDescriptor[] families=hTableDescriptor.getColumnFamilies();
        for (  HColumnDescriptor h : families) {
            System.out.println("Column Family = " + h.getNameAsString());
            System.out.println("Max version = " + h.getMaxVersions());
        }

        admin.disableTable(USERS_TABLE_NAME);
        HColumnDescriptor professionCF = new HColumnDescriptor(USERS_COLUMN_FAMILY_PROFESSION);
        professionCF.setMaxVersions(5);
        admin.modifyColumn(USERS_TABLE_NAME, professionCF); // modifying existing ColumnFamily
        admin.enableTable(USERS_TABLE_NAME);
        admin.flush(USERS_TABLE_NAME);
        hTableDescriptor = admin.getTableDescriptor(USERS_TABLE_NAME.getBytes());
        families=hTableDescriptor.getColumnFamilies();
        for (  HColumnDescriptor h : families) {
            System.out.println("Column Family = " + h.getNameAsString());
            System.out.println("Max version = " + h.getMaxVersions());
        }

    }

PUT : L’opération PUT (insert dans le mode SQL) permet de sauvegarder/insérer les données dans  une table HBase créée préalablement avec l'opération CREATE.

Insertion d’une ligne identifiée par 001E : Ajout d’une nouvelle colonne (type) à la Column Family (profession) avec comme valeur Etudiant


On ajoute l’université du user 001E. Cette colonne appartient également à la Column Family profession :


Ajout d’un nouvel user avec comme identifiant 001S, type de profession Ingénieur, nom de Column Family info, prenom, age ...



Ajout successif de l’entreprise pour user 001S. Toutes les valeurs écrasées sont versionnées selon le nombre de version de la Column Family profession :



static final String USERS_TABLE_NAME = "users";
    static final String USERS_COLUMN_FAMILY_INFO = "info";
    static final String USERS_COLUMN_FAMILY_PROFESSION = "profession";
    static final String ROW_ETUDIANT_1 = "001E";
    static final String ROW_INGENIEUR_1 = "001S";
    static final String CQ_NAME = "nom";
    static final String CQ_PRENOM = "prenom";
    static final String CQ_TYPE = "type";
    static final String CQ_ENTREPRISE = "entreprise";
    static final String CQ_UNIVERSITE = "universite";
    static final String CQ_AGE = "age";

    public static void main(String[] args) throws Exception {

        //configuration de la machine locale
        //Possibilité de renseigner pour une config distante :
        // conf.set("hbase.master", "192.168.15.20:60000");
        // conf.set("hbase.zookeeper.quorum", "192.168.15.20");
        //conf.set("hbase.zookeeper.property.clientPort","2181");
        // or  conf.addResource(new Path("/home/user/hbase/core-site.xml"));  conf.addResource(new Path("/home/user/hbase/hbase-site.xml"));
        Configuration conf = HBaseConfiguration.create();

        HTable table = new HTable(conf, USERS_TABLE_NAME);

        Put putEtudiant = new Put(Bytes.toBytes(ROW_ETUDIANT_1));
        putEtudiant.add(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION), Bytes.toBytes(CQ_TYPE), Bytes.toBytes("Etudiant"));
        table.put(putEtudiant);

        putEtudiant = new Put(Bytes.toBytes(ROW_ETUDIANT_1));
        putEtudiant.add(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION), Bytes.toBytes(CQ_UNIVERSITE), Bytes.toBytes("ENI"));
        table.put(putEtudiant);

        Put putIngenieur = new Put(Bytes.toBytes(ROW_INGENIEUR_1));
        putIngenieur.add(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION), Bytes.toBytes(CQ_TYPE), Bytes.toBytes("Ingénieur"));
        table.put(putIngenieur);

        putIngenieur = new Put(Bytes.toBytes(ROW_INGENIEUR_1));
        putIngenieur.add(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION), Bytes.toBytes(CQ_ENTREPRISE), Bytes.toBytes("Larbo Technologie"));
        table.put(putIngenieur);

        putIngenieur = new Put(Bytes.toBytes(ROW_INGENIEUR_1));
        putIngenieur.add(Bytes.toBytes(USERS_COLUMN_FAMILY_INFO), Bytes.toBytes(CQ_NAME), Bytes.toBytes("Dupond"));
        table.put(putIngenieur);

        putIngenieur = new Put(Bytes.toBytes(ROW_INGENIEUR_1));
        putIngenieur.add(Bytes.toBytes(USERS_COLUMN_FAMILY_INFO), Bytes.toBytes(CQ_PRENOM), Bytes.toBytes("Jean"));
        table.put(putIngenieur);

        putIngenieur = new Put(Bytes.toBytes(ROW_INGENIEUR_1));
        putIngenieur.add(Bytes.toBytes(USERS_COLUMN_FAMILY_INFO), Bytes.toBytes(CQ_AGE), Bytes.toBytes("23"));
        table.put(putIngenieur);

        putIngenieur = new Put(Bytes.toBytes(ROW_INGENIEUR_1));
        putIngenieur.add(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION), Bytes.toBytes(CQ_ENTREPRISE), Bytes.toBytes("Entreprise 1"));
        table.put(putIngenieur);

        putIngenieur = new Put(Bytes.toBytes(ROW_INGENIEUR_1));
        putIngenieur.add(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION), Bytes.toBytes(CQ_ENTREPRISE), Bytes.toBytes("Entreprise 2"));
        table.put(putIngenieur);

        putIngenieur = new Put(Bytes.toBytes(ROW_INGENIEUR_1));
        putIngenieur.add(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION), Bytes.toBytes(CQ_ENTREPRISE), Bytes.toBytes("Entreprise 3"));
        table.put(putIngenieur);

        putIngenieur = new Put(Bytes.toBytes(ROW_INGENIEUR_1));
        putIngenieur.add(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION), Bytes.toBytes(CQ_ENTREPRISE), Bytes.toBytes("Entreprise 4"));
        table.put(putIngenieur);

        putIngenieur = new Put(Bytes.toBytes(ROW_INGENIEUR_1));
        putIngenieur.add(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION), Bytes.toBytes(CQ_ENTREPRISE), Bytes.toBytes("Entreprise 5"));
        table.put(putIngenieur);

        putIngenieur = new Put(Bytes.toBytes(ROW_INGENIEUR_1));
        putIngenieur.add(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION), Bytes.toBytes(CQ_ENTREPRISE), Bytes.toBytes("Entreprise 6"));
        table.put(putIngenieur);

        table.flushCommits();
        table.close();

    }

 GET : la lecture de données s’effectue grâce à l’opération GET (select). Elle permet de lire les valeurs d’une ligne d’une ou plusieurs familles de colonne.

Recherche des valeurs par identifiant :



On peut afficher les valeurs précédentes de la colonne Entreprise liée à la column family profession pour user 001S :





import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.List;

public class GetUser {

    static final String USERS_TABLE_NAME = "users";
    static final String USERS_COLUMN_FAMILY_PROFESSION = "profession";
    static final String ROW_INGENIEUR_1 = "001S";
    private static final String TABS = "\t\t\t\t";

    public static void main(String[] args) throws Exception {

        //configuration de la machine locale
        //Possibilité de renseigner pour une config distante :
        // conf.set("hbase.master", "192.168.15.20:60000");
        // conf.set("hbase.zookeeper.quorum", "192.168.15.20");
        //conf.set("hbase.zookeeper.property.clientPort","2181");
        // or  conf.addResource(new Path("/home/user/hbase/core-site.xml"));  conf.addResource(new Path("/home/user/hbase/hbase-site.xml"));
        Configuration conf = HBaseConfiguration.create();

        HTable table = new HTable(conf, USERS_TABLE_NAME);

        Get get = new Get(Bytes.toBytes(ROW_INGENIEUR_1));
        get.addFamily(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION));
        Result result = table.get(get);
        List<Cell> cells = result.listCells();
        for (Cell cell : cells) {
            System.out.printf("%s%scolumn=%s:%s, timestamp=%d, value=%s\n",
                    Bytes.toString(CellUtil.cloneRow(cell)),
                    TABS,
                    Bytes.toString(CellUtil.cloneFamily(cell)),
                    Bytes.toString(CellUtil.cloneQualifier(cell)),
                    cell.getTimestamp(),
                    Bytes.toString(CellUtil.cloneValue(cell)));
        }

        table.close();
    }
}

SCAN :  c'est l’opération la plus importante. Elle permet de faire de recherche, de filtrer les données par :
-          les valeurs de clé,
-          les noms de familles de colonnes,
-          les nomes de colonnes,
-          les valeurs  des colonnes
-          le numéro de version (timestamp) associés aux colonnes.


Le SCAN de base est équivalent à la requête SQL : select * from users


On peut effectuer une recherche de tous les users dont l’identifiant est préfixé par 002




 static final String  USERS_TABLE_NAME  = "users";
    static final String  SATART_WITH    = "\"002\"";
 private static final String TABS    = "\t\t\t\t";

 public static void main( String[] args ) throws Exception
 {

  // configuration de la machine locale
  // Possibilité de renseigner pour une config distante :
  // conf.set("hbase.master", "192.168.15.20:60000");
  // conf.set("hbase.zookeeper.quorum", "192.168.15.20");
  // conf.set("hbase.zookeeper.property.clientPort","2181");
  // or conf.addResource(new Path("/home/user/hbase/core-site.xml")); conf.addResource(new Path("/home/user/hbase/hbase-site.xml"));
  Configuration conf = HBaseConfiguration.create();

  HTable table = new HTable( conf, USERS_TABLE_NAME );

  byte[] prefix = SATART_WITH.getBytes();
  Scan scan = new Scan();
  // scan.setStartRow("002".getBytes());
  // scan.setStopRow("003".getBytes());
  PrefixFilter prefixFilter = new PrefixFilter( prefix );
  scan.setFilter( prefixFilter );

  ResultScanner resultScanner = table.getScanner( scan );
  Result result = resultScanner.next();
  while ( result != null )
  {

   List<Cell> cells = result.listCells();
   for ( Cell cell : cells )
   {
    System.out.printf( "%s%scolumn=%s:%s, timestamp=%d, value=%s\n", Bytes.toString( CellUtil.cloneRow( cell ) ), TABS,
      Bytes.toString( CellUtil.cloneFamily( cell ) ), Bytes.toString( CellUtil.cloneQualifier( cell ) ),
      cell.getTimestamp(), Bytes.toString( CellUtil.cloneValue( cell ) ) );
   }
   result = resultScanner.next();
  }

  table.close();
 }

DELETE : permet de supprimer le contenu d’un enregistrement, d’une famille de colonnes, d’une version d’une colonne ou des toutes les versions d’une colonne.

Suppression de toutes les valeurs des versions de la column family profession pour user 001S.


Suppression de la column family profession de la table users :




 static final String USERS_TABLE_NAME = "users";
    static final String USERS_COLUMN_FAMILY_PROFESSION = "profession";
    static final String ROW_INGENIEUR_1 = "001S";
    static final String CQ_ENTREPRISE = "entreprise";

    public static void main(String[] args) throws Exception {

        //configuration de la machine locale
        //Possibilité de renseigner pour une config distante :
        // conf.set("hbase.master", "192.168.15.20:60000");
        // conf.set("hbase.zookeeper.quorum", "192.168.15.20");
        //conf.set("hbase.zookeeper.property.clientPort","2181");
        // or  conf.addResource(new Path("/home/user/hbase/core-site.xml"));  conf.addResource(new Path("/home/user/hbase/hbase-site.xml"));
        Configuration conf = HBaseConfiguration.create();

        HTable table = new HTable(conf, USERS_TABLE_NAME);

        //création du composant pour la suppression d'une ligne spécifique : 001S
        Delete delete = new Delete(Bytes.toBytes(ROW_INGENIEUR_1));
        //On peut spécifier la version de la valeur associée à la colonne à supprimer
        delete.setTimestamp(1421608776178l);

        //On peut spécifier la version de la cellule à supprimer
        delete.deleteColumn(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION), Bytes.toBytes(CQ_ENTREPRISE), 1421608776178l);

        //suppression de toutes les versions suppérieures ou égales 1421608776178
        delete.deleteColumns(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION), Bytes.toBytes(CQ_ENTREPRISE), 1421608776178l);

        //suppression de toutes les valeurs d'une column family avec toutes les versions
        delete.deleteFamily(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION));

        //suppression de toutes les valeurs d'une column family dont la version est supérieure ou égale 1421608776178
        delete.deleteFamily(Bytes.toBytes(USERS_COLUMN_FAMILY_PROFESSION), 1421608776178l);

        table.delete(delete);
        table.close();
    }