夜间模式
Python 学习笔记
匿名函数
临时使用一次的函数可以使用 lambda
匿名函数,不必用 def
,例如在使用映射函数时就可以应用 lambda
。
python
array = [1, 2, 3, 4, 5]
# map(function, iterables)
array_1 = map(lambda x: x ** 2, array)
print(list(array_1)) # 输出:[1, 4, 9, 16, 25]
# filter(function, iterable)
array_2 = filter(lambda x: x % 2 != 0, array)
print(list(array_2)) # 输出:[1, 3, 5]
# reduce(function, sequence)
from functools import reduce # Python3 不再内置该函数,需要从 functools 库导入
res = reduce(lambda x, y: x + y, array)
print(res) # 输出:15
迭代器
可迭代对象
在Python的任意对象中,只要它定义了可以返回一个迭代器的 __iter__
方法,或者定义了 __getitem__
方法,那么它就是一个可迭代对象,通俗地讲就是可以通过 for 循环
进行遍历了。
迭代器对象
迭代器就是实现了 __next__
和 __iter__
方法(缺一不可)的对象,就叫迭代器。其中,__iter__
方法返回迭代器本身,__next__
方法不断返回迭代器中的下一个值,直到容器中没有更多元素时抛出 StopIteration
异常,以终止迭代。
迭代器的优点:懒加载,工厂模式,节约内存空间。比如提供一亿个元素,列表占用内存过大无法完成,但是迭代器可以,迭代器不会一次性把数据全生产出来,只会按需生产,迭代器没有长度属性。最常用的情况是让迭代器无限提供数据,例如:itertools.count(start, step)
。
Python
from itertools import count
counter = count(10) # 第一个参数为起始值,默认0;第二个参数为步长,默认1
for _ in range(100):
print(next(counter)) # 输出10到109等100个数
print(len(counter)) # 报错:迭代器没有len属性
生成器
生成器其实是一种特殊的迭代器,不过这种迭代器更加优雅。如果一个函数包含 yield
关键字,这个函数就会变成一个生成器。
生成器中 yield
关键字的作用:
- 程序每次在代码中遇到
yield
关键字后,会返回结果,并保留当前函数状态,等待下次调用 - 下次调用从上次返回
yield
的语句出开始继续执行
Python
def demo():
print("Hello")
yield 1
print("World")
yield 2
d = demo()
print(next(d))
print("-----")
print(next(d))
# 输出:
# Hello
# 1
# -----
# World
# 2
生成器的 send(value)
方法可以调用生成器,同时还能给生成器传递数据。
Python
def demo():
t = yield 1 # t 接收从 send 方法传入的值
print(t)
yield 2
d = demo()
print(next(d)) # 需要预计激活后才能用 send(value) 传值,预激活也可以用 send(None)
print(d.send(3))
# 输出:
# 1 # yield 返回的
# 3 # send() 传递的
# 2 # yield 返回的
将列表表达式的方括号换成圆括号,返回的是一个生成器。
Python
a = (i for i in range(10))
b = [i for i in range(10)]
print(type(a)) # generator
print(type(b)) # list
通过生成器实现携协程(消费者-生产者模式):
Python
def consumer():
r = ''
while True:
n = yield r
print(f"[consumer] {n}")
r = 'ok'
def producer(c):
c.send(None)
for n in range(5):
r = c.send(n)
print(f"[producer] {r}")
c = consumer()
producer(c)
多进程
os.getpid()
:返回当前进程 id
multiprocessing.Process(target=None, args=(), kwargs=None)
:创建进程对象
target
:目标函数args
:以列表或元组形式传递参数kwargs
:以字典形式传递参数
进程的函数 | 介绍 | 参数 | 返回值 |
---|---|---|---|
start | 执行进程 | 无 | 无 |
join | 阻塞进程 | 无 | 无 |
kill | 杀死进程 | 无 | 无 |
alive | 进程是否存活 | 无 | bool |
python
def f(n):
while True:
print(n, os.getpid())
if __name__ == '__main__':
process1 = multiprocessing.Process(target=f, args=[1])
process2 = multiprocessing.Process(target=f, kwargs={'n': 2})
for process in (process1, process2):
process.start()
TIP
当多个进程运行时,可能会出现的问题及解决方案:
- 通过进程模块执行的函数无法获取返回值——进程间如何通信:使用队列
- 多个进程同时修改文件可能会出现错误——进程间如何避免资源抢占:创建进程锁
- 进程数量太多可能会造成资源不足,甚至死机等情况——如何避免创建进程数量过多:创建进程池
进程通讯
python
def send(queue: multiprocessing.Queue, data):
queue.put(data) # 将数据放入队列
def recv(queue: multiprocessing.Queue):
data = queue.get() # 从队列取出数据
queue = multiprocessing.Queue() # 创建队列
process1 = multiprocessing.Process(target=send, args=(queue, 1))
process2 = multiprocessing.Process(target=recv, kwargs={'queue': queue})
process1.start()
process2.start()
进程池
multiprocessing.Pool(processes: int | None = None)
:创建进程池对象
processes
:进程池中的进程数量
进程池的函数 | 介绍 | 参数 | 返回值 |
---|---|---|---|
apply_async | 将任务加入进程池(异步) | func , args | 无 |
close | 关闭进程池 | 无 | 无 |
join | 等待进程池任务结束 | 无 | 无 |
python
def task(n):
while True:
print(n, os.getpid())
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
for i in range(4):
pool.apply_async(func=task, args=(i,))
pool.close()
pool.join()
apply_async
能够获取到任务函数的返回值,例如:
python
def task(n):
return n
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
results = []
for i in range(4):
# res: multiprocessing.pool.ApplyResult
res = pool.apply_async(func=task, args=(i,))
results.append(res)
pool.close()
pool.join()
for res in results:
print(res.get())
进程锁
进程锁的函数 | 介绍 | 参数 | 返回值 |
---|---|---|---|
acquire | 上锁 | 无 | 无 |
release | 解锁 | 无 | 无 |
python
def task(n, lock):
lock.acquire() # 开启进程锁
print(n, os.getpid())
time.sleep(3)
lock.release() # 关闭进程锁
if __name__ == '__main__':
manager = multiprocessing.Manager()
lock = manager.Lock()
pool = multiprocessing.Pool(processes=4)
for i in range(4):
pool.apply_async(func=task, args=(i, lock))
pool.close()
pool.join()
多线程
threading.Thread(target=None, args=(), kwargs=None)
:创建线程对象
target
:目标函数args
:以列表或元组形式传递参数kwargs
:以字典形式传递参数
线程的函数 | 介绍 | 参数 | 返回值 |
---|---|---|---|
start | 启动线程 | 无 | 无 |
join | 阻塞直到线程执行结束 | 无 | 无 |
is_alive | 判断线程是否存活 | 无 | bool |
python
def f(n):
while True:
print(n)
thread1 = threading.Thread(target=f, args=[1])
thread2 = threading.Thread(target=f, kwargs={'n': 2})
thread1.start() # 启动线程
thread2.start() # 启动线程
线程通讯
- Event:主要同于通过事件通知机制实现线程的大规模并发
- Condition:主要用于多个线程间轮流交替执行任务
- Queue:主要用于不同线程间任意类型数据的共享
线程通讯-Queue消息队列
Python
def product(q: queue.Queue, data: Any):
q.put(data) # 如果队满,将会阻塞等待
print(f"{threading.current_thread().name} put: {data}")
def consume(q: queue.Queue):
item = q.get() # 如果队列中没有数据,将会阻塞等待
print(f"{threading.current_thread().name} get: {item}")
q = queue.Queue(maxsize=1) # maxsize=0则表示无上限
threading.Thread(target=product, args=(q, "Hello"), name="producer").start()
threading.Thread(target=consume, args=(q,), name="consumer").start()
线程通讯-Event事件对象
event.clear()
:重置 event 对象,使得所有该 event 事件都处于待命状态。event.wait()
:阻塞线程,等待 event 指令。event.set()
:发送 event 指令,使得所有设置该 event 事件的线程执行。
Python
class MyThread(threading.Thread):
def __init__(self, event: threading.Event):
super().__init__()
self.event = event
def run(self):
print(f"{self.name} 初始化完成,随时准备启动...")
self.event.wait() # 阻塞,等待 evnet 指令
print(f"{self.name} 开始执行")
if __name__ == '__main__':
event = threading.Event()
threads = [MyThread(event) for _ in range(8)]
event.clear()
for thread in threads:
thread.start()
# 3s后发送指令,所有线程并发执行
time.sleep(3)
event.set()
线程通讯-Condition条件对象
Python
class MyThread(threading.Thread):
def __init__(self, condition: threading.Condition):
super().__init__()
self.condition = condition
def run(self):
condition.acquire()
while True:
time.sleep(1)
print(f"{self.name}")
self.condition.notify() # 通知另一个线程执行
self.condition.wait()
condition.release()
if __name__ == '__main__':
condition = threading.Condition()
threads = [MyThread(condition) for _ in range(2)]
for thread in threads:
thread.start()
# 控制台2个线程将交替输出
Condition非常适合需要线程轮流执行的情况。
线程通讯-消息隔离
threading.local()
能够实现线程间的消息隔离,避免线程间共享变量带来的安全问题。
Python
local_data = threading.local()
local_data.name = "local_data"
class MyThread(threading.Thread):
def run(self):
print(f"{threading.current_thread().name} {local_data.__dict__}")
local_data.name = threading.current_thread().name
print(f"{threading.current_thread().name} {local_data.__dict__}")
MyThread().start()
MyThread().start()
print(f"{threading.current_thread().name} {local_data.__dict__}")
# 输出:
# Thread-1 {}
# Thread-1 {'name': 'Thread-1'}
# Thread-2 {}
# Thread-2 {'name': 'Thread-2'}
# MainThread {'name': 'local_data'}
可以看出每个对象都有自己的 threading.local 的值。
线程池
concurrent.futures.ThreadPoolExecutor(max_workers=None)
:创建线程池对象
max_workers
:最大线程数
python
def f(n):
return n
pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)
results = []
for i in range(8):
res = pool.submit(f, i) # submit(fn, *args),往线程池中加入任务
results.append(res)
for res in results:
print(res.done()) # 任务是否完成
print(res.result()) # 获取当前线程执行任务的返回值
线程锁
threading.Lock()
:创建线程锁对象
线程锁的函数 | 介绍 |
---|---|
acquire | 上锁 |
release | 解锁 |
python
lock = threading.Lock() # 创建线程锁对象
def f(n):
lock.acquire() # 开启线程锁
print(n)
time.sleep(1)
lock.release() # 关闭线程锁
pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)
for i in range(8):
pool.submit(f, i)
GIL全局锁
解决办法:多进程+多线程
进程和线程的区别
进程是系统分配资源的最小单位,线程是执行的最小单位。由于GIL全局锁的存在,CPU密集型任务使用多进程,IO密集型(文件处理、网络爬虫等)使用多线程。
死锁:多线程间共享多个资源时,如果两个线程分别占用一部分资源并等待对方的资源,就会造成死锁。
异步
- 轻量级的线程——协程
- 可以获取异步函数的返回值
- 要求主进程也是异步,并且执行过程中要求所有程序都是异步
TIP
多线程和多进程无法直接获取任务返回值,线程池和进程池获取任务返回值是用了异步的方法。
关键字 async
定义异步,await
执行异步。
主程序没办法直接执行异步函数,需要借助内置模块 asyncio
。
asyncio的函数 | 介绍 | 参数 | 返回值 |
---|---|---|---|
gather | 将异步函数批量执行 | async_fn, ... | 函数的返回结果列表 |
run | 执行异步函数 | async_fn | 函数返回值 |
python
async def f(n):
print(n)
await asyncio.sleep(3) # 不能使用 time.sleep()
return n
async def main():
results = await asyncio.gather(f(1), f(2)) # f(1), f(2) 同时执行
print(results) # 输出:[1, 2]
return 0
res = asyncio.run(main())
print(res) # 输出:0
协程
Python
async def hello(): pass
coro = hello() # 创建协程对象,但是不会执行函数内部代码
print(isinstance(coro, Coroutine)) # 输出:True
重要概念
event_loop
:事件循环,asyncio
中开启的一个无限的事件循环,asyncio
会自动在满足条件时去调用相应的协程对象,我们只需将协程对象注册到该事件循环上即可。coroutine
:协程对象,通过async
定义的函数在调用后不会立即执行,而是返回一个协程对象,协程对象需要注册到事件循环,由事件循环进行调用。future
对象:代表将来执行或没有执行的任务的结果,它和下面的task
对象没有本质区别task
对象:一个协程对象就是一个可以挂起的函数,而任务则是对协程的进一步封装,其中包含了任务的各种状态。
工作流程
- 定义协程对象
- 定义事件循环对象容器
- 将协程转为
task
任务 - 将
task
扔进事件循环对象中触发
Python
async def hello(text: str):
print("hello,", text)
coro = hello("world!") # 1. 创建协程对象
loop = asyncio.new_event_loop() # 2. 获取事件循环对象容器
task = loop.create_task(coro) # 3. 将协程对象转为 task 对象
# task = asyncio.ensure_future(coro, loop=loop) # 效果同上行代码
loop.run_until_complete(task) # 4. 将 task 任务扔进事件循环对象中触发
通过 task.result()
可以拿到返回结果:
Python
async def func():
return 200
coro = func()
loop = asyncio.new_event_loop()
task = asyncio.ensure_future(coro, loop=loop)
loop.run_until_complete(task)
print(task.result()) # 获取协程对象返回值
回调函数
Python
async def func():
return 200
def callback(future):
print(future.result())
coro = func()
loop = asyncio.new_event_loop()
task = asyncio.ensure_future(coro, loop=loop)
task.add_done_callback(callback) # 通过 `add_done_callback` 添加回调函数
loop.run_until_complete(task)
协程并发
方法一:asyncio.gather
Python
async def do_some_work(x):
print(x)
await asyncio.sleep(x) # 模拟一个耗时操作
return x
coro1 = do_some_work(1)
coro2 = do_some_work(2)
coro3 = do_some_work(3)
loop = asyncio.new_event_loop()
tasks = [
asyncio.ensure_future(coro1, loop=loop),
asyncio.ensure_future(coro2, loop=loop),
asyncio.ensure_future(coro3, loop=loop)
]
# 将 tasks 注册到事件循环中
loop.run_until_complete(asyncio.gather(*tasks))
for task in tasks:
print(task.result())
方法二:asyncio.wait
Python
async def do_some_work(x):
print(x)
await asyncio.sleep(x) # 模拟一个耗时操作
return x
coro1 = do_some_work(1)
coro2 = do_some_work(2)
coro3 = do_some_work(3)
loop = asyncio.new_event_loop()
tasks = [
asyncio.ensure_future(coro1, loop=loop),
asyncio.ensure_future(coro2, loop=loop),
asyncio.ensure_future(coro3, loop=loop)
]
# 将 tasks 注册到事件循环中
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print(task.result())
魔术方法
Python 类中内置的特殊方法,通过编写魔术方法,能够实现特定功能。
python
class Person:
def __init__(self, name: str, age: int):
self.name = name
self.age = age
person_1 = Person("Tom", 13)
person_2 = Person("Jerry", 11)
print(person_1) # 输出:<__main__.Person object at 0x00000199AFD01BE0>。对象所在地址。
print(person_1 < person_2) # 报错
print(person_1 == person_2) # 输出: False。比较的是两个对象的地址。
自定义魔术方法:
python
class Person:
def __init__(self, name: str, age: int):
self.name = name
self.age = age
# 实现对象转字符串的功能
def __str__(self):
return f"name: {self.name}, age: {self.age}"
# 实现Person对象根据age比较大小的功能
def __lt__(self, other):
return self.age < other.age
# 实现Person对象根据age判断是否相等的功能
def __eq__(self, other):
return self.age == other.age
person_1 = Person("Tom", 13)
person_2 = Person("Jerry", 11)
print(person_1) # 输出: name: Tom, age: 13
print(person_1 < person_2) # 输出: False
print(person_1 == person_2) # 输出: False
常见的魔术方法:
魔术方法名 | 用途 | 使用方法 |
---|---|---|
__init__(self, ...) | 构造方法,在创建对象时自动调用 | |
__str__(self) | 对象转字符串 | str(obj) |
__int__(self) | 对象转整数 | int(obj) |
__len__(self) | 返回对象长度 | len(obj) |
__getitem__(self, item) | 返回对象的项 | obj[item] |
__setitem__(self, key, value) | 设置对象的项 | obj[key] = value |
__delitem__(self, key) | 删除对象的项 | del obj[key] |
__add__(self, other) | 对象的加法 | obj + other |
__sub__(self, other) | 对象的减法 | obj - other |
__mul__(self, other) | 对象的乘法 | obj * other |
__mod__(self, other) | 对象的模运算 | obj % other |
__truediv__(self, other) | 对象的除法 | obj / other |
__floordiv__(self, other) | 对象的整数除法 | obj // other |
__lt__(self, other) | 比较对象大小 | obj < other |
__gt__(self, other) | 比较对象大小 | obj > other |
__eq__(self, other) | 比较对象是否相等 | obj == other |
__ne__(self, other) | 比较对象是否相等 | obj != other |
__le__(self, other) | 比较对象大小 | obj <= other |
__ge__(self, other) | 比较对象大小 | obj >= other |
封装
在成员变量名或方法名前加上两个下划线,该成员变量或方法就会变为私有,无法在外部调用它。
python
class Student:
def __init__(self, name: str):
self.__name = name
def __run(self):
print(self.__name, "is running.")
stu = Student("ZHH")
print(stu.__name) # 报错
stu.__run() # 报错
# 可以通过在私有成员名前加 "_对象名" 访问,例如:
print(stu._Student__name) # 正常输出: ZHH
stu._Student__run() # 正常输出: ZHH is running.
继承
多继承时,如果多个父类中有同名成员,会保留先继承的。
python
class Parent1:
id = 1
class Parent2:
id = 2
class Son(Parent1, Parent2): ...
print(Son.id) # 输出: 1
子类可通过 super().父类方法()
或者 父类.父类方法(self)
调用父类已经被复写的方法。
python
class Parent:
def f(self):
print("Parent")
class Son(Parent):
def f(self):
print("Son")
Parent.f(self) # 调用父类方法
super().f() # 调用父类方法