Commit 51c4221

align streamer for genai llm notebooks (#2765)
1 parent 79cd7e4 commit 51c4221

7 files changed: +161 -397 lines

notebooks/deepseek-r1/deepseek-r1.ipynb (+4)
@@ -89,6 +89,10 @@
 "    r = requests.get(url=\"https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/notebook_utils.py\")\n",
 "    open(\"notebook_utils.py\", \"w\").write(r.text)\n",
 "\n",
+"if not Path(\"genai_helper.py\").exists():\n",
+"    r = requests.get(url=\"https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/genai_helper.py\")\n",
+"    open(\"genai_helper.py\", \"w\").write(r.text)\n",
+"\n",
 "# Read more about telemetry collection at https://github.com/openvinotoolkit/openvino_notebooks?tab=readme-ov-file#-telemetry\n",
 "from notebook_utils import collect_telemetry\n",
 "\n",

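Note: each notebook diff in this commit adds the same download-if-missing cell for genai_helper.py. A minimal sketch of that pattern as plain Python, assuming the repo layout in the URLs above; fetch_helper is an illustrative name, not a helper the repo defines:

from pathlib import Path

import requests

def fetch_helper(name: str) -> None:
    # Download a utility module from the notebooks repo once; skip the request
    # if a local copy is already present next to the notebook.
    if not Path(name).exists():
        url = f"https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/{name}"
        Path(name).write_text(requests.get(url).text, encoding="utf-8")

fetch_helper("genai_helper.py")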
notebooks/deepseek-r1/gradio_helper.py (+1 -131)
@@ -5,6 +5,7 @@
 import queue
 import sys
 import re
+from genai_helper import ChunkStreamer

 max_new_tokens = 256

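For context on the cadence the imported ChunkStreamer keeps (its implementation is removed below and now lives in genai_helper.py): put() buffers token ids and only triggers detokenization on every tokens_len-th token. A standalone trace of that modulo logic, using dummy integer ids so no tokenizer is needed:

tokens_len = 4
cache = []
for token_id in range(10):  # dummy token ids
    if (len(cache) + 1) % tokens_len != 0:
        cache.append(token_id)  # buffer only, no decode yet
        continue
    cache.append(token_id)  # every tokens_len-th token: decode the cache here
    print(f"decode after {len(cache)} cached tokens")  # fires at 4 and 8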
@@ -52,137 +53,6 @@ def get_system_prompt(model_language, system_prompt=None):
     return DEFAULT_SYSTEM_PROMPT_CHINESE if (model_language == "Chinese") else DEFAULT_SYSTEM_PROMPT


-class IterableStreamer(ov_genai.StreamerBase):
-    """
-    A custom streamer class for handling token streaming and detokenization with buffering.
-
-    Attributes:
-        tokenizer (Tokenizer): The tokenizer used for encoding and decoding tokens.
-        tokens_cache (list): A buffer to accumulate tokens for detokenization.
-        text_queue (Queue): A synchronized queue for storing decoded text chunks.
-        print_len (int): The length of the printed text to manage incremental decoding.
-    """
-
-    def __init__(self, tokenizer):
-        """
-        Initializes the IterableStreamer with the given tokenizer.
-
-        Args:
-            tokenizer (Tokenizer): The tokenizer to use for encoding and decoding tokens.
-        """
-        super().__init__()
-        self.tokenizer = tokenizer
-        self.tokens_cache = []
-        self.text_queue = queue.Queue()
-        self.print_len = 0
-
-    def __iter__(self):
-        """
-        Returns the iterator object itself.
-        """
-        return self
-
-    def __next__(self):
-        """
-        Returns the next value from the text queue.
-
-        Returns:
-            str: The next decoded text chunk.
-
-        Raises:
-            StopIteration: If there are no more elements in the queue.
-        """
-        value = self.text_queue.get()  # get() will be blocked until a token is available.
-        if value is None:
-            raise StopIteration
-        return value
-
-    def get_stop_flag(self):
-        """
-        Checks whether the generation process should be stopped.
-
-        Returns:
-            bool: Always returns False in this implementation.
-        """
-        return False
-
-    def put_word(self, word: str):
-        """
-        Puts a word into the text queue.
-
-        Args:
-            word (str): The word to put into the queue.
-        """
-        self.text_queue.put(word)
-
-    def put(self, token_id: int) -> bool:
-        """
-        Processes a token and manages the decoding buffer. Adds decoded text to the queue.
-
-        Args:
-            token_id (int): The token_id to process.
-
-        Returns:
-            bool: True if generation should be stopped, False otherwise.
-        """
-        self.tokens_cache.append(token_id)
-        text = self.tokenizer.decode(self.tokens_cache)
-
-        word = ""
-        if len(text) > self.print_len and "\n" == text[-1]:
-            # Flush the cache after the new line symbol.
-            word = text[self.print_len :]
-            self.tokens_cache = []
-            self.print_len = 0
-        elif len(text) >= 3 and text[-3:] == chr(65533):
-            # Don't print incomplete text.
-            pass
-        elif len(text) > self.print_len:
-            # It is possible to have a shorter text after adding new token.
-            # Print to output only if text length is increased.
-            word = text[self.print_len :]
-            self.print_len = len(text)
-        self.put_word(word)
-
-        if self.get_stop_flag():
-            # When generation is stopped from streamer then end is not called, need to call it here manually.
-            self.end()
-            return True  # True means stop generation
-        else:
-            return False  # False means continue generation
-
-    def end(self):
-        """
-        Flushes residual tokens from the buffer and puts a None value in the queue to signal the end.
-        """
-        text = self.tokenizer.decode(self.tokens_cache)
-        if len(text) > self.print_len:
-            word = text[self.print_len :]
-            self.put_word(word)
-            self.tokens_cache = []
-            self.print_len = 0
-        self.put_word(None)
-
-    def reset(self):
-        self.tokens_cache = []
-        self.text_queue = queue.Queue()
-        self.print_len = 0
-
-
-class ChunkStreamer(IterableStreamer):
-
-    def __init__(self, tokenizer, tokens_len=4):
-        super().__init__(tokenizer)
-        self.tokens_len = tokens_len
-
-    def put(self, token_id: int) -> bool:
-        if (len(self.tokens_cache) + 1) % self.tokens_len != 0:
-            self.tokens_cache.append(token_id)
-            return False
-        sys.stdout.flush()
-        return super().put(token_id)
-
-
 def make_demo(pipe, model_configuration, model_id, model_language, disable_advanced=False):
     import gradio as gr

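With the classes above removed, this helper consumes the shared streamer instead. A minimal usage sketch: the model directory and prompt are placeholders, and the calls follow the openvino_genai Python API as used in these notebooks, so treat it as illustrative rather than the notebook's exact code:

from threading import Thread

import openvino_genai as ov_genai
from genai_helper import ChunkStreamer

pipe = ov_genai.LLMPipeline("DeepSeek-R1-Distill-Qwen-1.5B-int4-ov", "CPU")  # placeholder model dir
streamer = ChunkStreamer(pipe.get_tokenizer(), 4)  # detokenize roughly every 4 tokens

config = ov_genai.GenerationConfig()
config.max_new_tokens = 256

# Run generation on a worker thread; iterating the streamer on this thread
# yields decoded chunks until end() enqueues None and stops the iteration.
worker = Thread(target=pipe.generate, args=("What is OpenVINO?", config, streamer))
worker.start()
for chunk in streamer:
    print(chunk, end="", flush=True)
worker.join()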
notebooks/llm-chatbot/gradio_helper_genai.py (+1 -133)
@@ -2,8 +2,7 @@
 import openvino_genai as ov_genai
 from uuid import uuid4
 from threading import Event, Thread
-import queue
-import sys
+from genai_helper import ChunkStreamer

 max_new_tokens = 256

@@ -65,137 +64,6 @@ def get_system_prompt(model_language, system_prompt=None):
     )


-class IterableStreamer(ov_genai.StreamerBase):
-    """
-    A custom streamer class for handling token streaming and detokenization with buffering.
-
-    Attributes:
-        tokenizer (Tokenizer): The tokenizer used for encoding and decoding tokens.
-        tokens_cache (list): A buffer to accumulate tokens for detokenization.
-        text_queue (Queue): A synchronized queue for storing decoded text chunks.
-        print_len (int): The length of the printed text to manage incremental decoding.
-    """
-
-    def __init__(self, tokenizer):
-        """
-        Initializes the IterableStreamer with the given tokenizer.
-
-        Args:
-            tokenizer (Tokenizer): The tokenizer to use for encoding and decoding tokens.
-        """
-        super().__init__()
-        self.tokenizer = tokenizer
-        self.tokens_cache = []
-        self.text_queue = queue.Queue()
-        self.print_len = 0
-        self.decoded_lengths = []
-
-    def __iter__(self):
-        """
-        Returns the iterator object itself.
-        """
-        return self
-
-    def __next__(self):
-        """
-        Returns the next value from the text queue.
-
-        Returns:
-            str: The next decoded text chunk.
-
-        Raises:
-            StopIteration: If there are no more elements in the queue.
-        """
-        value = self.text_queue.get()  # get() will be blocked until a token is available.
-        if value is None:
-            raise StopIteration
-        return value
-
-    def get_stop_flag(self):
-        """
-        Checks whether the generation process should be stopped.
-
-        Returns:
-            bool: Always returns False in this implementation.
-        """
-        return False
-
-    def put_word(self, word: str):
-        """
-        Puts a word into the text queue.
-
-        Args:
-            word (str): The word to put into the queue.
-        """
-        self.text_queue.put(word)
-
-    def put(self, token_id: int) -> bool:
-        """
-        Processes a token and manages the decoding buffer. Adds decoded text to the queue.
-
-        Args:
-            token_id (int): The token_id to process.
-
-        Returns:
-            bool: True if generation should be stopped, False otherwise.
-        """
-        self.tokens_cache.append(token_id)
-        text = self.tokenizer.decode(self.tokens_cache)
-        self.decoded_lengths.append(len(text))
-
-        word = ""
-        delay_n_tokens = 3
-        if len(text) > self.print_len and "\n" == text[-1]:
-            # Flush the cache after the new line symbol.
-            word = text[self.print_len :]
-            self.tokens_cache = []
-            self.decoded_lengths = []
-            self.print_len = 0
-        elif len(text) > 0 and text[-1] == chr(65533):
-            # Don't print incomplete text.
-            self.decoded_lengths[-1] = -1
-        elif len(self.tokens_cache) >= delay_n_tokens:
-            print_until = self.decoded_lengths[-delay_n_tokens]
-            if print_until != -1 and print_until > self.print_len:
-                # It is possible to have a shorter text after adding new token.
-                # Print to output only if text length is increased and text is complete (print_until != -1).
-                word = text[self.print_len : print_until]
-                self.print_len = print_until
-        self.put_word(word)
-
-        if self.get_stop_flag():
-            # When generation is stopped from streamer then end is not called, need to call it here manually.
-            self.end()
-            return True  # True means stop generation
-        else:
-            return False  # False means continue generation
-
-    def end(self):
-        """
-        Flushes residual tokens from the buffer and puts a None value in the queue to signal the end.
-        """
-        text = self.tokenizer.decode(self.tokens_cache)
-        if len(text) > self.print_len:
-            word = text[self.print_len :]
-            self.put_word(word)
-            self.tokens_cache = []
-            self.print_len = 0
-        self.put_word(None)
-
-
-class ChunkStreamer(IterableStreamer):
-
-    def __init__(self, tokenizer, tokens_len):
-        super().__init__(tokenizer)
-        self.tokens_len = tokens_len
-
-    def put(self, token_id: int) -> bool:
-        if (len(self.tokens_cache) + 1) % self.tokens_len != 0:
-            self.tokens_cache.append(token_id)
-            self.decoded_lengths.append(-1)
-            return False
-        return super().put(token_id)
-

 def make_demo(pipe, model_configuration, model_id, model_language, disable_advanced=False):
     import gradio as gr

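Both removed streamers gate output on chr(65533), the Unicode replacement character U+FFFD, which appears when the detokenizer is asked to render a multi-byte character that is still split across tokens. A self-contained illustration of why partial decodes must be withheld (pure Python, no tokenizer required):

# Decoding a truncated UTF-8 sequence yields U+FFFD, i.e. chr(65533).
emoji_bytes = "🙂".encode("utf-8")  # four bytes: f0 9f 99 82
partial = emoji_bytes[:2].decode("utf-8", errors="replace")
print(repr(partial))  # contains '\ufffd' - not safe to show the user yet
assert chr(65533) in partial  # the same sentinel the streamers check for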
notebooks/llm-chatbot/llm-chatbot-generate-api.ipynb (+4)
@@ -134,6 +134,10 @@
 "    r = requests.get(url=\"https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/notebook_utils.py\")\n",
 "    open(\"notebook_utils.py\", \"w\").write(r.text)\n",
 "\n",
+"if not Path(\"genai_helper.py\").exists():\n",
+"    r = requests.get(url=\"https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/genai_helper.py\")\n",
+"    open(\"genai_helper.py\", \"w\").write(r.text)\n",
+"\n",
 "# Read more about telemetry collection at https://github.com/openvinotoolkit/openvino_notebooks?tab=readme-ov-file#-telemetry\n",
 "from notebook_utils import collect_telemetry\n",
 "\n",

notebooks/llm-rag-langchain/llm-rag-langchain-genai.ipynb (+4)
@@ -94,6 +94,10 @@
 "    )\n",
 "    open(\"pip_helper.py\", \"w\").write(r.text)\n",
 "\n",
+"if not Path(\"genai_helper.py\").exists():\n",
+"    r = requests.get(url=\"https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/genai_helper.py\")\n",
+"    open(\"genai_helper.py\", \"w\").write(r.text)\n",
+"\n",
 "from pip_helper import pip_install\n",
 "\n",
 "os.environ[\"GIT_CLONE_PROTECTION_ACTIVE\"] = \"false\"\n",
