Chapter 5: Data Flow & Pipeline
This chapter traces the complete lifecycle of product data as it moves through the AdPriority system – from Shopify catalog, through priority scoring, into Google Merchant Center, and ultimately into Google Ads campaign targeting. Each pipeline stage is documented with its trigger, latency, and failure mode.
5.1 End-to-End Data Flow
The following diagram shows the full pipeline from product creation in Shopify to budget allocation in Google Ads.
+-------------+     +--------------+     +--------------+     +---------------+
|   Shopify   |     |  AdPriority  |     |   Priority   |     |    Google     |
|    Store    | --> |   Database   | --> |   Scoring    | --> |     Sheet     |
|             |     |              |     |    Engine    |     |               |
| 5,582 prods |     |   products   |     |   Rules +    |     |     id +      |
|  124k GMC   |     |    rules     |     |  Seasons +   |     | custom_label_ |
|  variants   |     |   seasons    |     |  Overrides   |     |  0 through 4  |
+------+------+     +------+-------+     +------+-------+     +-------+-------+
       |                   |                    |                     |
       | Webhook/          | On product         | Hourly              | Daily
       | Import            | change             | batch               | fetch
       v                   v                    v                     v
+-------------+     +--------------+     +--------------+     +---------------+
|   Product   |     |   Store in   |     |   Write to   |     |      GMC      |
|   payload   |     |   database   |     |    Google    |     | Supplemental  |
|  received   |     |  with rules  |     |  Sheet via   |     |     Feed      |
|             |     |   applied    |     |  Sheets API  |     |               |
+-------------+     +--------------+     +--------------+     +-------+-------+
                                                                      |
                                                                Custom labels
                                                                 applied to
                                                                GMC products
                                                                      |
                                                                      v
                                                              +---------------+
                                                              |  Google Ads   |
                                                              |     PMAX      |
                                                              |   Campaigns   |
                                                              |               |
                                                              | Listing group |
                                                              |  filters by   |
                                                              |custom_label_0 |
                                                              +---------------+
Total pipeline latency (worst case): ~25 hours on the normal schedule, up to ~29 hours if Google Ads needs its full 4-hour maximum (see 5.6)
- Webhook delivery: seconds
- Priority calculation: < 1 second
- Sheet update: up to 1 hour (next hourly batch)
- GMC fetch: up to 24 hours (daily schedule)
- Google Ads applies labels: usually soon after the GMC update (target < 1 hour, max < 4 hours)
Total pipeline latency (best case): ~1 hour
- A manual sync trigger writes to the Sheet immediately
- A manual GMC fetch (or a scheduled fetch that runs shortly afterward) picks up the changes
5.2 MVP Flow: Google Sheets Pipeline
The MVP uses Google Sheets as the transport layer between AdPriority and GMC. This approach was validated with a 10-product test batch (10/10 matched, zero issues). See ADR-001 in Chapter 7 for the rationale.
                AdPriority Backend
                         |
              +----------+----------+
              |                     |
       Priority Engine        Sheets Writer
              |                     |
              v                     v
       +-------------+      +----------------+
       |  products   |      | Google Sheets  |
       |    table    | ---> |     API v4     |
       |             |      |                |
       | priority: 4 |      | sheets.values  |
       | needs_sync  |      |   .update()    |
       +-------------+      +-------+--------+
                                    |
                +-------------------+
                |
                v
        +----------------+
        |  Google Sheet  |
        |                |
        |  Row format:   |
        | id             | = shopify_US_{prodId}_{varId}
        | custom_label_0 | = priority-{0-5}
        | custom_label_1 | = {season}
        | custom_label_2 | = {category_group}
        | custom_label_3 | = {product_status}
        | custom_label_4 | = {brand_tier}
        +-------+--------+
                |
                | GMC fetches daily (pull model)
                v
        +----------------+
        |     Google     |
        |    Merchant    |
        |     Center     |
        |                |
        |  Matches rows  |
        |  by id column  |
        |   to primary   |
        | feed products  |
        +----------------+
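A minimal Python sketch of how one Sheet row could be assembled in the column order above. The `VariantLabels` class and the helper names are hypothetical; only the `shopify_US_{prodId}_{varId}` ID pattern and the `priority-{0-5}` label format come from this chapter. The ID must match the primary-feed product exactly, otherwise GMC has nothing to join the row to.

```python
from dataclasses import dataclass

# Column order of the supplemental feed Sheet.
HEADER = ["id", "custom_label_0", "custom_label_1",
          "custom_label_2", "custom_label_3", "custom_label_4"]


@dataclass
class VariantLabels:
    # Hypothetical in-memory shape of one scored variant.
    product_id: int
    variant_id: int
    priority: int  # 0-5
    season: str
    category_group: str
    product_status: str
    brand_tier: str


def gmc_id(product_id: int, variant_id: int, country: str = "US") -> str:
    # Must equal the id of the primary-feed product, or the row is not matched.
    return f"shopify_{country}_{product_id}_{variant_id}"


def build_sheet_row(v: VariantLabels) -> list[str]:
    if not 0 <= v.priority <= 5:
        raise ValueError(f"priority out of range: {v.priority}")
    return [
        gmc_id(v.product_id, v.variant_id),
        f"priority-{v.priority}",
        v.season,
        v.category_group,
        v.product_status,
        v.brand_tier,
    ]
```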
Sheet sizing for Nexus:
| Metric | Value |
|---|---|
| Active Shopify products | ~2,425 |
| Estimated active GMC variants | ~15,000-20,000 |
| Columns per row | 6 |
| Total cells (at 20,000 variants) | ~120,000 |
| Google Sheets cell limit | 10,000,000 |
| Usage percentage | 1.2% |
The Sheet comfortably handles the Nexus catalog. Even at the full 124,060 variants (about 744,000 cells), usage would be only 7.4% of the limit.
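The sizing figures are simple arithmetic. A short Python check that also counts the header row (which makes no practical difference at this scale); the 10,000,000-cell constant is the per-spreadsheet limit quoted in the table.

```python
SHEETS_CELL_LIMIT = 10_000_000  # Google Sheets per-spreadsheet cell limit
COLUMNS = 6                      # id + custom_label_0..4


def sheet_usage(variant_rows: int, columns: int = COLUMNS) -> tuple[int, float]:
    """Return (total cells, percent of the cell limit), header row included."""
    cells = (variant_rows + 1) * columns  # +1 for the header row
    return cells, 100 * cells / SHEETS_CELL_LIMIT
```

For example, `sheet_usage(124_060)` gives 744,366 cells, about 7.4% of the limit.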
5.3 Future Flow: Content API Pipeline
For the SaaS product (Growth tier and above), a direct Content API integration removes the 24-hour GMC fetch delay and eliminates the Google Sheet dependency.
                 AdPriority Backend
                          |
              +-----------+------------+
              |                        |
       Priority Engine        Content API Writer
              |                        |
              v                        v
       +-------------+      +---------------------+
       |  products   |      |   GMC Content API   |
       |    table    | ---> |        v2.1         |
       |             |      |                     |
       | priority: 4 |      |  products.update()  |
       | needs_sync  |      |  or custombatch()   |
       +-------------+      +----------+----------+
                                       |
                                Direct API call
                               (near real-time)
                                       |
                                       v
                            +---------------------+
                            |   Google Merchant   |
                            |       Center        |
                            |                     |
                            |   Labels applied    |
                            |   within minutes    |
                            +---------------------+
Content API constraints:
| Constraint | Value |
|---|---|
| Updates per product per day | Maximum 2 |
| Batch size | Up to 10,000 entries per custombatch |
| Rate limiting | Dynamic (exponential backoff required) |
| OAuth scope required | https://www.googleapis.com/auth/content |
| Auth per tenant | Each tenant connects their own GMC |
Hybrid approach (recommended for production SaaS):
- Google Sheets for Starter tier (simpler setup, daily sync)
- Content API for Growth/Pro/Enterprise tiers (faster updates)
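To make the custombatch constraint concrete, here is a Python sketch that turns label updates into Content API v2.1 `products.custombatch` request bodies and splits them into requests of at most 10,000 entries. It only builds JSON-shaped dictionaries (no HTTP, no OAuth). The entry fields (`batchId`, `merchantId`, `method`, `productId`, `product`, `updateMask`) follow the v2.1 reference as we understand it; the `online:en:US:` product ID prefix, the `customLabel0`..`customLabel4` mask, and the helper names are assumptions to verify against the current API documentation.

```python
from typing import Iterator

MAX_ENTRIES_PER_BATCH = 10_000  # custombatch limit from the constraints table


def update_entry(batch_id: int, merchant_id: str, offer_id: str,
                 labels: list[str]) -> dict:
    """One custombatch entry that updates only the custom label fields."""
    # Assumed REST ID layout: channel:contentLanguage:targetCountry:offerId
    product_id = f"online:en:US:{offer_id}"
    fields = {f"customLabel{i}": value for i, value in enumerate(labels)}
    return {
        "batchId": batch_id,
        "merchantId": merchant_id,
        "method": "update",
        "productId": product_id,
        "product": fields,
        "updateMask": ",".join(fields),  # only these attributes are modified
    }


def custombatch_bodies(merchant_id: str,
                       updates: dict[str, list[str]]) -> Iterator[dict]:
    """Yield request bodies of at most MAX_ENTRIES_PER_BATCH entries each."""
    entries = [update_entry(i, merchant_id, offer_id, labels)
               for i, (offer_id, labels) in enumerate(updates.items())]
    for start in range(0, len(entries), MAX_ENTRIES_PER_BATCH):
        yield {"entries": entries[start:start + MAX_ENTRIES_PER_BATCH]}
```

Because the constraints table allows at most two updates per product per day, a writer along these lines would only send products whose labels actually changed since the last push.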
5.4 Product Sync Pipeline
5.4.1 Initial Import (App Install)
When a merchant installs AdPriority, all active products are imported from Shopify and scored.
App Install
    |
    v
1. Create tenant record in stores table
    |
    v
2. Fetch products from Shopify GraphQL API
   (paginated, 250 per page)
   Nexus: 5,582 products = ~23 pages
    |
    v
3. For each product + variant:
   +--------------------------------------------------+
   | a. Generate GMC ID:                              |
   |    shopify_US_{productId}_{variantId}            |
   |                                                  |
   | b. Determine product type mapping                |
   |    (from category-mapping rules)                 |
   |                                                  |
   | c. Calculate initial priority:                   |
   |    (first applicable check wins)                 |
   |    - Check manual override     (precedence 1)    |
   |    - Check seasonal calendar   (precedence 2)    |
   |    - Check new arrival status  (precedence 3)    |
   |    - Check category rules      (precedence 4)    |
   |    - Apply store default       (precedence 5)    |
   |                                                  |
   | d. Determine custom labels:                      |
   |    label_0 = priority-{score}                    |
   |    label_1 = {current_season}                    |
   |    label_2 = {category_group}                    |
   |    label_3 = {product_status}                    |
   |    label_4 = {brand_tier}                        |
   |                                                  |
   | e. Insert into products table                    |
   |    (needs_sync = true)                           |
   +--------------------------------------------------+
    |
    v
4. Trigger immediate Sheet sync
   (write all products to Google Sheet)
    |
    v
5. Create sync_log entry
   (products_total, products_success, products_failed)
    |
    v
6. Return import summary to UI
Estimated import time for Nexus: 2-5 minutes (network-bound; throughput is limited by the Shopify GraphQL Admin API's query-cost rate limit while paginating).
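Step 2 relies on cursor pagination. A minimal Python sketch of the loop, with the GraphQL call abstracted behind a hypothetical `fetch_page(cursor)` callable that returns `(nodes, has_next_page, end_cursor)`, mirroring the `pageInfo { hasNextPage endCursor }` fields of a Shopify GraphQL connection. The in-memory `make_fake_fetch` stands in for the real API so the page arithmetic can be checked.

```python
import math
from typing import Callable, Iterator, Optional

PAGE_SIZE = 250  # largest `first:` value Shopify's GraphQL Admin API accepts

# (nodes, hasNextPage, endCursor), mirroring a GraphQL connection's pageInfo
Page = tuple[list[dict], bool, Optional[str]]


def page_count(total: int) -> int:
    return math.ceil(total / PAGE_SIZE)


def iterate_pages(fetch_page: Callable[[Optional[str]], Page]) -> Iterator[list[dict]]:
    """Follow endCursor until hasNextPage is false; the first call passes no cursor."""
    cursor: Optional[str] = None
    while True:
        nodes, has_next, cursor = fetch_page(cursor)
        yield nodes
        if not has_next:
            return


def make_fake_fetch(total: int) -> Callable[[Optional[str]], Page]:
    """In-memory stand-in for the API; cursors are stringified offsets."""
    def fetch(cursor: Optional[str]) -> Page:
        start = int(cursor) if cursor else 0
        end = min(start + PAGE_SIZE, total)
        return [{"id": i} for i in range(start, end)], end < total, str(end)
    return fetch
```

With 5,582 products the loop makes 23 requests, the last one returning 82 products.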
5.4.2 Webhook-Driven Updates
After initial import, Shopify webhooks keep the database current.
Shopify Platform             AdPriority Backend
        |                             |
        |  products/create            |
        |---------------------------->|
        |                             | 1. Verify HMAC
        |                             | 2. Parse product payload
        |                             | 3. Generate GMC IDs for all variants
        |                             | 4. Apply rules -> calculate priority
        |                             | 5. INSERT into products table
        |                             | 6. Mark needs_sync = true
        |                             | 7. Return 200 OK
        |                             |
        |  products/update            |
        |---------------------------->|
        |                             | 1. Verify HMAC
        |                             | 2. Parse product payload
        |                             | 3. Check: is priority_locked?
        |                             |    YES -> skip recalculation
        |                             |    NO  -> continue
        |                             | 4. Check: did product_type change?
        |                             |    YES -> recalculate priority
        |                             | 5. Check: did tags change?
        |                             |    YES -> recalculate status label
        |                             | 6. Check: did inventory change?
        |                             |    YES -> check for out-of-stock (priority 0)
        |                             | 7. UPDATE products table
        |                             | 8. Mark needs_sync = true (if changed)
        |                             | 9. Return 200 OK
        |                             |
        |  products/delete            |
        |---------------------------->|
        |                             | 1. Verify HMAC
        |                             | 2. Soft-delete from products table
        |                             | 3. Remove from next Sheet sync
        |                             | 4. Return 200 OK
        |                             |
        |  app/uninstalled            |
        |---------------------------->|
        |                             | 1. Verify HMAC
        |                             | 2. Mark store as inactive
        |                             | 3. Retain data for 30 days
        |                             | 4. Schedule cleanup job
        |                             | 5. Return 200 OK
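Every handler above starts by verifying the HMAC. Shopify signs each webhook with the app's client secret: the `X-Shopify-Hmac-Sha256` header carries the base64-encoded HMAC-SHA256 of the raw request body. A minimal Python check, assuming the web framework exposes the body as unparsed bytes; the function name is illustrative.

```python
import base64
import hashlib
import hmac


def verify_shopify_webhook(raw_body: bytes, hmac_header: str, secret: str) -> bool:
    """Compare the X-Shopify-Hmac-Sha256 header with an HMAC of the raw body."""
    digest = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).digest()
    expected = base64.b64encode(digest).decode("ascii")
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected, hmac_header)
```

The HMAC must be computed over the raw bytes before any JSON parsing; re-serializing the parsed payload can change whitespace or key order and break the comparison.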
5.5 Priority Recalculation Triggers
Priority scores are not static. The following events trigger a recalculation for affected products.
| Trigger | Scope | Mechanism |
|---|---|---|
| Manual override | Single product | API call from UI; sets priority_locked = true |
| Bulk priority update | Selected products | API call from UI; batch UPDATE |
| Rule created/modified | All matching products | Rule engine evaluates all products against the new/changed rule |
| Rule deleted | Previously matched products | Fall back to next applicable rule or store default |
| Season transition (calendar date reached) | All products with season_rules for that category | Cron job (daily check) or manual trigger from UI |
| New product arrives (Shopify webhook) | New product | Webhook: products/create; apply new arrival boost |
| Product type/tag change (Shopify webhook) | Changed product | Webhook: products/update; re-evaluate rules |
| New arrival expires (boost duration ended) | Products past boost duration | Cron job (daily); revert to rule/default |
| Inventory hits zero | Out-of-stock product | Webhook: products/update; set priority to 0 |
5.5.1 Priority Resolution Order
When multiple sources assign a priority, the following hierarchy determines which value wins. The levels below are precedence ranks, not priority scores: the first level that applies supplies the product's score (0-5).
Level 1 (highest precedence): Manual override (priority_locked = true)
   |
   v
Level 2: Seasonal calendar rule
   |       (category x current season)
   v
Level 3: New arrival boost
   |       (product created within N days)
   v
Level 4: Category-based rule
   |       (first matching rule by order_index)
   v
Level 5 (lowest precedence): Store default
           (stores.default_priority, typically 3)
Resolution algorithm:
function calculatePriority(product, store, currentSeason):
    // 1. Manual override always wins
    if product.priority_locked:
        return { score: product.priority, source: "manual" }
    // 2. Check seasonal calendar
    seasonRule = findSeasonRule(product.product_type, currentSeason, store.id)
    if seasonRule exists:
        return { score: seasonRule.priority, source: "seasonal" }
    // 3. Check new arrival boost
    daysSinceCreation = daysBetween(product.created_at, now())
    if daysSinceCreation <= store.new_arrival_days:
        return { score: store.new_arrival_priority, source: "new_arrival" }
    // 4. Check category-based rules (ordered by order_index)
    matchingRule = evaluateRules(product, store.rules)
    if matchingRule exists:
        return { score: matchingRule.priority, source: "rule" }
    // 5. Fall back to store default
    return { score: store.default_priority, source: "default" }
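The pseudocode translates almost line for line into Python. In this sketch the dataclasses, the defaults `new_arrival_days=30` and `new_arrival_priority=5`, and the `season_rules` dictionary standing in for `findSeasonRule` are illustrative assumptions rather than the production schema; `Rule.matches` is a hypothetical predicate in place of the real rule engine. `now` is injectable so the new-arrival window can be tested deterministically.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable, Optional


@dataclass
class Product:
    product_type: str
    created_at: datetime  # timezone-aware
    priority: int = 0
    priority_locked: bool = False


@dataclass
class Rule:
    order_index: int
    priority: int
    matches: Callable[[Product], bool]  # hypothetical stand-in for rule conditions


@dataclass
class Store:
    default_priority: int = 3
    new_arrival_days: int = 30      # assumed default
    new_arrival_priority: int = 5   # assumed default
    rules: list[Rule] = field(default_factory=list)
    # {season: {product_type: priority}}; stands in for findSeasonRule()
    season_rules: dict[str, dict[str, int]] = field(default_factory=dict)


def calculate_priority(product: Product, store: Store, current_season: str,
                       now: Optional[datetime] = None) -> tuple[int, str]:
    """Return (score, source), checking sources in precedence order 1-5."""
    now = now or datetime.now(timezone.utc)
    # 1. Manual override always wins
    if product.priority_locked:
        return product.priority, "manual"
    # 2. Seasonal calendar (category x current season)
    seasonal = store.season_rules.get(current_season, {}).get(product.product_type)
    if seasonal is not None:
        return seasonal, "seasonal"
    # 3. New arrival boost, counted in whole days as in daysBetween()
    if (now - product.created_at).days <= store.new_arrival_days:
        return store.new_arrival_priority, "new_arrival"
    # 4. First matching category rule by order_index
    for rule in sorted(store.rules, key=lambda r: r.order_index):
        if rule.matches(product):
            return rule.priority, "rule"
    # 5. Store default
    return store.default_priority, "default"
```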
5.6 Data Freshness Requirements
Each stage of the pipeline has different latency expectations. The table below defines the target and maximum acceptable delay for each.
| Pipeline Stage | Target | Max | Bottleneck |
|---|---|---|---|
| Shopify webhook delivery | < 5 sec | < 60 sec | Shopify platform |
| AdPriority DB update | < 1 sec | < 5 sec | Database query |
| Priority recalculation | < 1 sec | < 10 sec | Rule engine evaluation |
| Google Sheet update (hourly batch) | < 1 hour | < 2 hours | Hourly batch job |
| GMC supplemental feed fetch (daily) | < 24 hours | < 48 hours | GMC daily fetch schedule |
| Google Ads label impact (after GMC update) | < 1 hour | < 4 hours | Google Ads sync with GMC |
Implication: Priority changes are not real-time in Google Ads. The system is designed for strategic decisions (seasonal shifts, new arrivals, rule changes) where a 24-hour propagation window is acceptable. This is consistent with how merchants manage their advertising – priorities change weekly or seasonally, not minute-by-minute.
5.7 Batch Sync Process (Detailed)
The hourly batch sync is the primary mechanism for moving data from AdPriority to Google Sheets. This section describes the process in detail.
Hourly Cron Job Fires (minute 0 of each hour)
    |
    v
1. Query pending products
     SELECT p.*, s.google_refresh_token, s.sheet_id
     FROM products p
     JOIN stores s ON p.store_id = s.id
     WHERE p.needs_sync = true
       AND s.is_active = true
     ORDER BY s.id, p.updated_at
    |
    v
2. Group by store_id
     { store_abc: [product1, product2, ...],
       store_def: [product3, product4, ...] }
    |
    v
3. For each store (sequential to respect rate limits):
    |
    +-- a. Authenticate with Google
    |      (use store's refresh token -> access token)
    |
    +-- b. Fetch ALL products for store (not just pending)
    |      (full sheet rewrite for consistency)
    |
    +-- c. Build sheet data
    |      Header: [id, custom_label_0, ..., custom_label_4]
    |      Rows: one per active variant
    |
    +-- d. Write to Google Sheet
    |      sheets.spreadsheets.values.clear()    // clear old data
    |      sheets.spreadsheets.values.update()   // write new data
    |
    +-- e. Update database
    |      UPDATE products SET needs_sync = false, sync_status = 'synced',
    |             last_synced_at = now()
    |      WHERE store_id = ? AND needs_sync = true
    |
    +-- f. Create sync_log entry
    |      INSERT INTO sync_logs (store_id, sync_type, status,
    |             products_total, products_success, ...)
    |
    v
4. Log summary
     "Sync complete: 3 stores, 15,234 products, 2 errors"
Error handling during sync:
| Error | Response |
|---|---|
| Google access token expired | Use the refresh token to obtain a new access token, retry once |
| Google Sheets API rate limit | Exponential backoff (1s, 2s, 4s, max 30s) |
| Google Sheets quota exceeded | Skip store, retry next cycle |
| Database connection lost | Abort sync, alert, retry next cycle |
| Individual product format error | Skip product, log error, continue batch |
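Three pieces of the sync loop can be sketched in Python: grouping pending rows by store (step 2), a guard for step 3e, and the rate-limit retry policy from the table. The guard reflects one design choice worth considering. It clears `needs_sync` only for rows not modified since the sync started, so a webhook that lands mid-sync is not silently cleared by the blanket `WHERE needs_sync = true` update. In SQL this corresponds to adding `AND updated_at <= <sync start time>` to the UPDATE, assuming `updated_at` is bumped whenever `needs_sync` is set. `RateLimited` is a placeholder for however the Sheets client surfaces HTTP 429 responses, and the row dictionaries are hypothetical.

```python
import time
from collections import defaultdict
from datetime import datetime
from typing import Callable, TypeVar

T = TypeVar("T")


def group_by_store(pending_rows: list[dict]) -> dict[str, list[dict]]:
    """Step 2: group pending rows by store_id, keeping the query's row order."""
    grouped: dict[str, list[dict]] = defaultdict(list)
    for row in pending_rows:
        grouped[row["store_id"]].append(row)
    return dict(grouped)


def ids_safe_to_mark_synced(rows: list[dict], sync_started_at: datetime) -> list[int]:
    """Step 3e guard: skip rows modified after the snapshot was read.

    Assumes updated_at is bumped whenever needs_sync is set to true.
    """
    return [r["id"] for r in rows
            if r["needs_sync"] and r["updated_at"] <= sync_started_at]


class RateLimited(Exception):
    """Placeholder for however the Sheets client reports HTTP 429."""


def backoff_delays(retries: int, base: float = 1.0, cap: float = 30.0) -> list[float]:
    # 1s, 2s, 4s, ... doubling per attempt, capped at 30s.
    return [min(base * 2 ** attempt, cap) for attempt in range(retries)]


def call_with_backoff(fn: Callable[[], T], retries: int = 6,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    for delay in backoff_delays(retries):
        try:
            return fn()
        except RateLimited:
            sleep(delay)
    return fn()  # last attempt: a RateLimited here propagates to the caller
```

Letting the final failure propagate leaves the caller free to apply the table's store-level policy, for example skipping the store until the next cycle. Random jitter is a common refinement but is not part of the policy stated above.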
5.8 Reconciliation Pipeline
A daily reconciliation job ensures the AdPriority database stays in sync with the Shopify catalog, catching any missed webhooks or data drift.
Daily Reconciliation (03:00 UTC)
    |
    v
1. For each active store:
    |
    +-- a. Fetch all active products from Shopify
    |      (paginated GraphQL query)
    |
    +-- b. Fetch all products from AdPriority database
    |
    +-- c. Compare:
    |      +--------------------------------------------------+
    |      | In Shopify, NOT in DB  -> New product            |
    |      |   Action: Insert, apply rules, mark needs_sync   |
    |      +--------------------------------------------------+
    |      | In DB, NOT in Shopify  -> Deleted product        |
    |      |   Action: Soft-delete, remove from next Sheet    |
    |      +--------------------------------------------------+
    |      | In both, type changed  -> Product updated        |
    |      |   Action: Recalculate priority, mark needs_sync  |
    |      +--------------------------------------------------+
    |      | In both, no changes    -> No action              |
    |      +--------------------------------------------------+
    |
    +-- d. Generate reconciliation report
    |      { new: 5, deleted: 2, updated: 8, unchanged: 2410 }
    |
    +-- e. Alert if discrepancy > 5% of catalog
    |      (unusual; may indicate missed webhooks)
    |
    v
2. Log reconciliation results in sync_logs
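Step c is essentially a set comparison keyed by product ID. A minimal Python sketch, assuming each side has already been loaded into a `{product_id: product_type}` dictionary (only type changes are compared, as in the diagram). The 5% threshold is the one stated above; measuring it against all IDs seen on either side is an assumption.

```python
def reconcile(shopify: dict[int, str], db: dict[int, str]) -> dict[str, list[int]]:
    """Classify product IDs given {product_id: product_type} from each side."""
    shop_ids, db_ids = set(shopify), set(db)
    both = shop_ids & db_ids
    return {
        "new": sorted(shop_ids - db_ids),       # insert, apply rules
        "deleted": sorted(db_ids - shop_ids),   # soft-delete
        "updated": sorted(i for i in both if shopify[i] != db[i]),  # recalculate
        "unchanged": sorted(i for i in both if shopify[i] == db[i]),
    }


def needs_alert(report: dict[str, list[int]], threshold: float = 0.05) -> bool:
    """True when new + deleted + updated exceeds `threshold` of all IDs seen."""
    seen = sum(len(ids) for ids in report.values())
    drift = seen - len(report["unchanged"])
    return seen > 0 and drift / seen > threshold
```

With the example report above (5 new, 2 deleted, 8 updated, 2,410 unchanged), drift is 15 of 2,425 IDs, well under the alert threshold.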
5.9 Season Transition Flow
When the calendar date crosses a season boundary, the system automatically recalculates priorities for all affected products.
Season Transition Check (daily at 00:00 UTC)
    |
    v
1. Determine current date
    |
    v
2. For each active store:
    |
    +-- a. Load store's season definitions
    |      (seasons table: name, start_month, start_day, end_month, end_day)
    |
    +-- b. Determine current season from today's date
    |
    +-- c. Compare with last known season
    |      (stored in store settings or derived from last transition log)
    |
    +-- d. If season changed:
           |
           +-- i. Load season_rules for new season
           |      (category x priority mappings)
           |
           +-- ii. For each product (not priority_locked):
           |       - Find matching season_rule by product_type
           |       - Update priority and priority_source = 'seasonal'
           |       - Mark needs_sync = true
           |
           +-- iii. Create audit_log entries for all changes
           |
           +-- iv. Create sync_log entry:
           |       "Season transition: Winter -> Spring, 1,847 products updated"
           |
           +-- v. Trigger immediate Sheet sync (don't wait for hourly)
           |
           +-- vi. Send notification to merchant:
                   "Season changed to Spring. 1,847 products updated."
Example transition (Nexus, Winter to Spring on March 1):
| Product Type | Winter Priority | Spring Priority | Products Affected |
|---|---|---|---|
| Jackets | 5 | 3 | ~120 |
| Hoodies | 5 | 3 | ~85 |
| Sweaters | 5 | 2 | ~40 |
| Shorts | 0 | 2 | ~150 |
| Tank Tops | 0 | 2 | ~60 |
| T-Shirts | 2 | 3 | ~350 |
| Jeans | 4 | 4 | ~200 (no change) |
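Steps 2b and 2d.ii can be sketched in Python: a date-range lookup that handles a season spanning the new year, and the per-product update. The `Season` fields mirror the seasons table columns. The four-season calendar is an assumed example (consistent with the Winter to Spring switch on March 1, but real dates are store-configured), and `SPRING_RULES` copies the example table. Two choices here are assumptions rather than documented behavior: products with no matching season rule are left untouched, and `needs_sync` is set only when the score actually changes (the Jeans row shows why that saves Sheet writes).

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional


@dataclass
class Season:
    name: str
    start_month: int
    start_day: int
    end_month: int
    end_day: int

    def contains(self, d: date) -> bool:
        start = (self.start_month, self.start_day)
        end = (self.end_month, self.end_day)
        today = (d.month, d.day)
        if start <= end:
            return start <= today <= end
        return today >= start or today <= end  # range wraps past 31 December


def current_season(d: date, seasons: list[Season]) -> Optional[str]:
    return next((s.name for s in seasons if s.contains(d)), None)


@dataclass
class ProductRow:
    id: int
    product_type: str
    priority: int
    priority_locked: bool = False
    priority_source: str = "default"
    needs_sync: bool = False


def apply_season_rules(products: list[ProductRow],
                       rules: dict[str, int]) -> list[tuple[int, int, int]]:
    """Apply {product_type: priority}; return (id, old, new) for each change."""
    changes = []
    for p in products:
        new = rules.get(p.product_type)
        if p.priority_locked or new is None:
            continue  # manual overrides and unmapped types are left alone
        if new != p.priority:
            changes.append((p.id, p.priority, new))
            p.needs_sync = True  # only changed labels need a Sheet write
        p.priority, p.priority_source = new, "seasonal"
    return changes


# Assumed example calendar; real dates come from the store's seasons table.
SEASONS = [Season("winter", 12, 1, 2, 29), Season("spring", 3, 1, 5, 31),
           Season("summer", 6, 1, 8, 31), Season("fall", 9, 1, 11, 30)]
# Spring priorities from the Nexus example table above.
SPRING_RULES = {"Jackets": 3, "Hoodies": 3, "Sweaters": 2, "Shorts": 2,
                "Tank Tops": 2, "T-Shirts": 3, "Jeans": 4}
```

The returned `(id, old, new)` tuples are what steps iii and iv would need for audit_log entries and the "N products updated" count.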
5.10 Data Flow Diagram: Label Values
This section documents exactly what values flow into each custom label slot.
| Label | Source Function | Possible Values |
|---|---|---|
| custom_label_0 (Priority Score) | Priority engine | priority-0, priority-1, priority-2, priority-3, priority-4, priority-5 |
| custom_label_1 (Season) | Season calendar | winter, spring, summer, fall |
| custom_label_2 (Category Group) | Category mapping from product_type | t-shirts, jeans-pants, shorts, outerwear-heavy, outerwear-light, hoodies-sweatshirts, headwear-caps, headwear-cold-weather, accessories, footwear (store-configurable) |
| custom_label_3 (Product Status) | Tags + age logic | new-arrival, in-stock, low-inventory, clearance, dead-stock |
| custom_label_4 (Brand Tier) | Vendor + tags | name-brand, store-brand, off-brand |
GMC constraint: Each label supports a maximum of 1,000 unique values. All label schemas above are well within this limit (6 values for label_0, 4 for label_1, ~10-20 for label_2, 5 for label_3, 3 for label_4).
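A guard against the 1,000-value limit is cheap to run before each Sheet write, and it matters mostly for the store-configurable custom_label_2. A Python sketch, assuming rows in the Sheet column order (id first, then custom_label_0 to custom_label_4). It checks a single export, whereas GMC applies the limit across the whole Merchant Center account, so an account fed from several sources would need to count the union.

```python
from collections import defaultdict

GMC_MAX_UNIQUE_VALUES = 1_000  # per custom label


def label_cardinality(rows: list[list[str]]) -> dict[str, int]:
    """Count distinct values per custom label across Sheet rows (id column skipped)."""
    seen: dict[str, set[str]] = defaultdict(set)
    for row in rows:
        for index, value in enumerate(row[1:]):
            if value:  # empty cells do not count as a value
                seen[f"custom_label_{index}"].add(value)
    return {label: len(values) for label, values in sorted(seen.items())}


def over_limit(rows: list[list[str]]) -> list[str]:
    """Labels whose distinct-value count exceeds the GMC limit."""
    return [label for label, count in label_cardinality(rows).items()
            if count > GMC_MAX_UNIQUE_VALUES]
```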
5.11 Chapter Summary
| Pipeline Stage | Trigger | Latency | Transport |
|---|---|---|---|
| Product ingestion | Install / Webhook | Seconds | Shopify API/Webhook |
| Priority calculation | Event-driven | < 1 second | In-process |
| Sheet update | Hourly cron | < 1 hour | Google Sheets API |
| GMC label application | Daily GMC fetch | < 24 hours | Supplemental feed |
| Google Ads impact | GMC sync | < 4 hours | Internal Google |
| Reconciliation | Daily cron | Background | Shopify API + DB |
| Season transition | Daily cron / manual | Background | In-process + Sheet |
The pipeline is designed around the principle that priority changes are strategic, not tactical. A 24-hour propagation window is acceptable because merchants adjust priorities on a weekly or seasonal basis, not in response to minute-by-minute market conditions. This constraint allows the MVP to use the simple, validated Google Sheets pipeline rather than the more complex Content API integration.