Un point sur les streams

Les streams, c’est l’API de gestion de flux de données de Node.js.

Cette API est très élégante car elle mime le fonctionnement des flux à la Unix, avec la notion de « pipe », permettant de définir des éléments de code unitaires qui se contentent de recevoir et/ou publier un type de données.

Imaginons qu’on souhaite faire une requête SQL sur un annuaire, effectuer quelques transformations (mettre le nom en majuscule), puis écrire le tout dans un fichier CSV.

Avec l’API asynchrone standard on pourrait imaginer quelque chose comme ça (évidemment il ne s’agit pas d’API existantes exactement, c’est juste une idée) :

var csv = CSVFile('fichier.csv');
query('SELECT nom, prenom, telephone FROM annuaire', function (rows) {
  rows
    .map(function (row) {
      row.nom = row.nom.toUpperCase();
      return row;
    })
    .forEach(function (row) {
      csv.write(row);
    });
});

Et maintenant, la même avec les streams :

query('SELECT nom, prenom, telephone FROM annuaire')
  .pipe(streamMap(function (row) {
    row.nom = row.nom.toUpperCase();
    return row;
  }))
  .pipe(CSVWriter('fichier.csv'));

C’est à la fois plus court, et plus lisible.

Ça ressemble furieusement à ce qu’on ferait en shell : `select_annuaire | nom_majuscule | csv_append ‘fichier.csv’`.

Note : les promesses auraient pu représenter un bon compromis entre les deux, mais ce n’est pas le sujet ici 😉

Les streams jusqu’à présent

On dispose en standard de quelques streams :

  • Le module « fs » propose fs.createReadStream et fs.createWriteStream qui permettent de « streamer » du contenu vers ou depuis un fichier.
  • Le module TTY intègre aussi des streams pour gérer les entrées sorties du terminal.
  • Les objets « request » et « response » dans le callback des sockets (et notamment le serveur HTTP) sont des streams.

Il y a aussi quelques modules qui proposent une API Stream, notamment Mongoose (client MongoDB), ou ya-csv (pour le CSV), mais ils sont finalement assez peu nombreux.

Pourquoi si peu d’offre ?

Quand on voit ces « trous », on a envie de créer ses propres API basées sur les streams. Et là on a la réponse : écrire une classe de stream est une torture !

Node n’offre aucun outil, il faut tout (à part « pipe() », ils sont sympa) implémenter soi-même. Impossible de trouver un guide correct, un vrai désert.

L’enfer, c’est les streams

Un exemple con, supposons qu’on veuille créer une API de conversion de tableaux en streams. Objectif : new ArrayReadStream(array) crée une « readable stream » sur les entrées du tableau. On va s’épargner la version « writable », on aura compris avec un seul exemple :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
var Stream = require('stream');
 
module.exports = ArrayReadStream;
 
function ArrayReadStream (array) {
  Stream.call(this);
 
  this._array = array;
  this._ended = false;
  this._index = 0;
  this._paused = false;
 
  this.on('error', function () {
    this._ended = true;
  });
 
  process.nextTick(_forward.bind(this));
}
 
function _forward () {
  if (this._ended) return;
  if (!Array.isArray(this._array)) {
    this.emit('error', new Error('Array expected'));
  } else if (this._index >= this._array.length) {
    this._ended = true;
    this.emit('end');
  } else {
    this.emit('data', new Buffer(String(this._array[this._index++])));
    if (!this._paused) {
      process.nextTick(_forward.bind(this));
    }
  }
}
 
ArrayReadStream.prototype = {
  __proto__: Stream.prototype,
  get readable () {
    return Array.isArray(this._array) && this._array.length > this._index + 1;
  },
  resume: function () {
    this._paused = false;
    _forward.call(this);
  },
  setEncoding: function () {
    // Not implemented
  },
  pause: function () {
    this._paused = true;
  },
  destroy: function () {
    this._ended = true;
    this.emit('close');
  }
};

