Experiment with specifying a custom serializer for Spark to use

I haven't been able to make this work, and I cannot quite figure out where the
pieces are coming together for serializing Ruby objects.

When I run my example code an exception is thrown, but the log lines I expected
to see, indicating that the RubySerializerInstance is about to process a Ruby
object, are never printed.
This commit is contained in:
R Tyler Croy 2019-05-19 21:44:29 -07:00
parent 57e40d276e
commit bc8cce050a
No known key found for this signature in database
GPG Key ID: E5C92681BEF6CEA2
7 changed files with 112 additions and 5 deletions

3
.rspec
View File

@ -1,2 +1,5 @@
--color
--format doc
--fail-fast
--order random

View File

@ -1,4 +1,5 @@
plugins {
id 'idea'
id 'java'
id 'com.github.jruby-gradle.base' version '1.7.0'
id 'com.github.jruby-gradle.jar' version '1.7.0'
@ -34,6 +35,11 @@ jrubyJar {
// Gradle task of type JRubyExec that runs the 'rspec' script for this project.
task spec(type: JRubyExec) {
group 'Redspark'
description 'Execute RSpec against spec/**/*.rb'
// The Java sources must be compiled first: their output directory is put on
// the classpath on the next line.
dependsOn 'compileJava'
// Compiled project classes plus everything in the 'compile' configuration.
classpath "${buildDir}/classes/java/main", configurations.compile.asPath
configuration 'rspec'
script 'rspec'
// Registers the spec/ directory as a task input (presumably for up-to-date
// checking -- confirm against the JRubyExec task's behaviour).
inputs.dir 'spec'
}

2
gradle.properties Normal file
View File

@ -0,0 +1,2 @@
maxTestForks = 1
testForkEvery = 0

View File

@ -1,9 +1,7 @@
require 'rspec'
java_import 'org.apache.spark.sql.SparkSession'
#
# Inspired by test code found here:
# http://codewicca.org/the-inevitable-task-not-serializable-sparkexception
@ -12,7 +10,7 @@ describe 'Serializing Ruby for Spark' do
let(:spark) do
SparkSession
.builder
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
.config('spark.serializer', 'com.github.jrubygradle.redspark.RubySerializer')
.master('local[*]')
.appName('rspec')
.getOrCreate
@ -24,9 +22,8 @@ describe 'Serializing Ruby for Spark' do
it 'should serialize and execute properly' do
data = spark.read.textFile(__FILE__).cache()
expect {
data.filter { |line| line.contains('a') }.count
data.filter { |line| line.contains('a').to_java }.count
}.not_to raise_error
end
end

View File

@ -0,0 +1,24 @@
package com.github.jrubygradle.redspark;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.spark.serializer.JavaSerializationStream;
import org.apache.spark.serializer.SerializationStream;
import scala.reflect.ClassTag;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.logging.Logger;
public class RubySerializationStream extends JavaSerializationStream {
private static final Logger log = Logger.getLogger( RubySerializationStream.class.getName() );
public RubySerializationStream(OutputStream out) {
super(out, 1000, true);
}
public <T> SerializationStream writeObject(T t, ClassTag<T> evidence) {
log.info("serializing writeObject");
log.info(t.getClass().toString());
return super.writeObject(t, evidence);
}
}

View File

@ -0,0 +1,30 @@
package com.github.jrubygradle.redspark;
import java.io.Serializable;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
/**
* RubySerializer implements ruby object serializer for writing Ruby spark jobs.
*
* It should be configured at job submit time, e.g.
*
* java_import 'org.apache.spark.sql.SparkSession'
* SparkSession
* .builder
* .config('spark.serializer', 'com.github.jrubygradle.redspark.RubySerializer')
* .master('local[*]')
* .getOrCreate
*
*/
public class RubySerializer extends Serializer {
/**
* Return the {@code SerializerInstance} required by the {@code Serializer}
* abstract class
*/
public SerializerInstance newInstance() {
return new RubySerializerInstance(1000,
true,
Thread.currentThread().getContextClassLoader());
}
}

View File

@ -0,0 +1,45 @@
package com.github.jrubygradle.redspark;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.logging.Logger;
import org.apache.spark.serializer.JavaSerializerInstance;
import scala.reflect.ClassTag;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.serializer.DeserializationStream;
import org.apache.spark.serializer.SerializationStream;
public class RubySerializerInstance<T> extends JavaSerializerInstance {
private static final Logger log = Logger.getLogger( RubySerializerInstance.class.getName() );
public RubySerializerInstance(int counterReset, boolean extraDebugInfo, ClassLoader loader) {
super(counterReset, true, loader);
}
public <T> T deserialize(ByteBuffer bytes, ClassLoader loader, ClassTag<T> evidence) {
log.info("deserialize with loader");
return super.deserialize(bytes, loader, evidence);
}
public <T> T deserialize(ByteBuffer bytes, ClassTag<T> evidence) {
log.info("deserialize");
return super.deserialize(bytes, evidence);
}
public <T> ByteBuffer serialize(T t, ClassTag<T> evidence) {
log.info("serialize");
return super.serialize(t, evidence);
}
public DeserializationStream deserializeStream(InputStream stream) {
log.info("deserialize stream");
return super.deserializeStream(stream);
}
public SerializationStream serializeStream(OutputStream stream) {
return new RubySerializationStream(stream);
}
}