*Breaking New* Streams & 0.9.8: objectMode!

C’est tout frais ça vient de sortir dans la 0.9.8, le support des objets autre que des String ou des Buffer dans les streams !

Tiens pour changer je vais mettre les exemples en coffee-script 😉 (je n’aime pas particulièrement, mais c’est utile d’en voir de temps en temps, vue sa popularité il faut bien s’habituer).

Rappelez-vous, notre classe ArrayReadStream :

class ArrayReadStream extends stream.Readable
        constructor: (array) ->
                super {objectMode: true}
                @_array = array
                @_index = 0
        _read: (n, cb) ->
                process.nextTick =>
                        if @_index < @_array.length
                                cb null, @_array[@_index++]
                        else
                                cb null, null

Imaginons qu’on l’utilise pour un tableau d’objet (exemple typique du résultat qu’on aurait voulu avoir avec un client SQL) :

array = [{name: 'John Smith'}, {name: 'John Williams'}, {name: {first: 'John', last: 'Rambo'}}]
 
s = new ArrayReadStream array
 
s.pipe process.stdout

La nouveauté c’est l’option « objectMode«  qui permet d’activer le support des objets directs.

Jusqu’à la 0.9.7 comprise, Ce code ne fait rien. En effet, les données retournée par _read n’étant ni des Buffer ni des String la lecture s’arrête (sans erreur, ce que je trouve un peu moche d’ailleurs…).

En 0.9.8 en revanche ce code va planter :

[…]
TypeError: invalid data
    at WriteStream.Socket.write (net.js:565:11)
[…]

C’est parce que d’un coup process.stdout se prend dans la tête un truc qu’il ne sait pas écrire. Des objets autre qu’un Buffer ou une String ?

Créons une petite stream Writable qui fait du console.log pour être moins tatillon sur l’input :

class ConsoleLog extends stream.Writable
        constructor: ->
                super {}
        _write: (data, cb) ->
                process.nextTick ->
                        console.log typeof chunk, String(data), JSON.stringify(data)
                        cb()

Résultat du test en remplaçant process.stdout par new ConsoleLog :

object [object Object] {"name":"John Smith"}
object [object Object] {"name":"John Williams"}
object [object Object] {"name":{"first":"John","last":"Rambo"}}

On a bien nos objets, pas dénaturés, sans erreur. Very good news !

On peut évidemment coller des streams Transform au milieu, comme un « pluck » version stream object :

class StreamPluck extends stream.Transform
        constructor: (key, options) ->
                super options
                @_key = key
        _transform: (input, emit, done) ->
                emit input?[@_key]
                done()

Puis si on remplace s.pipe new ConsoleLog par s.pipe(new StreamPluck 'name').pipe(new ConsoleLog) on obtient :

object John Smith [74,111,104,110,32,83,109,105,116,104]
object John Williams [74,111,104,110,32,87,105,108,108,105,97,109,115]
object [object Object] {"first":"John","last":"Rambo"}

Observez comment nos String on bien été remplacées par des Buffer avant passage à la Writable stream. L’objet quant à lui n’a pas été dénaturé, c’est bien le comportement attendu :)

ATTENTION cette API n’est toujours pas stable et j’ai encore noté quelques bugs. Déjà, l’option « objectMode » agit comme si elle était toujours à true, quelque soit la valeur qu’on lui donne impossible de revenir au comportement de la version 0.9.7. Et ensuite dans la documentation rien n’est spécifié quant à sa valeur par défaut, on supposera que c’est false mais il y a un manque qui pour moi illustre clairement le fait que cette partie n’est pas (du tout) finie 😉

EDIT en fait l’option objectMode semble ne spécifier que le fait qu’on ignorera la valeur de « n ». Dans notre cas elle n’aurait donc aucun sens à être à false quand bien même on aurait un tableau de chaînes. On dira qu’elle est juste très mal nommée 😉

En tous cas maintenant plus rien n’empêche de créer des API vraiment streamable pour interroger des BDD ou se faire son ETL maison.

Laisser un commentaire