Ouch. Devoir implémenter pause(), resume(), etc… alors que la plupart relève d’une logique qui est toujours la même, est une vraie plaie. Voilà quand-même une documentation qui essaie de dédramatiser cette situation 😉

Note : il est vrai qu’on peut grandement simplifier ce code car la plupart des méthodes sont en fait facultatives. Oui oui, vous m’avez bien compris, dans Node 0.8.x quand on manipule une Stream on n’est même pas assuré que la méthode pause() existe. L’utilisateur devrait tester les méthodes existantes avant d’utiliser une Stream ? On a le pire des deux mondes : une API à la fois complexe à implémenter pour le développeur de module, et sans la moindre garantie d’homogénéité pour l’utilisateur.

On comprend mieux pourquoi peu d’auteurs s’amusent à intégrer une API « streamable » à leurs modules.

Episode 4 : un nouvel espoir

Isaac Z. Schlueter a bien compris que cette problématique était handicapante pour l’adoption des streams, et a rapidement publié le module readable-stream qui permet d’avoir une base de travail pour la création de streams.

Notre précédent exemple deviendrait alors :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
var Readable = require('readable-stream');
 
module.exports = ArrayReadStream;
 
function ArrayReadStream (array) {
  Readable.call(this);
  this._array = array;
  this._index = 0;
}
 
ArrayReadStream.prototype = {
  __proto__: Readable.prototype,
  _read: function (n, cb) {
    if (!Array.isArray(this._array)) {
      cb(new Error('Array expected'));
    } else if (this._index >= this._array.length) {
      cb(null, null);
    } else {
      cb(null, new Buffer(String(this._array[this._index++])));
    }
  }
}

C’est déjà beaucoup plus simple.

On voit à quel point le besoin de faire évoluer le core pour intégrer une API similaire – permettant enfin aux développeurs d’implémenter leurs propres streams sans risquer l’incident cardiaque – est évident. Côté utilisateurs de modules, on a aussi la garantie que le prototype de ces Streams est complet, stream.pause() n’a aucune chance de nous péter un « not a function » dans la tronche.

L’intégration au core

La nouvelle API de streams est intégrée au core à partir de la version 0.9.4, et représentera donc un changement majeur de la release stable 0.10.0 qui arrivera peu après.

La documentation du module « stream » (ici pour la 0.9.6) est bien différente de l’original, et comporte notamment une importante note sur la compatibilité (puisque la compatibilité ascendante n’est pas totalement assurée).

L’API Readable

Elle est extrêmement similaire au module « readable-stream » original, d’ailleurs notre code ne changera pas, à part le première ligne :

1
var Readable = require('stream').Readable;

Il y a plusieurs manières d’attaquer une Readable Stream :

  • Avec un pipe, le plus simple et de loin la méthode la plus conseillée : readable.pipe(writable).
  • « À l’ancienne » avec l’évènement « data » : readable.on('data', function (chunk) { … }), mais il vaut mieux penser à appeler readable.resume() manuellement pour être sûr qu’on est bien en « old mode » (dans mes tests ça a fonctionné avec ou sans).
  • Avec la nouvelle méthode read et l’évènement « readable ». La documentation n’est pas forcément au top (elle manque beaucoup d’exemples), donc voici les points importants :
    • read(0) va déclencher la lecture mais ne jamais rien retourner (qu’il y ait ou non de la donnée disponible). C’est un peu l’équivalent de resume(), mais avec une différence subtile : l’appel à resume() provoquera un premier évènement « readable » même s’il n’y a aucune donnée disponible (read() retournera null).
    • L’évènement « readable » est lancé à chaque fois qu’il y a de la donnée disponible, de manière générale (entre temps les Buffer passés au callback de _read() sont concaténés dans un buffer interne). Notre prochain appel à read() retournera le buffer interne. Un appel à read(N) ne retournera que les N premiers octets, l’appel suivant retournera les N octets suivants, et ainsi de suite jusqu’à null.
    • Une fois que la lecture est terminée, on retrouve notre bon vieil évènement « end ».
    stream.on('readable', function () {
      // some data is available
      var buffer = this.read(); // grab the full buffer
      // alternative: read chunk by chunk
      var chunk;
      while (chunk = this.read(42)) {
        console.log('42 bytes', String(chunk));
      }
    });
    stream.on('end', function () {
      console.log('EOF');
    });
    stream.read(0); // force-start the reading
