Implementing SCD Type 2 using PySpark

I am trying to implement SCD Type 2 using PySpark and insert the data into Teradata. I am able to generate a DataFrame that contains both the old history records (which are already present in the database) and the new records. However, when I write that DataFrame in overwrite mode with truncate = 'true', the old history rows from the DataFrame are not inserted; only the new records are inserted.

For example, consider the sample table and the DataFrame shown further down. If an employee is promoted, we have to add an entry for the new role and also keep the old details for that employee. Ray is promoted from team member to Manager, so the final table should have two entries for Ray: team member with current record ind 0, and the new designation with current record ind 1. To implement this, I created a DataFrame that has both entries, but when I overwrite the Teradata table with truncate = 'true', only the new records are inserted, and the record with current record ind = 0 is missing.

One more thing: I generate the id (PK) column in the DataFrame itself, assigning new ids before inserting.
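For reference, the write described above presumably corresponds to something like the following. This is only a minimal sketch, assuming the write goes through Spark's built-in JDBC data source using `mode("overwrite")` together with the `truncate` option. The helper names, host, database, table, and credentials are hypothetical placeholders and do not come from my actual job.

```python
def teradata_jdbc_options(host, database, table, user, password):
    """Build the option map for Spark's JDBC writer.

    All argument values are placeholders (assumptions for illustration).
    """
    return {
        "url": f"jdbc:teradata://{host}/DATABASE={database}",
        "driver": "com.teradata.jdbc.TeraDriver",
        "dbtable": table,
        "user": user,
        "password": password,
        # With overwrite mode, ask Spark to truncate the existing table
        # instead of dropping and recreating it.
        "truncate": "true",
    }


def overwrite_with_truncate(df, options):
    """Write a pyspark.sql.DataFrame over the target table.

    `df` is expected to be the final SCD Type 2 DataFrame
    (current rows plus history rows).
    """
    df.write.format("jdbc").options(**options).mode("overwrite").save()


# Hypothetical usage (not executed here, needs a Spark session and a database):
# opts = teradata_jdbc_options("td-host", "hr_db", "employee_dim", "user", "secret")
# overwrite_with_truncate(final_df, opts)
```

The `truncate` option only has an effect in overwrite mode; in that case the table definition is kept and only its rows are replaced by the contents of the DataFrame.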


Existing table:

| id (PK) | emp_id | name | designation | current record ind |
| 1       | 101    | Ray  | team member | 1                  |
| 2       | 102    | John | team member | 1                  |


DataFrame to be written:

| id | emp_id | name | designation | current record ind |
| 3  | 101    | Ray  | Manager     | 1                  |
| 4  | 102    | John | team member | 1                  |
| 5  | 101    | Ray  | team member | 0                  |
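To make the intended result concrete, the SCD Type 2 step on the sample rows can be sketched in plain Python (not PySpark) as below. This is an illustration under stated assumptions, not the actual job: the function name `apply_scd2`, the underscore column name `current_record_ind`, and the id scheme (fresh ids continuing from the highest existing id, current rows first, then history rows) are inferred from the sample data only.

```python
def apply_scd2(existing, changes):
    """Return the full SCD Type 2 row set: current rows plus history rows.

    existing: list of dict rows already in the table.
    changes:  dict mapping emp_id -> dict of changed attributes.
    """
    next_id = max(row["id"] for row in existing) + 1
    current, history = [], []

    for row in existing:
        change = changes.get(row["emp_id"])
        is_current = row["current_record_ind"] == 1
        if is_current and change and change["designation"] != row["designation"]:
            # New version of the record becomes the current one ...
            current.append({**row, **change, "current_record_ind": 1})
            # ... and the old version is kept as history.
            history.append({**row, "current_record_ind": 0})
        elif is_current:
            current.append(dict(row))
        else:
            history.append(dict(row))

    # Assumption: every row gets a newly generated id before the write.
    result = []
    for row in current + history:
        result.append({**row, "id": next_id})
        next_id += 1
    return result


existing_rows = [
    {"id": 1, "emp_id": 101, "name": "Ray", "designation": "team member", "current_record_ind": 1},
    {"id": 2, "emp_id": 102, "name": "John", "designation": "team member", "current_record_ind": 1},
]
promotions = {101: {"designation": "Manager"}}

final_rows = apply_scd2(existing_rows, promotions)
for row in final_rows:
    print(row)
```

All three rows in `final_rows`, including the one with `current_record_ind` 0, are what I expect to see in Teradata after the write.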

Content Attribution

This content was originally published by sankha87 at Recent Questions - Stack Overflow, and is syndicated here via their RSS feed. You can read the original post over there.
