4 Commits

1 changed files with 98 additions and 1 deletions
Split View
  1. +98
    -1
      radicale-auth-ldap/__init__.py

+ 98
- 1
radicale-auth-ldap/__init__.py View File

@ -2,6 +2,8 @@ from radicale.auth import BaseAuth
from radicale.log import logger
import ldap3
import hashlib
import datetime
PLUGIN_CONFIG_SCHEMA = {"auth": {
"ldap_server_url": {
@ -27,12 +29,91 @@ PLUGIN_CONFIG_SCHEMA = {"auth": {
"type": str},
"ldap_access_group_attribute": {
"value": "None",
"type": str}}}
"type": str},
"ldap_user_cache_in_seconds": {
"value": 0,
"type": int}}}
class UserCacheEntry:
def __init__(self, secret, expireDatetime):
self.secret = secret
self.expireDatetime = expireDatetime
def __str__(self):
temp = "secret [" + self.secret + "]\n"
temp += "expireDatetime [" + str(self.expireDatetime) + "]\n"
return temp
def checkSecret(self, secret):
if self.secret == secret:
return True
else:
return False
class UserCache:
def __init__(self, cacheTimeInSeconds):
self.salt = str(datetime.datetime.now(tz=datetime.UTC))
self.cacheTimeInSeconds = cacheTimeInSeconds
self.cache = {}
def clean(self):
now = datetime.datetime.now(tz=datetime.UTC)
entriesToDelete = []
for entry in self.cache:
if self.cache.get(entry).expireDatetime < now:
entriesToDelete.append(entry)
for entry in entriesToDelete:
self.cache.pop(entry, None)
def addEntry(self, userName, password):
self.clean()
hashedUserName = hashlib.sha256((userName + self.salt).encode()).hexdigest()
hashedPassword = hashlib.sha256((password + self.salt).encode()).hexdigest()
expireDatetime = datetime.datetime.now(tz=datetime.UTC) + datetime.timedelta(seconds=self.cacheTimeInSeconds)
self.cache[hashedUserName] = UserCacheEntry(hashedPassword, expireDatetime)
def checkIfEntryIsCached(self, userName, password):
self.clean()
hashedUserName = hashlib.sha256((userName + self.salt).encode()).hexdigest()
hashedPassword = hashlib.sha256((password + self.salt).encode()).hexdigest()
## Check if user is in cache
if hashedUserName in self.cache:
## Check if user password is correct
if self.cache[hashedUserName].checkSecret(hashedPassword):
return True
else:
## Delete cache entry if password is wrong
self.cache.pop(hashedUserName)
return False
else:
return False
class Auth(BaseAuth):
def __init__(self, configuration):
super().__init__(configuration.copy(PLUGIN_CONFIG_SCHEMA))
userCacheInSeconds = self.configuration.get("auth", "ldap_user_cache_in_seconds")
if userCacheInSeconds > 0:
self.grantedUserAccessCache = UserCache(userCacheInSeconds)
logger.info("LDAP: use user cache")
logger.debug("LDAP: user cache salt: %s" % self.grantedUserAccessCache.salt)
logger.debug("LDAP: user cache cacheTimeInSeconds: %s" % str(self.grantedUserAccessCache.cacheTimeInSeconds))
else:
self.grantedUserAccessCache = None
logger.info("LDAP: do not use cache")
def login(self, user, password):
"""Check if ``user``/``password`` couple is valid."""
serverUrl = self.configuration.get("auth", "ldap_server_url")
@ -44,6 +125,16 @@ class Auth(BaseAuth):
accessGroupFilter = self.configuration.get("auth", "ldap_access_group_filter")
accessGroupAttribute = self.configuration.get("auth", "ldap_access_group_attribute")
if self.grantedUserAccessCache is not None:
logger.info("LDAP: check if user is cached")
if self.grantedUserAccessCache.checkIfEntryIsCached(user, password):
logger.info("LDAP: user is cached")
return user
else:
logger.info("LDAP: user is not cached")
logger.info("LDAP: start connection")
logger.debug("LDAP: server URL: %s" % serverUrl)
logger.debug("LDAP: binddn: %s" % binddn)
@ -138,4 +229,10 @@ class Auth(BaseAuth):
return ""
else:
logger.info("LDAP: user successful verified")
if self.grantedUserAccessCache is not None:
logger.info("LDAP: add user to cache")
self.grantedUserAccessCache.addEntry(user, password)
return userAttributeValue

Loading…
Cancel
Save