Skip to content

Python 学习笔记

匿名函数

临时使用一次的函数可以使用 lambda 匿名函数,不必用 def,例如在使用映射函数时就可以应用 lambda

python
array = [1, 2, 3, 4, 5]

# map(function, iterables)
array_1 = map(lambda x: x ** 2, array)
print(list(array_1))  # 输出:[1, 4, 9, 16, 25]

# filter(function, iterable)
array_2 = filter(lambda x: x % 2 != 0, array)
print(list(array_2))  # 输出:[1, 3, 5]

# reduce(function, sequence)
from functools import reduce  # Python3 不再内置该函数,需要从 functools 库导入
res = reduce(lambda x, y: x + y, array)
print(res)  # 输出:15

迭代器

可迭代对象

在Python的任意对象中,只要它定义了可以返回一个迭代器的 __iter__ 方法,或者定义了 __getitem__ 方法,那么它就是一个可迭代对象,通俗地讲就是可以通过 for 循环 进行遍历了。

迭代器对象

迭代器就是实现了 __next____iter__ 方法(缺一不可)的对象,就叫迭代器。其中,__iter__ 方法返回迭代器本身,__next__ 方法不断返回迭代器中的下一个值,直到容器中没有更多元素时抛出 StopIteration 异常,以终止迭代。

迭代器的优点:懒加载,工厂模式,节约内存空间。比如提供一亿个元素,列表占用内存过大无法完成,但是迭代器可以,迭代器不会一次性把数据全生产出来,只会按需生产,迭代器没有长度属性。最常用的情况是让迭代器无限提供数据,例如:itertools.count(start, step)

Python
from itertools import count

counter = count(10)  # 第一个参数为起始值,默认0;第二个参数为步长,默认1
for _ in range(100):
    print(next(counter)) # 输出10到109等100个数

print(len(counter))  # 报错:迭代器没有len属性

生成器

生成器其实是一种特殊的迭代器,不过这种迭代器更加优雅。如果一个函数包含 yield 关键字,这个函数就会变成一个生成器。

生成器中 yield 关键字的作用:

  1. 程序每次在代码中遇到 yield 关键字后,会返回结果,并保留当前函数状态,等待下次调用
  2. 下次调用从上次返回 yield 的语句出开始继续执行
Python
def demo():
    print("Hello")
    yield 1
    print("World")
    yield 2

d = demo()
print(next(d))
print("-----")
print(next(d))

# 输出:
# Hello
# 1
# -----
# World
# 2

生成器的 send(value) 方法可以调用生成器,同时还能给生成器传递数据。

Python
def demo():
    t = yield 1  # t 接收从 send 方法传入的值
    print(t)
    yield 2

d = demo()
print(next(d))  # 需要预计激活后才能用 send(value) 传值,预激活也可以用 send(None)
print(d.send(3))  

# 输出:
# 1  # yield 返回的
# 3  # send() 传递的
# 2  # yield 返回的

将列表表达式的方括号换成圆括号,返回的是一个生成器。

Python
a = (i for i in range(10))
b = [i for i in range(10)]
print(type(a))  # generator
print(type(b))  # list

通过生成器实现携协程(消费者-生产者模式):

Python
def consumer():
    r = ''
    while True:
        n = yield r
        print(f"[consumer] {n}")
        r = 'ok'

def producer(c):
    c.send(None)
    for n in range(5):
        r = c.send(n)
        print(f"[producer] {r}")

c = consumer()
producer(c)

多进程

os.getpid():返回当前进程 id

multiprocessing.Process(target=None, args=(), kwargs=None):创建进程对象

  • target:目标函数
  • args:以列表或元组形式传递参数
  • kwargs:以字典形式传递参数
进程的函数介绍参数返回值
start执行进程
join阻塞进程
kill杀死进程
alive进程是否存活bool
python
def f(n):
    while True:
        print(n, os.getpid())

if __name__ == '__main__':
    process1 = multiprocessing.Process(target=f, args=[1])
    process2 = multiprocessing.Process(target=f, kwargs={'n': 2})
    for process in (process1, process2):
        process.start()

TIP

当多个进程运行时,可能会出现的问题及解决方案:

  • 通过进程模块执行的函数无法获取返回值——进程间如何通信:使用队列
  • 多个进程同时修改文件可能会出现错误——进程间如何避免资源抢占:创建进程锁
  • 进程数量太多可能会造成资源不足,甚至死机等情况——如何避免创建进程数量过多:创建进程池

进程通讯

python
def send(queue: multiprocessing.Queue, data):
    queue.put(data)     # 将数据放入队列

