当前位置:  编程技术>python

Python中的并发编程实例

    来源: 互联网  发布时间:2014-10-08

    本文导语:  一、简介   我们将一个正在运行的程序称为进程。每个进程都有它自己的系统状态,包含内存状态、打开文件列表、追踪指令执行情况的程序指针以及一个保存局部变量的调用栈。通常情况下,一个进程依照一个单序列控制...

一、简介

  我们将一个正在运行的程序称为进程。每个进程都有它自己的系统状态,包含内存状态、打开文件列表、追踪指令执行情况的程序指针以及一个保存局部变量的调用栈。通常情况下,一个进程依照一个单序列控制流顺序执行,这个控制流被称为该进程的主线程。在任何给定的时刻,一个程序只做一件事情。

  一个程序可以通过Python库函数中的os或subprocess模块创建新进程(例如os.fork()或是subprocess.Popen())。然而,这些被称为子进程的进程却是独立运行的,它们有各自独立的系统状态以及主线程。因为进程之间是相互独立的,因此它们同原有的进程并发执行。这是指原进程可以在创建子进程后去执行其它工作。

  虽然进程之间是相互独立的,但是它们能够通过名为进程间通信(IPC)的机制进行相互通信。一个典型的模式是基于消息传递,可以将其简单地理解为一个纯字节的缓冲区,而send()或recv()操作原语可以通过诸如管道(pipe)或是网络套接字(network socket)等I/O通道传输或接收消息。还有一些IPC模式可以通过内存映射(memory-mapped)机制完成(例如mmap模块),通过内存映射,进程可以在内存中创建共享区域,而对这些区域的修改对所有的进程可见。

  多进程能够被用于需要同时执行多个任务的场景,由不同的进程负责任务的不同部分。然而,另一种将工作细分到任务的方法是使用线程。同进程类似,线程也有其自己的控制流以及执行栈,但线程在创建它的进程之内运行,分享其父进程的所有数据和系统资源。当应用需要完成并发任务的时候线程是很有用的,但是潜在的问题是任务间必须分享大量的系统状态。

  当使用多进程或多线程时,操作系统负责调度。这是通过给每个进程(或线程)一个很小的时间片并且在所有活动任务之间快速循环切换来实现的,这个过程将CPU时间分割为小片段分给各个任务。例如,如果你的系统中有10个活跃的进程正在执行,操作系统将会适当的将十分之一的CPU时间分配给每个进程并且循环地在十个进程之间切换。当系统不止有一个CPU核时,操作系统能够将进程调度到不同的CPU核上,保持系统负载平均以实现并行执行。

  利用并发执行机制写的程序需要考虑一些复杂的问题。复杂性的主要来源是关于同步和共享数据的问题。通常情况下,多个任务同时试图更新同一个数据结构会造成脏数据和程序状态不一致的问题(正式的说法是资源竞争的问题)。为了解决这个问题,需要使用互斥锁或是其他相似的同步原语来标识并保护程序中的关键部分。举个例子,如果多个不同的线程正在试图同时向同一个文件写入数据,那么你需要一个互斥锁使这些写操作依次执行,当一个线程在写入时,其他线程必须等待直到当前线程释放这个资源。

Python中的并发编程

  Python长久以来一直支持不同方式的并发编程,包括线程、子进程以及其他利用生成器(generator function)的并发实现。

  Python在大部分系统上同时支持消息传递和基于线程的并发编程机制。虽然大部分程序员对线程接口更为熟悉,但是Python的线程机制却有着诸多的限制。Python使用了内部全局解释器锁(GIL)来保证线程安全,GIL同时只允许一个线程执行。这使得Python程序就算在多核系统上也只能在单个处理器上运行。Python界关于GIL的争论尽管很多,但在可预见的未来却没有将其移除的可能。

  Python提供了一些很精巧的工具用于管理基于线程和进程的并发操作。即使是简单地程序也能够使用这些工具使得任务并发进行从而加快运行速度。subprocess模块为子进程的创建和通信提供了API。这特别适合运行与文本相关的程序,因为这些API支持通过新进程的标准输入输出通道传送数据。signal模块将UNIX系统的信号量机制暴露给用户,用以在进程之间传递事件信息。信号是异步处理的,通常有信号到来时会中断程序当前的工作。信号机制能够实现粗粒度的消息传递系统,但是有其他更可靠的进程内通讯技术能够传递更复杂的消息。threading模块为并发操作提供了一系列高级的,面向对象的API。Thread对象们在一个进程内并发地运行,分享内存资源。使用线程能够更好地扩展I/O密集型的任务。multiprocessing模块同threading模块类似,不过它提供了对于进程的操作。每个进程类是真实的操作系统进程,并且没有共享内存资源,但multiprocessing模块提供了进程间共享数据以及传递消息的机制。通常情况下,将基于线程的程序改为基于进程的很简单,只需要修改一些import声明即可。

