258 lines
8.1 KiB
Python
258 lines
8.1 KiB
Python
import os
|
|
import logging
|
|
import time
|
|
import datetime
|
|
import traceback
|
|
import aiosqlite as sqlite3
|
|
from typing import LiteralString, Optional, Union
|
|
from fastapi import FastAPI, Request, HTTPException
|
|
from fastapi.responses import JSONResponse
|
|
from .constructors import (
|
|
ValidTopKarmaRequest,
|
|
ValidKarmaRetrievalRequest,
|
|
ValidKarmaUpdateRequest,
|
|
)
|
|
|
|
|
|
class KarmaDB:
    """Karma DB Util

    Async helper around the SQLite database that stores the ``karma`` and
    ``karma_audit`` tables. Every method opens its own short-lived
    connection to ``self.db_path``.
    """

    def __init__(self) -> None:
        # Absolute path of the SQLite file used by every query in this class.
        self.db_path: str = os.path.join(
            "/", "usr", "local", "share", "sqlite_dbs", "karma.db"
        )

    async def get_karma(self, keyword: str) -> Union[int, dict]:
        """Get Karma Value for Keyword

        Args:
            keyword (str): The keyword to search. It is bound as the operand
                of a SQL ``LIKE``, so it is matched as a LIKE pattern rather
                than by strict equality.
        Returns:
            Union[int, dict]: The stored score of the first matching row, or
                an ``{"err": True, "errorText": ...}`` dict when no row
                matches.
        """
        async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
            async with await db_conn.execute(
                "SELECT score FROM karma WHERE keyword LIKE ? LIMIT 1", (keyword,)
            ) as db_cursor:
                row = await db_cursor.fetchone()

        # fetchone() yields None when nothing matched; report that explicitly
        # instead of relying on a TypeError from unpacking None.
        if row is None:
            return {
                "err": True,
                "errorText": f"No records for {keyword}",
            }
        return row[0]

    async def get_top(self, n: Optional[int] = 10) -> Optional[list[tuple]]:
        """
        Get Top n=10 Karma Entries

        Args:
            n (Optional[int]) = 10: The number of top results to return
        Returns:
            Optional[list[tuple]]: ``(keyword, score)`` rows ordered by score
                descending, or None if the query raised.
        """
        try:
            async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
                async with await db_conn.execute(
                    "SELECT keyword, score FROM karma ORDER BY score DESC LIMIT ?",
                    (n,),
                ) as db_cursor:
                    return await db_cursor.fetchall()
        except Exception:
            # Best-effort: report the failure and signal it with None.
            # `Exception` (not a bare except) so task cancellation and
            # interpreter-exit signals still propagate.
            traceback.print_exc()
            return None

    async def update_karma(
        self, granter: str, keyword: str, flag: int
    ) -> Optional[bool]:
        """
        Update Karma for Keyword

        An audit row is written (and committed) first; then the existing
        keyword row is updated, or a new row is inserted when the update
        touched nothing.

        Args:
            granter (str): The user who granted (increased/decreased) the karma
            keyword (str): The keyword to update
            flag (int): 0 to increase karma, 1 to decrease karma
        Returns:
            Optional[bool]: None if ``flag`` is not 0 or 1; True when a row
                was updated or inserted; False otherwise.
        """

        if flag not in (0, 1):
            return None

        increase: bool = not flag

        # `modifier` is chosen from two fixed literals, never from user input,
        # so interpolating it into the statement is safe; all user-supplied
        # values are bound as parameters.
        modifier: str = "score + 1" if increase else "score - 1"
        query: str = (
            f"UPDATE karma SET score = {modifier}, last_change = ? WHERE keyword LIKE ?"
        )
        new_keyword_query: str = (
            "INSERT INTO karma(keyword, score, last_change) VALUES(?, ?, ?)"
        )
        friendly_flag: str = "++" if increase else "--"
        audit_message: str = (
            f"{granter} adjusted karma for {keyword} @ {datetime.datetime.now().isoformat()}: {friendly_flag}"
        )
        audit_query: str = (
            "INSERT INTO karma_audit(impacted_keyword, comment) VALUES(?, ?)"
        )
        now: int = int(time.time())

        # Lazy %-style arguments: the original message was missing both the
        # f-prefix and the args, so it logged the literal placeholders.
        logging.debug("Audit message: %s\nKeyword: %s", audit_message, keyword)

        async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
            # The audit record is committed on its own, before the score
            # change is attempted.
            async with await db_conn.execute(
                audit_query,
                (
                    keyword,
                    audit_message,
                ),
            ):
                await db_conn.commit()

            async with await db_conn.execute(
                query,
                (
                    now,
                    keyword,
                ),
            ) as db_cursor:
                updated_rows = db_cursor.rowcount

            if updated_rows:
                await db_conn.commit()
                return True

            # Keyword does not already exist: seed it with the first vote.
            new_val = 1 if increase else -1
            async with await db_conn.execute(
                new_keyword_query,
                (
                    keyword,
                    new_val,
                    now,
                ),
            ) as db_cursor:
                inserted_rows = db_cursor.rowcount

            if inserted_rows >= 1:
                await db_conn.commit()
                return True
            return False
