194 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			194 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python3.12
 | 
						|
 | 
						|
import os
 | 
						|
import logging
 | 
						|
import time
 | 
						|
import datetime
 | 
						|
import traceback
 | 
						|
import aiosqlite as sqlite3
 | 
						|
from typing import LiteralString, Optional, Union
 | 
						|
from fastapi import FastAPI, Request, HTTPException
 | 
						|
from fastapi.responses import JSONResponse
 | 
						|
from .constructors import ValidTopKarmaRequest, ValidKarmaRetrievalRequest,\
 | 
						|
    ValidKarmaUpdateRequest
 | 
						|
 | 
						|
class KarmaDB:
 | 
						|
    """Karma DB Util"""
 | 
						|
    def __init__(self) -> None:
 | 
						|
        self.db_path: LiteralString = os.path.join("/", "usr", "local", "share",
 | 
						|
                                    "sqlite_dbs", "karma.db")
 | 
						|
 | 
						|
    async def get_karma(self, keyword: str) -> Union[int,  dict]:
 | 
						|
        """Get Karma Value for Keyword
 | 
						|
        Args:
 | 
						|
            keyword (str): The keyword to search
 | 
						|
        Returns:
 | 
						|
            Union[int, dict]
 | 
						|
        """
 | 
						|
        async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
 | 
						|
            async with await db_conn.execute("SELECT score FROM karma WHERE keyword LIKE ? LIMIT 1", (keyword,)) as db_cursor:
 | 
						|
                try:
 | 
						|
                    (score,) = await db_cursor.fetchone()
 | 
						|
                    return score
 | 
						|
                except TypeError:
 | 
						|
                    return {
 | 
						|
                        'err': True,
 | 
						|
                        'errorText': f'No records for {keyword}',
 | 
						|
                    }
 | 
						|
                
 | 
						|
    async def get_top(self, n: Optional[int] = 10) -> Optional[list[tuple]]:
 | 
						|
        """
 | 
						|
        Get Top n=10 Karma Entries
 | 
						|
        Args:
 | 
						|
            n (Optional[int]) = 10: The number of top results to return
 | 
						|
        Returns:
 | 
						|
            list[tuple]
 | 
						|
        """
 | 
						|
        try:
 | 
						|
            async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
 | 
						|
                async with await db_conn.execute("SELECT keyword, score FROM karma ORDER BY score DESC LIMIT ?", (n,)) as db_cursor:
 | 
						|
                    return await db_cursor.fetchall()
 | 
						|
        except:
 | 
						|
            traceback.print_exc()
 | 
						|
            return None
 | 
						|
        
 | 
						|
    async def update_karma(self, granter: str, keyword: str,
 | 
						|
                           flag: int) -> Optional[bool]:
 | 
						|
        """
 | 
						|
        Update Karma for Keyword
 | 
						|
        Args:
 | 
						|
            granter (str): The user who granted (increased/decreased) the karma
 | 
						|
            keyword (str): The keyword to update
 | 
						|
            flag (int): 0 to increase karma, 1 to decrease karma
 | 
						|
        Returns:
 | 
						|
            Optional[bool]
 | 
						|
        """
 | 
						|
        
 | 
						|
        if not flag in [0, 1]:
 | 
						|
            return None
 | 
						|
        
 | 
						|
        modifier: str = "score + 1" if not flag else "score - 1"
 | 
						|
        query: str = f"UPDATE karma SET score = {modifier}, last_change = ? WHERE keyword LIKE ?"
 | 
						|
        new_keyword_query: str = "INSERT INTO karma(keyword, score, last_change) VALUES(?, ?, ?)"
 | 
						|
        friendly_flag: str = "++" if not flag else "--"
 | 
						|
        audit_message: str = f"{granter} adjusted karma for {keyword} @ {datetime.datetime.now().isoformat()}: {friendly_flag}"
 | 
						|
        audit_query: str = "INSERT INTO karma_audit(impacted_keyword, comment) VALUES(?, ?)"
 | 
						|
        now: int = int(time.time())
 | 
						|
 | 
						|
        logging.debug("Audit message: %s{audit_message}\nKeyword: %s{keyword}")
 | 
						|
 | 
						|
        async with sqlite3.connect(self.db_path, timeout=2) as db_conn:
 | 
						|
            async with await db_conn.execute(audit_query, (keyword, audit_message,)) as db_cursor:
 | 
						|
                await db_conn.commit()          
 | 
						|
            async with await db_conn.execute(query, (now, keyword,)) as db_cursor:
 | 
						|
                if db_cursor.rowcount:
 | 
						|
                    await db_conn.commit()
 | 
						|
                    return True
 | 
						|
                if db_cursor.rowcount < 1:  # Keyword does not already exist
 | 
						|
                    await db_cursor.close()
 | 
						|
                    new_val = 1 if not flag else -1
 | 
						|
                    async with await db_conn.execute(new_keyword_query, (keyword, new_val, now,)) as db_cursor:
 | 
						|
                        if db_cursor.rowcount >= 1:
 | 
						|
                            await db_conn.commit()
 | 
						|
                            return True
 | 
						|
                        else:
 | 
						|
                            return False
 | 
						|
                return False
 | 
						|
                    
 | 
						|
