python 多线程 mongodb数据库:motor代码示例
# -*- coding: utf-8 -*-
from motor import motor_asyncio
import asyncio
import platform
if "Windows" in platform.platform():
policy = asyncio.WindowsSelectorEventLoopPolicy()
asyncio.set_event_loop_policy(policy)
async def init():
client = motor_asyncio.AsyncIOMotorClient()
# client = motor_asyncio.AsyncIOMotorClient('mongodb://127.0.0.1:27017')
# 创建或连接数据库
db = client['kanchai']
# 创建或连接集合(表)
colllection = db['article']
return colllection
async def find_one(collection, title):
result = await collection.find_one({"title": title}, {"_id": 0})
print(result)
async def find_all(collection, skip, limit):
# cousor = collection.find({}, skip=skip, limit=limit)
# for item in await cousor.to_list(length=limit):
# print(item)
async for item in collection.find({}, {"_id": 0}, skip=skip, limit=limit):
print(item)
async def count_all(collection):
counts = await collection.count_documents({})
print(counts)
async def update_document(collection, title):
ret = await collection.update_one({"title": title}, {"$set": {"isindex": 1}})
print(ret.modified_count)
async def delete_document(collection, title):
# ret = await collection.delete_one({"title": title})
ret = await collection.delete_many({"title": title})
print(f"成功删除:{ret.deleted_count}个文档")
print(ret.acknowledged)
print(ret.raw_result)
async def main():
collection = await init()
title = "全链路数字化转型下,零售企业如何做到消费者“心智占领”"
await find_one(collection, title)
# await find_all(collection, 9, 10)
# await count_all(collection)
# await update_document(collection, title)
await delete_document(collection, title)
asyncio.run(main())