Addressing feedback, adding link to delta-io connectors PR.

Alex Kushnir 2021-03-08 13:33:48 -05:00
parent 36b1f23d13
commit 4eab67842a
1 changed file with 19 additions and 18 deletions


@@ -18,7 +18,7 @@ to migrate our on-premise Hadoop workloads to [Databricks Lakehouse Platform](ht
on AWS we had to write our own tool to import data from MySQL directly into S3-backed [Delta Lake](https://delta.io/).
In this post I will share the details about `sql-delta-import` - an open-source Spark utility to import data from any
JDBC-compatible database into Delta Lake. This utility is being open sourced under the
- [Delta Lake Connectors](https://github.com/delta-io/connectors) project
+ [Delta Lake Connectors](https://github.com/delta-io/connectors/pull/80) project
### Sample import
@@ -87,7 +87,7 @@ but no more than 60 concurrent connections will be used for import since max deg
#### 3.1 `--num-mappers` and data skew just don't play nicely together
When `sqoop` imports data, the source table will be split into ranges based on the `--split-by` column and each mapper
- would import it's corresponding range. This works well when the `--split-by` column has a near-uniform distribution
+ would import its corresponding range. This works well when the `--split-by` column has a near-uniform distribution
of data, but that's not always the case with source tables... As tables age we tend to add columns to them to
take on new business requirements, so over time data in the latest rows has a higher fill rate than in earlier rows.
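
To make the skew concrete, here is a tiny standalone Scala sketch. It is illustrative only: the id range, the skewed distribution, and the mapper count are all made up, and this is not `sqoop` or `sql-delta-import` code. It shows how equal-width `--split-by` ranges leave almost all of the rows with the last mapper:

```scala
// Made-up skewed table: ids 1..1,000,000 where old (low) ids are sparse
// and recent (high) ids are dense, mimicking a column whose fill rate grew over time.
val (minId, maxId) = (1L, 1000000L)
def rowsInRange(lo: Long, hi: Long): Int =
  (lo to hi).count(id => id % (if (id < 900000L) 100 else 2) == 0)

// Equal-width ranges, one per mapper - the way a --split-by/--num-mappers style
// import cuts up the table.
val numMappers = 8
val width = (maxId - minId + 1) / numMappers
(0 until numMappers).foreach { i =>
  val lo = minId + i * width
  val hi = if (i == numMappers - 1) maxId else lo + width - 1
  println(s"mapper $i: ids [$lo, $hi] -> ${rowsInRange(lo, hi)} rows")
}
```

Running this in a Scala REPL shows the last mapper ending up with roughly 50,000 rows while each of the other seven gets about 1,250 - exactly the long-tail behavior described above.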
@@ -104,7 +104,8 @@ there is an additional column that does not add business value, app developers n
storing it takes up database resources, and if we plan to use it for imports it had better be indexed, consuming even more
compute and storage resources.
- With `sql-delta-import` we can "solve" this problem by making the number of chunks much larger than the max degree of parallelism.
+ With `sql-delta-import` we still split source tables into ranges based on the `--split-by` column, but if there is data
+ distribution skew we can "solve" this problem by making the number of chunks much larger than the max degree of parallelism.
This way large chunks with high data density are broken up into smaller pieces that a single executor can handle.
Executors that get chunks with little or no data can just quickly process them and move on to do some real work.
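
For readers who want to see the shape of this idea in plain Spark, here is a hedged sketch using Spark's generic JDBC reader with one predicate per chunk. It is only an illustration of the chunking approach, not the actual `sql-delta-import` implementation; the connection details, credentials, id range, chunk count, and output path below are all placeholders:

```scala
import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Placeholder connection details - use whatever JDBC properties your database needs.
val jdbcUrl = "jdbc:mysql://hostName:port/database"
val props = new Properties()
props.setProperty("user", "import_user")
props.setProperty("password", "***")

// Assume the indexed id column spans 1..1,000,000 (in practice you would query
// MIN(id)/MAX(id) first). Cut it into far more chunks than you have executor cores.
val (minId, maxId) = (1L, 1000000L)
val chunks = 500
val chunkWidth = (maxId - minId + 1) / chunks

// One WHERE-clause predicate per chunk; Spark schedules one task per predicate,
// so at most (executors x cores) chunks are read concurrently - that, not the
// chunk count, is what caps concurrent database connections.
val predicates = (0 until chunks).map { i =>
  val lo = minId + i * chunkWidth
  val hi = if (i == chunks - 1) maxId else lo + chunkWidth - 1
  s"id >= $lo AND id <= $hi"
}.toArray

val df = spark.read.jdbc(jdbcUrl, "table", predicates, props)

// Write out as a Delta table (assumes the Delta Lake dependency is on the classpath).
df.write.format("delta").mode("overwrite").save("/tmp/delta/target_table") // placeholder output path
```

Dense chunks simply take a little longer, while executors that finish sparse chunks immediately pick up the next one from the queue, so no single task is stuck with the bulk of the table.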
@@ -122,30 +123,30 @@ import org.apache.spark.sql.types._
```scala
import io.delta.connectors.spark.JDBC._

implicit val spark: SparkSession = SparkSession.builder().master("local").getOrCreate()

// All additional possible jdbc connector properties described here - https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-configuration-properties.html
val jdbcUrl = "jdbc:mysql://hostName:port/database"

val config = ImportConfig(source = "table", destination = "target_database.table", splitBy = "id", chunks = 10)

// a sample transform to convert all timestamp columns to strings
val timeStampsToStrings : DataFrame => DataFrame = source => {
  val tsCols = source.schema.fields.filter(_.dataType == DataTypes.TimestampType).map(_.name)
  tsCols.foldLeft(source)((df, colName) =>
    df.withColumn(colName, from_unixtime(unix_timestamp(col(colName)), "yyyy-MM-dd HH:mm:ss.S")))
}

// Whatever functions are passed to below transform will be applied during import
val transforms = new DataTransform(Seq(
  df => df.withColumn("id", col("id").cast(types.StringType)), // custom function to cast id column to string
  timeStampsToStrings // included transform function converts all Timestamp columns to their string representation
))

val importer = new JDBCImport(jdbcUrl = jdbcUrl, importConfig = config, dataTransform = transforms)

importer.run()
```
---