一、类与对象

1.1 根类和元类

在python当中一切皆为对象,而在这当中有两个特别重要的概念:元类(meta class)和根类(root class)。对应两个基本类型typeobject

类型本身也是一种对象,它是的类型即为元类type。例如我们常见的内置数据类型str、list、dict、int和float等均为type的实例,如下示例。

1
2
3
4
5
assert isinstance(str, type)
assert isinstance(list, type)
assert isinstance(dict, type)
assert isinstance(int, type)
assert isinstance(object, type)

而所有类型又都是根类object的直接或间接子类,继承了object的属性,如下示例

1
2
3
assert issubclass(str, object)
assert issubclass(list, object)
assert issubclass(type, object)


有心的人可以看到,object类是type类的实例对象,而type类又是object类的后继子类。那么先有type还是object呢?这是一个鸡和蛋问题。有兴趣可以去琢磨一下。

1.2 python对象的创建

大家很熟练地能够创建一个自定义类型并将其实例化。例如

1
2
3
4
5
class myClass(object):
def __init__(self, initArgs:str):
pass

myClassObj = myClass("initArgs")

那么请思考一下,在创建myClassObj时,我们只传递了initArgs这个参数,那么self参数是如何来的,它又是什么呢?这就涉及了python对象的创建过程了。self实际上就是myClassObj,它们指向的是一个内存地址。下面的示例可以看到两者具有一样的内存指向。也就是说,在传入init方法前,我们的对象已经被创立了。

1
2
3
4
5
6
7
class myClass(object):
def __init__(self, initArgs:str):
print(id(self))

myClassObj = myClass("initArgs")
print(id(myClassObj))
# 输出两行内存地址,结果一致

请再思考,myClass("initArgs")直接调用的到底是什么方法?答案是myClass的元类type__call__方法。下面是type的实现逻辑片段:

1
2
3
4
5
6
class type(object):
def __call__(cls: Type, *args: Any, **kwargs: Any) -> Any:
self = cls.__new__(cls)
if hasattr(self, "__init__"):
self.__init__(*args, **kwargs)
return self

可以看出,当我们创建一个新的对象,首先调用了__call__方法,在python中,变量后直接根括号的语法,调用的是该变量类型的__call__方法,对类型对象,如前所述,类型也是type类的实例,故会调用type.__call__方法。而方法中调用了一个特殊方法__new__,默认继承object.__new__。它负责根据类型,创建对象实例,分配属性表和基础内存空间。也就是self的来源。最后才会用__init__初始化。

扩展知识:默认元类是type,但为了做一些扩展功能,可以自定义元类,例如abc库提供了type的子类abc.ABCMeta作为自定义元类,配合abc.abstractmethod等装饰器,可以做出类似java抽象接口的功能。自定义类可以用metaclass属性声明,例如在本人的SoCube项目中,定义了下列元类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import abc
import torch
from torch.nn import Module
class NetMetaBase(abc.ABCMeta):
def __call__(
self:Type[Module],
inChannels:int,
outChannels:int,
*args: Any, **kwargs: Any) -> Module:
"""
该方法创建self参数指定类的实例对象
Args:
self 即为NetMetaBase的实例,也就是以NetMetaBase为元类的类对象

"""
assert issubclass(self, Module), "Class must be a subclass of torch.nn.Module"
obj = self.__new__(self)
obj.__init__(inChannels, outChannels, *args, **kwargs)
return obj

class ModuleBase(Module, metaclass=NetMetaBase):
def __init__(self, inChannels:int, outChannels:int, **kwargs)->None:
super(ModuleBase, self).__init__()
self._inChannels = inChannels
self._outChannels = outChannels

@abc.abstractmethod
def forward(self, x:torch.Tensor)->torch.Tensor:
raise NotImplementedError

1.3 python的多继承

