
In this post we will look at how to access the Spark API from various programming languages on the JVM, along with some of the performance considerations that arise when you go beyond Scala. Even if you work outside the JVM, this section may still be useful, since non-JVM languages often depend on the Java API rather than on the Scala API.
Working in another programming language does not always mean having to leave the JVM, and staying in the JVM has significant performance advantages, mostly because data does not have to be copied. Although no special binding libraries or adapters are required to access Spark from non-Scala JVM languages, calling Scala code from other languages can be difficult. Spark supports Java 8 lambda expressions; users of older JDKs can instead implement the corresponding interfaces from the org.apache.spark.api.java.function package. Even when no data needs to be copied, working in another language can have small but important performance nuances.
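As a quick illustration (a minimal sketch; the class, method, and variable names here are ours, not from Spark), the same transformation can be written either as a Java 8 lambda or as an explicit implementation of the Function interface from org.apache.spark.api.java.function:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

public final class LambdaVsInterface {
  // Java 8 and later: a lambda expression is enough.
  public static JavaRDD<Integer> lengthsWithLambda(JavaRDD<String> lines) {
    return lines.map(s -> s.length());
  }

  // Older JDKs: implement the corresponding interface explicitly.
  public static JavaRDD<Integer> lengthsWithInterface(JavaRDD<String> lines) {
    return lines.map(new Function<String, Integer>() {
      @Override
      public Integer call(String s) {
        return s.length();
      }
    });
  }
}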
The difficulties of accessing the various Scala APIs show up most clearly when calling functions that take class tags or that rely on features provided through implicit conversions (for example, all the Double- and Tuple-specific functionality of RDDs). For mechanisms that depend on implicit conversions, equivalent concrete classes are often provided along with explicit conversions to them. Functions that depend on class tags can be passed dummy class tags (say, AnyRef), and adapters often do this automatically. Using concrete classes instead of implicit conversions usually adds no overhead, but dummy class tags can prevent some compiler optimizations.
The Java API differs little from the Scala API in terms of features; only occasionally is some functionality or a developer API missing. Other JVM languages, such as Clojure with the Flambo DSL and the sparkling library, are implemented on top of the Java API rather than calling the Scala API directly. Since most language bindings, even for non-JVM languages such as Python and R, go through the Java API, it is useful to understand it.
The Java API closely resembles the Scala API, but it does not depend on class tags or implicit conversions. The absence of the latter means that instead of RDDs of Tuple or double objects being automatically converted into special classes with extra functionality, you must use explicit conversion functions (for example, mapToDouble or mapToPair). These functions are defined only on Java RDDs; fortunately for interoperability, the special types are simply adapters around Scala RDDs. The conversion functions also return dedicated types, such as JavaDoubleRDD and JavaPairRDD, which offer the capabilities that Scala provides through implicit conversions.
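For example (a small sketch of ours, not from the book), mapToDouble produces a JavaDoubleRDD whose extra numeric operations, such as mean(), mirror what Scala gets through implicit conversions:

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;

public final class ExplicitConversions {
  // The explicit conversion function mapToDouble returns the specialized
  // JavaDoubleRDD type instead of relying on implicit conversions.
  public static double meanLineLength(JavaRDD<String> lines) {
    JavaDoubleRDD lengths = lines.mapToDouble(s -> s.length());
    return lengths.mean();
  }
}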
Let's return to the canonical word-count example, this time using the Java API (Example 7.1). Since calling the Scala API from Java can sometimes be tricky, Spark's Java API is implemented almost entirely in Scala, with the class tags and implicit conversions hidden. Because of this, the Java adapters are a very thin layer, consisting on average of only a few lines of code, and rewriting them takes very little effort.
Example 7.1. Word count (Java)
import scala.Tuple2;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.regex.Pattern;
import java.util.Arrays;

public final class WordCount {
  private static final Pattern pattern = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    JavaSparkContext jsc = new JavaSparkContext();
    JavaRDD<String> lines = jsc.textFile(args[0]);
    JavaRDD<String> words = lines.flatMap(
        e -> Arrays.asList(pattern.split(e)).iterator());
    JavaPairRDD<String, Integer> wordsInitial = words.mapToPair(
        e -> new Tuple2<String, Integer>(e, 1));
  }
}
Sometimes you may need to convert Java RDDs to Scala RDDs or vice versa. Most often this comes up with libraries that expect or return Scala RDDs, but occasionally a core Spark feature may not yet be available in the Java API, and converting a Java RDD to a Scala RDD is the easiest way to use it.
If you need to pass a Java RDD to a Scala library that expects a regular Spark RDD, you can access the underlying Scala RDD with the rdd() method. Most often this is enough to hand the resulting RDD to whatever Scala library you need; notable exceptions are Scala libraries that rely on implicit conversions of the RDD contents or on class tag information. In that case the simplest way to get at the implicit conversions is to write a small Scala adapter. If a Scala shim is not an option, you can call the corresponding function via JavaConverters and construct a dummy class tag.
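A minimal sketch of the simple case (the class and method names are illustrative): unwrapping a JavaPairRDD with rdd() before handing it to Scala code:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;

public final class AccessScalaRDD {
  // rdd() exposes the underlying Scala RDD, which most Scala libraries accept as is.
  public static RDD<Tuple2<String, Integer>> toScala(
      JavaPairRDD<String, Integer> pairs) {
    return pairs.rdd();
  }
}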
To create a dummy class tag you can use scala.reflect.ClassTag$.MODULE$.AnyRef(), or obtain a real one with scala.reflect.ClassTag$.MODULE$.apply(CLASS), as shown in Examples 7.2 and 7.3.
When converting from a Scala RDD to a Java RDD, class tag information tends to matter more than it does for most Spark libraries. The reason is that, although the various JavaRDD classes provide public constructors that take a Scala RDD as an argument, those constructors are designed to be called from Scala code and therefore require class tag information.
Dummy class tags are most often used in generic or templated code where the exact types are unknown at compile time. They are often sufficient, although there is a chance of losing some nuance on the Scala side; in very rare cases the Scala code needs precise class tag information, and then you have to use a real class tag. In most cases doing so takes little extra effort and gives better performance, so use real class tags whenever possible.
Example 7.2. Ensuring Java/Scala RDD compatibility with a dummy class tag
public static JavaPairRDD wrapPairRDDFakeCt(
    RDD<Tuple2<String, Object>> rdd) {
  // Construct the class tag from AnyRef. This is what generic or templated
  // code typically does when the exact types are unknown at compile time;
  // a fake class tag may cost some performance on the Scala side.
  ClassTag<Object> fake = ClassTag$.MODULE$.AnyRef();
  return new JavaPairRDD(rdd, fake, fake);
}
Example 7.3. Java/Scala RDD compatibility with real class tags
public static JavaPairRDD wrapPairRDD(
    RDD<Tuple2<String, Object>> rdd) {
  // Construct real class tags for the key and value types.
  ClassTag<String> strCt = ClassTag$.MODULE$.apply(String.class);
  ClassTag<Long> longCt = ClassTag$.MODULE$.apply(scala.Long.class);
  return new JavaPairRDD(rdd, strCt, longCt);
}
Both the Spark SQL and the ML pipeline APIs are for the most part uniform between Java and Scala. There are, however, a few Java-specific helper functions, because the equivalent Scala functions are hard to call from Java. Examples are the various numeric methods on the Column class, such as plus, minus, and so on, whose overloaded Scala equivalents (+, -) are awkward to call from Java. Rather than introducing JavaDataFrame and JavaSQLContext classes, the Java-friendly methods are exposed on SQLContext and on regular DataFrames. This can be confusing, because some methods mentioned in the Java documentation cannot actually be used from Java code, but in those cases similarly named functions are provided for calling from Java.
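For instance (a sketch of ours, using the Spark 2.x Java type Dataset<Row> and assumed column names a and b), the named methods can be chained where Scala code would use the symbolic operators:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public final class ColumnMath {
  // plus/minus are the Java-friendly equivalents of Scala's + and - on Column.
  public static Dataset<Row> sumMinusOne(Dataset<Row> df) {
    return df.select(df.col("a").plus(df.col("b")).minus(1));
  }
}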
With user-defined functions (UDFs) in Java, and for that matter in most languages other than Scala, you need to specify the type of value the function returns, since it cannot be inferred the way it is in Scala (Example 7.4).
Example 7.4. Java UDF Sample
sqlContext.udf()
  .register("strlen",
    (String s) -> s.length(), DataTypes.IntegerType);
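Once registered, the UDF can be referenced by name from SQL. A minimal usage sketch, assuming a temporary table named people with a string column name has already been registered:

sqlContext.sql("SELECT strlen(name) FROM people").show();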
Although the types required by the Scala and Java APIs are different, wrapping the Java collection types does not require extra copying. For iterators, the type conversion performed by the adapter is deferred until elements are accessed, which allows Spark to spill the data if necessary (as discussed in the section "Iterator-to-iterator transformations with the mapPartitions function" on page 121). This matters because, for many simple operations, the cost of copying the data can exceed the cost of the computation itself.
Beyond Scala and the JVM
If you do not limit yourself to the JVM, the number of programming languages available increases dramatically. However, with the current Spark architecture, working outside the JVM, especially on the worker nodes, can involve substantial extra cost from copying data on the workers between the JVM and the target-language code. For complex operations the share of the data-copying cost is relatively small, but for simple operations it can easily double the total cost of the computation.
The first non-JVM programming language directly supported by Spark was Python, and its API and interface have become the model on which the implementations for other non-JVM languages are based.
How PySpark Works
PySpark connects to JVM Spark using a mixture of pipes on the workers and Py4J, a specialized library for Python/Java interaction, on the driver. Underneath this seemingly simple architecture hide many complex nuances that make PySpark work, as shown in Figure 7.1. One of the main challenges is that even once data has been copied from a Python worker to the JVM, it is not in a form the JVM can easily parse. Special effort is required on both the Python and Java sides to ensure that the JVM has enough information for operations such as partitioning.
PySpark RDDs
The cost of transferring data to and from the JVM, as well as of starting the Python executor, is significant. Many of the performance problems of the PySpark RDD API can be avoided by using the DataFrame/Dataset API, because the data then stays in the JVM as long as possible.
Copying data from the JVM to Python is done using sockets and serialized bytes. A more general version for interacting with programs in other languages is available through the PipedRDD interface, whose use is shown in the "Using pipe" subsection.
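A minimal PipedRDD sketch from Python (assuming an existing SparkContext named sc): each element of a partition is written to the external command's stdin, and the command's stdout lines come back as an RDD of strings:

words = sc.parallelize(["spark", "hadoop", "sparkling"])
spark_like = words.pipe("grep -i spark")  # grep runs once per partition
print(spark_like.collect())  # expected: ['spark', 'sparkling']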
Setting up pipes for two-way data exchange for every transformation would be too expensive. Instead, whenever possible, PySpark pipelines Python transformations inside the Python interpreter, chaining, say, a filter and then a map on the iterator of Python objects using the specialized PipelinedRDD class. Even when the data must be shuffled and PySpark cannot chain the transformations within a single worker's virtual machine, the Python interpreter can be reused, so the interpreter start-up cost does not slow things down further.
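A small sketch of this pipelining (again assuming an existing SparkContext sc): the filter and the map below are fused into a single PipelinedRDD (an internal class, so the exact name may vary between versions), and each element makes only one trip through the Python worker:

nums = sc.parallelize(range(10))
evens_squared = nums.filter(lambda x: x % 2 == 0).map(lambda x: x * x)
print(type(evens_squared).__name__)  # PipelinedRDD
print(evens_squared.collect())       # [0, 4, 16, 36, 64]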
This is only part of the puzzle. Ordinary PipedRDDs work with the String type, which is not easy to shuffle because there is no natural key. PySpark, and in its image the bindings for many other languages, uses a special PairwiseRDD type in which the key is a long integer and only the key is deserialized, with custom Scala code used to parse the Python values. This deserialization is not overly expensive, but it shows that Spark's Scala side mostly treats the results of Python code as opaque byte arrays.
For all its simplicity, this approach to integration works surprisingly well, and most operations on Scala RDDs are available in Python. Some of the trickier parts of the code deal with accessing libraries such as MLlib and with loading and saving data from various sources.
Working with different data formats also imposes its own limitations, since much of Spark's data loading and saving code is built on Hadoop's Java interfaces. This means that all loaded data is first loaded into the JVM and only then transferred to Python.
Two approaches are usually used for interacting with MLlib: either a specialized PySpark data type with Scala converters is used, or the algorithm is re-implemented in Python. These problems can be avoided with the Spark ML package, which uses the DataFrame/Dataset interface and generally keeps the data in the JVM.
DataFrames and Datasets in PySpark
DataFrames and Datasets avoid many of the performance problems of the Python RDD API because they keep data in the JVM as long as possible. The same benchmark we used to illustrate the advantage of DataFrames over RDDs (see Figure 3.1) shows significant differences when run in Python (Figure 7.2).
Many operations on DataFrames and Datasets may not require moving data out of the JVM at all, although using Python UDFs, UDAFs, or lambda expressions naturally requires moving some of the data out of the JVM to the Python workers. This results in the simplified diagram for many operations shown in Figure 7.3.
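A short sketch of the difference (assuming an existing sqlContext): the built-in length expression is evaluated entirely in the JVM, while the Python UDF forces each value out to a Python worker and back:

from pyspark.sql.functions import col, length, udf
from pyspark.sql.types import IntegerType

df = sqlContext.createDataFrame([("spark",), ("pyspark",)], ["word"])

# Stays in the JVM: a built-in Catalyst expression.
jvm_only = df.select(length(col("word")).alias("len"))

# Ships each value to a Python worker and back: a Python lambda wrapped as a UDF.
py_len = udf(lambda s: len(s), IntegerType())
via_python = df.select(py_len(col("word")).alias("len"))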
Accessing the underlying Java objects and mixing in Scala code
An important consequence of the PySpark architecture is that many of Spark's Python classes are actually adapters that translate calls from Python code into a form the JVM understands.
If you collaborate with Scala/Java developers and want to use their code from Python, there will be no ready-made adapters for it, but you can register Java/Scala UDFs and use them from Python. Starting with Spark 2.1, this can be done with the registerJavaFunction method of the sqlContext object.
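A sketch of what that registration might look like (the Java class name here is hypothetical; the class must be on the classpath and implement one of the org.apache.spark.sql.api.java.UDF* interfaces):

# Register a JVM-side UDF under a name usable from Python SQL (Spark 2.1+).
sqlContext.registerJavaFunction("strLenJvm", "com.example.udfs.StrLenUDF")
sqlContext.sql("SELECT strLenJvm('hello')").show()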
Sometimes these adapters do not expose all the mechanisms you need, and since Python has no strict enforcement of private member access, you can go directly to the JVM. The same technique lets you access your own code in the JVM and, with a little effort, convert the results back into Python objects.
In the subsection "Large query plans and iterative algorithms" on page 91, we noted the importance of using the JVM version of DataFrames and RDDs to shorten the query plan. This is a workaround: when query plans become too large for the Spark SQL optimizer to process, putting an RDD in the middle cuts the plan, because the SQL optimizer cannot see past the point where the data appears in an RDD. The same could be achieved through the public Python APIs, but many of the benefits of DataFrames would be lost, because all the data would have to travel back and forth through the Python workers. Instead, you can shorten the lineage graph while keeping the data in the JVM (as shown in Example 7.5).
Example 7.5. Truncating a large query plan for a DataFrame using Python
def cutLineage(df):
    """
    Cut the lineage of a DataFrame, used for iterative algorithms.

    .. Note: This uses internal members and may break between versions.
    >>> df = rdd.toDF()
    >>> cutDf = cutLineage(df)
    >>> cutDf.count()
    3
    """
    jRDD = df._jdf.toJavaRDD()
    jSchema = df._jdf.schema()
    jRDD.cache()
    sqlCtx = df.sql_ctx
    try:
        javaSqlCtx = sqlCtx._jsqlContext
    except:
        javaSqlCtx = sqlCtx._ssql_ctx
    newJavaDF = javaSqlCtx.createDataFrame(jRDD, jSchema)
    newDF = DataFrame(newJavaDF, sqlCtx)
    return newDF
In general, by convention, the syntax _j[short_name] is used to access the underlying Java version of most Python objects. For example, the SparkContext object has a _jsc attribute that exposes the underlying Java SparkContext. This is only possible in the driver program, so when PySpark objects are shipped to the worker nodes you cannot access the underlying Java component, and most of the API will not work there.
To access a Spark class in the JVM that has no Python adapter, you can use the Py4J gateway on the driver. The SparkContext object holds a reference to the gateway in the _gateway property, and the sc._gateway.jvm.[full.class.name] syntax gives access to any Java object.
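A small driver-side sketch (assuming an existing SparkContext sc); java.lang.System is used only because it is guaranteed to be on the classpath, and the same pattern reaches Spark's own JVM classes:

jvm = sc._gateway.jvm
print(jvm.java.lang.System.currentTimeMillis())  # any class on the JVM classpath

# The underlying (Scala) SparkContext is also reachable through the Java wrapper.
scala_sc = sc._jsc.sc()
print(scala_sc.version())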
A similar technique works for your own Scala classes, as long as they are on the class path. You can add JAR files to the class path with the --jars parameter of the spark-submit command or by setting the spark.driver.extraClassPath configuration property. Example 7.6, which was used to generate the data for Figure 7.2, deliberately uses existing Scala code to generate the performance-test data.
Example 7.6. Calling non-Spark JVM classes with Py4J
sc = sqlCtx._sc
# Get the Java SQL Context: the Spark 2.1, 2.0, and pre-2.0 internals differ :p
try:
    try:
        javaSqlCtx = sqlCtx._jsqlContext
    except:
        javaSqlCtx = sqlCtx._ssql_ctx
except:
    javaSqlCtx = sqlCtx._jwrapped
jsc = sc._jsc
scalasc = jsc.sc()
gateway = sc._gateway
# Call a Java method that returns an RDD of JVM Rows (Int, Double).
# Although Python RDDs wrap Java RDDs (even RDDs of Row), a Java RDD of Row
# cannot be wrapped directly into a Python RDD of Row, so we instead build a
# DataFrame from the Java RDD of Rows and convert further if needed.
java_rdd = (gateway.jvm.com.highperformancespark.examples.
            tools.GenerateScalingData.
            generateMiniScaleRows(scalasc, rows, numCols))
# Schemas are serialized to JSON and passed between the Python schema
# object and its Java counterpart.
schema = StructType([
    StructField("zip", IntegerType()),
    StructField("fuzzyness", DoubleType())])
# Spark 2.1 / pre-2.1 syntax
try:
    jschema = javaSqlCtx.parseDataType(schema.json())
except:
    jschema = sqlCtx._jsparkSession.parseDataType(schema.json())
# Convert the RDD (Java) into a DataFrame (Java)
java_dataframe = javaSqlCtx.createDataFrame(java_rdd, jschema)
# Wrap the DataFrame (Java) into a DataFrame (Python)
python_dataframe = DataFrame(java_dataframe, sqlCtx)
# Convert the DataFrame (Python) into an RDD of pairs
pairRDD = python_dataframe.rdd.map(lambda row: (row[0], row[1]))
return (python_dataframe, pairRDD)
Although many Python classes are simply adapters around Java objects, not every Java object can be wrapped into a Python object and then used in Spark. For example, objects in PySpark RDDs are represented as serialized strings that can be easily parsed only in Python code. Fortunately, DataFrame objects are standardized across languages, so if you can convert your data into a DataFrame, you can wrap it in a Python object and either use it directly as a Python DataFrame or convert the Python DataFrame into a Python RDD.
More information about the book is available on the publisher's site: Table of Contents, Excerpt. A 20% discount coupon for Habr readers: Spark.