Threading模块示例

  以threading模块为例,思考这样一个简单的问题:如何使用分段并行的方式完成一个大数的累加。

import threading
 
class SummingThread(threading.Thread):
  def __init__(self, low, high):
    super(SummingThread, self).__init__()
    self.low = low
    self.high = high
    self.total = 0
 
  def run(self):
    for i in range(self.low, self.high):
      self.total += i
 
thread1 = SummingThread(0, 500000)
thread2 = SummingThread(500000, 1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join() # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print(result)

自定义Threading类库

  我写了一个易于使用threads的小型Python类库,包含了一些有用的类和函数。

关键参数:

  * do_threaded_work – 该函数将一系列给定的任务分配给对应的处理函数(分配顺序不确定)

  * ThreadedWorker – 该类创建一个线程,它将从一个同步的工作队列中拉取工作任务并将处理结果写入同步结果队列

  * start_logging_with_thread_info – 将线程id写入所有日志消息。(依赖日志环境)

  * stop_logging_with_thread_info – 用于将线程id从所有的日志消息中移除。(依赖日志环境)

import threading
import logging
 
def do_threaded_work(work_items, work_func, num_threads=None, per_sync_timeout=1, preserve_result_ordering=True):
  """ Executes work_func on each work_item. Note: Execution order is not preserved, but output ordering is (optionally).
 
    Parameters:
    - num_threads        Default: len(work_items) --- Number of threads to use process items in work_items.
    - per_sync_timeout     Default: 1        --- Each synchronized operation can optionally timeout.
    - preserve_result_ordering Default: True       --- Reorders result_item to match original work_items ordering.
 
    Return: 
    --- list of results from applying work_func to each work_item. Order is optionally preserved.
 
    Example:
 
    def process_url(/tech-python/url/index.html):
      # TODO: Do some work with the url
      return url
 
    urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]
 
    # process urls in parallel
    result_items = do_threaded_work(urls_to_process, process_url)
 
    # print(results)
    print(repr(result_items))
  """
  global wrapped_work_func
  if not num_threads:
    num_threads = len(work_items)
 
  work_queue = Queue.Queue()
  result_queue = Queue.Queue()
 
  index = 0
  for work_item in work_items:
    if preserve_result_ordering:
      work_queue.put((index, work_item))
    else:
      work_queue.put(work_item)
    index += 1
 
  if preserve_result_ordering:
    wrapped_work_func = lambda work_item: (work_item[0], work_func(work_item[1]))
 
  start_logging_with_thread_info()
 
  #spawn a pool of threads, and pass them queue instance 
  for _ in range(num_threads):
    if preserve_result_ordering:
      t = ThreadedWorker(work_queue, result_queue, work_func=wrapped_work_func, queue_timeout=per_sync_timeout)
    else:
      t = ThreadedWorker(work_queue, result_queue, work_func=work_func, queue_timeout=per_sync_timeout)
    t.setDaemon(True)
    t.start()
 
  work_queue.join()
  stop_logging_with_thread_info()
 
  logging.info('work_queue joined')
 
  result_items = []
  while not result_queue.empty():
    result = result_queue.get(timeout=per_sync_timeout)
    logging.info('found result[:500]: ' + repr(result)[:500])
    if result:
      result_items.append(result)
 
  if preserve_result_ordering:
    result_items = [work_item for index, work_item in result_items]
 
  return result_items
 
class ThreadedWorker(threading.Thread):
  """ Generic Threaded Worker
    Input to work_func: item from work_queue
 
  Example usage:
 
  import Queue
 
  urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]
 
  work_queue = Queue.Queue()
  result_queue = Queue.Queue()
 
  def process_url(/tech-python/url/index.html):
    # TODO: Do some work with the url
    return url
 
  def main():
    # spawn a pool of threads, and pass them queue instance 
    for i in range(3):
      t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
      t.setDaemon(True)
      t.start()
 
    # populate queue with data  
    for url in urls_to_process:
      work_queue.put(url)
 
    # wait on the queue until everything has been processed   
    work_queue.join()
 
    # print results
    print repr(result_queue)
 
  main()
  """
 
  def __init__(self, work_queue, result_queue, work_func, stop_when_work_queue_empty=True, queue_timeout=1):
    threading.Thread.__init__(self)
    self.work_queue = work_queue
    self.result_queue = result_queue
    self.work_func = work_func
    self.stop_when_work_queue_empty = stop_when_work_queue_empty
    self.queue_timeout = queue_timeout
 
  def should_continue_running(self):
    if self.stop_when_work_queue_empty:
      return not self.work_queue.empty()
    else:
      return True
 
  def run(self):
    while self.should_continue_running():
      try:
        # grabs item from work_queue
        work_item = self.work_queue.get(timeout=self.queue_timeout)
 
        # works on item
        work_result = self.work_func(work_item)
 
        #place work_result into result_queue
        self.result_queue.put(work_result, timeout=self.queue_timeout)
 
      except Queue.Empty:
        logging.warning('ThreadedWorker Queue was empty or Queue.get() timed out')
 
      except Queue.Full:
        logging.warning('ThreadedWorker Queue was full or Queue.put() timed out')
 
      except:
        logging.exception('Error in ThreadedWorker')
 
      finally:
        #signals to work_queue that item is done
        self.work_queue.task_done()
 
def start_logging_with_thread_info():
  try:
    formatter = logging.Formatter('[thread %(thread)-3s] %(message)s')
    logging.getLogger().handlers[0].setFormatter(formatter)
  except:
    logging.exception('Failed to start logging with thread info')
 
def stop_logging_with_thread_info():
  try:
    formatter = logging.Formatter('%(message)s')
    logging.getLogger().handlers[0].setFormatter(formatter)
  except:
    logging.exception('Failed to stop logging with thread info')

 使用示例

from test import ThreadedWorker
from queue import Queue
 
urls_to_process = ["http://facebook.com", "http://pypix.com"]
 
work_queue = Queue()
result_queue = Queue()
 
def process_url(/tech-python/url/index.html):
  # TODO: Do some work with the url
  return url
 
def main():
  # spawn a pool of threads, and pass them queue instance 
  for i in range(5):
    t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
    t.setDaemon(True)
    t.start()
 
  # populate queue with data  
  for url in urls_to_process:
    work_queue.put(url)
 
  # wait on the queue until everything has been processed   
  work_queue.join()
 
  # print results
  print(repr(result_queue))
 
main()


    
 
 

您可能感兴趣的文章:

  • python thread 并发且顺序运行示例
  • python高并发异步服务器核心库forkcore使用方法
  • Python namedtuple(命名元组)使用实例
  • python实现的重启关机程序实例
  • Python 3 Tkinter教程之事件Event绑定处理代码实例
  • python调用短信猫控件实现发短信功能实例
  • Python文件操作类操作实例详解
  • python 基础学习第二弹 类属性和实例属性
  • Python实现冒泡,插入,选择排序简单实例
  • Python 时间处理datetime实例
  • Python实现类继承实例
  • Python continue语句用法实例
  • python3编写C/S网络程序实例教程
  • 在python中的socket模块使用代理实例
  • python实现进程间通信简单实例
  • python字典多条件排序方法实例
  • python中enumerate的用法实例解析
  • python解析xml文件实例分享
  • python的绘图工具matplotlib使用实例
  • Python Tkinter简单布局实例教程
  • Python中__call__用法实例
  • 使用Python判断IP地址合法性的方法实例
  • Python中apply函数的用法实例教程
  •  
    本站(WWW.)旨在分享和传播互联网科技相关的资讯和技术,将尽最大努力为读者提供更好的信息聚合和浏览方式。
    本站(WWW.)站内文章除注明原创外,均为转载、整理或搜集自网络。欢迎任何形式的转载,转载请注明出处。












  • 相关文章推荐
  • Python GUI编程:tkinter实现一个窗口并居中代码
  • php和perl,python,学习哪种编程语言好。
  • python 示例分享---逻辑推理编程解决八皇后
  • python网络编程示例(客户端与服务端)
  • 用Python编程实现语音控制电脑
  • python原始套接字编程示例分享
  • python网络编程之TCP通信实例和socketserver框架使用例子
  • 从零学python系列之数据处理编程实例(一)
  • python网络编程之UDP通信实例(含服务器端、客户端、UDP广播例子)
  • python和C语言混合编程实例
  • python多线程编程方式分析示例详解
  • Python高效编程技巧
  • 在Python中使用异步Socket编程性能测试
  • python网络编程学习笔记(二):socket建立网络客户端
  • 用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试
  • 实例讲解python函数式编程
  • python网络编程学习笔记(五):socket的一些补充
  • python网络编程学习笔记(一)
  • 从零学python系列之数据处理编程实例(二)
  • Python 网络编程起步(Socket发送消息)
  • python网络编程学习笔记(九):数据库客户端 DB-API
  • Python不使用print而直接输出二进制字符串
  • 让python同时兼容python2和python3的8个技巧分享
  • Python中实现json字符串和dict类型的互转
  • 使用setup.py安装python包和卸载python包的方法
  • NOSQL iis7站长之家
  • 不小心把linux自带的python卸载了,导致安装一个依赖原python的软件不能安装,请问该怎么办?
  • python下用os.execl执行centos下的系统时间同步命令ntpdate
  • python读取csv文件示例(python操作csv)
  • Python namedtuple对象json序列化/反序列化及对象恢复
  • python基础教程之python消息摘要算法使用示例


  • 站内导航:


    特别声明:169IT网站部分信息来自互联网,如果侵犯您的权利,请及时告知,本站将立即删除!

    ©2012-2021,,E-mail:www_#163.com(请将#改为@)

    浙ICP备11055608号-3