一、类与对象
1.1 根类和元类
在python当中一切皆为对象,而在这当中有两个特别重要的概念:元类(meta class)和根类(root class)。对应两个基本类型type
和object
。
类型本身也是一种对象,它是的类型即为元类type
。例如我们常见的内置数据类型str、list、dict、int和float
等均为type
的实例,如下示例。1
2
3
4
5assert isinstance(str, type)
assert isinstance(list, type)
assert isinstance(dict, type)
assert isinstance(int, type)
assert isinstance(object, type)
而所有类型又都是根类object的直接或间接子类,继承了object的属性,如下示例1
2
3assert issubclass(str, object)
assert issubclass(list, object)
assert issubclass(type, object)
有心的人可以看到,object类是type类的实例对象,而type类又是object类的后继子类。那么先有type
还是object
呢?这是一个鸡和蛋问题。有兴趣可以去琢磨一下。
1.2 python对象的创建
大家很熟练地能够创建一个自定义类型并将其实例化。例如1
2
3
4
5class myClass(object):
def __init__(self, initArgs:str):
pass
myClassObj = myClass("initArgs")
那么请思考一下,在创建myClassObj
时,我们只传递了initArgs
这个参数,那么self
参数是如何来的,它又是什么呢?这就涉及了python对象的创建过程了。self
实际上就是myClassObj
,它们指向的是一个内存地址。下面的示例可以看到两者具有一样的内存指向。也就是说,在传入init方法前,我们的对象已经被创立了。
1 | class myClass(object): |
请再思考,myClass("initArgs")
直接调用的到底是什么方法?答案是myClass的元类type
中__call__
方法。下面是type
的实现逻辑片段:1
2
3
4
5
6class type(object):
def __call__(cls: Type, *args: Any, **kwargs: Any) -> Any:
self = cls.__new__(cls)
if hasattr(self, "__init__"):
self.__init__(*args, **kwargs)
return self
可以看出,当我们创建一个新的对象,首先调用了__call__
方法,在python中,变量后直接根括号的语法,调用的是该变量类型的__call__
方法,对类型对象,如前所述,类型也是type
类的实例,故会调用type.__call__
方法。而方法中调用了一个特殊方法__new__
,默认继承object.__new__
。它负责根据类型,创建对象实例,分配属性表和基础内存空间。也就是self
的来源。最后才会用__init__
初始化。
扩展知识:默认元类是type,但为了做一些扩展功能,可以自定义元类,例如abc
库提供了type
的子类abc.ABCMeta
作为自定义元类,配合abc.abstractmethod
等装饰器,可以做出类似java抽象接口的功能。自定义类可以用metaclass
属性声明,例如在本人的SoCube项目中,定义了下列元类:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29import abc
import torch
from torch.nn import Module
class NetMetaBase(abc.ABCMeta):
def __call__(
self:Type[Module],
inChannels:int,
outChannels:int,
*args: Any, **kwargs: Any) -> Module:
"""
该方法创建self参数指定类的实例对象
Args:
self 即为NetMetaBase的实例,也就是以NetMetaBase为元类的类对象
"""
assert issubclass(self, Module), "Class must be a subclass of torch.nn.Module"
obj = self.__new__(self)
obj.__init__(inChannels, outChannels, *args, **kwargs)
return obj
class ModuleBase(Module, metaclass=NetMetaBase):
def __init__(self, inChannels:int, outChannels:int, **kwargs)->None:
super(ModuleBase, self).__init__()
self._inChannels = inChannels
self._outChannels = outChannels
def forward(self, x:torch.Tensor)->torch.Tensor:
raise NotImplementedError
1.3 python的多继承
python是支持多继承的,可以通过class 新类名(类1, 类2, ...)
的形式继承。调用父类方法时利用super
类控制,下列输出依次为A, B, C。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26class A(object):
def __init__(self):
print("A")
print(self.__class__)
class B(object):
def __init__(self):
print("B")
print(self.__class__)
class C(A, B):
def __init__(self):
super(C, self).__init__()
super(A, self).__init__()
print("C")
print(self.__class__)
C()
assert C.__mro__ == (C, A, B, object)
# 输出如下
# A
# <class '__main__.C'>
# B
# <class '__main__.C'>
# C
# <class '__main__.C'>
python继承是一个树结构,super(cls, obj)
调用的是类别继承队列__mro__
属性中,cls
类后面的那个类,并将obj作为self参数传入,因此等价于1
2super(C, self).__init__()
A.__init__(self)
二、同步运算
python支持多线程和多进程两种同步运算。其中由于全局解释器锁(GIL)的存在,python的多线程不能像java一样做到并行运算,发挥不出多核的优势。python的同步运算工具封装在concurrent.futures
包中。
2.1 单个线程的创建与使用
Python2.X中使用thread
模块进行多线程操作,它是一个比较初级的线程模块。而在Python3.X中,引入了threading
模块,并将thread
模块更名为_thread
。threading
模块具有更多高级特性。
thread模块使用start_new_thread(function, args[, kwargs])
的API创建线程,function即新线程要运行的函数(类似于java中的Runnable接口),args是线程函数对应的参数,其为元组。1
2
3
4
5
6
7
8
9
10
11
12import _thread as th
import time
def print_time( threadName, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print ("%s: %s" % ( threadName, time.ctime(time.time()) ))
th.start_new_thread(print_time, ("Thread-1", 2))
th.start_new_thread(print_time, ("Thread-2", 2))
输出1
2
3
4
5
6
7
8
9
10
11139633559193344
Thread-1: Mon Jun 28 12:53:50 2021Thread-2: Mon Jun 28 12:53:50 2021
Thread-2: Mon Jun 28 12:53:52 2021
Thread-1: Mon Jun 28 12:53:52 2021
Thread-2: Mon Jun 28 12:53:54 2021
Thread-1: Mon Jun 28 12:53:54 2021
Thread-2: Mon Jun 28 12:53:56 2021
Thread-1: Mon Jun 28 12:53:56 2021
Thread-2: Mon Jun 28 12:53:58 2021
Thread-1: Mon Jun 28 12:53:58 2021
threading模块提供了更为丰富的线程实现,能够方便实现线程同步,保证线程安全。继承threading.Thread
类来实现多线程,其模式与java的java.lang.Thread
类实现多线程类似。
- run()
通过重写该方法实现自定义线程函数,等效于java的Thread.run方法 - start()
通过该方法启动线程,实现了线程声明与启动的分离(前面thread模块的start_new_thread直接调用系统API创建并运行线程),等效于java的Thread.start方法 - join([time])
通过该方法阻塞当前线程(指的是调用该函数时所在线程,而非创建的这个线程),直到创建的线程结束或等待超时,等效于java的Thread.join方法 - getName()/setName()
线程名称name
属性的getter/setter方法,当然可以直接对属性赋值,python没有变量封装,但这不是个好习惯 - isAlive()
返回线程是否活动的。
1 | import threading |
输出1
2
3
4
5
6
7
8
9
10
11
12Two thread created and started
Thread-1: Mon Jun 28 12:59:34 2021Thread-2: Mon Jun 28 12:59:34 2021
Thread-1: Mon Jun 28 12:59:36 2021
Thread-2: Mon Jun 28 12:59:36 2021
Thread-1: Mon Jun 28 12:59:38 2021Thread-2: Mon Jun 28 12:59:38 2021
Thread-1: Mon Jun 28 12:59:40 2021
Thread-2: Mon Jun 28 12:59:40 2021
Thread-1: Mon Jun 28 12:59:42 2021
Thread-2: Mon Jun 28 12:59:42 2021
Main thread ended
2.2 线程同步
threading
的Lock
或RLock
可以提供线程锁,利用对应的acquire
和release
方法进行线程锁的控制。类似于java中的Lock接口(但和synchronized是不一样的)。当然,可以用with语句来简化,两个锁对象都实现了__enter__
和__exit__
函数,可以使用with语句,如with lock
替代acquire
和release
1 |
|
输出1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20Thread-3 准备获取锁
Thread-3 获取锁成功
Thread-4 准备获取锁
Two thread created and started
Thread-3: Mon Jun 28 13:17:26 2021
Thread-3: Mon Jun 28 13:17:28 2021
Thread-3: Mon Jun 28 13:17:30 2021
Thread-3: Mon Jun 28 13:17:32 2021
Thread-3: Mon Jun 28 13:17:34 2021
Thread-3 准备释放锁
Thread-3 释放锁成功
Thread-4 获取锁成功
Thread-4: Mon Jun 28 13:17:36 2021
Thread-4: Mon Jun 28 13:17:38 2021
Thread-4: Mon Jun 28 13:17:40 2021
Thread-4: Mon Jun 28 13:17:42 2021
Thread-4: Mon Jun 28 13:17:44 2021
Thread-4 准备释放锁
Thread-4 释放锁成功
Main thread ended
2.3 线程池的使用
单个线程的操作不利于线程的管理与复用。python提供了concurrent.futures.ThreadPoolExecutor
来管理线程。它继承了Executor
,实现了其一系列任务接口:1
2
3
4
5
6
7
8
9
10
11
12
13
14class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""
def submit(*args, **kwargs)->Future:
def map(self, fn, *iterables, timeout=None, chunksize=1):
def shutdown(self, wait=True):
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False
使用十分简单1
2
3
4
5
6
7
8
9import time
from concurrent.futures import ThreadPoolExecutor, Future
def task(a, b, sleeps = 5):
time.sleep(sleeps)
return a + b
pool = ThreadPoolExecutor(max_workers = 8)
task: Future = pool.submit(task, a = 5, b = 6, sleeps = 6)
print(f"a + b = {task.result}")
pool.shutdown(wait=True)
上述方法会创建8个工作线程反复使用。同时可以看到Executor支持使用with上下文管理,方便自动释放资源。pool.shutdown(wait=True)
意味着会等待任务完成再退出。
2.4 进程池的使用
如前所述,python的线程存在局限性,因此可以使用python的multiprocessing
来进行多进程管理,同时python提供了进程池concurrent.futures.ProcessPoolExecutor
来管理。它同样是Executor
的子类,使用与线程池类似。1
2
3
4
5
6
7
8
9
10
11
12
13
14import time
import os
from concurrent.futures import ProcessPoolExecutor, Future
def task1(a, b, sleeps = 5):
print(f"PID is {os.getpid()}, PPID is {os.getppid()}")
time.sleep(sleeps)
return a + b
if __name__ == "__main__":
print(f"PID is {os.getpid()}")
pool = ProcessPoolExecutor(max_workers = 8)
task: Future = pool.submit(task1, a = 5, b = 6, sleeps = 6)
print(f"a + b = {task.result()}")
pool.shutdown(wait=True)
会输出如下,可以看到在子进程27568中运行了task1。1
2
3PID is 11248
PID is 27568, PPID is 11248
a + b = 11
但是多进程与多线程最大的不同在于进程是最小的资源单位,进程之间资源不共享。因此每个子进程需要重新加载用到的模块,包括当前执行脚本(即main模块)。我们将上述代码适当修改如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14import time
import os
from concurrent.futures import ProcessPoolExecutor, Future
def task1(a, b, sleeps = 5):
print(f"PID is {os.getpid()}, PPID is {os.getppid()}")
time.sleep(sleeps)
return a + b
print(f"Import {__name__} in PID is {os.getpid()}")
if __name__ == "__main__":
pool = ProcessPoolExecutor(max_workers = 8)
task: Future = pool.submit(task1, a = 5, b = 6, sleeps = 6)
print(f"a + b = {task.result()}")
pool.shutdown(wait=True)
会得到如下输出,也就是print(f"Import {__name__} in PID is {os.getpid()}")
执行了9次,一次是启动程序的父进程,8次对应进程池的8个工作进程。当前脚本的模块名称分别是__main__
和__mp_main__
。1
2
3
4
5
6
7
8
9
10
11Import __main__ in PID is 30336
Import __mp_main__ in PID is 18740
Import __mp_main__ in PID is 31404
Import __mp_main__ in PID is 16500
Import __mp_main__ in PID is 20584
PID is 18740, PPID is 30336
Import __mp_main__ in PID is 36504
Import __mp_main__ in PID is 2624
Import __mp_main__ in PID is 12776
Import __mp_main__ in PID is 5780
a + b = 11
设想一下,如果将创建多进程的代码,也就是if __name__ == "__main__"
下面的代码块不放在这个条件下,那么则会无限递归地创建新进程。因此python禁止了这样的操作。1
2
3
4
5
6
7
8
9
10
11
12
13import time
import os
from concurrent.futures import ProcessPoolExecutor, Future
def task1(a, b, sleeps = 5):
print(f"PID is {os.getpid()}, PPID is {os.getppid()}")
time.sleep(sleeps)
return a + b
print(f"Import {__name__} in PID is {os.getpid()}")
pool = ProcessPoolExecutor(max_workers = 8)
task: Future = pool.submit(task1, a = 5, b = 6, sleeps = 6)
print(f"a + b = {task.result()}")
pool.shutdown(wait=True)
上述代码会抛出RuntimeError
,提示An attempt has been made to start a new process before the current process has finished its bootstrapping phase
。即不能在当前进程加载模块的时候创建子进程。
在使用时,务必在上述if语句下控制。因为我们看到子进程的模块名称是__mp_main__
而非__main__
了,因此不在存在这个问题。
另外需要注意的一点是,多进程将一个可执行对象submit,本质上是利用了进程之间的IPC通信,那么实际上时将这个执行性对象利用pickle
序列化后传输给子进程,由子进程反序列化并执行,因此需要满足序列化的要求,即对象一致性。例如下列情况就会抛出picke_error
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import functools
import os
import time
from concurrent.futures import ProcessPoolExecutor, Future
def parallel(func):
def wrapper(*args, **kwargs):
pool = ProcessPoolExecutor(max_workers=2)
task = pool.submit(func, *args, **kwargs)
return task
return wrapper
def task1(a, b, sleeps = 5):
print(f"PID is {os.getpid()}, PPID is {os.getppid()}")
time.sleep(sleeps)
return a + b
if __name__ == "__main__":
task: Future = task1(a=5, b= 6)
print(task.result())
上述代码会报错_pickle.PicklingError: Can't pickle <function task1 at 0x000001A9FA495310>: it's not the same object as __main__.task1
。这是因为task1被装饰了,而不是提交任务的那个task1。