We're building an Apache Beam (Java SDK) pipeline that writes to Cloud Storage using the TextIO.write() transform. Within this write, we would like to dynamically change the subdirectory where the file is stored based on the current datetime.
This is part of a streaming pipeline. Ideally, we would like to deploy it and let the Beam job dynamically change the folder subdirectory where it saves the file based on the processing datetime.
Our current pipeline transform looks like this:
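DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
// The path string is built once, when this line runs during pipeline construction.
myPCollection.apply(TextIO.write().to(String.format("gs://my-bucket/%s", dtfOut.print(new DateTime()))));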
The problem with this code is that the DateTime value is stuck at whatever it was when the pipeline was deployed to Google Cloud Dataflow, since the path expression is evaluated only once, when the pipeline is constructed. We would like to dynamically change the subdirectory structure based on the datetime at the time an incoming message is processed.
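To make the behavior concrete, here is a minimal plain-Java sketch of what we think is happening. It does not use Beam at all: `DatedPathSketch`, `datedPath`, and the simulated clock are hypothetical names made up for illustration, and it uses java.time instead of Joda-Time so it runs without extra dependencies. It only contrasts a value computed once up front with one computed each time it is needed.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class DatedPathSketch {
    // Same yyyy/MM/dd layout as in the question; UTC is an assumption.
    static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneOffset.UTC);

    // Hypothetical helper: builds the dated output prefix for a given instant.
    static String datedPath(Instant now) {
        return "gs://my-bucket/" + DAY.format(now);
    }

    public static void main(String[] args) {
        // Simulated clock so the example is deterministic.
        AtomicReference<Instant> now =
                new AtomicReference<>(Instant.parse("2024-01-31T23:00:00Z"));

        // Evaluated once, like the argument passed to to(...) at construction time.
        String fixedAtConstruction = datedPath(now.get());

        // Evaluated on every call, like logic that runs for each element.
        Supplier<String> perElement = () -> datedPath(now.get());

        // Two hours pass, crossing midnight UTC.
        now.set(now.get().plus(Duration.ofHours(2)));

        System.out.println(fixedAtConstruction); // gs://my-bucket/2024/01/31
        System.out.println(perElement.get());    // gs://my-bucket/2024/02/01
    }
}
```

The first value never changes after it is computed, which matches what we see after deployment. What we are looking for is the Beam equivalent of the second form, where the path is derived while elements are being processed.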
I was going to:
- Get the datetime from a ParDo function.
- Create a new ParDo function and use the message as a main input and pass the datetime from the other ParDo function as side input.
Is this the best approach? Are there built-in tools in Apache Beam that can solve our use case?