Temporal, Composio, KEDA، اور Kubernetes کا استعمال کرتے ہوئے ایک پائیدار، سیلف اسکیلنگ AI ایجنٹ کیسے بنایا جائے

زیادہ تر AI ایجنٹ تیز رفتار کاموں میں اچھے ہوتے ہیں۔ جب آپ کوئی پیغام بھیجتے ہیں، تو آپ کا ایجنٹ چند ٹولز کو کال کر سکتا ہے اور سیکنڈوں میں جواب حاصل کر سکتا ہے۔ دستاویز کے خلاصوں کی درخواست کرتے وقت یا انٹرنیٹ تحقیق کرتے وقت یہ بالکل کام کرتا ہے۔

لیکن کیا ہوتا ہے جب کام میں وقت لگتا ہے؟ کچھ اس طرح کہ "آخری تین دنوں کی ای میلز کو دیکھیں، فوری جوابات کا مسودہ تیار کریں، اور انجینئرنگ سے متعلقہ ٹکٹوں کے لیے لکیری ٹکٹ بنائیں۔” یہ کوئی جلدی کی بات نہیں ہے۔ اس میں منٹ، گھنٹے یا زیادہ وقت لگ سکتا ہے۔ اور یہ صرف ایک مثال ہے۔

یہ آپ کا پورا ورک فلو ہے، اور جس لمحے سرور کریش ہوتا ہے یا عمل دوبارہ شروع ہوتا ہے، سب کچھ ضائع ہو جاتا ہے۔ کوئی دوبارہ کوشش نہیں، دوبارہ شروع نہیں. آپ شروع سے شروع کر رہے ہیں۔

یہ بالکل وہی مسئلہ ہے جس کو یہ مضمون حل کرتا ہے۔

اس مقالے میں، ہم ایک پائیدار بیک گراؤنڈ ایجنٹ رن ٹائم بناتے ہیں جو حقیقی دنیا کے حالات میں برقرار رہتا ہے۔ بس کام چھوڑ دو، چھوڑ دو، اور تمہارا کام ہو گیا۔

ہڈ کے نیچے، یہ KEDA آٹو اسکیلنگ کے ساتھ Kubernetes پر چلتا ہے، اس لیے ورکرز کام کے آنے کے وقت بیکار ہونے پر صفر تک پہنچ جاتے ہیں اور اس کا بیک اپ لیتے ہیں۔ ہم کریش ریکوری اور پائیدار عملدرآمد کے لیے Temporal اور ایجنٹ کی فعالیت اور ٹولنگ کے لیے Composio استعمال کرتے ہیں۔

کیا احاطہ کرتا ہے؟

اس ٹیوٹوریل میں، آپ ایک پائیدار بیک گراؤنڈ ایجنٹ رن ٹائم بناتے ہیں جو Kubernetes پر چلتا ہے اور حقیقی دنیا کے کام کے بوجھ کے ساتھ اسکیل کرتا ہے۔ اس کورس میں آپ جو کچھ سیکھیں گے وہ یہ ہے:

  • ایجنٹ لوپ کیا ہے اور کلاڈ اور کمپوزیو کا استعمال کرتے ہوئے اسے کیسے بنایا جائے۔

  • طویل عرصے سے چلنے والے ایجنٹ ٹاسک کو ہینڈل تنازعات بنانے کے لیے Temporal کا استعمال کیسے کریں۔

  • ایک ایسا گیٹ وے کیسے بنایا جائے جو ٹاسک ڈسپیچ کو عمل سے الگ کرے۔

  • ڈوکر کا استعمال کرتے ہوئے کارکنوں اور گیٹ ویز کو کنٹینرائز کرنے کا طریقہ

  • پورے سسٹم کو مقامی کبرنیٹس کلسٹر میں کیسے تعینات کیا جائے۔

  • قطار کی گہرائی کی بنیاد پر KEDA کا استعمال کرتے ہوئے کارکنوں کو خود بخود 0 تک کیسے پیمانہ کریں۔

یہ کچھ جدید تصورات کا احاطہ کرتا ہے، لیکن اگر آپ اس پر عمل کرتے ہیں، تو آپ راستے میں بہت کچھ سیکھیں گے۔

انڈیکس

منصوبے کیا ہیں (فن تعمیر)

کوڈ کو دیکھنے سے پہلے، یہ سمجھنا مددگار ہے کہ سب کچھ ایک ساتھ کیسے فٹ بیٹھتا ہے۔

سسٹم کو دو الگ الگ طیاروں میں تقسیم کیا گیا ہے: کنٹرول طیارہ، جو صارف کا سامنا کرنے والے تعاملات کو ہینڈل کرتا ہے (Next.js فرنٹ اینڈ)، اور ایگزیکیوشن پلین، جہاں اصل ایجنٹ کا کام ہوتا ہے۔ وہ کبھی بھی ایک دوسرے کو براہ راست نہیں کہتے، اور یہ علیحدگی جان بوجھ کر ہوتی ہے۔

شروع سے اختتام تک بہاؤ مندرجہ ذیل ہے:

کام کی ترسیل

جب کوئی صارف کوئی گول جمع کرتا ہے، تو گیٹ وے پہلے ایک پری ایگزیکیوشن چیک چلاتا ہے تاکہ یہ یقینی بنایا جا سکے کہ اس صارف کے لیے مطلوبہ کمپوزیو ٹول کنکشنز فعال ہیں۔ اگر ایسا ہے تو، ہم یہ کام ٹیمپورل کو سونپ دیتے ہیں اور یہ فوراً واپس آجاتا ہے۔ صارفین انتظار نہیں کرتے۔