def recv(queue: multiprocessing.Queue):
    data = queue.get()  # 从队列取出数据

queue = multiprocessing.Queue()  # 创建队列
process1 = multiprocessing.Process(target=send, args=(queue, 1))
process2 = multiprocessing.Process(target=recv, kwargs={'queue': queue})
process1.start()
process2.start()

进程池

multiprocessing.Pool(processes: int | None = None):创建进程池对象

  • processes:进程池中的进程数量
进程池的函数介绍参数返回值
apply_async将任务加入进程池(异步)func, args
close关闭进程池
join等待进程池任务结束
python
def task(n):
    while True:
        print(n, os.getpid())

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    for i in range(4):
        pool.apply_async(func=task, args=(i,))
    pool.close()
    pool.join()

apply_async 能够获取到任务函数的返回值,例如:

python
def task(n):
    return n

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    results = []
    for i in range(4):
        # res: multiprocessing.pool.ApplyResult
        res = pool.apply_async(func=task, args=(i,))
        results.append(res)
    pool.close()
    pool.join()
    for res in results:
        print(res.get())

进程锁

进程锁的函数介绍参数返回值
acquire上锁
release解锁
python
def task(n, lock):
    lock.acquire()  # 开启进程锁
    print(n, os.getpid())
    time.sleep(3)
    lock.release()  # 关闭进程锁

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    lock = manager.Lock()

    pool = multiprocessing.Pool(processes=4)
    for i in range(4):
        pool.apply_async(func=task, args=(i, lock))

    pool.close()
    pool.join()

多线程

threading.Thread(target=None, args=(), kwargs=None):创建线程对象

  • target:目标函数
  • args:以列表或元组形式传递参数
  • kwargs:以字典形式传递参数
线程的函数介绍参数返回值
start启动线程
join阻塞直到线程执行结束
is_alive判断线程是否存活bool
python
def f(n):
    while True:
        print(n)

thread1 = threading.Thread(target=f, args=[1])
thread2 = threading.Thread(target=f, kwargs={'n': 2})
thread1.start()  # 启动线程
thread2.start()  # 启动线程

线程通讯

  • Event:主要同于通过事件通知机制实现线程的大规模并发
  • Condition:主要用于多个线程间轮流交替执行任务
  • Queue:主要用于不同线程间任意类型数据的共享

线程通讯-Queue消息队列

Python
def product(q: queue.Queue, data: Any):
    q.put(data)  # 如果队满,将会阻塞等待
    print(f"{threading.current_thread().name} put: {data}")

def consume(q: queue.Queue):
    item = q.get()  # 如果队列中没有数据,将会阻塞等待
    print(f"{threading.current_thread().name} get: {item}")

q = queue.Queue(maxsize=1)  # maxsize=0则表示无上限
threading.Thread(target=product, args=(q, "Hello"), name="producer").start()
threading.Thread(target=consume, args=(q,), name="consumer").start()

线程通讯-Event事件对象

  • event.clear():重置 event 对象,使得所有该 event 事件都处于待命状态。
  • event.wait():阻塞线程,等待 event 指令。
  • event.set():发送 event 指令,使得所有设置该 event 事件的线程执行。
Python
class MyThread(threading.Thread):
    def __init__(self, event: threading.Event):
        super().__init__()
        self.event = event

    def run(self):
        print(f"{self.name} 初始化完成,随时准备启动...")
        self.event.wait()  # 阻塞,等待 evnet 指令
        print(f"{self.name} 开始执行")

if __name__ == '__main__':
    event = threading.Event()
    threads = [MyThread(event) for _ in range(8)]
    event.clear()
    for thread in threads:
        thread.start()
    # 3s后发送指令,所有线程并发执行
    time.sleep(3)
    event.set()

线程通讯-Condition条件对象

Python
class MyThread(threading.Thread):
    def __init__(self, condition: threading.Condition):
        super().__init__()
        self.condition = condition

    def run(self):
        condition.acquire()
        while True:
            time.sleep(1)
            print(f"{self.name}")
            self.condition.notify()  # 通知另一个线程执行
            self.condition.wait()
        condition.release()

if __name__ == '__main__':
    condition = threading.Condition()
    threads = [MyThread(condition) for _ in range(2)]
    for thread in threads:
        thread.start()

# 控制台2个线程将交替输出

Condition非常适合需要线程轮流执行的情况。

线程通讯-消息隔离

threading.local() 能够实现线程间的消息隔离,避免线程间共享变量带来的安全问题。

Python
local_data = threading.local()
local_data.name = "local_data"