python是支持多继承的,可以通过class 新类名(类1, 类2, ...)的形式继承。调用父类方法时利用super类控制,下列输出依次为A, B, C。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class A(object):
def __init__(self):
print("A")
print(self.__class__)

class B(object):
def __init__(self):
print("B")
print(self.__class__)

class C(A, B):
def __init__(self):
super(C, self).__init__()
super(A, self).__init__()
print("C")
print(self.__class__)

C()
assert C.__mro__ == (C, A, B, object)
# 输出如下
# A
# <class '__main__.C'>
# B
# <class '__main__.C'>
# C
# <class '__main__.C'>

python继承是一个树结构,super(cls, obj)调用的是类别继承队列__mro__属性中,cls类后面的那个类,并将obj作为self参数传入,因此等价于
1
2
super(C, self).__init__()
A.__init__(self)

二、同步运算

python支持多线程和多进程两种同步运算。其中由于全局解释器锁(GIL)的存在,python的多线程不能像java一样做到并行运算,发挥不出多核的优势。python的同步运算工具封装在concurrent.futures包中。

2.1 单个线程的创建与使用

Python2.X中使用thread模块进行多线程操作,它是一个比较初级的线程模块。而在Python3.X中,引入了threading模块,并将thread模块更名为_threadthreading模块具有更多高级特性。

thread模块使用start_new_thread(function, args[, kwargs])的API创建线程,function即新线程要运行的函数(类似于java中的Runnable接口),args是线程函数对应的参数,其为元组。

1
2
3
4
5
6
7
8
9
10
11
12
import _thread as th
import time

