Skip to content

Commit cdbd989

Browse files
author
Andrew Brookins
committed
Simplify the design - no compaction rules
1 parent cd1c31e commit cdbd989

File tree

5 files changed

+129
-116
lines changed

5 files changed

+129
-116
lines changed

README.md

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,40 @@
1-
# fastapi-redis
1+
# FastAPI Redis Example
2+
3+
This is an example API that demonstrates how to use Redis, RedisTimeSeries, and
4+
FastAPI together.
5+
6+
The service tracks Bitcoin sentiment and prices over time, rolling these up into
7+
hourly averages using RedisTimeSeries. You can use the API to get average
8+
Bitcoin price and sentiment for each of the last three hours, with a quick
9+
indication of price and sentiment movement.
10+
11+
12+
## Setup
13+
14+
To run this service, you'll need Docker. First, clone the repo and
15+
install the dependencies:
16+
17+
$ git clone http://github.com/redis-developer/fastapi-redis-tutorial.git
18+
$ cd fastapi-redis-tutorial
19+
$ docker-compose build
20+
21+
22+
## Running the API
23+
24+
The `docker-compose.yaml` file in this project configures a Redis instance with
25+
the RedisTimeSeries module, the Python app for the example API, and a test
26+
runner.
27+
28+
Use this command to run the app:
29+
30+
$ docker-compose up
31+
32+
This command starts Redis and the API server.
33+
34+
### Ingesting Price and Sentiment Data
35+
36+
The app assumes a scheduler (cron job, Cloud scheduler, etc.) will hit the `/refresh` endpoint in the app to trigger ingesting Bitcoin price and sentiment data on a regular basis.
37+
38+
Use this API to ingest data when you're playing with the API:
39+
40+
$

app/main.py

Lines changed: 57 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@
1313
import aioredis
1414
import requests
1515
from aioredis.exceptions import ResponseError
16+
from fastapi import BackgroundTasks
1617
from fastapi import Depends
1718
from fastapi import FastAPI
1819
from pydantic import BaseSettings
1920

20-
2121
DEFAULT_KEY_PREFIX = 'is-bitcoin-lit'
2222
SENTIMENT_API_URL = 'http://api.senticrypt.com/v1/bitcoin.json'
2323
TWO_MINUTES = 60 * 60
@@ -49,25 +49,15 @@ def __init__(self, prefix: str = DEFAULT_KEY_PREFIX):
4949
self.prefix = prefix
5050

5151
@prefixed_key
52-
def timeseries_30_second_sentiment_key(self) -> str:
52+
def timeseries_sentiment_key(self) -> str:
5353
"""A time series containing 30-second snapshots of BTC sentiment."""
5454
return f'sentiment:mean:30s'
5555

5656
@prefixed_key
57-
def timeseries_1_hour_sentiment_key(self) -> str:
58-
"""A time series containing 1-hour snapshots of BTC sentiment."""
59-
return f'sentiment:mean:1h'
60-
61-
@prefixed_key
62-
def timeseries_30_second_price_key(self) -> str:
57+
def timeseries_price_key(self) -> str:
6358
"""A time series containing 30-second snapshots of BTC price."""
6459
return f'price:mean:30s'
6560

66-
@prefixed_key
67-
def timeseries_1_hour_price_key(self) -> str:
68-
"""A time series containing 1-hour snapshots of BTC price."""
69-
return f'price:mean:1h'
70-
7161
@prefixed_key
7262
def cache_key(self) -> str:
7363
return f'cache'
@@ -112,8 +102,8 @@ def make_keys():
112102

113103

