TypeScript میں ایک قابل اعتماد SSE کلائنٹ کیسے بنایا جائے۔

ڈیٹا کو سٹریم کرنے والی خصوصیات کی تعمیر کرتے وقت، جیسے کہ AI چیٹ کے جوابات یا ریئل ٹائم نوٹیفکیشن فیڈز، نیٹ ورک شاذ و نادر ہی اتنے کوآپریٹو ہوتے ہیں جیسے: fetch اسے مرئی بنائیں۔

کنکشن منقطع ہو جاتا ہے، ایک پراکسی بفر رسپانس ہوتا ہے، اور موبائل نیٹ ورک وائی فائی سے سیلولر مڈ سٹریم میں بدل جاتا ہے۔ اگر آپ کا اسٹریمنگ کوڈ اس کے لیے منصوبہ بندی نہیں کرتا ہے، تو صارف کو ایک خرابی یا جواب نظر آئے گا جو بحالی کے بغیر رک جاتا ہے۔

یہ مضمون ایک اوپن سورس ٹائپ اسکرپٹ لائبریری کا استعمال کرتا ہے جسے Ore کہا جاتا ہے اس کی عملی مثال کے طور پر ایک اسٹریمنگ کلائنٹ کیسے بنایا جائے جو حقیقی دنیا کے نیٹ ورک کے حالات کو ہینڈل کرتا ہے، بشمول خود کار طریقے سے دوبارہ کوششیں، ایک آفیشل سرور سے بھیجے گئے ایونٹس (SSE) کی تجزیہ کاری، اور React اور React سرور کے اجزاء کے ساتھ صاف انضمام۔

آخر میں، آپ سمجھ جائیں گے کہ کس طرح غیر مطابقت پذیر جنریٹرز، Fetch API، اور SSE وضاحتیں ڈیفالٹ سے کہیں زیادہ قابل بھروسہ چیز بنانے کے لیے اکٹھی ہوتی ہیں۔ fetch اور response.body.getReader() انگوٹھی

انڈیکس

  1. شرطیں

  2. کیا سیکھنا ہے۔

  3. سرور سے بھیجے گئے واقعات کیا ہیں؟

  4. اپنی مرضی کے مطابق اسٹریمنگ کلائنٹ کیوں بنائیں؟

  5. غیر مطابقت پذیر جنریٹر کا استعمال کرتے ہوئے کچے ٹکڑوں کو کیسے اسٹریم کریں۔

  6. سرور کے ذریعہ بھیجے گئے واقعات کو براہ راست پارس کرنے کا طریقہ

  7. Last-Event-ID کا استعمال کرتے ہوئے دوبارہ کنکشن کو کیسے نافذ کیا جائے۔

  8. بیک آف کے ساتھ دوبارہ کوششوں کو کیسے ہینڈل کریں۔

  9. React کے ساتھ اس کا استعمال کیسے کریں۔

  10. ری ایکٹ سرور کے اجزاء کے ساتھ اسے کیسے استعمال کریں۔

  11. نتیجہ

شرطیں

پیروی کرنے کے لیے آپ کو ضرورت ہو گی:

  • TypeScript کی کام کرنے والی سمجھ

  • واقف fetch, ReadableStreamاور async/await

  • رد عمل کا بنیادی علم (رد عمل سے متعلقہ حصوں کے لیے)

کیا سیکھنا ہے۔

  • اصل متن یا بائٹس کو کیسے سٹریم کریں۔ fetch غیر مطابقت پذیر جنریٹر کا استعمال کرتے ہوئے جواب دیں۔

  • سرور فیلڈ کی طرف سے فیلڈ کے ذریعہ بھیجے گئے ایونٹ کی تفصیلات کو براہ راست پارس کرنے کا طریقہ

  • خودکار کنکشن کو کیسے نافذ کیا جائے۔ Last-Event-ID اپنے ایونٹ سے محروم نہ ہوں۔

  • ایکسپونینشل بیک آف کے ساتھ دوبارہ کوششوں کو کیسے ہینڈل کریں۔

  • ری ایکٹ اسٹیٹ اور ری ایکٹ سرور کے اجزاء کے ساتھ اسٹریمنگ کلائنٹ کو کیسے مربوط کریں۔

سرور سے بھیجے گئے واقعات کیا ہیں؟

سرور سے بھیجے گئے واقعات (SSE) ایک ہی HTTP کنکشن پر سرور سے کلائنٹ تک یک طرفہ سلسلہ بندی کے لیے ایک ویب معیار ہے۔ WebSockets کے برعکس، یہ سادہ HTTP ہے، اس لیے یہ موجودہ انفراسٹرکچر جیسے لوڈ بیلنسرز اور پراکسیز پر بغیر کسی خاص ترتیب کے کام کرتا ہے۔

SSE کا جواب تار پر درج ذیل ہے:

event: update
id: 42
data: {"status": "processing"}

event: update
id: 43
data: {"status": "complete"}

ہر واقعہ کو خالی لائن سے الگ کیا جاتا ہے۔ کہ data میدان پے لوڈ لے جاتا ہے، event واقعہ کی قسم کا نام دیں اور id کلائنٹ دوبارہ جڑنے کے لیے سٹریم میں اپنی پوزیشن کو ٹریک کر سکتا ہے۔

براؤزر میں درج ذیل خصوصیات شامل ہیں: EventSource یہ اس کے لیے ایک API ہے، لیکن اس کی عملی حدود ہیں، بشمول کوئی کسٹم ہیڈر، کوئی POST درخواستیں، اور براؤزرز میں دوبارہ کنکشن کا متضاد رویہ۔ آسان ترین صورتوں سے ہٹ کر، آپ کو اکثر خود اس سلسلے کو پارس کرنے کی ضرورت ہوتی ہے۔

اپنی مرضی کے مطابق اسٹریمنگ کلائنٹ کیوں بنائیں؟

بہت سے سٹریمنگ کے استعمال کے کیسز، جیسے کہ AI چیٹ کے جوابات، SSE تفصیلات کو بالکل استعمال نہیں کرتے ہیں۔ وہ خام متن کے صرف ٹکڑے ہیں جو وقت کے ساتھ پہنچتے ہیں۔ دیگر معاملات، جیسے ریئل ٹائم اطلاعات، SSE فراہم کردہ ڈھانچے سے فائدہ اٹھاتے ہیں: نامزد واقعات، IDs کا دوبارہ استعمال، اور سرور کے زیر کنٹرول دوبارہ کوشش کے وقفے۔

ایسک دو الگ الگ کاموں میں دونوں کو ہینڈل کرتا ہے۔

  • stream() فارمیٹ کے بارے میں کسی مفروضے کے بغیر خام متن یا بائٹس کو اسٹریم کرنے کے لیے۔

  • streamSSE() خصوصیت کے مطابق SSE پارسنگ کے لیے

دونوں غیر مطابقت پذیر جنریٹر ہیں، لہذا کال سائٹ پر دونوں کا استعمال ایک جیسا نظر آئے گا۔

for await (const chunk of stream("https://api.example.com/chat")) {
  console.log(chunk);
}

غیر مطابقت پذیر جنریٹر کا استعمال کرتے ہوئے کچے ٹکڑوں کو کیسے اسٹریم کریں۔

سب سے آسان معاملہ خام متن کو اسٹریم کرنا ہے۔ یہ AI جوابات یا لاگ ٹیل کے لیے مفید ہے جہاں ایونٹ کا کوئی ڈھانچہ نہیں ہے، صرف بائٹس کا ایک سلسلہ وقت کے ساتھ پہنچتا ہے۔

یہاں کلید ہے stream():

export async function* stream(
  url: string,
  options?: StreamOptions
): AsyncGenerator {
  const { headers, retries = 3, signal, decode = true } = options || {};

  let retryCount = 0;

  while (retryCount <= retries) {
    try {
      const response = await fetch(url, { method: "GET", headers, signal });

      if (!response.body) {
        throw new Error("Response body is null");
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();

      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          yield decode ? decoder.decode(value, { stream: true }) : value;
        }
      } finally {
        reader.releaseLock();
      }

      return;
    } catch (error: any) {
      if (signal?.aborted) throw error;
      retryCount++;
      if (retryCount > retries) {
        throw new Error(`Max retries exceeded. Last error: ${error.message}`);
      }
      await new Promise((r) => setTimeout(r, 1000 * retryCount));
    }
  }
}

ڈیزائن کے چند فیصلے قابل ذکر ہیں۔

یہ فنکشن ایک غیر مطابقت پذیر جنریٹر ہے (async function*) تاکہ کال کرنے والا استعمال کر سکے۔ for await...of لیڈروں اور لوپس کو دستی طور پر منظم کرنے کے بجائے۔ اصل کو بے نقاب کرنے میں یہی فرق ہے۔ ReadableStream اور یہ ان چیزوں کو بے نقاب کرتا ہے جو کھپت کے لیے اچھی ہیں۔

کہ finally بلاک ہمیشہ ریڈر لاک کو جاری کرتا ہے، یہاں تک کہ اگر لوپ اس کے ذریعے جلد باہر نکل جائے: break یا ایک استثناء۔ اس کو بھول جانا سٹریم لیک کی ایک عام وجہ ہے۔

دوبارہ کوشش کرنے والا لوپ صرف اس میں غلطیاں پکڑتا ہے: fetch کال کریں اور لوپ پڑھیں۔ اگر AbortSignal اگر یہ ناکامی کی وجہ ہے، تو جان بوجھ کر منسوخی کی دوبارہ کوشش کرنا کوئی معنی نہیں رکھتا، لہذا دوبارہ کوشش کرنے کے بجائے، آپ اسے فوری طور پر دوبارہ اٹھا دیں۔

سرور کے ذریعہ بھیجے گئے واقعات کو براہ راست پارس کرنے کا طریقہ

SSE تصریح ایک سادہ ٹیکسٹ فارمیٹ ہے، لیکن اسے صحیح طریقے سے پارس کرنے کا مطلب ہے بہت سے ایج کیسز کو سنبھالنا، جیسے کہ ڈیٹا کی ایک سے زیادہ لائنوں میں تقسیم ہونے والے واقعات، بڑی آنت سے شروع ہونے والی کمنٹ لائنز، بغیر قدر والی فیلڈز، اور ٹکڑوں کے آخر میں نامکمل لائنیں۔

اندر کی بنیادی ریاست مشین ہے: streamSSE():

let buffer = "";
let currentEvent: Partial = { data: "", event: null, id: null };
let hasData = false;

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split(/\r\n|\r|\n/);
  buffer = lines.pop() || ""; // keep the last incomplete line for the next chunk

  for (const line of lines) {
    if (line === "") {
      if (hasData) {
        const event: SSEEvent = {
          id: currentEvent.id ?? lastEventId,
          event: currentEvent.event ?? null,
          data: currentEvent.data!.endsWith("\n")
            ? currentEvent.data!.slice(0, -1)
            : currentEvent.data!,
          retry: currentEvent.retry,
        };
        if (event.id) lastEventId = event.id;
        yield event;
        currentEvent = { data: "", event: null, id: null };
        hasData = false;
      }
      continue;
    }

    if (line.startsWith(":")) continue; // comment line, ignore

    const colonIndex = line.indexOf(":");
    const field = colonIndex === -1 ? line : line.slice(0, colonIndex);
    let valueStr = colonIndex === -1 ? "" : line.slice(colonIndex + 1);
    if (valueStr.startsWith(" ")) valueStr = valueStr.slice(1);

    switch (field) {
      case "data":
        currentEvent.data += valueStr + "\n";
        hasData = true;
        break;
      case "event":
        currentEvent.event = valueStr;
        break;
      case "id":
        if (valueStr.indexOf("\0") === -1) currentEvent.id = valueStr;
        break;
      case "retry":
        const retry = parseInt(valueStr, 10);
        if (!isNaN(retry)) retryInterval = retry;
        break;
    }
  }
}

نیٹ ورک کے حصے لائن کی حدود کا احترام نہیں کرتے ہیں۔ اکیلا read() چونکہ کال ایک انٹرمیڈیٹ لائن پر ختم ہو سکتی ہے، اس لیے آخری لائن جو نامکمل ہو سکتی ہے ہولڈ پر رکھی جاتی ہے۔ buffer اس پر جلد کارروائی نہیں کی جاتی ہے اور اسے اگلے حصے میں شامل کیا جاتا ہے۔ یہ ایس ایس ای پارسنگ کا وہ حصہ ہے جس کو غلط کرنا آسان ہوتا ہے جب آپ کسی سادہ لوحی پر پہنچ جاتے ہیں۔ response.text() اور تار کی تقسیم۔

خالی لائن وہ لائن ہے جو واقعہ کو ختم کرتی ہے۔ SSE ایونٹس میں فکسڈ لینتھ ہیڈرز نہیں ہوتے ہیں۔ قیاس کے مطابق، ایک خالی لائن ایک باؤنڈری کو نشان زد کرتی ہے، لہذا تجزیہ کار صرف اس کی جانچ پڑتال کے بعد ہی ایک واقعہ تیار کرے گا۔

کہ id کسی فیلڈ کو یکسر مسترد کر دیا جائے گا اگر اس میں تصریح کے مطابق null بائٹس ہوں۔ یہ وہ چھوٹی تفصیلات ہیں جو دستکاری سے بنائے گئے عمل کو پہلی کوشش میں ٹھیک کام نہیں کرتی ہیں۔

Last-Event-ID کا استعمال کرتے ہوئے دوبارہ کنکشن کو کیسے نافذ کیا جائے۔

یہ SSE کا وہ حصہ ہے جو ریگولر کے مقابلے حقیقی فوائد پیش کرتا ہے۔ fetch سلسلہ: آپ کے مقام کو کھونے کے بغیر کنکشن کھو جانے کے بعد دوبارہ شروع کرنے کے لیے بلٹ ان سپورٹ۔

let lastEventId: string | null = null;

while (retryCount <= retries) {
  const headers = { ...customHeaders };
  if (lastEventId) {
    (headers as any)["Last-Event-ID"] = lastEventId;
  }

  const response = await fetch(url, { method: "GET", headers, signal });
  // ... read and parse events, updating lastEventId as they arrive
}

جب بھی کوئی واقعہ ہوتا ہے۔ id میدان میں پہنچ کر، lastEventId اسے اپ ڈیٹ کیا جائے گا۔ جب کنکشن ٹوٹ جاتا ہے اور کلائنٹ دوبارہ جڑ جاتا ہے۔ Last-Event-ID درخواست کے ہیڈر میں۔ ایک اچھا برتاؤ کرنے والا سرور ہر چیز کو چلانے یا چھوڑنے کے بجائے اس ہیڈر کو صحیح مقام پر سلسلہ کو دوبارہ شروع کرنے کے لیے استعمال کر سکتا ہے۔

یہ صرف اس صورت میں کام کرتا ہے جب سرور اصل میں ہیڈر کا احترام کرتا ہے۔ لہذا، یہ کلائنٹ اور سرور کے درمیان ایک معاہدہ ہے، نہ کہ ایسی کوئی چیز جس کی کلائنٹ اکیلے ضمانت دے سکے۔ لیکن اس بات کو یقینی بنانا کہ کلائنٹ کو ٹریک کریں اور انہیں درست طریقے سے منتقل کریں اس معاہدے کا ایک ضروری نصف حصہ ہے۔

بیک آف کے ساتھ دوبارہ کوششوں کو کیسے ہینڈل کریں۔

دونوں stream() اور streamSSE() اگر یہ ناکام ہوجاتا ہے، تو یہ دوبارہ کوشش کرتا ہے، لیکن ناکام ہونے کے لحاظ سے یہ تھوڑا سا مختلف ہوتا ہے۔

stream() یہ دوبارہ کوششوں کی تعداد سے منسلک ایک سادہ لکیری بیک آف کا استعمال کرتا ہے۔

await new Promise((resolve) => setTimeout(resolve, 1000 * retryCount));

streamSSE() سرور کی وضاحت کا احترام کریں۔ retry اگر فیلڈ فراہم کی گئی ہے، تو اسے SSE تفصیلات سے ہٹا دیں، بصورت دیگر اس کی ڈیفالٹ قدر پر واپس جائیں۔

let retryInterval = 1000;
// ... updated from the "retry" field if the server sends one
await new Promise((r) => setTimeout(r, retryInterval));

عملی طور پر، سرور کو دوبارہ کوشش کرنے کے وقفے کو متاثر کرنے کی اجازت دینا ضروری ہے۔ بوجھ کے نیچے ایک سرور اپنے گاہکوں کو زیادہ دیر تک دور رہنے کو کہہ سکتا ہے۔ یہ بالکل اسی قسم کا کوآپریٹو رویہ ہے جس کی حمایت کے لیے SSE کی وضاحت تیار کی گئی تھی۔

یہ دونوں افعال میں ٹوٹ گیا۔ AbortSignal دوبارہ کوشش کرنے والے لوپ کو ہمیشہ شارٹ سرکٹ کریں۔ جان بوجھ کر منسوخی کو دوبارہ قابل کوشش ناکامیوں کے طور پر سمجھنا ایک عام مسئلہ ہے اور اسے ٹھیک کرنا صرف ایک تصدیق ہے۔ signal?.aborted دوبارہ کوشش کرنے کا فیصلہ کرنے سے پہلے