میمو: یہ ایجنٹ سے جواب آنے کا انتظار نہیں کرتا۔ یہ سب کچھ پس منظر میں ہوتا ہے۔ یہ آپ کی عام چیٹ ایپ نہیں ہے۔ ایک کام شروع کریں اور اسے بھول جائیں۔

کام چلائیں

وقتی کاموں کو قطار میں کھڑا کرتا ہے اور کارکن پھلی انہیں اٹھا لیتے ہیں۔ کارکن ایجنٹ لوپ چلاتا ہے، مقصد کے بارے میں LLM وجوہات، Composio ٹول چلاتا ہے، اور نتائج واپس Temporal میں لکھے جاتے ہیں۔ فرنٹ اینڈ اسٹیٹس اپ ڈیٹس کے لیے گیٹ وے کو خود بخود پول کرتا ہے، لہذا آپ دیکھ سکتے ہیں کہ کچھ کیے بغیر کیا ہو رہا ہے۔

پیمانہ کاری

KEDA عارضی قطار کی گہرائی کا مشاہدہ کرتا ہے اور زیر التواء کام کی مقدار کی بنیاد پر ورکر پوڈ کا سائز تبدیل کرتا ہے۔ جب قطار خالی ہوتی ہے، تو کارکنوں کو صفر کر دیا جاتا ہے۔ کام آتے ہی اسے دوبارہ لوڈ کیا جائے گا۔ یہ خوبصورتی ہے!

گیٹ وے ایجنٹ کوڈ کو نہ چھونے کی وجہ آسان ہے۔ ایجنٹ کی کارروائیوں میں کام کے لحاظ سے منٹ یا گھنٹے لگ سکتے ہیں، اور ہم اسے اپنی API پرت میں نہیں چاہتے۔ ان کو الگ رکھنے سے اس بات کو یقینی بنانے میں مدد ملتی ہے کہ پس منظر میں کیا ہو رہا ہے اس سے قطع نظر کنٹرول طیارہ تیز رہتا ہے۔

ایپلی کیشن انسانی مداخلت کے بغیر لینکس کرون جاب طرز کے جاب شیڈولنگ کی بھی حمایت کرتی ہے۔ لہذا، عمل سے پہلے کی جانچ کرنا مددگار ہے۔ کیونکہ ڈسپیچ پر جلدی سے ناکام ہونا اس سے کہیں بہتر ہے کہ کسی گمشدہ ٹول کنکشن کی وجہ سے کام خاموشی سے ناکام ہو جائے۔

یہ ہماری درخواست کا اعلیٰ سطحی فن تعمیر ہے۔ سیدھے الفاظ میں:

  • Kubernetes (k8s): آرکیسٹریشن پرت

  • ڈبلیو ایچ او: خود بخود پھیلنے والی پرت

  • عارضی: پائیدار پرت

  • ترکیب: آلے کی پرت

  • مطلوبہ ایل ایل ایم (اس معاملے میں انتھروپک) = انفرنس پرت

پروجیکٹ کیسے ترتیب دیا جائے۔

شروع کرنے سے پہلے، یقینی بنائیں کہ آپ کے پاس درج ذیل انسٹال ہیں:

آپ کو Anthropic اور Composio کے لیے API کیز کی بھی ضرورت ہوگی۔

ذخیرہ کلوننگ کرکے شروع کریں۔

git clone https://github.com/shricodev/kron-k8s-agent.git
cd kron-k8s-agent

اگلا، کلسٹر بنائیں، تصویر بنائیں، اور اسے لوڈ کریں۔

# Create the local cluster
k3d cluster create agent --wait

# Build both images and import them into the cluster
bash scripts/build-images.sh
bash scripts/load-images.sh

# Deploy Temporal (creates the temporal namespace, Postgres, and server)
kubectl apply -f infra/k8s/temporal/temporal-dev.yaml

اگلا، ہم ایک نام کی جگہ اور راز بناتے ہیں۔ چونکہ پوڈز ایپ سے کیز پڑھتے ہیں، اس لیے ایپ کے تعینات ہونے سے پہلے راز کا موجود ہونا ضروری ہے۔

# Create the agent namespace
kubectl apply -f infra/k8s/00-namespace.yaml

# Create the secret with your keys (you're supposed to remove the placeholders with the actual values...)
kubectl create secret generic agent-secrets -n agent 
 --from-literal=ANTHROPIC_API_KEY=sk-ant-... 
 --from-literal=COMPOSIO_API_KEY=ak_... 
 --from-literal=JWT_SECRET=$(openssl rand -hex 32)

اس کو لاگو کرنے کے بعد، اپنی ایپ کو تعینات کریں اور آٹو اسکیلنگ سیٹ اپ کریں۔

# Install KEDA, then apply the scalers
helm repo add kedacore https://kedacore.github.io/charts
helm repo update
helm install keda kedacore/keda -n keda --create-namespace --wait

kubectl apply -f infra/k8s/40-keda-worker-scaledobject.yaml -f infra/k8s/41-gateway-hpa.yaml

آخر میں، اپنے گیٹ وے کو پورٹ فارورڈ کریں تاکہ آپ کی مشینیں اس سے جڑ سکیں۔

# Port-forward the gateway to localhost:8000
kubectl -n agent port-forward svc/gateway 8000:8000

سامنے والے سرے کی طرف اشارہ کرتا ہے۔ http://localhost:8000 اب آپ کام شروع کرنے کے لیے تیار ہیں۔

