3
3
import logging
4
4
from datetime import datetime
5
5
from datetime import timedelta
6
+ from datetime import timezone
6
7
from typing import Dict
7
8
from typing import Iterable
8
9
from typing import List
9
- from typing import Optional
10
10
from typing import Tuple
11
11
from typing import Union
12
12
13
13
import aioredis
14
14
import requests
15
15
from aioredis .exceptions import ResponseError
16
- from dateutil .rrule import HOURLY
17
- from dateutil .rrule import rrule
18
- from fastapi import BackgroundTasks
19
- from fastapi import Body
20
16
from fastapi import Depends
21
17
from fastapi import FastAPI
22
18
from pydantic import BaseSettings
@@ -74,7 +70,7 @@ def timeseries_1_hour_price_key(self) -> str:
74
70
75
71
@prefixed_key
76
72
def cache_key (self ) -> str :
77
- return f'sentiment:mean:1h: cache'
73
+ return f'cache'
78
74
79
75
80
76
class Config (BaseSettings ):
@@ -143,13 +139,15 @@ async def get_current_hour_data(keys):
143
139
top_of_the_hour = int (
144
140
datetime .utcnow ().replace (
145
141
minute = 0 ,
142
+ second = 0 ,
143
+ microsecond = 0 ,
146
144
).timestamp () * 1000 ,
147
145
)
148
146
current_hour_avg_sentiment = await get_hourly_average (ts_sentiment_key , top_of_the_hour )
149
147
current_hour_avg_price = await get_hourly_average (ts_price_key , top_of_the_hour )
150
148
151
149
return {
152
- 'time' : top_of_the_hour ,
150
+ 'time' : datetime . fromtimestamp ( top_of_the_hour / 1000 , tz = timezone . utc ). isoformat () ,
153
151
'price' : current_hour_avg_price ,
154
152
'sentiment' : current_hour_avg_sentiment ,
155
153
}
@@ -179,7 +177,7 @@ async def set_current_hour_cache(keys: Keys):
179
177
180
178
# Now that we've ingested raw sentiment data, aggregate it for the current
181
179
# hour and cache the result.
182
- return refresh_hourly_cache (keys )
180
+ return await refresh_hourly_cache (keys )
183
181
184
182
185
183
@app .get ('/refresh' )
@@ -197,16 +195,17 @@ async def bitcoin(keys: Keys = Depends(make_keys)):
197
195
current_hour_stats_cached = await get_current_hour_cache (keys )
198
196
199
197
if not current_hour_stats_cached :
200
- await set_current_hour_cache (keys )
198
+ current_hour_stats_cached = await set_current_hour_cache (keys )
201
199
202
200
three_hours_ago_ms = int ((now - timedelta (hours = 3 )).timestamp () * 1000 )
203
201
sentiment = await redis .execute_command ('TS.RANGE' , sentiment_1h_key , three_hours_ago_ms , '+' )
204
202
price = await redis .execute_command ('TS.RANGE' , price_1h_key , three_hours_ago_ms , '+' )
205
- past_hours = [{'price' : data [0 ][1 ], 'sentiment' : data [1 ][1 ], 'time' : data [0 ][0 ]}
206
- for data in zip (price , sentiment )]
207
- current_hour = [current_hour_stats_cached ] + past_hours
208
-
209
- return current_hour + past_hours
203
+ past_hours = [{
204
+ 'price' : data [0 ][1 ], 'sentiment' : data [1 ][1 ],
205
+ 'time' : datetime .fromtimestamp (data [0 ][0 ] / 1000 , tz = timezone .utc ),
206
+ }
207
+ for data in zip (price , sentiment )]
208
+ return past_hours + [current_hour_stats_cached ]
210
209
211
210
212
211
async def make_timeseries (key ):
0 commit comments