import requests import sqlite3 import sys DB_PATH = "tarkov.db" API_URL = "https://api.tarkov.dev/graphql" # Slot nameId patterns that identify what type of mod goes in a slot SLOT_TYPE_MAP = { "mod_muzzle": "suppressor", "mod_scope": "scope", "mod_tactical": "flashlight", "mod_tactical_001": "flashlight", "mod_tactical_002": "flashlight", "mod_tactical_003": "flashlight", "mod_stock": "stock", "mod_stock_000": "stock", "mod_stock_001": "stock", "mod_pistol_grip": "grip", "mod_grip": "foregrip", "mod_foregrip": "foregrip", "mod_magazine": "magazine", "mod_barrel": "barrel", "mod_gas_block": "gas_block", "mod_handguard": "handguard", "mod_launcher": "launcher", "mod_bipod": "bipod", } # Fetch weapons separately due to large slots/allowedItems payload GRAPHQL_QUERY_WEAPONS = """ { weapons: items(types: [gun]) { id name shortName weight gridImageLink wikiLink properties { ... on ItemPropertiesWeapon { caliber fireRate ergonomics recoilVertical defaultWeight slots { id name nameId required filters { allowedItems { id } } } } } } } """ GRAPHQL_QUERY_GEAR = """ { armor: items(types: [armor]) { id name shortName weight gridImageLink wikiLink properties { ... on ItemPropertiesArmor { class durability material { name } zones } } } helmets: items(types: [helmet]) { id name shortName weight gridImageLink wikiLink properties { ... on ItemPropertiesHelmet { class durability material { name } headZones deafening } } } wearables: items(types: [wearable]) { id name shortName weight gridImageLink wikiLink properties { ... on ItemPropertiesHelmet { class durability material { name } headZones deafening } } } backpacks: items(types: [backpack]) { id name shortName weight gridImageLink wikiLink properties { ... on ItemPropertiesBackpack { capacity grids { width height } } } } rigs: items(types: [rig]) { id name shortName weight gridImageLink wikiLink properties { ... on ItemPropertiesChestRig { capacity class durability zones } } } suppressors: items(types: [suppressor]) { id name shortName weight gridImageLink wikiLink } mods: items(types: [mods]) { id name shortName weight gridImageLink wikiLink } } """ def gql(query, label="query"): response = requests.post( API_URL, json={"query": query}, timeout=90 ) response.raise_for_status() data = response.json() if "errors" in data: raise RuntimeError(f"{label} errors: {data['errors']}") return data["data"] def upsert_item(cursor, item_id, name, short_name, category, weight, grid_image_url, wiki_url, caliber=None, fire_rate=None, ergonomics=None, recoil_vertical=None, default_weight=None, armor_class=None, durability=None, material=None, zones=None, head_zones=None, deafening=None, capacity=None, mod_type=None): cursor.execute("SELECT id FROM gear_items WHERE api_id = ?", (item_id,)) existing = cursor.fetchone() if existing: cursor.execute(""" UPDATE gear_items SET name=?, short_name=?, category=?, weight_kg=?, grid_image_url=?, wiki_url=?, caliber=?, fire_rate=?, ergonomics=?, recoil_vertical=?, default_weight=?, armor_class=?, durability=?, material=?, zones=?, head_zones=?, deafening=?, capacity=?, mod_type=?, updated_at=CURRENT_TIMESTAMP WHERE api_id=? """, (name, short_name, category, weight, grid_image_url, wiki_url, caliber, fire_rate, ergonomics, recoil_vertical, default_weight, armor_class, durability, material, zones, head_zones, deafening, capacity, mod_type, item_id)) return "updated" else: cursor.execute(""" INSERT INTO gear_items (id, api_id, name, short_name, category, weight_kg, grid_image_url, wiki_url, caliber, fire_rate, ergonomics, recoil_vertical, default_weight, armor_class, durability, material, zones, head_zones, deafening, capacity, mod_type) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) """, (item_id, item_id, name, short_name, category, weight, grid_image_url, wiki_url, caliber, fire_rate, ergonomics, recoil_vertical, default_weight, armor_class, durability, material, zones, head_zones, deafening, capacity, mod_type)) return "inserted" def import_weapons(conn, weapons): cursor = conn.cursor() counts = {"inserted": 0, "updated": 0, "skipped": 0} # Collect slot data for phase 2 slot_data = {} # gun_id -> list of slot dicts for w in weapons: item_id = w.get("id") name = w.get("name") if not item_id or not name: counts["skipped"] += 1 continue props = w.get("properties") or {} result = upsert_item( cursor, item_id, name, w.get("shortName"), "gun", w.get("weight"), w.get("gridImageLink"), w.get("wikiLink"), caliber=props.get("caliber"), fire_rate=props.get("fireRate"), ergonomics=props.get("ergonomics"), recoil_vertical=props.get("recoilVertical"), default_weight=props.get("defaultWeight"), ) counts[result] += 1 slots = props.get("slots") or [] if slots: slot_data[item_id] = slots conn.commit() return counts, slot_data def import_armor(conn, items): cursor = conn.cursor() counts = {"inserted": 0, "updated": 0, "skipped": 0} for item in items: item_id = item.get("id") name = item.get("name") if not item_id or not name: counts["skipped"] += 1 continue props = item.get("properties") or {} material = (props.get("material") or {}).get("name") zones = ",".join(props.get("zones") or []) or None result = upsert_item( cursor, item_id, name, item.get("shortName"), "armor", item.get("weight"), item.get("gridImageLink"), item.get("wikiLink"), armor_class=props.get("class"), durability=props.get("durability"), material=material, zones=zones, ) counts[result] += 1 conn.commit() return counts def import_helmets(conn, items): cursor = conn.cursor() counts = {"inserted": 0, "updated": 0, "skipped": 0} for item in items: item_id = item.get("id") name = item.get("name") if not item_id or not name: counts["skipped"] += 1 continue props = item.get("properties") or {} material = (props.get("material") or {}).get("name") head_zones_list = props.get("headZones") or [] head_zones = ",".join(head_zones_list) or None # True helmets cover the top of the head; face masks etc. go to 'headwear' category = "helmet" if any("Top of the head" in z for z in head_zones_list) else "headwear" result = upsert_item( cursor, item_id, name, item.get("shortName"), category, item.get("weight"), item.get("gridImageLink"), item.get("wikiLink"), armor_class=props.get("class"), durability=props.get("durability"), material=material, head_zones=head_zones, deafening=props.get("deafening"), ) counts[result] += 1 conn.commit() return counts def import_backpacks(conn, items): cursor = conn.cursor() counts = {"inserted": 0, "updated": 0, "skipped": 0} for item in items: item_id = item.get("id") name = item.get("name") if not item_id or not name: counts["skipped"] += 1 continue props = item.get("properties") or {} capacity = props.get("capacity") if capacity is None: capacity = sum( g.get("width", 0) * g.get("height", 0) for g in (props.get("grids") or []) ) or None result = upsert_item( cursor, item_id, name, item.get("shortName"), "backpack", item.get("weight"), item.get("gridImageLink"), item.get("wikiLink"), capacity=capacity, ) counts[result] += 1 conn.commit() return counts def import_rigs(conn, items): cursor = conn.cursor() counts = {"inserted": 0, "updated": 0, "skipped": 0} for item in items: item_id = item.get("id") name = item.get("name") if not item_id or not name: counts["skipped"] += 1 continue props = item.get("properties") or {} zones = ",".join(props.get("zones") or []) or None result = upsert_item( cursor, item_id, name, item.get("shortName"), "rig", item.get("weight"), item.get("gridImageLink"), item.get("wikiLink"), armor_class=props.get("class"), durability=props.get("durability"), capacity=props.get("capacity"), zones=zones, ) counts[result] += 1 conn.commit() return counts def import_suppressors(conn, items): cursor = conn.cursor() counts = {"inserted": 0, "updated": 0, "skipped": 0} for item in items: item_id = item.get("id") name = item.get("name") if not item_id or not name: counts["skipped"] += 1 continue result = upsert_item( cursor, item_id, name, item.get("shortName"), "mod", item.get("weight"), item.get("gridImageLink"), item.get("wikiLink"), mod_type="suppressor", ) counts[result] += 1 conn.commit() return counts def import_mods(conn, items): """Import generic weapon mods (scopes, grips, stocks, barrels, etc.)""" cursor = conn.cursor() counts = {"inserted": 0, "updated": 0, "skipped": 0} for item in items: item_id = item.get("id") name = item.get("name") if not item_id or not name: counts["skipped"] += 1 continue # Don't overwrite suppressors already imported with their mod_type cursor.execute("SELECT id FROM gear_items WHERE api_id = ?", (item_id,)) existing = cursor.fetchone() if existing: # Only update weight/image if item already exists — don't overwrite mod_type cursor.execute(""" UPDATE gear_items SET name=?, short_name=?, weight_kg=?, grid_image_url=?, wiki_url=?, updated_at=CURRENT_TIMESTAMP WHERE api_id=? """, (name, item.get("shortName"), item.get("weight"), item.get("gridImageLink"), item.get("wikiLink"), item_id)) counts["updated"] += 1 else: result = upsert_item( cursor, item_id, name, item.get("shortName"), "mod", item.get("weight"), item.get("gridImageLink"), item.get("wikiLink"), ) counts[result] += 1 conn.commit() return counts def import_slots(conn, slot_data): """ Phase 2: insert gun_slots and gun_slot_items rows. slot_data: { gun_id: [slot_dict, ...] } Only inserts gun_slot_items rows where item_id exists in gear_items. """ cursor = conn.cursor() slots_inserted = 0 slot_items_inserted = 0 slot_items_skipped = 0 # Build set of known item IDs for fast lookup known_ids = {row[0] for row in cursor.execute("SELECT id FROM gear_items").fetchall()} for gun_id, slots in slot_data.items(): for slot in slots: slot_id = slot.get("id") slot_name = slot.get("name") or "" slot_nameid = slot.get("nameId") or "" required = 1 if slot.get("required") else 0 if not slot_id: continue cursor.execute(""" INSERT OR REPLACE INTO gun_slots (gun_id, slot_id, slot_name, slot_nameid, required) VALUES (?, ?, ?, ?, ?) """, (gun_id, slot_id, slot_name, slot_nameid, required)) slots_inserted += 1 filters = slot.get("filters") or {} allowed_items = filters.get("allowedItems") or [] for allowed in allowed_items: item_id = allowed.get("id") if not item_id or item_id not in known_ids: slot_items_skipped += 1 continue cursor.execute(""" INSERT OR IGNORE INTO gun_slot_items (gun_id, slot_id, item_id) VALUES (?, ?, ?) """, (gun_id, slot_id, item_id)) slot_items_inserted += 1 conn.commit() return slots_inserted, slot_items_inserted, slot_items_skipped def main(): print("=== OnlyScavs Gear Import ===") # Phase 1a: fetch weapons (separate query — large payload) print("\nFetching weapons from tarkov.dev...") try: weapon_data = gql(GRAPHQL_QUERY_WEAPONS, "weapons") except Exception as e: print(f"ERROR fetching weapons: {e}") sys.exit(1) weapons = weapon_data.get("weapons", []) print(f" Fetched {len(weapons)} weapons") # Phase 1b: fetch other gear print("Fetching armor, helmets, backpacks, rigs, suppressors...") try: gear_data = gql(GRAPHQL_QUERY_GEAR, "gear") except Exception as e: print(f"ERROR fetching gear: {e}") sys.exit(1) armor = gear_data.get("armor", []) helmets = gear_data.get("helmets", []) backpacks = gear_data.get("backpacks", []) rigs = gear_data.get("rigs", []) suppressors = gear_data.get("suppressors", []) mods = gear_data.get("mods", []) # Wearables: only keep those whose properties resolved to ItemPropertiesHelmet # (i.e. they have 'class' or 'headZones' set — not decorative items) wearables_raw = gear_data.get("wearables", []) # Known helmet IDs from the main helmet query — skip duplicates helmet_ids = {h["id"] for h in helmets} wearable_helmets = [ w for w in wearables_raw if w["id"] not in helmet_ids and isinstance(w.get("properties"), dict) and (w["properties"].get("class") or w["properties"].get("headZones")) ] print(f" armor={len(armor)}, helmets={len(helmets)}, wearable_helmets={len(wearable_helmets)}, " f"backpacks={len(backpacks)}, rigs={len(rigs)}, suppressors={len(suppressors)}, mods={len(mods)}") # Phase 2: insert into DB conn = sqlite3.connect(DB_PATH) conn.execute("PRAGMA foreign_keys = ON") try: print("\nImporting weapons...") wcounts, slot_data = import_weapons(conn, weapons) print(f" guns: +{wcounts['inserted']} inserted, ~{wcounts['updated']} updated, -{wcounts['skipped']} skipped") print("Importing armor...") c = import_armor(conn, armor) print(f" armor: +{c['inserted']} inserted, ~{c['updated']} updated, -{c['skipped']} skipped") print("Importing helmets...") c = import_helmets(conn, helmets) print(f" helmets: +{c['inserted']} inserted, ~{c['updated']} updated, -{c['skipped']} skipped") print("Importing wearable helmets (missing from helmet type)...") c = import_helmets(conn, wearable_helmets) print(f" wearable helmets: +{c['inserted']} inserted, ~{c['updated']} updated, -{c['skipped']} skipped") print("Importing backpacks...") c = import_backpacks(conn, backpacks) print(f" backpacks: +{c['inserted']} inserted, ~{c['updated']} updated, -{c['skipped']} skipped") print("Importing rigs...") c = import_rigs(conn, rigs) print(f" rigs: +{c['inserted']} inserted, ~{c['updated']} updated, -{c['skipped']} skipped") print("Importing suppressors...") c = import_suppressors(conn, suppressors) print(f" suppressors: +{c['inserted']} inserted, ~{c['updated']} updated, -{c['skipped']} skipped") print("Importing mods (scopes, grips, stocks, barrels, etc.)...") c = import_mods(conn, mods) print(f" mods: +{c['inserted']} inserted, ~{c['updated']} updated, -{c['skipped']} skipped") # Phase 3: gun slots (needs all items in DB first) print("Importing gun slots and compatibility data...") slots_ins, slot_items_ins, slot_items_skip = import_slots(conn, slot_data) print(f" gun_slots: {slots_ins} rows") print(f" gun_slot_items: {slot_items_ins} rows inserted, {slot_items_skip} skipped (item not in DB)") # Phase 4: classify mod_type for mods based on which slots they appear in print("Classifying mod types from slot data...") cursor = conn.cursor() updated_mod_types = 0 for slot_nameid, mod_type in SLOT_TYPE_MAP.items(): result = cursor.execute(""" UPDATE gear_items SET mod_type = ? WHERE id IN ( SELECT gsi.item_id FROM gun_slot_items gsi JOIN gun_slots gs ON gs.gun_id = gsi.gun_id AND gs.slot_id = gsi.slot_id WHERE gs.slot_nameid = ? ) AND category = 'mod' AND (mod_type IS NULL OR mod_type != 'suppressor') """, (mod_type, slot_nameid)) updated_mod_types += result.rowcount conn.commit() print(f" mod_type set on {updated_mod_types} mod items") except Exception as e: conn.rollback() print(f"ERROR: Database operation failed — {e}") import traceback traceback.print_exc() sys.exit(1) finally: conn.close() print("\n=== Import complete ===") print("Run: SELECT category, COUNT(*) FROM gear_items GROUP BY category;") print(" to verify row counts.") if __name__ == "__main__": main()