本文导语: 一、简介 我们将一个正在运行的程序称为进程。每个进程都有它自己的系统状态,包含内存状态、打开文件列表、追踪指令执行情况的程序指针以及一个保存局部变量的调用栈。通常情况下,一个进程依照一个单序列控制...
虽然进程之间是相互独立的,但是它们能够通过名为进程间通信(IPC)的机制进行相互通信。一个典型的模式是基于消息传递:消息可以简单地理解为一个纯字节的缓冲区,进程通过send()或recv()操作原语,经由管道(pipe)或网络套接字(network socket)等I/O通道发送或接收消息。还有一些IPC模式可以通过内存映射(memory-mapped)机制完成(例如mmap模块):通过内存映射,进程可以在内存中创建共享区域,对这些区域的修改对所有的进程可见。
Python长久以来一直支持不同方式的并发编程,包括线程、子进程以及其他利用生成器函数(generator function)的并发实现。
import threading


class SummingThread(threading.Thread):
    """Thread that sums the integers in the half-open range [low, high).

    The sum is accumulated in ``self.total`` and can be read by the
    creating thread once ``join()`` has returned.
    """

    def __init__(self, low, high):
        super(SummingThread, self).__init__()
        self.low = low
        self.high = high
        # Accumulator filled in by run(); starts at zero.
        self.total = 0

    def run(self):
        """Add every integer in [low, high) to ``self.total``."""
        for i in range(self.low, self.high):
            self.total += i


thread1 = SummingThread(0, 500000)
thread2 = SummingThread(500000, 1000000)
thread1.start()  # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print(result)
* do_threaded_work – 该函数将一系列给定的任务分配给对应的处理函数(分配顺序不确定)
* ThreadedWorker – 该类创建一个线程,它将从一个同步的工作队列中拉取工作任务并将处理结果写入同步结果队列
* start_logging_with_thread_info – 将线程id写入所有日志消息。(依赖日志环境)
* stop_logging_with_thread_info – 用于将线程id从所有的日志消息中移除。(依赖日志环境)
import logging
import queue
import threading


def do_threaded_work(work_items, work_func, num_threads=None,
                     per_sync_timeout=1, preserve_result_ordering=True):
    """Execute work_func on each work_item using a pool of worker threads.

    Note: Execution order is not preserved, but output ordering is
    (optionally).

    Parameters:
    - work_items
        Iterable of items to process.
    - work_func
        Callable applied to each item. If it raises, the error is logged
        by the worker and no result is recorded for that item.
    - num_threads
        Default: len(work_items)
        --- Number of threads to use to process items in work_items.
    - per_sync_timeout
        Default: 1
        --- Each synchronized queue operation can optionally time out.
    - preserve_result_ordering
        Default: True
        --- Reorders result items to match the original work_items ordering.

    Return:
    --- list of results from applying work_func to each work_item.
        Order is optionally preserved.

    Example:

        def process_url(url):
            # TODO: Do some work with the url
            return url

        urls_to_process = ["", "", "", ""]

        # process urls in parallel
        result_items = do_threaded_work(urls_to_process, process_url)

        # print(results)
        print(repr(result_items))
    """
    # Materialize once so len() works for any iterable.
    work_items = list(work_items)
    if not num_threads:
        num_threads = len(work_items)

    work_queue = queue.Queue()
    result_queue = queue.Queue()

    if preserve_result_ordering:
        # Tag every item with its position so the results can be put back
        # into input order after the threads finish in arbitrary order.
        for indexed_item in enumerate(work_items):
            work_queue.put(indexed_item)

        def worker_func(indexed_item):
            index, work_item = indexed_item
            return index, work_func(work_item)
    else:
        for work_item in work_items:
            work_queue.put(work_item)
        worker_func = work_func

    start_logging_with_thread_info()

    # Spawn a pool of threads and hand them the (already populated) queues.
    # The queue must be filled before the workers start: by default a
    # worker exits as soon as it sees an empty work queue.
    for _ in range(num_threads):
        worker = ThreadedWorker(work_queue, result_queue,
                                work_func=worker_func,
                                queue_timeout=per_sync_timeout)
        worker.daemon = True
        worker.start()

    work_queue.join()
    stop_logging_with_thread_info()
    logging.info('work_queue joined')

    # Every task_done() happens after the corresponding result was queued,
    # so once join() returns all results are available.
    result_items = []
    while not result_queue.empty():
        result = result_queue.get(timeout=per_sync_timeout)
        logging.info('found result[:500]: %s', repr(result)[:500])
        result_items.append(result)

    if preserve_result_ordering:
        # Restore input order, then strip the index tags.
        result_items.sort(key=lambda indexed_result: indexed_result[0])
        result_items = [work_result for _, work_result in result_items]

    return result_items


