编程 31

进程和线程,IO多路复用

    我们大多数的时候使用多线程,以及多进程,但是python中由于GIL全局解释器锁的原因,python的多线程并没有真的实现

目录

一、开启线程的两种方式
    1.1 直接利用利用threading.Thread()类实例化
    1.2 创建一个类,并继承Thread类
    1.3 在一个进程下开启多个线程与在一个进程下开启多个子进程的区别
        1.3.1 谁的开启速度更快?
        1.3.2 看看PID的不同
        1.3.3 练习
        1.3.4 线程的join与setDaemon
        1.3.5 线程相关的其他方法补充

二、 Python GIL
    2.1 什么是全局解释器锁GIL
    2.2 全局解释器锁GIL设计理念与限制

三、 Python多进程与多线程对比
四、锁
    4.1 同步锁
    GIL vs Lock
    4.2 死锁与递归锁
    4.3 信号量Semaphore
    4.4 事件Event
    4.5 定时器timer
    4.6 线程队列queue

五、协程
    5.1 yield实现协程
    5.2 greenlet实现协程
    5.3 gevent实现协程

六、IO多路复用

七、socketserver实现并发
    7.1 ThreadingTCPServer

八、基于UDP的套接字

一、进程和线程的概念

     
实际上,python在执行多线程的时候,是通过GIL锁,进行上下文切换线程执行,每次真实只有一个线程在运行。所以上边才说,没有真的实现多现程。

一、开启线程的两种方式

在python中开启线程要导入threading,它与开启进程所需要导入的模块multiprocessing在使用上,有很大的相似性。在接下来的使用中,就可以发现。

同开启进程的两种方式相同:

首先,引出“多任务”的概念:多任务处理是指用户可以在同一时间内运行多个应用程序,每个应用程序被称作一个任务。Linux、windows就是支持多任务的操作系统,比起单任务系统它的功能增强了许多。

      那么python的多线程就没有什么用了吗?

1.1 直接利用利用threading.Thread()类实例化

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()

    print('主线程')

例如,你一边在用浏览器上网,一边在听网易云音乐,一边在用Word赶作业,这就是多任务,至少同时有3个任务正在运行。还有很多任务悄悄地在后台同时运行着,只是桌面上没有显示而已。

             
不是这个样子的,python多线程一般用于IO密集型的程序,那么什么叫做IO密集型呢,举个例子,比如说带有阻塞的。当前线程阻塞等待其它线程执行。

1.2 创建一个类,并继承Thread类

from threading import Thread
import time
calss Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        time.sleep(2)
        print("%s say hello" %self.name)

if __name__ == "__main__":
    t = Sayhi("egon")
    t.start()
    print("主线程")

但是,这些任务是同时在运行着的吗?众所周知,运行一个任务就需要cpu去处理,那同时运行多个任务就必须需要多个cpu?那如果有100个任务需要同时运行,就得买一个100核的cpu吗?显然不能!

      即然说到适合python多线程的,那么什么样的不适合用python多线程呢?

1.3 在一个进程下开启多个线程与在一个进程下开启多个子进程的区别

现在,多核CPU已经非常普及了,但是,即使过去的单核CPU,也可以执行多任务。由于CPU执行代码都是顺序执行的,那么,单核CPU是怎么执行多任务的呢?

             
答案是CPU密集型的,那么什么样的是CPU密集型的呢?百度一下你就知道。

1.3.1 谁的开启速度更快?

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    #在主进程下开启线程
    t=Thread(target=work)
    t.start()
    print('主线程/主进程')
    '''
    打印结果:
    hello
    主线程/主进程
    '''

    #在主进程下开启子进程
    t=Process(target=work)
    t.start()
    print('主线程/主进程')
    '''
    打印结果:
    主线程/主进程
    hello
    '''

结论:由于创建子进程是将主进程完全拷贝一份,而线程不需要,所以线程的创建速度更快。

答案就是操作系统轮流让各个任务交替执行,任务1执行0.01秒,切换到任务2,任务2执行0.01秒,再切换到任务3,执行0.01秒……这样反复执行下去。表面上看,每个任务都是交替执行的,但是,由于CPU的执行速度实在是太快了,我们感觉就像所有任务都在同时执行一样。

      

1.3.2 看看PID的不同

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主线程/主进程pid',os.getpid())

    #part2:开多个进程,每个进程都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主线程/主进程pid',os.getpid())


'''
hello 13552
hello 13552
主线程pid: 13552
主线程pid: 13552
hello 1608
hello 6324
'''

总结:可以看出,主进程下开启多个线程,每个线程的PID都跟主进程的PID一样;而开多个进程,每个进程都有不同的PID。

小结:一个cpu同一时刻只能运行一个“任务”;真正的并行执行多任务只能在多核CPU上实现,但是,由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。

       现在有这样一项任务:需要从200W个url中获取数据?

1.3.3 练习

练习一:利用多线程,实现socket 并发连接
服务端:

from threading import Thread
from socket import *
import os

tcpsock = socket(AF_INET,SOCK_STREAM)
tcpsock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
tcpsock.bind(("127.0.0.1",60000))
tcpsock.listen(5)

def work(conn,addr):
    while True:
        try:
            data = conn.recv(1024)
            print(os.getpid(),addr,data.decode("utf-8"))
            conn.send(data.upper())
        except Exception:
            break

if __name__ == '__main__':
    while True:
        conn,addr = tcpsock.accept()
        t = Thread(target=work,args=(conn,addr))
        t.start()

