How to know the number of active workers in Python multiprocessing

I have developed code that creates N workers based on a config file and a variable named config.threadsTaskApi. It runs X workers in parallel and always uses those X workers. Each worker has to fetch up to limit_WF workfile IDs from the database and send them to an API.

At some point I want to be notified if there is only 1 worker active, because when I split the work by client_id, one client_id may have 100,000 records in the database while the others have around 5,000. So the process ends up finishing the other client_ids and leaving only 1 child worker process active. The process() function is called from another .py file.
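
To make the goal concrete, this is roughly the kind of notification I mean (a minimal sketch, not my real code; handle_client and client_ids are placeholders for my actual worker function and client list): count finished tasks with a completion callback so the parent always knows how many client_id jobs are still outstanding.

from multiprocessing import Pool
import time

def handle_client(client_id):
    # Placeholder for the real per-client work
    time.sleep(0.1)
    return client_id

if __name__ == "__main__":
    client_ids = [1, 2, 3, 4, 5]
    done = []

    def on_done(result):
        # Runs in the parent each time a worker finishes one client_id
        done.append(result)
        remaining = len(client_ids) - len(done)
        print("client_id "+str(result)+" finished, "+str(remaining)+" task(s) still pending or running")

    with Pool(2) as pool:
        async_results = [pool.apply_async(handle_client, (cid,), callback=on_done)
                         for cid in client_ids]
        for ar in async_results:
            ar.get()

When remaining drops to 1, that is exactly the point where I would like to react.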

When this happens I want to be notified so I can change the behaviour and run X workers on that remaining child's work, keeping the parallelism active at all times (one idea for this is sketched after the code). Here is the code:

import os
import sys
import time

import psycopg2
from multiprocessing import Pool

# Config and TasksApi are project-specific classes (their imports are not shown)

class BigDHistoric:
    def __init__(self):
        print("BigD Historic object created")
        self.config = Config()
    def updateBigdQueueJsonBatch(self, client_id):
        batchConn = None
        foundWf = 1
        processed_workfiles = 0
        try:
            print("Processing client_id: "+str(client_id)+"; historic data bigd_queue with process id: "+str(os.getpid()))
            tasksApi = TasksApi()
            token = tasksApi.getTasksApiToken()
            while foundWf == 1:
                print('1 - CLIENT_ID: '+str(client_id)+" BEFORE FETCH , processed_workfiles: "+str(processed_workfiles))
                batchConn = psycopg2.connect(user = self.config.rds_data_user, password = self.config.rds_data_password, host = self.config.rds_data_host, port = "5432", database = self.config.rds_data_database)
                cur = batchConn.cursor()
                sqlBatch = "select workfile_id from bigd_queue where sent = 0  and message_code = 1 and client_id = "+str(client_id)+"  order by id  limit "+str(self.config.limit_WF)
                cur.execute(sqlBatch)
                rowcount = cur.rowcount
                if rowcount > 0 :
                    queue_records_wf = cur.fetchall()
                    batchConn.close()
                    listWf = []
                    for wf in queue_records_wf:
                        listWf.append(wf[0])
                    data = {"client_id": client_id, "workfile_id":listWf, "use_read_replica":1, "verbose":self.config.verboseTaskApi}
                    controller = 'WorkfileStats/Batch/'

                    start_time = time.time()
                    r = tasksApi.post(controller, data, token)
                    time_response_taskApi = time.time()
                    elapsed_time = round(time_response_taskApi - start_time,2)

                    print("2 - CLIENT_ID: "+str(client_id)+" RESULT: "+str(r.status_code))
                    if r.status_code == 404 or r.status_code == 401 or r.status_code == 503 or r.status_code == 504 or r.status_code == 500:
                        self.createErrorLogPost(client_id,"[Cron_bigd.bigd_historic] Error returning status from TaskAPI: "+str(r.status_code)+" client_id: "+str(client_id)+"; workfile_list: "+str(listWf)+"; elapsed_time response: "+str(elapsed_time))
                        time.sleep(35)
                        self.updateBigdQueueJsonBatch(client_id)
                    else:
                        processed_workfiles += rowcount
                        print("3 - CLIENT_ID: "+str(client_id)+" processed_workfiles: "+str(processed_workfiles))
                        if(rowcount < self.config.limit_WF):
                            foundWf = 0
                            batchConn.close()
                    if(self.config.verboseTaskApi == 1):
                        self.createLogPostMessage(client_id,0,"[Cron_bigd.bigd_historic] returning status from TaskAPI: "+str(r.status_code)+" client_id: "+str(client_id)+"; workfile_list: "+str(listWf)+"; elapsed_time response: "+str(elapsed_time))
                else:
                    foundWf = 0
                    batchConn.close()
        except Exception as e:
            print(e)
            raise e
        finally:
            print("PROCESSED client_id: "+str(client_id)+"; historic data bigd_queue with process id: "+str(os.getpid()))
            if(batchConn):
                batchConn.close()

    def process(self):
        connBigD = None
        try:
            print("--------------------------------------")
            print("STARTING BIGD_HISTORIC FROM BIGD_QUEUE")
            print("--------------------------------------")
            connBigD = psycopg2.connect(user = self.config.rds_data_user, password = self.config.rds_data_password, host = self.config.rds_data_host, port = "5432", database = self.config.rds_data_database)
            # Check whether workfiles need to be filled with data; these are the historics generated by the ifs wf_send_stats task
            sqlBatch = "select client_id from bigd_queue where message_code = 1 and sent = 0 group by client_id order by client_id"
            curBatch = connBigD.cursor()
            curBatch.execute(sqlBatch)
            rowcount = curBatch.rowcount
            cliList = []
            if rowcount > 0 :
                queue_records_batch = curBatch.fetchall()
                for row_client in queue_records_batch:
                    cliList.append(row_client[0])
            if(connBigD):
                connBigD.close()

            if(len(cliList) > 0):
                p = Pool(self.config.threadsTaskApi)
                result = p.map_async(
                    self.updateBigdQueueJsonBatch,
                    cliList,
                )
                result.get()
                p.close()
                p.join()
        except Exception as e:
            print("Error Exception on bigd_historic: "+str(e))
            self.createErrorLogPost(0,"[Cron_bigd.bigd_historic] Error: "+str(e))
            sys.exit(1)
        finally:
            print("--------------------------------------------")
            print("ENDING PROCESS BIGD_HISTORIC FROM BIGD_QUEUE")
            print("--------------------------------------------")
            if(connBigD):
                connBigD.close()
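
One direction I am considering (again just a sketch under assumptions, not what the code above does): pre-split each client's pending workfile IDs into limit_WF-sized chunks and feed (client_id, chunk) pairs to the pool, so a client with 100,000 rows becomes many small tasks and all X workers stay busy even when only one client is left. fetch_workfile_ids and send_batch below are hypothetical stand-ins for my SQL query and the TasksApi post.

from multiprocessing import Pool

def fetch_workfile_ids(client_id):
    # Hypothetical stand-in for the bigd_queue SELECT (returns dummy ids here)
    return list(range(1200))

def send_batch(args):
    client_id, workfile_chunk = args
    # Hypothetical stand-in for posting one chunk via TasksApi
    print("client_id "+str(client_id)+": sending "+str(len(workfile_chunk))+" workfiles")

def build_tasks(client_ids, limit_wf):
    # Split every client's pending workfiles into limit_wf-sized chunks,
    # so one big client turns into many small pool tasks
    tasks = []
    for cid in client_ids:
        ids = fetch_workfile_ids(cid)
        for i in range(0, len(ids), limit_wf):
            tasks.append((cid, ids[i:i + limit_wf]))
    return tasks

if __name__ == "__main__":
    tasks = build_tasks([101, 102], limit_wf=500)
    with Pool(4) as pool:
        pool.map(send_batch, tasks)

With that shape, the 1-worker situation mostly goes away, since no single client_id can hold one worker for its entire backlog.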


Read more here: https://stackoverflow.com/questions/66999200/how-to-know-active-workers-on-python-multiprocessing