114104
async def persist(keys: Keys, data: BitcoinSentiments):
115-
ts_sentiment_key = keys.timeseries_30_second_sentiment_key()
116-
ts_price_key = keys.timeseries_30_second_price_key()
105+
ts_sentiment_key = keys.timeseries_sentiment_key()
106+
ts_price_key = keys.timeseries_price_key()
117107
await add_many_to_timeseries(
118108
(
119109
(ts_price_key, 'btc_price'),
@@ -129,61 +119,35 @@ async def get_hourly_average(ts_key: str, top_of_the_hour: int):
129119
)
130120
# Return the average without the timestamp. The response is a list
131121
# of the structure [timestamp, average].
132-
return response[0][1]
133-
134-
135-
async def get_current_hour_data(keys):
136-
ts_sentiment_key = keys.timeseries_30_second_sentiment_key()
137-
ts_price_key = keys.timeseries_30_second_price_key()
138-
top_of_the_hour = int(
139-
datetime.utcnow().replace(
140-
minute=0,
141-
second=0,
142-
microsecond=0,
143-
).timestamp() * 1000,
144-
)
145-
current_hour_avg_sentiment = await get_hourly_average(ts_sentiment_key, top_of_the_hour)
146-
current_hour_avg_price = await get_hourly_average(ts_price_key, top_of_the_hour)
122+
return response
147123

148-
return {
149-
'time': datetime.fromtimestamp(top_of_the_hour / 1000, tz=timezone.utc).isoformat(),
150-
'price': current_hour_avg_price,
151-
'sentiment': current_hour_avg_sentiment,
152-
}
124+
125+
def datetime_parser(dct):
126+
for k, v in dct.items():
127+
if isinstance(v, str) and v.endswith('+00:00'):
128+
try:
129+
dct[k] = datetime.datetime.fromisoformat(v)
130+
except:
131+
pass
132+
return dct
153133

154134

155-
async def get_current_hour_cache(keys: Keys):
135+
async def get_cache(keys: Keys):
156136
current_hour_cache_key = keys.cache_key()
157137
current_hour_stats = await redis.get(current_hour_cache_key)
158138

159139
if current_hour_stats:
160-
return json.loads(current_hour_stats)
140+
return json.loads(current_hour_stats, object_hook=datetime_parser)
161141

162142

163-
async def refresh_hourly_cache(keys: Keys):
164-
current_hour_stats = await get_current_hour_data(keys)
143+
async def set_cache(data, keys: Keys):
144+
def serialize_dates(v): return v.isoformat(
145+
) if isinstance(v, datetime) else v
165146
await redis.set(
166-
keys.cache_key(), json.dumps(current_hour_stats),
147+
keys.cache_key(),
148+
json.dumps(data, default=serialize_dates),
167149
ex=TWO_MINUTES,
168150
)
169-
return current_hour_stats
170-
171-
172-
async def set_current_hour_cache(keys: Keys):
173-
# First, scrape the sentiment API and persist the data.
174-
data = requests.get(SENTIMENT_API_URL).json()
175-
await persist(keys, data)
176-
177-
# Now that we've ingested raw sentiment data, aggregate it for the current
178-
# hour and cache the result.
179-
return await refresh_hourly_cache(keys)
180-
181-
182-
@app.get('/refresh')
183-
async def bitcoin(keys: Keys = Depends(make_keys)):
184-
data = requests.get(SENTIMENT_API_URL).json()
185-
await persist(keys, data)
186-
await refresh_hourly_cache(keys)
187151

188152

189153
def get_direction(last_three_hours, key: str):
@@ -195,25 +159,24 @@ def get_direction(last_three_hours, key: str):
195159
return 'flat'
196160

197161

198-
@app.get('/is-bitcoin-lit')
199-
async def bitcoin(keys: Keys = Depends(make_keys)):
200-
now = datetime.utcnow()
201-
sentiment_1h_key = keys.timeseries_1_hour_sentiment_key()
202-
price_1h_key = keys.timeseries_1_hour_price_key()
203-
current_hour_stats_cached = await get_current_hour_cache(keys)
204-
205-
if not current_hour_stats_cached:
206-
current_hour_stats_cached = await set_current_hour_cache(keys)
207-
208-
three_hours_ago_ms = int((now - timedelta(hours=3)).timestamp() * 1000)
209-
sentiment = await redis.execute_command('TS.RANGE', sentiment_1h_key, three_hours_ago_ms, '+')
210-
price = await redis.execute_command('TS.RANGE', price_1h_key, three_hours_ago_ms, '+')
211-
past_hours = [{
162+
def now():
163+
"""Wrap call to utcnow, so that we can mock this function in tests."""
164+
return datetime.utcnow()
165+
166+
167+
async def calculate_three_hours_of_data(keys: Keys) -> Dict[str, str]:
168+
sentiment_key = keys.timeseries_sentiment_key()
169+
price_key = keys.timeseries_price_key()
170+
three_hours_ago_ms = int((now() - timedelta(hours=3)).timestamp() * 1000)
171+
172+
sentiment = await get_hourly_average(sentiment_key, three_hours_ago_ms)
173+
price = await get_hourly_average(price_key, three_hours_ago_ms)
174+
175+
last_three_hours = [{
212176
'price': data[0][1], 'sentiment': data[1][1],
213177
'time': datetime.fromtimestamp(data[0][0] / 1000, tz=timezone.utc),
214178
}
215179
for data in zip(price, sentiment)]
216-
last_three_hours = past_hours + [current_hour_stats_cached]
217180

218181
return {
219182
'hourly_average_of_averages': last_three_hours,
@@ -222,6 +185,25 @@ async def bitcoin(keys: Keys = Depends(make_keys)):
222185
}
223186

224187

188+
@app.post('/refresh')
189+
async def bitcoin(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
190+
data = requests.get(SENTIMENT_API_URL).json()
191+
await persist(keys, data)
192+
data = await calculate_three_hours_of_data(keys)
193+
background_tasks.add_task(set_cache, data, keys)
194+
195+
196+
@app.get('/is-bitcoin-lit')
197+
async def bitcoin(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
198+
data = await get_cache(keys)
199+
200+
if not data:
201+
data = await calculate_three_hours_of_data(keys)
202+
background_tasks.add_task(set_cache, data, keys)
203+
204+
return data
205+
206+
225207
async def make_timeseries(key):
226208
"""
227209
Create a timeseries with the Redis key `key`.
@@ -243,36 +225,9 @@ async def make_timeseries(key):
243225
log.info('Could not create timeseries %s, error: %s', key, e)
244226

245227

246-
async def make_rule(src: str, dest: str):
247-
"""
248-
Create a compaction rule from timeseries at `str` to `dest`.
249-
250-
This rule aggregates metrics using 'avg' into hourly buckets.
251-
"""
252-
try:
253-
await redis.execute_command(
254-
'TS.CREATERULE', src, dest, 'AGGREGATION', 'avg', HOURLY_BUCKET,
255-
)
256-
except ResponseError as e:
257-
# Rule probably already exists.
258-
log.info(
259-
'Could not create timeseries rule (from %s to %s), error: %s', src, dest, e,
260-
)
261-
262-
263228
async def initialize_redis(keys: Keys):
264-
ts_30_sec_sentiment = keys.timeseries_30_second_sentiment_key()
265-
ts_1_hour_sentiment = keys.timeseries_1_hour_sentiment_key()
266-
ts_30_sec_price = keys.timeseries_30_second_price_key()
267-
ts_1_hour_price = keys.timeseries_1_hour_price_key()
268-
269-
await make_timeseries(ts_30_sec_sentiment)
270-
await make_timeseries(ts_1_hour_sentiment)
271-
await make_timeseries(ts_30_sec_price)
272-
await make_timeseries(ts_1_hour_price)
273-
274-
await make_rule(ts_30_sec_sentiment, ts_1_hour_sentiment)
275-
await make_rule(ts_30_sec_price, ts_1_hour_price)
229+
await make_timeseries(keys.timeseries_sentiment_key())
230+
await make_timeseries(keys.timeseries_price_key())
276231

277232

278233
@app.on_event('startup')

poetry.lock

Lines changed: 13 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pytest-cov = "^2.10.1"
2121
pytest-asyncio = "^0.15.1"
2222
httpx = "^0.18.1"
2323
ipdb = "^0.13.9"
24+
yapf = "^0.31.0"
2425

2526
[build-system]
2627
requires = ["poetry>=0.12"]

tests/test_bitcoin_api.py

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
import json
23
import os.path
34
from unittest import mock
@@ -23,23 +24,28 @@
2324

2425
@pytest.fixture
2526
def mock_bitcoin_api():
26-
with mock.patch('requests.get') as mock_get:
27-
with mock.patch('requests.Response') as mock_response:
28-
# Mock out Response.json()
29-
m = mock.MagicMock()
30-
with open(JSON_FIXTURE) as f:
31-
m.return_value = json.loads(f.read())
32-
mock_response.json = m
27+
with mock.patch('app.main.now') as mock_utcnow:
28+
with mock.patch('requests.get') as mock_get:
29+
with mock.patch('requests.Response') as mock_response:
30+
mock_utcnow.return_value = datetime.datetime(
31+
2021, 7, 7, 10, 30, 0, 0, # 2020-07-07 10:30:00 UTC
32+
)
3333

34-
# Make get() return our fake Response.
35-
mock_get.return_value = mock_response
34+
# Mock out Response.json()
35+
m = mock.MagicMock()
36+
with open(JSON_FIXTURE) as f:
37+
m.return_value = json.loads(f.read())
38+
mock_response.json = m
3639

37-
yield mock_get
40+
# Make get() return our fake Response.
41+
mock_get.return_value = mock_response
42+
43+
yield mock_get
3844

3945

4046
@pytest.mark.asyncio
4147
async def test_api(client: AsyncClient, mock_bitcoin_api: mock.MagicMock):
42-
await client.get(REFRESH_URL)
48+
await client.post(REFRESH_URL)
4349
res = await client.get(URL)
4450
summary = res.json()
4551

@@ -54,7 +60,7 @@ async def test_api_timeseries(
5460
client: AsyncClient, redis: Redis,
5561
mock_bitcoin_api: mock.MagicMock
5662
):
57-
await client.get(REFRESH_URL)
63+
await client.post(REFRESH_URL)
5864
data = await client.get(URL)
5965
summary = data.json()
6066

0 commit comments

Comments
 (0)