Verified Commit cf0aee3d authored by Eliot Berriot's avatar Eliot Berriot
Browse files

Fixed leaking redis connections

parent 58b21157
......@@ -218,6 +218,7 @@ ALLOWED_HOSTS = env.list("DJANGO_ALLOWED_HOSTS", default=["retribute.me"])
# CACHES
# ------------------------------------------------------------------------------
CACHES = {"default": env.cache()}
CACHE_DEFAULT_EXPIRATION = env.int("CACHE_DEFAULT_EXPIRATION", default=60 * 60 * 12)
ASYNC_REDIS_PARAMS = {"address": CACHES["default"]["LOCATION"]}
CHANNEL_LAYERS = {
"default": {
......
......@@ -52,15 +52,21 @@ class Redis(Backend):
v = await r.get(key)
except KeyError:
raise self.NotFound(key)
if v is None:
raise self.NotFound(key)
try:
return json.loads(v)
except TypeError:
# Null, empty string, etc.
return v
async def set(self, key, value):
async def set(self, key, value, expire=None):
if expire is None:
expire = settings.CACHE_DEFAULT_EXPIRATION
r = await self.redis()
await r.set(key, json.dumps(value))
if expire:
return await r.setex(key, expire, json.dumps(value))
return await r.set(key, json.dumps(value))
async def close(self):
if not self._redis:
......@@ -68,5 +74,12 @@ class Redis(Backend):
await self._redis.close()
_DEFAULT = None
def get_default():
return Redis(settings.ASYNC_REDIS_PARAMS)
global _DEFAULT
if _DEFAULT:
return _DEFAULT
_DEFAULT = Redis(settings.ASYNC_REDIS_PARAMS)
return _DEFAULT
......@@ -36,18 +36,6 @@ def wrapper_500(callback):
return callback
def with_cache(f):
async def inner(*args, **kwargs):
c = kwargs.setdefault("cache", cache.get_default())
try:
return await f(*args, **kwargs)
except Exception:
await c.close()
raise
return inner
def ignore_aiohttp_ssl_eror(loop, aiohttpversion="3.5.4"):
"""Ignore aiohttp #3535 issue with SSL data after close
......@@ -96,8 +84,7 @@ def ignore_aiohttp_ssl_eror(loop, aiohttpversion="3.5.4"):
class SearchSingleConsumer(AsyncHttpConsumer):
@wrapper_500
@with_cache
async def handle(self, body, cache):
async def handle(self, body):
lookup_type = self.scope["url_route"]["kwargs"]["lookup_type"]
lookup = self.scope["url_route"]["kwargs"]["lookup"]
ignore_aiohttp_ssl_eror(asyncio.get_running_loop())
......@@ -107,7 +94,7 @@ class SearchSingleConsumer(AsyncHttpConsumer):
await json_response(self, 400, {"detail": "Invalid lookup"})
try:
async with aiohttp.client.ClientSession(timeout=aiohttp_timeout) as session:
data = await source.get(lookup, session, cache=cache)
data = await source.get(lookup, session, cache=cache.get_default())
profile = sources.result_to_retribute_profile(lookup_type, lookup, data)
except (exceptions.SearchError, aiohttp.ClientError) as e:
await json_response(self, 400, {"detail": e.message})
......@@ -118,9 +105,9 @@ class SearchSingleConsumer(AsyncHttpConsumer):
await json_response(self, 200, profile)
async def do_lookup(lookup, lookup_type, session, source, results, cache):
async def do_lookup(lookup, lookup_type, session, source, results):
try:
data = await source.get(lookup, session, cache=cache)
data = await source.get(lookup, session, cache=cache.get_default())
profile = sources.result_to_retribute_profile(lookup_type, lookup, data)
except (
exceptions.SearchError,
......@@ -139,8 +126,7 @@ async def do_lookup(lookup, lookup_type, session, source, results, cache):
class SearchMultipleConsumer(AsyncHttpConsumer):
@wrapper_500
@with_cache
async def handle(self, body, cache):
async def handle(self, body):
if self.scope["method"] == "OPTIONS":
return await self.send_response(
200,
......@@ -181,7 +167,6 @@ class SearchMultipleConsumer(AsyncHttpConsumer):
session=session,
source=source,
results=results,
cache=cache,
)
)
try:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment