"""Compute look-alike design recommendations for a shop and push them to its webhook."""

import tempfile as tfile
from collections import OrderedDict
from datetime import datetime
from io import BytesIO
from urllib.request import urlopen

import keras
import numpy as np
import requests
from keras.applications.imagenet_utils import decode_predictions, preprocess_input
from keras.models import Model
from keras.utils import img_to_array
from lxml import etree
from PIL import Image
from scipy.spatial import distance
from sklearn.decomposition import PCA

from consts import API_KEY
from schemas import Shop

# Upper bound (seconds) on any single blocking network wait, so one
# unresponsive host cannot stall a whole shop calculation indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

# Upper bound on the number of PCA components kept per feature vector.
MAX_PCA_COMPONENTS = 300

# Cap used when slicing the ranked neighbours of a design.
MAX_LIST_PER_DESIGN = 80


def get_ids_from_feed(feed_url):
    """Download an XML feed and return its design ids and the shop URL.

    Args:
        feed_url: URL of the XML feed to download.

    Returns:
        A ``(list_ids, shop_url)`` tuple. ``list_ids`` holds the text of every
        ``g:mpn`` element in the feed; ``shop_url`` is the text of the second
        child of the root's first child (presumably the channel ``<link>`` of
        a Google feed -- TODO confirm against a real feed).

    Raises:
        Whatever ``urlopen`` or the XML parser raises on download or parse
        failure; callers are expected to handle these.
    """
    # Parse the downloaded bytes directly. The previous implementation wrote
    # them to a NamedTemporaryFile and parsed that file by name without
    # flushing first, so buffered content could still be missing from disk.
    with urlopen(feed_url, timeout=REQUEST_TIMEOUT_SECONDS) as response:
        raw_feed = response.read()
    root = etree.fromstring(raw_feed)

    shop_url = root[0][1].text
    list_ids = [item.text for item in root.findall(".//g:mpn", root.nsmap)]
    return list_ids, shop_url


def get_image(url):
    """Fetch an image and return it both raw and preprocessed for the model.

    Args:
        url: URL of the image to download.

    Returns:
        A ``(img, x)`` tuple: ``img`` is the 224x224 RGB image as an array,
        ``x`` is a batch of one holding the same image after
        ``preprocess_input``.

    Raises:
        requests.RequestException: on network failure, timeout or a non-2xx
            response.
    """
    res = requests.get(url, timeout=REQUEST_TIMEOUT_SECONDS)
    res.raise_for_status()
    im = Image.open(BytesIO(res.content)).convert("RGB").resize((224, 224))
    img = img_to_array(im)
    # Preprocess a copy: Keras documents that preprocess_input may write over
    # its input, which would otherwise corrupt the raw `img` we return.
    x = preprocess_input(np.expand_dims(img.copy(), axis=0))
    return img, x


def load_image(url, img_id):
    """Load the 224px flat thumbnail of design ``img_id`` from shop ``url``.

    Returns:
        The ``(img, x)`` tuple produced by ``get_image``.
    """
    request_url = f"{url}/flat_thumb/{img_id}/1/224"
    print('get image', request_url)
    return get_image(request_url)


async def create_feature_files(shop: Shop):
    """Build a VGG16 ``fc2`` feature extractor and run the calculation for ``shop``."""
    model = keras.applications.VGG16(weights='imagenet', include_top=True)
    feat_extractor = Model(inputs=model.input, outputs=model.get_layer("fc2").output)
    await calculate_shop(shop, feat_extractor)


def _extract_features(list_ids, shop_url, feat_extractor):
    """Return ``(features, fitted_ids)`` for every design that could be processed."""
    features = []
    fitted_ids = []
    for design_id in list_ids:
        try:
            _, x = load_image(shop_url, design_id)
            feat = feat_extractor.predict(x)[0]
        except Exception as e:
            # Best effort: one broken image must not abort the whole shop.
            print(design_id, ' failed loading feature extraction', e)
        else:
            features.append(feat)
            fitted_ids.append(design_id)
    return features, fitted_ids


