Part 1: Problem Statement
Hi, Habr! I am a solution architect at CleverDATA. Today I will talk about how we classify large amounts of data using models built using almost any available machine learning library. In this series of two articles, we will look at the following questions.
- How to present a model of machine learning as a service (Model as a Service)?
- How are tasks of distributed processing of large volumes of data physically performed using Apache Spark?
- What problems arise when Apache Spark interacts with external services?
- Using the aka-streams and akka-http libraries, as well as the Reactive Streams approach, can you organize effective interaction between Apache Spark and external services?
Initially, I planned to write one article, but since the volume of the material was quite large, I decided to split it into two parts. Today in the first part we will look at the general formulation of the problem, as well as the main problems that need to be solved during implementation. In the
second part we will talk about the practical implementation of the solution of this problem using the Reactive Streams approach.
Our company
CleverDATA has a data analytics team that uses a wide range of tools (such as scikit-learn, facebook fastText, xgboost, tensorFlow, etc.) to train machine learning models. De facto, the main programming language that analysts use is Python. Almost all libraries for machine learning, even initially implemented in other languages, have an interface in Python and are integrated with the main Python libraries (primarily with NumPy).
On the other hand, the Hadoop ecosystem is widely used for storing and processing large arrays of unstructured data. It stores data on the HDFS file system in the form of distributed replicable blocks of a certain size (as a rule, 128 MB, but it is possible to configure). The most efficient algorithms for processing distributed data try to minimize network interaction between cluster machines. To do this, the data must be processed on the same machines where they are stored.
Of course, in many cases, network interaction cannot be completely avoided, but, nevertheless, you should try to perform all tasks locally and minimize the amount of data that will need to be transmitted over the network.
This principle of processing distributed data is called “transfer compute to data” (move computations close to data). All main frameworks, mainly Hadoop MapReduce and Apache Spark, adhere to this principle. They determine the composition and sequence of specific operations that will need to be run on the machines where the necessary data blocks are stored.
Figure 1. The HDFS cluster consists of several machines, one of which is the Name Node and the others are the Data Node. The Name Node stores information about the files that make up the blocks, and about the machines where they are physically located. Data Node stores the blocks themselves, which are replicated to several machines for increased reliability. Data processing tasks are also launched on Data Node. Tasks consist of a main process (Master, M), which coordinates the launching of work processes (Worker, W) on machines where the necessary data blocks are stored.Virtually all components of the Hadoop ecosystem are launched using a Java virtual machine (Java Virtual Machine, JVM) and are tightly integrated with each other. For example, to run tasks written using Apache Spark to work with data stored on HDFS, virtually no additional manipulations are required: the framework provides this functionality out of the box.
Unfortunately, the bulk of libraries intended for machine learning assume that data is stored and processed locally. At the same time, there are such libraries that are tightly integrated with the Hadoop ecosystem, for example, Spark ML or Apache Mahout. However, they have a number of significant drawbacks. First, they provide far fewer implementations of machine learning algorithms. Secondly, not all data analysts are able to work with them. The advantages of these libraries include the fact that they can be used to train models on large amounts of data using distributed computing.
However, data analysts often use alternative methods for training models, in particular, libraries that allow the use of GPUs. I will not consider the issues of training models in this article, because I want to focus on using ready-made models built using any available machine learning library for classifying large amounts of data.
So, the main task we are trying to solve here is the application of machine learning models to large amounts of data stored on HDFS. If we could use the SparkML module from the Apache Spark library, which implements the basic machine learning algorithms, then the classification of large amounts of data would be a trivial task:
val model: LogisticRegressionModel = LogisticRegressionModel.load("/path/to/model") val dataset = spark.read.parquet("/path/to/data") val result = model.transform(dataset)
Unfortunately, this approach only works for algorithms implemented in the SparkML module (the full list can be found
here ). In the case of using other libraries, and besides being implemented not on the JVM, everything becomes much more complicated.
To solve this problem, we decided to wrap the model in a REST service. Accordingly, when running a task to classify data stored on HDFS, it is necessary to organize the interaction between the machines on which data is stored and the machine (or cluster of machines) on which the classification service is running.
Figure 2. The concept of Model as a ServiceDescription of python classification service
In order to present the model as a service, it is necessary to solve the following tasks:
- implement efficient access to the model via HTTP;
- to ensure the most efficient use of machine resources (first of all, all processor cores and memory);
- provide resistance to high loads;
- provide the ability to scale horizontally.
Access to the model via HTTP is quite simple: for Python, a large number of libraries have been developed that allow you to implement a REST access point using a small amount of code. One of these microframes is
Flask . The implementation of the classification service on Flask is as follows:
from flask import Flask, request, Response model = load_model() n_features = 100 app = Flask(__name__) @app.route("/score", methods=['PUT']) def score(): inp = np.frombuffer(request.data, dtype='float32').reshape(-1, n_features) result = model.predict(inp) return Response(result.tobytes(), mimetype='application/octet-stream') if __name__ == "__main__": app.run()
Here, at the start of the service, we load the model into memory, and then use it when calling the classification method. The load_model function loads a model from some external source, be it a file system, key-value storage, etc.
A model is a certain object that has a method predict. In the case of classification, it takes as input a certain feature vector of a certain size and gives either a boolean value indicating whether the specified vector is suitable for this model, or some value from 0 to 1, to which the cutoff threshold can be applied: everything above the threshold, is a positive result of the classification, the rest is not.
The feature vector that we need to classify is passed in binary form and deserialized into the numpy array. It would be overhead to make an HTTP request for each vector. For example, in the case of a 100-dimensional vector and using float32 values, a full HTTP request, including headers, would look something like this:
PUT /score HTTP/1.1 Host: score-node-1:8099 User-Agent: curl/7.58.0 Accept: */* Content-Type: application/binary Content-Length: 400 [400 bytes of data]
As you can see, the efficiency of such a request is very low (400 bytes of the payload / (133 bytes header + 400 bytes body) = 75%). Fortunately, in almost all libraries, the predict method allows you to take not a [1 xn] vector as an input, but a [mxn] matrix, and, accordingly, to produce the result immediately for m input values.
In addition, the numpy library is optimized for working with large matrices, allowing you to efficiently use all available machine resources. Thus, we can send in one request not one, but rather a large number of feature vectors, deserialize them into a numpy matrix of size [mxn], classify, and return the vector [mx 1] from boolean or float32 values. As a result, the efficiency of HTTP interaction when using a matrix of 1000 lines becomes almost equal to 100%. The size of the HTTP headers in this case can be neglected.
To test the Flask service on a local machine, it can be run from the command line. However, this method is not suitable for industrial use. The fact is that Flask is single-threaded and, if we look at the CPU load diagram while the service is running, we see that one core is 100% loaded and the rest are inactive. Fortunately, there are ways to use all the cores of the machine: to do this, Flask must be run via the uwsgi web application server. It allows you to optimally adjust the number of processes and threads so as to ensure a uniform load on all processor cores. More details with all the options for configuring uwsgi can be found
here .
It is better to use nginx as an HTTP entry point, since uwsgi may be unstable in case of high loads. Nginx accepts the entire input stream of requests to itself, filters out incorrect requests, and discharges the load on uwsgi. Nginx communicates with uwsgi through linux sockets using a process file. An example nginx configuration is shown below:
server { listen 80; server_name 127.0.0.1; location / { try_files $uri @score; } location @score { include uwsgi_params; uwsgi_pass unix:/tmp/score.sock; } }
As we see, it turned out quite a difficult configuration for one machine. If we need to classify large amounts of data, a high number of requests will come to this service, and it may become a bottleneck. The solution to this problem is horizontal scaling.
For convenience, we pack the service into a Docker container and then deploy it to the required number of machines. If desired, you can use automated deployment tools such as Kubernetes. An example Dockerfile structure for creating a service container is shown below.
FROM ubuntu #Installing required ubuntu and python modules RUN apt-get update RUN apt-get -y install python3 python3-pip nginx RUN update-alternatives
So, the structure of the service for classification is as follows:
Figure 3. Service scheme for classificationBrief description of Apache Spark in the Hadoop ecosystem
Now consider the processing of data stored on HDFS. As I have already noted, the principle of transferring calculations to data is used for this. To start processing tasks, you need to know which machines store the data blocks we need in order to run processes directly on them. It is also necessary to coordinate the launch of these processes, restart them in case of emergency situations, if necessary, aggregate the results of the various subtasks, etc.
All these tasks are solved by many frameworks that work with the Hadoop ecosystem. One of the most popular and convenient is Apache Spark. The main concept around which the whole framework is built is RDD (Resilient Distributed Dataset). In general, RDD can be considered as a fall-resistant distributed collection. RDD can be obtained in two main ways:
- creation from an external source, such as a collection in memory, a file or directory on the file system, etc .;
- conversion from another RDD by applying transformation operations. RDD supports all basic collection operations, such as map, flatMap, filter, groupBy, join, etc.
It is important to understand that RDD, unlike collections, is not directly data, but a sequence of operations that must be performed on data. Therefore, when invoking transformation operations, no work actually takes place, and we just get a new RDD, which will contain one more operation than the previous one. The work itself is started when calling the so-called terminal operations, or actions. These include saving to a file, saving to a collection in memory, counting the number of elements, etc.
When you start a terminal operation, Spark builds an acyclic operations graph (DAG, Directed Acyclic Graph) on the basis of the final RDD and sequentially launches them on a cluster according to the resulting graph. When building a DAG based on RDD, Spark performs a series of optimizations, for example, where possible combines several consecutive transformations into one operation.
RDD was the basic unit of interaction with the Spark API in Spark 1.x versions. In Spark 2.x, the developers stated that now the main concept for interaction is Dataset. Dataset is an add-on to RDD with support for SQL-like interactions. When using the Dataset API Spark allows you to use a wide range of optimizations, including quite low-level ones. But in general, the basic principles applicable to RDD also apply to Dataset.
More information about the work of Spark can be found in the
documentation on the official site .
Consider an example of the simplest classification on Spark without the use of external services. Here, a rather meaningless algorithm is implemented that counts the proportion of each of the Latin letters in the text, and then counts the standard deviation. Here, first of all, it is important to pay attention directly to the basic steps used when working with Spark.
case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) //(1) def std(vector: Array[Float]): Float = ???
In this example, we:
- we define the structure of input, intermediate and output data (the input data is defined as some text with which a specific identifier is associated, the intermediate data correlates the identifier with the feature vector, and the output identifies the identifier with some numerical value);
- we define the function for calculating the resulting value by the feature vector (for example, the standard deviation, the implementation is not given);
- we define the source Dataset as data stored on HDFS in parquet format along the path / path / to / data;
- we determine the intermediate Dataset as an elementwise transformation (map) from the source Dataset;
- we similarly define the resulting Dataset through the elementwise conversion from the intermediate;
- save the resulting Dataset on HDFS in parquet format along the path / path / to / result. Since saving to a file is a terminal operation, the calculations themselves are started at this stage.
Apache Spark works on the principle of master-worker. When the application starts, the main process starts, called the driver. It executes the code responsible for generating the RDD, on the basis of which the calculations will be performed.
When calling a terminal operation, the driver generates a DAG based on the resulting RDD. Then the driver initiates the launch of workflows, called executors, in which the data will be processed directly. After running the workflows, the driver sends them an executable block that needs to be executed, and also indicates to which part of the data it needs to be applied.
Below is the code of our example, which highlights the parts of the code executed on the artist (between the lines executor part begin and executor part end). The rest of the code is executed on the driver.
case class Data(id: String, text: String) case class Features(id: String, vector: Array[Float]) case class Score(id: String, score: Float) def std(vector: Array[Float]): Float = ??? val ds: Dataset[Data] = spark.read.parquet("/path/to/data").as[Data] val result: Dataset[Score] = ds.map {
In the Hadoop ecosystem, all applications run in containers. A container is a process running on one of the cluster machines with a certain amount of resources allocated to it. The containers are started by the resource manager YARN. It determines which of the machines has a sufficient number of processor cores and RAM, and also whether there are on it the necessary data blocks for processing.
When you start a Spark application, YARN creates and runs a container on one of the cluster machines in which the driver is launched. Then, when the driver prepares the DAG from the operations that need to be run on executors, YARN launches additional containers on the right machines.
As a rule, it is enough for the driver to allocate one core and a small amount of memory (unless, of course, then the result of the calculations is not aggregated on the driver into memory). For executors in order to optimize resources and reduce the total number of processes in the system, more than one core can be distinguished: in this case, the executor will be able to perform several tasks simultaneously.
But here it is important to understand that if one of the tasks running in the container fails or if there is a shortage of resources, YARN may decide to stop the container, and then all the tasks that were performed in it will have to be restarted again on another artist. In addition, if we allocate a sufficiently large number of cores per container, then there is a possibility that YARN will not be able to launch it. For example, if we have two machines on which two cores are left unused, then we will be able to run on each container that requires two cores, but we will not be able to launch one container that requires four cores.
Now let's see how the code from our example will be executed directly on the cluster. Imagine that the size of the source data is 2 terabytes. Accordingly, if the block size on HDFS is 128 megabytes, then there will be a total of 16384 blocks. Each block is replicated to multiple machines to ensure reliability. For simplicity, let's take a replication factor of two, that is, there will be a total of 32768 available blocks. Suppose that for storage we use a cluster of 16 machines. Accordingly, on each of the machines, in the case of a uniform distribution, there will be approximately 2048 units, or 256 GB per machine. On each of the machines we have 8 processor cores and 64 GB of RAM.
For our task, the driver does not require a lot of resources, so we will allocate for it 1 core and 1 GB of memory. We give the artists 2 cores and 4 GB of memory. Suppose we want to maximize the use of cluster resources. Thus, we have 64 containers: one for the driver, and 63 for the performers.
Figure 4. The processes running on Data Node and the resources they use.Since in our case we use only map operations, our DAG will consist of one operation. It consists of the following actions:
- take one block of data from a local hard disk
- convert the data
- save the result to a new block on your own local disk.
In total, we need to process 16384 blocks, so each performer must perform 16384 / (63 performers * 2 cores) = 130 operations. Thus, the life cycle of the contractor as a separate process (in case everything happens without falling) will look like this.
- Launch container
- Getting a task from the driver, in which there will be a block identifier and the necessary operation. Since we have allocated two cores to the container, the contractor receives two tasks at once.
- Execution of the task and sending the result of the execution to the driver.
- Receiving the next task from the driver and repeating items 2 and 3 until all blocks for this local machine have been processed.
- Stop the container.
Note : more complex DAGs are obtained in case of need for redistribution of intermediate data between machines, as a rule for grouping operations (groupBy, reduceByKey, etc.) and connections (join), which are beyond the scope of this article.
The main problems of Apache Spark interaction with external services
If within the map operation we need to access some external service, the task becomes less trivial. Suppose that an object of the class ExternalServiceClient is responsible for interacting with an external service. In general, before starting work we need to initialize it, and then call it as necessary:
val client = ExternalServiceClient.create()
Typically, client initialization takes some time, so, as a rule, it is initialized when the application starts, and then used to get an instance of the client from some global context or pool. Therefore, when a container with a Spark executor receives a task that requires interaction with an external service, it would be nice to get an already initialized client before starting work on the data array, and then reuse it for each element.
In Spark, this can be done in two ways. First, if the client is serializable (the client itself and all its fields must extend the java.io.Serializable interface), then it can be initialized on the driver and then
transferred to the performers via the broadcast variable mechanism .
val client = ExternalServiceClient.create() val clientBroadcast = sparkContext.broadcast(client) ds.map { f: Features => val score = clientBroadcast.value.score(f.vector) Score(f.id, score) }
If the client is not serializable, or client initialization is a process depending on the settings of the specific machine on which it starts (for example, for balancing requests from one part of the machines must go to the first service machine, and for the other to the second), then the client can be initialized directly on the artist.
For this, RDD (and Dataset) have a mapPartitions operation, which is a generalized version of the map operation (if you look at the source code of the RDD class, then the map operation is implemented via mapPartitions). The function passed to the mapPartitions operation is run once for each block. The input of this function is an iterator for the data that we will read from the block, and at the output it must return an iterator for the output data corresponding to the input block:
ds.mapPartitions {fi: Iterator[Features] => val client = ExternalServiceClient.create() fi.map { f: Features => val score = client.score(f.vector) Score(f.id, score) } }
In this code, a client to an external service is created for each block of input data. This, of course, is better than creating a client each time to process each item, and in many cases this is an acceptable solution. However, a little further I will show how you can create an object that will be initialized once at the start of the container and then used when starting all the tasks that come into this container.The processing operation of the resulting iterator is single-threaded. Let me remind you that the main access pattern for an iterator type structure is a sequential call to the hasNext and next methods: while (i.hasNext()) { val item = i.next() … }
If we have two cores for the contractor, then there will be only two main workflows involved in data processing. Let me remind you that if we have 8 cores on the machine, then YARN will not allow it to run more than 4 performers of 2 cores on it, respectively, we will have only 8 threads per machine. For local computing, this is the optimal choice, as this will ensure maximum computational power utilization with minimal flow control overhead. However, in the case of interaction with external services, the picture changes.When using external services, performance is one of the most important issues. The simplest way to implement is to use a synchronous client, in which we access the service for each element, and, having received an answer from it, we form the resulting value. However, this approach has one major drawback: with synchronous interaction, a stream that synchronously calls an external service is blocked for the duration of interaction with this service. The fact is that when calling the hasNext method, we expect to get an unequivocal answer to the question of whether there are more elements to process. In case of uncertainty (for example, when we sent a request for an external service and do not know whether it will return an empty or non-empty answer), we have no choice but to wait for the answer, thus blocking the stream that caused this method. Consequently,The iterator is a blocking data structure .Figure 5. Element processing of an iterator obtained as a result of a function call passed to mapPartitions occurs in one stream. As a result, we get extremely inefficient use of resources.As you remember, we optimized our classification service so that it allows us to process several requests at the same time. Accordingly, we need to collect the required number of requests from the source iterator, send them to the service, get an answer and issue them to the resulting iterator.Figure 6. Synchronous interaction when sending a classification request for a group of elementsIn fact, in this case, the performance will be slightly better, because, first, we have to keep the main thread in a locked state while interacting with an external service, and secondly, the external service is inactive while we are working with the result.Final formulation of the problem
Thus, when using an external service, we must solve the problem of synchronous access. Ideally, it would be convenient to bring interaction with external services into a separate thread pool. In this case, requests to external services would be executed simultaneously with processing the results of previous requests, and thus it would be possible to more efficiently use the resources of the machine. For interaction between threads, one could use a blocking queue that would serve as a communication buffer. The threads responsible for interacting with external services would put the data in the queue, and the thread processing the resulting iterator, respectively, would take it from there.However, such asynchronous processing involves a number of additional problems.- , , , .
- , , . , . , .
- In order for the hasNext method to return false in the resulting iterator, you need to make sure that all requests have been answered and signal that there will be no more data in the buffer. In synchronous processing, this is quite simple: if, after processing the next response, the source iterator returns hasNext = false, then, accordingly, there will be no more elements. In the case of asynchronous processing, especially if we send several requests at the same time, it is necessary to additionally coordinate the receipt of responses, and only after receiving the last response to send a signal about the completion of processing.
About how we managed to effectively solve these problems, I will discuss in the next section . Stay tuned!
In the meantime, look at the vacancies of our company, maybe we are looking for you?