class MyThread(threading.Thread):
    def run(self):
        print(f"{threading.current_thread().name} {local_data.__dict__}")
        local_data.name = threading.current_thread().name
        print(f"{threading.current_thread().name} {local_data.__dict__}")

MyThread().start()
MyThread().start()
print(f"{threading.current_thread().name} {local_data.__dict__}")

# 输出:
# Thread-1 {}
# Thread-1 {'name': 'Thread-1'}
# Thread-2 {}
# Thread-2 {'name': 'Thread-2'}
# MainThread {'name': 'local_data'}

可以看出每个对象都有自己的 threading.local 的值。

线程池

concurrent.futures.ThreadPoolExecutor(max_workers=None):创建线程池对象

  • max_workers:最大线程数
python
def f(n):
    return n

pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)
results = []
for i in range(8):
    res = pool.submit(f, i)  # submit(fn, *args),往线程池中加入任务
    results.append(res)
for res in results:
    print(res.done())    # 任务是否完成
    print(res.result())  # 获取当前线程执行任务的返回值

线程锁

threading.Lock():创建线程锁对象

线程锁的函数介绍
acquire上锁
release解锁
python
lock = threading.Lock()  # 创建线程锁对象

def f(n):
    lock.acquire()       # 开启线程锁
    print(n)
    time.sleep(1)
    lock.release()       # 关闭线程锁

pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)
for i in range(8):
    pool.submit(f, i)

GIL全局锁

解决办法:多进程+多线程

进程和线程的区别

进程是系统分配资源的最小单位,线程是执行的最小单位。由于GIL全局锁的存在,CPU密集型任务使用多进程,IO密集型(文件处理、网络爬虫等)使用多线程。

死锁:多线程间共享多个资源时,如果两个线程分别占用一部分资源并等待对方的资源,就会造成死锁。

异步

  • 轻量级的线程——协程
  • 可以获取异步函数的返回值
  • 要求主进程也是异步,并且执行过程中要求所有程序都是异步

TIP

多线程和多进程无法直接获取任务返回值,线程池和进程池获取任务返回值是用了异步的方法。

关键字 async 定义异步,await 执行异步。
主程序没办法直接执行异步函数,需要借助内置模块 asyncio

asyncio的函数介绍参数返回值
gather将异步函数批量执行async_fn, ...函数的返回结果列表
run执行异步函数async_fn函数返回值
python
async def f(n):
    print(n)
    await asyncio.sleep(3)  # 不能使用 time.sleep()
    return n

async def main():
    results = await asyncio.gather(f(1), f(2))  # f(1), f(2) 同时执行
    print(results)  # 输出:[1, 2]
    return 0

res = asyncio.run(main())
print(res)  # 输出:0

协程

Python
async def hello(): pass
coro = hello()  # 创建协程对象,但是不会执行函数内部代码
print(isinstance(coro, Coroutine))  # 输出:True

重要概念

  • event_loop:事件循环,asyncio 中开启的一个无限的事件循环,asyncio 会自动在满足条件时去调用相应的协程对象,我们只需将协程对象注册到该事件循环上即可。
  • coroutine:协程对象,通过 async 定义的函数在调用后不会立即执行,而是返回一个协程对象,协程对象需要注册到事件循环,由事件循环进行调用。
  • future 对象:代表将来执行或没有执行的任务的结果,它和下面的 task 对象没有本质区别
  • task 对象:一个协程对象就是一个可以挂起的函数,而任务则是对协程的进一步封装,其中包含了任务的各种状态。

工作流程

  1. 定义协程对象
  2. 定义事件循环对象容器
  3. 将协程转为 task 任务
  4. task 扔进事件循环对象中触发
Python
async def hello(text: str):
    print("hello,", text)

coro = hello("world!")           # 1. 创建协程对象
loop = asyncio.new_event_loop()  # 2. 获取事件循环对象容器
task = loop.create_task(coro)    # 3. 将协程对象转为 task 对象
# task = asyncio.ensure_future(coro, loop=loop)  # 效果同上行代码
loop.run_until_complete(task)    # 4. 将 task 任务扔进事件循环对象中触发

通过 task.result() 可以拿到返回结果:

Python
async def func():
    return 200

coro = func()
loop = asyncio.new_event_loop()
task = asyncio.ensure_future(coro, loop=loop)
loop.run_until_complete(task)
print(task.result())  # 获取协程对象返回值

回调函数

Python
async def func():
    return 200

def callback(future):
    print(future.result())

coro = func()
loop = asyncio.new_event_loop()
task = asyncio.ensure_future(coro, loop=loop)
task.add_done_callback(callback)  # 通过 `add_done_callback` 添加回调函数
loop.run_until_complete(task)

