分类 技术 下的文章
OAuth2.0实现原理
原理图:
流程图:
其详细步骤分解如下:
1、用户点击登录按钮:
用户在一个第三方应用(客户端)上点击“使用微信登录”。
2、重定向到授权服务器:
客户端将用户重定向到微信的授权服务器(Authorization Server),并携带以下关键信息:
response_type=code(表明使用授权码模式)
client_id(第三方应用的标识ID,提前在微信开放平台注册的)
redirect_uri(授权成功后用户被重定向回客户端的地址)
scope(申请的权限范围,如 read_contacts)
state(一个随机生成的字符串,用于防止CSRF攻击)
3、用户认证与授权:
用户在微信的页面上输入用户名和密码进行登录(确保密码不会泄露给第三方应用)。
登录成功后,微信会向用户展示一个授权页面,列出第三方应用申请的权限(如“获取你的好友列表”)。
用户选择是否同意授权。
4、颁发授权码(Authorization Code):
如果用户同意授权,授权服务器会将用户重定向回第一步提供的 redirect_uri,并在URL中附带一个授权码(Code) 和之前传来的 state。
https://client.example.com/callback?code=AUTH_CODE_HERE&state=xyz
5、换取访问令牌(Access Token):
关键步骤:第三方应用的后端服务器(而不是浏览器前端)收到授权码后,会向授权服务器的令牌端点(Token Endpoint)发起一个后台的、服务器到服务器的请求。这个请求包含:
grant_type=authorization_code(声明授权类型)
code(上一步收到的授权码)
redirect_uri(必须与第一步的完全一致)
client_id 和 client_secret(应用密钥,用于证明自己的身份,必须保密!)
这个请求是后端发起的,避免了 client_secret 暴露给前端。
6、颁发访问令牌:
授权服务器验证所有信息:client_id/client_secret 是否匹配、授权码是否有效、重定向URI是否正确。
验证通过后,授权服务器会返回一个 JSON 数据,里面包含:
access_token: 期盼已久的访问令牌。
refresh_token: (可选)刷新令牌。
expires_in: 访问令牌的过期时间(例如 7200 秒)。
token_type: 令牌类型,通常是 Bearer。
7、访问受保护资源:
现在,第三方应用的后端或前端(取决于场景)就可以使用这个 access_token 去向微信的资源服务器请求数据了。
通常在 API 请求的 Authorization 头中加入:Authorization: Bearer <access_token>。
资源服务器会验证这个令牌的有效性(签名、有效期、范围),然后返回请求的数据(如好友列表)。
8、刷新访问令牌(可选):
当 access_token 过期后,客户端可以使用 refresh_token 再次向授权服务器请求一个新的 access_token,而无需用户再次手动授权。
python asyncio流
客户端:
#! python3
import asyncio
import time
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8885)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
async def tcp_echo_client1(message):
try:
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8885)
async def read():
while True:
data = await reader.readline()
if not data:
break
# message = data.decode().strip()
print(f'Received: {data.decode()!r}')
async def write():
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
async def ping():
await asyncio.sleep(5)
while True:
print("ping")
writer.write("ping\n".encode())
await writer.drain()
await asyncio.sleep(10)
await asyncio.gather(read(), write(), ping())
except ConnectionResetError as e:
print("断开连接")
print(e)
except Exception as e:
print("异常断开连接")
print(e)
# data = await reader.read(100)
# print(f'Received: {data.decode()!r}')
# print('Close the connection')
# writer.close()
# await writer.wait_closed()
if __name__ == '__main__':
asyncio.run(tcp_echo_client1("Hello World!\n"))
服务端:
#! python3
import asyncio
import time
async def handle_echo(reader, writer):
try:
addr = writer.get_extra_info('peername')
while True:
# 5秒接收不到数据就超时
data = await asyncio.wait_for(reader.readline(), timeout=30.0)
if not data:
print(f"断开连接")
break
message = data.decode()
print(f"Received {message!r} from {addr!r}")
# print(f"Send: {message!r}")
if message.strip() == "ping":
writer.write("pong\n".encode())
else:
fromServer = f"from server: {message.strip()}\n"
writer.write(fromServer.encode())
await writer.drain()
except TimeoutError:
print("超时断连")
writer.write("超时断开连接\n".encode())
await writer.drain()
finally:
print("Close the connection")
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_echo, '0.0.0.0', 8885)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main())
python asyncio协程并发
#! python3
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
# return f"task done: {what}"
async def main():
print(f"started at {time.strftime('%X')}")
# await say_after(1, 'hello')
# await say_after(2, 'world')
# print(f"finished at {time.strftime('%X')}")
# 总执行时间≈2秒(由最慢的任务决定)
# task1 = asyncio.create_task(say_after(1, 'hello'))
# task2 = asyncio.create_task(say_after(2, 'world'))
# await task1
# await task2
# print(f"finished at {time.strftime('%X')}")
# task管理器 当存在上下文管理器时 await 是隐式执行的。
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(say_after(1, 'hello'))
task2 = tg.create_task(say_after(2, 'world'))
print(f"finished at {time.strftime('%X')}")
async def taskCallback():
task = asyncio.create_task(say_after(1, 'hello'))
task.add_done_callback(lambda t: print("Task is done! Result: {!r}".format(t.result())))
await task
class TaskCancelException(Exception):
"""Exception raised to terminate a task group."""
async def force_cancel_task():
raise TaskCancelException()
async def taskGroupCancel():
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(say_after(1, 'hello'))
task2 = tg.create_task(say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
await asyncio.sleep(1)
tg.create_task(force_cancel_task()) # 异步任务组取消
except* TaskCancelException:
print("Task is canceled!")
async def taskCancel():
print(f"started main at {time.strftime('%X')}")
task = asyncio.create_task(say_after(5, 'hello'))
print(f"started at {time.strftime('%X')}")
await asyncio.sleep(1)
task.cancel()
# task.cancel() 并不会立即终止任务,而是会在下一个 await 或挂起点 抛出 asyncio.CancelledError 异常
try:
await task
except asyncio.CancelledError:
print("main task is canceled!")
print(f"finished at {time.strftime('%X')}")
async def taskTimeout():
try:
print(f"started at {time.strftime('%X')}")
async with asyncio.timeout(5):
await asyncio.sleep(10)
except TimeoutError:
print("Task is timeout!")
print(f"finished at {time.strftime('%X')}")
async def taskTimeoutAt():
loop = asyncio.get_running_loop()
deadline = loop.time() + 5
print(f"started at {time.strftime('%X')}")
print(deadline)
try:
async with asyncio.timeout_at(deadline):
await asyncio.sleep(10)
except TimeoutError:
print("The long operation timed out, but we've handled it.")
print(f"finished at {time.strftime('%X')}")
async def eternity():
# 休眠一小时
await asyncio.sleep(10)
print('yay!')
async def taskWaitFor():
# 等待至多 5 秒
try:
print(f"started at {time.strftime('%X')}")
await asyncio.wait_for(eternity(), timeout=5.0)
except TimeoutError:
print('timeout!')
print(f"finished at {time.strftime('%X')}")
def blocking_io():
print(f"start blocking_io at {time.strftime('%X')}")
# 请注意 time.sleep() 可被替换为任意一种
# 阻塞式 IO 密集型操作,例如文件操作。
time.sleep(5)
print(f"blocking_io complete at {time.strftime('%X')}")
async def taskExcute1():
# print(f"started main at {time.strftime('%X')}")
await asyncio.sleep(2)
print(f"finished task1 at {time.strftime('%X')}")
async def taskExcuteInThread():
print(f"started main at {time.strftime('%X')}")
await asyncio.gather(
asyncio.to_thread(blocking_io),
taskExcute1())
print(f"finished main at {time.strftime('%X')}")
async def task1(i):
await asyncio.sleep(i)
print(f"task {i} is done")
return f'the result {i}'
async def taskWaitAllComplete():
print(f"started main at {time.strftime('%X')}")
# 多个并发任务,完成时间取决于最慢的任务
tasks = [asyncio.create_task(task1(i)) for i in [1, 2, 3]]
# done, pending = await asyncio.wait(tasks, timeout=2.0, return_when=asyncio.ALL_COMPLETED) # 设置超时时间
done, pending = await asyncio.wait(tasks, timeout=None, return_when=asyncio.ALL_COMPLETED) # 不设置超时时间
for d in done:
print(d.result())
for p in pending:
print("超时未完成task")
print(p)
print(f"finished main at {time.strftime('%X')}")
async def taskWaitOneComplete():
print(f"started main at {time.strftime('%X')}")
# 多个并发任务,完成时间取决于最慢的任务
tasks = [asyncio.create_task(task1(i)) for i in [1, 2, 3]]
# done, pending = await asyncio.wait(tasks, timeout=2.0, return_when=asyncio.ALL_COMPLETED) # 设置超时时间
done, pending = await asyncio.wait(tasks, timeout=None, return_when=asyncio.FIRST_COMPLETED) # 不设置超时时间
for d in done:
print(d.result())
for p in pending:
print("超时未完成task")
print(p)
p.cancel()
print(f"finished main at {time.strftime('%X')}")
async def taskLock(name):
# 创建一个锁
async with lock:
print(f"started task_{name} at {time.strftime('%X')}")
print(f"lock acquired at {time.strftime('%X')}")
await asyncio.sleep(1)
print(f"lock released at {time.strftime('%X')}")
async def runTaskLock():
async with asyncio.TaskGroup() as tg:
tg.create_task(taskLock("A"))
tg.create_task(taskLock("B"))
tg.create_task(taskLock("C"))
print("All tasks are done")
async def taskEvent(event):
print(f"waiting event at {time.strftime('%X')}")
await event.wait()
print(f"done event at {time.strftime('%X')}")
async def taskEventSet(event):
await asyncio.sleep(5)
event.set()
async def runTaskEvent():
event = asyncio.Event()
async with asyncio.TaskGroup() as tg:
tg.create_task(taskEvent(event))
tg.create_task(taskEvent(event))
tg.create_task(taskEvent(event))
tg.create_task(taskEventSet(event))
async def taskConditionProducer(condition):
async with condition:
await asyncio.sleep(2)
print(f"condition nofify at {time.strftime('%X')}")
# condition.notify()
condition.notify_all()
# for i in range(10):
# await asyncio.sleep(1)
# condition.notify()
async def taskConditionConsumer(condition, name):
async with condition:
while True:
print(f"{name} waiting condition at {time.strftime('%X')}")
await condition.wait()
print(f"{name} done condition at {time.strftime('%X')}")
break
# print(f"waiting condition at {time.strftime('%X')}")
# await condition.wait()
# print(f"done condition at {time.strftime('%X')}")
async def runTaskCondition():
condition = asyncio.Condition()
async with asyncio.TaskGroup() as tg:
tg.create_task(taskConditionConsumer(condition, "A"))
tg.create_task(taskConditionConsumer(condition, "B"))
tg.create_task(taskConditionConsumer(condition, "C"))
tg.create_task(taskConditionConsumer(condition, "D"))
tg.create_task(taskConditionConsumer(condition, "E"))
tg.create_task(taskConditionConsumer(condition, "F"))
tg.create_task(taskConditionConsumer(condition, "G"))
tg.create_task(taskConditionProducer(condition))
async def taskSemaphore(semaphore, i):
async with semaphore:
print(f"started task_{i} at {time.strftime('%X')}")
print(f"semaphore acquired at {time.strftime('%X')}")
await asyncio.sleep(1)
print(f"semaphore released at {time.strftime('%X')}")
async def runTaskSemaphore():
# 信号量, 限制最多3个并发任务
semaphore = asyncio.Semaphore(3)
async with asyncio.TaskGroup() as tg:
for i in range(10):
tg.create_task(taskSemaphore(semaphore, i))
async def taskBarrier(barrier, i):
print(f"started task_{i} at {time.strftime('%X')}")
await asyncio.sleep(1 + i * 0.5)
await barrier.wait()
print(f"done task_{i} at {time.strftime('%X')}")
async def runTaskBarrier():
# 任务屏障, 限制最多3个并发任务
barrier = asyncio.Barrier(3)
async with asyncio.TaskGroup() as tg:
for i in range(3):
tg.create_task(taskBarrier(barrier, i))
async def runTaskBarrier1():
# 任务屏障, 限制最多3个并发任务
barrier = asyncio.Barrier(3)
asyncio.create_task(taskBarrier(barrier, 0))
asyncio.create_task(taskBarrier(barrier, 1))
await asyncio.sleep(2)
await barrier.wait()
print("All tasks are done")
async def taskQueueProducer(queue):
for i in range(20):
await queue.put(i)
print(f"put {i} to queue")
await asyncio.sleep(1)
await queue.join()
print("All items have been processed")
raise TaskDoneException()
async def taskQueueConsumer(queue):
while True:
item = await queue.get()
await asyncio.sleep(1)
queue.task_done()
print(f"get {item} from queue")
class TaskDoneException(Exception):
"""Exception raised to terminate a task group."""
async def runTaskQueue():
queue = asyncio.Queue()
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(taskQueueProducer(queue))
tg.create_task(taskQueueConsumer(queue))
tg.create_task(taskQueueConsumer(queue))
except* TaskDoneException:
print("Tasks is done!")
if __name__ == '__main__':
# asyncio.run(main())
# 任务回调
# asyncio.run(taskCallback())
# 任务组取消 (模拟抛异常终结)
# asyncio.run(taskGroupCancel())
# 任务取消 (模拟正常终结)
# asyncio.run(taskCancel())
# 任务超时异常
# asyncio.run(taskTimeout())
# 任务超时异常
# asyncio.run(taskTimeoutAt())
# 任务超时异常
# asyncio.run(taskWaitFor())
# 针对于阻塞IO密集型操作,在新线程调用运行
# asyncio.run(taskExcuteInThread())
# 任务简单等待 (不会抛超时异常, 分别返回完成、未完成任务future)
# asyncio.run(taskWaitAllComplete())
# 任务简单等待 (会抛超时异常, 分别返回完成、未完成任务future)
# asyncio.run(taskWaitOneComplete())
# lock = asyncio.Lock()
# # 任务锁
# asyncio.run(runTaskLock())
# 任务事件event 协程之间的信号传递, 事件可被用来通知多个 asyncio 任务已经有事件发生。
# asyncio.run(runTaskEvent())
# 任务条件condition
# asyncio.run(runTaskCondition())
# 信号量, 限制最多3个并发任务
# asyncio.run(runTaskSemaphore())
# 任务屏障, 限制最多3个并发任务
# asyncio.run(runTaskBarrier())
# asyncio.run(runTaskBarrier1())
# 任务队列 (生产者-消费者模式) 任务完成后自动退出
asyncio.run(runTaskQueue())
workerman分布式部署
1、NGINX + websocket负载均衡
http {
upstream websocket_cluster {
# Workerman 节点地址,可多台机器或多个端口
server 192.168.1.10:7272;
server 192.168.1.11:7272;
server 192.168.1.12:7272;
}
server {
listen 80;
server_name yourdomain.com;
location /ws/ {
proxy_pass http://websocket_cluster;
# WebSocket 必要头部
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 防止超时
proxy_read_timeout 60s;
proxy_send_timeout 60s;
}
}
}

