在Azure Databricks中实现迟钝变革维度(SCD)的三种范例

[复制链接]
发表于 2025-9-22 02:58:48 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
在Azure Databricks中使用PySpark实现迟钝变革维度(SCD)的三种核心范例,需联合Spark SQL和DataFrame API的特性,并使用Delta Lake的事务支持。以下是具体筹划与实现步骤,以及测试用例:
通过以下步骤,可在Azure Databricks中高效实现SCD逻辑,确保数据汗青可追溯且符合业务需求。

范例1:覆盖旧值(Overwrite Old Value)

筹划要点


  • 直接更新目标表中变革的字段,不生存汗青记录。
  • 使用MERGE INTO操纵(Delta Lake特性)实现高效更新。
实现步骤


  • 读取数据:加载源数据(增量数据)和维度表(目标表)。
  • 匹配键:根据业务键(如customer_id)匹配源数据和目标表。
  • 更新记录:若匹配乐成且属性变革,更新目标表字段。
  • 写入结果:使用MERGE操纵完成更新。
代码示例
  1. from pyspark.sql import SparkSession
  2. spark = SparkSession.builder.getOrCreate()
  3. # 加载源数据和目标表
  4. source_df = spark.read.format("delta").load("source_path")
  5. target_df = spark.read.format("delta").load("target_path")
  6. # 创建临时视图
  7. source_df.createOrReplaceTempView("source")
  8. target_df.createOrReplaceTempView("target")
  9. # 执行MERGE操作
  10. merge_sql = """
  11. MERGE INTO target AS t
  12. USING source AS s
  13. ON t.customer_id = s.customer_id
  14. WHEN MATCHED AND t.address <> s.address THEN
  15.   UPDATE SET t.address = s.address
  16. WHEN NOT MATCHED THEN
  17.   INSERT (customer_id, name, address)
  18.   VALUES (s.customer_id, s.name, s.address)
  19. """
  20. spark.sql(merge_sql)
复制代码
测试用例


  • 场景1:客户所在变更,验证目标表中旧所在被覆盖。
  • 场景2:新增客户插入,验证记录精确添加。
  • 场景3:无变革数据,确认无操纵实行。

范例2:创建新记录(SCD Type 2)

筹划要点


  • 为每个变革生成新记录,维护start_date、end_date和is_current标记。
  • 使用窗口函数检测变革,MERGE操纵插入新记录并更新旧记录状态。
实现步骤


  • 添加版本标识:在目标表中增长start_date、end_date和is_current字段。
  • 检测变革:对比源数据与目标表,标记变革的记录。
  • 关闭旧记录:将变革记录的end_date设为当前日期,is_current设为False。
  • 插入新记录:为新记录设置start_date为当前时间,end_date为NULL,is_current为True。
代码示例
  1. from pyspark.sql import functions as F
  2. from pyspark.sql.window import Window
  3. # 加载数据
  4. source = spark.read.format("delta").load("source_path")
  5. target = spark.read.format("delta").load("target_path")
  6. # 检测变化:假设通过last_updated_time判断
  7. changes = source.join(target, "product_id", "left_outer") \
  8.     .filter(
  9.         (source.price != target.price) |
  10.         target.product_id.isNull()
  11.     )
  12. # 关闭旧记录
  13. closed_records = target.join(changes, "product_id", "inner") \
  14.     .withColumn("end_date", F.current_date()) \
  15.     .withColumn("is_current", F.lit(False)) \
  16.     .select(target["*"], "end_date", "is_current")
  17. # 生成新记录(新增或变化)
  18. new_records = changes.withColumn("start_date", F.current_date()) \
  19.     .withColumn("end_date", F.lit(None).cast("date")) \
  20.     .withColumn("is_current", F.lit(True)) \
  21.     .select("product_id", "price", "start_date", "end_date", "is_current")
  22. # 合并并写入Delta表
  23. final_df = closed_records.unionByName(new_records)
  24. final_df.write.format("delta") \
  25.     .mode("overwrite") \
  26.     .saveAsTable("product_dimension")
复制代码
测试用例


  • 场景1:产物代价初次插入,验证is_current=True且end_date=NULL。
  • 场景2:代价变更后,旧记录is_current=False,新记录时间连续。
  • 场景3:多次变更,查抄汗青版本数目及时间连续性。

范例3:添加有用日期(SCD Type 3)

筹划要点


  • 为每个记录维护valid_from和valid_to日期,仅生存有限汗青(如最新一次变更)。
  • 更新时修改旧记录的valid_to,插入新记录。
实现步骤


  • 初始化字段:在目标表中添加valid_from和valid_to。
  • 检测变革:对比新旧数据,找出属性变革的记录。
  • 关闭旧记录:将旧记录valid_to设为当前日期。
  • 插入新记录:设置新记录的valid_from为当前日期,valid_to为NULL。
代码示例
  1. # 加载数据
  2. employee_source = spark.read.format("delta").load("source_path")
  3. employee_target = spark.read.format("delta").load("target_path")
  4. # 检测职位变化
  5. changes = employee_source.join(employee_target, "employee_id", "left") \
  6.     .filter(employee_source.position != employee_target.position)
  7. # 关闭旧记录(设置valid_to)
  8. closed = changes.select(employee_target["*"]) \
  9.     .withColumn("valid_to", F.current_date())
  10. # 插入新记录
  11. new_records = changes.select(employee_source["*"]) \
  12.     .withColumn("valid_from", F.current_date()) \
  13.     .withColumn("valid_to", F.lit(None).cast("date"))
  14. # 合并数据并写入
  15. closed.unionByName(new_records) \
  16.     .write.format("delta") \
  17.     .mode("overwrite") \
  18.     .saveAsTable("employee_dimension")
复制代码
测试用例


  • 场景1:员工职位初次记录,valid_to为空。
  • 场景2:职位变更后,旧记录valid_to更新,新记录valid_from精确。
  • 场景3:查询特定时间点的职位状态,验证时间段正确性。

混合方法实现示例

联合范例1和范例2,对关键属性使用范例2,非关键属性使用范例1:
  1. # 关键属性(如地址)使用类型2
  2. address_changes = detect_address_changes(source, target)
  3. close_old_address_records(address_changes)
  4. insert_new_address_records(address_changes)
  5. # 非关键属性(如电话)使用类型1
  6. phone_updates = detect_phone_changes(source, target)
  7. update_phone_records(phone_updates)
复制代码

关键优化点


  • Delta Lake特性:使用MERGE INTO、ACID事务、Z-Order优化查询性能
  • 数据版本管理:使用DESCRIBE HISTORY检察SCD操纵汗青。
  • 增量处理:仅处理变革的数据分区,淘汰盘算量。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表