changing namespaces and links to delta-io connectors based repo

Alex Kushnir 2021-03-04 16:45:09 -05:00
parent a409f43dab
commit 22ce8e247e
1 changed file with 18 additions and 8 deletions


@@ -16,8 +16,9 @@ of data between HDFS and relational data stores. Our pipelines were using this tool
to import MySQL data into HDFS. When the Platform Engineering team at Scribd took on an effort
to migrate our on-premise Hadoop workloads to the [Databricks Lakehouse Platform](https://databricks.com/product/data-lakehouse)
on AWS, we had to write our own tool to import data from MySQL directly into an S3-backed [Delta Lake](https://delta.io/).
In this post I will share the details about [sql-delta-import](https://github.com/scribd/sql-delta-import) - an
open-source spark utility to import data from any JDBC compatible database into Delta Lake
In this post I will share the details about `sql-delta-import` - an open-source Spark utility to import data from any
JDBC-compatible database into Delta Lake. This utility is being open-sourced under the
[Delta Lake Connectors](https://github.com/delta-io/connectors) project.
### Sample import
@@ -25,7 +26,7 @@ Importing data into a Delta Lake table is as easy as
```shell script
spark-submit \
--class "com.scribd.importer.spark.ImportRunner" sql-delta-import_2.12-0.1.0-SNAPSHOT.jar /
--class "com.scribd.importer.spark.ImportRunner" sql-delta-import_2.12-0.2.1-SNAPSHOT.jar /
--jdbc-url jdbc:mysql://hostName:port/database /
--source source.table
--destination destination.table
@@ -48,7 +49,7 @@ optimize data storage for best performance on reads by just adding a couple of configuration settings
spark-submit \
--conf spark.databricks.delta.optimizeWrite.enabled=true \
--conf spark.databricks.delta.autoCompact.enabled=true \
--class "com.scribd.importer.spark.ImportRunner" sql-delta-import_2.12-0.1.0-SNAPSHOT.jar /
--class "com.scribd.importer.spark.ImportRunner" sql-delta-import_2.12-0.2.1-SNAPSHOT.jar /
--jdbc-url jdbc:mysql://hostName:port/database /
--source source.table
--destination destination.table
@@ -71,7 +72,7 @@ concurrency thus allowing you to tune those parameters independently
spark-submit --num-executors 15 --executor-cores 4 \
--conf spark.databricks.delta.optimizeWrite.enabled=true \
--conf spark.databricks.delta.autoCompact.enabled=true \
--class "com.scribd.importer.spark.ImportRunner" sql-delta-import_2.12-0.1.0-SNAPSHOT.jar /
--class "com.scribd.importer.spark.ImportRunner" sql-delta-import_2.12-0.2.1-SNAPSHOT.jar /
--jdbc-url jdbc:mysql://hostName:port/database /
--source source.table
--destination destination.table
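
To make the tuning above concrete, here is a rough sketch (my own illustration, not part of the original tool or post) of how the executor flags in the example bound write concurrency; the connection to the chunk count is my reading of the surrounding discussion.

```scala
// Back-of-the-envelope concurrency math for the spark-submit example above.
// Assumption: Spark runs at most one import task per executor core at a time.
val numExecutors     = 15
val executorCores    = 4
val writeConcurrency = numExecutors * executorCores // up to 60 chunks written concurrently

// Splitting the source table into at least this many chunks keeps every core busy,
// while the chunk count itself only changes how the source table is read,
// which is why the two knobs can be tuned independently.
```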
@@ -115,9 +116,11 @@ libraries can be imported into your own project. You can specify custom data transformations for
more precise control of data type handling
```scala
...
import com.scribd.importer.spark._
import com.scribd.importer.spark.transform.DataTransform._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import io.delta.connectors.spark.JDBC._
implicit val spark: SparkSession = SparkSession.builder().master("local").getOrCreate()
@@ -127,6 +130,13 @@ import com.scribd.importer.spark.transform.DataTransform._
val config = ImportConfig(source = "table", destination = "target_database.table", splitBy = "id", chunks = 10)
// a sample transform to convert all timestamp columns to strings
val timeStampsToStrings: DataFrame => DataFrame = source => {
  val tsCols = source.schema.fields.filter(_.dataType == DataTypes.TimestampType).map(_.name)
  tsCols.foldLeft(source)((df, colName) =>
    df.withColumn(colName, from_unixtime(unix_timestamp(col(colName)), "yyyy-MM-dd HH:mm:ss.S")))
}
// Whatever functions are passed to the transform below will be applied during import
val transforms = new DataTransform(Seq(
  df => df.withColumn("id", col("id").cast(types.StringType)), // custom function to cast the id column to string