میمو: اسے چھونے کی ضرورت نہیں ہے۔ .env فائل apps/worker/ یا apps/gateway/ اس کے لیے۔ یہ ایپس کو براہ راست آپ کے کمپیوٹر پر چلانے کے لیے ہے۔

آپ کے کلسٹر میں، Pods کو ConfigMap سے ان کی ترتیب مل جائے گی، اور آپ نے ابھی جو راز بنایا ہے اسے رن ٹائم کے وقت ماحولیاتی متغیر کے طور پر انجکشن کیا جائے گا۔

درخواست کے بنیادی اجزاء

یہ منصوبہ بہت بڑا ہے۔ شروع سے ہر سطر کو پڑھنا ایک گھنٹہ کے قابل ہوگا، لہذا آئیے اس کے بجائے ان بنیادی اجزاء پر توجہ مرکوز کریں جو نظام کو اصل میں کام کرتے ہیں۔

ایجنٹ لوپ

ایجنٹ لوپ پورے نظام کا دماغ ہے۔ جب بھی کوئی ٹاسک روانہ کیا جاتا ہے تو اس پر عمل کیا جاتا ہے۔

خیال، چاہے عمل درآمد نہ ہو، سادہ ہے۔ LLM کو ایک مقصد دیں، ایک اندازہ لگائیں، ٹول کو کال کریں، نتائج کو فیڈ بیک کریں، اور مکمل ہونے تک دہرائیں۔

async def run_agent(
user_id: str,
goal: str,
toolkit_hint: str | None = None,
) -> dict:

اس کے لیے تین چیزوں کی ضرورت ہے: آپ کی صارف ID (تاکہ کمپوزیو کو معلوم ہو کہ کون سا لنک شدہ اکاؤنٹ استعمال کرنا ہے)، مقصد خود، اور ایک اختیاری ٹول کٹ اشارہ۔ اشارے آپ کو لوڈ کیے جانے والے ٹول کے دائرہ کار کی وضاحت کرنے کی اجازت دیتے ہیں۔ اگر ٹاسک واضح طور پر Gmail کا کام ہے، تو "gmail” کو پاس کرنے سے آپ کے کسی بھی ٹول کو لوڈ ہونے سے روکے گا۔

لوپ شروع ہونے سے پہلے، ہم ایک کمپوزیو سیشن بناتے ہیں اور اس صارف کے لیے ٹولز حاصل کرتے ہیں۔

session = await create_session(user_id, toolkit_hint);
tools = await get_tools(session);

پھر اصل لوپ چلتا ہے.

for turn in range(1, settings.max_iterations + 1):
    response = await client.messages.create(
        model=settings.model,
        max_tokens=settings.max_tokens,
        system=SYSTEM_PROMPT,
        tools=tools,
        messages=messages,
    )

if response.stop_reason == "end_turn":
return finish("completed", _extract_text(response.content))

if response.stop_reason == "tool_use":
      # execute the tools, append results, continue
      # ...

ہر موڑ پر، کلاڈ یہ فیصلہ کرنے سے پہلے کہ آگے کیا کرنا ہے اپنے مقاصد اور گفتگو کی تاریخ کو دیکھتا ہے۔ کہ stop_reason یہاں کیا ہوا ہے:

  • "end_turn": کلاڈ ختم ہو گیا ہے۔ کام مکمل کرتا ہے اور حتمی جواب دیتا ہے۔

  • "tool_use": آپ ایک یا زیادہ ٹولز کو کال کرنے کی کوشش کر رہے ہیں۔ لوپ اسے کمپوزیو کے ذریعے چلاتا ہے، نتائج کو پیغام کی سرگزشت میں واپس شامل کرتا ہے، اور پھر واپس چلا جاتا ہے۔

اگر کوئی ٹول کال ناکام ہو جاتی ہے، تو عملدرآمد میں خلل نہیں پڑتا ہے اور غلطی کو ڈائیلاگ میں واپس کر دیا جاتا ہے۔

except ComposioError as exc:
tool_result_blocks = [
    {
        "type": "tool_result",
        "tool_use_id": block.id,
        "content": f"Tool execution failed: {exc}",
        "is_error": True,
    }
        for block in tool_use_blocks
]

لوپ پوری صلاحیت سے چلتا ہے۔ max_iterations موڑ کی تعداد بطور ڈیفالٹ 20 ہے اور اس کی وضاحت اس میں کی گئی ہے: apps/worker/agent/config.py. اگر آپ مکمل کیے بغیر اس حد تک پہنچ جاتے ہیں۔ max_iterations_reached غیر معینہ مدت تک لٹکنے کے بجائے حیثیت۔

ہر run_agent کال وہی لغت کی شکل لوٹاتی ہے: حیثیت، خلاصہ، انجام دیئے گئے تمام اقدامات کی فہرست۔ مستقل فارمیٹ نتائج کو ذخیرہ کرنے اور ان کا معائنہ کرنے کے لیے ٹیمپورل کو آسان بنا دیتا ہے۔ اس کی وضاحت آگے کی جائے گی۔

Temporal کا استعمال کرتے ہوئے استحکام میں اضافہ کریں۔

ایجنٹ لوپ کے ساتھ ہی ایک حقیقی مسئلہ ہے۔ اگر کارکن کا عمل مرحلہ 15 کے وسط میں کریش ہو جائے تو سب کچھ ضائع ہو جاتا ہے۔ یہ جاننے کا کوئی طریقہ نہیں ہے کہ آپ کس حد تک آچکے ہیں اور آپ کو شروع سے شروع کرنا ہوگا۔