class ThreadedWorker(threading.Thread):
    """Generic threaded worker.

    Pulls items from work_queue, applies work_func to each one and puts the
    return value into result_queue.

    Input to work_func: item from work_queue

    Example usage:

        import queue

        urls_to_process = ["", "", "", ""]

        work_queue = queue.Queue()
        result_queue = queue.Queue()

        def process_url(url):
            # TODO: Do some work with the url
            return url

        def main():
            # populate queue with data *before* starting the workers;
            # by default a worker stops once the work queue is empty
            for url in urls_to_process:
                work_queue.put(url)

            # spawn a pool of threads, and pass them queue instance
            for i in range(3):
                t = ThreadedWorker(work_queue, result_queue,
                                   work_func=process_url)
                t.daemon = True
                t.start()

            # wait on the queue until everything has been processed
            work_queue.join()

            # print results
            while not result_queue.empty():
                print(repr(result_queue.get()))

        main()
    """

    def __init__(self, work_queue, result_queue, work_func,
                 stop_when_work_queue_empty=True, queue_timeout=1):
        threading.Thread.__init__(self)
        self.work_queue = work_queue
        self.result_queue = result_queue
        self.work_func = work_func
        self.stop_when_work_queue_empty = stop_when_work_queue_empty
        self.queue_timeout = queue_timeout

    def should_continue_running(self):
        """Return False once the work queue is empty (if so configured)."""
        if self.stop_when_work_queue_empty:
            return not self.work_queue.empty()
        return True

    def run(self):
        """Process work items until should_continue_running() says stop."""
        while self.should_continue_running():
            try:
                # grabs item from work_queue
                work_item = self.work_queue.get(timeout=self.queue_timeout)
            except queue.Empty:
                # Another worker took the last item between the empty()
                # check and get(). Nothing was dequeued, so task_done()
                # must NOT be called here.
                logging.warning('ThreadedWorker Queue was empty or Queue.get() timed out')
                continue

            try:
                # works on item
                work_result = self.work_func(work_item)
                # place work_result into result_queue
                self.result_queue.put(work_result, timeout=self.queue_timeout)
            except queue.Full:
                logging.warning('ThreadedWorker Queue was full or Queue.put() timed out')
            except Exception:
                logging.exception('Error in ThreadedWorker')
            finally:
                # signals to work_queue that the dequeued item is done,
                # whether or not processing succeeded
                self.work_queue.task_done()


def start_logging_with_thread_info():
    """Prefix messages of the root logger's first handler with the thread id.

    Depends on the logging environment: if the root logger has no handler
    the failure is logged instead of raised.
    """
    try:
        formatter = logging.Formatter('[thread %(thread)-3s] %(message)s')
        logging.getLogger().handlers[0].setFormatter(formatter)
    except Exception:
        logging.exception('Failed to start logging with thread info')


def stop_logging_with_thread_info():
    """Reset the root logger's first handler to a plain message format.

    Depends on the logging environment: if the root logger has no handler
    the failure is logged instead of raised.
    """
    try:
        formatter = logging.Formatter('%(message)s')
        logging.getLogger().handlers[0].setFormatter(formatter)
    except Exception:
        logging.exception('Failed to stop logging with thread info')
from queue import Queue

from test import ThreadedWorker

urls_to_process = ["", ""]

work_queue = Queue()
result_queue = Queue()


def process_url(url):
    """Process a single URL and return the result."""
    # TODO: Do some work with the url
    return url


def main():
    """Process every URL with a pool of ThreadedWorker threads and print the results."""
    # populate queue with data first: with its default settings a
    # ThreadedWorker stops as soon as it finds the work queue empty, so
    # starting the workers before filling the queue would leave the items
    # unprocessed and make work_queue.join() block forever
    for url in urls_to_process:
        work_queue.put(url)

    # spawn a pool of threads, and pass them queue instance
    for _ in range(5):
        t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
        t.daemon = True
        t.start()

    # wait on the queue until everything has been processed
    work_queue.join()

    # print results (drain the queue; repr() of the queue object itself
    # would not show its contents)
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    print(repr(results))


if __name__ == "__main__":
    main()