Chapter 5: Data Flow & Pipeline

This chapter traces the complete lifecycle of product data as it moves through the AdPriority system – from the Shopify catalog, through priority scoring, into Google Merchant Center, and ultimately into Google Ads campaign targeting. Each pipeline stage is documented with its trigger, latency, and failure mode.


5.1 End-to-End Data Flow

The following diagram shows the full pipeline from product creation in Shopify to budget allocation in Google Ads.

+-------------+     +--------------+     +--------------+     +---------------+
|   Shopify   |     |  AdPriority  |     |   Priority   |     |    Google     |
|   Store     | --> |   Database   | --> |   Scoring    | --> |    Sheet      |
|             |     |              |     |   Engine     |     |               |
| 5,582 prods |     | products     |     | Rules +      |     | id +          |
| 124k GMC    |     | rules        |     | Seasons +    |     | custom_label_ |
| variants    |     | seasons      |     | Overrides    |     | 0 through 4   |
+------+------+     +------+-------+     +------+-------+     +-------+-------+
       |                   |                    |                      |
       |  Webhook/         |  On product        |  Hourly              |  Daily
       |  Import           |  change            |  batch               |  fetch
       v                   v                    v                      v
+-------------+     +--------------+     +--------------+     +---------------+
|  Product    |     |  Store in    |     |  Write to    |     |    GMC        |
|  payload    |     |  database    |     |  Google      |     | Supplemental  |
|  received   |     |  with rules  |     |  Sheet via   |     |    Feed       |
|             |     |  applied     |     |  Sheets API  |     |               |
+-------------+     +--------------+     +--------------+     +-------+-------+
                                                                      |
                                                              Custom labels
                                                              applied to
                                                              GMC products
                                                                      |
                                                                      v
                                                              +---------------+
                                                              |  Google Ads   |
                                                              |  PMAX         |
                                                              |  Campaigns    |
                                                              |               |
                                                              | Listing group |
                                                              | filters by    |
                                                              | custom_label_0|
                                                              +---------------+

Total pipeline latency (worst case): ~25 hours until labels reach GMC

  • Webhook delivery: seconds
  • Priority calculation: < 1 second
  • Sheet update: up to 1 hour (next hourly batch)
  • GMC fetch: up to 24 hours (daily schedule)
  • Google Ads applies labels: typically within 1 hour of the GMC update, up to 4 hours (see Section 5.6)
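
The ~25 hour figure is the sum of the sequential stage delays up to and including the GMC fetch. A minimal sketch of that arithmetic follows; the dictionary keys are illustrative names, and the 60-second webhook value is an assumption taken from the maximum in Section 5.6.

```python
# Worst-case delay per stage, in seconds, up to and including the GMC fetch.
# Keys are illustrative names; values come from the list above.
STAGE_WORST_CASE_SECONDS = {
    "webhook_delivery": 60,       # "seconds"; 60 s assumed from the Section 5.6 maximum
    "priority_calculation": 1,    # "< 1 second"
    "sheet_update": 1 * 3600,     # wait for the next hourly batch
    "gmc_fetch": 24 * 3600,       # wait for the next daily fetch
}


def worst_case_hours(stages: dict[str, int]) -> float:
    """Sum the sequential stage delays and convert the total to hours."""
    return sum(stages.values()) / 3600


print(f"Worst case to GMC: {worst_case_hours(STAGE_WORST_CASE_SECONDS):.1f} hours")
```

The hourly batch and the daily fetch dominate; the webhook and scoring stages contribute about a minute in total.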

Total pipeline latency (best case): ~1 hour

  • A manual sync trigger writes to the Sheet immediately
  • A manual GMC fetch, or a scheduled fetch that happens to run shortly afterward, picks up the changes

5.2 MVP Flow: Google Sheets Pipeline

The MVP uses Google Sheets as the transport layer between AdPriority and GMC. This approach was validated with a 10-product test batch (10/10 matched, zero issues). See ADR-001 in Chapter 7 for the rationale.

                        AdPriority Backend
                               |
                    +----------+----------+
                    |                     |
            Priority Engine        Sheets Writer
                    |                     |
                    v                     v
            +-------------+      +----------------+
            |  products   |      | Google Sheets  |
            |  table      | ---> | API v4         |
            |             |      |                |
            | priority: 4 |      | sheets.values  |
            | needs_sync  |      | .update()      |
            +-------------+      +-------+--------+
                                         |
                    +--------------------+
                    |
                    v
            +----------------+
            | Google Sheet   |
            |                |
            | Row format:    |
            | id             | = shopify_US_{prodId}_{varId}
            | custom_label_0 | = priority-{0-5}
            | custom_label_1 | = {season}
            | custom_label_2 | = {category_group}
            | custom_label_3 | = {product_status}
            | custom_label_4 | = {brand_tier}
            +-------+--------+
                    |
                    | GMC fetches daily (pull model)
                    v
            +----------------+
            | Google         |
            | Merchant       |
            | Center         |
            |                |
            | Matches rows   |
            | by id column   |
            | to primary     |
            | feed products  |
            +----------------+
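
The row format in the diagram can be sketched as a small builder. This is an illustration only: the function names (gmc_id, build_row) and the country parameter are hypothetical, and the example does not call the Sheets API.

```python
# Column order of the supplemental-feed Sheet, as shown in the diagram above.
HEADER = ["id", "custom_label_0", "custom_label_1",
          "custom_label_2", "custom_label_3", "custom_label_4"]


def gmc_id(product_id: int, variant_id: int, country: str = "US") -> str:
    """Build the GMC item id in the shopify_US_{prodId}_{varId} format."""
    return f"shopify_{country}_{product_id}_{variant_id}"


def build_row(product_id: int, variant_id: int, priority: int, season: str,
              category_group: str, product_status: str, brand_tier: str) -> list[str]:
    """Return one Sheet row: the id followed by custom_label_0 through 4."""
    if not 0 <= priority <= 5:
        raise ValueError(f"priority must be between 0 and 5, got {priority}")
    return [gmc_id(product_id, variant_id), f"priority-{priority}",
            season, category_group, product_status, brand_tier]


row = build_row(123, 456, 4, "winter", "outerwear-heavy", "in-stock", "name-brand")
print(dict(zip(HEADER, row)))
```

Rejecting scores outside 0-5 at row-build time keeps an invalid label from reaching the Sheet, where GMC would accept it silently.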

Sheet sizing for Nexus:

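+--------------------------------+-----------------+
| Metric                         | Value           |
+--------------------------------+-----------------+
| Active Shopify products        | ~2,425          |
| Estimated active GMC variants  | ~15,000-20,000  |
| Columns per row                | 6               |
| Total cells                    | ~120,000        |
| Google Sheets cell limit       | 10,000,000      |
| Usage percentage               | 1.2%            |
+--------------------------------+-----------------+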

The Sheet comfortably handles the Nexus catalog. Even at full 124,060 variants, usage would be only 7.4% of the limit.
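
The sizing figures follow from rows times columns divided by the cell limit. A short check of both cases (the upper estimate of 20,000 variant rows, and the full 124,060 variants), ignoring the single header row; the function name is illustrative:

```python
SHEET_CELL_LIMIT = 10_000_000   # Google Sheets cell limit quoted above
COLUMNS_PER_ROW = 6             # id + custom_label_0 through custom_label_4


def sheet_usage(variant_rows: int, columns: int = COLUMNS_PER_ROW) -> tuple[int, float]:
    """Return (total cells, percentage of the cell limit) for a row count."""
    cells = variant_rows * columns
    return cells, 100 * cells / SHEET_CELL_LIMIT


for rows in (20_000, 124_060):
    cells, pct = sheet_usage(rows)
    print(f"{rows:>7} rows -> {cells:>7} cells ({pct:.1f}% of limit)")
```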


5.3 Future Flow: Content API Pipeline

For the SaaS product (Growth tier and above), a direct Content API integration removes the 24-hour GMC fetch delay and eliminates the Google Sheet dependency.

                        AdPriority Backend
                               |
                    +----------+----------+
                    |                     |
            Priority Engine        Content API Writer
                    |                     |
                    v                     v
            +-------------+      +---------------------+
            |  products   |      | GMC Content API     |
            |  table      | ---> | v2.1                |
            |             |      |                     |
            | priority: 4 |      | products.update()   |
            | needs_sync  |      | or custombatch()    |
            +-------------+      +----------+----------+
                                            |
                                  Direct API call
                                  (near real-time)
                                            |
                                            v
                                 +---------------------+
                                 | Google Merchant     |
                                 | Center              |
                                 |                     |
                                 | Labels applied      |
                                 | within minutes      |
                                 +---------------------+

Content API constraints:

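+------------------------------+------------------------------------------+
| Constraint                   | Value                                    |
+------------------------------+------------------------------------------+
| Updates per product per day  | Maximum 2                                |
| Batch size                   | Up to 10,000 entries per custombatch     |
| Rate limiting                | Dynamic (exponential backoff required)   |
| OAuth scope required         | https://www.googleapis.com/auth/content  |
| Auth per tenant              | Each tenant connects their own GMC       |
+------------------------------+------------------------------------------+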
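
Two of these constraints shape the writer directly: entries must be split into batches of at most 10,000, and each product should be updated at most twice per day. The sketch below illustrates both in isolation. The names chunk and DailyUpdateBudget are hypothetical, the budget is kept in memory only, and no Content API call is made.

```python
from collections import Counter
from typing import Iterator

MAX_BATCH_ENTRIES = 10_000            # custombatch limit from the table above
MAX_UPDATES_PER_PRODUCT_PER_DAY = 2   # per-product daily limit from the table above


def chunk(entries: list, size: int = MAX_BATCH_ENTRIES) -> Iterator[list]:
    """Yield consecutive slices of at most `size` entries."""
    for start in range(0, len(entries), size):
        yield entries[start:start + size]


class DailyUpdateBudget:
    """Count updates per GMC id for one day (in memory; a real service
    would persist the counts and reset them at the day boundary)."""

    def __init__(self, limit: int = MAX_UPDATES_PER_PRODUCT_PER_DAY) -> None:
        self.limit = limit
        self.used: Counter = Counter()

    def try_consume(self, gmc_id: str) -> bool:
        """Return True and record the update if the id still has budget."""
        if self.used[gmc_id] >= self.limit:
            return False
        self.used[gmc_id] += 1
        return True


batch_sizes = [len(batch) for batch in chunk(list(range(25_000)))]
print(batch_sizes)
```

A product that has exhausted its budget would stay marked needs_sync and be picked up on the following day.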

Hybrid approach (recommended for production SaaS):

  • Google Sheets for Starter tier (simpler setup, daily sync)
  • Content API for Growth/Pro/Enterprise tiers (faster updates)

5.4 Product Sync Pipeline

5.4.1 Initial Import (App Install)

When a merchant installs AdPriority, all active products are imported from Shopify and scored.

App Install
    |
    v
1. Create tenant record in stores table
    |
    v
2. Fetch products from Shopify GraphQL API
   (paginated, 250 per page)
   Nexus: 5,582 products = ~23 pages
    |
    v
3. For each product + variant:
   +--------------------------------------------------+
   | a. Generate GMC ID:                              |
   |    shopify_US_{productId}_{variantId}            |
   |                                                  |
   | b. Determine product type mapping                |
   |    (from category-mapping rules)                 |
   |                                                  |
   | c. Calculate initial priority:                   |
   |    - Check manual override       (precedence 1)  |
   |    - Check seasonal calendar     (precedence 2)  |
   |    - Check new arrival status    (precedence 3)  |
   |    - Check category rules        (precedence 4)  |
   |    - Apply store default         (precedence 5)  |
   |                                                  |
   | d. Determine custom labels:                      |
   |    label_0 = priority-{score}                    |
   |    label_1 = {current_season}                    |
   |    label_2 = {category_group}                    |
   |    label_3 = {product_status}                    |
   |    label_4 = {brand_tier}                        |
   |                                                  |
   | e. Insert into products table                    |
   |    (needs_sync = true)                           |
   +--------------------------------------------------+
    |
    v
4. Trigger immediate Sheet sync
   (write all products to Google Sheet)
    |
    v
5. Create sync_log entry
   (products_total, products_success, products_failed)
    |
    v
6. Return import summary to UI

Estimated import time for Nexus: 2-5 minutes (network-bound by Shopify API pagination rate limits).
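
Step 2 of the import is a cursor-driven pagination loop. The sketch below shows the loop shape and the page-count arithmetic for Nexus. It assumes a hypothetical fetch_page(cursor) callable that returns one page of products plus the next cursor (None on the last page); make_fake_fetcher is an in-memory stand-in, not the Shopify client.

```python
import math
from typing import Callable, Iterator, Optional

PAGE_SIZE = 250  # products per page, as stated in step 2

# Hypothetical page-fetcher type: cursor in, (products, next cursor) out.
Page = tuple[list[dict], Optional[str]]


def expected_pages(total_products: int, page_size: int = PAGE_SIZE) -> int:
    """Number of requests needed to page through the catalog."""
    return math.ceil(total_products / page_size)


def iterate_products(fetch_page: Callable[[Optional[str]], Page]) -> Iterator[dict]:
    """Yield every product, following cursors until the last page."""
    cursor: Optional[str] = None
    while True:
        products, cursor = fetch_page(cursor)
        yield from products
        if cursor is None:
            break


def make_fake_fetcher(items: list[dict], page_size: int = PAGE_SIZE):
    """In-memory stand-in for the Shopify API; the cursor is a list offset."""
    def fetch_page(cursor: Optional[str]) -> Page:
        start = int(cursor) if cursor else 0
        end = start + page_size
        return items[start:end], (str(end) if end < len(items) else None)
    return fetch_page


print(expected_pages(5582))
catalog = [{"id": i} for i in range(600)]
print(sum(1 for _ in iterate_products(make_fake_fetcher(catalog))))
```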

5.4.2 Webhook-Driven Updates

After initial import, Shopify webhooks keep the database current.

Shopify Platform                      AdPriority Backend
      |                                      |
      |  products/create                     |
      | -----------------------------------> |
      |                                      |  1. Verify HMAC
      |                                      |  2. Parse product payload
      |                                      |  3. Generate GMC IDs for all variants
      |                                      |  4. Apply rules -> calculate priority
      |                                      |  5. INSERT into products table
      |                                      |  6. Mark needs_sync = true
      |                                      |  7. Return 200 OK
      |                                      |
      |  products/update                     |
      | -----------------------------------> |
      |                                      |  1. Verify HMAC
      |                                      |  2. Parse product payload
      |                                      |  3. Check: is priority_locked?
      |                                      |     YES -> skip recalculation
      |                                      |     NO  -> continue
      |                                      |  4. Check: did product_type change?
      |                                      |     YES -> recalculate priority
      |                                      |  5. Check: did tags change?
      |                                      |     YES -> recalculate status label
      |                                      |  6. Check: did inventory change?
      |                                      |     YES -> check for out-of-stock (priority 0)
      |                                      |  7. UPDATE products table
      |                                      |  8. Mark needs_sync = true (if changed)
      |                                      |  9. Return 200 OK
      |                                      |
      |  products/delete                     |
      | -----------------------------------> |
      |                                      |  1. Verify HMAC
      |                                      |  2. Soft-delete from products table
      |                                      |  3. Remove from next Sheet sync
      |                                      |  4. Return 200 OK
      |                                      |
      |  app/uninstalled                     |
      | -----------------------------------> |
      |                                      |  1. Verify HMAC
      |                                      |  2. Mark store as inactive
      |                                      |  3. Retain data for 30 days
      |                                      |  4. Schedule cleanup job
      |                                      |  5. Return 200 OK
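
Every handler above begins with "Verify HMAC". Shopify signs each webhook by computing an HMAC-SHA256 of the raw request body with the app's shared secret and sending the base64-encoded digest in the X-Shopify-Hmac-Sha256 header. A minimal verification sketch follows; the function name and the way the secret is supplied are assumptions.

```python
import base64
import hashlib
import hmac


def verify_shopify_hmac(raw_body: bytes, header_hmac: str, shared_secret: str) -> bool:
    """Return True if the header matches the HMAC-SHA256 of the raw body.

    raw_body must be the exact bytes received, before any JSON parsing.
    """
    digest = hmac.new(shared_secret.encode("utf-8"), raw_body, hashlib.sha256).digest()
    expected = base64.b64encode(digest)
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected, header_hmac.encode("utf-8"))


# Usage with a locally generated signature (secret and payload are made up).
secret = "example-secret"
body = b'{"id": 1, "product_type": "Jeans"}'
signature = base64.b64encode(
    hmac.new(secret.encode("utf-8"), body, hashlib.sha256).digest()
).decode("ascii")
print(verify_shopify_hmac(body, signature, secret))
```

A request that fails verification should be rejected before the payload is parsed, so that steps 2 onward only ever see authenticated data.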

5.5 Priority Recalculation Triggers

Priority scores are not static. The following events trigger a recalculation for affected products.

+---------------------------+-------------------+---------------------------+
| Trigger                   | Scope             | Mechanism                 |
+---------------------------+-------------------+---------------------------+
| Manual override           | Single product    | API call from UI          |
|                           |                   | Sets priority_locked=true |
+---------------------------+-------------------+---------------------------+
| Bulk priority update      | Selected products | API call from UI          |
|                           |                   | Batch UPDATE              |
+---------------------------+-------------------+---------------------------+
| Rule created/modified     | All matching      | Rule engine evaluates     |
|                           | products          | all products against      |
|                           |                   | new/changed rule          |
+---------------------------+-------------------+---------------------------+
| Rule deleted              | Previously        | Fallback to next          |
|                           | matched products  | applicable rule or        |
|                           |                   | store default             |
+---------------------------+-------------------+---------------------------+
| Season transition         | All products with | Cron job (daily check)    |
| (calendar date reached)   | season_rules for  | or manual trigger from UI |
|                           | that category     |                           |
+---------------------------+-------------------+---------------------------+
| New product arrives       | New product       | Webhook: products/create  |
| (Shopify webhook)         |                   | Apply new arrival boost   |
+---------------------------+-------------------+---------------------------+
| Product type/tag change   | Changed product   | Webhook: products/update  |
| (Shopify webhook)         |                   | Re-evaluate rules         |
+---------------------------+-------------------+---------------------------+
| New arrival expires       | Products past     | Cron job (daily)          |
| (boost duration ended)    | boost duration    | Revert to rule/default    |
+---------------------------+-------------------+---------------------------+
| Inventory hits zero       | Out-of-stock      | Webhook: products/update  |
|                           | product           | Set priority to 0         |
+---------------------------+-------------------+---------------------------+

5.5.1 Priority Resolution Order

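When multiple sources assign a priority, the following hierarchy determines which value wins. The numbers below are precedence ranks (1 is checked first), not the 0-5 priority scores written to custom_label_0.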

Priority 1 (Highest):  Manual override (priority_locked = true)
    |
    v
Priority 2:            Seasonal calendar rule
    |                  (category x current season)
    v
Priority 3:            New arrival boost
    |                  (product created within N days)
    v
Priority 4:            Category-based rule
    |                  (first matching rule by order_index)
    v
Priority 5 (Lowest):   Store default
                       (stores.default_priority, typically 3)

Resolution algorithm:

function calculatePriority(product, store, currentSeason):
    // 1. Manual override always wins
    if product.priority_locked:
        return { score: product.priority, source: "manual" }

    // 2. Check seasonal calendar
    seasonRule = findSeasonRule(product.product_type, currentSeason, store.id)
    if seasonRule exists:
        return { score: seasonRule.priority, source: "seasonal" }

    // 3. Check new arrival boost
    daysSinceCreation = daysBetween(product.created_at, now())
    if daysSinceCreation <= store.new_arrival_days:
        return { score: store.new_arrival_priority, source: "new_arrival" }

    // 4. Check category-based rules (ordered by order_index)
    matchingRule = evaluateRules(product, store.rules)
    if matchingRule exists:
        return { score: matchingRule.priority, source: "rule" }

    // 5. Fall back to store default
    return { score: store.default_priority, source: "default" }
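
For reference, a runnable version of the algorithm above with in-memory lookups. Several details are assumptions made for illustration: the dataclass shapes, the default values for new_arrival_days and new_arrival_priority, matching category rules on product_type only, and returning a (score, source) tuple instead of an object.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class Rule:
    product_type: str
    priority: int
    order_index: int


@dataclass
class Store:
    default_priority: int = 3
    new_arrival_days: int = 14       # assumed value, not from the spec
    new_arrival_priority: int = 5    # assumed value, not from the spec
    rules: list[Rule] = field(default_factory=list)
    # (product_type, season) -> priority; stands in for findSeasonRule
    season_rules: dict[tuple[str, str], int] = field(default_factory=dict)


@dataclass
class Product:
    product_type: str
    created_at: datetime
    priority: int = 3
    priority_locked: bool = False


def calculate_priority(product: Product, store: Store, current_season: str,
                       now: Optional[datetime] = None) -> tuple[int, str]:
    """Return (score, source) using the five-level resolution order."""
    now = now or datetime.now(timezone.utc)
    # 1. Manual override always wins.
    if product.priority_locked:
        return product.priority, "manual"
    # 2. Seasonal calendar rule for this product type.
    season_score = store.season_rules.get((product.product_type, current_season))
    if season_score is not None:
        return season_score, "seasonal"
    # 3. New arrival boost, measured in whole days since creation.
    if (now - product.created_at).days <= store.new_arrival_days:
        return store.new_arrival_priority, "new_arrival"
    # 4. First matching category rule, lowest order_index first.
    for rule in sorted(store.rules, key=lambda r: r.order_index):
        if rule.product_type == product.product_type:
            return rule.priority, "rule"
    # 5. Store default.
    return store.default_priority, "default"


now = datetime(2025, 3, 1, tzinfo=timezone.utc)
store = Store(rules=[Rule("Jeans", 4, 1)], season_rules={("Shorts", "spring"): 2})
shorts = Product("Shorts", created_at=now - timedelta(days=90))
print(calculate_priority(shorts, store, "spring", now))
```

Passing now explicitly keeps the new-arrival check deterministic in tests.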

5.6 Data Freshness Requirements

Each stage of the pipeline has different latency expectations. The table below defines the target and maximum acceptable delay for each.

+----------------------------+-----------+-------------+--------------------+
| Pipeline Stage             | Target    | Max         | Bottleneck         |
+----------------------------+-----------+-------------+--------------------+
| Shopify webhook delivery   | < 5 sec   | < 60 sec    | Shopify platform   |
+----------------------------+-----------+-------------+--------------------+
| AdPriority DB update       | < 1 sec   | < 5 sec     | Database query     |
+----------------------------+-----------+-------------+--------------------+
| Priority recalculation     | < 1 sec   | < 10 sec    | Rule engine eval   |
+----------------------------+-----------+-------------+--------------------+
| Google Sheet update        | < 1 hour  | < 2 hours   | Hourly batch job   |
| (hourly batch)             |           |             |                    |
+----------------------------+-----------+-------------+--------------------+
| GMC supplemental feed      | < 24 hrs  | < 48 hrs    | GMC daily fetch    |
| fetch (daily)              |           |             | schedule           |
+----------------------------+-----------+-------------+--------------------+
| Google Ads label impact    | < 1 hour  | < 4 hours   | Google Ads sync    |
| (after GMC update)         |           |             | with GMC           |
+----------------------------+-----------+-------------+--------------------+

Implication: Priority changes are not real-time in Google Ads. The system is designed for strategic decisions (seasonal shifts, new arrivals, rule changes) where a 24-hour propagation window is acceptable. This is consistent with how merchants manage their advertising – priorities change weekly or seasonally, not minute-by-minute.


5.7 Batch Sync Process (Detailed)

The hourly batch sync is the primary mechanism for moving data from AdPriority to Google Sheets. This section describes the process in detail.

Hourly Cron Job Fires (minute 0 of each hour)
    |
    v
1. Query pending products
   SELECT p.*, s.google_refresh_token, s.sheet_id
   FROM products p
   JOIN stores s ON p.store_id = s.id
   WHERE p.needs_sync = true
   AND s.is_active = true
   ORDER BY s.id, p.updated_at
    |
    v
2. Group by store_id
   { store_abc: [product1, product2, ...],
     store_def: [product3, product4, ...] }
    |
    v
3. For each store (sequential to respect rate limits):
   |
   +-- a. Authenticate with Google
   |      (use store's refresh token -> access token)
   |
   +-- b. Fetch ALL products for store (not just pending)
   |      (full sheet rewrite for consistency)
   |
   +-- c. Build sheet data
   |      Header: [id, custom_label_0, ..., custom_label_4]
   |      Rows:   one per active variant
   |
   +-- d. Write to Google Sheet
   |      sheets.spreadsheets.values.clear()   // clear old data
   |      sheets.spreadsheets.values.update()  // write new data
   |
   +-- e. Update database
   |      UPDATE products SET needs_sync=false, sync_status='synced',
   |                         last_synced_at=now()
   |      WHERE store_id = ? AND needs_sync = true
   |
   +-- f. Create sync_log entry
          INSERT INTO sync_logs (store_id, sync_type, status,
                                 products_total, products_success, ...)
    |
    v
4. Log summary
   "Sync complete: 3 stores, 15,234 products, 2 errors"

Error handling during sync:

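+----------------------------------+--------------------------------------------+
| Error                            | Response                                   |
+----------------------------------+--------------------------------------------+
| Google auth token expired        | Refresh token, retry once                  |
| Google Sheets API rate limit     | Exponential backoff (1s, 2s, 4s, max 30s)  |
| Google Sheets quota exceeded     | Skip store, retry next cycle               |
| Database connection lost         | Abort sync, alert, retry next cycle        |
| Individual product format error  | Skip product, log error, continue batch    |
+----------------------------------+--------------------------------------------+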
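
The rate-limit response (1s, 2s, 4s, capped at 30s) can be sketched as a delay schedule plus a retry wrapper. RateLimitError is a hypothetical exception standing in for whatever the Sheets client raises on a rate-limit response, and the attempt count of six is an assumption.

```python
import time
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Hypothetical stand-in for a rate-limit error from the Sheets client."""


def backoff_delays(base: float = 1, cap: float = 30, attempts: int = 6) -> list[float]:
    """Doubling delays starting at `base`, never exceeding `cap`."""
    return [min(base * 2 ** i, cap) for i in range(attempts)]


def call_with_backoff(fn: Callable[[], T], delays: Sequence[float],
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, sleeping through the delay schedule on rate-limit errors.

    After the schedule is exhausted, one final attempt is made and any
    error from it propagates to the caller.
    """
    for delay in delays:
        try:
            return fn()
        except RateLimitError:
            sleep(delay)
    return fn()


print(backoff_delays())
```

Injecting sleep makes the wrapper testable without real waiting. If the final attempt still fails, the caller can treat the store like the quota-exceeded case and retry it in the next cycle.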

5.8 Reconciliation Pipeline

A daily reconciliation job ensures the AdPriority database stays in sync with the Shopify catalog, catching any missed webhooks or data drift.

Daily Reconciliation (03:00 UTC)
    |
    v
1. For each active store:
   |
   +-- a. Fetch all active products from Shopify
   |      (paginated GraphQL query)
   |
   +-- b. Fetch all products from AdPriority database
   |
   +-- c. Compare:
   |      +--------------------------------------------------+
   |      | In Shopify, NOT in DB  -> New product            |
   |      |   Action: Insert, apply rules, mark needs_sync   |
   |      +--------------------------------------------------+
   |      | In DB, NOT in Shopify  -> Deleted product        |
   |      |   Action: Soft-delete, remove from next Sheet    |
   |      +--------------------------------------------------+
   |      | In both, type changed  -> Product updated        |
   |      |   Action: Recalculate priority, mark needs_sync  |
   |      +--------------------------------------------------+
   |      | In both, no changes    -> No action              |
   |      +--------------------------------------------------+
   |
   +-- d. Generate reconciliation report
   |      { new: 5, deleted: 2, updated: 8, unchanged: 2410 }
   |
   +-- e. Alert if discrepancy > 5% of catalog
          (unusual, may indicate missed webhooks)
    |
    v
2. Log reconciliation results in sync_logs
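
The comparison in step 1c is a set difference over product ids. The sketch below assumes each side is reduced to a map of product id to product_type, and interprets "discrepancy" as (new + deleted + updated) divided by all products seen; both are assumptions, as are the function names.

```python
def reconcile(shopify: dict[str, str], db: dict[str, str]) -> dict[str, list[str]]:
    """Classify product ids; both inputs map product id -> product_type."""
    shopify_ids, db_ids = set(shopify), set(db)
    common = shopify_ids & db_ids
    return {
        "new": sorted(shopify_ids - db_ids),        # in Shopify, not in DB
        "deleted": sorted(db_ids - shopify_ids),    # in DB, not in Shopify
        "updated": sorted(i for i in common if shopify[i] != db[i]),
        "unchanged": sorted(i for i in common if shopify[i] == db[i]),
    }


def should_alert(report: dict[str, list[str]], threshold: float = 0.05) -> bool:
    """True when changed products exceed `threshold` of all products seen."""
    changed = len(report["new"]) + len(report["deleted"]) + len(report["updated"])
    total = changed + len(report["unchanged"])
    return total > 0 and changed / total > threshold


report = reconcile(
    {"1": "Jeans", "2": "Shorts", "3": "Hats"},
    {"2": "Shorts", "3": "Caps", "4": "Socks"},
)
print(report, should_alert(report))
```

With the example report from step 1d (5 new, 2 deleted, 8 updated, 2,410 unchanged), the ratio is 15 / 2,425, about 0.6%, which is well under the 5% alert threshold.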

5.9 Season Transition Flow

When the calendar date crosses a season boundary, the system automatically recalculates priorities for all affected products.

Season Transition Check (daily at 00:00 UTC)
    |
    v
1. Determine current date
    |
    v
2. For each active store:
   |
   +-- a. Load store's season definitions
   |      (seasons table: name, start_month, start_day, end_month, end_day)
   |
   +-- b. Determine current season from today's date
   |
   +-- c. Compare with last known season
   |      (stored in store settings or derived from last transition log)
   |
   +-- d. If season changed:
          |
          +-- i.   Load season_rules for new season
          |        (category x priority mappings)
          |
          +-- ii.  For each product (not priority_locked):
          |        - Find matching season_rule by product_type
          |        - Update priority and priority_source='seasonal'
          |        - Mark needs_sync = true
          |
          +-- iii. Create audit_log entries for all changes
          |
          +-- iv.  Create sync_log entry:
          |        "Season transition: Winter -> Spring, 1,847 products updated"
          |
          +-- v.   Trigger immediate Sheet sync (don't wait for hourly)
          |
          +-- vi.  Send notification to merchant:
                   "Season changed to Spring. 1,847 products updated."

Example transition (Nexus, Winter to Spring on March 1):

Product Type     | Winter Priority | Spring Priority | Products Affected
-----------------+-----------------+-----------------+------------------
Jackets          |       5         |       3         |      ~120
Hoodies          |       5         |       3         |      ~85
Sweaters         |       5         |       2         |      ~40
Shorts           |       0         |       2         |      ~150
Tank Tops        |       0         |       2         |      ~60
T-Shirts         |       2         |       3         |      ~350
Jeans            |       4         |       4         |      ~200 (no change)
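
Step 2b of the flow (determine the current season from today's date) needs to handle a season that wraps the year boundary, such as a winter running from December into February. A minimal sketch using the seasons table columns follows. The season boundaries in SEASONS are illustrative assumptions, apart from spring starting on March 1 as in the example above.

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional


@dataclass
class Season:
    name: str
    start_month: int
    start_day: int
    end_month: int
    end_day: int


def in_season(season: Season, today: date) -> bool:
    """True if today falls within the season, inclusive of both ends."""
    start = (season.start_month, season.start_day)
    end = (season.end_month, season.end_day)
    current = (today.month, today.day)
    if start <= end:
        return start <= current <= end
    # The season wraps the year boundary (for example Dec 1 to Feb 29).
    return current >= start or current <= end


def current_season(seasons: list[Season], today: date) -> Optional[str]:
    """Return the name of the first season containing today, if any."""
    for season in seasons:
        if in_season(season, today):
            return season.name
    return None


# Illustrative definitions; a real store loads these from the seasons table.
SEASONS = [
    Season("winter", 12, 1, 2, 29),
    Season("spring", 3, 1, 5, 31),
    Season("summer", 6, 1, 8, 31),
    Season("fall", 9, 1, 11, 30),
]

print(current_season(SEASONS, date(2025, 2, 28)), "->", current_season(SEASONS, date(2025, 3, 1)))
```

Comparing (month, day) tuples avoids constructing dates in a specific year, so a February 29 end day is safe in non-leap years.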

5.10 Data Flow Diagram: Label Values

This section documents exactly what values flow into each custom label slot.

+------------------+---------------------+-----------------------------------+
| Label            | Source Function     | Possible Values                   |
+------------------+---------------------+-----------------------------------+
| custom_label_0   | Priority engine     | priority-0                        |
| (Priority Score) |                     | priority-1                        |
|                  |                     | priority-2                        |
|                  |                     | priority-3                        |
|                  |                     | priority-4                        |
|                  |                     | priority-5                        |
+------------------+---------------------+-----------------------------------+
| custom_label_1   | Season calendar     | winter                            |
| (Season)         |                     | spring                            |
|                  |                     | summer                            |
|                  |                     | fall                              |
+------------------+---------------------+-----------------------------------+
| custom_label_2   | Category mapping    | t-shirts                          |
| (Category Group) | from product_type   | jeans-pants                       |
|                  |                     | shorts                            |
|                  |                     | outerwear-heavy                   |
|                  |                     | outerwear-light                   |
|                  |                     | hoodies-sweatshirts               |
|                  |                     | headwear-caps                     |
|                  |                     | headwear-cold-weather             |
|                  |                     | accessories                       |
|                  |                     | footwear                          |
|                  |                     | (store-configurable)              |
+------------------+---------------------+-----------------------------------+
| custom_label_3   | Tags + age logic    | new-arrival                       |
| (Product Status) |                     | in-stock                          |
|                  |                     | low-inventory                     |
|                  |                     | clearance                         |
|                  |                     | dead-stock                        |
+------------------+---------------------+-----------------------------------+
| custom_label_4   | Vendor + tags       | name-brand                        |
| (Brand Tier)     |                     | store-brand                       |
|                  |                     | off-brand                         |
+------------------+---------------------+-----------------------------------+

GMC constraint: Each label supports a maximum of 1,000 unique values. All label schemas above are well within this limit (6 values for label_0, 4 for label_1, ~10-20 for label_2, 5 for label_3, 3 for label_4).
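
Because label_2 is store-configurable, a pre-sync check on the number of unique values per label can catch a misconfigured store before the Sheet is written. A minimal sketch, assuming rows are laid out as [id, label_0, ..., label_4]; the function names are illustrative.

```python
MAX_UNIQUE_VALUES_PER_LABEL = 1000  # GMC limit quoted above


def unique_value_counts(rows: list[list[str]]) -> list[int]:
    """Count distinct values in each label column (the id column is skipped)."""
    label_columns = zip(*(row[1:] for row in rows))
    return [len(set(column)) for column in label_columns]


def labels_within_limit(rows: list[list[str]],
                        limit: int = MAX_UNIQUE_VALUES_PER_LABEL) -> bool:
    """True if no label column exceeds the unique-value limit."""
    return all(count <= limit for count in unique_value_counts(rows))


rows = [
    ["shopify_US_1_1", "priority-4", "winter", "jeans-pants", "in-stock", "name-brand"],
    ["shopify_US_1_2", "priority-4", "winter", "shorts", "clearance", "name-brand"],
]
print(unique_value_counts(rows), labels_within_limit(rows))
```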


5.11 Chapter Summary

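+-----------------------+---------------------+------------+---------------------+
| Pipeline Stage        | Trigger             | Latency    | Transport           |
+-----------------------+---------------------+------------+---------------------+
| Product ingestion     | Install / Webhook   | Seconds    | Shopify API/Webhook |
| Priority calculation  | Event-driven        | < 1 second | In-process          |
| Sheet update          | Hourly cron         | < 1 hour   | Google Sheets API   |
| GMC label application | Daily GMC fetch     | < 24 hours | Supplemental feed   |
| Google Ads impact     | GMC sync            | < 4 hours  | Internal Google     |
| Reconciliation        | Daily cron          | Background | Shopify API + DB    |
| Season transition     | Daily cron / manual | Background | In-process + Sheet  |
+-----------------------+---------------------+------------+---------------------+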

The pipeline is designed around the principle that priority changes are strategic, not tactical. A 24-hour propagation window is acceptable because merchants adjust priorities on a weekly or seasonal basis, not in response to minute-by-minute market conditions. This constraint allows the MVP to use the simple, validated Google Sheets pipeline rather than the more complex Content API integration.