React کے ساتھ اس کا استعمال کیسے کریں۔

دونوں فنکشنز غیر مطابقت پذیر جنریٹر ہیں، لہذا React ریاست کے ساتھ انضمام تکرار اور کالنگ کا معاملہ ہے۔ setState فی حصہ:

function ChatComponent() {
  const [messages, setMessages] = useState("");

  useEffect(() => {
    const controller = new AbortController();

    (async () => {
      try {
        for await (const chunk of stream("/api/chat", { signal: controller.signal })) {
          setMessages((prev) => prev + chunk);
        }
      } catch (err: any) {
        if (err.name !== "AbortError") console.error(err);
      }
    })();

    return () => controller.abort();
  }, []);

  return 

{messages}

; }

کلین اپ فنکشن کال controller.abort() یہاں حقیقی کام ہو رہا ہے۔ اس کے بغیر، اگر آپ اسٹریم کے فعال ہونے کے دوران کسی جزو سے ہٹ جاتے ہیں، تو درآمد پس منظر میں چلے گی اور غیر نصب شدہ جزو کی حالت کو اپ ڈیٹ کر دے گی۔

ری ایکٹ سرور کے اجزاء کے ساتھ اسے کیسے استعمال کریں۔

چونکہ جنریٹر ایک وقت میں ایک ایک قدر پیدا کرتا ہے، اس لیے آپ ریکسریو سسپنس باؤنڈری کو براہ راست غیر مطابقت پذیر تکرار کرنے والے پر بھی چلا سکتے ہیں تاکہ ہر ایک حصہ کے آتے ہی کلائنٹ تک HTML کو سٹریم کیا جا سکے۔

async function StreamViewer({ iterator }: { iterator: AsyncIterator }) {
  const { value, done } = await iterator.next();
  if (done) return null;

  return (
    
      {value}
      
        
      
    
  );
}

export default function Page() {
  const dataStream = stream("https://api.example.com/stream");
  const iterator = dataStream[Symbol.asyncIterator]();

  return (
    
      
    
  );
}

ہر بار بار آنے والی کال اگلے حصے کا انتظار کرتی ہے اور نیسٹڈ ٹکڑوں کو پیش کرتی ہے۔ Suspense باقی کی سرحد. HTML کے ہر ٹکڑے کو کلائنٹ کے لیے اسٹریمز کریں جیسا کہ یہ حل ہوتا ہے، بجائے کہ پورے جواب کا انتظار کریں۔

نتیجہ

ایک قابل اعتماد سٹریمنگ کلائنٹ کو کامیابی کے لیے صرف ایک راستے سے زیادہ ہینڈل کرنا چاہیے۔ کنکشن ختم ہو گئے ہیں، ٹکڑوں کو لائن کی حدود میں تقسیم کر دیا گیا ہے، اور منسوخیوں کو ناکامیوں سے ممتاز کیا جانا چاہیے۔

اس کے بارے میں ایسک کا نقطہ نظر اس چھوٹے سے خیال کے گرد بنایا گیا ہے:

  • صارفین کے استعمال کے لیے ایک سٹریم کو ایک غیر مطابقت پذیر جنریٹر کے طور پر ظاہر کریں۔ for await...of

  • یہ SSE کو براہ راست فیلڈ کے ذریعے پارس کرتا ہے، اسپیک کی خالی لائن ایونٹ کی حدود کا احترام کرتے ہوئے اور حصوں میں نامکمل لائنوں کو بفر کرتا ہے۔

  • سڑک Last-Event-ID دوبارہ شروع کرنے کے بجائے، آپ کنکشن کو دوبارہ شروع کر سکتے ہیں۔

  • دوبارہ کوشش اور منسوخی کو الگ الگ مسائل سمجھیں۔

  • React اور React سرور کے اجزاء کے لیے پتلی انٹیگریشن پوائنٹس کے ساتھ بنیادی طور پر فریم ورک کو اجناسٹک رکھیں۔

یہ امتزاج وہی ہے جو ایک اسٹریمنگ کلائنٹ کو مختلف کرتا ہے جو اسٹریمنگ کلائنٹ سے ڈیمو میں کام کرتا ہے جو حقیقی دنیا کے نیٹ ورک کے حالات کا مقابلہ کرسکتا ہے۔ آپ github.com/glamboyosa/ore پر مکمل سورس کوڈ دریافت کر سکتے ہیں۔

Scroll to Top