Attention au code synchrone !

Une règle d’or est à suivre quand on écrit des streams : TOUJOURS ASYNCHRONE.

Notre exemple Array2Stream est intéressant pour ça car par défaut la méthode _read est complètement synchrone. Bug ou comportement normal (je soumettrai l’issue), mais en l’état l’évènement « readable » n’est jamais émis (et il n’est pas simplement émis avant qu’on ne l’écoute, mais emit('readable') n’est vraiment jamais appelé). Du coup on doit faire directement un while (chunk = stream.read()) pour récupérer chaque item du tableau. En revanche on('data') et pipe() continuent de fonctionner.

Il suffit d’inclure le code de notre fonction _read() dans un process.nextTick et tout roule :

  _read: function (n, cb) {
    process.nextTick((function () {}).bind(this));
  }

Il s’agit évidemment de la conjoncture de cas un peu particuliers (une stream synchrone, qu’on ne souhaite lire ni avec « data » ni avec un pipe), mais ça peut arriver, autant y penser :)

Résumé des méthodes de lecture

Une fois que tout marche, voici quelques exemples donnant le même résultat :

var stream = new ArrayReadStream(['john', 'bob', 'arnold']);
 
// pipe
stream.pipe(process.stdout);
 
// old mode
stream.on('data', function (chunk) {
  process.stdout.write(chunk);
});
stream.resume();
 
// new mode
stream.on('readable', function () {
  process.stdout.write(this.read());
});
stream.read(0);
 
// all methods display "johnbobarnold"

L’API Writable

Je ne l’ai pas abordé, mais écrire une Stream Writable était encore pire qu’une Readable. Maintenant ça se passe assez facilement, sur le même modèle que les Readable on n’a qu’une méthode à implémenter et tout roule :)

Voici notre classe ArrayWriteStream :)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var Writable = require('stream').Writable;
 
function ArrayWriteStream () {
  Stream.Writable.call(this);
  this._array = [];
}
 
ArrayWriteStream.prototype = {
  __proto__: Stream.Writable.prototype,
  _write: function (chunk, cb) {
    process.nextTick((function () {
      this._array.push(chunk);
      cb();
    }).bind(this));
  },
  get array () {
    return this._array;
  }
}

Et un exemple trivial d’utilisation :

var stream = new ArrayWriteStream();
 
stream.write('john');
stream.write('bob');
stream.end('arnold');
 
stream.on('finish', function () {
  console.error(stream.array.map(String)); // [ 'john', 'bob', 'arnold' ]
});

La seule chose à noter est le nouvel évènement « finish », plus sûr que « close ». Ce dernier fait référence à la fermeture d’une éventuelle ressource interne et est facultatif ; alors que « finish » est toujours émis après end() et dès qu’il n’y a plus rien à écrire (= tous les callbacks des appels à _write ont été appelés).

Souhaitons la bienvenue aux nouveaux ! Duplex, Transform, et PassThrough

Duplex prend à sa charge la question de l’héritage multiple entre Readable et Writable, et permet de créer des Stream à la fois Writable et Readable. Il suffit d’hériter de Duplex, et évidemment d’implémenter _read et _write et votre stream pourra porter les deux rôles simultanément.

Quand a-t-on besoin de Stream Duplex ? On peut implémenter un équivalent de la commande Unix tee. Mais évidemment on pensera à la notion de transformation de données dans le flux. C’est ce que faisait notre fonction fictive « streamMap » du tout début.