协程并发

方法一:asyncio.gather

Python
async def do_some_work(x):
    print(x)
    await asyncio.sleep(x)  # 模拟一个耗时操作
    return x

coro1 = do_some_work(1)
coro2 = do_some_work(2)
coro3 = do_some_work(3)

loop = asyncio.new_event_loop()
tasks = [
    asyncio.ensure_future(coro1, loop=loop),
    asyncio.ensure_future(coro2, loop=loop),
    asyncio.ensure_future(coro3, loop=loop)
]

# 将 tasks 注册到事件循环中
loop.run_until_complete(asyncio.gather(*tasks))
for task in tasks:
    print(task.result())

方法二:asyncio.wait

Python
async def do_some_work(x):
    print(x)
    await asyncio.sleep(x)  # 模拟一个耗时操作
    return x

coro1 = do_some_work(1)
coro2 = do_some_work(2)
coro3 = do_some_work(3)

loop = asyncio.new_event_loop()
tasks = [
    asyncio.ensure_future(coro1, loop=loop),
    asyncio.ensure_future(coro2, loop=loop),
    asyncio.ensure_future(coro3, loop=loop)
]

# 将 tasks 注册到事件循环中
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
    print(task.result())

魔术方法

Python 类中内置的特殊方法,通过编写魔术方法,能够实现特定功能。

python
class Person:
    def __init__(self, name: str, age: int):
        self.name = name
        self.age = age

person_1 = Person("Tom", 13)
person_2 = Person("Jerry", 11)
print(person_1)              # 输出:<__main__.Person object at 0x00000199AFD01BE0>。对象所在地址。
print(person_1 < person_2)   # 报错
print(person_1 == person_2)  # 输出: False。比较的是两个对象的地址。

自定义魔术方法:

python
class Person:
    def __init__(self, name: str, age: int):
        self.name = name
        self.age = age

    # 实现对象转字符串的功能
    def __str__(self):
        return f"name: {self.name}, age: {self.age}"

    # 实现Person对象根据age比较大小的功能
    def __lt__(self, other):
        return self.age < other.age

    # 实现Person对象根据age判断是否相等的功能
    def __eq__(self, other):
        return self.age == other.age

person_1 = Person("Tom", 13)
person_2 = Person("Jerry", 11)
print(person_1)              # 输出: name: Tom, age: 13
print(person_1 < person_2)   # 输出: False
print(person_1 == person_2)  # 输出: False

常见的魔术方法:

魔术方法名用途使用方法
__init__(self, ...)构造方法,在创建对象时自动调用
__str__(self)对象转字符串str(obj)
__int__(self)对象转整数int(obj)
__len__(self)返回对象长度len(obj)
__getitem__(self, item)返回对象的项obj[item]
__setitem__(self, key, value)设置对象的项obj[key] = value
__delitem__(self, key)删除对象的项del obj[key]
__add__(self, other)对象的加法obj + other
__sub__(self, other)对象的减法obj - other
__mul__(self, other)对象的乘法obj * other
__mod__(self, other)对象的模运算obj % other
__truediv__(self, other)对象的除法obj / other
__floordiv__(self, other)对象的整数除法obj // other
__lt__(self, other)比较对象大小obj < other
__gt__(self, other)比较对象大小obj > other
__eq__(self, other)比较对象是否相等obj == other
__ne__(self, other)比较对象是否相等obj != other
__le__(self, other)比较对象大小obj <= other
__ge__(self, other)比较对象大小obj >= other

封装

在成员变量名或方法名前加上两个下划线,该成员变量或方法就会变为私有,无法在外部调用它。

python
class Student:
    def __init__(self, name: str):
        self.__name = name

    def __run(self):
        print(self.__name, "is running.")

stu = Student("ZHH")
print(stu.__name)          # 报错
stu.__run()                # 报错

# 可以通过在私有成员名前加 "_对象名" 访问,例如:
print(stu._Student__name)  # 正常输出: ZHH
stu._Student__run()        # 正常输出: ZHH is running.

继承

多继承时,如果多个父类中有同名成员,会保留先继承的。

python
class Parent1:
    id = 1

class Parent2:
    id = 2

class Son(Parent1, Parent2): ...

print(Son.id)  # 输出: 1

子类可通过 super().父类方法() 或者 父类.父类方法(self) 调用父类已经被复写的方法。

python
class Parent:
    def f(self):
        print("Parent")

class Son(Parent):
    def f(self):
        print("Son")
        Parent.f(self)  # 调用父类方法
        super().f()     # 调用父类方法