Warm tip: This article is reproduced from serverfault.com, please click

mysql-在python中从数据库实时获取数据

(mysql - Fetching data in realtime from database in python)

发布于 2020-11-27 14:05:14

我在Python中有一个用于多处理的类,它创建了3个不同的进程。第一个过程是检查我的硬件是否有任何信号并将其推入队列,第二个过程是将数据移出队列并将其推入数据库,第三个过程是将数据移出数据库并将其推送到服务器上。

obj = QE()
stdFunct = standardFunctions()

watchDogProcess = multiprocessing.Process(target=obj.watchDog)
watchDogProcess.start()
        
pushToDBSProcess = multiprocessing.Process(target=obj.pushToDBS)
pushToDBSProcess.start()
        
pushToCloud = multiprocessing.Process(target=stdFunct.uploadCycleTime)
pushToCloud.start()
        
watchDogProcess.join()
pushToDBSProcess.join()
pushToCloud.join()

我的前两个过程运行正常,但是我在第三个过程中苦苦挣扎。以下是我的第三个过程的代码:

def uploadCycleTime(self):
    while True:
        uploadCycles = []
        lastUpPointer = "SELECT id FROM lastUploaded"
        lastUpPointer = self.dbFetchone(lastUpPointer)
        lastUpPointer = lastUpPointer[0]
        # print("lastUploaded :"+str(lastUpPointer))
        cyclesToUploadSQL = "SELECT id,machineId,startDateTime,endDateTime,type FROM cycletimes WHERE id > "+str(lastUpPointer)
        cyclesToUpload = self.dbfetchMany(cyclesToUploadSQL,15)
        cyclesUploadLength = len(cyclesToUpload)
        if(cyclesUploadLength>0):
            for cycles in cyclesToUpload:
                uploadCycles.append({"dataId":cycles[0],"machineId":cycles[1],"startDateTime":cycles[2].strftime('%Y-%m-%d %H:%M:%S.%f'),"endDateTime":cycles[3].strftime('%Y-%m-%d %H:%M:%S.%f'),"type":cycles[4]})        
            # print("length : "+str(cyclesUploadLength))
            lastUpPointer = uploadCycles[cyclesUploadLength-1]["dataId"]
            
            uploadCycles = json.dumps(uploadCycles)
            api = self.dalUrl+"/cycle-times"
            uploadResponse = self.callPostAPI(api,str(uploadCycles))
            print(lastUpPointer)
            changePointerSQL = "UPDATE lastUploaded SET id="+str(lastUpPointer)
            try:
                changePointerSQL = self.dbAbstraction(changePointerSQL)
            except Exception as errorPointer:
                print("Pointer change Error : "+str(errorPointer))
            time.sleep(2)

现在,我保存了一个指针,以记住上一次上传的ID,并从那里继续上传15个数据包。当数据库中存在数据时,代码可以很好地工作,但是,如果在启动过程并随后发送数据时不存在数据,则它将无法从数据库中获取数据。

我尝试实时打印长度,尽管数据不断被实时不断地推送到数据库中,但长度始终为0。

Questioner
Aaditya Damani
Viewed
11
Aaditya Damani 2020-11-29 13:13:40

在上传过程中,我错过了commit()

def dbFetchAll(self,dataString):
    # dataToPush = self.cycletimeQueue.get()
    # print(dataToPush)
    dbTry = 1
    try:
        while(dbTry == 1): # This while is to ensure the data has been pushed
            sql = dataString
            self.conn.execute(sql)
            response = self.conn.fetchall()
            dbTry = 0
            return response
            # print(self.conn.rowcount, "record inserted.")
    except Exception as error:
        print ("Error : "+str(error))
        return dbTry

    ***finally:
        self.mydb.commit()***