Warning: This Python package is still in the development phase.
Placeholder
pip install simple-mongodb
import asyncio
from typing import Any
from bson import ObjectId
from pydantic import BaseModel
from simple_mongodb import BaseCollection, MongoDBClient
class AccountCollection(BaseCollection):
    """Collection wrapper for 'account-collection' in the 'my-db' database."""

    # Name of the database; alternatively set the environment variable
    # MONGODB_DB.
    db = 'my-db'

    # Name of the collection.
    collection = 'account-collection'
class Account(BaseModel):
    """Model of an account document; it carries a single ``name`` field."""

    name: str
async def main() -> None:
    """Insert, find and update one account document, then close the client.

    The client is closed in a ``finally`` clause so the connection is released
    even when an operation raises an exception that is not handled below.
    """
    # Initialize a client object and pass the url, or set the environment
    # variables MONGODB_HOST, MONGODB_PORT, MONGODB_USERNAME, MONGODB_PASSWORD.
    # If neither the url parameter nor the environment variables are set,
    # the default values are used.
    client: MongoDBClient = MongoDBClient(url='mongodb://user:pass@host:27017')

    try:
        # Initialize the account collection
        account_collection: AccountCollection = AccountCollection(client=client)

        account: Account = Account(name='example-name')

        try:
            # Insert the document in the collection
            document: dict[str, Any] = account.model_dump()
            inserted_id: ObjectId = await account_collection.insert_one(
                document=document
            )

            # Find the document (bound to a new name so ``document`` is not
            # re-declared with a second annotation)
            where: dict[str, Any] = {'_id': inserted_id}
            found: dict[str, Any] = await account_collection.find_one(where=where)

            # Update the document
            update: dict[str, Any] = {'$set': {'name': 'other-name'}}

            # Returns the id of the new document if upsert=True
            await account_collection.update_one(
                where=where, update=update, upsert=False
            )
        except account_collection.InsertError:
            # Placeholder: handle a failed insert here
            pass
        except account_collection.FindError:
            # Placeholder: handle a failed find here
            pass
        except account_collection.UpdateError:
            # Placeholder: handle a failed update here
            pass
        except account_collection.ServerTimeoutError:
            # Placeholder: handle a server timeout here
            pass
    finally:
        # Close the db connection, also when an unexpected exception propagates
        client.close()
# Run the example only when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(main())