def print_time( threadName, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print ("%s: %s" % ( threadName, time.ctime(time.time()) ))

th.start_new_thread(print_time, ("Thread-1", 2))
th.start_new_thread(print_time, ("Thread-2", 2))

输出
1
2
3
4
5
6
7
8
9
10
11
139633559193344
Thread-1: Mon Jun 28 12:53:50 2021Thread-2: Mon Jun 28 12:53:50 2021

Thread-2: Mon Jun 28 12:53:52 2021
Thread-1: Mon Jun 28 12:53:52 2021
Thread-2: Mon Jun 28 12:53:54 2021
Thread-1: Mon Jun 28 12:53:54 2021
Thread-2: Mon Jun 28 12:53:56 2021
Thread-1: Mon Jun 28 12:53:56 2021
Thread-2: Mon Jun 28 12:53:58 2021
Thread-1: Mon Jun 28 12:53:58 2021

threading模块提供了更为丰富的线程实现,能够方便实现线程同步,保证线程安全。继承threading.Thread类来实现多线程,其模式与java的java.lang.Thread类实现多线程类似。

  • run()
    通过重写该方法实现自定义线程函数,等效于java的Thread.run方法
  • start()
    通过该方法启动线程,实现了线程声明与启动的分离(前面thread模块的start_new_thread直接调用系统API创建并运行线程),等效于java的Thread.start方法
  • join([time])
    通过该方法阻塞当前线程(指的是调用该函数时所在线程,而非创建的这个线程),直到创建的线程结束或等待超时,等效于java的Thread.join方法
  • getName()/setName()
    线程名称name属性的getter/setter方法,当然可以直接对属性赋值,python没有变量封装,但这不是个好习惯
  • isAlive()
    返回线程是否活动的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading

class timeThread(threading.Thread):
def __init__(self, threadName):
threading.Thread.__init__(self)
# self.name = threadName
self.setName(threadName)

def run(self):
print_time(self.getName(), 2)

thread1 = timeThread("Thread-1")
thread2 = timeThread("Thread-2")

thread1.start()
thread2.start()
print("Two thread created and started")
thread1.join()
thread2.join()
print("Main thread ended")

输出

1
2
3
4
5
6
7
8
9
10
11
12
Two thread created and started
Thread-1: Mon Jun 28 12:59:34 2021Thread-2: Mon Jun 28 12:59:34 2021

Thread-1: Mon Jun 28 12:59:36 2021
Thread-2: Mon Jun 28 12:59:36 2021
Thread-1: Mon Jun 28 12:59:38 2021Thread-2: Mon Jun 28 12:59:38 2021

Thread-1: Mon Jun 28 12:59:40 2021
Thread-2: Mon Jun 28 12:59:40 2021
Thread-1: Mon Jun 28 12:59:42 2021
Thread-2: Mon Jun 28 12:59:42 2021
Main thread ended

2.2 线程同步

threadingLockRLock可以提供线程锁,利用对应的acquirerelease方法进行线程锁的控制。类似于java中的Lock接口(但和synchronized是不一样的)。当然,可以用with语句来简化,两个锁对象都实现了__enter____exit__函数,可以使用with语句,如with lock替代acquirerelease

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

class timeLockThread(threading.Thread):
lock = threading.Lock()
def __init__(self, threadName):
threading.Thread.__init__(self)
self.setName(threadName)

def run(self):
print(self.getName(),"准备获取锁")
self.lock.acquire()
print(self.getName(),"获取锁成功")
print_time(self.getName(), 2)
print(self.getName(),"准备释放锁")
self.lock.release()
print(self.getName(),"释放锁成功")

lockthread1 = timeLockThread("Thread-3")
lockthread2 = timeLockThread("Thread-4")

lockthread1.start()
lockthread2.start()
print("Two thread created and started")
lockthread1.join()
lockthread2.join()
print("Main thread ended")

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Thread-3 准备获取锁
Thread-3 获取锁成功
Thread-4 准备获取锁
Two thread created and started
Thread-3: Mon Jun 28 13:17:26 2021
Thread-3: Mon Jun 28 13:17:28 2021
Thread-3: Mon Jun 28 13:17:30 2021
Thread-3: Mon Jun 28 13:17:32 2021
Thread-3: Mon Jun 28 13:17:34 2021
Thread-3 准备释放锁
Thread-3 释放锁成功
Thread-4 获取锁成功
Thread-4: Mon Jun 28 13:17:36 2021
Thread-4: Mon Jun 28 13:17:38 2021
Thread-4: Mon Jun 28 13:17:40 2021
Thread-4: Mon Jun 28 13:17:42 2021
Thread-4: Mon Jun 28 13:17:44 2021
Thread-4 准备释放锁
Thread-4 释放锁成功
Main thread ended

2.3 线程池的使用

单个线程的操作不利于线程的管理与复用。python提供了concurrent.futures.ThreadPoolExecutor来管理线程。它继承了Executor,实现了其一系列任务接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""
def submit(*args, **kwargs)->Future:

def map(self, fn, *iterables, timeout=None, chunksize=1):

def shutdown(self, wait=True):

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False

使用十分简单
1
2
3
4
5
6
7
8
9
import time
from concurrent.futures import ThreadPoolExecutor, Future
def task(a, b, sleeps = 5):
time.sleep(sleeps)
return a + b
pool = ThreadPoolExecutor(max_workers = 8)
task: Future = pool.submit(task, a = 5, b = 6, sleeps = 6)
print(f"a + b = {task.result}")
pool.shutdown(wait=True)

上述方法会创建8个工作线程反复使用。同时可以看到Executor支持使用with上下文管理,方便自动释放资源。pool.shutdown(wait=True)意味着会等待任务完成再退出。

2.4 进程池的使用

如前所述,python的线程存在局限性,因此可以使用python的multiprocessing来进行多进程管理,同时python提供了进程池concurrent.futures.ProcessPoolExecutor来管理。它同样是Executor的子类,使用与线程池类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import time
import os
from concurrent.futures import ProcessPoolExecutor, Future
def task1(a, b, sleeps = 5):
print(f"PID is {os.getpid()}, PPID is {os.getppid()}")
time.sleep(sleeps)
return a + b

if __name__ == "__main__":
print(f"PID is {os.getpid()}")
pool = ProcessPoolExecutor(max_workers = 8)
task: Future = pool.submit(task1, a = 5, b = 6, sleeps = 6)
print(f"a + b = {task.result()}")
pool.shutdown(wait=True)

会输出如下,可以看到在子进程27568中运行了task1。
1
2
3
PID is 11248
PID is 27568, PPID is 11248
a + b = 11

但是多进程与多线程最大的不同在于进程是最小的资源单位,进程之间资源不共享。因此每个子进程需要重新加载用到的模块,包括当前执行脚本(即main模块)。我们将上述代码适当修改如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import time
import os
from concurrent.futures import ProcessPoolExecutor, Future
def task1(a, b, sleeps = 5):
print(f"PID is {os.getpid()}, PPID is {os.getppid()}")
time.sleep(sleeps)
return a + b

print(f"Import {__name__} in PID is {os.getpid()}")
if __name__ == "__main__":
pool = ProcessPoolExecutor(max_workers = 8)
task: Future = pool.submit(task1, a = 5, b = 6, sleeps = 6)
print(f"a + b = {task.result()}")
pool.shutdown(wait=True)

会得到如下输出,也就是print(f"Import {__name__} in PID is {os.getpid()}")执行了9次,一次是启动程序的父进程,8次对应进程池的8个工作进程。当前脚本的模块名称分别是__main____mp_main__
1
2
3
4
5
6
7
8
9
10
11
Import __main__ in PID is 30336
Import __mp_main__ in PID is 18740
Import __mp_main__ in PID is 31404
Import __mp_main__ in PID is 16500
Import __mp_main__ in PID is 20584
PID is 18740, PPID is 30336
Import __mp_main__ in PID is 36504
Import __mp_main__ in PID is 2624
Import __mp_main__ in PID is 12776
Import __mp_main__ in PID is 5780
a + b = 11

设想一下,如果将创建多进程的代码,也就是if __name__ == "__main__"下面的代码块不放在这个条件下,那么则会无限递归地创建新进程。因此python禁止了这样的操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
import time
import os
from concurrent.futures import ProcessPoolExecutor, Future
def task1(a, b, sleeps = 5):
print(f"PID is {os.getpid()}, PPID is {os.getppid()}")
time.sleep(sleeps)
return a + b

print(f"Import {__name__} in PID is {os.getpid()}")
pool = ProcessPoolExecutor(max_workers = 8)
task: Future = pool.submit(task1, a = 5, b = 6, sleeps = 6)
print(f"a + b = {task.result()}")
pool.shutdown(wait=True)

上述代码会抛出RuntimeError,提示An attempt has been made to start a new process before the current process has finished its bootstrapping phase。即不能在当前进程加载模块的时候创建子进程。
在使用时,务必在上述if语句下控制。因为我们看到子进程的模块名称是__mp_main__而非__main__了,因此不在存在这个问题。
另外需要注意的一点是,多进程将一个可执行对象submit,本质上是利用了进程之间的IPC通信,那么实际上时将这个执行性对象利用pickle序列化后传输给子进程,由子进程反序列化并执行,因此需要满足序列化的要求,即对象一致性。例如下列情况就会抛出picke_error
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import functools
import os
import time
from concurrent.futures import ProcessPoolExecutor, Future
def parallel(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
pool = ProcessPoolExecutor(max_workers=2)
task = pool.submit(func, *args, **kwargs)
return task
return wrapper

@parallel
def task1(a, b, sleeps = 5):
print(f"PID is {os.getpid()}, PPID is {os.getppid()}")
time.sleep(sleeps)
return a + b

if __name__ == "__main__":
task: Future = task1(a=5, b= 6)
print(task.result())

上述代码会报错_pickle.PicklingError: Can't pickle <function task1 at 0x000001A9FA495310>: it's not the same object as __main__.task1。这是因为task1被装饰了,而不是提交任务的那个task1。