Recopilar datos y enviarlos a Apache Kafka

Introducción



Para analizar los datos de transmisión, necesita las fuentes de estos datos. La información proporcionada por las fuentes también es importante. Y las fuentes con información textual, por ejemplo, también son raras.



Las fuentes interesantes incluyen las siguientes: twitter , vk . Pero estas fuentes no son adecuadas para todas las tareas.



Hay fuentes con los datos requeridos, pero estas fuentes no se transmiten. Los siguientes enlaces se pueden citar aquí: public-apis .



Puede utilizar el método anterior al resolver problemas con la transmisión de datos.



Descarga datos y envíalos a la transmisión.



Por ejemplo, puede utilizar la siguiente fuente: imdb .

Cabe señalar que imdb proporciona datos por sí solo. Consulte Conjuntos de datos de IMDb . Pero se puede suponer que los datos recopilados directamente contienen información más relevante.



Idioma: Java 1.8.

Bibliotecas: kafka 2.6.0, jsoup 1.13.1.



Recopilación de datos



La recogida de datos es un servicio que, según los datos de entrada, carga páginas html, busca la información necesaria y la convierte en un conjunto de objetos.



: imdb. : https://www.imdb.com/search/title/?release_date=%s,%s&countries=%s

1, 2 – . 3 – .



: imdb-extensive-dataset.



:



public interface MovieDirectScrapingService {
    Collection<Movie> scrap();
}


Movie – , ( ..).



class Movie {
    public final String titleId;
    public final String titleUrl;
    public final String title;
    public final String description;
    public final Double rating;
    public final String genres;
    public final String runtime;
    public final String baseUrl;
    public final String baseNameUrl;
    public final String baseTitleUrl;
    public final String participantIds;
    public final String participantNames;
    public final String directorIds;
    public final String directorNames;


.



. jsoup. html- .



String scrap(String url, List<Movie> items) {
    Document doc = null;
    try {
        doc = Jsoup.connect(url).header("Accept-Language", language).get();
    } catch (IOException e) {
        e.printStackTrace();
    }
    if (doc != null) {
        collectItems(doc, items);
        return nextUrl(doc);
    }
    return "";
}


.



String nextUrl(Document doc) {
    Elements nextPageElements = doc.select(".next-page");
    if (nextPageElements.size() > 0) {
        Element hrefElement = nextPageElements.get(0);
        return baseUrl + hrefElement.attributes().get("href");
    }
    return "";
}


. . . , . .



@Override
public Collection<Movie> scrap() {
    String url = String.format(
            baseUrl + "/search/title/?release_date=%s,%s&countries=%s",
            startDate, endDate, countries
    );
    List<Movie> items = new ArrayList<>();
    String nextUrl = url;
    while (true) {
        nextUrl = scrap(nextUrl, items);
        if ("".equals(nextUrl)) {
            break;
        }
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
        }
    }
    return items;
}


.





: MovieProducer. : run.



. . .



public void run() {
    try (SimpleStringStringProducer producer = new SimpleStringStringProducer(
            bootstrapServers, clientId, topic)) {
        Collection<Data.Movie> movies = movieDirectScrapingService.scrap();
        List<SimpleStringStringProducer.KeyValueStringString> kvList = new ArrayList<>();
        for (Data.Movie move : movies) {
            Map<String, String> map = new HashMap<>();
            map.put("title_id", move.titleId);
            map.put("title_url", move.titleUrl);
            String value = JSONObject.toJSONString(map);
            String key = UUID.randomUUID().toString();
            kvList.add(new SimpleStringStringProducer.KeyValueStringString(key, value));
        }
        producer.produce(kvList);
    }
}




. .

: MovieDirectScrapingExecutor. : run.



. .



public void run() {
    int countriesCounter = 0;
    List<String> countriesSource = Arrays.asList("us");

    while (true) {
        try {
            LocalDate localDate = LocalDate.now();

            int year = localDate.getYear();
            int month = localDate.getMonthValue();
            int day = localDate.getDayOfMonth();

            String monthString = month < 9 ? "0" + month : Integer.toString(month);
            String dayString = day < 9 ? "0" + day : Integer.toString(day);

            String startDate = year + "-" + monthString + "-" + dayString;
            String endDate = startDate;

            String language = "en";
            String countries = countriesSource.get(countriesCounter);

            execute(language, startDate, endDate, countries);

            Thread.sleep(1000);

            countriesCounter += 1;
            if (countriesCounter >= countriesSource.size()) {
                countriesCounter = 0;
            }

        } catch (InterruptedException e) {
        }
    }
}


MovieDirectScrapingExecutor, , , main.



.



{
  "base_name_url": "https:\/\/www.imdb.com\/name",
  "participant_ids": "nm7947173~nm2373827~nm0005288~nm0942193~",
  "title_id": "tt13121702",
  "rating": "0.0",
  "base_url": "https:\/\/www.imdb.com",
  "description": "It's Christmas time and Jackie (Carly Hughes), an up-and-coming journalist, finds that her life is at a crossroads until she finds an unexpected opportunity - to run a small-town newspaper ... See full summary »",
  "runtime": "",
  "title": "The Christmas Edition",
  "director_ids": "nm0838289~",
  "title_url": "\/title\/tt13121702\/?ref_=adv_li_tt",
  "director_names": "Peter Sullivan~",
  "genres": "Drama, Romance",
  "base_title_url": "https:\/\/www.imdb.com\/title",
  "participant_names": "Carly Hughes~Rob Mayes~Marie Osmond~Aloma Wright~"
}


.





, , -. kafka-.

. Apache Kafka Kafka Server.



: MovieProducerTest.



public class MovieProducerTest {
    @Test
    void simple() throws InterruptedException {
        String brokerHost = "127.0.0.1";
        int brokerPort = 29092;
        String zooKeeperHost = "127.0.0.1";
        int zooKeeperPort = 22183;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String topic = "q-data";
        String clientId = "simple";
        try (KafkaServerService kafkaServerService = new KafkaServerService(
                brokerHost, brokerPort, zooKeeperHost, zooKeeperPort
        )
        ) {
            kafkaServerService.start();
            kafkaServerService.createTopic(topic);

            MovieDirectScrapingService movieDirectScrapingServiceImpl = () -> Collections.singleton(
                    new Data.Movie(…)
            );
            MovieProducer movieProducer =
                    new MovieProducer(bootstrapServers, clientId, topic, movieDirectScrapingServiceImpl);
            movieProducer.run();

            kafkaServerService.poll(topic, "simple", 1, 5, (records) -> {
                assertTrue(records.count() > 0);
                ConsumerRecord<String, String> record = records.iterator().next();
                JSONParser jsonParser = new JSONParser();
                JSONObject jsonObject = null;
                try {
                    jsonObject = (JSONObject) jsonParser.parse(record.value());
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                assertNotNull(jsonObject);
            });

            Thread.sleep(5000);
        }
    }
}




, , . .





.




All Articles