class Karma(FastAPI):
 | 
						|
    """
 | 
						|
    Karma Endpoints
 | 
						|
    """   
 | 
						|
    def __init__(self, app: FastAPI, util, constants) -> None:
 | 
						|
        self.app: FastAPI = app
 | 
						|
        self.util = util
 | 
						|
        self.constants = constants
 | 
						|
        self.db = KarmaDB()
 | 
						|
 | 
						|
        self.endpoints: dict = {
 | 
						|
            "karma/get": self.get_karma_handler,
 | 
						|
            "karma/modify": self.modify_karma_handler,
 | 
						|
            "karma/top": self.top_karma_handler,
 | 
						|
            }
 | 
						|
 | 
						|
        for endpoint, handler in self.endpoints.items():
 | 
						|
            app.add_api_route(f"/{endpoint}", handler, methods=["POST"],
 | 
						|
                              include_in_schema=True)
 | 
						|
 | 
						|
    
 | 
						|
    async def top_karma_handler(self, request: Request,
 | 
						|
                                data: Optional[ValidTopKarmaRequest] = None) -> JSONResponse:
 | 
						|
        """
 | 
						|
        Get top keywords for karma
 | 
						|
         - **n**: Number of top results to return (default: 10)
 | 
						|
        """
 | 
						|
 | 
						|
        if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With')):
 | 
						|
            raise HTTPException(status_code=403, detail="Unauthorized")
 | 
						|
                
 | 
						|
        n: int = 10
 | 
						|
        if data and data.n:
 | 
						|
            n = int(data.n)
 | 
						|
            
 | 
						|
 | 
						|
        try:
 | 
						|
            top10: Optional[list[tuple]] = await self.db.get_top(n=n)
 | 
						|
            if not top10:
 | 
						|
                return JSONResponse(status_code=500, content={
 | 
						|
                    'err': True,
 | 
						|
                    'errorText': 'General failure',
 | 
						|
                    })
 | 
						|
            return JSONResponse(content=top10)
 | 
						|
        except:
 | 
						|
            traceback.print_exc()
 | 
						|
            return JSONResponse(status_code=500, content={
 | 
						|
                'err': True,
 | 
						|
                'errorText': 'Exception occurred.',
 | 
						|
                })
 | 
						|
    
 | 
						|
    async def get_karma_handler(self, data: ValidKarmaRetrievalRequest,
 | 
						|
                                request: Request) -> JSONResponse:
 | 
						|
        """
 | 
						|
        Get current karma value
 | 
						|
        - **keyword**: Keyword to retrieve karma value for
 | 
						|
        """
 | 
						|
        
 | 
						|
        if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With')):
 | 
						|
            raise HTTPException(status_code=403, detail="Unauthorized")        
 | 
						|
 | 
						|
        keyword: str = data.keyword
 | 
						|
        try:
 | 
						|
            count: Union[int, dict] = await self.db.get_karma(keyword)
 | 
						|
            return JSONResponse(content={
 | 
						|
                'keyword': keyword,
 | 
						|
                'count': count,
 | 
						|
                })
 | 
						|
        except:
 | 
						|
            traceback.print_exc()
 | 
						|
            return JSONResponse(status_code=500, content={
 | 
						|
                'err': True,
 | 
						|
                'errorText': "Exception occurred.",
 | 
						|
                })
 | 
						|
    
 | 
						|
    async def modify_karma_handler(self, data: ValidKarmaUpdateRequest,
 | 
						|
                                   request: Request) -> JSONResponse:
 | 
						|
        """
 | 
						|
        Update karma count
 | 
						|
        - **granter**: User who granted the karma
 | 
						|
        - **keyword**: The keyword to modify
 | 
						|
        - **flag**: 0 to decrement (--), 1 to increment (++) 
 | 
						|
        """
 | 
						|
 | 
						|
        if not self.util.check_key(request.url.path, request.headers.get('X-Authd-With'), 2):
 | 
						|
            raise HTTPException(status_code=403, detail="Unauthorized")
 | 
						|
        
 | 
						|
        if not data.flag in [0, 1]:
 | 
						|
            return JSONResponse(status_code=500, content={
 | 
						|
                'err': True,
 | 
						|
                'errorText': 'Invalid request',
 | 
						|
            })
 | 
						|
        
 | 
						|
        return JSONResponse(content={
 | 
						|
            'success': await self.db.update_karma(data.granter,
 | 
						|
                                                  data.keyword, data.flag)
 | 
						|
            }) |