ورک فلو اور سرگرمیاں

عارضی آپ کے کوڈ کو دو الگ الگ حصوں میں تقسیم کرتا ہے: ورک فلو اور سرگرمیاں۔

ایک ورک فلو بیان کرتا ہے کہ کیا ہونا چاہیے اور کس ترتیب میں، لیکن اصل کام خود انجام نہیں دیتا۔ کوئی نیٹ ورک کال نہیں، کچھ بھی نہیں۔ یہ رکاوٹ ٹیمپورل کو محفوظ طریقے سے اسے دوبارہ چلانے اور تصادم کے بعد ریاست کی تشکیل نو کی اجازت دیتی ہے۔

سرگرمیاں وہ ہیں جہاں حقیقی کام ہوتا ہے۔ نیٹ ورک کالز، ایل ایل ایم کی درخواستیں، ٹول ایگزیکیوشن وغیرہ سبھی سرگرمی میں شامل ہیں۔ سرگرمیاں ناکام ہو سکتی ہیں اور ورک فلو کی حالت کو متاثر کیے بغیر آزادانہ طور پر دوبارہ کوشش کی جا سکتی ہیں۔

اس منصوبے میں AgentWorkflow کو apps/worker/workflows.py یہ ایک ورک فلو ہے۔ run_agent_activity کو apps/worker/activities.py ایک ایسی سرگرمی جو ایجنٹ لوپ کو لپیٹتی ہے۔

کام کا بہاؤ

@workflow.defn(name="AgentWorkflow")
class AgentWorkflow:
    def init(self) -> None:
        self._status: str = "running"
        self._result: dict | None = None

Temporal اس ورک فلو کو شروع کرتا ہے جب کوئی کام ڈیلیور ہوتا ہے۔ دوبارہ کوشش کرنے کی پالیسی مرتب کریں اور تمام اصل کام کو سرگرمی میں منتقل کریں۔

retry = RetryPolicy(
  (initial_interval = timedelta((seconds = 2))),
  (backoff_coefficient = 2.0),
  (maximum_interval = timedelta((minutes = 2))),
  (maximum_attempts = 5),
  (non_retryable_error_types = ["ValueError", "AuthenticationError"]),
);

result = await workflow.execute_activity(
  run_agent_activity,
  (args = [user_id, goal, toolkit_hint]),
  (start_to_close_timeout = timedelta((minutes = 30))),
  (retry_policy = retry),
);

کہ start_to_close_timeout یہ 30 منٹ پر سیٹ ہے اور زیادہ سے زیادہ 5 کوششوں تک محدود ہے، کیونکہ ایجنٹ کے کاموں میں درحقیقت اتنا وقت لگ سکتا ہے۔ آپ اپنے کام کی ضروریات کے مطابق ٹائمر کو بڑھا یا گھٹا سکتے ہیں۔

ورک فلو استفسار

ایک چیز جو یہاں Temporal کو آسان بناتی ہے وہ ہے اس کا استفسار کرنے والا۔ ورک فلو موجودہ صورتحال اور نتائج کو ان کا پتہ لگانے کے لیے علیحدہ ڈیٹا بیس کی ضرورت کے بغیر ظاہر کرتے ہیں۔

@workflow.query
def status(self) -> str:
return self._status

@workflow.query
def result(self) -> dict | None:
return self._result

گیٹ وے ٹیمپورل سے پوچھ سکتا ہے، "ورک فلو X کی حیثیت کیا ہے؟” کسی بھی وقت ریئل ٹائم جوابات حاصل کریں۔ فرنٹ اینڈ پولنگ اس طرح کام کرتی ہے۔

سرگرمی

سرگرمی آسان ہے۔ ریپنگ run_agent اور ریکارڈ کیا ہوتا ہے:

@activity.defn(name="run_agent_activity")
async def run_agent_activity(user_id: str, goal: str, toolkit_hint: str | None) -> dict:
    result = await run_agent(user_id=user_id, goal=goal,             toolkit_hint=toolkit_hint)
    return result

نیٹ ورک کو چھونے والی ہر چیز یہاں رہتی ہے، ورک فلو میں نہیں۔ یہ علیحدگی ٹیمپورل کو اپنا کام کرنے کی اجازت دیتی ہے۔

کارکن

کارکن عمل ہر چیز کو رجسٹر کرتا ہے اور قطار میں پولنگ شروع کرتا ہے۔

worker = Worker(
            client,
            task_queue=temporal_settings.temporal_task_queue,
            workflows=[AgentWorkflow],
            activities=[run_agent_activity, notify_activity],
            max_concurrent_activities=5,
)

عارضی سے جڑیں، ورک فلو اور سرگرمیاں رجسٹر کریں، اور کام کی قطاروں کو سنیں۔ جب کوئی کام آتا ہے، اسے منتخب کریں اور اسے چلائیں. یہ ایک ایسا عمل ہے جو Kubernetes Pod کے اندر چلتا ہے، اور ایک ایسا عمل ہے جسے KEDA بعد میں قطار کی گہرائی کی بنیاد پر پیمانہ کرے گا۔

ایجنٹ گیٹ وے

گیٹ وے ایک فاسٹ اے پی آئی ایپ ہے جو آپ اور ٹیمپورل کے درمیان بیٹھتی ہے۔ جاب ڈسپیچ، سٹیٹس پولنگ، اور منسوخی کو ہینڈل کرتا ہے۔ اہم بات یہ ہے کہ یہ خود ایجنٹ کوڈ پر عمل نہیں کرتا ہے۔ آپ کا واحد مشن ٹیمپورل سے بات کرنا اور جلدی سے واپس آنا ہے۔

