Working with threads in Node.js 10.5.0

Version 10.5.0 of the Node.js platform was released recently. One of its main features is the first support for threads ever added to Node.js, although it is still experimental. This is particularly interesting because the platform's followers have always been proud that, thanks to its excellent asynchronous I/O subsystem, it does not need threads. Nevertheless, thread support has now appeared in Node.js. Why did this happen? Who can benefit from it, and how?



In short, threads are needed so that Node.js can reach new heights in areas where it has not shown remarkable results so far, namely CPU-intensive computation. This weakness is the main reason Node.js does not hold a strong position in fields such as artificial intelligence, machine learning, and the processing of large amounts of data. A lot of effort has gone into making Node.js perform well on such tasks, but here the platform still looks far more modest than it does, for example, in microservice development.

The author of the material we are publishing in translation today says that he decided to condense the technical documentation, which can be found in the original pull request and in official sources, into a set of simple practical examples. He hopes that anyone who works through these examples will know enough to get started with threads in Node.js.

About the worker_threads module and the --experimental-worker flag


Multithreading support in Node.js is implemented as the worker_threads module. To take advantage of the new feature, load this module with require.

Note that worker_threads is available only when the script is run with the --experimental-worker flag; otherwise the system will not find the module.

Note that the flag contains the word "worker", not "thread". The documentation refers to the feature the same way, using the terms "worker thread" or simply "worker". We will follow the same convention from here on.
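A quick way to check whether the module is reachable is to try loading it. The sketch below is only an illustration: workerThreadsAvailable is a hypothetical helper name, not part of Node.js, and script.js stands in for your own file. As far as I know, later Node.js releases load the module without the flag, in which case the check simply reports success.

```javascript
// Hypothetical helper: reports whether the worker_threads module can be loaded.
function workerThreadsAvailable() {
  try {
    require('worker_threads');
    return true;
  } catch (err) {
    // On Node.js 10.5.0 without --experimental-worker the module is not found.
    return false;
  }
}

if (workerThreadsAvailable()) {
  console.log('worker_threads is available');
} else {
  // script.js is a placeholder for the file you are running.
  console.log('worker_threads is missing; try: node --experimental-worker script.js');
}
```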

If you have already written multithreaded code, much of what you see while exploring these new Node.js features will be familiar. If you have never worked with anything like this, just keep reading: explanations for beginners are given along the way.

About tasks that can be solved with the help of workers in Node.js


Worker threads are designed, as already mentioned, for tasks that make intensive use of the processor. Using them for I/O tasks is a waste of resources because, according to the official documentation, the internal Node.js mechanisms for asynchronous I/O are much more efficient than worker threads at the same job. So let's agree right away that we will not use workers for input and output.
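To make the idea of a CPU-bound task concrete, here is a minimal sketch that moves a deliberately slow calculation into a worker. It is not taken from the original article: fibInWorker and workerSource are hypothetical names, and the worker code is passed as a string with the eval option only so that the sketch fits in one file.

```javascript
const { Worker } = require('worker_threads');

// Source of the worker, passed as a string so the sketch fits in one file.
const workerSource = `
  const { parentPort, workerData } = require('worker_threads');
  // Deliberately slow recursive Fibonacci: pure CPU work, no I/O.
  function fib(n) { return n < 2 ? n : fib(n - 1) + fib(n - 2); }
  parentPort.postMessage(fib(workerData));
`;

// Hypothetical helper: computes fib(n) in a worker and resolves with the result.
function fibInWorker(n) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: n });
    worker.once('message', resolve);
    worker.once('error', reject);
  });
}

// This timer runs on the main thread, which is free while the worker computes.
const ticker = setInterval(() => console.log('main thread is still responsive'), 50);

fibInWorker(35).then((value) => {
  console.log('fib(35) =', value);
  clearInterval(ticker);
});
```

Running the same fib call directly on the main thread would block the event loop, and the timer could not fire until the calculation finished.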

Let's start with a simple example that demonstrates how to create and use workers.

Example 1


    const { Worker, isMainThread, workerData } = require('worker_threads');

    let currentVal = 0;
    // Timer intervals in milliseconds: workers 0 and 1 use the first two values,
    // the main thread uses the third.
    let intervals = [100, 1000, 500];

    function counter(id, i) {
        console.log("[", id, "]", i);
        return i;
    }

    if (isMainThread) {
        console.log("this is the main thread");
        // Start two workers that run this same file; each gets its index as workerData.
        for (let i = 0; i < 2; i++) {
            new Worker(__filename, { workerData: i });
        }
        setInterval((a) => currentVal = counter(a, currentVal + 1), intervals[2], "MainThread");
    } else {
        console.log("this isn't");
        setInterval((a) => currentVal = counter(a, currentVal + 1), intervals[workerData], workerData);
    }

The output of this code is a set of lines showing counters whose values increase at different rates.


The results of the first example

Let's look at what is happening here:

  1. The instructions inside the if statement create 2 worker threads. Thanks to __filename, their code is taken from the same script that was passed to Node.js when the example was run. At the moment workers need the full path to the file with their code and do not support relative paths, which is why this value is used here.
  2. Data is passed to these two workers through the workerData property of the object given as the second argument to the Worker constructor. Inside the worker, the value is available through the constant of the same name (note how this constant is obtained on the first line of the file and how it is used in the last setInterval call).

This is a very simple example of using the worker_threads module, and nothing interesting happens in it yet. So let's consider another example.
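The worker receives a copy of workerData, not a reference to the original object, so changes made in the worker are not visible in the main thread. The sketch below illustrates this under a few assumptions: roundTrip and workerSource are hypothetical names, and the worker code is supplied as a string through the eval option purely to keep everything in one file.

```javascript
const { Worker } = require('worker_threads');

const workerSource = `
  const { parentPort, workerData } = require('worker_threads');
  workerData.count += 1;              // changes only the worker's own copy
  parentPort.postMessage(workerData); // send the modified copy back
`;

// Hypothetical helper: passes data to a worker and resolves with what it sends back.
function roundTrip(data) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: data });
    worker.once('message', resolve);
    worker.once('error', reject);
  });
}

const original = { id: 'w0', count: 0 };
roundTrip(original).then((fromWorker) => {
  console.log('worker sent back:', fromWorker);
  console.log('main thread still has:', original);
});
```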

Example 2


Consider an example in which, first, we perform some "heavy" computation and, second, do something asynchronous in the main thread.

    const { Worker, isMainThread, parentPort } = require('worker_threads');
    const request = require("request");

    if (isMainThread) {
        console.log("This is the main thread");
        let w = new Worker(__filename, { workerData: null });
        // Message received from the worker
        w.on('message', (msg) => {
            console.log("First value is: ", msg.val);
            console.log("Took: ", (msg.timeDiff / 1000), " seconds");
        });
        w.on('error', console.error);
        w.on('exit', (code) => {
            if (code !== 0)
                console.error(new Error(`Worker stopped with exit code ${code}`));
        });
        // Asynchronous I/O keeps running in the main thread while the worker sorts
        request.get('http://www.google.com', (err, resp) => {
            if (err) {
                return console.error(err);
            }
            console.log("Total bytes received: ", resp.body.length);
        });
    } else {
        // Worker code
        function random(min, max) {
            return Math.random() * (max - min) + min;
        }
        const sorter = require("./list-sorter");
        const start = Date.now();
        let bigList = Array(1000000).fill().map((_) => random(1, 10000));
        sorter.sort(bigList);
        parentPort.postMessage({ val: sorter.firstValue, timeDiff: Date.now() - start });
    }

To run this example, note two things. First, the code needs the request module. You can install it with npm: in an otherwise empty directory containing the file with the code above, run npm init --yes and then npm install request --save. Second, the code uses a helper module, loaded with const sorter = require("./list-sorter");. The file for this module (list-sorter.js) must be in the same directory as the file above. Its code looks like this:

    module.exports = {
        firstValue: null,
        sort: function(list) {
            // A numeric comparator is needed: the default sort compares elements as strings
            let sorted = list.sort((a, b) => a - b);
            this.firstValue = sorted[0];
        }
    };

This time we solve two problems in parallel. First, we load the google.com home page, and second, we sort a randomly generated array of a million numbers. This may take a few seconds, which gives us a great opportunity to see the new Node.js mechanisms in action. In addition, we measure the time it takes the worker to sort the numbers and then send the measurement (along with the first element of the sorted array) to the main thread, which prints the results to the console.
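One detail matters when sorting numbers in JavaScript: Array.prototype.sort called without a comparator compares elements by their string form, so numbers do not necessarily end up in numeric order. The short illustration below uses made-up sample values.

```javascript
const values = [10, 9, 1, 200];

// Default sort compares the string forms "10", "9", "1", "200".
const lexicographic = [...values].sort();

// A numeric comparator sorts by value instead.
const numeric = [...values].sort((a, b) => a - b);

console.log('default sort:', lexicographic);
console.log('numeric sort:', numeric);
```

For the random numbers in the example this means that, with the default comparator, a value such as 1000.2 is placed before 9.5.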


The result of the second example

The most important thing in this example is the mechanism for exchanging data between threads.
The main thread receives messages from the worker through the on method of the worker instance, and in the code you can see the events we listen for. The message event is raised every time the worker sends a message with parentPort.postMessage. The same mechanism works in the opposite direction: the main thread can send a message to a worker by calling postMessage on the worker instance, and the worker receives it by listening for the message event on the parentPort object.
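Both directions of this exchange can be sketched in a few lines. This is an illustration rather than code from the original article: askWorker and workerSource are hypothetical names, and the worker code is passed as a string with the eval option so that the sketch is a single file.

```javascript
const { Worker } = require('worker_threads');

const workerSource = `
  const { parentPort } = require('worker_threads');
  // Worker side: receive a message from the main thread and answer it.
  parentPort.on('message', (msg) => {
    parentPort.postMessage({ reply: msg.text.toUpperCase() });
  });
`;

// Hypothetical helper: sends one message to a worker and resolves with its reply.
function askWorker(text) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true });
    worker.once('message', (msg) => {   // worker -> main thread
      resolve(msg);
      worker.terminate();               // the worker has nothing left to do
    });
    worker.once('error', reject);
    worker.postMessage({ text });       // main thread -> worker
  });
}

askWorker('ping').then((msg) => console.log('worker replied:', msg.reply));
```

The call to terminate is there because a worker that keeps listening on parentPort stays alive, which would keep the whole process from exiting.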

Now let's look at another example, very similar to what we have already seen, but this time we will pay special attention to the structure of the project.

Example 3


As a final example, we implement the same functionality as in the previous example, but this time we improve the structure of the code and make it cleaner, so that the project is easier to maintain.

Here is the code for the main program.

    const { Worker } = require('worker_threads');
    const request = require("request");

    // Starts a worker from the given file and reports its result or error through cb
    function startWorker(path, cb) {
        let w = new Worker(path, { workerData: null });
        w.on('message', (msg) => {
            cb(null, msg);
        });
        w.on('error', cb);
        w.on('exit', (code) => {
            if (code !== 0)
                console.error(new Error(`Worker stopped with exit code ${code}`));
        });
        return w;
    }

    console.log("this is the main thread");

    let myWorker = startWorker(__dirname + '/workerCode.js', (err, result) => {
        if (err) return console.error(err);
        console.log("[[Heavy computation function finished]]");
        console.log("First value is: ", result.val);
        console.log("Took: ", (result.timeDiff / 1000), " seconds");
    });

    const start = Date.now();
    request.get('http://www.google.com', (err, resp) => {
        if (err) {
            return console.error(err);
        }
        console.log("Total bytes received: ", resp.body.length);
        // Uncomment to send a message to the worker:
        // myWorker.postMessage({ finished: true, timeDiff: Date.now() - start });
    });

And here is the code describing the behavior of the worker thread (in the program above, the path to this file is built with __dirname + '/workerCode.js'):

    const { parentPort } = require('worker_threads');

    function random(min, max) {
        return Math.random() * (max - min) + min;
    }

    const sorter = require("./list-sorter");
    const start = Date.now();
    let bigList = Array(1000000).fill().map((_) => random(1, 10000));

    /**
    // Uncomment to receive messages from the main thread:
    parentPort.on('message', (msg) => {
        console.log("Main thread finished on: ", (msg.timeDiff / 1000), " seconds...");
    })
    */

    sorter.sort(bigList);
    parentPort.postMessage({ val: sorter.firstValue, timeDiff: Date.now() - start });

Here are the notable features of this example:

  1. The code for the main thread and the code for the worker thread now live in different files. This makes the project easier to maintain and extend.
  2. The startWorker function returns the new worker instance, which makes it possible to send messages to the worker from the main thread when needed.
  3. There is no need to check whether the code is running in the main thread (we removed the if statement that performed this check).
  4. The worker contains a commented-out code fragment that shows how to receive messages from the main thread. Together with the message-sending mechanism already discussed, this allows two-way asynchronous data exchange between the main thread and the worker thread.
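The same structure can also be expressed with a Promise instead of a callback. The sketch below is one possible variation, not the author's code: runWorker and demoSource are hypothetical names, and the inline worker with the eval option is used only to keep the demo in one file. A real project would pass a file path such as __dirname + '/workerCode.js' and omit eval.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical Promise-based variant of startWorker: resolves with the first
// message from the worker, rejects on an error or a non-zero exit code.
function runWorker(script, options) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(script, options);
    worker.once('message', resolve);
    worker.once('error', reject);
    worker.once('exit', (code) => {
      if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));
    });
  });
}

// Inline stand-in for workerCode.js: sorts the numbers it receives and
// reports the smallest one.
const demoSource = `
  const { parentPort, workerData } = require('worker_threads');
  const sorted = workerData.slice().sort((a, b) => a - b);
  parentPort.postMessage({ val: sorted[0] });
`;

runWorker(demoSource, { eval: true, workerData: [5, 3, 8] })
  .then((result) => console.log('First value is:', result.val))
  .catch(console.error);
```

Unlike startWorker, this variant does not return the worker instance, so it suits one-shot jobs better than an ongoing two-way conversation.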

Summary


In this article we used practical examples to explore the new features for working with threads in Node.js. If you have mastered what was discussed here, you are ready to look at the documentation and start your own experiments with the worker_threads module. It is worth noting that this feature has only just appeared in Node.js and is still experimental, so its implementation may change over time. In addition, if you run into errors during your own experiments with worker_threads, or find that the module lacks a feature you need, let the developers know and help improve the Node.js platform.

Dear readers! What do you think about multithreading support in Node.js? Do you plan to use this feature in your projects?

Source: https://habr.com/ru/post/415659/