Python parallel thread that consumes Watchdog queue events
                This article covers how to handle a Python parallel thread that consumes Watchdog queue events.

                Problem description

                I have this code that should put an event in a queue each time an external program (TCPdump) creates a *.pcap file in my directory. My problem is that the queue is always empty, even though I see the print output from the process() function.

                What am I doing wrong? Is the queue correctly defined and shared between the two classes?

                EDIT-----------------
                I think I understand why the queue was empty: I was printing the queue I had initialized before the Handler class filled it. I modified my code and created two workers that should share the same queue, but now execution gets stuck on queue.put() and the ReadPcapFiles() thread stops running.

                Here is the updated code:

                import time
                import pyshark
                import concurrent.futures
                import threading
                import logging
                from queue import Queue
                from multiprocessing import Process
                from watchdog.observers import Observer, api
                from watchdog.events import PatternMatchingEventHandler
                
                class Handler(PatternMatchingEventHandler):
                    patterns = ["*.pcap", "*.pcapng"]
                
                    def __init__(self, queue):
                        PatternMatchingEventHandler.__init__(self)
                        self.queue = queue
                
                    def process(self, event):
                        #print(f'event type: {event.event_type}  path : {event.src_path}')   
                        self.queue.put(event.src_path)
                        logging.info(f"Storing message: {self.queue.qsize()}")
                        print("Producer queue: ", list(self.queue.queue))
                        #self.queue.get()
                
                    def on_created(self, event):
                        self.process(event)          
                
                
                def StartWatcher(watchdogq, event):
                    path = 'C:\...'
                    handler = Handler(watchdogq)
                    observer = Observer()
                    while not event.is_set():
                        observer.schedule(handler, path, recursive=False)
                        print("About to start observer")
                        observer.start()
                        try:
                            while True:
                                time.sleep(1)
                        except Exception as error:
                            observer.stop()
                            print("Error: " + str(error))
                        observer.join()
                
                
                def ReadPcapFiles(consumerq, event):
                    while not event.is_set() or not consumerq.empty():
                        print("Consumer queue: ", consumerq.get())
                        #print("Consumer queue: ", list(consumerq.queue))
                
                    # pcapfile = pyshark.FileCapture(self.queue.get())
                    #     for packet in pcapfile:
                    #         countPacket +=1 
                
                if __name__ == '__main__':
                    format = "%(asctime)s: %(message)s"
                    logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")
                    logging.getLogger().setLevel(logging.DEBUG)
                
                    queue = Queue()
                    event = threading.Event()
                    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
                        executor.submit(StartWatcher,queue, event)
                        executor.submit(ReadPcapFiles,queue, event)
                
                        time.sleep(0.1)
                        logging.info("Main: about to set event")
                        event.set()
                

                Old code:

                import time
                from queue import Queue
                from watchdog.observers import Observer
                from watchdog.events import PatternMatchingEventHandler
                
                class Handler(PatternMatchingEventHandler):
                    patterns = ["*.pcap", "*.pcapng"]
                
                    def __init__(self, queue):
                        PatternMatchingEventHandler.__init__(self)
                        self.queue = queue
                
                    def process(self, event):
                        print(f'event type: {event.event_type}  path : {event.src_path}')   
                        self.queue.put(event.src_path)
                
                    def on_created(self, event):
                        self.process(event)
                
                class Watcher():
                    def __init__(self, path):
                        self.queue = Queue()
                        self.observer = Observer()
                        self.handler = Handler(self.queue)
                        self.path = path
                
                    def start(self): 
                        self.observer.schedule(self.handler, self.path, recursive=True)
                        self.observer.start()
                        try:
                            while True:
                                time.sleep(1)
                                self.queue.get()
                                print(list(self.queue.queue))
                        except Exception as error:
                            self.observer.stop()
                            print("Error: " + str(error))
                        self.observer.join()  
                
                if __name__ == '__main__':
                    watcher = Watcher('C:\...')
                    watcher.start()
                

                Recommended answer

                This is working for me (I got the main idea from this answer, thanks!), but note that I consider it a workaround. If someone has a better solution or can better explain the reason for this behavior in Python, please do not hesitate to answer!

                My guess is that I had two main problems:
                - I was starting the Watchdog observer inside another thread, and that was somehow blocking my queue-consuming thread.
                - Python threads do not truly run in parallel, so starting an independent process was necessary.
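As a point of comparison for the two guesses above, the sketch below shares a single `queue.Queue` between a producer thread and a consumer thread using only the standard library. The `producer` function is a stand-in for the watchdog handler, and `STOP`, `run_demo`, and the file names are hypothetical names introduced for illustration. `Queue.get()` blocks until an item is available and removes the item it returns, so the consumer keeps the returned value rather than inspecting `queue.queue` afterwards.

```python
import queue
import threading

# Hypothetical sentinel object that tells the consumer to exit.
STOP = object()


def producer(q, paths):
    """Stand-in for the watchdog handler: push each path onto the queue."""
    for path in paths:
        q.put(path)
    q.put(STOP)


def consumer(q, seen):
    """Block on get() until an item arrives; stop at the sentinel."""
    while True:
        item = q.get()  # removes and returns the next item
        if item is STOP:
            break
        seen.append(item)


def run_demo(paths):
    """Run one producer and one consumer thread over a shared queue."""
    q = queue.Queue()
    seen = []
    worker = threading.Thread(target=consumer, args=(q, seen))
    worker.start()
    producer(q, paths)
    worker.join()
    # Everything was consumed, so nothing is left in the queue.
    return seen, list(q.queue)


if __name__ == "__main__":
    seen, leftover = run_demo(["a.pcap", "b.pcapng"])
    print("consumed:", seen)
    print("left in queue:", leftover)
```

Waiting on a queue is not CPU-bound, so a plain thread is usually enough for the consumer loop itself. For the packet parsing, which is heavier work, handing each path to a separate process may still be the better choice.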

                Here is my code:

                import time
                import pyshark
                import threading
                import logging
                import os
                from queue import Queue
                from multiprocessing import Pool
                from watchdog.observers import Observer
                from watchdog.events import PatternMatchingEventHandler
                
                class Handler(PatternMatchingEventHandler):
                    patterns = ["*.pcap", "*.pcapng"]
                
                    def __init__(self, queue):
                        PatternMatchingEventHandler.__init__(self)
                        self.queue = queue
                
                    def process(self, event):  
                        self.queue.put(event.src_path)
                        logging.info(f"Storing message: {self.queue.qsize()}")
                        print("Producer queue: ", list(self.queue.queue))
                
                
                    def on_created(self, event):
                        # Wait until the file has stopped growing before processing it
                        file_size = -1
                        while file_size != os.path.getsize(event.src_path):
                            file_size = os.path.getsize(event.src_path)
                            time.sleep(1)
                
                        self.process(event)         
                
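                def ConsumeQueue(consumerq):
                    # Create the pool once; building a new Pool per file would leak worker processes
                    pool = Pool()
                    while True:
                        # get() blocks until a path is available, so no polling is needed
                        pool.apply_async(ReadPcapFiles, (consumerq.get(), ))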
                
                def ReadPcapFiles(createdFile):
                    print(f"This is my event in ReadPcapFiles {createdFile}")
                
                    countPacket = 0
                    totalBytes = 0
                    pcapfile = pyshark.FileCapture(createdFile)
                    for packet in pcapfile:
                        countPacket += 1
                        totalBytes += int(packet.length)
                    pcapfile.close()
                    print(f"Packet nr {countPacket}")
                    # This is the total byte count of the capture, not a per-second rate
                    print(f"Total bytes {totalBytes}")
                
                
                if __name__ == '__main__':
                
                    format = "%(asctime)s: %(message)s"
                    logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")
                    logging.getLogger().setLevel(logging.DEBUG)
                
                    queue = Queue()
                    path = r'C:\...'  # raw string so the backslash is not read as an escape
                
                    worker = threading.Thread(target=ConsumeQueue, args=(queue, ), daemon=True)
                    print("About to start worker")
                    worker.start()
                
                    event_handler = Handler(queue)
                    observer = Observer()
                    observer.schedule(event_handler, path, recursive=False)
                    print("About to start observer")
                    observer.start()
                
                    try:
                        while True:
                            time.sleep(1)
                    except KeyboardInterrupt:
                        # Ctrl+C raises KeyboardInterrupt, which "except Exception" does not catch
                        observer.stop()
                    observer.join()
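The size check in `on_created` could also be factored into a helper with a timeout, so that a file which never stops growing does not block the handler forever. This is a minimal sketch, not part of watchdog: `wait_until_stable` and its `interval` and `timeout` parameters are hypothetical names. The check is a heuristic, since a writer that pauses for longer than `interval` will look finished.

```python
import os
import tempfile
import time


def wait_until_stable(path, interval=1.0, timeout=60.0):
    """Return the file size once two consecutive checks agree.

    Raises TimeoutError if the size keeps changing for longer than `timeout`.
    """
    deadline = time.monotonic() + timeout
    last_size = -1
    while time.monotonic() < deadline:
        size = os.path.getsize(path)
        if size == last_size:
            return size
        last_size = size
        time.sleep(interval)
    raise TimeoutError(f"{path} is still changing after {timeout} seconds")


if __name__ == "__main__":
    # Demonstrate on a temporary file that is already fully written.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b"12345")
    print(wait_until_stable(tmp.name, interval=0.1, timeout=5.0))
    os.remove(tmp.name)
```

Inside the handler, `on_created` would call `wait_until_stable(event.src_path)` before `self.process(event)`. Handling the `TimeoutError` is left to the caller.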
                