"""
开启了4个客户端
服务器端输出:
13800 ('127.0.0.1', 63164) asdf
13800 ('127.0.0.1', 63149) asdf
13800 ('127.0.0.1', 63154) adsf
13800 ('127.0.0.1', 63159) asdf

可以看出每个线程的PID都是一样的。
""

客户端:

from socket import *

tcpsock = socket(AF_INET,SOCK_STREAM)
tcpsock.connect(("127.0.0.1",60000))

while True:
    msg = input(">>: ").strip()
    if not msg:continue
    tcpsock.send(msg.encode("utf-8"))
    data = tcpsock.recv(1024)
    print(data.decode("utf-8"))

练习二:有三个任务,一个接收用户输入,一个将用户输入的内容格式化成大写,一个将格式化后的结果存入文件。

from threading import Thread

recv_l = []
format_l = []

def Recv():
    while True:
        inp = input(">>: ").strip()
        if not inp:continue
        recv_l.append(inp)

def Format():
    while True:
        if recv_l:
            res = recv_l.pop()
            format_l.append(res.upper())

def Save(filename):
    while True:
        if format_l:
            with open(filename,"a",encoding="utf-8") as f:
                res = format_l.pop()
                f.write("%s\n" %res)

if __name__ == '__main__':
    t1 = Thread(target=Recv)
    t2 = Thread(target=Format)
    t3 = Thread(target=Save,args=("db.txt",))
    t1.start()
    t2.start()
    t3.start()

对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。

      
那么我们真心不能用多线程,上下文切换是需要时间的,数据量太大,无法接受。这里我们就要用到多进程+协程

1.3.4 线程的join与setDaemon

与进程的方法都是类似的,其实multiprocessing模块是模仿threading模块的接口;

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #设置为守护线程,主线程结束,子线程也跟着线束。
    t.start()
    t.join()  #主线程等待子线程运行结束
    print('主线程')
    print(t.is_alive())

有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。

      那么什么是协程呢?

1.3.5 线程相关的其他方法补充

Thread实例对象的方法:

  • isAlive():返回纯种是否是活跃的;
  • getName():返回线程名;
  • setName():设置线程名。

threading模块提供的一些方法:

  • threading.currentThread():返回当前的线程变量
  • threading.enumerate():返回一个包含正在运行的线程的列表。正在运行指线程启动后、结束前,不包括启动前和终止后。
  • threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同结果。

from threading import Thread
import threading
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主进程下开启线程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName()) #获取当前线程名
    print(threading.current_thread()) #主线程
    print(threading.enumerate()) #连同主线程在内有两个运行的线程,返回的是活跃的线程列表
    print(threading.active_count())  #活跃的线程个数
    print('主线程/主进程')

    '''
    打印结果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    2
    主线程/主进程
    Thread-1
    '''

由于每个进程至少要干一件事,所以,一个进程至少有一个线程。当然,像Word这种复杂的进程可以有多个线程,多个线程可以同时执行,多线程的执行方式和多进程是一样的,也是由操作系统在多个线程之间快速切换,让每个线程都短暂地交替运行,看起来就像同时执行一样。当然,真正地同时执行多线程需要多核CPU才可能实现。

      协程,又称微线程,纤程。英文名Coroutine。

二、 Python GIL

GIL全称Global Interpreter
Lock
,即全局解释器锁。首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL
C++,Visual
C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

小结:

     
协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。

2.1 什么是全局解释器锁GIL

Python代码的执行由Python
虚拟机(也叫解释器主循环,CPython版本)来控制,Python
在设计之初就考虑到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对Python
虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
在多线程环境中,Python 虚拟机按以下方式执行:

  1. 设置GIL
  2. 切换到一个线程去运行
  3. 运行:
    a. 指定数量的字节码指令,或者
    b. 线程主动让出控制(可以调用time.sleep(0))
  4. 把线程设置为睡眠状态
  5. 解锁GIL
  6. 再次重复以上所有步骤

在调用外部代码(如C/C++扩展函数)的时候,GIL
将会被锁定,直到这个函数结束为止(由于在这期间没有Python
的字节码被运行,所以不会做线程切换)。

  • 进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。
  • 线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发性能。线程没有自己的系统资源。

     
协程有什么好处呢,协程只在单线程中执行,不需要cpu进行上下文切换,协程自动完成子程序切换。

2.2 全局解释器锁GIL设计理念与限制

GIL的设计简化了CPython的实现,使得对象模型,包括关键的内建类型如字典,都是隐含可以并发访问的。锁住全局解释器使得比较容易的实现对多线程的支持,但也损失了多处理器主机的并行计算能力。
但是,不论标准的,还是第三方的扩展模块,都被设计成在进行密集计算任务是,释放GIL。
还有,就是在做I/O操作时,GIL总是会被释放。对所有面向I/O
的(会调用内建的操作系统C 代码的)程序来说,GIL 会在这个I/O
调用之前被释放,以允许其它的线程在这个线程等待I/O
的时候运行。如果是纯计算的程序,没有 I/O 操作,解释器会每隔 100
次操作就释放这把锁,让别的线程有机会执行(这个次数可以通过
sys.setcheckinterval 来调整)如果某线程并未使用很多I/O
操作,它会在自己的时间片内一直占用处理器(和GIL)。也就是说,I/O
密集型的Python 程序比计算密集型的程序更能充分利用多线程环境的好处。

下面是Python 2.7.9手册中对GIL的简单介绍:
The mechanism used by the CPython interpreter to assure that only one
thread executes Python bytecode at a time. This simplifies the CPython
implementation by making the object model (including critical built-in
types such as dict) implicitly safe against concurrent access. Locking
the entire interpreter makes it easier for the interpreter to be
multi-threaded, at the expense of much of the parallelism afforded by
multi-processor machines.
However, some extension modules, either standard or third-party, are
designed so as to release the GIL when doing computationally-intensive
tasks such as compression or hashing. Also, the GIL is always released
when doing I/O.
编程,Past efforts to create a “free-threaded” interpreter (one which locks
shared data at a much finer granularity) have not been successful
because performance suffered in the common single-processor case. It is
believed that overcoming this performance issue would make the
implementation much more complicated and therefore costlier to maintain.

从上文中可以看到,针对GIL的问题做的很多改进,如使用更细粒度的锁机制,在单处理器环境下反而导致了性能的下降。普遍认为,克服这个性能问题会导致CPython实现更加复杂,因此维护成本更加高昂。

二、进程和线程的关系

     
这里没有使用yield协程,这个python自带的并不是很完善,至于为什么有待于你去研究了。

三、 Python多进程与多线程对比

有了GIL的存在,同一时刻同一进程中只有一个线程被执行?这里也许人有一个疑问:多进程可以利用多核,但是开销大,而Python多线程开销小,但却无法利用多核的优势?要解决这个问题,我们需要在以下几点上达成共识:

  • CPU是用来计算的!
  • 多核CPU,意味着可以有多个核并行完成计算,所以多核提升的是计算性能;
  • 每个CPU一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用处。

当然,对于一个程序来说,不会是纯计算或者纯I/O,我们只能相对的去看一个程序到底是计算密集型,还是I/O密集型。从而进一步分析Python的多线程有无用武之地。

分析:

我们有四个任务需要处理,处理访求肯定是要有并发的效果,解决方案可以是:

  • 方案一:开启四个进程;
  • 方案二:一个进程下,开启四个进程。

单核情况下,分析结果:

  • 如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜;
  • 如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜。

多核情况下,分析结果:

  • 如果四个任务是密集型,多核意味着并行
    计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜;
  • 如果四个任务是I/O密集型,再多的核 也解决不了I/O问题,方案二胜。

结论:现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至
不如串行(没有大量切换),但是,对于I/O密集型的任务效率还是有显著提升的。

代码实现对比

计算密集型:

#计算密集型
from threading import Thread
from multiprocessing import Process
import os
import time
def work():
    res=0
    for i in range(1000000):
        res+=i

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    for i in range(100):
        # t=Thread(target=work) #我的机器4核cpu,多线程大概15秒
        t=Process(target=work) #我的机器4核cpu,多进程大概10秒
        t_l.append(t)
        t.start()

    for i in t_l:
        i.join()
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))
    print('主线程')

I/O密集型:

#I/O密集型
from threading import Thread
from multiprocessing import Process
import time
import os
def work():
    time.sleep(2) #模拟I/O操作,可以打开一个文件来测试I/O,与sleep是一个效果
    print(os.getpid())

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    for i in range(500):
        # t=Thread(target=work) #run time is 2.195
        t=Process(target=work) #耗时大概为37秒,创建进程的开销远高于线程,而且对于I/O密集型,多cpu根本不管用
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))

总结:
应用场景:
多线程用于I/O密集型,如socket、爬虫、web
多进程用于计算密集型,如金融分析

进程是计算机中的程序关于某数据集上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。或者说进程是具有一定独立功能的程序关于某个数据集上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。
线程则是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。

      这里使用比较完善的第三方协程包gevent

四、锁

编程 1

      pip  install    gevent

4.1 同步锁

需求:对一个全局变量,开启100个线程,每个线程都对该全局变量做减1操作;

不加锁,代码如下:

import time
import threading

num = 100  #设定一个共享变量
def addNum():
    global num #在每个线程中都获取这个全局变量
    #num-=1

    temp=num
    time.sleep(0.1)
    num =temp-1  # 对此公共变量进行-1操作

thread_list = []

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
    t.join()

print('Result: ', num)

分析:以上程序开启100线程并不能把全局变量num减为0,第一个线程执行addNum遇到I/O阻塞后迅速切换到下一个线程执行addNum,由于CPU执行切换的速度非常快,在0.1秒内就切换完成了,这就造成了第一个线程在拿到num变量后,在time.sleep(0.1)时,其他的线程也都拿到了num变量,所有线程拿到的num值都是100,所以最后减1操作后,就是99。加锁实现。

加锁,代码如下:

import time
import threading

num = 100   #设定一个共享变量
def addNum():
    with lock:
        global num
        temp = num
        time.sleep(0.1)
        num = temp-1    #对此公共变量进行-1操作

thread_list = []

if __name__ == '__main__':
    lock = threading.Lock()   #由于同一个进程内的线程共享此进程的资源,所以不需要给每个线程传这把锁就可以直接用。
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
        thread_list.append(t)

    for t in thread_list:  #等待所有线程执行完毕
        t.join()

    print("result: ",num)

加锁后,第一个线程拿到锁后开始操作,第二个线程必须等待第一个线程操作完成后将锁释放后,再与其它线程竞争锁,拿到锁的线程才有权操作。这样就保障了数据的安全,但是拖慢了执行速度。
注意:with locklock.acquire()(加锁)与lock.release()(释放锁)的简写。

import threading

R=threading.Lock()

R.acquire()
'''
对公共数据的操作
'''
R.release()

小结:

每个进程下N个协程,   

GIL vs Lock

机智的同学可能会问到这个问题,就是既然你之前说过了,Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 

首先我们需要达成共识:锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据

然后,我们可以得出结论:保护不同的数据就应该加不同的锁。

最后,问题就很明朗了,GIL
与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock

详细的:

因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake
up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序
里的线程和
py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题,
这可以说是Python早期版本的遗留问题。

  • 一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程。

  • 资源分配给进程,同一进程的所有线程共享该进程的所有资源。

  • CPU分给线程,即真正在CPU上运行的是线程。
#coding=utf-8
from multiprocessing import Process
import gevent
#from gevent import monkey; monkey.patch_socket()
#用于协程的了程序
def yield_execFunc(x):
    print('______________%s'%x)


#yield_clist决定协程的数量
#开始协程操作
def yield_start(yield_clist):
    task=[] #用来存储协程
    for i in yield_clist:
        task.append(gevent.spawn(yield_execFunc,i))

    gevent.joinall(task) #执行协程

if  __name__=="__main__":
    list1=[1,2,3,4,5,6,7,8,9,10] #元素个数决定开起的协程数量
    list2=[1,2,3,4,5,6,7,8,9,10]
    list3=[1,2,3,4,5,6,7,8,9,10]
    process_list =[list1,list2,list3] #元素个数决定进程数量
    for plist in process_list:
        p = Process(target=yield_start,args=(plist,))
        p.start()

4.2 死锁与递归锁

所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态,或系统产生了死锁。这此永远在互相等待的进程称死锁进程

如下代码,就会产生死锁:

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A锁\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B锁\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B锁\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A锁\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁
然后就卡住,死锁了
'''

解决死锁的方法

避免产生死锁的方法就是用递归锁,在python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock

这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire(获得锁)的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release(释放)后,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,就不会发生死锁的现象了。

mutexA=mutexB=threading.RLock()
#一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止。

三、并行(xing)和并发

执行结果:开了三个进程,每个进程下执行10个协程协作任务

4.3 信号量Semaphore

同进程的信号量一样。
用一个粗俗的例子来说,锁相当于独立卫生间,只有一个坑,同一时刻只能有一个人获取锁,进去使用;而信号量相当于公共卫生间,例如有5个坑,同一时刻可以有5个人获取锁,并使用。

Semaphore管理一个内置的计数器,每当调用acquire()时,内置计数器-1;调用release()时,内置计数器+1;计数器不能小于0,当计数器为0时,acquire()将阻塞线程,直到其他线程调用release()

实例:
同时只有5个线程可以获得Semaphore,即可以限制最大连接数为5:

import threading
import time

sem = threading.Semaphore(5)
def func():
    if sem.acquire():   #也可以用with进行上下文管理
        print(threading.current_thread().getName()+"get semaphore")
        time.sleep(2)
        sem.release()

for i in range(20):
    t1 = threading.Thread(target=func)
    t1.start()

利用with进行上下文管理:

import threading
import time

sem = threading.Semaphore(5)

def func():
    with sem:   
        print(threading.current_thread().getName()+"get semaphore")
        time.sleep(2)

for i in range(20):
    t1 = threading.Thread(target=func)
    t1.start()

注:信号量与进程池是完全不同一的概念,进程池Pool(4)最大只能产生4个进程,而且从头到尾都只是这4个进程,不会产生新的,而信号量是产生一堆线程/进程。

并行处理(Parallel
Processing)是计算机系统中能同时执行两个或更多个处理的一种计算方法。并行处理可同时工作于同一程序的不同方面。并行处理的主要目的是节省大型和复杂问题的解决时间。

C:\Python27\python.exe D:/weixin/temp/yield_tmp.py
______________1
______________2
______________3
______________4
______________5
______________6
______________7
______________8
______________9
______________10
______________1
______________1
______________2
______________2
______________3
______________3
______________4
______________4
______________5
______________5
______________6
______________6
______________7
______________7
______________8
______________8
______________9
______________9
______________10
______________10

Process finished with exit code 0

4.4 事件Event

同进程的一样

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手,为了解决这些问题我们使用threading库中的Event对象。

Event对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被
一直阻塞直至该
标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被
设置 为真的Event对象,那么它将忽略这个事件,继续执行。

Event对象具有一些方法:
event = threading.Event() #产生一个事件对象

  • event.isSet():返回event状态值;
  • event.wait():如果event.isSet() == False,将阻塞线程;
  • event.set():设置event的状态值为True,所有阻塞池的线程进入就绪状态,等待操作系统高度;
  • event.clear():恢复event的状态值False。

应用场景:

例如,我们有多个线程需要连接数据库,我们想要在启动时确保Mysql服务正常,才让那些工作线程去连接Mysql服务器,那么我们就可以采用threading.Event()机制来协调各个工作线程的连接操作,主线程中会去尝试连接Mysql服务,如果正常的话,触发事件,各工作线程会尝试连接Mysql服务。

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    print('\033[42m%s 等待连接mysql。。。\033[0m' %threading.current_thread().getName())
    event.wait()  #默认event状态为False,等待
    print('\033[42mMysql初始化成功,%s开始连接。。。\033[0m' %threading.current_thread().getName())


def check_mysql():
    print('\033[41m正在检查mysql。。。\033[0m')
    time.sleep(random.randint(1,3))
    event.set()   #设置event状态为True
    time.sleep(random.randint(1,3))

if __name__ == '__main__':
    event=Event()
    t1=Thread(target=conn_mysql) #等待连接mysql
    t2=Thread(target=conn_mysql) #等待连接myqsl
    t3=Thread(target=check_mysql) #检查mysql

    t1.start()
    t2.start()
    t3.start()


'''
输出如下:
Thread-1 等待连接mysql。。。
Thread-2 等待连接mysql。。。
正在检查mysql。。。
Mysql初始化成功,Thread-1开始连接。。。
Mysql初始化成功,Thread-2开始连接。。。
'''

注:threading.Eventwait方法还可以接受一个超时参数,默认情况下,如果事件一直没有发生,wait方法会一直阻塞下去,而加入这个超时参数之后,如果阻塞时间超过这个参数设定的值之后,wait方法会返回。对应于上面的应用场景,如果mysql服务器一直没有启动,我们希望子线程能够打印一些日志来不断提醒我们当前没有一个可以连接的mysql服务,我们就可以设置这个超时参数来达成这样的目的:

上例代码修改后如下:

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count = 1
    while not event.is_set():
        print("\033[42m%s 第 <%s> 次尝试连接。。。"%(threading.current_thread().getName(),count))
        event.wait(0.2)
        count+=1
    print("\033[45mMysql初始化成功,%s 开始连接。。。\033[0m"%(threading.current_thread().getName()))

def check_mysql():
    print('\033[41m正在检查mysql。。。\033[0m')
    time.sleep(random.randint(1,3))
    event.set()
    time.sleep(random.randint(1,3))

if __name__ == '__main__':
    event=Event()
    t1=Thread(target=conn_mysql) #等待连接mysql
    t2=Thread(target=conn_mysql) #等待连接mysql
    t3=Thread(target=check_mysql) #检查mysql

    t1.start()
    t2.start()
    t3.start()

这样,我们就可以在等待Mysql服务启动的同时,看到工作线程里正在等待的情况。应用:连接池。

并发处理(concurrency
Processing)指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机(CPU)上运行,但任一个时刻点上只有一个程序在处理机(CPU)上运行。

 

4.5 定时器timer

定时器,指定n秒后执行某操作。

from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)  #1秒后执行任务hello
t.start()   # after 1 seconds, "hello, world" will be printed

编程 2

   

4.6 线程队列queue

queue队列:使用import queue,用法与进程Queue一样。

queue下有三种队列:

  • queue.Queue(maxsize) 先进先出,先放进队列的数据,先被取出来;
  • queue.LifoQueue(maxsize) 后进先出,(Lifo 意为last in first
    out),后放进队列的数据,先被取出来
  • queue.PriorityQueue(maxsize) 优先级队列,优先级越高优先取出来。

举例:
先进先出:

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(先进先出):
first
second
third
'''

后进先出:

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(后进先出):
third
second
first
'''

优先级队列:

import queue

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''

并发的关键是你有处理多个任务的能力,不一定要同时。并行的关键是你有同时处理多个任务的能力。所以说,并行是并发的子集。

五、协程

协程:是单线程下的并发,又称微线程、纤程,英文名:Coroutine协程是一种用户态的轻量级线程,协程是由用户程序自己控制调度的。

需要强调的是:

1.
python的线程属于内核级别的,即由操作系统控制调度(如单线程一旦遇到io就被迫交出cpu执行权限,切换其他线程运行)

  1. 单线程内开启协程,一旦遇到io,从应用程序级别(而非操作系统)控制切换

对比操作系统控制线程的切换,用户在单线程内控制协程的切换,优点如下:

1.
协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

  1. 单线程内就可以实现并发的效果,最大限度地利用cpu。

要实现协程,关键在于用户程序自己控制程序切换,切换之前必须由用户程序自己保存协程上一次调用时的状态,如此,每次重新调用时,能够从上次的位置继续执行

(详细的:协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈)

四、同步与异步

5.1 yield实现协程

我们之前已经学习过一种在单线程下可以保存程序运行状态的方法,即yield,我们来简单复习一下:

  • yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级
  • send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换

#不用yield:每次函数调用,都需要重复开辟内存空间,即重复创建名称空间,因而开销很大
import time
def consumer(item):
    # print('拿到包子%s' %item)
    x=11111111111
    x1=12111111111
    x3=13111111111
    x4=14111111111
    y=22222222222
    z=33333333333

    pass
def producer(target,seq):
    for item in seq:
        target(item) #每次调用函数,会临时产生名称空间,调用结束则释放,循环100000000次,则重复这么多次的创建和释放,开销非常大

start_time=time.time()
producer(consumer,range(100000000))
stop_time=time.time()
print('run time is:%s' %(stop_time-start_time)) #30.132838010787964


#使用yield:无需重复开辟内存空间,即重复创建名称空间,因而开销小
import time
def init(func):
    def wrapper(*args,**kwargs):
        g=func(*args,**kwargs)
        next(g)
        return g
    return wrapper

init
def consumer():
    x=11111111111
    x1=12111111111
    x3=13111111111
    x4=14111111111
    y=22222222222
    z=33333333333
    while True:
        item=yield
        # print('拿到包子%s' %item)
        pass
def producer(target,seq):
    for item in seq:
        target.send(item) #无需重新创建名称空间,从上一次暂停的位置继续,相比上例,开销小

start_time=time.time()
producer(consumer(),range(100000000))
stop_time=time.time()
print('run time is:%s' %(stop_time-start_time)) #21.882073879241943

缺点:
协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程。
协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程。

协程的定义(满足1,2,3就可以称为协程):

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里自己保存多个控制流的上下文栈
  4. 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))

注意:yield切换在没有io的情况下或者没有重复开辟内存空间的操作,对效率没有什么提升,甚至更慢,为此,可以用greenlet来为大家演示这种切换。

在计算机领域,同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去。

5.2 greenlet实现协程

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator。

安装greenlet模块
pip install greenlet

from greenlet import greenlet
import time

def t1():
    print("test1,first")
    gr2.switch()
    time.sleep(5)
    print("test1,second")
    gr2.switch()

def t2():
    print("test2,first")
    gr1.switch()
    print("test2,second")

gr1 = greenlet(t1)
gr2 = greenlet(t2)
gr1.switch()


'''
输出结果:
test1,first
test2,first   #等待5秒
test1,second
test2,second
'''

可以在第一次switch时传入参数

from greenlet import greenlet
import time
def eat(name):
    print("%s eat food 1"%name)
    gr2.switch(name="alex")
    time.sleep(5)
    print("%s eat food 2"%name)
    gr2.switch()

def play_phone(name):
    print("%s play phone 1"%name)
    gr1.switch()
    print("%s play phone 1" % name)

gr1 = greenlet(eat)
gr2 = greenlet(play_phone)
gr1.switch(name="egon")  #可以在第一次switch时传入参数,以后都不需要

注意:greenlet只是提供了一种比generator更加便捷的切换方式,仍然没有解决遇到I/O自动切换的问题,而单纯的切换,反而会降低程序的执行速度。这就需要用到gevent模块了。

异步是指进程不需要一直等下去,而是继续执行其它操作,不管其他进程的状态。当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率。举个例子,打电话时就是同步通信,发短息时就是异步通信。

5.3 gevent实现协程

gevent是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要是Greenlet,它是以C扩展模块形式接入Python的轻量级协程。greenlet全部运行在主程操作系统进程的内部,但它们被协作式地调试。遇到I/O阻塞时会自动切换任务。

注意:gevent有自己的I/O阻塞,如:gevent.sleep()和gevent.socket();但是gevent不能直接识别除自身之外的I/O阻塞,如:time.sleep(2),socket等,要想识别这些I/O阻塞,必须打一个补丁:from gevent import monkey;monkey.patch_all()

  • 需要先安装gevent模块
    pip install gevent

  • 创建一个协程对象g1
    g1 =gevent.spawn()
    spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给第一个参数(函数)eat的。

from gevent import monkey;monkey.patch_all()
import gevent

def eat():
    print("点菜。。。")
    gevent.sleep(3)   #等待上菜
    print("吃菜。。。")

def play():
    print("玩手机。。。")
    gevent.sleep(5)  #网卡了
    print("看NBA...")

# gevent.spawn(eat)
# gevent.spawn(play)
# print('主') # 直接结束

#因而也需要join方法,进程或现场的jion方法只能join一个,而gevent的joinall方法可以join多个
g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])  #传一个gevent对象列表。
print("主线程")

"""
输出结果:
点菜。。。
玩手机。。。    
##等待大概3秒       此行没打印
吃菜。。。
##等待大概2秒          此行没打印
看NBA...
主线程
"""

注:上例中的gevent.sleep(3)是模拟的I/O阻塞。跟time.sleep(3)功能一样。

同步/异步

import gevent
def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)

def synchronous():  #同步执行
    for i in range(1, 10):
        task(i)

def asynchronous(): #异步执行
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()   #执行后,会顺序打印结果

print('Asynchronous:')
asynchronous()  #执行后,会异步同时打印结果,无序的。

爬虫应用

#协程的爬虫应用

from gevent import monkey;monkey.patch_all()
import gevent
import time
import requests

def get_page(url):
    print("GET: %s"%url)
    res = requests.get(url)
    if res.status_code == 200:
        print("%d bytes received from %s"%(len(res.text),url))

start_time = time.time()
g1 = gevent.spawn(get_page,"https://www.python.org")
g2 = gevent.spawn(get_page,"https://www.yahoo.com")
g3 = gevent.spawn(get_page,"https://www.github.com")
gevent.joinall([g1,g2,g3])
stop_time = time.time()
print("run time is %s"%(stop_time-start_time))

上以代码输出结果:

GET: https://www.python.org
GET: https://www.yahoo.com
GET: https://www.github.com
47714 bytes received from https://www.python.org
472773 bytes received from https://www.yahoo.com
98677 bytes received from https://www.github.com
run time is 2.501142978668213

应用:
通过gevent实现单线程下的socket并发,注意:from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞。

服务端代码:

from gevent import monkey;monkey.patch_all()
import gevent
from socket import *

class server:
    def __init__(self,ip,port):
        self.ip = ip
        self.port = port


    def conn_cycle(self):   #连接循环
        tcpsock = socket(AF_INET,SOCK_STREAM)
        tcpsock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
        tcpsock.bind((self.ip,self.port))
        tcpsock.listen(5)
        while True:
            conn,addr = tcpsock.accept()
            gevent.spawn(self.comm_cycle,conn,addr)

    def comm_cycle(self,conn,addr):   #通信循环
        try:
            while True:
                data = conn.recv(1024)
                if not data:break
                print(addr)
                print(data.decode("utf-8"))
                conn.send(data.upper())
        except Exception as e:
            print(e)
        finally:
            conn.close()

s1 = server("127.0.0.1",60000)
print(s1)
s1.conn_cycle()

客户端代码 :

from socket import *

tcpsock = socket(AF_INET,SOCK_STREAM)
tcpsock.connect(("127.0.0.1",60000))

while True:
    msg = input(">>: ").strip()
    if not msg:continue
    tcpsock.send(msg.encode("utf-8"))
    data = tcpsock.recv(1024)
    print(data.decode("utf-8"))

通过gevent实现并发多个socket客户端去连接服务端

from gevent import monkey;monkey.patch_all()
import gevent
from socket import *

def client(server_ip,port):
    try:
        c = socket(AF_INET,SOCK_STREAM)
        c.connect((server_ip,port))
        count = 0
        while True:
            c.send(("say hello %s"%count).encode("utf-8"))
            msg = c.recv(1024)
            print(msg.decode("utf-8"))
            count+=1
    except Exception as e:
        print(e)
    finally:
        c.close()

# g_l = []
# for i in range(500):
#     g = gevent.spawn(client,'127.0.0.1',60000)
#     g_l.append(g)
# gevent.joinall(g_l)

#上面注释代码可简写为下面代码这样。

threads = [gevent.spawn(client,"127.0.0.1",60000) for i in range(500)]
gevent.joinall(threads)

举个例子:

六、IO多路复用

由于CPU和内存的速度远远高于外设的速度,所以,在IO编程中,就存在速度严重不匹配的问题。比如要把100M的数据写入磁盘,CPU输出100M的数据只需要0.01秒,可是磁盘要接收这100M数据可能需要10秒,有两种办法解决:

通过IO多路复用实现同时监听多个端口的服务端

示例一:

# 示例一:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author : Cai Guangyin

from socket import socket
import select

sock_1 = socket()
sock_1.bind(("127.0.0.1",60000))
sock_1.listen(5)

sock_2 = socket()
sock_2.bind(("127.0.0.1",60001))
sock_2.listen(5)

inputs = [sock_1,sock_2]

while True:
    # IO多路复用
    # -- select方法,内部进行循环操作,哪个socket对象有变化(连接),就赋值给r;监听socket文件句柄有个数限制(1024个)
    # -- poll方法,也是内部进行循环操作,没有监听个数限制
    # -- epoll方法,通过异步回调,哪个socket文件句柄有变化,就会自动告诉epoll,它有变化,然后将它赋值给r;
    # windows下没有epoll方法,只有Unix下有,windows下只有select方法
    r,w,e=select.select(inputs,[],[],0.2)  #0.2是超时时间
        #当有人连接sock_1时,返回的r,就是[sock_1,];是个列表
        #当有人连接sock_2时,返回的r,就是[sock_2,];是个列表
        #当有多人同时连接sock_1和sock_2时,返回的r,就是[sock_1,sock_2,];是个列表
        #0.2是超时时间,如果这段时间内没有连接进来,那么r就等于一个空列表;
    for obj in r:
        if obj in [sock_1,sock_2]:

            conn, addr = obj.accept()
            inputs.append(conn)
            print("新连接来了:",obj)

        else:
            print("有连接用户发送消息来了:",obj)
            data = obj.recv(1024)
            if not data:break
            obj.sendall(data)

客户端:

# -*- coding:utf-8 -*-
#!/usr/bin/python
# Author : Cai Guangyin

from socket import *

tcpsock = socket(AF_INET,SOCK_STREAM)   #创建一个tcp套接字
tcpsock.connect(("127.0.0.1",60001))     #根据地址连接服务器

while True:   #客户端通信循环
    msg = input(">>: ").strip()   #输入消息
    if not msg:continue           #判断输入是否为空
        #如果客户端发空,会卡住,加此判断,限制用户不能发空
    if msg == 'exit':break       #退出
    tcpsock.send(msg.encode("utf-8"))   #socket只能发送二进制数据
    data = tcpsock.recv(1024)    #接收消息
    print(data.decode("utf-8"))

tcpsock.close()

以上服务端运行时,如果有客户端断开连接则会抛出如下异常:

编程 3

异常

  1. CPU等着,也就是程序暂停执行后续代码,等100M的数据在10秒后写入磁盘,再接着往下执行,这种模式称为同步IO
  2. CPU不等待,只是告诉磁盘,慢慢写不着急,写完通知我,我接着干别的事去了,于是后续代码可以接着执行,这种模式称为异步IO

改进版如下

收集异常并将接收数据和发送数据分开处理
示例二:

# 示例二
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author : Cai Guangyin

from socket import *
import select

sk1 = socket(AF_INET,SOCK_STREAM)
sk1.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sk1.bind(("127.0.0.1",60000))
sk1.listen(5)

sk2 = socket(AF_INET,SOCK_STREAM)
sk2.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sk2.bind(("127.0.0.1",60001))
sk2.listen(5)


inputs = [sk1,sk2]
w_inputs = []

while True:
    r,w,e = select.select(inputs,w_inputs,inputs,0.1)
    for obj in r:
        if obj in [sk1,sk2]:
            print("新连接:",obj.getsockname())
            conn,addr = obj.accept()
            inputs.append(conn)

        else:
            try:
                # 如果客户端断开连接,将获取异常,并将收取数据data置为空
                data = obj.recv(1024).decode('utf-8')
                print(data)
            except Exception as e:
                data = ""

            if data:
                # 如果obj能正常接收数据,则认为它是一个可写的对象,然后将它加入w_inputs列表
                w_inputs.append(obj)
            else:
                # 如果数据data为空,则从inputs列表中移除此连接对象obj
                print("空消息")
                obj.close()
                inputs.remove(obj)


        print("分割线".center(60,"-"))

    # 遍历可写的对象列表,
    for obj in w:
        obj.send(b'ok')
        # 发送数据后删除w_inputs中的此obj对象,否则客户端断开连接时,会抛出”ConnectionResetError“异常
        w_inputs.remove(obj)

五、threading模块

七、socketserver实现并发

基于TCP的套接字,关键就是两个循环,一个连接循环,一个通信循环。

SocketServer内部使用 IO多路复用 以及 “多线程” 和 “多进程”
,从而实现并发处理多个客户端请求的Socket服务端。即:每个客户端请求连接到服务器时,Socket服务端都会在服务器是创建一个“线程”或者“进程”
专门负责处理当前客户端的所有请求。

socketserver模块中的类分为两大类:server类(解决链接问题)和request类(解决通信问题)

server类:

编程 4

server类

request类:

编程 5

request类

线程server类的继承关系:

编程 6

线程server类的继承关系

进程server类的继承关系:

编程 7

进程server类的继承关系

request类的继承关系:

编程 8

request类的继承关系

以下述代码为例,分析socketserver源码:

ftpserver=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FtpServer)
ftpserver.serve_forever()

查找属性的顺序:ThreadingTCPServer –> ThreadingMixIn –>
TCPServer->BaseServer

  1. 实例化得到ftpserver,先找类ThreadingTCPServer__init__,在TCPServer中找到,进而执行server_bind,server_active
  2. ftpserver下的serve_forever,在BaseServer中找到,进而执行self._handle_request_noblock(),该方法同样是在BaseServer
  3. 执行self._handle_request_noblock()进而执行request, client_address = self.get_request()(就是TCPServer中的self.socket.accept()),然后执行self.process_request(request, client_address)
  4. ThreadingMixIn中找到process_request,开启多线程应对并发,进而执行process_request_thread,执行self.finish_request(request, client_address)
  5. 上述四部分完成了链接循环,本部分开始进入处理通讯部分,在BaseServer中找到finish_request,触发我们自己定义的类的实例化,去找__init__方法,而我们自己定义的类没有该方法,则去它的父类也就是BaseRequestHandler中找….

源码分析总结:
基于tcp的socketserver我们自己定义的类中的

  • self.server 即套接字对象
  • self.request 即一个链接
  • self.client_address 即客户端地址

基于udp的socketserver我们自己定义的类中的

  • self.request是一个元组(第一个元素是客户端发来的数据,第二部分是服务端的udp套接字对象),如(b'adsf', <socket.socket fd=200, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8080)>)
  • self.client_address即客户端地址。

线程是操作系统直接支持的执行单元,因此,高级语言通常都内置多线程的支持,Python也不例外,并且,Python的线程是真正的Posix
Thread,而不是模拟出来的线程。

6.1 ThreadingTCPServer

ThreadingTCPServer实现的Soket服务器内部会为每个client创建一个
“线程”,该线程用来和客户端进行交互。

使用ThreadingTCPServer:

  • 创建一个继承自 SocketServer.BaseRequestHandler 的类
  • 类中必须定义一个名称为 handle 的方法
  • 启动ThreadingTCPServer。
  • 启动serve_forever() 链接循环

服务端:

import socketserver

class MyServer(socketserver.BaseRequestHandler):
    def handle(self):
        conn = self.request
        # print(addr)
        conn.sendall("欢迎致电10086,请输入1XXX,0转人工服务。".encode("utf-8"))
        Flag = True
        while Flag:
            data = conn.recv(1024).decode("utf-8")
            if data == "exit":
                Flag = False
            elif data == '0':
                conn.sendall("您的通话可能会被录音。。。".encode("utf-8"))
            else:
                conn.sendall("请重新输入。".encode('utf-8'))

if __name__ == '__main__':
    server = socketserver.ThreadingTCPServer(("127.0.0.1",60000),MyServer)
    server.serve_forever()  #内部实现while循环监听是否有客户端请求到达。

客户端:

import socket

ip_port = ('127.0.0.1',60000)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024).decode("utf-8")
    print('receive:',data)
    inp = input('please input:')
    sk.sendall(inp.encode('utf-8'))
    if inp == 'exit':
        break
sk.close()

Python的标准库提供了两个模块:_threadthreading_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

七、基于UDP的套接字

  • recvfrom(buffersize[, flags])接收消息,buffersize是一次接收多少个字节的数据。
  • sendto(data[, flags], address)
    发送消息,data是要发送的二进制数据,address是要发送的地址,元组形式,包含IP和端口

服务端:

from socket import *
s=socket(AF_INET,SOCK_DGRAM)  #创建一个基于UDP的服务端套接字,注意使用SOCK_DGRAM类型
s.bind(('127.0.0.1',8080))  #绑定地址和端口,元组形式

while True:    #通信循环
    client_msg,client_addr=s.recvfrom(1024) #接收消息
    print(client_msg)
    s.sendto(client_msg.upper(),client_addr) #发送消息

客户端:

from socket import *
c=socket(AF_INET,SOCK_DGRAM)   #创建客户端套接字

while True:
    msg=input('>>: ').strip()
    c.sendto(msg.encode('utf-8'),('127.0.0.1',8080)) #发送消息
    server_msg,server_addr=c.recvfrom(1024) #接收消息
    print('from server:%s msg:%s' %(server_addr,server_msg))

模拟即时聊天
由于UDP无连接,所以可以同时多个客户端去跟服务端通信

服务端:

from socket import *

server_address = ("127.0.0.1",60000)
udp_server_sock = socket(AF_INET,SOCK_DGRAM)
udp_server_sock.bind(server_address)

while True:
    qq_msg,addr = udp_server_sock.recvfrom(1024)
    print("来自[%s:%s]的一条消息:\033[32m%s\033[0m"%(addr[0],addr[1],qq_msg.decode("utf-8")))
    back_msg = input("回复消息:").strip()
    udp_server_sock.sendto(back_msg.encode("utf-8"),addr)

udp_server_sock.close()

客户端:

from socket import *

BUFSIZE = 1024
udp_client_sock = socket(AF_INET,SOCK_DGRAM)
qq_name_dic = {
    "alex":("127.0.0.1",60000),
    "egon":("127.0.0.1",60000),
    "seven":("127.0.0.1",60000),
    "yuan":("127.0.0.1",60000),
}

while True:
    qq_name = input("请选择聊天对象:").strip()
    while True:
        msg = input("请输入消息,回车发送:").strip()
        if msg == "quit":break
        if not msg or not qq_name or qq_name not in qq_name_dic:continue
        print(msg,qq_name_dic[qq_name])
        udp_client_sock.sendto(msg.encode("utf-8"),qq_name_dic[qq_name])

        back_msg,addr = udp_client_sock.recvfrom(BUFSIZE)
        print("来自[%s:%s]的一条消息:\033[32m%s\033[0m" %(addr[0],addr[1],back_msg.decode("utf-8")))
udp_client_sock.close()

注意:
1.你单独运行上面的udp的客户端,你发现并不会报错,相反tcp却会报错,因为udp协议只负责把包发出去,对方收不收,我根本不管,而tcp是基于链接的,必须有一个服务端先运行着,客户端去跟服务端建立链接然后依托于链接才能传递消息,任何一方试图把链接摧毁都会导致对方程序的崩溃。

2.上面的udp程序,你注释任何一条客户端的sendinto,服务端都会卡住,为什么?因为服务端有几个recvfrom就要对应几个sendinto,哪怕是sendinto(b”)那也要有。

3.recvfrom(buffersize)如果设置每次接收数据的字节数,小于对方发送的数据字节数,如果运行Linux环境下,则只会接收到recvfrom()所设置的字节数的数据;而如果运行windows环境下,则会报错。

基于socketserver实现多线程的UDP服务端:

import socketserver

class MyUDPhandler(socketserver.BaseRequestHandler):
    def handle(self):
        client_msg,s=self.request
        s.sendto(client_msg.upper(),self.client_address)

if __name__ == '__main__':
    s=socketserver.ThreadingUDPServer(('127.0.0.1',60000),MyUDPhandler)
    s.serve_forever()

1. 调用Thread类直接创建

启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行:

编程 9编程 10

 1 import time, threading
 2 
 3 # 新线程执行的代码:
 4 def loop():
 5     print('thread %s is running...' % threading.current_thread().name)
 6     n = 0
 7     while n < 5:
 8         n = n + 1
 9         print('thread %s >>> %s' % (threading.current_thread().name, n))
10         time.sleep(1)
11     print('thread %s ended.' % threading.current_thread().name)
12 
13 print('thread %s is running...' % threading.current_thread().name)
14 t = threading.Thread(target=loop, name='LoopThread')
15 t.start()
16 t.join()
17 print('thread %s ended.' % threading.current_thread().name)
18 
19 
20 #运行结果:
21 #thread MainThread is running...
22 # thread LoopThread is running...
23 # thread LoopThread >>> 1
24 # thread LoopThread >>> 2
25 # thread LoopThread >>> 3
26 # thread LoopThread >>> 4
27 # thread LoopThread >>> 5
28 # thread LoopThread ended.
29 # thread MainThread ended.

实例1

由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python的threading模块有个current_thread()函数,它永远返回当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在创建时指定,我们用LoopThread命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名为Thread-1Thread-2……

编程 11编程 12

 1 import threading
 2 import time
 3 
 4 def countNum(n): # 定义某个线程要运行的函数
 5 
 6     print("running on number:%s" %n)
 7 
 8     time.sleep(3)
 9 
10 if __name__ == '__main__':
11 
12     t1 = threading.Thread(target=countNum,args=(23,)) #生成一个线程实例
13     t2 = threading.Thread(target=countNum,args=(34,))
14 
15     t1.start() #启动线程
16     t2.start()
17 
18     print("ending!")
19 
20 
21 #运行结果:程序打印完“ending!”后等待3秒结束
22 #running on number:23
23 #running on number:34
24 #ending!

实例2

该实例中共有3个线程:主线程,t1和t2子线程

编程 13

 

2. 自定义Thread类继承式创建

编程 14编程 15

 1 #继承Thread式创建
 2 
 3 import threading
 4 import time
 5 
 6 class MyThread(threading.Thread):
 7 
 8     def __init__(self,num):
 9         threading.Thread.__init__(self)    #继承父类__init__
10         self.num=num
11 
12     def run(self):    #必须定义run方法
13         print("running on number:%s" %self.num)
14         time.sleep(3)
15 
16 t1=MyThread(56)
17 t2=MyThread(78)
18 
19 t1.start()
20 t2.start()
21 print("ending")

View Code

3. Thread类的实例方法

join和dameon

编程 16编程 17

 1 import threading
 2 from time import ctime,sleep
 3 
 4 def Music(name):
 5 
 6         print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
 7         sleep(3)
 8         print("end listening {time}".format(time=ctime()))
 9 
10 def Blog(title):
11 
12         print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
13         sleep(5)
14         print('end recording {time}'.format(time=ctime()))
15 
16 
17 threads = []
18 
19 
20 t1 = threading.Thread(target=Music,args=('FILL ME',))
21 t2 = threading.Thread(target=Blog,args=('',))
22 
23 threads.append(t1)
24 threads.append(t2)
25 
26 if __name__ == '__main__':
27 
28     #t2.setDaemon(True)
29 
30     for t in threads:
31 
32         #t.setDaemon(True) #注意:一定在start之前设置
33         t.start()
34 
35         #t.join()
36 
37     #t1.join()
38     #t2.join()    #  考虑这三种join位置下的结果?
39 
40     print ("all over %s" %ctime())

join和setDaemon

其它方法:

1 Thread实例对象的方法
2   # isAlive(): 返回线程是否活动的。
3   # getName(): 返回线程名。
4   # setName(): 设置线程名。
5 
6 threading模块提供的一些方法:
7   # threading.currentThread(): 返回当前的线程变量。
8   # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
9   # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

六、GIL

'''

定义:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)

'''

Python中的线程是操作系统的原生线程,Python虚拟机使用一个全局解释器锁(Global
Interpreter
Lock)来互斥线程对Python虚拟机的使用。为了支持多线程机制,一个基本的要求就是需要实现不同线程对共享资源访问的互斥,所以引入了GIL。
GIL:在一个线程拥有了解释器的访问权之后,其他的所有线程都必须等待它释放解释器的访问权,即使这些线程的下一条指令并不会互相影响。
在调用任何Python C API之前,要先获得GIL
GIL缺点:多处理器退化为单处理器;优点:避免大量的加锁解锁操作。

1.
GIL的早期设计

Python支持多线程,而解决多线程之间数据完整性和状态同步的最简单方法自然就是加锁。
于是有了GIL这把超级大锁,而当越来越多的代码库开发者接受了这种设定后,他们开始大量依赖这种特性(即默认python内部对象是thread-safe的,无需在实现时考虑额外的内存锁和同步操作)。慢慢的这种实现方式被发现是蛋疼且低效的。但当大家试图去拆分和去除GIL的时候,发现大量库代码开发者已经重度依赖GIL而非常难以去除了。有多难?做个类比,像MySQL这样的“小项目”为了把Buffer
Pool
Mutex这把大锁拆分成各个小锁也花了从5.5到5.6再到5.7多个大版为期近5年的时间,并且仍在继续。MySQL这个背后有公司支持且有固定开发团队的产品走的如此艰难,那又更何况Python这样核心开发和代码贡献者高度社区化的团队呢?

2.
GIL的影响

无论你启多少个线程,你有多少个cpu,
Python在执行一个进程的时候会淡定的在同一时刻只允许一个线程运行。
所以,python是无法利用多核CPU实现多线程的。
这样,python对于计算密集型的任务开多线程的效率甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。

编程 18

计算密集型实例:

编程 19编程 20

 1 #coding:utf8
 2 from threading import Thread
 3 import time
 4 
 5 def counter():
 6     i = 0
 7     for _ in range(100000000):
 8         i = i + 1
 9     return True
10 
11 
12 def main():
13     l=[]
14     start_time = time.time()
15     for i in range(2):
16 
17         t = Thread(target=counter)
18         t.start()
19         l.append(t)
20         t.join()
21 
22     for t in l:
23         t.join()
24     # counter()
25     # counter()
26     end_time = time.time()
27     print("Total time: {}".format(end_time - start_time))
28 
29 if __name__ == '__main__':
30     main()
31 
32 
33 '''
34 py2.7:
35      串行:9.17599987984s
36      并发:9.26799988747s
37 py3.6:
38      串行:9.540389776229858s
39      并发:9.568442583084106s
40 
41 '''

计算密集型,多线程并发相比串行,没有显著优势

3. 解决方案

用multiprocessing替代Thread
multiprocessing库的出现很大程度上是为了弥补thread库因为GIL而低效的缺陷。它完整的复制了一套thread所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的GIL,因此也不会出现进程之间的GIL争抢。

编程 21编程 22

 1 #coding:utf8
 2 from multiprocessing import Process
 3 import time
 4 
 5 def counter():
 6     i = 0
 7     for _ in range(100000000):
 8         i = i + 1
 9 
10     return True
11 
12 def main():
13 
14     l=[]
15     start_time = time.time()
16 
17     # for _ in range(2):
18     #     t=Process(target=counter)
19     #     t.start()
20     #     l.append(t)
21     #     #t.join()
22     #
23     # for t in l:
24     #    t.join()
25     counter()
26     counter()
27     end_time = time.time()
28     print("Total time: {}".format(end_time - start_time))
29 
30 if __name__ == '__main__':
31     main()
32 
33 
34 '''
35 
36 py2.7:
37      串行:8.92299985886 s
38      并行:8.19099998474 s
39 
40 py3.6:
41      串行:9.963459014892578 s
42      并发:5.1366541385650635 s
43 
44 '''

multiprocess多进程实现并发运算能够提升效率

当然multiprocessing也不是万能良药。它的引入会增加程序实现时线程间数据通讯和同步的困难。就拿计数器来举例子,如果我们要多个线程累加同一个变量,对于thread来说,申明一个global变量,用thread.Lock的context包裹住,三行就搞定了。而multiprocessing由于进程之间无法看到对方的数据,只能通过在主线程申明一个Queue,put再get或者用share
memory的方法。这个额外的实现成本使得本来就非常痛苦的多线程程序编码,变得更加痛苦了。

小结:因为GIL的存在,只有IO
Bound场景下的多线程会得到较好的性能提升;如果对并行计算性能较高的程序可以考虑把核心部分变成C模块,或者索性用其他语言实现;GIL在较长一段时间内将会继续存在,但是会不断对其进行改进。

七、同步锁(lock)

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

编程 23编程 24

 1 import time
 2 import threading
 3 
 4 def subNum():
 5     global num #在每个线程中都获取这个全局变量
 6     temp = num
 7     time.sleep(0.1)
 8     num =temp-1  # 对此公共变量进行-1操作
 9 
10 num = 100  #设定一个共享变量
11 thread_list = []
12 
13 for i in range(100):
14     t = threading.Thread(target=subNum)
15     t.start()
16     thread_list.append(t)
17 
18 for t in thread_list: #等待所有线程执行完毕
19     t.join()
20 
21 print('Result: ', num)
22 
23 
24 #运行结果:
25 #Result:  99

多线程共享变量,无法保证变量安全

如上实例,在一个进程内,设置共享变量num=100,然后创建100个线程,执行num-=1的操作,但是,由于在函数subNum中存在time.sleep(0.1),该语句可以等价于IO操作。于是在这短短的0.1秒的时间内,所有的线程已经创建并启动,拿到了num=100的变量,等待0.1秒过后,最终得到的num其实是99.

锁通常被用来实现对共享资源的同步访问。为每一个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其它线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁:

编程 25编程 26

 1 import time
 2 import threading
 3 
 4 def subNum():
 5     global num #在每个线程中都获取这个全局变量
 6     lock.acquire()
 7     temp = num
 8     time.sleep(0.1)
 9     num =temp-1  # 对此公共变量进行-1操作
10     lock.release()
11 
12 
13 num = 100  #设定一个共享变量
14 lock = threading.Lock()    #生成一个同步锁对象
15 thread_list = []
16 
17 for i in range(100):
18     t = threading.Thread(target=subNum)
19     t.start()
20     thread_list.append(t)
21 
22 for t in thread_list: #等待所有线程执行完毕
23     t.join()
24 
25 print('Result: ', num)
26 
27 #运行结果:
28 #Result:  0

应用lock方法,保证变量安全

 

lock.acquire()与lock.release()包起来的代码段,保证同一时刻只允许一个线程引用。

1 import threading
2 
3 R=threading.Lock()
4 
5 R.acquire()
6 '''
7 对公共数据的操作
8 '''
9 R.release()

八、死锁与递归锁

所谓死锁:
是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

编程 27编程 28

 1 import threading
 2 import time
 3 
 4 mutexA = threading.Lock()
 5 mutexB = threading.Lock()
 6 
 7 class MyThread(threading.Thread):
 8 
 9     def __init__(self):
10         threading.Thread.__init__(self)
11 
12     def run(self):
13         self.fun1()
14         self.fun2()
15 
16     def fun1(self):
17 
18         mutexA.acquire()  # 如果锁被占用,则阻塞在这里,等待锁的释放
19 
20         print("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
21 
22         mutexB.acquire()
23         print("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
24         mutexB.release()
25 
26         mutexA.release()
27 
28 
29     def fun2(self):
30 
31         mutexB.acquire()
32         print("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
33         time.sleep(0.2)
34 
35         mutexA.acquire()
36         print("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
37         mutexA.release()
38 
39         mutexB.release()
40 
41 if __name__ == "__main__":
42 
43     print("start---------------------------%s"%time.time())
44 
45     for i in range(0, 10):
46         my_thread = MyThread()
47         my_thread.start()
48 
49 
50 
51 #运行结果:
52 #start---------------------------1494316634.4121563
53 #I am Thread-1 , get res: ResA---1494316634.4121563
54 #I am Thread-1 , get res: ResB---1494316634.4121563
55 #I am Thread-1 , get res: ResB---1494316634.4121563
56 #I am Thread-2 , get res: ResA---1494316634.4121563

死锁实例

 

在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:

编程 29编程 30

 1 import threading
 2 import time
 3 
 4 # mutexA = threading.Lock()
 5 # mutexB = threading.Lock()
 6 rlock = threading.RLock()
 7 
 8 class MyThread(threading.Thread):
 9 
10     def __init__(self):
11         threading.Thread.__init__(self)
12 
13     def run(self):
14         self.fun1()
15         self.fun2()
16 
17     def fun1(self):
18         rlock.acquire()  # 如果锁被占用,则阻塞在这里,等待锁的释放
19 
20         print("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
21 
22         rlock.acquire()
23         print("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
24         rlock.release()
25 
26         rlock.release()
27 
28 
29     def fun2(self):
30         rlock.acquire()
31         print("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
32         time.sleep(0.2)
33 
34         rlock.acquire()
35         print("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
36         rlock.release()
37 
38         rlock.release()
39 
40 if __name__ == "__main__":
41 
42     print("start---------------------------%s"%time.time())
43 
44     for i in range(0, 10):
45         my_thread = MyThread()
46         my_thread.start()
47 
48 
49 #运行结果:从以下结果也可以发现,线程之间是竞争关系
50 """
51 start---------------------------1494316940.0863945
52 I am Thread-1 , get res: ResA---1494316940.0873976
53 I am Thread-1 , get res: ResB---1494316940.0873976
54 I am Thread-1 , get res: ResB---1494316940.0873976
55 I am Thread-1 , get res: ResA---1494316940.287911
56 I am Thread-2 , get res: ResA---1494316940.287911
57 I am Thread-2 , get res: ResB---1494316940.287911
58 I am Thread-2 , get res: ResB---1494316940.287911
59 I am Thread-2 , get res: ResA---1494316940.4883447
60 I am Thread-4 , get res: ResA---1494316940.4883447
61 I am Thread-4 , get res: ResB---1494316940.4883447
62 I am Thread-4 , get res: ResB---1494316940.4883447
63 I am Thread-4 , get res: ResA---1494316940.6886203
64 I am Thread-6 , get res: ResA---1494316940.6886203
65 I am Thread-6 , get res: ResB---1494316940.6896234
66 I am Thread-6 , get res: ResB---1494316940.6896234
67 I am Thread-6 , get res: ResA---1494316940.890659
68 I am Thread-8 , get res: ResA---1494316940.890659
69 I am Thread-8 , get res: ResB---1494316940.890659
70 I am Thread-8 , get res: ResB---1494316940.890659
71 I am Thread-8 , get res: ResA---1494316941.0918815
72 I am Thread-10 , get res: ResA---1494316941.0918815
73 I am Thread-10 , get res: ResB---1494316941.0918815
74 I am Thread-10 , get res: ResB---1494316941.0918815
75 I am Thread-10 , get res: ResA---1494316941.2923715
76 I am Thread-5 , get res: ResA---1494316941.2923715
77 I am Thread-5 , get res: ResB---1494316941.2923715
78 I am Thread-5 , get res: ResB---1494316941.2923715
79 I am Thread-5 , get res: ResA---1494316941.493138
80 I am Thread-9 , get res: ResA---1494316941.493138
81 I am Thread-9 , get res: ResB---1494316941.493138
82 I am Thread-9 , get res: ResB---1494316941.493138
83 I am Thread-9 , get res: ResA---1494316941.6937861
84 I am Thread-7 , get res: ResA---1494316941.6937861
85 I am Thread-7 , get res: ResB---1494316941.6937861
86 I am Thread-7 , get res: ResB---1494316941.6937861
87 I am Thread-7 , get res: ResA---1494316941.8946414
88 I am Thread-3 , get res: ResA---1494316941.8946414
89 I am Thread-3 , get res: ResB---1494316941.8946414
90 I am Thread-3 , get res: ResB---1494316941.8946414
91 I am Thread-3 , get res: ResA---1494316942.0956843
92 """

递归锁解决死锁

九、event对象

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为False。如果有线程等待一个Event对象,
而这个Event对象的标志为False,那么这个线程将会被一直阻塞直至该标志为True。一个线程如果将一个Event对象的信号标志设置为True,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,
继续执行。

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。

编程 31

 

可以考虑一种应用场景(仅仅作为说明),例如,我们有多个线程从Redis队列中读取数据来处理,这些线程都要尝试去连接Redis的服务,一般情况下,如果Redis连接不成功,在各个线程的代码中,都会去尝试重新连接。如果我们想要在启动时确保Redis服务正常,才让那些工作线程去连接Redis服务器,那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作:主线程中会去尝试连接Redis服务,如果正常的话,触发事件,各工作线程会尝试连接Redis服务。

编程 32编程 33

 1 import threading
 2 import time
 3 import logging
 4 
 5 logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)
 6 
 7 def worker(event):
 8     logging.debug('Waiting for redis ready...')
 9     while not event.isSet():
10         logging.debug('connect failed...')
11         event.wait(1)
12     logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
13     time.sleep(1)
14 
15 def main():
16     readis_ready = threading.Event()
17     t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
18     t1.start()
19 
20     t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
21     t2.start()
22 
23     logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
24     time.sleep(3) # simulate the check progress
25     logging.debug('redis server is running')
26     readis_ready.set()
27 
28 if __name__=="__main__":
29     main()
30 
31 
32 #运行结果:
33 (t1        ) Waiting for redis ready...
34 # (t1        ) connect failed...
35 # (t2        ) Waiting for redis ready...
36 # (t2        ) connect failed...
37 # (MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event
38 # (t1        ) connect failed...
39 # (t2        ) connect failed...
40 # (t2        ) connect failed...
41 # (t1        ) connect failed...
42 # (MainThread) redis server is running
43 # (t2        ) redis ready, and connect to redis server and do some work [Tue May  9 16:15:18 2017]
44 # (t1        ) redis ready, and connect to redis server and do some work [Tue May  9 16:15:18 2017]

监听Redis服务

十、Semaphore(信号量)

Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

编程 34编程 35

 1 import threading
 2 import time
 3 
 4 semaphore = threading.Semaphore(5)
 5 
 6 def func():
 7     if semaphore.acquire():
 8         print (threading.currentThread().getName() + ' get semaphore')
 9         time.sleep(2)
10         semaphore.release()
11 
12 for i in range(20):
13   t1 = threading.Thread(target=func)
14   t1.start()
15 
16 
17 #运行结果:
18 # Thread-1 get semaphore
19 # Thread-2 get semaphore
20 # Thread-3 get semaphore
21 # Thread-4 get semaphore
22 # Thread-5 get semaphore
23 # Thread-6 get semaphore#隔2秒打印
24 # Thread-7 get semaphore
25 # Thread-8 get semaphore
26 # Thread-9 get semaphore
27 # Thread-10 get semaphore
28 # Thread-11 get semaphore#隔2秒打印
29 # Thread-12 get semaphore
30 # Thread-13 get semaphore
31 # Thread-14 get semaphore
32 # Thread-15 get semaphore
33 # Thread-16 get semaphore#隔2秒打印
34 # Thread-17 get semaphore
35 # Thread-18 get semaphore
36 # Thread-20 get semaphore
37 # Thread-19 get semaphore

semaphore实例

十一、multiprocessing

Multiprocessing is a package that supports spawning processes using an API similar to the threading module. 
The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. 
Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

 

由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。

multiprocessing包是python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(),
run(),
join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类
(这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

编程 36编程 37

 1 from multiprocessing import Process
 2 import time
 3 def f(name):
 4 
 5     print('hello', name,time.ctime())
 6     time.sleep(1)
 7 
 8 if __name__ == '__main__':
 9     p_list=[]
10     for i in range(3):
11         p = Process(target=f, args=('alvin:%s'%i,))
12         p_list.append(p)
13         p.start()
14     for i in p_list:
15         p.join()
16     print('end')
17 
18 
19 #运行结果:
20 #hello alvin:0 Tue May  9 16:41:18 2017
21 #hello alvin:1 Tue May  9 16:41:18 2017
22 #hello alvin:2 Tue May  9 16:41:18 2017
23 #end

Process类调用

 

 

编程 38编程 39

 1 from multiprocessing import Process
 2 import time
 3 
 4 class MyProcess(Process):
 5     def __init__(self):
 6         super(MyProcess, self).__init__()
 7 
 8     def run(self):
 9 
10         print ('hello', self.name,time.ctime())
11         time.sleep(1)
12 
13 
14 if __name__ == '__main__':
15     p_list=[]
16     for i in range(3):
17         p = MyProcess()
18         p.start()
19         p_list.append(p)
20 
21     for p in p_list:
22         p.join()
23 
24     print('end')
25 
26 
27 #运行结果:
28 #hello MyProcess-1 Tue May  9 16:42:46 2017
29 #hello MyProcess-2 Tue May  9 16:42:46 2017
30 #hello MyProcess-3 Tue May  9 16:42:46 2017
31 #end

继承Process类调用

process类:

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前还没有实现,库引用中提示必须是None; 
  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

属性:

  daemon:和线程的setDeamon功能一样

  name:进程名字。

  pid:进程号。

实例:

编程 40编程 41

 1 from multiprocessing import Process
 2 import os
 3 import time
 4 def info(name):
 5 
 6 
 7     print("name:",name)
 8     print('parent process:', os.getppid())
 9     print('process id:', os.getpid())
10     print("------------------")
11     time.sleep(1)
12 
13 def foo(name):
14 
15     info(name)
16 
17 if __name__ == '__main__':
18 
19     info('main process line')
20 
21 
22     p1 = Process(target=info, args=('alvin',))
23     p2 = Process(target=foo, args=('egon',))
24     p1.start()
25     p2.start()
26 
27     p1.join()
28     p2.join()
29 
30     print("ending")
31 
32 
33 
34 #运行结果:
35 # name: main process line
36 # parent process: 5112
37 # process id: 10808
38 # ------------------
39 # name: alvin
40 # name: egon
41 # parent process: 10808
42 # process id: 9576
43 # ------------------
44 # parent process: 10808
45 # process id: 9604
46 # ------------------
47 # ending

process类创建多进程

通过tasklist(Win)或者ps -elf
|grep(linux)命令检测每一个进程号(PID)对应的进程名.

十二、协程

 1 import time
 2 
 3 """
 4 传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
 5 如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
 6 """
 7 # 注意到consumer函数是一个generator(生成器):
 8 # 任何包含yield关键字的函数都会自动成为生成器(generator)对象
 9 
10 def consumer():
11     r = ''
12     while True:
13         # 3、consumer通过yield拿到消息,处理,又通过yield把结果传回;
14         #    yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。
15         #    当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,
16         #    就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。
17         n = yield r
18         if not n:
19             return
20         print('[CONSUMER] ←← Consuming %s...' % n)
21         time.sleep(1)
22         r = '200 OK'
23 def produce(c):
24     # 1、首先调用c.next()启动生成器
25     next(c)
26     n = 0
27     while n < 5:
28         n = n + 1
29         print('[PRODUCER] →→ Producing %s...' % n)
30         # 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
31         cr = c.send(n)
32         # 4、produce拿到consumer处理的结果,继续生产下一条消息;
33         print('[PRODUCER] Consumer return: %s' % cr)
34     # 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
35     c.close()
36 if __name__=='__main__':
37     # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
38     c = consumer()
39     produce(c)
40     
41     
42 '''
43 result:
44 
45 [PRODUCER] →→ Producing 1...
46 [CONSUMER] ←← Consuming 1...
47 [PRODUCER] Consumer return: 200 OK
48 [PRODUCER] →→ Producing 2...
49 [CONSUMER] ←← Consuming 2...
50 [PRODUCER] Consumer return: 200 OK
51 [PRODUCER] →→ Producing 3...
52 [CONSUMER] ←← Consuming 3...
53 [PRODUCER] Consumer return: 200 OK
54 [PRODUCER] →→ Producing 4...
55 [CONSUMER] ←← Consuming 4...
56 [PRODUCER] Consumer return: 200 OK
57 [PRODUCER] →→ Producing 5...
58 [CONSUMER] ←← Consuming 5...
59 [PRODUCER] Consumer return: 200 OK
60 '''

 

greenlet:

greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。greentlet是python中实现我们所谓的”Coroutine(协程)”的一个基础库. 

 1 from greenlet import greenlet
 2  
 3 def test1():
 4     print (12)
 5     gr2.switch()
 6     print (34)
 7     gr2.switch()
 8  
 9 def test2():
10     print (56)
11     gr1.switch()
12     print (78)
13  
14 gr1 = greenlet(test1)
15 gr2 = greenlet(test2)
16 gr1.switch()
17 
18 
19 #运行结果:
20 #12
21 #56
22 #34
23 #78

基于greenlet的框架——gevent

gevent模块实现协程:

Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。

gevent是第三方库,通过greenlet实现协程,其基本思想是:

当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey
patch完成:

编程 42编程 43

 1 from gevent import monkey
 2 monkey.patch_all()
 3 import gevent
 4 from urllib import request
 5 import time
 6 
 7 def f(url):
 8     print('GET: %s' % url)
 9     resp = request.urlopen(url)
10     data = resp.read()
11     print('%d bytes received from %s.' % (len(data), url))
12 
13 start=time.time()
14 
15 gevent.joinall([
16         gevent.spawn(f, 'https://itk.org/'),
17         gevent.spawn(f, 'https://www.github.com/'),
18         gevent.spawn(f, 'https://zhihu.com/'),
19 ])
20 
21 print(time.time()-start)
22 
23 
24 
25 #运行结果:
26 #GET: https://itk.org/
27 #GET: https://www.github.com/
28 #GET: https://zhihu.com/
29 #9077 bytes received from https://zhihu.com/.
30 #12323 bytes received from https://itk.org/.
31 #92574 bytes received from https://www.github.com/.
32 #3.7679357528686523

gevent实例

 

 

编程 44编程 45

 1 from gevent import monkey
 2 monkey.patch_all()
 3 import gevent
 4 from urllib import request
 5 import time
 6 
 7 def f(url):
 8     print('GET: %s' % url)
 9     resp = request.urlopen(url)
10     data = resp.read()
11     print('%d bytes received from %s.' % (len(data), url))
12 
13 start=time.time()
14 
15 # gevent.joinall([
16 #         gevent.spawn(f, 'https://itk.org/'),
17 #         gevent.spawn(f, 'https://www.github.com/'),
18 #         gevent.spawn(f, 'https://zhihu.com/'),
19 # ])
20 
21 f('https://itk.org/')
22 f('https://www.github.com/')
23 f('https://zhihu.com/')
24 
25 print(time.time()-start)
26 
27 
28 
29 #运行结果:
30 #GET: https://itk.org/
31 #12323 bytes received from https://itk.org/.
32 #GET: https://www.github.com/
33 #92572 bytes received from https://www.github.com/.
34 #GET: https://zhihu.com/
35 #8885 bytes received from https://zhihu.com/.
36 #5.089903354644775

对比串行方式的运行效率

 

参考资料:

2.