کام کی ترسیل

ڈسپیچ اینڈ پوائنٹ apps/gateway/routes/tasks.py یہیں سے یہ سب شروع ہوتا ہے۔

  @router.post("/dispatch", response_model=DispatchResponse)
  async def dispatch(
      body: DispatchRequest,
      user_id: str = Depends(current_user_id),
  ) -> DispatchResponse:
      if body.toolkit:
          access = await check_toolkit_access(user_id, body.toolkit)
          if not access["allowed"]:
              raise HTTPException(
                  status_code=status.HTTP_409_CONFLICT,
                  detail={
                      "error": "toolkit_not_connected",
                      "connect_url": access["connect_url"],
                  },
              )

      workflow_id = f"agent-{user_id}-{uuid.uuid4().hex[:8]}"
      await client.start_workflow(
          WORKFLOW_NAME,
          args=[user_id, body.goal, body.toolkit],
          id=workflow_id,
          task_queue=settings.temporal_task_queue,
          cron_schedule=body.schedule or "",
      )
      return DispatchResponse(workflow_id=workflow_id, status="dispatched")

درخواست میں تین فیلڈز ہیں: گول، اختیاری ٹول کٹ کا نام (تاکہ آپ ٹول کٹ کا نام معلوم کرنے کی کوشش میں وقت ضائع نہ کریں)، اور اختیاری کرون شیڈول۔ اینڈ پوائنٹ ایک پری ایگزیکیوشن چیک چلاتا ہے، ٹاسک کو ٹیمپورل کو دیتا ہے، اور پھر ورک فلو ID کو فوری طور پر واپس کرتا ہے۔ صارفین ایجنٹ کے مکمل ہونے کا انتظار نہیں کرتے۔

توجہ فرمائیں cron_schedule میدان یہاں معیاری کرون ایکسپریشن پاس کرنا ٹاسک کو بار بار چلنے والے کام میں بدل دے گا۔ وقتی نظام الاوقات کو خود ہینڈل کرتا ہے، لہذا کسی اضافی انفراسٹرکچر کی ضرورت نہیں ہے۔

پرواز سے پہلے کا معائنہ

پرواز سے پہلے کا معائنہ apps/gateway/routes/preflight.py. کسی کام کو بھیجنے سے پہلے، یہ تصدیق کرتا ہے کہ صارف کے پاس درحقیقت مطلوبہ ٹول کٹ Composio سے منسلک ہے۔

  async def check_toolkit_access(user_id: str, toolkit_hint: str | None) -> dict:
      if not toolkit_hint:
          return {"allowed": True}

      connected = await asyncio.to_thread(
          _has_active_account, composio, user_id, toolkit_hint
      )
      if connected:
          return {"allowed": True}

      connect_url = await asyncio.to_thread(
          _connect_link, composio, user_id, toolkit_hint
      )
      return {"allowed": False, "toolkit": toolkit_hint, "connect_url": connect_url}

اگر کوئی کنکشن غائب ہے، گیٹ وے connect_url صارفین آپ کی ایپ کو فوری طور پر منظور کر سکتے ہیں۔ یہ خاص طور پر طے شدہ کاموں کے لیے اہم ہے۔

اسٹیٹس چیک کر رہا ہے۔

جب کوئی کام چلتا ہے، تو فرنٹ اینڈ مندرجہ ذیل اختتامی پوائنٹس کو پول کرتا ہے:

  @router.get("/{workflow_id}", response_model=TaskStatusResponse)
  async def get_task(workflow_id: str, ...) -> TaskStatusResponse:
      if not _owns(workflow_id, user_id):
          raise HTTPException(status_code=404, detail="task not found")

      handle = client.get_workflow_handle(workflow_id, run_id=run_id)
      agent_status = await handle.query("status")

      if desc.status == WorkflowExecutionStatus.COMPLETED:
          result = await handle.query("result")

      return TaskStatusResponse(...)

کہ status اور result یہ براہ راست ٹیمپورل کے استفسار کے ہینڈلر سے آتا ہے جیسا کہ ورک فلو میں دیکھا گیا ہے۔ کوئی الگ اسٹیٹ ٹیبل نہیں ہے اور ہر قدم کے بعد کوئی ڈیٹا بیس نہیں لکھتا ہے۔ کچھ عارضی سچائی کا ذریعہ.

ایپلی کیشن کنٹینرائزیشن

گیٹ ویز اور ورکرز کو دو الگ الگ امیجز کے طور پر پیک کیا گیا ہے۔ رن ٹائم پر کچھ بھی شیئر نہیں کیا جاتا ہے۔ یہ بالکل وہی ہے جو آپ چاہتے ہیں کیونکہ یہ آزادانہ طور پر پیمانہ ہے اور اس کی مختلف ذمہ داریاں ہیں۔

دونوں ڈاکر فائلز /docker اسے ڈائرکٹری میں محفوظ کریں اور ملٹی اسٹیج بلڈ استعمال کریں۔

کثیر سطحی کیوں؟

بلڈر مرحلہ Python پیکجوں کو مرتب کرنے کے لیے کمپائلر اور بلڈ ٹولز کو انسٹال کرتا ہے۔ رن ٹائم مرحلہ صرف مکمل انحصار اور ایپلیکیشن کوڈ درآمد کرتا ہے۔ حتمی تصویر میں تعمیراتی اوزار ڈالنے کا کوئی مطلب نہیں ہے۔

