import os
import string
import io
[docs]class KeywordProcessor(object):
"""KeywordProcessor
Attributes:
_keyword (str): Used as key to store keywords in trie dictionary.
Defaults to '_keyword_'
non_word_boundaries (set(str)): Characters that will determine if the word is continuing.
Defaults to set([A-Za-z0-9_])
keyword_trie_dict (dict): Trie dict built character by character, that is used for lookup
Defaults to empty dictionary
case_sensitive (boolean): if the search algorithm should be case sensitive or not.
Defaults to False
Examples:
>>> # import module
>>> from flashtext import KeywordProcessor
>>> # Create an object of KeywordProcessor
>>> keyword_processor = KeywordProcessor()
>>> # add keywords
>>> keyword_names = ['NY', 'new-york', 'SF']
>>> clean_names = ['new york', 'new york', 'san francisco']
>>> for keyword_name, clean_name in zip(keyword_names, clean_names):
>>> keyword_processor.add_keyword(keyword_name, clean_name)
>>> keywords_found = keyword_processor.extract_keywords('I love SF and NY. new-york is the best.')
>>> keywords_found
>>> ['san francisco', 'new york', 'new york']
Note:
* loosely based on `Aho-Corasick algorithm <https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm>`_.
* Idea came from this `Stack Overflow Question <https://stackoverflow.com/questions/44178449/regex-replace-is-taking-time-for-millions-of-documents-how-to-make-it-faster>`_.
"""
[docs] def __init__(self, case_sensitive=False):
"""
Args:
case_sensitive (boolean): Keyword search should be case sensitive set or not.
Defaults to False
"""
self._keyword = '_keyword_'
self._white_space_chars = set(['.', '\t', '\n', '\a', ' ', ','])
try:
# python 2.x
self.non_word_boundaries = set(string.digits + string.letters + '_')
except AttributeError:
# python 3.x
self.non_word_boundaries = set(string.digits + string.ascii_letters + '_')
self.keyword_trie_dict = dict()
self.case_sensitive = case_sensitive
self._terms_in_trie = 0
def __len__(self):
"""Number of terms present in the keyword_trie_dict
Returns:
length : int
Count of number of distinct terms in trie dictionary.
"""
return self._terms_in_trie
def __contains__(self, word):
"""To check if word is present in the keyword_trie_dict
Args:
word : string
word that you want to check
Returns:
status : bool
If word is present as it is in keyword_trie_dict then we return True, else False
Examples:
>>> keyword_processor.add_keyword('Big Apple')
>>> 'Big Apple' in keyword_processor
>>> # True
"""
if not self.case_sensitive:
word = word.lower()
current_dict = self.keyword_trie_dict
len_covered = 0
for char in word:
if char in current_dict:
current_dict = current_dict[char]
len_covered += 1
else:
break
return self._keyword in current_dict and len_covered == len(word)
def __getitem__(self, word):
"""if word is present in keyword_trie_dict return the clean name for it.
Args:
word : string
word that you want to check
Returns:
keyword : string
If word is present as it is in keyword_trie_dict then we return keyword mapped to it.
Examples:
>>> keyword_processor.add_keyword('Big Apple', 'New York')
>>> keyword_processor['Big Apple']
>>> # New York
"""
if not self.case_sensitive:
word = word.lower()
current_dict = self.keyword_trie_dict
len_covered = 0
for char in word:
if char in current_dict:
current_dict = current_dict[char]
len_covered += 1
else:
break
if self._keyword in current_dict and len_covered == len(word):
return current_dict[self._keyword]
def __setitem__(self, keyword, clean_name=None):
"""To add keyword to the dictionary
pass the keyword and the clean name it maps to.
Args:
keyword : string
keyword that you want to identify
clean_name : string
clean term for that keyword that you would want to get back in return or replace
if not provided, keyword will be used as the clean name also.
Examples:
>>> keyword_processor['Big Apple'] = 'New York'
"""
status = False
if not clean_name and keyword:
clean_name = keyword
if keyword and clean_name:
if not self.case_sensitive:
keyword = keyword.lower()
current_dict = self.keyword_trie_dict
for letter in keyword:
current_dict = current_dict.setdefault(letter, {})
if self._keyword not in current_dict:
status = True
self._terms_in_trie += 1
current_dict[self._keyword] = clean_name
return status
def __delitem__(self, keyword):
"""To remove keyword from the dictionary
pass the keyword and the clean name it maps to.
Args:
keyword : string
keyword that you want to remove if it's present
Examples:
>>> keyword_processor.add_keyword('Big Apple')
>>> del keyword_processor['Big Apple']
"""
status = False
if keyword:
if not self.case_sensitive:
keyword = keyword.lower()
current_dict = self.keyword_trie_dict
character_trie_list = []
for letter in keyword:
if letter in current_dict:
character_trie_list.append((letter, current_dict))
current_dict = current_dict[letter]
else:
# if character is not found, break out of the loop
current_dict = None
break
# remove the characters from trie dict if there are no other keywords with them
if current_dict and self._keyword in current_dict:
# we found a complete match for input keyword.
character_trie_list.append((self._keyword, current_dict))
character_trie_list.reverse()
for key_to_remove, dict_pointer in character_trie_list:
if len(dict_pointer.keys()) == 1:
dict_pointer.pop(key_to_remove)
else:
# more than one key means more than 1 path.
# Delete not required path and keep the other
dict_pointer.pop(key_to_remove)
break
# successfully removed keyword
status = True
self._terms_in_trie -= 1
return status
def __iter__(self):
"""Disabled iteration as get_all_keywords() is the right way to iterate
"""
raise NotImplementedError("Please use get_all_keywords() instead")
[docs] def set_non_word_boundaries(self, non_word_boundaries):
"""set of characters that will be considered as part of word.
Args:
non_word_boundaries (set(str)):
Set of characters that will be considered as part of word.
"""
self.non_word_boundaries = non_word_boundaries
[docs] def add_non_word_boundary(self, character):
"""add a character that will be considered as part of word.
Args:
character (char):
Character that will be considered as part of word.
"""
self.non_word_boundaries.add(character)
[docs] def add_keyword(self, keyword, clean_name=None):
"""To add one or more keywords to the dictionary
pass the keyword and the clean name it maps to.
Args:
keyword : string
keyword that you want to identify
clean_name : string
clean term for that keyword that you would want to get back in return or replace
if not provided, keyword will be used as the clean name also.
Returns:
status : bool
The return value. True for success, False otherwise.
Examples:
>>> keyword_processor.add_keyword('Big Apple', 'New York')
>>> # This case 'Big Apple' will return 'New York'
>>> # OR
>>> keyword_processor.add_keyword('Big Apple')
>>> # This case 'Big Apple' will return 'Big Apple'
"""
return self.__setitem__(keyword, clean_name)
[docs] def remove_keyword(self, keyword):
"""To remove one or more keywords from the dictionary
pass the keyword and the clean name it maps to.
Args:
keyword : string
keyword that you want to remove if it's present
Returns:
status : bool
The return value. True for success, False otherwise.
Examples:
>>> keyword_processor.add_keyword('Big Apple')
>>> keyword_processor.remove_keyword('Big Apple')
>>> # Returns True
>>> # This case 'Big Apple' will no longer be a recognized keyword
>>> keyword_processor.remove_keyword('Big Apple')
>>> # Returns False
"""
return self.__delitem__(keyword)
[docs] def get_keyword(self, word):
"""if word is present in keyword_trie_dict return the clean name for it.
Args:
word : string
word that you want to check
Returns:
keyword : string
If word is present as it is in keyword_trie_dict then we return keyword mapped to it.
Examples:
>>> keyword_processor.add_keyword('Big Apple', 'New York')
>>> keyword_processor.get('Big Apple')
>>> # New York
"""
return self.__getitem__(word)
[docs] def add_keyword_from_file(self, keyword_file, encoding="utf-8"):
"""To add keywords from a file
Args:
keyword_file : path to keywords file
encoding : specify the encoding of the file
Examples:
keywords file format can be like:
>>> # Option 1: keywords.txt content
>>> # java_2e=>java
>>> # java programing=>java
>>> # product management=>product management
>>> # product management techniques=>product management
>>> # Option 2: keywords.txt content
>>> # java
>>> # python
>>> # c++
>>> keyword_processor.add_keyword_from_file('keywords.txt')
Raises:
IOError: If `keyword_file` path is not valid
"""
if not os.path.isfile(keyword_file):
raise IOError("Invalid file path {}".format(keyword_file))
with io.open(keyword_file, encoding=encoding) as f:
for line in f:
if '=>' in line:
keyword, clean_name = line.split('=>')
self.add_keyword(keyword, clean_name.strip())
else:
keyword = line.strip()
self.add_keyword(keyword)
[docs] def add_keywords_from_dict(self, keyword_dict):
"""To add keywords from a dictionary
Args:
keyword_dict (dict): A dictionary with `str` key and (list `str`) as value
Examples:
>>> keyword_dict = {
"java": ["java_2e", "java programing"],
"product management": ["PM", "product manager"]
}
>>> keyword_processor.add_keywords_from_dict(keyword_dict)
Raises:
AttributeError: If value for a key in `keyword_dict` is not a list.
"""
for clean_name, keywords in keyword_dict.items():
if not isinstance(keywords, list):
raise AttributeError("Value of key {} should be a list".format(clean_name))
for keyword in keywords:
self.add_keyword(keyword, clean_name)
[docs] def remove_keywords_from_dict(self, keyword_dict):
"""To remove keywords from a dictionary
Args:
keyword_dict (dict): A dictionary with `str` key and (list `str`) as value
Examples:
>>> keyword_dict = {
"java": ["java_2e", "java programing"],
"product management": ["PM", "product manager"]
}
>>> keyword_processor.remove_keywords_from_dict(keyword_dict)
Raises:
AttributeError: If value for a key in `keyword_dict` is not a list.
"""
for clean_name, keywords in keyword_dict.items():
if not isinstance(keywords, list):
raise AttributeError("Value of key {} should be a list".format(clean_name))
for keyword in keywords:
self.remove_keyword(keyword)
[docs] def add_keywords_from_list(self, keyword_list):
"""To add keywords from a list
Args:
keyword_list (list(str)): List of keywords to add
Examples:
>>> keyword_processor.add_keywords_from_list(["java", "python"]})
Raises:
AttributeError: If `keyword_list` is not a list.
"""
if not isinstance(keyword_list, list):
raise AttributeError("keyword_list should be a list")
for keyword in keyword_list:
self.add_keyword(keyword)
[docs] def remove_keywords_from_list(self, keyword_list):
"""To remove keywords present in list
Args:
keyword_list (list(str)): List of keywords to remove
Examples:
>>> keyword_processor.remove_keywords_from_list(["java", "python"]})
Raises:
AttributeError: If `keyword_list` is not a list.
"""
if not isinstance(keyword_list, list):
raise AttributeError("keyword_list should be a list")
for keyword in keyword_list:
self.remove_keyword(keyword)
[docs] def get_all_keywords(self, term_so_far='', current_dict=None):
"""Recursively builds a dictionary of keywords present in the dictionary
And the clean name mapped to those keywords.
Args:
term_so_far : string
term built so far by adding all previous characters
current_dict : dict
current recursive position in dictionary
Returns:
terms_present : dict
A map of key and value where each key is a term in the keyword_trie_dict.
And value mapped to it is the clean name mapped to it.
Examples:
>>> keyword_processor = KeywordProcessor()
>>> keyword_processor.add_keyword('j2ee', 'Java')
>>> keyword_processor.add_keyword('Python', 'Python')
>>> keyword_processor.get_all_keywords()
>>> {'j2ee': 'Java', 'python': 'Python'}
>>> # NOTE: for case_insensitive all keys will be lowercased.
"""
terms_present = {}
if not term_so_far:
term_so_far = ''
if current_dict is None:
current_dict = self.keyword_trie_dict
for key in current_dict:
if key == '_keyword_':
terms_present[term_so_far] = current_dict[key]
else:
sub_values = self.get_all_keywords(term_so_far + key, current_dict[key])
for key in sub_values:
terms_present[key] = sub_values[key]
return terms_present
[docs] def replace_keywords(self, sentence):
"""Searches in the string for all keywords present in corpus.
Keywords present are replaced by the clean name and a new string is returned.
Args:
sentence (str): Line of text where we will replace keywords
Returns:
new_sentence (str): Line of text with replaced keywords
Examples:
>>> from flashtext import KeywordProcessor
>>> keyword_processor = KeywordProcessor()
>>> keyword_processor.add_keyword('Big Apple', 'New York')
>>> keyword_processor.add_keyword('Bay Area')
>>> new_sentence = keyword_processor.replace_keywords('I love Big Apple and bay area.')
>>> new_sentence
>>> 'I love New York and Bay Area.'
"""
if not sentence:
# if sentence is empty or none just return the same.
return sentence
new_sentence = ''
orig_sentence = sentence
if not self.case_sensitive:
sentence = sentence.lower()
current_word = ''
current_dict = self.keyword_trie_dict
current_white_space = ''
sequence_end_pos = 0
idx = 0
sentence_len = len(sentence)
while idx < sentence_len:
char = sentence[idx]
current_word += orig_sentence[idx]
# when we reach whitespace
if char not in self.non_word_boundaries:
current_white_space = char
# if end is present in current_dict
if self._keyword in current_dict or char in current_dict:
# update longest sequence found
sequence_found = None
longest_sequence_found = None
is_longer_seq_found = False
if self._keyword in current_dict:
sequence_found = current_dict[self._keyword]
longest_sequence_found = current_dict[self._keyword]
sequence_end_pos = idx
# re look for longest_sequence from this position
if char in current_dict:
current_dict_continued = current_dict[char]
current_word_continued = current_word
idy = idx + 1
while idy < sentence_len:
inner_char = sentence[idy]
current_word_continued += orig_sentence[idy]
if inner_char not in self.non_word_boundaries and self._keyword in current_dict_continued:
# update longest sequence found
current_white_space = inner_char
longest_sequence_found = current_dict_continued[self._keyword]
sequence_end_pos = idy
is_longer_seq_found = True
if inner_char in current_dict_continued:
current_dict_continued = current_dict_continued[inner_char]
else:
break
idy += 1
else:
# end of sentence reached.
if self._keyword in current_dict_continued:
# update longest sequence found
current_white_space = ''
longest_sequence_found = current_dict_continued[self._keyword]
sequence_end_pos = idy
is_longer_seq_found = True
if is_longer_seq_found:
idx = sequence_end_pos
current_word = current_word_continued
current_dict = self.keyword_trie_dict
if longest_sequence_found:
new_sentence += longest_sequence_found + current_white_space
current_word = ''
current_white_space = ''
else:
new_sentence += current_word
current_word = ''
current_white_space = ''
else:
# we reset current_dict
current_dict = self.keyword_trie_dict
new_sentence += current_word
current_word = ''
current_white_space = ''
elif char in current_dict:
# we can continue from this char
current_dict = current_dict[char]
else:
# we reset current_dict
current_dict = self.keyword_trie_dict
# skip to end of word
idy = idx + 1
while idy < sentence_len:
char = sentence[idy]
current_word += orig_sentence[idy]
if char not in self.non_word_boundaries:
break
idy += 1
idx = idy
new_sentence += current_word
current_word = ''
current_white_space = ''
# if we are end of sentence and have a sequence discovered
if idx + 1 >= sentence_len:
if self._keyword in current_dict:
sequence_found = current_dict[self._keyword]
new_sentence += sequence_found
else:
new_sentence += current_word
idx += 1
return new_sentence