|
|
|
|
|
|
class Karma(FastAPI):
    """
    Karma Endpoints
    """

    def __init__(self, app: FastAPI, util, constants) -> None:
        # Routes are registered on the `app` that is passed in; this object
        # only holds the handlers and their collaborators.
        # NOTE(review): FastAPI.__init__ is never called for this subclass —
        # confirm the inheritance is intentional.
        self.app: FastAPI = app
        self.util = util
        self.constants = constants
        self.db = KarmaDB()

        # Route path (without leading slash) -> handler coroutine.
        self.endpoints: dict = {
            "karma/get": self.get_karma_handler,
            "karma/modify": self.modify_karma_handler,
            "karma/top": self.top_karma_handler,
        }

        for endpoint, handler in self.endpoints.items():
            app.add_api_route(
                f"/{endpoint}", handler, methods=["POST"], include_in_schema=True
            )

    async def top_karma_handler(
        self, request: Request, data: Optional[ValidTopKarmaRequest] = None
    ) -> JSONResponse:
        """
        Get top keywords for karma
        - **n**: Number of top results to return (default: 10)
        """

        if not self.util.check_key(
            request.url.path, request.headers.get("X-Authd-With")
        ):
            raise HTTPException(status_code=403, detail="Unauthorized")

        # A missing body or a falsy `n` keeps the default of 10.
        n: int = 10
        if data and data.n:
            n = int(data.n)

        try:
            top10: Optional[list[tuple]] = await self.db.get_top(n=n)
            # get_top returns None on failure; an empty result list is
            # reported the same way.
            if not top10:
                return JSONResponse(
                    status_code=500,
                    content={
                        "err": True,
                        "errorText": "General failure",
                    },
                )
            return JSONResponse(content=top10)
        except Exception:
            # `Exception` rather than a bare except so that cancellation of
            # the request task is not swallowed and turned into a response.
            traceback.print_exc()
            return JSONResponse(
                status_code=500,
                content={
                    "err": True,
                    "errorText": "Exception occurred.",
                },
            )

    async def get_karma_handler(
        self, data: ValidKarmaRetrievalRequest, request: Request
    ) -> JSONResponse:
        """
        Get current karma value
        - **keyword**: Keyword to retrieve karma value for
        """

        if not self.util.check_key(
            request.url.path, request.headers.get("X-Authd-With")
        ):
            raise HTTPException(status_code=403, detail="Unauthorized")

        keyword: str = data.keyword
        try:
            # `count` is either the integer score or the error dict produced
            # by KarmaDB.get_karma when no record matches.
            count: Union[int, dict] = await self.db.get_karma(keyword)
            return JSONResponse(
                content={
                    "keyword": keyword,
                    "count": count,
                }
            )
        except Exception:
            # `Exception` rather than a bare except so that cancellation of
            # the request task is not swallowed and turned into a response.
            traceback.print_exc()
            return JSONResponse(
                status_code=500,
                content={
                    "err": True,
                    "errorText": "Exception occurred.",
                },
            )

    async def modify_karma_handler(
        self, data: ValidKarmaUpdateRequest, request: Request
    ) -> JSONResponse:
        """
        Update karma count
        - **granter**: User who granted the karma
        - **keyword**: The keyword to modify
        - **flag**: 0 to increment (++), 1 to decrement (--)
        """

        # This endpoint passes an extra argument (2) to check_key compared
        # with the read-only handlers.
        if not self.util.check_key(
            request.url.path, request.headers.get("X-Authd-With"), 2
        ):
            raise HTTPException(status_code=403, detail="Unauthorized")

        if data.flag not in (0, 1):
            return JSONResponse(
                status_code=500,
                content={
                    "err": True,
                    "errorText": "Invalid request",
                },
            )

        updated = await self.db.update_karma(data.granter, data.keyword, data.flag)
        return JSONResponse(content={"success": updated})
|