Can I "branch" stream into many and write them in parallel in pyspark?

I am receiving a Kafka stream in PySpark. Currently I group it by one set of fields and write the updates to a database:

from pyspark.sql.functions import col, expr, min, struct

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
        .option("subscribe", topic) \
        .load()

...

# aggregate per myfield1, keeping the row with the smallest mycol.myfield
df = df \
        .groupBy("myfield1") \
        .agg(
            expr("count(*) as cnt"),
            min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
        ) \
        .select("cnt", "minData.*") \
        .select(
            col("...").alias("..."),
            ...
            col("userId").alias("user_id")
        )

query = df \
        .writeStream \
        .outputMode("update") \
        .foreachBatch(lambda df, epoch: write_data_frame(table_name, df, epoch)) \
        .start()

query.awaitTermination()
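
For context, write_data_frame is just a foreachBatch helper that writes each micro-batch into the database, roughly along these lines (simplified sketch, the JDBC URL and credentials are placeholders):

def write_data_frame(table_name, batch_df, epoch_id):
    # placeholder JDBC sink; the real function does an upsert into our database
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://db-host:5432/mydb") \
        .option("dbtable", table_name) \
        .option("user", "...") \
        .option("password", "...") \
        .mode("append") \
        .save()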

Can I take the same chain in the middle and create another grouping from it, like this:

# same aggregation, but grouped by myfield2
df2 = df \
        .groupBy("myfield2") \
        .agg(
            expr("count(*) as cnt"),
            min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
        ) \
        .select("cnt", "minData.*") \
        .select(
            col("...").alias("..."),
            ...
            col("userId").alias("user_id")
        )

and write its output to a different place in parallel?

Where should I call writeStream and awaitTermination in that case?
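
Something like the layout below is what I have in mind (a minimal sketch; df_base stands for the parsed Kafka stream before the first groupBy, and the table names are placeholders):

# branch the same parsed stream into two aggregations
agg1 = df_base.groupBy("myfield1").agg(expr("count(*) as cnt"))
agg2 = df_base.groupBy("myfield2").agg(expr("count(*) as cnt"))

# one streaming query per branch, each writing to its own table
query1 = agg1 \
        .writeStream \
        .outputMode("update") \
        .foreachBatch(lambda df, epoch: write_data_frame("table_one", df, epoch)) \
        .start()

query2 = agg2 \
        .writeStream \
        .outputMode("update") \
        .foreachBatch(lambda df, epoch: write_data_frame("table_two", df, epoch)) \
        .start()

# block until any of the queries terminates, instead of awaiting a single query
spark.streams.awaitAnyTermination()

Is that the right place for the writeStream calls and for awaitTermination / awaitAnyTermination?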



