Good day

I had a dispute at work with my colleagues about threads in the new version of Node.js and whether they need to be synchronized. To begin with, we picked the problem of writing lines to a file in parallel. The topic of worker_threads is hot, so read on.
A little about the threads themselves. In Node.js 10.5.0 they are an experimental feature, and to get access to the "worker_threads" module you have to run the Node.js application with the "--experimental-worker" flag. I added this flag to the start script in package.json:
{
  "name": "worker-test",
  "version": "1.0.0",
  "description": "",
  "main": "app.js",
  "scripts": {
    "start": "node --max-old-space-size=4096 --experimental-worker app.js"
  },
  "author": "",
  "license": "ISC"
}
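On Node.js 10.5.0 the module is only visible with this flag (later releases, from 11.7.0 as far as I know, no longer require it). A small guard like the one below turns a forgotten flag into a clear message instead of a bare module-not-found error. It is a sketch: loadWorkerThreads is a hypothetical helper, and the assumption is that the missing module surfaces as MODULE_NOT_FOUND.

```javascript
// Hypothetical helper: returns the worker_threads module, or null when this
// Node.js process does not expose it (e.g. 10.5.0 started without
// --experimental-worker; assumed to surface as MODULE_NOT_FOUND).
function loadWorkerThreads() {
  try {
    return require('worker_threads');
  } catch (err) {
    if (err.code === 'MODULE_NOT_FOUND') return null;
    throw err; // anything else is a real error
  }
}

const workerThreads = loadWorkerThreads();
if (workerThreads === null) {
  console.error('worker_threads is not available: run with --experimental-worker (see "npm start").');
  process.exit(1);
}
```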
Now about the logic itself. The main thread spawns N worker threads, and they all write to the file continuously. Unlike most examples, where the main and child threads are started from the same file, I moved the worker code into a separate file, which seems cleaner and more elegant to me.
Now, the code.
The main app.js file is the entry point.
const { Worker } = require('worker_threads');
const path = require('path');
const WORKERS_NUMBER = 100;
console.log('Hello from main!');
for (let i = 1; i <= WORKERS_NUMBER; i++) {
  // Each worker runs ./writer-worker-app/app.js and receives its own id.
  const w = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
}
Here we simply create child threads with the Worker class, pointing each of them to its entry file, './writer-worker-app/app.js'. When creating a thread, we pass it our own ID as workerData.
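The loop above starts the workers and forgets about them. When a worker is expected to finish (the writer with while (true) never does, but the 1000-iteration writer from UPDATE 2 does), it helps to know when it exits. A minimal sketch, assuming a hypothetical runWorker helper built on the Worker 'error' and 'exit' events; the worker body is inlined with eval: true only so the example fits in one file.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical helper: start a worker and resolve with its exit code.
// 'error' fires for an uncaught exception inside the worker;
// 'exit' fires once the thread has stopped.
function runWorker(filenameOrCode, workerData, options = {}) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(filenameOrCode, { ...options, workerData });
    worker.once('error', reject);
    worker.once('exit', resolve);
  });
}

// Usage: the body is inlined (eval: true) to keep the example in one file;
// in the article it would be path.join(__dirname, './writer-worker-app/app.js').
const code = `
  const { workerData } = require('worker_threads');
  console.log('Worker ' + workerData.id + ' initialized.');
`;
const runs = [1, 2, 3].map((id) => runWorker(code, { id }, { eval: true }));
Promise.all(runs).then((codes) => console.log('All workers exited with codes', codes));
```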
Entry file for the worker thread, ./writer-worker-app/app.js:
const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');
const id = workerData.id;
console.log(`Worker ${id} initialized.`);
// Write as fast as possible, forever (this loop never yields to the event loop).
while (true) {
  sendMessage();
}
function sendMessage() {
  logger.log(`Hello from worker number ${workerData.id}\r\n`);
}
And the simplest possible logger, ./writer-worker-app/logger.js:
const fs = require('fs');
// Synchronously appends the message to my-file.txt (relative to the process's working directory).
function log(message) {
  return fs.appendFileSync('./my-file.txt', message);
}
module.exports = { log };
When we launched the application, we all expected to end up with a mess in the file, and my colleagues were ready to declare that locks, semaphores and the other joys of parallel execution were needed. But no! Every line in the file is intact; only the order is random:
Hello from worker number 14
Hello from worker number 3
Hello from worker number 9
Hello from worker number 15
Hello from worker number 2
Hello from worker number 4
Hello from worker number 7
Hello from worker number 6
Hello from worker number 1
Hello from worker number 11
A wonderful experiment and another small victory for Node :-) My assumption is that all synchronization happens at the level of Node's I/O streams, but I would be glad to learn the correct explanation in the comments. Just in case, we also checked it using fs.createWriteStream and the stream.write method instead of fs.appendFileSync. The result was the same.
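A likely explanation, offered as an assumption rather than a verified fact: fs.appendFileSync opens the file with the 'a' flag (O_APPEND on POSIX systems) and hands each message to the operating system, normally in a single write. With O_APPEND the kernel moves the file offset to the end of the file as part of each write, so writers cannot overwrite each other, and short writes to a local file are, in practice, not split up by concurrent writers. For the stream version to behave the same way it needs { flags: 'a' }, because fs.createWriteStream defaults to 'w'. The sketch below makes both variants explicit; createSyncLogger and createStreamLogger are illustrative names, not part of the project.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Synchronous variant: one descriptor opened with the 'a' flag
// (O_APPEND on POSIX), each message handed to the OS with writeSync.
function createSyncLogger(file) {
  const fd = fs.openSync(file, 'a');
  return {
    log(message) {
      const buf = Buffer.from(message);
      let offset = 0;
      // writeSync may write fewer bytes than asked, so loop until done.
      while (offset < buf.length) {
        offset += fs.writeSync(fd, buf, offset, buf.length - offset);
      }
    },
    close() {
      fs.closeSync(fd);
    },
  };
}

// Stream variant: the default 'w' flag would truncate the file and start
// writing at offset 0, so append mode has to be requested explicitly.
function createStreamLogger(file) {
  const stream = fs.createWriteStream(file, { flags: 'a' });
  return {
    log(message) {
      stream.write(message);
    },
    // Resolves once all buffered data has been flushed ('finish').
    close() {
      return new Promise((resolve) => stream.end(resolve));
    },
  };
}

// Usage in a temporary directory.
const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'append-demo-'));
const syncLogger = createSyncLogger(path.join(dir, 'sync.txt'));
syncLogger.log('Hello from worker number 1\r\n');
syncLogger.close();

const streamLogger = createStreamLogger(path.join(dir, 'stream.txt'));
streamLogger.log('Hello from worker number 2\r\n');
const streamClosed = streamLogger.close();
```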
But we did not stop there.
A colleague proposed the problem of thread synchronization. For our specific example, let it be the task of writing to the file sequentially, in ascending order of ID: first the first thread writes, then the second, then the third, and so on.
For this, I introduced another thread, the manager. The main thread could have done the job, but I enjoy creating these isolated workers and building communication through messages. Before writing the manager itself, we need a communication channel between it and the writer workers. For this, the MessageChannel class was used. Instances of this class have two fields, port1 and port2, each of which can listen for messages from the other and send messages to it through .on('message') and .postMessage(). This class was created within the "worker_threads" module for communication between threads: normally, when an object is passed to another thread, it is simply cloned, and the clone would be useless in an isolated runtime environment.
For two threads to communicate, each of them must be given one of the ports.
An interesting fact: on 10.5.0 it is impossible to pass the port through the Worker constructor. You can only do it via worker.postMessage(), and you must list the port in the transferList parameter!
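A sketch of handing a port to a worker this way: the port goes into the message passed to worker.postMessage() and is also listed in the transferList argument. The worker body is inlined with eval: true, and the { id, status: 'ready' } reply is an illustrative message shape.

```javascript
const { Worker, MessageChannel } = require('worker_threads');

// Worker body, inlined with eval: true so the sketch fits in one file.
// Its first parentPort message carries the port; from then on it talks over that port.
const workerCode = `
  const { parentPort, workerData } = require('worker_threads');
  parentPort.once('message', ({ orchestratorPort }) => {
    orchestratorPort.postMessage({ id: workerData.id, status: 'ready' });
  });
`;

const { port1, port2 } = new MessageChannel();
const worker = new Worker(workerCode, { eval: true, workerData: { id: 1 } });

// A MessagePort cannot be cloned, only transferred: leaving port1 out of
// the transferList makes postMessage throw.
worker.postMessage({ orchestratorPort: port1 }, [port1]);

port2.once('message', (msg) => {
  console.log('message from worker over the transferred port:', msg);
  port2.close();      // disconnects both ends of the channel
  worker.terminate(); // the demo is done with this worker
});
```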
The manager will send commands to the writers in ascending order of their IDs, and it will send the next command only after receiving a response from the current writer confirming a successful write.
[Figure: pseudo-UML diagram of the application]

Our modified main ./app.js file:
const { Worker, MessageChannel } = require('worker_threads');
const path = require('path');
const WORKERS_NUMBER = 100;
console.log('Main app initialized and started.');
const workersMeta = [];
for (let i = 1; i <= WORKERS_NUMBER; i++) {
  const channel = new MessageChannel();
  const worker = new Worker(path.join(__dirname, './writer-worker-app/app.js'), { workerData: { id: i } });
  workersMeta.push({ id: i, worker, channel });
}
// port1 of each channel goes to its writer; it must be listed in the transferList.
workersMeta.forEach(({ worker, channel }) => {
  worker.postMessage({ orchestratorPort: channel.port1 }, [channel.port1]);
});
// Give the writers time to initialize before starting the orchestrator (see the note below).
setTimeout(() => {
  const orchestrator = new Worker(path.join(__dirname, './orchestrator-worker-app/app.js'));
  // port2 of every channel goes to the orchestrator, together with the writer's id.
  const orchestratorData = workersMeta.map((meta) => ({ id: meta.id, port: meta.channel.port2 }));
  orchestrator.postMessage({ workerPorts: orchestratorData }, orchestratorData.map(w => w.port));
  console.log('All worker threads have been initialized');
}, WORKERS_NUMBER * 10);
Here we first create the workers, then send each of them its port for talking to the manager (only this way works; it cannot be done through the constructor).
Then we create the manager thread and send it the list of ports for communicating with the writer threads.
Updated: I found empirically that when working with threads it is better to let them settle first (finish initializing). Properly, I should have listened for an "I am ready!" response from each thread, but I decided to take the easier route.
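For comparison, a sketch of the "I am ready!" handshake instead of a fixed delay. Assumptions: workers are inlined with eval: true, the { type: 'ready' } message and the startWorker helper are hypothetical, and the workers are terminated at the end only so the demo can exit.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical worker body: it installs its message listener first and only
// then reports { type: 'ready' }, so anything sent afterwards has a handler.
const workerCode = `
  const { parentPort, workerData } = require('worker_threads');
  parentPort.on('message', (msg) => {
    // Real command handling would go here.
  });
  parentPort.postMessage({ type: 'ready', id: workerData.id });
`;

// Resolves with the Worker once it has reported ready; rejects if it fails first.
function startWorker(id) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerCode, { eval: true, workerData: { id } });
    worker.once('error', reject);
    worker.on('message', (msg) => {
      if (msg.type === 'ready') resolve(worker);
    });
  });
}

const WORKERS_NUMBER = 5;
const allReady = Promise.all(
  Array.from({ length: WORKERS_NUMBER }, (_, i) => startWorker(i + 1))
);

allReady.then(async (workers) => {
  console.log(`All ${workers.length} workers reported ready`);
  // This is where the orchestrator could be started instead of after a setTimeout.
  // The demo has nothing else to do, so it shuts the workers down.
  await Promise.all(workers.map((w) => w.terminate()));
});
```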
Let's change the behavior of the writer thread so that it writes the message only when told to, and reports the result once the write has completed:
./writer-worker-app/app.js
const { workerData, parentPort } = require('worker_threads');
const logger = require('./logger');
const id = workerData.id;
console.log(`Worker ${id} initialized.`);
// The first message from the main thread carries the port to the orchestrator.
parentPort.on('message', value => {
  const orchestratorPort = value.orchestratorPort;
  orchestratorPort.on('message', data => {
    if (data.command === 'write') {
      console.log(`Worker ${id} received write command`);
      sendMessage();
      sendResult(orchestratorPort);
    }
  });
  console.log(`Worker ${id} started.`);
});
function sendMessage() {
  logger.log(`Hello from worker number ${workerData.id}\r\n`);
}
// Tell the orchestrator that this worker's write has finished.
function sendResult(port) {
  port.postMessage({ id, status: 'completed' });
}
We handle the initialization message from the parent thread, start listening on the manager's port, and when a command arrives we first write to the file and then send the result. Note that the file is written synchronously, so sendResult() is called only after sendMessage() has finished writing.
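The ordering guarantee rests on logger.log being synchronous. If the logger were switched to an asynchronous API such as fs.promises.appendFile, the result would have to be sent only after the write settles; otherwise the manager could command the next writer before the current line is on disk. A sketch of such a handler; handleCommand, the 'failed' status and the stand-in port are assumptions for illustration.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical async version of the writer's command handler. `port` is anything
// with postMessage (the orchestrator's MessagePort in the real application).
async function handleCommand(data, { id, file, port }) {
  if (data.command !== 'write') return;
  try {
    // Report completion only after the write has actually finished.
    await fs.promises.appendFile(file, `Hello from worker number ${id}\r\n`);
    port.postMessage({ id, status: 'completed' });
  } catch (err) {
    // Still answer, so the orchestrator is not left waiting forever.
    port.postMessage({ id, status: 'failed', error: err.message });
  }
}

// Usage with a stand-in port that records what would have been sent.
const file = path.join(fs.mkdtempSync(path.join(os.tmpdir(), 'writer-')), 'my-file.txt');
const sent = [];
const fakePort = { postMessage: (msg) => sent.push(msg) };
const handled = handleCommand({ command: 'write' }, { id: 7, file, port: fakePort });
handled.then(() => console.log(sent));
```

In the worker this would be wired up as orchestratorPort.on('message', (data) => handleCommand(data, { id, file: './my-file.txt', port: orchestratorPort })).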
All that's left is to write the implementation of our smart manager.
./orchestrator-worker-app/app.js:
const { parentPort } = require('worker_threads');
console.log('Orchestrator initialized.');
let workerPorts;
parentPort.on('message', (value) => {
  // [{ id, port }, ...] in ascending id order, as built by the main thread.
  workerPorts = value.workerPorts;
  workerPorts.forEach(wp => wp.port.on('message', handleResponse));
  console.log('Orchestrator started.');
  sendCommand(workerPorts[0]);
});
// When a writer reports completion, command the writer with the next id (wrapping around to the first).
function handleResponse(status) {
  const responseWorkerId = status.id;
  let nextWorker = workerPorts.find(wp => wp.id === responseWorkerId + 1);
  if (!nextWorker) {
    nextWorker = workerPorts[0];
  }
  sendCommand(nextWorker);
}
function sendCommand(worker) {
  worker.port.postMessage({ command: 'write' });
}
We receive the list of ports (already in ascending order of ID), set up a callback on each port, and send the command to the first writer. In the callback we look up the next writer and send the command to it, wrapping around to the first writer after the last one. The first version of the article also put an interval between commands so as not to strain the system; it has since been removed (see UPDATE below).
That's it: our multithreaded application with thread management is ready. We have learned not only to spawn worker threads in Node.js but also to build effective communication between them. In my personal opinion, the architecture of isolated threads in Node.js that wait for and send messages is more than convenient and promising. Thank you all for your attention.
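To see the whole protocol work without three separate files, here is a condensed single-file sketch. It is a simplification, not the project's code: the main thread plays the manager, the writers are inlined with eval: true, the next writer is found by index arithmetic (IDs run 1..N in order) instead of find, and the run stops after a fixed number of rounds so the resulting file can be checked.

```javascript
const { Worker, MessageChannel } = require('worker_threads');
const fs = require('fs');
const os = require('os');
const path = require('path');

const WORKERS_NUMBER = 4;
const ROUNDS = 3; // stop after 3 full passes so the demo terminates
const TOTAL = WORKERS_NUMBER * ROUNDS;
const file = path.join(fs.mkdtempSync(path.join(os.tmpdir(), 'orchestrator-')), 'my-file.txt');

// Writer: waits for its orchestrator port, then writes one line per 'write'
// command and answers only after the synchronous append has returned.
const writerCode = `
  const { parentPort, workerData } = require('worker_threads');
  const fs = require('fs');
  parentPort.once('message', ({ orchestratorPort }) => {
    orchestratorPort.on('message', (data) => {
      if (data.command === 'write') {
        fs.appendFileSync(workerData.file, 'Hello from worker number ' + workerData.id + '\\n');
        orchestratorPort.postMessage({ id: workerData.id, status: 'completed' });
      }
    });
  });
`;

const writers = [];
for (let id = 1; id <= WORKERS_NUMBER; id++) {
  const { port1, port2 } = new MessageChannel();
  const worker = new Worker(writerCode, { eval: true, workerData: { id, file } });
  worker.postMessage({ orchestratorPort: port1 }, [port1]); // port1 -> writer
  writers.push({ id, worker, port: port2 });                 // port2 stays with the manager
}

let commandsSent = 0;
let resolveDone;
const done = new Promise((resolve) => { resolveDone = resolve; });

function sendCommand(writer) {
  commandsSent++;
  writer.port.postMessage({ command: 'write' });
}

// Writer `id` sits at index id - 1, so the next writer is at index id % N (wrapping to 0).
function handleResponse({ id }) {
  if (commandsSent === TOTAL) {
    // Every command has been answered: shut down and hand back the file contents.
    writers.forEach((w) => w.port.close());
    Promise.all(writers.map((w) => w.worker.terminate()))
      .then(() => resolveDone(fs.readFileSync(file, 'utf8')));
    return;
  }
  sendCommand(writers[id % WORKERS_NUMBER]);
}

writers.forEach((w) => w.port.on('message', handleResponse));
sendCommand(writers[0]);

done.then((text) => process.stdout.write(text));
```

Because the next command is sent only after the previous writer has answered, the lines come out as 1, 2, 3, 4, 1, 2, 3, 4, ... regardless of thread scheduling.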
All source code can be found here.
UPDATE
So as not to mislead readers, and not to give anyone grounds to say that I am cheating with timeouts, I have updated the article and the repository.
Changes:
1) The intervals in the original writers have been removed; they now run a bare while (true) loop.
2) Added the --max-old-space-size=4096 flag, just in case: the current implementation of threads is not very stable, and I hope this will help.
3) Removed the intervals between the manager's commands. Writing is now non-stop.
4) Added a timeout before initializing the manager; the reason is described above.
TO DO:
1) Add variable-length messages, or count the calls to the logger (thanks to FANAT1242).
2) Add a benchmark comparing the first and second versions (for example, how many lines get written in 10 seconds).
UPDATE 2
1) The logging code has been changed: now each message has a different length.
2) writer-worker-app/app.old.js has been changed: each thread writes 1000 times and then exits.
This was done to test the ideas of user FANAT1242. Messages still do not overwrite each other, and the file contains exactly 1000 * N lines.
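A check like this can be automated. The sketch below uses an invented line format (the exact format in the repository is not shown here): each line carries the writer ID, a sequence number and a payload whose length is stated in the line itself, so a torn or interleaved write breaks either the pattern or the length field, and counting lines per writer confirms the 1000 * N total. formatLine, verify and the verify.js file name are hypothetical.

```javascript
const fs = require('fs');

// Hypothetical line format: "Hello from worker number <id> #<seq> len=<n> <payload>",
// where the payload is exactly <n> characters, so a torn or mixed line is detectable.
function formatLine(id, seq) {
  const n = 1 + ((id * 31 + seq * 17) % 200); // varies from line to line, 1..200
  return `Hello from worker number ${id} #${seq} len=${n} ${'x'.repeat(n)}\r\n`;
}

const LINE_RE = /^Hello from worker number (\d+) #(\d+) len=(\d+) (x*)$/;

// Throws on the first damaged line or on wrong totals; returns lines per worker id.
function verify(text, workers, perWorker) {
  const lines = text.split('\n').map((l) => l.replace(/\r$/, '')).filter((l) => l.length > 0);
  const counts = new Map();
  lines.forEach((line, i) => {
    const m = LINE_RE.exec(line);
    if (!m || m[4].length !== Number(m[3])) {
      throw new Error(`Damaged line ${i + 1}: ${line.slice(0, 60)}`);
    }
    const id = Number(m[1]);
    counts.set(id, (counts.get(id) || 0) + 1);
  });
  if (lines.length !== workers * perWorker) {
    throw new Error(`Expected ${workers * perWorker} lines, found ${lines.length}`);
  }
  for (let id = 1; id <= workers; id++) {
    if (counts.get(id) !== perWorker) {
      throw new Error(`Worker ${id} wrote ${counts.get(id) || 0} lines, expected ${perWorker}`);
    }
  }
  return counts;
}

// Writer side (app.old.js): for (let seq = 0; seq < 1000; seq++) logger.log(formatLine(id, seq));
// Checker usage: node verify.js my-file.txt 100 1000
const [file, workers, perWorker] = process.argv.slice(2);
if (file && workers && perWorker) {
  verify(fs.readFileSync(file, 'utf8'), Number(workers), Number(perWorker));
  console.log('OK: all lines are intact');
}
```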