Part 2: Solution
Hello again! Today I will continue my story about how we classify large amounts of data on Apache Spark using arbitrary machine learning models. In the first part of the article, we examined the problem statement itself, as well as the main difficulties that arise when organizing the interaction between the cluster on which the source data is stored and processed and the external classification service. In this second part, we will consider one solution to this problem based on the Reactive Streams approach and its implementation in the akka-streams library.
Reactive Streams concept
To solve the problems described in the first part, we can use an approach called Reactive Streams. It makes it possible to control the transfer of data between processing stages that work at different speeds and independently of each other, without the need for unbounded buffering. If one of the processing stages is slower than the previous one, it must signal the faster stage how much input data it is ready to process at the moment. This interaction is called backpressure: the faster stages process exactly as many elements as the slower stage requires, and no more, and then release their computational resources.
In general, Reactive Streams is a specification for implementing the Publisher-Subscriber pattern. It defines a set of four interfaces (Publisher, Subscriber, Processor and Subscription) and a contract for their methods.
Consider these interfaces in more detail:
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
There are two sides to the Publisher-Subscriber model: transmitting and receiving. When implementing Reactive Streams, the class that implements the Publisher interface is responsible for transferring data, and Subscriber is responsible for receiving. To establish a connection between them, Subscriber must be registered with the Publisher by calling its subscribe method. According to the specification, after registering Subscriber, Publisher must call its methods in the following order:
- onSubscribe. This method is called immediately after the Subscriber is registered with the Publisher. It receives a Subscription object as a parameter, through which the Subscriber will request data from the Publisher. This object should be stored and used only in the context of this Subscriber.
- After the Subscriber has requested data from the Publisher by calling the request method of the corresponding Subscription object, the Publisher may call the Subscriber's onNext method, passing it the next element.
- The Subscriber can then periodically call the Subscription's request method, but the Publisher cannot call the onNext method more times than the total number of elements requested.
- If the data stream is finite, then after transferring all the elements via the onNext method, the Publisher should call the onComplete method.
- If an error occurs in the Publisher and further processing of elements is not possible, it should call the onError method.
- After the onComplete or onError method has been called, any further interaction between the Publisher and the Subscriber should be excluded.
Method calls can be viewed as sending signals between the Publisher and the Subscriber. The Subscriber signals the Publisher how many elements it is ready to process, and the Publisher, in turn, signals it that there is either a next element, or no more elements, or that some error has occurred.
To prevent the Publisher and the Subscriber from influencing each other in any other way, calls to all methods implementing the Reactive Streams interfaces must be non-blocking. In this case, the interaction between them is completely asynchronous.
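To make the contract more concrete, here is a minimal sketch of a Subscriber that requests elements in batches of ten and simply prints them; the class name and the batch size are arbitrary and not part of any real code base.

import org.reactivestreams.{Subscriber, Subscription}

// A toy Subscriber: asks for 10 elements at a time and prints each one.
class PrintingSubscriber[T] extends Subscriber[T] {
  private var subscription: Subscription = _
  private var received = 0L

  override def onSubscribe(s: Subscription): Unit = {
    subscription = s
    subscription.request(10)   // signal how many elements we are ready to process
  }

  override def onNext(element: T): Unit = {
    println(element)
    received += 1
    if (received % 10 == 0) subscription.request(10)   // ask for the next batch
  }

  override def onError(t: Throwable): Unit = t.printStackTrace()

  override def onComplete(): Unit = println("done")
}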
More details on the specification for the Reactive Streams interfaces can be found here.
Thus, by connecting the source and result iterators through converting them into a Publisher and a Subscriber respectively, we can solve the problems outlined in the previous part of the article. The problem of buffer overflow between stages is solved by the Subscriber requesting only a certain number of elements. The problem of signaling successful or unsuccessful completion is solved by sending the Subscriber the onComplete or onError signal, respectively. The Publisher becomes responsible for sending these signals; in our case it must keep track of how many HTTP requests have been sent and how many of them have been answered. After receiving the last answer and processing all the results contained in it, it should send the onComplete signal. If one of the requests fails, it should send the onError signal, stop sending further elements to the Subscriber, and stop reading elements from the source iterator.
The resulting iterator should be implemented as a Subscriber. In this case, we cannot do without a buffer into which elements are written when the onNext method of the Subscriber interface is called, and from which they are read by the hasNext and next methods of the Iterator interface. A blocking queue, for example a LinkedBlockingQueue, can be used as the buffer implementation.
The attentive reader will immediately ask: why a blocking queue, when according to the Reactive Streams specification the implementation of all methods should be non-blocking? But there is nothing wrong here: since we request a strictly defined number of elements from the Publisher, the onNext method will be called no more than that number of times, and a new element can always be added to the queue without blocking.
On the other hand, blocking can occur when the hasNext method is called on an empty queue. But there is nothing wrong with this either: the hasNext method is not part of the Subscriber interface contract; it is defined in the Iterator interface, which, as we already explained, is a blocking data structure. When the next method is called, we read the next element from the queue, and when its size drops below a certain threshold, we request the next batch of elements by calling the request method.
Figure 7. Asynchronous interaction with an external service using the Reactive Streams approach
Of course, in this case we will not completely get rid of blocking calls. This is caused by a paradigm mismatch between Reactive Streams, which assume completely asynchronous interaction, and an iterator, which must return true or false in response to a hasNext call. However, unlike synchronous interaction with an external service, the downtime caused by blocking can be significantly reduced, which increases the overall utilization of the processor cores.
It would be convenient if Apache Spark developers in future versions implemented an analogue of the mapPartitions method that works with Publisher and Subscriber. This would allow a fully asynchronous interaction to be implemented, thus eliminating the possibility of blocking threads.
Akka-streams and akka-http as an implementation of the Reactive Streams specification
Currently there are already more than a dozen implementations of the Reactive Streams specification. One of them is the akka-streams module from the akka library. In the JVM world, akka has established itself as one of the most effective tools for writing parallel and distributed systems. This is achieved thanks to the basic principle at its foundation, the actor model, which allows you to write highly concurrent applications without managing threads and thread pools directly.
A fairly large amount of literature has been written about the implementation of the actor model in akka, so we will not dwell on it here (the official akka website is a very good source of information; I also recommend the book akka in action). Here we will look more closely at the technical side of its implementation on the JVM.
In general, actors do not exist by themselves but form a hierarchical system. In order to create a system of actors, resources have to be allocated for it, so the first step when working with akka is to create an instance of the ActorSystem object. When an ActorSystem is started, a separate thread pool called the dispatcher is created, in which all the code defined in the actors is executed. As a rule, a single thread executes the code of many actors; however, if necessary, you can configure a separate dispatcher for a specific group of actors (for example, for actors that interact with blocking APIs).
One of the most common tasks solved with actors is sequential processing of data streams. Previously, you had to build chains of actors manually and make sure that there were no bottlenecks between them (for example, if one actor processes messages faster than the next one, the incoming message queue of the slower actor may overflow and eventually cause an OutOfMemoryError).
Starting from version 2.4, akka includes the akka-streams module, which allows you to define the data processing pipeline declaratively and then create the actors needed to execute it. Akka-streams also implements the backpressure principle, which eliminates the possibility of overflowing the queue of incoming messages for any of the actors involved in the processing.
The main elements for defining the processing flow in akka-streams are Source, Flow and Sink. By combining them we get a graph of operations (RunnableGraph). To start processing, a materializer is used, which creates the actors working in accordance with the graph we have defined (the Materializer interface and its ActorMaterializer implementation).
Consider the Source, Flow and Sink stages in more detail. Source defines the source of the data. Akka-streams supports more than a dozen different ways to create sources, including from an iterator:
val featuresSource: Source[Array[Float], NotUsed] = Source.fromIterator { () => featuresIterator }
Source can also be obtained by converting an existing source:
val newSource: Source[String, NotUsed] = source.map(item => transform(item))
If the transformation is a nontrivial operation, it can be represented as a separate Flow entity. Akka-streams supports many different ways to create a Flow; the easiest is to create one from a function:
val someFlow: Flow[String, Int, NotUsed] = Flow.fromFunction((x: String) => x.length)
By combining Source and Flow, we get a new Source.
val newSource: Source[Int, NotUsed] = oldSource.via(someFlow)
Sink is used as the final stage of data processing. As in the case of Source, akka-streams provides more than a dozen different Sink options, for example, Sink.foreach performs a specific operation for each element, Sink.seq collects all elements into a collection, etc.
val printSink: Sink[Any, Future[Done]] = Sink.foreach(println)
Source, Flow and Sink are parameterized by the types of their input and/or output elements. In addition, each processing stage may produce some result of its own. For this, Source, Flow and Sink are also parameterized with an additional type that defines the result of the operation, called the materialized value. If the operation does not imply an additional result, for example when we define a Flow through a function, then the NotUsed type is used as the materialized value.
By combining the necessary Source, Flow and Sink together, we get a RunnableGraph. It is parameterized by a single type that defines the value obtained as a result of executing this graph. If necessary, when combining the stages, you can specify which stage's materialized value will become the result of the entire graph of operations. By default it is the materialized value of the Source stage:
val graph: RunnableGraph[NotUsed] = someSource.to(Sink.foreach(println))
However, if the result of the Sink stage is more important to us, we have to indicate this explicitly:
val graph: RunnableGraph[Future[Done]] = someSource.toMat(Sink.foreach(println))(Keep.right)
After we have defined the graph of operations, we have to start it. To do this, we call the run method on the RunnableGraph. This method takes as a parameter an ActorMaterializer object (which can also be found in the implicit scope); it is responsible for creating the actors that will perform the operations. As a rule, the ActorMaterializer is created immediately after the ActorSystem, is tied to its life cycle and uses it to create actors. Consider an example:
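Below is a minimal sketch of such an example; the ActorSystem name and the numbers being printed are arbitrary.

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("example-system")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// a graph that prints the numbers from 1 to 10 and keeps the Sink's materialized value
val graph: RunnableGraph[Future[Done]] =
  Source(1 to 10).toMat(Sink.foreach(println))(Keep.right)

// run() takes the ActorMaterializer from the implicit scope and creates the actors
val done: Future[Done] = graph.run()

import system.dispatcher
done.onComplete(_ => system.terminate())   // shut down the ActorSystem when the stream finishes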
In the case of simple combinations, you can do without creating a separate RunnableGraph and simply connect a Source to a Sink, starting them with the Source's runWith method. This method also assumes that an ActorMaterializer is present in the implicit scope, and in this case the materialized value defined by the Sink is used. For example, with the following code we can convert a Source into a Publisher from the Reactive Streams specification:
val source: Source[Score, NotUsed] =
  Source.fromIterator(() => sourceIterator).map(item => transform(item))

val publisher: Publisher[Score] = source.runWith(Sink.asPublisher(false))
So, we have shown how to obtain a Reactive Streams Publisher by creating a Source from the source iterator and applying some transformations to its elements. We can now connect it to the Subscriber that feeds the resulting iterator. One last question remains: how to organize HTTP interaction with the external service.
The akka toolkit also includes the akka-http module, which allows you to organize asynchronous non-blocking interaction over HTTP. Moreover, this module is built on top of akka-streams, which makes it possible to add HTTP interaction as just another stage in the data processing graph of operations.
To connect to external services, akka-http provides three different interfaces.
- Request-Level API is the simplest option, suitable for single requests to an arbitrary machine. At this level HTTP connections are managed fully automatically, and each request must specify the full address of the machine to which it is sent.
- Host-Level API is suitable when we know in advance which machine and port we will be accessing. In this case akka-http takes control of the pool of HTTP connections, and in requests it is enough to indicate the relative path to the requested resource.
- Connection-Level API gives complete control over the management of HTTP connections, that is, over opening and closing them and distributing requests across them.
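For comparison, a single call through the Request-Level API might look like the sketch below; the host name and the predict path are placeholders rather than the real address of the classification service.

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("request-level-example")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// a single request; the full address of the target machine is part of the URI
val response: Future[HttpResponse] =
  Http().singleRequest(HttpRequest(uri = "http://some-host:8080/predict"))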
In our case, the address of the classification service is known in advance, and we need to organize HTTP interaction only with that particular machine, so the Host-Level API suits us best. Let's look at how to create a pool of HTTP connections with it:
val httpFlow: Flow[(HttpRequest, Id), (Try[HttpResponse], Id), Http.HostConnectionPool] =
  Http().cachedHostConnectionPool[Id](hostAddress, portNumber)
When Http().cachedHostConnectionPool[Id](hostAddress, portNumber) is called within the ActorSystem that is in the implicit scope, resources are allocated for creating a pool of connections, but the connections themselves are not yet established. The call returns a Flow that accepts as input a pair of an HTTP request and some identification object Id. The identification object is needed to match a request with the corresponding response, because an HTTP call in akka-http is an asynchronous operation and the order in which responses arrive does not necessarily correspond to the order in which requests were sent. Accordingly, at the output the Flow produces a pair of the request result and the corresponding identification object.
The HTTP connections themselves are established when a graph that includes this Flow is launched (materialized). Akka-http is implemented in such a way that no matter how many times graphs containing httpFlow are materialized, within one ActorSystem there will always be a single shared pool of HTTP connections used by all the materializations. This allows better control over the use of network resources and avoids overloading them.
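As an illustration, a single request paired with its identification object could be pushed through httpFlow roughly as follows. This sketch assumes that the implicit ActorSystem and materializer are in scope and that Id is the identification type the pool was created with.

import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future
import scala.util.Try

// push one (request, id) pair through the shared pool and get back (Try[HttpResponse], Id)
def callService(request: HttpRequest, id: Id): Future[(Try[HttpResponse], Id)] =
  Source.single(request -> id)   // a one-element stream with the (request, id) pair
    .via(httpFlow)               // the shared connection pool defined above
    .runWith(Sink.head)          // materialize and take the single result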
Thus, the life cycle of the pool of HTTP connections is tied to the ActorSystem. As mentioned earlier, the thread pool in which the operations defined in the actors (or, in our case, defined as akka-streams and akka-http stages) are executed is also tied to it. Therefore, to achieve maximum efficiency, we need to reuse a single instance of the ActorSystem within a single JVM process.
Putting it all together: an example implementation of the interaction with the classification service
So, now we can proceed to consider the process of classifying large amounts of distributed data on Apache Spark using asynchronous interaction with external services. The general scheme of this interaction has already been shown in Figure 7.
Suppose we have defined some source Dataset[Features]. By applying the mapPartitions operation to it, we should obtain a Dataset in which each id from the initial set is associated with a value produced by the classification (Dataset[Score]). To organize asynchronous processing on the executors, we need to wrap the source and result iterators in a Publisher and a Subscriber from the Reactive Streams specification, respectively, and link them together.
case class Features(id: String, vector: Array[Float])
case class Score(id: String, score: Float)   //(1)

val batchesRequestCount = config.getInt("scoreService.batchesRequestCount")
This implementation takes into account that the classification service can process a group of feature vectors at once, so the classification result also arrives for the whole group at once. Therefore, the Publisher is parameterized not simply with Score, as one might expect, but with Iterable[Score]. Thus, we send the classification results for a whole group to the resulting iterator (which is also the Subscriber) with a single call to the onNext method, which is much more efficient than calling onNext for each element. Now let's analyze this code in more detail.
- We define the structure of the input and output data. The input is a pair of an id and a feature vector; the output is a pair of the identifier and the numeric value produced by the classification.
- We define the number of groups that the Subscriber will request from the Publisher at one time. Since these values are assumed to sit in the buffer until they are read from the resulting iterator, this number depends on the amount of memory allocated to the executor.
- We create a Publisher from the source iterator. It will be responsible for interacting with the classification service. The createPublisher function is described below.
- We create a Subscriber that will serve as the resulting iterator. The code of the IteratorSubscriber class is also shown below.
- We register the Subscriber with the Publisher.
- We return the IteratorSubscriber as the result of the mapPartitions operation (the complete wiring is sketched below).
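Put together, these steps might look roughly like the following sketch. This is not the article's exact code: it assumes a Dataset[Features] named features, an implicit encoder for Score (for example via spark.implicits), configuration values batchSize and parallelismLevel read in the same way as batchesRequestCount, and the createPublisher and IteratorSubscriber implementations discussed below.

import org.apache.spark.sql.Dataset
import org.reactivestreams.Publisher

val scores: Dataset[Score] = features.mapPartitions { featuresIterator =>
  // (3) a Publisher that talks to the classification service for this partition
  val publisher: Publisher[Iterable[Score]] =
    createPublisher(featuresIterator, batchSize, parallelismLevel)
  // (4) a Subscriber that is at the same time the resulting iterator
  val subscriber = new IteratorSubscriber[Score](batchesRequestCount)
  // (5) connect them
  publisher.subscribe(subscriber)
  // (6) return the resulting iterator from mapPartitions
  subscriber
}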
Now consider the implementation of the createPublisher function.
type Ids = Seq[String]
- We define the type alias Ids for the identification objects that accompany requests through httpFlow: for each request it is the sequence of ids of the feature vectors contained in it.
- The createPublisher function takes two parameters in addition to the source iterator: the size of the group of feature vectors sent to the service in a single request (batchSize) and the number of responses processed in parallel (parallelismLevel).
- We bring the ActorSystem, the ActorMaterializer and the httpFlow into the implicit scope. Since this code runs inside a Spark executor, they are taken from the global ActorSystemHolder object described below.
- We describe the processing graph with akka-streams, starting with a Source[Features] created from the source iterator.
- The elements are grouped into batches of batchSize.
- Each batch is turned into an HttpRequest paired with its identification object. The HttpRequest is built by a separate createHttpRequest function: it serializes the feature vectors into the request body (for example, for a call to the service's predict resource) and fills in the remaining attributes of the HTTP request, such as the HTTP method and the URI of the requested resource.
- The requests are sent through httpFlow.
- The responses are then processed. We use the flatMapMerge operation here, because akka-http returns the response body as a Source[ByteString], so handling each response produces a nested stream. The parallelismLevel parameter determines how many of these nested streams are processed simultaneously. Handling of an HTTP response depends on whether the request succeeded or failed, so we first check its status.
- For a successful response we assemble the whole body: it arrives as a stream of ByteString fragments, and in akka concatenating two ByteStrings is an O(1) operation, so the fragments can be cheaply folded into a single ByteString containing the whole body.
- If the HTTP request failed, the stream with the response body must still be drained. For this, discardEntityBytes is called, which discards the body without processing it; otherwise the connection would not be released back to the pool.
- The assembled body is deserialized into classification results; the unmarshalling facilities provided by akka-http can be used for this.
- Finally, the Source built this way is converted into a Publisher, which is what the mapPartitions wiring needs. This is done by the Sink.asPublisher stage; the false passed to it means that the resulting Publisher will support only a single Subscriber.
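Based on these steps, createPublisher might look roughly like the following sketch. It is not the article's exact code: the createHttpRequest and parseScores helpers are assumptions that depend on the concrete API of the classification service, and ActorSystemHolder is the global object described in the next section.

import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import org.reactivestreams.Publisher
import scala.util.{Failure, Success}

def createPublisher(features: Iterator[Features],
                    batchSize: Int,
                    parallelismLevel: Int): Publisher[Iterable[Score]] = {
  // implicit ActorSystem and materializer, plus the shared httpFlow
  import ActorSystemHolder._

  Source.fromIterator(() => features)
    .grouped(batchSize)                                          // group feature vectors into batches
    .map(batch => createHttpRequest(batch) -> batch.map(_.id))   // batch -> (HttpRequest, Ids)
    .via(httpFlow)                                               // send through the shared connection pool
    .flatMapMerge(parallelismLevel, {                            // handle up to parallelismLevel responses at once
      case (Success(response), ids) if response.status.isSuccess() =>
        response.entity.dataBytes
          .fold(ByteString.empty)(_ ++ _)                        // assemble the body from ByteString fragments
          .map(body => parseScores(body, ids))                   // deserialize into Iterable[Score]
      case (Success(response), _) =>
        response.discardEntityBytes()                            // drain the body so the connection is released
        Source.failed(new RuntimeException("Service returned " + response.status))
      case (Failure(cause), _) =>
        Source.failed(cause)
    })
    .runWith(Sink.asPublisher(fanout = false))                   // expose the stream as a Reactive Streams Publisher
}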
As we noted in the previous section, to work with akka we need an ActorSystem, which should be created once and then reused. Unfortunately, there is no global executor-wide context in Spark that we could hook into, but we can resort to the standard means of creating global objects. Since a Spark executor is a separate JVM process, within it we can create a global object that will hold the ActorSystem together with the ActorMaterializer and the httpFlow that use it (a sketch of such an ActorSystemHolder object is shown after the following list).
- We create all the global values with lazy initialization, that is, they will actually be created the first time they are accessed.
- Here a new ActorSystem with a specific name is created.
- In order to correctly terminate all the processes running within the ActorSystem, we must call its terminate method, which in turn will stop all actors using their standard stop mechanism. To do this, we register a hook that is called when the JVM process terminates.
- Next, we create the ActorMaterializer, which will run akka-streams graphs using our ActorSystem.
- Finally, we create the httpFlow for interacting with the external service. As mentioned in the previous section, this is where the resources for the pool of HTTP connections are allocated within the ActorSystem.
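A sketch of how the ActorSystemHolder object might look in full, following the points above. The ActorSystem name and the hostAddress/portNumber placeholders are illustrative; in a real job they would come from the configuration.

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import scala.util.Try

object ActorSystemHolder {

  // placeholders: in a real job these would come from the configuration
  private val hostAddress = "classification-service.local"
  private val portNumber = 8080

  // (1) lazy: everything is created on first use inside the executor JVM
  implicit lazy val actorSystem: ActorSystem = {
    val system = ActorSystem("score-http-client")   // (2) a new ActorSystem with a specific name
    sys.addShutdownHook {
      system.terminate()                            // (3) stop all actors when the JVM process exits
    }
    system
  }

  // (4) the materializer that will run akka-streams graphs on our ActorSystem
  implicit lazy val materializer: ActorMaterializer = ActorMaterializer()

  // (5) the shared pool of HTTP connections, tied to the life cycle of the ActorSystem
  lazy val httpFlow: Flow[(HttpRequest, Seq[String]), (Try[HttpResponse], Seq[String]), Http.HostConnectionPool] =
    Http().cachedHostConnectionPool[Seq[String]](hostAddress, portNumber)
}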
Now consider the implementation of the resulting iterator as a Subscriber in our HTTP interaction process.

sealed trait QueueItem[+T]
case class Item[+T](item: T) extends QueueItem[T]
case object Done extends QueueItem[Nothing]
case class Failure(cause: Throwable) extends QueueItem[Nothing]   //(1)

class StreamErrorCompletionException(cause: Throwable) extends Exception(cause)   //(2)

class IteratorSubscriber[T](requestSize: Int) extends Subscriber[Iterable[T]] with Iterator[T] {
The IteratorSubscriber class is an implementation of the Producer-Consumer model. The part that implements the Subscriber interface is the producer, and the part that implements Iterator is the consumer. A buffer in the form of a blocking queue serves as the means of communication between them. The methods of the Iterator interface are called in a thread from the Apache Spark executor pool, and the methods of the Subscriber interface are called in the pool belonging to the ActorSystem. Now let's take a look at the implementation code for IteratorSubscriber in more detail.
- To begin with, we define an algebraic data type for the buffer elements. The buffer can hold either the next group of elements (Item), or a marker of successful completion (Done), or a marker of unsuccessful completion (Failure) containing the Throwable that caused the error.
- Next comes the exception that will be thrown from hasNext if the stream ends with an error.
- The IteratorSubscriber class itself takes a single parameter, requestSize: the number of groups that will be requested from the Publisher at a time.
- The buffer is declared as a LinkedBlockingQueue without a capacity limit. As discussed earlier, adding to it never blocks, because we never request more groups from the Publisher than we are prepared to store.
- The counter of expected groups keeps track of how many groups have been requested from the Publisher but not yet consumed; it is used to decide when to request the next portion from the Publisher. It is accessed only from hasNext and next (requestNextBatches is called only from hasNext), that is, from a single consumer thread, so it needs no synchronization.
- The subscriptionPromise and subscription fields hold the Subscription that the Publisher passes to onSubscribe. According to Reactive Streams, registration of a Subscriber with a Publisher is asynchronous, so hasNext may well be called before onSubscribe. For this reason we cannot simply store the value passed by the Publisher in a plain field; instead, subscription is declared lazy and obtains its value from the Promise, waiting for the Publisher if necessary.
- The current iterator holds the group that is being read at the moment. It is accessed only from hasNext and next, that is, from the consumer thread, so it needs no synchronization either.
- The isDone flag indicates that the stream has finished and hasNext must from now on return false. It is also accessed only from hasNext, in the consumer thread.
- In onSubscribe, the Subscription received from the Publisher is stored in the Promise backing the subscription field.
- onNext is called by the Publisher with the next group of classification results; the group is simply put into the buffer.
- When the Publisher calls onComplete, the Done marker is put into the buffer.
- When the Publisher calls onError, the Failure marker with the cause of the error is put into the buffer.
- The hasNext method first checks whether the current iterator still has elements. If it does, it immediately returns true without touching the buffer. Otherwise we have to turn to the buffer.
- If the stream has already finished, we return false.
- If the number of groups we are still expecting has fallen below requestSize, we request the next portion from the Publisher. This is done in advance, before the buffer runs dry, so that the Publisher can keep sending HTTP requests while we are processing the results already received.
- Then we take the next element from the buffer. This call blocks while the buffer is empty, but since we have already requested new groups (waiting, if necessary, for the subscription itself through the lazy field), sooner or later something will arrive: either the next group, or a completion marker, or an error.
- If it is the next group, it becomes the new currentIterator. The group may turn out to be empty, so instead of returning true right away, hasNext checks itself again, repeating the steps above.
- If the Done marker is taken from the buffer, we set the isDone flag and hasNext returns false; all subsequent calls to hasNext will also return false without touching the buffer.
- If the Failure marker is taken, we also mark the stream as finished and rethrow its cause wrapped in a StreamErrorCompletionException.
- The next method returns the next item from the current iterator. According to the semantics of its call, before that the caller must call the hasNext method, therefore, when calling next in the current iterator, there must always be the next element.
- Here we send a signal to the Publisher that we are ready to process the next group of results using the subscription object that we received when registering with the Publisher. The number of groups is determined by the requestSize value. We also increase the number of expected items by this amount.
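Putting these points together, IteratorSubscriber might look roughly like the following sketch. It reuses the QueueItem types and StreamErrorCompletionException defined above; field names and details such as the exact prefetching threshold are assumptions rather than the article's actual code.

import java.util.concurrent.LinkedBlockingQueue
import org.reactivestreams.{Subscriber, Subscription}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

class IteratorSubscriber[T](requestSize: Int) extends Subscriber[Iterable[T]] with Iterator[T] {

  // buffer between the akka-streams threads (producer) and the Spark executor thread (consumer)
  private val buffer = new LinkedBlockingQueue[QueueItem[Iterable[T]]]()

  // hasNext may be called before onSubscribe, so the Subscription is exposed through a Promise
  private val subscriptionPromise = Promise[Subscription]()
  private lazy val subscription: Subscription = Await.result(subscriptionPromise.future, Duration.Inf)

  // the fields below are accessed only from the consumer (Spark executor) thread
  private var expectedBatches = 0                      // groups requested but not yet consumed
  private var currentIterator: Iterator[T] = Iterator.empty
  private var isDone = false

  // ----- Subscriber side: called from the ActorSystem's thread pool -----

  override def onSubscribe(s: Subscription): Unit = subscriptionPromise.success(s)

  override def onNext(items: Iterable[T]): Unit = buffer.put(Item(items))

  override def onComplete(): Unit = buffer.put(Done)

  override def onError(cause: Throwable): Unit = buffer.put(Failure(cause))

  // ----- Iterator side: called from the Spark executor thread -----

  override def hasNext: Boolean = {
    if (currentIterator.hasNext) true
    else if (isDone) false
    else {
      if (expectedBatches < requestSize) requestNextBatches()
      buffer.take() match {                            // blocks until the producer puts something
        case Item(items) =>
          expectedBatches -= 1
          currentIterator = items.iterator
          hasNext                                      // the group may be empty, so check again
        case Done =>
          isDone = true
          false
        case Failure(cause) =>
          isDone = true
          throw new StreamErrorCompletionException(cause)
      }
    }
  }

  override def next(): T = currentIterator.next()      // hasNext must have been called before

  private def requestNextBatches(): Unit = {
    subscription.request(requestSize)                  // signal the Publisher we can take more groups
    expectedBatches += requestSize
  }
}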
Thus, the general scheme of the complete processing of a data block launched on an executor looks like this:
Figure 8. Interaction of actors with the source and result iterators.
In conclusion: the advantages and disadvantages of this solution
The main advantage of this scheme is that it allows you to work with machine learning models implemented with any available tools. This is achieved by using the HTTP protocol to access the model, a standard means of communication between applications; thanks to this, the implementation of the model is not tied to the interface through which it is used.
Another advantage is that this scheme allows horizontal scaling of all its elements. Depending on which part is the most loaded, we can add machines either to the Hadoop cluster or to run additional instances of the model. For the same reason the scheme is fault tolerant: if any of the machines fails, we can easily replace it, because the data stored on HDFS is replicated, and the classification service does not depend on any shared mutable state and can therefore be deployed in several instances.
In addition, thanks to the asynchronous interaction via akka-http, the processor cores of the Apache Spark executors are not left idle while waiting for responses from the classification service.
Finally, with proper tuning this scheme allows you to almost completely avoid downtime. By choosing the size of the group sent for classification, the number of instances of the classification service and the size of the pool of HTTP connections, you can maximize the utilization of computing power both on the service side and on the cluster side.
One of the main disadvantages of this scheme is its relative complexity, caused by the separation of the components and the need to organize the interaction between them. In addition, part of the computing power is spent on the communication itself, which slightly reduces efficiency, and additional errors related to communication become possible. As a result, extra tuning is required to keep the interaction efficient.
To exclude network interaction, one could consider deploying instances of the service on the same machines on which the data is stored. But the number of machines in a Hadoop cluster is usually quite large, so it would be too expensive to deploy an instance of the model on each of them, especially in the case of large models. Besides, the resources of the Hadoop cluster machines would then have to be shared between data processing and the model itself.
At CleverDATA we regularly deal with such problems when working with large amounts of data and machine learning models, and the approach described in this article is one of the tools we use for that.