Transform est là pour aider à gérer ce cas particulier de Duplex : au lieu d’implémenter _read (sortie des données transformées) et _write (réception des données à transformer), on va implémenter une seule méthode _transform.

PassThrough est une implémentation de Transform, qui se content de transmettre les chunks tels quels. C’est notre élément neutre.

Il devient trivial d’écrire notre fameuse fonction streamMap qui prend en paramère une fonction synchrone de transformation de buffer, et retourne une Stream Writable et Readable à placer entre deux pipe() :)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var Transform = require('stream').Transform;
 
function streamMap (transform) {
  var stream = new Transform();
  stream._transform = function (input, emit, done) {
    var result;
    try {
      result = transform(input);
      emit(result);
    } catch (e) {
      this.emit('error', e);
    }
    done();
  }
  return stream;
}

En combinant nos précédents exemples, on peut afficher les noms de notre tableau en majuscule :

new ArrayReadStream(['john', 'bob', 'arnold'])
  .pipe(streamMap(function (chunk) {
    return (chunk && chunk.length) ? String(chunk).toUpperCase() : null;
  }))
  .pipe(process.stdout);
 
// Display: "JOHNBOBARNOLD"

On est d’accord, c’est beaucoup pour un simple array.map(String).join('').toUpperCase(), mais c’est pour la beauté du geste 😉

Conclusion

Voilà, vous avez maintenant tout pour jouer avec les Stream. On a enfin à partir de la 0.9.4 une base utilisable pour régler tous les processus assimilables à de l’ETL.

Les flux étaient censés être des acteurs « first-class » de l’API, mais jusqu’à présent c’était complètement raté à cause de la difficulté démentielle pour l’implémentation de nouvelles Streams.

Le seul inconvénient selon moi est le fait de limiter les streams aux Buffers. Dès qu’on essaie de sortir des sentiers battus on rencontre de solides blocages. Par exemple un simple write(42) ou write({name: "John"}) bloquera une stream Writable (plus aucun évènement ne sera lancé, je n’ai pas encore compris pourquoi). Ça peut être des bugs, quoi qu’il en soit il est clairement stipulé dans la doc qu’on est censé manipuler des String ou des Buffer, point.

En tous cas, Buffer ou pas, le premier qui me sort un module de requêtage de base de données sans API Stream, je lui tire les oreilles 😛 aucune excuse pour ne pas au moins proposer cette alternative à vos utilisateurs (quite à émettre du JSON), maintenant que c’est si simple à mettre en place.

6 réflexions au sujet de « Un point sur les streams »

  1. Nicolas Froidure

    Merci pour ‘larticle. J’avoue que je me suis senti un peu seul quand j’ai créé le module VarStream, il y avait peu de modules sur lesquels s’inspirer et je ne suis toujours pas certains d’avoir bien fait mes devoirs :). Je vais voir pour convertir le module.

    Ce qui m’amène à une question. Comment on gère la compatibilité des modules ? On update juste le packege.json et ça marche out of the box ?

    Répondre
    1. naholyr Auteur de l’article

      Tu veux parler de la compatibilité avec la version de Node ? Si c’est bien ça tu peux définir la clé engines dans ton package.json, par exemple si ton module se base sur la nouvelle API Stream :

      {
        "engines": {"node": ">=0.9.4"}
      }

      Je me demande si ceux qui font ensuite un « npm install » de ton module avec une version de Node inférieure vont se faire jeter ou récupérer la dernière version compatible. Je vais tester 😉

      Répondre
      1. naholyr Auteur de l’article

        OK donc j’ai testé, imaginons que tu corriges un bug tu publish « 1.0.x » (avec « engines »: »node<0.9.4″) puis tu publish « 2.0.x » (la version avec la nouvelle API stream donc « engines »: »node>=0.9.4″), celui qui fera son npm install depuis un Node 0.8 se fera envoyer bouler.

        Il faudra indiquer dans le README qu’il faut faire un « npm install pkg@1 » 😉

        Répondre

Laisser un commentaire