def _reduce_features(features):
    """Project ``features`` with PCA; return ``None`` when PCA cannot be fitted."""
    try:
        features = np.array(features)
        components = min(len(features), MAX_PCA_COMPONENTS)
        pca = PCA(n_components=components)
        pca.fit(features)
        return pca.transform(features)
    except Exception as e:
        print('pca too small?', e)
        return None


def _rank_similar(pca_features, design_ids, query_idx, limit):
    """Return the designs closest to ``query_idx`` by cosine distance, nearest first."""
    query = pca_features[query_idx]
    # Exclude the query by index rather than assuming it sorts first: an exact
    # duplicate image also has distance 0 and could otherwise displace it.
    candidates = [
        {"dist": distance.cosine(query, feat), "id": design_ids[idx]}
        for idx, feat in enumerate(pca_features)
        if idx != query_idx
    ]
    candidates.sort(key=lambda candidate: candidate["dist"])
    return candidates[:max(limit - 1, 0)]


async def _push_recommendations(shop, pca_features, design_ids):
    """Send recommendations for every fitted design, then report the shop as updated."""
    if design_ids and pca_features is None:
        # Previously this surfaced as a NameError on `pca_features`.
        raise ValueError("PCA features are unavailable")

    limit = min(len(design_ids), MAX_LIST_PER_DESIGN)
    for query_idx, design_id in enumerate(design_ids):
        design_dict = {
            "shop_id": shop.id,
            "design": design_id,
            "recommendations": _rank_similar(pca_features, design_ids, query_idx, limit),
        }
        if not await push_home(design_dict, shop):
            print("error sending recommendations")

    # Once all designs are pushed, tell the shop the calculation finished.
    print(f"step3: {datetime.now()}")
    if not await update_calculation_date(shop):
        print("error sending shop calculation update")


async def calculate_shop(shop: Shop, feat_extractor) -> None:
    """Compute and push look-alike recommendations for every design of ``shop``.

    Reads the design ids from ``{shop.base_url}/google_xml_feed``, extracts a
    feature vector per design image with ``feat_extractor``, reduces the
    vectors with PCA, ranks designs by cosine distance and posts one
    recommendation list per design through ``push_home``. Failures are printed
    and not raised.

    Args:
        shop: Shop to process; nothing happens when ``shop.id`` is falsy.
        feat_extractor: Model whose ``predict`` returns one feature vector per
            input image.
    """
    if not shop.id:  # temp
        return

    print(shop.id, shop.base_url, datetime.now())
    google_xml_feed_url = f"{shop.base_url}/google_xml_feed"
    try:
        list_ids, shop_url = get_ids_from_feed(google_xml_feed_url)
    except Exception as e:
        list_ids = []
        print('could not get images from ', shop.id, e)

    if list_ids:
        print(f"step1: {datetime.now()}")
        features, fitted_ids = _extract_features(list_ids, shop_url, feat_extractor)
        print(f"step2: {datetime.now()}")
        pca_features = _reduce_features(features)
        try:
            await _push_recommendations(shop, pca_features, fitted_ids)
        except Exception as e:
            print("could not create json with look-a-like for shop:", shop.id, e)
    print(f"calculation for {shop.id} ended at {datetime.now()}")


def _post_webhook(shop, endpoint, payload):
    """POST ``payload`` as JSON to the shop webhook; return ``True`` on a 2xx reply."""
    headers: dict[str, str] = {
        "Authorization": API_KEY,
        "Content-type": "application/json",
    }
    try:
        url = f"{shop.webhook_url}/{endpoint}"
        response = requests.post(
            url, json=payload, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()
        return True
    except Exception as e:
        print(e)
        return False


# NOTE(review): both coroutines below use the synchronous `requests` client,
# so awaiting them blocks the calling thread for the duration of the request.
async def push_home(design_dict, shop):
    """Send one design's recommendations to the shop; return ``True`` on success."""
    return _post_webhook(shop, "fill_recommendations", design_dict)


async def update_calculation_date(shop):
    """Notify the shop that its calculation finished; return ``True`` on success."""
    return _post_webhook(shop, "shop_updated", {"shop_id": shop.id})