گیٹ وے کی تصویر

FROM python:3.14-slim-bookworm AS builder

RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

COPY requirements.txt .
RUN pip install -r requirements.txt

FROM python:3.14-slim-bookworm AS runtime

ENV PYTHONUNBUFFERED=1 PATH="/opt/venv/bin:$PATH"

RUN useradd --create-home --uid 10001 app
WORKDIR /app

COPY --from=builder /opt/venv /opt/venv
COPY . .

USER app
EXPOSE 8000
CMD ["python", "main.py"]

رن ٹائم مرحلے پر venv بلڈر میں، نان روٹ صارف پر جائیں اور FastAPI ایپ شروع کریں۔ اور ہمیشہ کی طرح، ایک غیر جڑ صارف کے طور پر چلائیں (ایک اچھا ڈویلپر بنیں اور مناسب حفاظتی طریقوں پر عمل کریں )۔

کارکن کی تصویر

ورکر ڈاکر فائل ایک چھوٹے سے فرق کے علاوہ تقریباً ایک جیسی ہے۔

# procps gives us pgrep for the liveness probe
RUN apt-get update 
 && apt-get install -y --no-install-recommends procps 
 && rm -rf /var/lib/apt/lists/*

procps انسٹال کریں تاکہ Kubernetes liveness probe pgrep کو اس بات کا تعین کرنے کے لیے چلا سکے کہ آیا یہ عمل ابھی بھی فعال ہے۔

تصاویر بنائیں اور لوڈ کریں۔

تعمیر کا اسکرپٹ ہے۔ scripts/build-images.sh دونوں امیجز بنائیں اور ہر ایپ ڈائرکٹری کو بلڈ سیاق و سباق کے طور پر پاس کریں۔

docker build 
 -f "$ROOT/docker/Dockerfile.gateway" 
    -t "agent-gateway:$TAG" 
 "$ROOT/apps/gateway"

docker build 
 -f "$ROOT/docker/Dockerfile.worker" 
    -t "agent-worker:$TAG" 
 "$ROOT/apps/worker"

Dockerfile پر واقع ہے: docker/ تاہم، ہر ایک اپنی ایپ ڈائرکٹری کے خلاف بنایا گیا ہے۔ بس COPY . . اصل میں اس کی کاپی کریں۔

تعمیر کرنے کے بعد، آپ کو اپنے کلسٹر پر تصویر چلانے کے لیے ایک اور قدم اٹھانے کی ضرورت ہے۔ مقامی k3d کلسٹر کو Docker ڈیمون تک رسائی نہیں ہے، لہذا یہ مقامی طور پر بنائی گئی تصاویر تک رسائی حاصل نہیں کر سکتا۔ آپ کو اسے واضح طور پر درآمد کرنا ہوگا۔

k3d image import "agent-gateway:dev" "agent-worker:dev" -c agent

scripts/load-images.sh میں یہ تمہارے لیے کرتا ہوں۔ درآمد مکمل ہونے کے بعد، کلسٹر حسب معمول تصویر کھینچ سکتا ہے اور پوڈز شروع ہو جائیں گے۔

Kubernetes پر تعینات کریں۔

ایک بار جب تصویر بن جاتی ہے اور کلسٹر میں لوڈ ہو جاتی ہے، تو اگلا مرحلہ مینی فیسٹ کو لاگو کرنا ہے۔ سیٹ اپ دو تہوں پر مشتمل ہے۔ ٹائر 1 اہم درخواست ہے۔ namespace، configاور deployments. ٹائر 2 آٹو اسکیلنگ ہے اور اگلے حصے میں اس کا احاطہ کیا گیا ہے۔

ترتیب اور راز

غیر حساس ترتیب ہے۔ ConfigMap کو infra/k8s/01-configmap.yaml:

data:
  MODEL: "claude-opus-4-8"
  MAX_TOKENS: "4096"
  MAX_ITERATIONS: "20"
  TEMPORAL_HOST: "temporal-frontend.temporal.svc.cluster.local:7233"
  TEMPORAL_TASK_QUEUE: "agent-tasks"
  GATEWAY_HOST: "0.0.0.0"
  GATEWAY_PORT: "8000"

یہ وہ جگہ ہے جہاں سے عارضی میزبان کا پتہ آتا ہے۔ عالمی انٹرا کلسٹر DNS نام کا استعمال کریں جو عارضی نام کی جگہ میں عارضی فرنٹ اینڈ سروس کی طرف اشارہ کرتا ہے۔ پتہ صرف اندرونی طور پر کلسٹر میں حل کیا جاتا ہے۔ یہ ٹھیک ہے کیونکہ گیٹ وے اور کارکن دونوں کلسٹر پر چلتے ہیں۔

API کلید ایک Kubernetes راز میں جاتی ہے جسے آپ دستی طور پر بناتے ہیں اور Git میں ارتکاب نہیں کرتے ہیں۔ دونوں ConfigMap اور Secret یہ استعمال کرتے ہوئے ایک ماحولیاتی متغیر کے طور پر نصب کیا جاتا ہے: envFrom ہر تعیناتی میں۔

گیٹ وے تعیناتی۔

spec:
  replicas: 2
  containers:
    - name: gateway
      image: agent-gateway:dev
      imagePullPolicy: IfNotPresent
      command:
        [
          "python",
          "-m",
          "uvicorn",
          "main:/app",
          "--host",
          "0.0.0.0",
          "--port",
          "8000",
        ]
      readinessProbe:
        httpGet:
          path: /health
          port: 8000
      resources:
        requests:
          cpu: 100m
          memory: 256Mi
        limits:
          cpu: 500m
          memory: 512Mi

چند باتیں قابل توجہ ہیں۔ imagePullPolicy: IfNotPresent Kubernetes کو رجسٹری سے کھینچنے کے بجائے مقامی طور پر بھری ہوئی تصویر کو استعمال کرنے کی ہدایت کرتا ہے۔ اسٹارٹ اپ کمانڈ reload=True فلیگ کو نظرانداز کرتی ہے جو main.py کے ذریعے براہ راست مقامی طور پر چلایا جاتا ہے۔ ریڈی نیس پروب ہٹ /health گیٹ وے صرف درخواستیں وصول کرتا ہے جب یہ حقیقت میں کام کرنے سے پہلے Kubernetes Pods پر ٹریفک بھیجتا ہے۔

گیٹ وے بھی ہے۔ ClusterIP خدمات فراہم کرتا ہے تاکہ دیگر پوڈز اور پورٹ فارورڈنگ ان تک پہنچ سکے۔

apiVersion: v1
kind: Service
metadata:
  name: gateway
spec:
  type: ClusterIP
  ports:
    - port: 8000
      targetPort: 8000

کارکنوں کی تعیناتی۔

# just polls Temporal.
spec:
  replicas: 1
  containers:
    - name: worker
      image: agent-worker:dev
      livenessProbe:
        exec:
          command: ["pgrep", "-f", "worker.py"]
        initialDelaySeconds: 15
        periodSeconds: 20
      resources:
        requests:
          cpu: 250m
          memory: 512Mi
        limits:
          cpu: "1"
          memory: 1Gi

کارکنوں کی کوئی خدمت نہیں ہے۔ آنے والے رابطوں کو قبول نہیں کرتا ہے۔ یہ بیرونی طور پر عارضی اور کام کے لیے پولز سے جڑتا ہے، اس لیے کسی چیز تک پہنچنے کی ضرورت نہیں ہے۔ اسی لیے procps Dockerfile میں انسٹال ہے۔.

مزید برآں، کارکنوں کو گیٹ ویز سے زیادہ وسائل ملتے ہیں۔ چونکہ آپ LLM کالز اور ٹولز چلا رہے ہیں، اس کے لیے مزید وسائل درکار ہیں۔ آپ کی ضروریات کے مطابق کیپس کی وضاحت کی جاسکتی ہے۔

سب کچھ لاگو کریں

تعیناتی اسکرپٹ یہاں واقع ہے: scripts/deploy.sh صحیح ترتیب میں ٹائر 1 کا اطلاق کریں۔

kubectl apply -f "$K8S/00-namespace.yaml"
kubectl apply -f "$K8S/01-configmap.yaml"
kubectl apply -f "$K8S/10-gateway-deployment.yaml"
kubectl apply -f "$K8S/20-worker-deployment.yaml"

آرڈر یہاں اہم ہے۔ نام کی جگہ کا موجود ہونا ضروری ہے اس سے پہلے کہ آپ اس کے اندر کوئی اور چیز بنا سکیں۔ ConfigMap پوڈ کے مندرجات کو پڑھنا شروع کرنے سے پہلے اس کا موجود ہونا ضروری ہے۔

KEDA کے ساتھ آٹو اسکیلنگ

Kubernetes CPU یا میموری کی بنیاد پر Pods کو ترازو کرتا ہے۔ یہ ان گیٹ ویز کے لیے موزوں ہے جو HTTP درخواستوں کو ہینڈل کرتے ہیں اور درحقیقت ٹریفک کے تناسب سے CPU استعمال کرتے ہیں۔ لیکن کارکنوں کے لیے یہ درست اشارہ نہیں ہے۔

اگر کوئی کام انتظار میں نہیں ہے، تو کارکن مکمل طور پر بیکار ہے۔ کسی بھی CPU لیٹنسی کو استعمال نہیں کرتا ہے۔ ایک بار جب کام پہنچ جاتا ہے، یہ تیزی سے مصروف ہو جاتا ہے. آپ واقعی جس چیز کو پیمانہ کرنے کی کوشش کر رہے ہیں وہ قطار کی گہرائی ہے، یعنی ملازمتوں کی تعداد جو چننے کے منتظر ہیں۔

یہ بالکل وہی ہے جو KEDA کرتا ہے۔ بیرونی میٹرکس جیسے قطار کی لمبائی، پیغامات کی تعداد، یا اس معاملے میں عارضی کام کی قطار کی گہرائی کو پڑھیں، اور اسی کے مطابق اپنی تعیناتی کی پیمائش کریں۔

کارکن کی توسیع

کہ ScaledObject کو infra/k8s/40-keda-worker-scaledobject.yaml یہ ہے جو KEDA دیکھ رہا ہے:

spec:
  scaleTargetRef:
    name: worker
  minReplicaCount: 0
  maxReplicaCount: 10
  cooldownPeriod: 120
  triggers:
    - type: temporal
      metadata:
        endpoint: temporal-frontend.temporal.svc.cluster.local:7233
        namespace: default
        taskQueue: agent-tasks
        queueTypes: "workflow,activity"
        targetQueueSize: "5"
        activationTargetQueueSize: "0"

آئیے اہم شعبوں کو دیکھتے ہیں۔

  • minReplicaCount: 0 سب سے بڑا ہے۔ KEDA صفر تک پیمانہ کر سکتا ہے، جو معیاری HPA نہیں کر سکتا۔ جب قطار خالی ہوتی ہے، تمام ورکر پوڈز ختم کردیئے جاتے ہیں۔ جب آپ کا سسٹم بیکار ہے تو آپ ادائیگی نہیں کرتے ہیں۔

  • activationTargetQueueSize: "0” کا مطلب ہے کہ KEDA اس وقت تعیناتی کو بیدار کر دے گا جب کوئی ایک کام قطار میں داخل ہو گا۔ کوئی کام نہیں، کوئی پھلی نہیں۔ ایک عمل سے پھلی گھومنا شروع ہو جاتی ہے۔

  • targetQueueSize: "5” KEDA کو ہر پانچ زیر التواء کاموں کے لیے تقریباً ایک ورکر پوڈ کو نشانہ بنانے کی ہدایت کرتا ہے۔ 10 قطار والے کام 2 پوڈ کے برابر ہیں۔

  • cooldownPeriod:120 قطار صاف ہونے کے بعد KEDA کے دوبارہ سکڑنے سے پہلے 120 سیکنڈ کا بفر شامل کرتا ہے۔

  • queueTypes: "ورک فلو، سرگرمی” دونوں قطاروں کو دیکھتی ہے۔ اس کے بغیر، KEDA زیر التواء کام کا صرف ایک حصہ دیکھ سکے گا۔

میمو: ایڈہاک اسکیلر کے لیے KEDA v2.17 یا اس سے زیادہ کی ضرورت ہے۔ یقینی بنائیں کہ آپ کا ہیلم انسٹالیشن مناسب ورژن یا اس سے اوپر ہے۔

گیٹ وے کی توسیع

گیٹ ویز باقاعدہ CPU پر مبنی HPA حاصل کرتے ہیں۔ infra/k8s/41-gateway-hpa.yaml:

spec:
  minReplicas: 2
  maxReplicas: 6
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 60

گیٹ وے اصل کام آنے والی HTTP درخواستوں کے متناسب کرتا ہے، اس لیے یہاں CPU درست سگنل ہے۔ API کی طرف کوئی کولڈ اسٹارٹ تاخیر نہیں ہے کیونکہ اسے کم از کم 2 نقلوں کے ساتھ برقرار رکھا جاتا ہے۔

KEDA کی تنصیب

درخواست دینے سے پہلے ہیلم کے ذریعے KEDA انسٹال کیا جاتا ہے: ScaledObject:

helm install keda kedacore/keda -n keda --create-namespace --wait
kubectl apply -f infra/k8s/40-keda-worker-scaledobject.yaml -f infra/k8s/41-gateway-hpa.yaml

ایک بار لاگو ہونے کے بعد، سسٹم مکمل طور پر کام کر رہا ہے۔ اپنا کام جمع کروائیں اور دیکھیں کہ آیا آپ کا ورکر پوڈ نظر آتا ہے۔ اپنی قطار کو صاف کریں اور اسے غائب ہوتے دیکھیں۔ یہی بات ہے۔

اس طرح، آپ کے پاس ایک انتہائی پائیدار، آٹو اسکیل ایبل AI ایجنٹ ہے جسے کسی بھی وقت چلانے کے لیے شیڈول کیا جا سکتا ہے۔ یہ کتنا ٹھنڈا ہے؟

فعال ایجنٹ

عمل میں ایجنٹ کا ایک فوری ڈیمو یہ ہے (کوبرنیٹس کلسٹر کے اندر چل رہا ہے):

نتیجہ

پروڈکشن میں AI ایجنٹ کو چلانا AI ایجنٹ بنانے سے بالکل مختلف معاملہ ہے۔ میں نے یہاں اس فرق پر توجہ مرکوز کرنے کی کوشش کی ہے، اور امید ہے کہ یہ اس بات کے لیے ٹھوس حوالہ کے طور پر کام کرے گا کہ میں استحکام اور توسیع پذیری کے بارے میں کیسے سوچتا ہوں۔ اور مجھے امید ہے کہ اس سے آپ کو آپ کی عام AI چیٹ ایپلی کیشن سے کچھ مختلف بنانے یا سمجھنے میں مدد ملی۔

Temporal اور KEDA کا امتزاج وہ چیز ہے جس کے بارے میں آپ کو واقعی جاننے اور مزید جاننے کی ضرورت ہے کہ آیا آپ AI ایجنٹس بنا رہے ہیں یا عام طور پر DevOps کر رہے ہیں۔ Temporal AI ایجنٹوں (پائیدار) کے ساتھ سب سے بڑے مسئلے کو حل کرنے میں مدد کرتا ہے، اور KEDA اس بات کو یقینی بناتا ہے کہ آپ بیکار کارکنوں کو 2am پر ادائیگی نہ کریں جب کچھ نہیں چل رہا ہے (اگر پیداوار میں استعمال کیا جاتا ہے)۔ واقعات کی بنیاد پر پیمائش کرنا ضروری ہے، نہ صرف CPU۔

یہاں سے توسیع کے لیے کافی گنجائش ہے۔ آپ اپنی ڈیولپمنٹ JWT کو ایک مناسب OIDC سے بدل سکتے ہیں یا مزید ورک فلو کو سپورٹ کرنے کے لیے Composio کے ساتھ ٹول کٹ کوریج کو بڑھا سکتے ہیں۔

بنیاد وہیں ہے۔ باقی صرف اس کے اوپر تعمیر کر رہا ہے.

آپ کو مکمل سورس کوڈ یہاں مل سکتا ہے: shricodev/kron-k8s-agent

اوپر تک سکرول کریں۔