Copy sys.version # check the python version
locals() # show current local symbol table
globals() # global symbol table
#python 3
import urllib.parse
# can't import urllib and then try to access urllib.parse
urllib.parse.quote_plus(s)
#python2
urllib.quote_plus(s)
DictWriter and imap
Copy from csv import DictWriter
from multiprocessing import Pool
import time
def process_line(line):
# biz logic
return {
'name': name,
'age': age,
}
NUM_OF_WORKERS = 8
pool = Pool(NUM_OF_WORKERS)
with open('input.csv', 'r') as input, \
open('output.csv', 'w', newline ='') as output:
headers = [
'name',
'age',
]
writer = DictWriter(output, fieldnames = headers)
writer = writeheader()
s = time.time()
# imap returns one result as the worker finishes one task
# map returns all results when all tasks are done
for result in pool.imap(process_line, input, NUM_OF_WORKERS):
writer.writerow(result)
e = time.time()
rint(f'finished in {e-s} seconds.')
More on map vs imap
pyenv
建议在本地和服务器上都通过 pyenv 来管理版本,而不要去动系统自带的 python(以免引起额外的麻烦) https://blog.laisky.com/p/pyenv/ 另外一点就是,如果你想写一个兼容 2、3 的工具包,你可以考虑使用 future http://python-future.org/compatible_idioms.html… 最后提醒一下,2to3 这个脚本是有可能出错的。
Python Style Guide
PEP 8
Google python guide
注意事项
python 3 built in module
Other random python guide
并行
以前听别人讲过一个比喻,静态语言是吃冒菜,一次性烫好。而动态语言是涮火锅,吃一点涮一点。
那么我觉得,GIL 就是仅有一双筷子的火锅,即使你菜很多,一次也只能涮一个。
但是,对于 I/O bound 的操作,你不必一直夹着菜,而是可以夹一些扔到锅里,这样就可以同时涮很多,提高并行效率。
并行时主要是在考虑两个问题:同步控制和资源用量。
同步控制
thread, multiprocessing, asyncio 几个包里都会发现一系列的工具:
这个就不展开细谈了,属于另一个语言无关的大领域。
资源用量
一般来说,前者直接确定了你内存的消耗量,最好选择一个恰好或略高于消费量的数。后者一般直接决定了你的 CPU 使用率,过高的并发量会增加切换开销,得不偿失。
池pool
我们经常提到线程池、进程池、连接池。说白了就是对于一些可重用的资源,不必每次都创建新的,而是使用完毕后回收留待下一个数据继续使用。比如你可以选择不断地开子线程,也可以选择预先开好一批线程,然后通过 queue 来不断的获取和处理数据。
所以说使用“池”的主要目的就是减少资源的消耗。另一个优点是,使用池可以非常方便的控制并发度(很多新人以为 Queue 是用来控制并发度的,这是错误的,Queue 控制的是缓冲量)。 对于连接池,还有另一层好处,那就是端口资源是有限的,而且回收端口的速度很慢,你不断的创建连接会导致端口迅速耗尽。
对前面介绍的 python 中进程/线程做一个小结,线程池可以用来解决 I/O 的阻塞,而进程可以用来解决 GIL 对 CPU 的限制(因为每一个进程内都有一个 GIL)。所以你可以开 N 个(小于等于核数)进程池,然后在每一个进程中启动一个线程池,所有的线程池都可以订阅同一个 Queue,来实现真正的多核并行。
非常简单的描述一下进程/线程,对于操作系统而言,可以认为进程是资源的最小单位(在 PCB 内保存如图 1 的数据)。而线程是调度的最小单位。同一个进程内的线程共享除栈和寄存器外的所有数据。 所以在开发时候,要小心进程内多线程数据的冲突,也要注意多进程数据间的隔离(需要特别使用进程间通信)
不过在 Py 里,单机上最常用的进程间通信就是 multiprocessing 里的 Queue 和 sharedctypes。
做一个小结,一个简单的做法是,启动程序后,分别创建一个进程池(进程数小于等于可用核数)、线程池和 ioloop,ioloop 负责调度一切的协程,遇到阻塞的调用时,I/O 型的扔进线程池,CPU 型的扔进进程池,这样代码逻辑简单,还能尽可能的利用机器性能。
例子:
Copy """
✗ python process_thread_coroutine.py
[2019-08-11 09:09:37,670Z - INFO - kipp] - main running...
[2019-08-11 09:09:37,671Z - INFO - kipp] - coroutine_main running...
[2019-08-11 09:09:37,671Z - INFO - kipp] - io_blocking_task running...
[2019-08-11 09:09:37,690Z - INFO - kipp] - coroutine_task running...
[2019-08-11 09:09:37,691Z - INFO - kipp] - coroutine_error running...
[2019-08-11 09:09:37,691Z - INFO - kipp] - coroutine_error end, cost 0.00s
[2019-08-11 09:09:37,693Z - INFO - kipp] - cpu_blocking_task running...
[2019-08-11 09:09:38,674Z - INFO - kipp] - io_blocking_task end, cost 1.00s
[2019-08-11 09:09:38,695Z - INFO - kipp] - coroutine_task end, cost 1.00s
[2019-08-11 09:09:39,580Z - INFO - kipp] - cpu_blocking_task end, cost 1.89s
[2019-08-11 09:09:39,582Z - INFO - kipp] - coroutine_main got [None, AttributeError('yo'), None, None]
[2019-08-11 09:09:39,582Z - INFO - kipp] - coroutine_main end, cost 1.91s
[2019-08-11 09:09:39,582Z - INFO - kipp] - main end, cost 1.91s
"""
from time import sleep, time
from asyncio import get_event_loop, sleep as asleep, gather, ensure_future, iscoroutine
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, wait
from functools import wraps
from kipp.utils import get_logger
logger = get_logger()
N_FORK = 4
N_THREADS = 10
thread_executor = ThreadPoolExecutor(max_workers=N_THREADS)
process_executor = ProcessPoolExecutor(max_workers=N_FORK)
ioloop = get_event_loop()
def timer(func):
@wraps(func)
def wrapper(*args, **kw):
logger.info(f"{func.__name__} running...")
start_at = time()
try:
r = func(*args, **kw)
finally:
logger.info(f"{func.__name__} end, cost {time() - start_at:.2f}s")
return wrapper
def async_timer(func):
@wraps(func)
async def wrapper(*args, **kw):
logger.info(f"{func.__name__} running...")
start_at = time()
try:
return await func(*args, **kw)
finally:
logger.info(f"{func.__name__} end, cost {time() - start_at:.2f}s")
return wrapper
@timer
def io_blocking_task():
"""I/O 型阻塞调用"""
sleep(1)
@timer
def cpu_blocking_task():
"""CPU 型阻塞调用"""
for _ in range(1 << 26):
pass
@async_timer
async def coroutine_task():
"""异步协程调用"""
await asleep(1)
@async_timer
async def coroutine_error():
"""会抛出异常的协程调用"""
raise AttributeError("yo")
@async_timer
async def coroutine_main():
ioloop = get_event_loop()
r = await gather(
coroutine_task(),
coroutine_error(),
ioloop.run_in_executor(thread_executor, io_blocking_task),
ioloop.run_in_executor(process_executor, cpu_blocking_task),
return_exceptions=True,
)
logger.info(f"coroutine_main got {r}")
@timer
def main():
get_event_loop().run_until_complete(coroutine_main())
if __name__ == "__main__":
main()
有一些较为普适的经验:
下列方法如非瓶颈不要轻易用:
此外,“静态化”是一种提高程序可读性和可维护性的重要手段,比如在函数定义时指明 type-hints,写清楚参数和返回值的类型。 以及对于 OOP,也可以写出定义明确的的“接口-实现”型代码,比如按照 abc
-> BaseClass
-> Class
-> Instance
的形式进行定义,就会规范很多。
Copy from abc import ABC, abstractmethod, abstractproperty
class ThingsABC(ABC):
@abstractproperty
def etable(self):
pass
class BaseFood(ThingsABC):
etable = True
class BirdABC(ABC):
"""
在抽象类中定义抽象方法和属性,
实例化的时候会自动检查这些抽象方法和方法必须已被实现,否则会抛出异常。
具体实现的方法多种多样,比如直接在类里定义,或者多继承等等
"""
@abstractmethod
def fly(self):
pass
@abstractmethod
def eat(self, food: BaseFood):
pass
class BaseBird(BirdABC):
"""
可以定义一些鸟类都应该有的通用属性和方法
"""
pass
class Robin(BaseBird):
"""
定义一些知更鸟特有的属性和方法
"""
# def fly(self):
# pass
# def eat(self, foold: BaseFood):
# pass
r = Robin() # 会报错,因为没有实现抽象方法
# ---------------------------------------------------------------------------
# TypeError Traceback (most recent call last)
# <ipython-input-1-a4984ec6275b> in <module>
# 45
# 46
# ---> 47 r = Robin() # 会报错,因为没有实现抽象方法
# TypeError: Can't instantiate abstract class Robin with abstract methods eat, fly