This article is reproduced from serverfault.com.

Fetching data in realtime from database in python

Published on 2020-11-27 14:05:14

I have a class for multiprocessing in Python which creates 3 different processes. The first process checks whether there is any signal from my hardware and pushes it into a Queue, the second process gets the data out of the Queue and pushes it into a database, and the third process gets the data out of the database and pushes it to a server.

obj = QE()
stdFunct = standardFunctions()

watchDogProcess = multiprocessing.Process(target=obj.watchDog)
watchDogProcess.start()
        
pushToDBSProcess = multiprocessing.Process(target=obj.pushToDBS)
pushToDBSProcess.start()
        
pushToCloud = multiprocessing.Process(target=stdFunct.uploadCycleTime)
pushToCloud.start()
        
watchDogProcess.join()
pushToDBSProcess.join()
pushToCloud.join()
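The producer/consumer hand-off between the first two processes can be sketched in isolation. This is a minimal, self-contained illustration of the same pattern, not the original `QE` class; the names `produce`, `consume`, and `run_pipeline` are invented for the example.

```python
import multiprocessing

def produce(q, n):
    # Stand-in for the watchDog process: push n fake readings.
    for i in range(n):
        q.put(i)
    q.put(None)  # sentinel: tells the consumer no more data is coming

def consume(q, out):
    # Stand-in for the pushToDBS process: drain the queue until the sentinel.
    while True:
        item = q.get()
        if item is None:
            break
        out.append(item)

def run_pipeline(n=5):
    q = multiprocessing.Queue()
    manager = multiprocessing.Manager()
    out = manager.list()  # shared list so the parent can see consumer results
    p1 = multiprocessing.Process(target=produce, args=(q, n))
    p2 = multiprocessing.Process(target=consume, args=(q, out))
    p1.start(); p2.start()
    p1.join(); p2.join()
    return list(out)

if __name__ == "__main__":
    print(run_pipeline(5))
```

A single producer feeding a single consumer through a FIFO queue preserves ordering, which is why the first two stages behave deterministically.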

My first two processes run perfectly as desired; however, I am struggling with the third one. The following is the code of my third process:

def uploadCycleTime(self):
    while True:
        uploadCycles = []
        lastUpPointer = "SELECT id FROM lastUploaded"
        lastUpPointer = self.dbFetchone(lastUpPointer)
        lastUpPointer = lastUpPointer[0]
        # print("lastUploaded :"+str(lastUpPointer))
        cyclesToUploadSQL = "SELECT id,machineId,startDateTime,endDateTime,type FROM cycletimes WHERE id > "+str(lastUpPointer)
        cyclesToUpload = self.dbfetchMany(cyclesToUploadSQL,15)
        cyclesUploadLength = len(cyclesToUpload)
        if(cyclesUploadLength>0):
            for cycles in cyclesToUpload:
                uploadCycles.append({"dataId":cycles[0],"machineId":cycles[1],"startDateTime":cycles[2].strftime('%Y-%m-%d %H:%M:%S.%f'),"endDateTime":cycles[3].strftime('%Y-%m-%d %H:%M:%S.%f'),"type":cycles[4]})        
            # print("length : "+str(cyclesUploadLength))
            lastUpPointer = uploadCycles[cyclesUploadLength-1]["dataId"]
            
            uploadCycles = json.dumps(uploadCycles)
            api = self.dalUrl+"/cycle-times"
            uploadResponse = self.callPostAPI(api,str(uploadCycles))
            print(lastUpPointer)
            changePointerSQL = "UPDATE lastUploaded SET id="+str(lastUpPointer)
            try:
                changePointerSQL = self.dbAbstraction(changePointerSQL)
            except Exception as errorPointer:
                print("Pointer change Error : "+str(errorPointer))
        time.sleep(2)  # sleep every iteration, even when no rows were found
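As an aside, building SQL with `str()` concatenation (as in `cyclesToUploadSQL` and `changePointerSQL` above) is fragile and injection-prone; DB-API drivers support bound parameters instead. A hypothetical sketch of the same pointer query with placeholders, using sqlite3 so it is self-contained (MySQL drivers use `%s` rather than `?`; `fetch_cycles_after` is an invented name):

```python
import sqlite3

def fetch_cycles_after(conn, last_pointer, batch=15):
    # The driver binds the parameters, so no string concatenation
    # and no risk of malformed or injected SQL.
    cur = conn.execute(
        "SELECT id, machineId FROM cycletimes WHERE id > ? "
        "ORDER BY id LIMIT ?",
        (last_pointer, batch),
    )
    return cur.fetchall()

if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE cycletimes (id INTEGER PRIMARY KEY, machineId INTEGER)")
    conn.executemany("INSERT INTO cycletimes VALUES (?, ?)",
                     [(i, 1) for i in range(1, 21)])
    print(fetch_cycles_after(conn, 5))  # at most 15 rows with id > 5
```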

Now I save a pointer to remember the last id uploaded, and from there keep uploading batches of 15 records. When data already exists in the DB the code works well; however, if no data exists when the process is initiated and data is sent afterwards, it fails to fetch the data from the DB.

I tried printing the length in real time; it keeps giving me 0, in spite of data being continuously pushed into the DB.

Answer by Aaditya Damani, 2020-11-29 13:13:40

In my upload process, I had missed out on a commit(). Adding it in a finally block fixed the issue:

def dbFetchAll(self,dataString):
    dbTry = 1
    try:
        while(dbTry == 1): # This while is to ensure the query has run
            sql = dataString
            self.conn.execute(sql)
            response = self.conn.fetchall()
            dbTry = 0
            return response
    except Exception as error:
        print ("Error : "+str(error))
        return dbTry
    finally:
        self.mydb.commit()  # the missing commit: ends the read transaction
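The effect of a missing commit() can be reproduced in a small self-contained demonstration. The original setup presumably uses MySQL (where the reader's open REPEATABLE READ snapshot also hides new rows until its transaction ends), but the same principle shows up with sqlite3: rows inserted on one connection stay invisible to a second connection until commit() is called. The `demo` function below is invented for illustration.

```python
import os
import sqlite3
import tempfile

def demo():
    # Two connections to the same on-disk database, like the two
    # processes in the question sharing one DB.
    path = os.path.join(tempfile.mkdtemp(), "demo.db")
    writer = sqlite3.connect(path)
    writer.execute("CREATE TABLE cycletimes (id INTEGER)")
    writer.commit()
    reader = sqlite3.connect(path)

    # Insert without committing: the reader still sees an empty table.
    writer.execute("INSERT INTO cycletimes VALUES (1)")
    before = reader.execute("SELECT COUNT(*) FROM cycletimes").fetchone()[0]

    # After commit, the row becomes visible to the other connection.
    writer.commit()
    after = reader.execute("SELECT COUNT(*) FROM cycletimes").fetchone()[0]

    writer.close()
    reader.close()
    return before, after

if __name__ == "__main__":
    print(demo())  # (0, 1)
```

This is why the polling loop kept reporting a length of 0: the new rows were there, but not visible to the uploader's connection until a commit ended the pending transaction.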