These are notes based on the Frontend Masters course by James Halliday (aka substack). Link here
A payload is often broken up into multiple packets, which you can receive out of order.
Protocols are the languages that computer programs use to speak to each other. Examples of protocols:
Most services have one or more default ports. A computer can run many services; ports (range 1 - 65535) differentiate between the services on a system. A service can listen on any port, but there are conventional default assignments.
By default, only the root user can listen on ports below 1024.
nc -l 5000
nc localhost 5000
Headers have the form key: value, colon separated; the space after the colon is not mandatory.
$ nc google.com 80
GET / HTTP/1.0
Host: google.com --> A server might serve multiple domains, or sit behind a load balancer.
A request looks like
VERB PATH HTTPVERSION
HEADERS ...
BODY ...
and the response looks like
HTTPVERSION STATUSCODE STATUSMESSAGE
HEADERS ...
BODY ...
Take the following response:
HTTP/1.1 200 OK
Date: Mon, 12 Jan 2015 06:37:51 GMT
Connection: keep-alive
Transfer-Encoding: chunked <- The body will be sent in chunks, each prefixed with its length. The server doesn't know in advance how long the response will be.
3 <-- Hex length of the chunk
oi <-- payload
4 <-- Hex length of the chunk
ok <-- payload
0 <-- Zero-length chunk: end of the response
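A minimal sketch of the server side (the port and payload here are arbitrary): in Node, if you call res.write() without setting a Content-Length header, the HTTP/1.1 response is sent with chunked transfer encoding automatically.
const http = require('http');
http.createServer(function (req, res) {
  // No Content-Length set, so Node falls back to Transfer-Encoding: chunked.
  res.write('oi\n'); // first chunk
  setTimeout(function () {
    res.write('ok\n'); // second chunk
    res.end();         // zero-length chunk terminates the body
  }, 1000);
}).listen(5000);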
$ curl -s http://substack.net <-- Do GET and print body
$ curl -i http://substack.net <-- Print body and headers
$ curl -I http://substack.net <-- Print only headers
# -s gets rid of progress output
# Use -X to set the HTTP verb and -d for form parameters
$ curl -X POST http://substack.net
$ curl -X POST http://substack.net -d title=whatever -d date=10000
# You can set headers with the -H flag
$ curl http://substack.net -H 'Content-Type: application/json'
A message ends with a . on a line by itself followed by a new line. SMTP is another text-based protocol. So HTTP, SMTP and IRC are all plain-text protocols that just follow a certain text layout and a port to listen on.
$ sudo tcpdump -X --> start listening
$ sudo tcpdump -A --> start listening with other formatting
$ sudo tcpdump 'tcp port 80' -X --> the string is a query/filter
Node.js has a handy interface for shuffling data around called streams.
Stream Origins
"We should have some ways of connecting programs like garden hose -- screw in another segment when it becomes necessary to massage data in another way. This is the way of IO also." -- Doug McIlroy, October 11, 1964
Think also of how we pipe between programs in *nix systems.
Why Streams?
Composition
Just like how in unix we can pipe commands together, we can pipe streams together
$ cat file | jq '.age' | ...
A Node.js equivalent of what $ cat does:
const fs = require('fs');
fs.createReadStream(process.argv[2])
.pipe(process.stdout);
const fs = require('fs');
const through = require('through2');
fs.createReadStream(process.argv[2])
.pipe(through(toUpper))
.pipe(process.stdout);
function toUpper(buf, enc, next){
// buf is a binary description of the data
// Output should be a buffer or string
next(null, buf.toString().toUpperCase())
}
const through = require('through2');
process.stdin
.pipe(through(toUpper))
.pipe(process.stdout);
function toUpper(buf, enc, next){
// buf is a binary description of the data
// Output should be a buffer or string
next(null, buf.toString().toUpperCase())
}
const { Transform } = require('stream');
const toUpper = new Transform({
transform: function(buf, enc, next) {
next(null, buf.toString().toUpperCase())
}
// ... and other hooks
})
process.stdin
.pipe(toUpper)
.pipe(process.stdout);
flush is what happens when a stream finishes.
With through there are 2 parameters: write and end. Both are optional.
through(write, end)
function write (buf, enc, next) {}
function end () {}
Call next() when you're ready for the next chunk. If you don't call next(), your stream will hang!
Call this.push(VALUE) inside the callback to put VALUE into the stream's output. Use a VALUE of null to end the stream.
This is useful when you need to buffer a certain amount of bytes before you can do something: if you need 100 bytes, you keep accumulating the incoming chunks and just call next() without pushing until your buffer is big enough, then push and take the proper action.
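A minimal sketch of that buffering pattern (the 100-byte block size is arbitrary): accumulate chunks, only push once a full block is available, and push the remainder in the end/flush function.
const through = require('through2');
let buffered = Buffer.alloc(0);

const blocks = through(function (buf, enc, next) {
  buffered = Buffer.concat([buffered, buf]);
  while (buffered.length >= 100) {
    this.push(buffered.slice(0, 100)); // emit one full 100-byte block
    buffered = buffered.slice(100);    // keep the rest for later
  }
  next(); // always call next(), even when nothing was pushed
}, function (next) {
  if (buffered.length) this.push(buffered); // flush the leftover bytes at the end
  next();
});

process.stdin.pipe(blocks).pipe(process.stdout);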
npm install concat-stream
concat-stream buffers up all the data in the stream:
const concat = require('concat-stream');
process.stdin.pipe(concat(function( body) {
console.log(body.length);
}))
You can only write to a concat-stream; you can't read from it. Keep in mind that all the data will be held in memory.
GOOD TO KNOW: when you are reading from stdin, it will keep taking input until it receives a CTRL + D.
const concat = require('concat-stream');
const through = require('through2');
const http = require('http');
const qs = require('querystring');
const SIZE_LIMIT= 20;
var server = http.createServer(function(req, res) {
req
.pipe(counter())
.pipe(concat({encoding: 'string'}, onBody));
function counter() {
var size = 0;
return through(function(buf, enc, next) {
size += buf.length;
if(size > SIZE_LIMIT) {
next(null, null);
}else{
next(null, buf);
}
})
}
function onBody (body){
var params = qs.parse(body);
console.log(params);
res.end('ok\n');
}
});
server.listen(5000);
Writable streams are the A in readable.pipe(A). You can:
.write(buf)
.end()
.end(buf)
.on('finish', function () {})
(...).pipe(stream)
const fs = require('fs');
const w = fs.createWriteStream('cool.txt');
w.once('finish', function() {
console.log('FINISHED');
});
w.write('Hi\n');
w.write('Wow\n');
w.end();
Readable streams are the A in A.pipe(writable). You can:
stream.pipe(...)
stream.once('end', function () {})
stream.read()
stream.on('readable', function () {})
const fs = require('fs');
const r = fs.createReadStream(process.argv[2]);
r.pipe(process.stdout);
Transform streams sit in the middle: A.pipe(transform).pipe(B)
input => transform => output
Duplex streams: A.pipe(duplex).pipe(A)
A and duplex are both going to be duplex streams.
const net = require('net');
net.createServer(function(stream) {
stream.pipe(stream); // This does not create an infinite loop, it's just an echo server
}).listen(5000);
▶ nc localhost 5000
hi
hi
there
there
const net = require('net');
net.createServer(function(stream) {
stream
.pipe(net.connect(5000, 'localhost'))
.pipe(stream)
}).listen(5001);
▶ nc localhost 5001
hi
hi
there
there
echo.js
const net = require('net')
net.createServer(function (stream) {
stream.pipe(stream)
}).listen(5000)
vpn.js
const net = require('net')
const crypto = require('crypto')
const pump = require('pump')
const pw = 'abc123'
net.createServer(function (stream) {
pump(
stream,
crypto.createDecipher('aes192',pw),
net.connect(5000,'localhost'),
crypto.createCipher('aes192',pw),
stream,
function (err) {
console.error(err)
}
)
}).listen(5005)
vpn-client.js
const net = require('net')
const crypto = require('crypto')
const pw = 'abc123'
var stream = net.connect(5005,'localhost')
process.stdin
.pipe(crypto.createCipher('aes192',pw))
.pipe(stream)
.pipe(crypto.createDecipher('aes192',pw))
.pipe(process.stdout)
Normally you can only read and write buffers and strings with streams. However, if you initialize a stream in objectMode, you can use any kind of object (except for null):
// This can be also done with native modules
const through = require('through2')
const tr = through.obj(function(row, enc, next) {
next(null, (row.n * 1000) + '\n')
})
tr.pipe(process.stdout)
tr.write({n : 5})
tr.write({n : 10})
tr.write({n : 3})
tr.end();
When piping an object stream, the consuming stream should also be in objectMode.
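For example (a minimal sketch, the names are made up), the consumer of an object stream also has to be created with through.obj (or objectMode: true), otherwise it would try to treat the objects as buffers:
const through = require('through2');

const source = through.obj(); // pass-through object stream
const sink = through.obj(function (row, enc, next) {
  console.log('got n =', row.n); // row is a plain object, not a buffer
  next();
});

source.pipe(sink);
source.write({ n: 5 });
source.end();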
Many of the APIs in node core provide stream interfaces:
fs.createReadStream()
fs.createWriteStream()
process.stdin, process.stdout
ps.stdin, ps.stdout, ps.stderr (from child_process.spawn)
net.connect(), tls.connect()
net.createServer(function(stream) {})
tls.createServer(opts, function(stream) {})
const { spawn } = require('child_process');
const ps = spawn('grep', ['potato']);
ps.stdout.pipe(process.stdout); // We pipe the output of the child process to our stdout
ps.stdin.write('cheese\n');
ps.stdin.write('potato\n');
ps.stdin.end();
// We receive a request
// req: readable, res:writeable
http.createServer((req, res) => ({}))
// We make a request
// req: writeable, res:readable
var req = http.request(opts, (res) => ({}))
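A minimal sketch of the client side (host, port and payload are assumptions, reusing the earlier server on port 5000): req is the writable side you write the body into, res is the readable side you read from.
const http = require('http');

const req = http.request(
  { method: 'POST', hostname: 'localhost', port: 5000, path: '/' },
  function (res) {
    res.pipe(process.stdout); // res is readable
  }
);
req.write('title=whatever&date=10000'); // req is writable
req.end();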
crypto.createCipher(algo, password) - transform stream to encrypt
crypto.createDecipher(algo, password) - transform stream to decrypt
crypto.createCipheriv(algo, key, iv) - transform stream to encrypt with an iv
crypto.createDecipheriv(algo, key, iv) - transform stream to decrypt with an iv
crypto.createHash(algo) - transform stream to output a cryptographic hash
crypto.createHmac(algo, key) - transform stream to output an HMAC digest
crypto.createSign(algo) - writable stream to sign messages
crypto.createVerify(algo) - writable stream to verify signatures
const { createHash } = require('crypto');
process.stdin
.pipe(createHash('sha512', { encoding : 'hex' }))
.pipe(process.stdout);
Don't forget, when you run this, to press CTRL + D to get the hash. It basically signals the end of input so the hash can be flushed.
zlib.createGzip(opts) - transform stream to compress with gzip
zlib.createGunzip(opts) - transform stream to uncompress gzip
zlib.createDeflate(opts) - transform stream to compress with deflate
zlib.createInflate(opts) - transform stream to uncompress deflate
zlib.createDeflateRaw(opts) - transform stream to compress with raw deflate
zlib.createInflateRaw(opts) - transform stream to uncompress raw deflate
zlib.createUnzip(opts) - transform stream to uncompress either gzip or deflate
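A minimal sketch using one of these: gzip whatever comes in on stdin and write the compressed bytes to stdout.
const zlib = require('zlib');

process.stdin
  .pipe(zlib.createGzip())   // transform stream: plain bytes in, gzip out
  .pipe(process.stdout);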
Split input on newlines
const split = require('split2');
const through = require('through2');
let count = 0;
process.stdin
.pipe(split()) // This splits on new lines
.pipe(through(write, end)); // Now we increase the count per chunk (each being a new line) and log total count
function write(buf, enc, next) {
count++;
next();
}
function end() {
console.log(count);
}
Streaming websockets in node and the browser.
const http = require('http');
const ecstatic = require('ecstatic');
const through = require('through2');
var server = http.createServer(ecstatic(__dirname + '/public'));
server.listen(3000);
const wsock = require('websocket-stream');
wsock.createServer({server}, function (stream) {
// stream is a duplex stream
stream.pipe(loud()).pipe(stream);
})
function loud () {
return through(function(buf, enc, next){
next(null, buf.toString().toUpperCase());
});
}
const wsock = require('websocket-stream');
const stream = wsock('ws://localhost:3000');
process.stdin.pipe(stream).pipe(process.stdout);
Collect a stream’s output into a single buffer. Useful for unit tests. For object streams, collect output into an array of objects.
const collect = require('collect-stream');
const split = require('split2');
const sp = process.stdin.pipe(split(JSON.parse))
collect(sp, function(err, rows){
if(err) console.error(err);
else console.log(rows)
})
Create a readable stream with a pull function. Reminds me a bit of a generator. (Enumeration)
const from = require('from2');
const messages = ['hello', 'world\n', null];
from(function(size, next) {
next(null, messages.shift())
}).pipe(process.stdout);
Create a writable stream with a write and flush function.
const to = require('to2');
const split = require('split2');
process.stdin
.pipe(split())
.pipe(to(function(buf, next) {
console.log(buf.length)
next();
}))
A logger example
const duplexify = require('duplexify');
const mkdirp = require('mkdirp');
const fs = require('fs');
module.exports = function (name) {
const d = duplexify();
mkdirp('logs', function(err) {
const w = fs.createWriteStream('logs/' + name + '.log');
d.setWritable(w);
})
return d;
}
Usage example
const log = require('./logger.js');
const stream = log('myname');
stream.write(Date.now() + '\n');
stream.end();
Streams are also event emitters, so errors can be caught with error listeners.
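For example (a minimal sketch, the file name is made up), a read stream for a missing file emits an error event you can listen for:
const fs = require('fs');

const r = fs.createReadStream('does-not-exist.txt');
r.on('error', function (err) {
  console.error('stream error:', err.message);
});
r.pipe(process.stdout);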
Pump pipes streams into one another and gracefully handles errors.
const pump = require('pump');
pump(stream1, stream2, stream3, function onError() {});
Unlike pump, you get back a single stream you can write to and read from.
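Assuming the module meant here is pumpify (the notes don't name it), a minimal sketch: the pipeline below becomes a single writable stream, and errors from any stage surface on it.
const pumpify = require('pumpify');
const zlib = require('zlib');
const fs = require('fs');

// Writing to `combined` runs the data through gzip and into the file.
const combined = pumpify(zlib.createGzip(), fs.createWriteStream('out.gz'));
combined.on('error', function (err) { console.error(err); });
combined.end('some data\n');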
Reliably detect when a stream is finished. This package is aware of all the obscure ways streams can end.
const onend = require('end-of-stream');
const net = require('net');
const server = net.createServer(function(stream) {
const iv = setInterval(() => {
stream.write(Date.now() + '\n');
}, 1000);
onend(stream, function onEndedOrErrorsOut(){
clearInterval(iv);
})
})
server.listen(5000);
Call methods defined by a remote endpoint.
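One module that does this is dnode (an assumption, the notes don't name it); a minimal sketch with a server exposing a method and a client calling it over the wire:
const dnode = require('dnode');

// Expose methods the remote endpoint can call.
const server = dnode({
  transform: function (s, cb) {
    cb(s.toUpperCase()); // the result travels back through the callback
  }
});
server.listen(5004);

// Connect and call the remote method as if it were local.
const d = dnode.connect(5004);
d.on('remote', function (remote) {
  remote.transform('beep', function (s) {
    console.log(s); // => BEEP
    d.end();
  });
});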
Pack multiple streams into a single stream.
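A minimal sketch, assuming a multiplexing module like multiplex is meant (the substream ids below are made up for illustration):
const multiplex = require('multiplex');

// Receiving side: called back once per embedded substream.
const receiver = multiplex(function (stream, id) {
  stream.on('data', (buf) => console.log(id + ': ' + buf));
});

// Sending side: pack several substreams into one stream.
const sender = multiplex();
sender.pipe(receiver); // this single pipe could just as well be a network socket

const one = sender.createStream('one');
const two = sender.createStream('two');
one.write('hello from one');
two.write('hello from two');
one.end();
two.end();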