马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
在Azure Databricks中使用PySpark实现迟钝变革维度(SCD)的三种核心范例,需联合Spark SQL和DataFrame API的特性,并使用Delta Lake的事务支持。以下是具体筹划与实现步骤,以及测试用例:
通过以下步骤,可在Azure Databricks中高效实现SCD逻辑,确保数据汗青可追溯且符合业务需求。
范例1:覆盖旧值(Overwrite Old Value)
筹划要点
- 直接更新目标表中变革的字段,不生存汗青记录。
- 使用MERGE INTO操纵(Delta Lake特性)实现高效更新。
实现步骤
- 读取数据:加载源数据(增量数据)和维度表(目标表)。
- 匹配键:根据业务键(如customer_id)匹配源数据和目标表。
- 更新记录:若匹配乐成且属性变革,更新目标表字段。
- 写入结果:使用MERGE操纵完成更新。
代码示例
- from pyspark.sql import SparkSession
- spark = SparkSession.builder.getOrCreate()
- # 加载源数据和目标表
- source_df = spark.read.format("delta").load("source_path")
- target_df = spark.read.format("delta").load("target_path")
- # 创建临时视图
- source_df.createOrReplaceTempView("source")
- target_df.createOrReplaceTempView("target")
- # 执行MERGE操作
- merge_sql = """
- MERGE INTO target AS t
- USING source AS s
- ON t.customer_id = s.customer_id
- WHEN MATCHED AND t.address <> s.address THEN
- UPDATE SET t.address = s.address
- WHEN NOT MATCHED THEN
- INSERT (customer_id, name, address)
- VALUES (s.customer_id, s.name, s.address)
- """
- spark.sql(merge_sql)
复制代码 测试用例
- 场景1:客户所在变更,验证目标表中旧所在被覆盖。
- 场景2:新增客户插入,验证记录精确添加。
- 场景3:无变革数据,确认无操纵实行。
范例2:创建新记录(SCD Type 2)
筹划要点
- 为每个变革生成新记录,维护start_date、end_date和is_current标记。
- 使用窗口函数检测变革,MERGE操纵插入新记录并更新旧记录状态。
实现步骤
- 添加版本标识:在目标表中增长start_date、end_date和is_current字段。
- 检测变革:对比源数据与目标表,标记变革的记录。
- 关闭旧记录:将变革记录的end_date设为当前日期,is_current设为False。
- 插入新记录:为新记录设置start_date为当前时间,end_date为NULL,is_current为True。
代码示例
- from pyspark.sql import functions as F
- from pyspark.sql.window import Window
- # 加载数据
- source = spark.read.format("delta").load("source_path")
- target = spark.read.format("delta").load("target_path")
- # 检测变化:假设通过last_updated_time判断
- changes = source.join(target, "product_id", "left_outer") \
- .filter(
- (source.price != target.price) |
- target.product_id.isNull()
- )
- # 关闭旧记录
- closed_records = target.join(changes, "product_id", "inner") \
- .withColumn("end_date", F.current_date()) \
- .withColumn("is_current", F.lit(False)) \
- .select(target["*"], "end_date", "is_current")
- # 生成新记录(新增或变化)
- new_records = changes.withColumn("start_date", F.current_date()) \
- .withColumn("end_date", F.lit(None).cast("date")) \
- .withColumn("is_current", F.lit(True)) \
- .select("product_id", "price", "start_date", "end_date", "is_current")
- # 合并并写入Delta表
- final_df = closed_records.unionByName(new_records)
- final_df.write.format("delta") \
- .mode("overwrite") \
- .saveAsTable("product_dimension")
复制代码 测试用例
- 场景1:产物代价初次插入,验证is_current=True且end_date=NULL。
- 场景2:代价变更后,旧记录is_current=False,新记录时间连续。
- 场景3:多次变更,查抄汗青版本数目及时间连续性。
范例3:添加有用日期(SCD Type 3)
筹划要点
- 为每个记录维护valid_from和valid_to日期,仅生存有限汗青(如最新一次变更)。
- 更新时修改旧记录的valid_to,插入新记录。
实现步骤
- 初始化字段:在目标表中添加valid_from和valid_to。
- 检测变革:对比新旧数据,找出属性变革的记录。
- 关闭旧记录:将旧记录valid_to设为当前日期。
- 插入新记录:设置新记录的valid_from为当前日期,valid_to为NULL。
代码示例
- # 加载数据
- employee_source = spark.read.format("delta").load("source_path")
- employee_target = spark.read.format("delta").load("target_path")
- # 检测职位变化
- changes = employee_source.join(employee_target, "employee_id", "left") \
- .filter(employee_source.position != employee_target.position)
- # 关闭旧记录(设置valid_to)
- closed = changes.select(employee_target["*"]) \
- .withColumn("valid_to", F.current_date())
- # 插入新记录
- new_records = changes.select(employee_source["*"]) \
- .withColumn("valid_from", F.current_date()) \
- .withColumn("valid_to", F.lit(None).cast("date"))
- # 合并数据并写入
- closed.unionByName(new_records) \
- .write.format("delta") \
- .mode("overwrite") \
- .saveAsTable("employee_dimension")
复制代码 测试用例
- 场景1:员工职位初次记录,valid_to为空。
- 场景2:职位变更后,旧记录valid_to更新,新记录valid_from精确。
- 场景3:查询特定时间点的职位状态,验证时间段正确性。
混合方法实现示例
联合范例1和范例2,对关键属性使用范例2,非关键属性使用范例1:- # 关键属性(如地址)使用类型2
- address_changes = detect_address_changes(source, target)
- close_old_address_records(address_changes)
- insert_new_address_records(address_changes)
- # 非关键属性(如电话)使用类型1
- phone_updates = detect_phone_changes(source, target)
- update_phone_records(phone_updates)
复制代码 关键优化点
- Delta Lake特性:使用MERGE INTO、ACID事务、Z-Order优化查询性能。
- 数据版本管理:使用DESCRIBE HISTORY检察SCD操纵汗青。
- 增量处理:仅处理变革的数据分区,淘汰盘算量。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |