Spaces:
Sleeping
Sleeping
import tempfile as tfile | |
from datetime import datetime | |
from urllib.request import urlopen | |
import requests | |
from keras.utils import img_to_array | |
from lxml import etree | |
import keras | |
from keras.applications.imagenet_utils import decode_predictions, preprocess_input | |
from keras.models import Model | |
from PIL import Image | |
from io import BytesIO | |
import numpy as np | |
from sklearn.decomposition import PCA | |
from scipy.spatial import distance | |
from collections import OrderedDict | |
from consts import API_KEY | |
from schemas import Shop | |
def get_ids_from_feed(feed_url): | |
# create temp xml file | |
temp_file = tfile.NamedTemporaryFile(mode="w", suffix=".xml", prefix="feed") | |
f = temp_file.name | |
temp_file.write(urlopen(feed_url).read().decode('utf-8')) | |
# open xml file | |
tree = etree.parse(f) | |
temp_file.close() | |
root = tree.getroot() | |
# get image ids and shop base url | |
list_ids = [] | |
shop_url = root[0][1].text | |
for item in root.findall(".//g:mpn", root.nsmap): | |
list_ids.append(item.text) | |
return list_ids, shop_url | |
def get_image(url): | |
res = requests.get(url) | |
im = Image.open(BytesIO(res.content)).convert("RGB").resize((224, 224)) | |
img = img_to_array(im) | |
x = img_to_array(img) | |
x = np.expand_dims(x, axis=0) | |
x = preprocess_input(x) | |
return img, x | |
def load_image(url, img_id): | |
# print('get image url', id) | |
request_url = '{}/flat_thumb/{}/1/224'.format(url, img_id) | |
print('get image', request_url) | |
img, x = get_image(request_url) | |
return img, x | |
async def create_feature_files(shop: Shop): | |
model = keras.applications.VGG16(weights='imagenet', include_top=True) | |
feat_extractor = Model(inputs=model.input, outputs=model.get_layer("fc2").output) | |
await calculate_shop(shop, feat_extractor) | |
async def calculate_shop(shop: Shop, feat_extractor) -> None: | |
if shop.id: # temp | |
print(shop.id, shop.base_url, datetime.now()) | |
google_xml_feed_url = '{}/google_xml_feed'.format(shop.base_url) | |
try: | |
list_ids, shop_url = get_ids_from_feed(google_xml_feed_url) | |
except Exception as e: | |
list_ids = [] | |
print('could not get images from ', shop.id, e) | |
features = [] | |
list_of_fitted_designs = [] | |
design_json = {} | |
if len(list_ids) > 0: | |
print(f"step1: {datetime.now()}") | |
for l in list_ids: | |
try: | |
img, x = load_image(shop_url, l) | |
feat = feat_extractor.predict(x)[0] | |
features.append(feat) | |
list_of_fitted_designs.append(l) | |
except Exception as e: | |
print(l, ' failed loading feature extraction', e) | |
print(f"step2: {datetime.now()}") | |
try: | |
features = np.array(features) | |
# print(features.shape) | |
components = len(features) if len(features) < 300 else 300 | |
pca = PCA(n_components=components) # 300 | |
pca.fit(features) | |
pca_features = pca.transform(features) | |
except Exception as e: | |
print('pca too small?', e) | |
if len(list_of_fitted_designs) >= 80: | |
max_list_per_design = 80 | |
else: | |
max_list_per_design = len(list_of_fitted_designs) | |
try: | |
for im in list_of_fitted_designs: | |
query_image_idx = list_of_fitted_designs.index(im) | |
similar_idx = [distance.cosine(pca_features[query_image_idx], feat) for feat in pca_features] | |
filterd_idx = dict() | |
for i in range(len(similar_idx)): | |
filterd_idx[i] = {"dist": similar_idx[i], "id": list_of_fitted_designs[i]} | |
sorted_dict = dict( | |
OrderedDict(sorted(filterd_idx.items(), key=lambda i: i[1]['dist'])[1:max_list_per_design])) | |
design_list = [] | |
for k, v in sorted_dict.items(): | |
design_list.append(v) | |
design_dict = {"shop_id": shop.id, "design": im, | |
"recommendations": design_list | |
} | |
# print(design_dict) | |
if await push_home(design_dict, shop): | |
pass | |
else: | |
print("error sending recommendations") | |
# if calculation is ready send update home | |
print(f"step3: {datetime.now()}") | |
if await update_calculation_date(shop): | |
pass | |
else: | |
print("error sending shop calculation update") | |
except Exception as e: | |
print("could not create json with look-a-like for shop:", shop.id, e) | |
print(f"calculation for {shop.id} ended at {datetime.now()}") | |
async def push_home(design_dict, shop): | |
headers: dict[str, str] = { | |
"Authorization": API_KEY, | |
"Content-type": "application/json", | |
} | |
try: | |
url=f"{shop.webhook_url}/fill_recommendations" | |
response = requests.post(url, json=design_dict, headers=headers) | |
response.raise_for_status() | |
return True | |
except Exception as e: | |
print(e) | |
return False | |
async def update_calculation_date(shop): | |
headers: dict[str, str] = { | |
"Authorization": API_KEY, | |
"Content-type": "application/json", | |
} | |
try: | |
url = f"{shop.webhook_url}/shop_updated" | |
data = {"shop_id": shop.id} | |
response = requests.post(url, json=data, headers=headers) | |
response.raise_for_status() | |
return True | |
except Exception as e: | |
print(e) | |
return False | |